Don't fork me!

Travis CI status: Build Status

1. Introduction

In progress: event-sourcing app using spring-boot + spring-cloud-kafka + kafka-streams

2. Components

  • api - module contains all commands, events and errors defined in application

  • commands-app - REST API application for sending payment system commands throw kafka

TODO:

  • queries-app - application for querying payment system

3. Prepare kafka

install spring boot cloud CLI
brew tap pivotal/tap
brew search spring
brew install springboot

spring install org.springframework.cloud:spring-cloud-cli:2.0.0.RELEASE

spring cloud kafka

# check if zookepper is running:
lsof -i:2181|awk '{print $2}'

# check if kafka is running:
lsof -i:9092|awk '{print $2}'

4. CQRS and Event Sourcing API (commands, queries, errors)

Commands REST API
sealed class Error(override val message: String) : RuntimeException(message)
sealed class Command
sealed class Event

5. Commands application

Including api project as included build using gradle composit builds approach
// file: settings.gradle
rootProject.name = 'commands-app'
enableFeaturePreview 'STABLE_PUBLISHING'

includeBuild('../api') {
  dependencySubstitution {
    substitute module('com.github.daggerok:api') with project(':')
  }
}

// file: gradle/java.gradle
allprojects {
  apply plugin: 'java'

  apply plugin: 'io.franzbecker.gradle-lombok'
  lombok.version = project.lombokVersion

  version = '0.0.1'
  group = 'com.github.daggerok'
  sourceCompatibility = targetCompatibility = "$javaVersion"

  defaultTasks 'clean', 'build'

  dependencies {
    implementation 'com.github.daggerok:api:0.0.1'
    // In java we trust...
    implementation "io.vavr:vavr:$vavrVersion"
  }
}
Commands REST API
@Configuration
class Rest(val producer: Producer) {

  companion object {
    val ref = mutableMapOf<String, String>()::class.java
  }

  @Bean
  fun routes() = router {
    ("/").nest {
      contentType(MediaType.APPLICATION_JSON_UTF8)

      POST("/") {
        ok().body(it.bodyToMono(ref)
            .map { it["message"].orEmpty() }
            .map { MessageBuilder.withPayload(it).build() }
            .map { producer.send(it) }
            .subscribeOn(Schedulers.elastic())
            .flatMap { Mono.just("sending message...") }
        )
      }

      GET("/**") {
        val map = mapOf(
            "errors" to listOf(
                Error::class.java.name
            ),
            "commands" to listOf(
                Command::class.java.name
            )
        )
        ok().body(
            Mono.just(map), map.javaClass
        )
      }
    }
  }
}