1. Introduction
This repo contains rapid development guide, how to quick bootstrap with spring and kafka.
Read project reference documentation on github pages
generated by generator-jvm yeoman generator (java-spring-boot)
2. Implementation
create project
brew install node
npm i -g yo generator-jvm
yo jvm -n spring-kafka-quickstart -t java-spring-boot
idea spring-kafka-quickstart/pom.xml
bootstrap kafka using spring boot (cloud) CLI
spring cloud kafka
add dependencies
pom.xml
file: <dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
add dependencies
build.gradle
file:dependencies {
implementation('org.springframework.kafka:spring-kafka')
testImplementation('org.springframework.kafka:spring-kafka-test')
}
add kafke listener:
@Log4j2
@Service
public class MessageListener {
@KafkaListener(topics = "messages")
public void on(final ConsumerRecord<Object, Object> message) {
log.info("received message: {}", message.value());
Mono.just(new Message().setAt(now())
.setBody(message.value().toString()))
.subscribe(msg -> log.info("saving message object: '{}'", msg));
}
}
add kafke sender functionality:
@Configuration
@RequiredArgsConstructor
class WebfluxRoutesConfig {
static final ParameterizedTypeReference<Map<String, String>> sendMessageRequestType
= new ParameterizedTypeReference<Map<String, String>>() {};
final KafkaTemplate<Object, Object> kafka;
@Bean
HandlerFunction<ServerResponse> sendMessageHandler() {
return request ->
ok().body(request.bodyToMono(sendMessageRequestType)
.map(it -> it.getOrDefault("message", ""))
.filter(it -> !it.trim().isEmpty())
.doOnNext(message -> kafka.send("messages", message))
.map(s -> "message sent.")
.flatMap(Mono::just), Object.class);
}
@Bean
RouterFunction routes(final HandlerFunction<ServerResponse> fallbackHandler) {
return
route(
POST("/"),
sendMessageHandler()
).andOther(
route(
GET("/**"),
fallbackHandler
)
)
;
}
}
build run and test (gradle)
./gradlew
bash -jar build/libs/*.jar
http :8080 message=ololo
http :8080 message=trololo
build run and test (maven)
./mvnw
bash target/*.jar