Don't fork me!

Travis CI status: Build Status

1. Introduction

This repo contains rapid development guide, how to quick bootstrap with spring and kafka.

Read project reference documentation on github pages

generated by generator-jvm yeoman generator (java-spring-boot)

2. Implementation

create project
brew install node
npm i -g yo generator-jvm
yo jvm -n spring-kafka-quickstart -t java-spring-boot
idea spring-kafka-quickstart/pom.xml
bootstrap kafka using spring boot (cloud) CLI
spring cloud kafka
add dependencies pom.xml file:
  <dependencies>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
add dependencies build.gradle file:
dependencies {
  implementation('org.springframework.kafka:spring-kafka')
  testImplementation('org.springframework.kafka:spring-kafka-test')
}
add kafke listener:
@Log4j2
@Service
public class MessageListener {

  @KafkaListener(topics = "messages")
  public void on(final ConsumerRecord<Object, Object> message) {
    log.info("received message: {}", message.value());
    Mono.just(new Message().setAt(now())
                           .setBody(message.value().toString()))
        .subscribe(msg -> log.info("saving message object: '{}'", msg));
  }
}
add kafke sender functionality:
@Configuration
@RequiredArgsConstructor
class WebfluxRoutesConfig {

  static final ParameterizedTypeReference<Map<String, String>> sendMessageRequestType
      = new ParameterizedTypeReference<Map<String, String>>() {};

  final KafkaTemplate<Object, Object> kafka;

  @Bean
  HandlerFunction<ServerResponse> sendMessageHandler() {
    return request ->
        ok().body(request.bodyToMono(sendMessageRequestType)
                         .map(it -> it.getOrDefault("message", ""))
                         .filter(it -> !it.trim().isEmpty())
                         .doOnNext(message -> kafka.send("messages", message))
                         .map(s -> "message sent.")
                         .flatMap(Mono::just), Object.class);
  }

  @Bean
  RouterFunction routes(final HandlerFunction<ServerResponse> fallbackHandler) {
    return
        route(
            POST("/"),
            sendMessageHandler()
        ).andOther(
            route(
                GET("/**"),
                fallbackHandler
            )
        )
        ;
  }
}
build run and test (gradle)
./gradlew
bash -jar build/libs/*.jar
http :8080 message=ololo
http :8080 message=trololo
build run and test (maven)
./mvnw
bash target/*.jar