playing with kafka
Old note from 2020-04-13.
During the COVID-19 shutdown, one way to “pass time” is to learn and play with new things.
I made a small project, heart-rate-calculator, whose goal is to take a file containing heartbeat information as input, compute the heart rates and write them into another file.
That was quite a nice project, but I was wondering: if I wanted to go further, what would the project look like? Let's say I want a whole SAAS solution, production-ready, that reads a stream of data from smart-watches measuring heartbeats, then computes the heart rates and displays a graph to the user. Of course, just that would be too simple, so what about adding the challenge of dealing with a massive amount of data? Mmh, a bit more tricky.
This is a good opportunity to play with Kafka and Kafka Streams! In this blog post, I will show how to build a resilient system to compute and generate heart rates from heartbeat data.
If you are in a hurry, you can check the source code here: https://github.com/l-lin/poc-kafka
Note that the micro-services are Java applications, using Spring Framework because that’s what I’m most familiar with.
I will use docker-compose to mount the whole environment.
The target architecture
I want to design a scalable, real-time and distributed pipeline to display the heart rate from a continuous stream of heartbeats provided by smart-watches.
One way is to use Apache Kafka, a distributed streaming platform, to help me achieve that (and that was the whole point of this project: learn and play with Kafka).
The target architecture is the following:
- heart-beat-producer: when a smart-watch sends a heartbeat, it should end up in Kafka right away
- heart-beat-validator: this service deals with the stream of heartbeats, validating them or flagging them as invalid
- heart-rate-computor: now we have valid heartbeats, we aggregate them and compute the associated heart rates
- heart-rate-connector: we need to sink the heart rates into a database, like PostgreSQL, so that other services can pick them up and show the heart rates to the users
- heart-rate-consumer: this application will listen to the Kafka topic to stream the heart rates in real time, as well as display a time window of the heart rates of a user
Designing models with Apache Avro
Before diving into developing the micro-services, I need to define the models that will circulate around the micro-services: HeartBeat and HeartRate.
It's important to define the models, especially in a micro-services oriented architecture, as each micro-service has its own responsibility and, in a real-world company, they can be handled by different teams. So, in order to have coherent "data" circulating in the whole system, it's important to be rigorous and define some sort of "contract", so that each micro-service knows exactly what data it receives and sends, and can manipulate it without worry.
There are multiple solutions out there, such as Apache Avro, ProtoBuf and JSON Schema.
Apache Avro
I chose Apache Avro because I had already manipulated ProtoBuf and JSON Schema before, and moreover Confluent, whose community products I use for most of this project, advocates using Avro as they offer Schema Management.
With Avro, you define the model schemas that contain the fields of the data along with their types. Once the Avro object is formed, it can easily be serialized as an array of bytes, which is the type Kafka likes. Another point to raise is that any other programming language can use Avro bytes and deserialize them into an object specific to their programming language, which is nice because that means every team can “communicate” data through Avro schemas.
Avro also supports schema evolution: each schema registered in the registry has its own version, and the registry enforces backward and forward compatibility with strict verification. For example, adding a field with a default value (like isReset below) is a backward-compatible change, whereas adding a field without a default is not.
What is the Schema Registry? It's nice to design Avro schemas, but they need to be stored and made available somewhere so that the micro-services are aware of them. That's what the Schema Registry is all about.
Schema Registry
The Schema Registry is a serving layer for the Avro metadata. It provides RESTful interfaces for storing and retrieving Apache Avro schemas, stores a versioned history of all schemas based on a specified subject name strategy, provides multiple compatibility settings and allows evolution of schemas according to the configured compatibility settings and expanded Avro support. It provides serializers that plug into Apache Kafka clients that handle schema storage and retrieval for Kafka messages that are sent in the Avro format.
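In this project, the services never call the registry by hand (the Avro serializers do it under the hood), but a small sketch using Confluent's CachedSchemaRegistryClient gives an idea of the interaction; the subject name and schema below are placeholders for illustration only:
import org.apache.avro.Schema;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class SchemaRegistryExample {

    public static void main(String[] args) throws Exception {
        // 100 is the number of schemas the client caches per subject
        SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);

        // register a schema under a subject (placeholder schema, not one of the project's schemas)
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
        int schemaId = client.register("example-value", schema);
        System.out.println("Registered schema with id " + schemaId);

        // fetch the latest version back
        System.out.println(client.getLatestSchemaMetadata("example-value").getSchema());
    }
}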
I configured the schema registry with the following docker-compose configuration:
version: '3'
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:${CONFLUENT_TAG}
    depends_on:
      - zk1
      - zk2
      - zk3
      - kafka1
      - kafka2
      - kafka3
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zk1:2181,zk2:2181,zk3:2181'
Heartbeat Avro schema
The heartbeat model is quite simple:
- userId: the ID of the user that uses the smart-watch
- HRI: a heart rate instant value, detected by the smart-watch
- QRS: the event describing the heart's activity:
  - A: supra-ventricular
  - V: premature ventricular
  - N: normal
  - F: fusion
  - P: paced
  - X: invalid
- timestamp: the time of the registered heartbeat
Using the Avro specifications, the Avro model looks like this:
[
{"namespace": "lin.louis.poc.models",
"name": "HeartBeatQRS",
"type": "enum",
"doc": "Event describing the activity of the heart: V: premature ventricular heartbeat - N: normal heartbeat - F: fusion heartbeat - P: paced heartbeat - X: invalid heartbeat",
"symbols": ["A", "V", "N", "F", "P", "X"]
},
{"namespace": "lin.louis.poc.models",
"name": "HeartBeat",
"type": "record",
"fields": [
{"name": "userId", "type": "long", "doc": "User ID of the heart beat"},
{"name": "hri", "type": "int", "doc": "Heart rate instant value"},
{"name": "qrs", "type": "lin.louis.poc.models.HeartBeatQRS", "doc": "Event describing heart's activity"},
{"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}, "doc": "Epoch timestamp of heart beat"}
]
}
]
I needed to declare those two models in a single schema file because the Schema Registry does not support a schema that depends on another schema file.
I struggled a bit to make it work before finding this stackoverflow answer: https://stackoverflow.com/questions/40854529/nesting-avro-schemas#40865366
Heart rate model
The heart rate model is the following:
- userId: the ID of the user associated to the heart rate
- value: the computed heart rate from the heartbeats
- timestamp: the time of the registered heart rate
- isReset: the heart rate is reset if a gap is detected, a QRS type X is detected, the HRI is out of range or the new timestamp is prior to the last detected timestamp
Which gives the following:
{"namespace": "lin.louis.poc.models",
"name": "HeartRate",
"type": "record",
"fields": [
{"name": "userId", "type": "long", "doc": "User ID of the heart beat"},
{"name": "value", "type": "double", "doc": "Heart rate value"},
{"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}, "doc": "Epoch timestamp of heart beat"},
{"name": "isReset", "type": "boolean", "default": false, "doc": "Heart rate is reset if a gap is detected, a QRS type X is detected, HRI is out of range or the new timestamp is prior to the last detected timestamp"}
]
}
Avro Java object generation
I used the avro-maven-plugin to generate the Java POJOs. Thus I created a maven module, heart-models, so that the other projects can use the auto-generated Java POJOs. The pom.xml looks like this:
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
So, by running a simple mvn generate-sources, the java classes are generated in the folder target/generated-sources.
It’s also possible to check if the Avro schema defined in this project is compatible with what’s defined in the Schema Registry by using the kafka-schema-registry-maven-plugin:
<build>
<plugins>
<plugin>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-maven-plugin</artifactId>
<executions>
<execution>
<goals><goal>test-compatibility</goal></goals>
</execution>
</executions>
<configuration>
<schemaRegistryUrls>
<param>http://localhost:8081</param>
</schemaRegistryUrls>
<subjects>
<heart-beats-value>${project.basedir}/src/main/resources/avro/HeartBeats.avsc</heart-beats-value>
<heart-rates-value>${project.basedir}/src/main/resources/avro/HeartRate.avsc</heart-rates-value>
</subjects>
<outputDirectory/>
</configuration>
</plugin>
</plugins>
</build>
This can be useful to speed up the SDLC by catching potential issues early when updating Avro schemas.
Heart beat producer
The smart-watches must send their data somewhere. Thus, I started with a classical spring-boot
MVC project, auto-generated from the Spring Initializr. The pom.xml
looks like this:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<finalName>heart-beat-producer</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
Just a simple dependency on spring-boot-starter-web and we can create the controller.
REST endpoint to register a heartbeat
Nothing too complicated:
@RestController
@RequestMapping(path = "/heart-beats")
public class HBController {
private final HBRepository hbRepository;
public HBController(HBRepository hbRepository) {
this.hbRepository = hbRepository;
}
/**
* Simple endpoint the smart-watch can call to register a new heartbeat
*/
@PostMapping
@ResponseStatus(HttpStatus.NO_CONTENT)
public void create(@RequestBody HeartBeatDTO heartBeatDTO) {
var heartBeat = HeartBeat.newBuilder()
.setUserId(heartBeatDTO.getUserId())
.setHri(heartBeatDTO.getHri())
.setQrs(heartBeatDTO.getQrs())
.setTimestamp(Instant.now())
.build();
hbRepository.save(heartBeat);
}
}
You may notice that I use a HeartBeatDTO. That's because the HeartBeat Java class, auto-generated by the avro-maven-plugin, contains other methods, like getSchema(), which Jackson does not like to serialize.
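The HeartBeatDTO itself is not shown in this post; here is a minimal sketch of what it could look like, with the fields inferred from the controller above (the actual class in the project may differ):
import lin.louis.poc.models.HeartBeatQRS;

/**
 * Plain DTO exposed on the REST endpoint, free of the Avro-generated methods like getSchema().
 */
public class HeartBeatDTO {

    private long userId;
    private int hri;
    private HeartBeatQRS qrs;

    public long getUserId() { return userId; }
    public void setUserId(long userId) { this.userId = userId; }

    public int getHri() { return hri; }
    public void setHri(int hri) { this.hri = hri; }

    public HeartBeatQRS getQrs() { return qrs; }
    public void setQrs(HeartBeatQRS qrs) { this.qrs = qrs; }
}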
Sending the heartbeat to Kafka
The HBRepository
is a simple interface that will save the heartbeat somewhere:
public interface HBRepository {
void save(HeartBeat heartBeat);
}
The implementation is the following:
/**
* Simple Spring Kafka producer implementation.
*
* @see <a href="https://docs.spring.io/spring-kafka/docs/2.3.7.RELEASE/reference/html/#kafka-template">Spring Kafka
* documentation</a>
*/
public class KafkaHBRepository implements HBRepository {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final String topicName;
private final KafkaTemplate<Long, HeartBeat> kafkaTemplate;
public KafkaHBRepository(String topicName, KafkaTemplate<Long, HeartBeat> kafkaTemplate) {
this.topicName = topicName;
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void save(HeartBeat heartBeat) {
logger.debug(
"Sending to kafka topic '{}' the following heart beat in key {}: {}",
topicName,
heartBeat.getUserId(),
heartBeat
);
// using the userId as the topic key, so I can easily aggregate them afterwards
kafkaTemplate.send(topicName, heartBeat.getUserId(), heartBeat);
}
}
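The KafkaHBRepository needs a topic name and a KafkaTemplate; here is a minimal sketch of how it could be wired as a Spring bean (the topic.name property key and its default are assumptions for this example, and the KafkaTemplate is the one auto-configured by Spring Boot from the properties shown just after):
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

import lin.louis.poc.models.HeartBeat;

@Configuration
public class HBProducerConfig {

    // "topic.name" and its default value are assumptions for this sketch
    @Bean
    HBRepository hbRepository(
            @Value("${topic.name:heart-beats}") String topicName,
            KafkaTemplate<Long, HeartBeat> kafkaTemplate) {
        return new KafkaHBRepository(topicName, kafkaTemplate);
    }
}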
As you can see, I used Spring Kafka, which helps me easily configure the Kafka producer client KafkaTemplate by providing the right properties in my application.yml file:
spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    properties:
      # using a schema registry to fetch the Avro schemas
      # see https://docs.confluent.io/current/schema-registry/index.html
      schema.registry.url: http://localhost:8081
    producer:
      # producer properties can be found in org.apache.kafka.clients.producer.ProducerConfig
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
Kafka likes byte arrays, so we need to specify the serializers to use when serializing the key and the value of the message.
The Kafka client also needs a URL to the Schema Registry to fetch the Avro schemas.
However, the timestamp is not registered correctly. Indeed, the input timestamps are Long values in milliseconds, represented as an Instant, whereas Jackson reads timestamps as nanoseconds by default.
I found this blog post that helped me find a simple solution by adding the right Jackson properties:
spring:
  jackson:
    # ensure the input/output in epoch milli are correctly read/written by Jackson
    deserialization:
      READ_DATE_TIMESTAMPS_AS_NANOSECONDS: false
    serialization:
      WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS: false
Unit tests
Following the example to test Kafka given by
Spring, I added the
following dependencies to my pom.xml
:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
However, there is lots of boilerplate code… And I got lots of Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class lin.louis.poc.models.HeartBeat (org.apache.avro.generic.GenericData$Record and lin.louis.poc.models.HeartBeat are in unnamed module of loader 'app'). It's due to the fact that my Avro schema is considered specific since I used a timestamp… I did not know this beforehand… For me, it was a simple schema, so I naively used the GenericAvroSerializer (and I did not know about the existence of the SpecificAvroSerializer…). What a mistake! I struggled for hours just to find an innocent comment on Stackoverflow that led me to an "AHA" moment…
The Kafka producer configuration can be set like this:
private void buildProducer() {
// Using the helper provided by Spring KafkaTestUtils#producerProps to boilerplate the producer properties
var producerProps = KafkaTestUtils.producerProps(embeddedKafka);
// Use the right serializers for the topic key and value
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class);
producerProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry.getUrl());
var template = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Long, HeartBeat>(producerProps));
hbRepository = new KafkaHBRepository(TOPIC, template);
}
As you may have noticed, I use a schemaRegistry instance. But since it's a unit test, I do not want to mount a Schema Registry only for my tests.
After some searches on the internet, I found a library from Bakdata, schema-registry-mock, that can mock the Schema Registry! It provides a nice JUnit5 extension that starts up a Schema Registry mock for each test. I just need to add a new dependency:
<dependency>
<groupId>com.bakdata.fluent-kafka-streams-tests</groupId>
<artifactId>schema-registry-mock-junit5</artifactId>
<scope>test</scope>
</dependency>
Now, all I have to do is to declare it like this:
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistry = new SchemaRegistryMockExtension();
Finally, I need a consumer to check if my messages are sent correctly to Kafka. Using the example provided by Spring, this is the consumer configuration:
private void buildConsumer() {
var consumerProps = KafkaTestUtils.consumerProps("consumer", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class);
consumerProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry.getUrl());
consumerRecords = new LinkedBlockingQueue<>();
container = new KafkaMessageListenerContainer<>(
new DefaultKafkaConsumerFactory<>(consumerProps),
new ContainerProperties(TOPIC)
);
container.setupMessageListener((MessageListener<Long, HeartBeat>) record -> consumerRecords.add(record));
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
}
Now I can perform the tests:
@Test
void send_shouldSendKafkaMessage() throws InterruptedException {
// GIVEN
var now = Instant.now();
var heartBeat = new HeartBeat(123L, 80, HeartBeatQRS.A, now);
// WHEN
hbRepository.save(heartBeat);
var received = consumerRecords.poll(10, TimeUnit.SECONDS);
// THEN
assertNotNull(received);
var receivedValue = received.value();
assertNotNull(receivedValue);
assertAll(() -> {
assertEquals(heartBeat.getUserId(), receivedValue.getUserId());
assertEquals(heartBeat.getHri(), receivedValue.getHri());
assertEquals(heartBeat.getQrs(), receivedValue.getQrs());
assertEquals(heartBeat.getTimestamp(), receivedValue.getTimestamp());
});
}
You can check the whole test file here.
Running heart-beat-producer
I want my whole environment to be run by docker-compose, so I need to build a Docker image for this project. Here is a simple Dockerfile:
FROM openjdk:11.0.6-jre-slim
EXPOSE 8180
WORKDIR /opt
COPY target/heart-beat-producer.jar /opt
ENTRYPOINT ["java", "-jar", "/opt/heart-beat-producer.jar"]
I use the dockerfile-maven-plugin from Spotify to add the docker image build to the maven build, so that mvn package builds both the JAR and the docker image, which reduces the number of commands to execute. So in my pom.xml, I add this plugin:
<build>
<finalName>heart-beat-producer</finalName>
<plugins>
<plugin>
<groupId>com.spotify</groupId>
<artifactId>dockerfile-maven-plugin</artifactId>
<executions>
<execution>
<id>default</id>
<goals><goal>build</goal></goals>
</execution>
</executions>
<configuration>
<repository>linlouis/heart-beat-producer</repository>
</configuration>
</plugin>
</plugins>
</build>
Then I declare the service in the docker-compose.yml
file like this:
version: '3'
services:
  heart-beat-producer:
    image: linlouis/heart-beat-producer
    depends_on:
      - schema-registry
    ports:
      - "8180-8199:8180"
    command: [
      "--spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092",
      "--spring.kafka.properties.schema.registry.url=http://schema-registry:8081",
      "--topic.partitions=3",
      "--topic.replicas=3"
    ]
As I want to simulate a "real world case", I need to be able to scale the services; that's why I set a range of ports to use, "8180-8199:8180". I also need a load balancer to redirect the requests to a heart-beat-producer app. I use Nginx to do that. The configuration in nginx.conf is quite straightforward:
http {
upstream heart-beat-producer {
server heart-beat-producer:8180;
}
server {
listen 80;
location /heart-beat-producer {
proxy_pass http://heart-beat-producer;
}
}
}
Then the declaration in the docker-compose.yml
file:
version: '3'
services:
  web:
    image: nginx:1.17.9-alpine
    depends_on:
      - heart-beat-producer
    ports:
      - 80:80
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
Now, using docker-compose:
docker-compose up web --scale heart-beat-producer=3
This will fire up 3 zookeepers, 3 Kafkas, the schema registry, 3 heart-beat-producer apps and a load balancer.
Using HTTPie, register a heartbeat:
$ http :80/heart-beat-producer/heart-beats userId=1 hri=70 qrs=A
HTTP/1.1 204
Connection: keep-alive
Date: Mon, 13 Apr 2020 10:19:45 GMT
Keep-Alive: timeout=60
And observe the records in Kafka:
$ docker exec -it "${PWD##*/}_schema-registry_1" \
/usr/bin/kafka-avro-console-consumer \
--bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \
--topic heart-beats \
--from-beginning
# Lots of logs...
{"userId":1,"hri":70,"qrs":"A","timestamp":1586773185132}
Excellent, we now have an endpoint that registers heartbeats in a Kafka topic.
Heart smart-watch simulator
The idea of this project is to learn and play with Kafka, so I did not implement a smart-watch. However, I need something to inject data into the system.
Let's create a heart-smartwatch-simulator that will send random data to heart-beat-producer at random intervals.
I use Golang as it’s easy to spawn lots of goroutines that will send HTTP requests.
for i := 0; i < nbUsers; i++ {
go func(userID int) {
for {
// send HTTP request
// sleep for a random amount of time
}
}(i+1)
}
We can go further by adding the possibility to simulate invalid heartbeats.
You can check out the project if you are interested in the implementation.
Heart beat validator
We can now plug in another service that will read that stream of heartbeats and apply a predicate to figure out whether each heartbeat is valid or not. We could even plug in a Machine Learning model to identify false positive heartbeats and handle them accordingly.
For this service, we will just use Kafka Streams. It is made for real-time applications and micro-services that get data from Kafka, manipulate it and send it back to Kafka. The heart-beat-validator is a simple Java application that uses Kafka Streams to filter the valid heartbeats and register the invalid ones into another Kafka topic, in case we want to study them in the future to perform other types of action.
For this project, I will need new libraries, especially ones that can manipulate Kafka Streams:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
</dependency>
Kafka streams topology
To use Kafka Streams, we need to define a Kafka Streams topology, which is basically a sequence of processing steps.
First, I need to create the topics heart-beats-valid and heart-beats-invalid. So I use Spring Kafka again to create them when the application starts up:
@Bean
NewTopic topicHeartBeatsValid(TopicsProperties topicsProperties) {
TopicsProperties.Topic t = topicsProperties.getTo().getValid();
return TopicBuilder.name(t.getName())
.partitions(t.getPartitions())
.replicas(t.getReplicas())
.build();
}
@Bean
NewTopic topicHeartBeatsInvalid(TopicsProperties topicsProperties) {
TopicsProperties.Topic t = topicsProperties.getTo().getInvalid();
return TopicBuilder.name(t.getName())
.partitions(t.getPartitions())
.replicas(t.getReplicas())
.build();
}
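The TopicsProperties class is not shown in this post; a minimal sketch of what it could look like as a @ConfigurationProperties class (the topics prefix and the nested structure are assumptions inferred from the beans above):
import org.springframework.boot.context.properties.ConfigurationProperties;

// enabled with @EnableConfigurationProperties(TopicsProperties.class) or @ConfigurationPropertiesScan
@ConfigurationProperties(prefix = "topics")
public class TopicsProperties {

    private final Topic from = new Topic();
    private final To to = new To();

    public Topic getFrom() { return from; }
    public To getTo() { return to; }

    public static class To {
        private final Topic valid = new Topic();
        private final Topic invalid = new Topic();

        public Topic getValid() { return valid; }
        public Topic getInvalid() { return invalid; }
    }

    public static class Topic {
        private String name;
        private int partitions = 1;
        private int replicas = 1;

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getPartitions() { return partitions; }
        public void setPartitions(int partitions) { this.partitions = partitions; }
        public int getReplicas() { return replicas; }
        public void setReplicas(int replicas) { this.replicas = replicas; }
    }
}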
My configuration looks like this (using Spring properties file):
spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    properties:
      # using a schema registry to fetch the Avro schemas
      # see https://docs.confluent.io/current/schema-registry/index.html
      schema.registry.url: http://localhost:8081
    streams:
      application-id: heart-beat-validator
      replication-factor: 1
      # streams properties can be found in org.apache.kafka.streams.StreamsConfig
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$LongSerde
        # since my HeartBeat Avro schema is specific (I'm using a specific timestamp), I need to use SpecificAvroSerde, not GenericAvroSerde
        default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
I'm not sure there is documentation with an exhaustive list of serdes (I did not find one); I found those serdes directly in the source code.
I use the KafkaStreamBrancher from Spring to conveniently build conditional branches on top of a KStream:
public KStream<Long, HeartBeat> buildKStream() {
var from = streamsBuilder.<Long, HeartBeat>stream(topicFrom)
// we can peek on the stream, to log or do other stuffs
.peek((key, heartBeat) -> logger.debug("reading heart beat with key {}: {}", key, heartBeat));
// just showing we can print in System.out for debug purpose
from.print(Printed.toSysOut());
return new KafkaStreamBrancher<Long, HeartBeat>()
.branch(predicate, kStream -> kStream.to(topicValidTo))
.defaultBranch(kStream -> kStream.to(topicInvalidTo))
.onTopOf(from);
}
This code is quite straightforward: if the predicate returns true, the record is sent to the heart-beats-valid topic, otherwise to heart-beats-invalid.
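The predicate itself is not detailed in this post; here is a minimal sketch of what it could look like, assuming a heartbeat is valid when its HRI is in a plausible range and its QRS is not the X (invalid) type (the exact bounds are assumptions, check the source code for the real implementation):
import org.apache.kafka.streams.kstream.Predicate;

import lin.louis.poc.models.HeartBeat;
import lin.louis.poc.models.HeartBeatQRS;

public class HBValidPredicate implements Predicate<Long, HeartBeat> {

    // bounds are assumptions for this sketch
    private static final int MIN_HRI = 0;
    private static final int MAX_HRI = 300;

    @Override
    public boolean test(Long userId, HeartBeat heartBeat) {
        return heartBeat != null
                && heartBeat.getQrs() != HeartBeatQRS.X
                && heartBeat.getHri() > MIN_HRI
                && heartBeat.getHri() <= MAX_HRI;
    }
}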
Unit tests
I use Bakdata fluent-kafka-streams-tests to write the Kafka Streams tests:
<dependency>
<groupId>com.bakdata.fluent-kafka-streams-tests</groupId>
<artifactId>schema-registry-mock-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.bakdata.fluent-kafka-streams-tests</groupId>
<artifactId>fluent-kafka-streams-tests-junit5</artifactId>
<scope>test</scope>
</dependency>
It offers a JUnit5 extension, TestTopologyExtension. However, it needs the Kafka properties, which I do not have before the execution of the tests, since I'm using spring-kafka-test, which provides @EmbeddedKafka and EmbeddedKafkaBroker to start up a Kafka for the tests. Thus, I have to initialize the TestTopologyExtension on my own, which gives something like this:
@EmbeddedKafka(
partitions = 1,
topics = { "heart-beats", "heart-beats-valid", "heart-beats-invalid" }
)
@ExtendWith(SpringExtension.class)
class HBValidatorStreamBuilderTest {
private static final String TOPIC_FROM = "heart-beats";
private static final String TOPIC_TO_VALID = "heart-beats-valid";
private static final String TOPIC_TO_INVALID = "heart-beats-invalid";
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
private TestTopologyExtension<Long, HeartBeat> testTopology;
@BeforeEach
void setUp() {
// not registering the TestTopology as a JUnit extension because Kafka is instantiated by Spring Test at runtime
testTopology = new TestTopologyExtension<>(topology, buildKafkaProperties(embeddedKafka));
testTopology.start();
}
@AfterEach
void tearDown() {
if (testTopology != null) {
testTopology.stop();
}
}
private Properties buildKafkaProperties(EmbeddedKafkaBroker embeddedKafka) {
var properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "heart-beat-validator");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
// we need to set this property, even if the URL does not exist, but it still needs to be syntactically valid
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://dummy");
return properties;
}
}
I do not need the SchemaRegistryMockExtension in this test, as the TestTopologyExtension handles it automatically.
Do not forget to set the property SCHEMA_REGISTRY_URL_CONFIG! Even if we are using a mocked schema registry, it still needs to have a valid URL.
The test looks like this:
@Test
void shouldSendToCorrespondingTopic() {
var validHeartBeats = new HeartBeat[] {
new HeartBeat(101L, 50, HeartBeatQRS.A, Instant.now()),
new HeartBeat(102L, 80, HeartBeatQRS.V, Instant.now()),
new HeartBeat(103L, 90, HeartBeatQRS.F, Instant.now()),
new HeartBeat(104L, 150, HeartBeatQRS.A, Instant.now()),
new HeartBeat(105L, 5, HeartBeatQRS.P, Instant.now())
};
var invalidHeartBeats = new HeartBeat[] {
new HeartBeat(-201L, 190, HeartBeatQRS.X, Instant.now()),
new HeartBeat(-202L, 390, HeartBeatQRS.A, Instant.now()),
new HeartBeat(-203L, -19, HeartBeatQRS.F, Instant.now())
};
testTopology.input(TOPIC_FROM)
.add(validHeartBeats[0].getUserId(), validHeartBeats[0])
.add(validHeartBeats[1].getUserId(), validHeartBeats[1])
.add(validHeartBeats[2].getUserId(), validHeartBeats[2])
.add(invalidHeartBeats[0].getUserId(), invalidHeartBeats[0])
.add(invalidHeartBeats[1].getUserId(), invalidHeartBeats[1])
.add(validHeartBeats[3].getUserId(), validHeartBeats[3])
.add(invalidHeartBeats[2].getUserId(), null)
.add(validHeartBeats[4].getUserId(), validHeartBeats[4]);
var testOutputValid = testTopology.streamOutput(TOPIC_TO_VALID);
Arrays.stream(validHeartBeats)
.forEach(validHeartBeat -> testOutputValid.expectNextRecord()
.hasKey(validHeartBeat.getUserId())
.hasValue(validHeartBeat));
testOutputValid.expectNoMoreRecord();
testTopology.streamOutput(TOPIC_TO_INVALID)
.expectNextRecord().hasKey(invalidHeartBeats[0].getUserId()).hasValue(invalidHeartBeats[0])
.expectNextRecord().hasKey(invalidHeartBeats[1].getUserId()).hasValue(invalidHeartBeats[1])
// can't check null
.expectNextRecord().hasKey(invalidHeartBeats[2].getUserId())
.expectNoMoreRecord();
}
Running heart-beat-validator
Using docker-compose:
docker-compose up heart-beat-validator
This will fire up 3 zookeepers, 3 Kafkas, the schema registry, the heart-beat-producer and the heart-beat-validator.
Using HTTPie, register a heartbeat:
$ http :8180/heart-beat-producer/heart-beats userId=1 hri=70 qrs=A
HTTP/1.1 204
Connection: keep-alive
Date: Mon, 13 Apr 2020 10:19:45 GMT
Keep-Alive: timeout=60
$ http :8180/heart-beat-producer/heart-beats userId=1 hri=-70 qrs=A
HTTP/1.1 204
Connection: keep-alive
Date: Mon, 13 Apr 2020 10:19:45 GMT
Keep-Alive: timeout=60
$ http :8180/heart-beat-producer/heart-beats userId=1 hri=-100 qrs=F
HTTP/1.1 204
Connection: keep-alive
Date: Mon, 13 Apr 2020 10:19:45 GMT
Keep-Alive: timeout=60
And observe the records in Kafka:
$ docker exec -it "${PWD##*/}_schema-registry_1" \
/usr/bin/kafka-avro-console-consumer \
--bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \
--topic heart-beats-valid \
--from-beginning
# Lots of logs...
{"userId":1,"hri":70,"qrs":"A","timestamp":1586773185132}
$ docker exec -it "${PWD##*/}_schema-registry_1" \
/usr/bin/kafka-avro-console-consumer \
--bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \
--topic heart-beats-invalid \
--from-beginning
# Lots of logs...
{"userId":1,"hri":-70,"qrs":"A","timestamp":1586773186132}
{"userId":1,"hri":-100,"qrs":"F","timestamp":1586773286132}
Heart rate computor
Everything we have done previously is just moving data across the network. It's nice, but it's not the heart of the whole project. Now we are tackling the service that reads the heartbeats and computes the heart rates.
This application is also a Kafka Streams application; however, it differs from the previous services as it needs some sort of "state" to aggregate the heartbeats.
A heart rate is computed as follows:
- its value is the median of the last 8 heartbeat HRI values
- if there are fewer than 8 heartbeats, its value is NaN
I use the same dependencies as the ones from heart-beat-validator.
Kafka Streams topology
The Kafka Streams configuration is the same as the previous Kafka Streams application
heart-beat-validator
.
First, let’s read the Kafka topic heart-beats-valid
:
KStream<Long, HeartBeat> kStream = streamsBuilder.<Long, HeartBeat>stream(topicFrom)
.peek((userId, heartBeat) -> logger.debug("reading heart beat of user {}: {}", userId, heartBeat));
We will use a KTable to aggregate the heartbeats by user IDs:
KTable<Long, HeartBeats> kTable = kStream
.groupByKey()
.aggregate(HeartBeats::new, HBAggregator.INSTANCE, Materialized.as("aggregated-heart-beats"));
You may have noticed that I used a HeartBeats class, and not a Collection<HeartBeat>. I will explain why later.
Now that we have the KTable, we can perform the heart rate computation:
KStream<Long, HeartRate> outKStream = kTable
.toStream()
.flatMapValues(new HRValueMapper(hrFactory, nbHeartBeats))
.peek((userId, heartRate) -> logger.debug("heart rate computed for user {}: {}", userId, heartRate));
// then we write the heart rate into a kafka topic "heart-rates"
outKStream.to(topicTo);
HeartBeats avro schema
It seems that when aggregating, Kafka performs some serialization. Since HeartBeat is an Avro object, it will use the Avro serializer, so using a Collection will not work and we might get a Caused by: java.lang.ClassCastException.
Just having a simple POJO is not enough either, otherwise we will still get the same error:
Caused by: java.lang.ClassCastException: class lin.louis.poc.hrc.model.HeartBeats cannot be cast to class org.apache.avro.specific.SpecificRecord (lin.louis.poc.hrc.model.HeartBeats and org.apache.avro.specific.SpecificRecord are in unnamed module of loader 'app')
So I had to add a new Avro schema. However, the Avro specification is not clear on how to declare an array… This does not work:
{"name": "foo",
"type": "array",
"items": "string"
}
Neither does this work:
{"name": "foo",
"type": "record",
"fields": [
{"name": "list", "type": "array", "items": "string"}
]
}
It was not until I saw this blog post that I understood I had to wrap a type inside of the type: {"type": {"type": "array", "items": "string"}}.
If you start learning Avro, Kafka and stuff… There are so many gotchas and implicit knowledge,
it’s quite frustrating for beginners…
So my Avro schema for the array of heartbeats comes down to this:
{"namespace": "lin.louis.poc.models",
"name": "HeartBeats",
"type": "record",
"fields": [
{"name": "userId", "type": "long", "doc": "User ID of the heart beats"},
{"name": "heartBeats", "type": {"type": "array", "items": "lin.louis.poc.models.HeartBeat"}, "doc": "The representation of multiple heart beats of a user"}
]
}
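The HBAggregator used in the aggregation above is not shown in this post; here is a minimal sketch of what it could look like (the enum-singleton form is an assumption, and the real implementation probably also trims the list, e.g. to the last 8 heartbeats):
import java.util.ArrayList;

import org.apache.kafka.streams.kstream.Aggregator;

import lin.louis.poc.models.HeartBeat;
import lin.louis.poc.models.HeartBeats;

public enum HBAggregator implements Aggregator<Long, HeartBeat, HeartBeats> {
    INSTANCE;

    @Override
    public HeartBeats apply(Long userId, HeartBeat heartBeat, HeartBeats aggregate) {
        // the Avro no-arg constructor leaves the array field null, so initialize it lazily
        if (aggregate.getHeartBeats() == null) {
            aggregate.setHeartBeats(new ArrayList<>());
        }
        aggregate.setUserId(userId);
        aggregate.getHeartBeats().add(heartBeat);
        return aggregate;
    }
}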
Heart rate value mapper
I map the HeartBeats into an Iterable<HeartRate> using the HRValueMapper:
public class HRValueMapper implements ValueMapperWithKey<Long, HeartBeats, Iterable<HeartRate>> {
@Override
public Iterable<HeartRate> apply(Long userId, HeartBeats value) {
var heartRates = new ArrayList<HeartRate>();
// compute and transform the HeartBeats into Iterable<HeartRate>
return heartRates;
}
}
If you are interested in the implementation, check out the source code.
:warning: this will only work with Kafka running as a cluster, as it needs the property processing.guarantee=exactly_once to ensure the expected behavior, i.e. fetching the values from the KTable and computing directly in "real time". When "processing.guarantee" is set to "at_least_once", the mapping is not performed immediately; there is a small time buffer before it performs the mapping, hence weird behaviors, like having too many heartbeats for a single heart rate, or having heartbeats with offset timestamps…
Unit tests
No secret now, it’s the same as the unit tests from heart-beat-validator.
The test will look like this:
@Test
void shouldSendToCorrespondingTopic() {
// GIVEN
var heartBeats = Arrays.asList(
new HeartBeat(USER_ID, 80, HeartBeatQRS.N, newInstant(1)),
new HeartBeat(USER_ID, 100, HeartBeatQRS.V, newInstant(2)),
new HeartBeat(USER_ID, 83, HeartBeatQRS.N, newInstant(3)),
new HeartBeat(USER_ID, 80, HeartBeatQRS.P, newInstant(4)),
new HeartBeat(USER_ID, 91, HeartBeatQRS.A, newInstant(5)),
new HeartBeat(USER_ID, 88, HeartBeatQRS.N, newInstant(7)),
new HeartBeat(USER_ID, 70, HeartBeatQRS.N, newInstant(8)),
new HeartBeat(USER_ID, 10, HeartBeatQRS.F, newInstant(10)), // 8
new HeartBeat(USER_ID, 90, HeartBeatQRS.F, newInstant(11)),
new HeartBeat(USER_ID, 201, HeartBeatQRS.A, newInstant(12)),
new HeartBeat(USER_ID, 88, HeartBeatQRS.A, newInstant(15)),
new HeartBeat(USER_ID, 222, HeartBeatQRS.V, newInstant(17)),
new HeartBeat(USER_ID, 89, HeartBeatQRS.P, newInstant(18)),
new HeartBeat(USER_ID, 100, HeartBeatQRS.F, newInstant(19)),
new HeartBeat(USER_ID, 101, HeartBeatQRS.F, newInstant(20))
);
// WHEN
var heartBeatSerde = new SpecificAvroSerde<HeartBeat>(testTopology.getSchemaRegistry());
heartBeatSerde.configure(testTopology.getStreamsConfig().originals(), false);
TestInput<Long, HeartBeat> testInput = testTopology.input(TOPIC_FROM)
.withKeySerde(new Serdes.LongSerde())
.withValueSerde(heartBeatSerde);
heartBeats.forEach(heartBeat -> testInput.add(heartBeat.getUserId(), heartBeat));
// THEN
TestOutput<Long, HeartRate> testOutput = testTopology.streamOutput(TOPIC_TO);
var heartRates = Arrays.asList(
new HeartRate(USER_ID, Double.NaN, heartBeats.get(0).getTimestamp(), false),
new HeartRate(USER_ID, Double.NaN, heartBeats.get(1).getTimestamp(), false),
new HeartRate(USER_ID, Double.NaN, heartBeats.get(2).getTimestamp(), false),
new HeartRate(USER_ID, Double.NaN, heartBeats.get(3).getTimestamp(), false),
new HeartRate(USER_ID, Double.NaN, heartBeats.get(4).getTimestamp(), false),
new HeartRate(USER_ID, Double.NaN, heartBeats.get(5).getTimestamp(), false),
new HeartRate(USER_ID, Double.NaN, heartBeats.get(6).getTimestamp(), false),
new HeartRate(USER_ID, 81.5, heartBeats.get(7).getTimestamp(), false),
new HeartRate(USER_ID, 85.5, heartBeats.get(8).getTimestamp(), false),
new HeartRate(USER_ID, 85.5, heartBeats.get(9).getTimestamp(), false),
new HeartRate(USER_ID, 88d, heartBeats.get(10).getTimestamp(), false),
new HeartRate(USER_ID, 89d, heartBeats.get(11).getTimestamp(), false),
new HeartRate(USER_ID, 88.5, heartBeats.get(12).getTimestamp(), false),
new HeartRate(USER_ID, 89.5, heartBeats.get(13).getTimestamp(), false),
new HeartRate(USER_ID, 95d, heartBeats.get(14).getTimestamp(), false)
);
heartRates.forEach(heartRate -> testOutput.expectNextRecord()
.hasKey(heartRate.getUserId())
.hasValue(heartRate));
testOutput.expectNoMoreRecord();
}
Running heart-rate-computor
Using docker-compose:
docker-compose up heart-rate-computor
This will fire up 3 zookeepers, 3 Kafkas, the schema registry, the heart-beat-producer, the heart-beat-validator and the heart-rate-computor
Using HTTPie, register heartbeats every second:
$ while true; do http :8180/heart-beat-producer/heart-beats userId=1 hri=70 qrs=A; sleep 1; done
And observe the records in Kafka:
$ docker exec -it "${PWD##*/}_schema-registry_1" \
/usr/bin/kafka-avro-console-consumer \
--bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \
--topic heart-rates \
--from-beginning
# Lots of logs...
{"userId":1,"value":70,"timestamp":1586788974000, "isReset": false}
{"userId":1,"value":71,"timestamp":1586788975000, "isReset": false}
{"userId":1,"value":71.5,"timestamp":1586788976000, "isReset": false}
# ...
Heart rate connector
Most modern backend services use a database to store and read data. I chose PostgreSQL because that's the kind of database I'm most familiar with. However, it's totally possible to use another type of database, like a NoSQL one (choose this type if you are dealing with really large amounts of data, like ScyllaDB), or a search index such as Elasticsearch.
There are lots of connectors supported.
Configuring Kafka connect
There is a lot of documentation on the Confluent website; however, it's more of a reference documentation than a tutorial. I honestly struggled to understand which properties I needed to use. My final configuration looks like this in my docker-compose.yml file:
version: '3'
services:
  heart-rate-connector:
    image: confluentinc/cp-kafka-connect:${CONFLUENT_TAG}
    depends_on:
      - kafka1
      - kafka2
      - kafka3
      - db
      - schema-registry
    ports:
      - 8082:8082
    # https://docs.confluent.io/current/installation/docker/config-reference.html#kafka-connect-configuration
    environment:
      CONNECT_REST_PORT: 8082
      CONNECT_REST_ADVERTISED_HOST_NAME: heart-rate-connector
      CONNECT_TOPICS: heart-rates
      # kafka configuration
      CONNECT_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
      CONNECT_GROUP_ID: heart-rate-connector
      CONNECT_CONFIG_STORAGE_TOPIC: _heart-rate-connector-config
      CONNECT_OFFSET_STORAGE_TOPIC: _heart-rate-connector-offset
      CONNECT_STATUS_STORAGE_TOPIC: _heart-rate-connector-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # log
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_PLUGIN_PATH: '/usr/share/java'
      # level of parallelism: increase that number up to the number of partitions it's reading
      CONNECT_TASKS_MAX: 3
    volumes:
      - $PWD/scripts:/scripts
I set every property described in the documentation:
- https://docs.confluent.io/current/connect/managing/configuring.html
- https://docs.confluent.io/current/installation/docker/config-reference.html#kafka-connect-configuration
- https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/sink_config_options.html
But it does not create the table, nor does it insert data… When I call http :8082/connectors, it always returns an empty array []…
It was not until I saw this example that I understood I needed to add the connector manually. The docker image does not initialize the connector itself (unlike, for example, the PostgreSQL docker image, where it's possible to initialize the database with a dump).
Thus, I created a small script to initialize the connector:
curl -X "POST" "http://localhost:8082/connectors/" \
-H "Content-Type: application/json" \
-d '{
"name": "heart-rate-connector-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://db:5432/heart_monitor?applicationName=heart-rate-connector",
"connection.user": "postgres",
"connection.password": "postgres",
"auto.create":"true",
"auto.evolve":"true",
"pk.mode": "kafka",
"topics": "heart-rates",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "ExtractTimestamp,RenameField",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.ExtractTimestamp.timestamp.field" : "extract_ts",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames" : "userId:user_id,isReset:is_reset"
}
}'
Some properties to highlight:
- key.converter: the default converter provided by the connect workers being Avro, it would throw an error as the key type is a Long, so I use the StringConverter here instead (there is no LongConverter, only StringConverter, JsonConverter and the Avro one).
- transforms: by default, Kafka Connect takes the topic and its content as is, and creates the table and columns with the same names as the ones defined in Kafka, i.e. userId, isReset. However, these field names are not "standard" for PostgreSQL (even if it works, it's not really nice to query, as we always need to escape the column names when writing a SQL query). It's possible to rename them through the "transforms" property by using RenameField.
- transforms: I also added a new column that contains the timestamp at which the record was added to Kafka. This is just an example showing that Kafka Connect can add new fields.
Running Kafka Connect
Using docker-compose:
docker-compose up heart-rate-computor heart-rate-connector -d
Wait until Kafka connect is fully started, then execute the previous curl command to initialize the table.
Using HTTPie, register heartbeats every second:
$ while true; do http :80/heart-beat-producer/heart-beats userId=1 hri=70 qrs=A; sleep 1; done
Then you can check the content of the database:
$ docker exec -it "${PWD##*/}_db_1" \
psql -U postgres heart_monitor -c "SELECT * FROM \"heart-rates\""
Note: I did not find the configuration / a way to rename the table name…
Heart rate consumer
Now, I need to display the heart rates in a nice graph in two ways:
- in real time, by consuming directly the records in the Kafka topic heart-rates
- as a time-based snapshot of the heart rates graph, by reading from the database
- to avoid the user having to type a timestamp, I will just have a functionality to show the heart rates of the last N seconds, where N can be configured by the user (with 60 seconds as the default value)
Kafka consumer configuration
Using Spring Kafka, I configure the properties directly in my application.yml
file:
spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    properties:
      schema.registry.url: http://localhost:8081
      # found here: io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.configure
      # needed if we have Specific Avro object, not Generic Avro object (i.e. containing only simple fields)
      specific.avro.reader: true
    consumer:
      # consumer properties can be found in org.apache.kafka.clients.consumer.ConsumerConfig
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      # we do not want to retrace all the heart rate history
      auto-offset-reset: latest
      group-id: heart-rate-consumer
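Just as an aside: with this configuration alone, a classic blocking Spring Kafka listener could already consume the heart-rates topic. A minimal sketch, for illustration only, since this project streams the records with Reactor Kafka instead (shown below):
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import lin.louis.poc.models.HeartRate;

@Component
public class HRListener {

    // relies on the key/value deserializers and group-id configured in application.yml above
    @KafkaListener(topics = "heart-rates")
    public void onHeartRate(HeartRate heartRate) {
        System.out.println("Received heart rate: " + heartRate);
    }
}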
Some points to note:
- I need to set the flag spring.kafka.properties.specific.avro.reader because, again, I'm using a Specific Avro object (and this is not written in the documentation, I had to search for some time before finding it somewhere on Stackoverflow…)
- since it's a real-time display, there is no need to retrace the history, so we will just consume the latest records by setting the property spring.kafka.consumer.auto-offset-reset to latest
Now I want the heart-rate-consumer to continuously stream the data. However, Kafka does not do this natively (or maybe we would need to tweak the code to make it work), but it's not necessary as we can use Reactor Kafka. So let's add the dependency to the pom.xml:
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
Now I can consume a stream of heart rates continuously with the following KafkaReceiver usage:
@Override
public Flux<HeartRate> read(long userId) {
var consumerProperties = kafkaProperties.buildConsumerProperties();
consumerProperties.put(
ConsumerConfig.GROUP_ID_CONFIG,
// Set a different Kafka group so the consumers are independent and can all read
consumerProperties.get(ConsumerConfig.GROUP_ID_CONFIG) + "-" + UUID.randomUUID().toString()
);
var receiverOptions = ReceiverOptions
.<Long, HeartRate>create(consumerProperties)
.subscription(Collections.singletonList("heart-rates"))
// just for logging purpose
.addAssignListener(partitions -> logger.debug("onPartitionsAssigned {}", partitions))
.addRevokeListener(partitions -> logger.debug("onPartitionsRevoked {}", partitions));
return KafkaReceiver.create(receiverOptions)
.receive()
.filter(r -> userId == r.key())
.map(ConsumerRecord::value);
}
Two points:
- there is no need to implement the use case where the client cancels, it's already handled by reactor.kafka.receiver.internals.DefaultKafkaReceiver#dispose()
- when I add another Kafka consumer, only the first consumer reads the data because they are in the same group ("Kafka guarantees that a message is only ever read by a single consumer in the group."). However, each time a user displays the real-time graph, it should also read the Kafka topic. That's why I "hacked" around it by generating a random group ID using UUID.randomUUID().
Unit test Reactor Kafka
To test the Reactor Kafka consumer, I need to use reactor-test, so in my pom.xml:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
I need to initialize the consumer as well as a producer to generate the Kafka records in the topic:
@EmbeddedKafka(
partitions = 1,
topics = "heart-rates"
)
@ExtendWith(SpringExtension.class)
class KafkaHRFluxRepositoryTest {
private final Logger logger = LoggerFactory.getLogger(getClass());
public static final String TOPIC = "heart-rates";
@RegisterExtension
SchemaRegistryMockExtension schemaRegistry = new SchemaRegistryMockExtension();
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
private KafkaTemplate<Long, HeartRate> kafkaTemplate;
@BeforeEach
void setUp() {
// CONSUMER
var kafkaProperties = new KafkaProperties();
kafkaProperties.setBootstrapServers(Arrays.stream(embeddedKafka.getBrokerAddresses())
.map(BrokerAddress::toString)
.collect(Collectors.toList()));
kafkaProperties.getProperties().put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry.getUrl());
kafkaProperties.getProperties().put(SPECIFIC_AVRO_READER_CONFIG, "true");
kafkaProperties.getConsumer().setGroupId("heart-rate-consumer");
kafkaProperties.getConsumer().setKeyDeserializer(LongDeserializer.class);
kafkaProperties.getConsumer().setValueDeserializer(SpecificAvroDeserializer.class);
// need to set to earliest, because we send the kafka message first, before reading
kafkaProperties.getConsumer().setAutoOffsetReset("earliest");
hrFluxRepository = new KafkaHRFluxRepository(kafkaProperties);
// PRODUCER
var producerProps = KafkaTestUtils.producerProps(embeddedKafka);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class);
producerProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry.getUrl());
kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
}
}
I have to set the property auto-offset-reset to earliest because I'm dealing with a single-threaded test: I first send the record to Kafka, then use my Kafka consumer to read it.
@Test
void read_shouldReadKafkaMessages() {
// GIVEN
var heartRates = Arrays.asList(
new HeartRate(USER_ID, 90d, Instant.now(), false),
new HeartRate(USER_ID, 91d, Instant.now(), false),
new HeartRate(USER_ID, Double.NaN, Instant.now(), false)
);
heartRates.forEach(heartRate -> kafkaTemplate.send(TOPIC, USER_ID, heartRate));
// WHEN
var step = StepVerifier.create(hrFluxRepository.read(USER_ID));
// THEN
heartRates.forEach(heartRate -> {
step.assertNext(heartRateRead -> {
assertNotNull(heartRateRead);
logger.info("Read heart rate: {}", heartRateRead);
assertEquals(USER_ID, heartRateRead.getUserId());
assertEquals(heartRate.getValue(), heartRateRead.getValue());
assertEquals(heartRate.getTimestamp(), heartRateRead.getTimestamp());
assertEquals(heartRate.getIsReset(), heartRateRead.getIsReset());
});
});
step.verifyTimeout(Duration.ofSeconds(1));
}
I use the StepVerifier to simulate the consumption and to test the records. Something to remember when dealing with reactive programming:
Nothing happens until you subscribe
That means that until step.verify* is called, nothing will happen.
Endpoint to read in real time
There are several possibilities for a frontend client to get a continuous stream of data:
- polling
- server-sent events
- websockets
I read this excellent blog post, which led me to develop the server-sent events approach.
To implement it, I use spring-webflux. So in my pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
My controller looks like this:
@RestController
public class HRController {
private final HRFetcher hrFetcher;
public HRController(HRFetcher hrFetcher) {
this.hrFetcher = hrFetcher;
}
@GetMapping(path = "/users/{userId}/heart-rates/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<HeartRateDTO> heartRateFlux(@PathVariable long userId) {
return hrFetcher.fetch(userId);
}
}
You may notice I return a HeartRateDTO, not a HeartRate, because the auto-generated HeartRate contains some getters, like getSchema(), that Jackson does not really like to serialize/deserialize.
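The HRFetcher injected in the controller is not shown here; a minimal sketch of what it could look like, delegating to the Kafka-backed repository shown earlier and mapping to the DTO (the actual implementation in the project may differ):
import org.springframework.stereotype.Component;

import reactor.core.publisher.Flux;

@Component
public class HRFetcher {

    private final KafkaHRFluxRepository hrFluxRepository;

    public HRFetcher(KafkaHRFluxRepository hrFluxRepository) {
        this.hrFluxRepository = hrFluxRepository;
    }

    // streams the heart rates of a user and maps them to DTOs so Jackson can serialize them
    public Flux<HeartRateDTO> fetch(long userId) {
        return hrFluxRepository.read(userId)
                .map(hr -> new HeartRateDTO(hr.getUserId(), hr.getValue(), hr.getTimestamp(), hr.getIsReset()));
    }
}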
Unit tests HRController
To test the controller, I simply use WebTestClient
:
@ExtendWith(SpringExtension.class)
@WebFluxTest(controllers = HRController.class)
class HRControllerTest {
private static final long USER_ID = 123L;
@MockBean
private HRFetcher hrFetcher;
@Autowired
private WebTestClient webTestClient;
@Test
void heartRateFlux() {
// GIVEN
var heartRates = new HeartRateDTO[] {
new HeartRateDTO(USER_ID, 90d, Instant.now(), false),
new HeartRateDTO(USER_ID, 91d, Instant.now(), false),
new HeartRateDTO(USER_ID, Double.NaN, Instant.now(), false)
};
Mockito.when(hrFetcher.fetch(USER_ID)).thenReturn(Flux.fromArray(heartRates));
// WHEN
var heartRateList = webTestClient.get().uri("/users/{userId}/heart-rates/stream", USER_ID)
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.expectStatus().isOk()
.returnResult(HeartRateDTO.class)
.getResponseBody()
.take(heartRates.length)
.collectList()
.block();
// THEN
assertNotNull(heartRateList);
assertEquals(heartRates.length, heartRateList.size());
for (int i = 0; i < heartRates.length; i++) {
assertEquals(heartRates[i].getUserId(), heartRateList.get(i).getUserId());
assertEquals(heartRates[i].getValue(), heartRateList.get(i).getValue());
assertEquals(heartRates[i].getTimestamp(), heartRateList.get(i).getTimestamp());
assertEquals(heartRates[i].isReset(), heartRateList.get(i).isReset());
}
}
}
Front to display the heart rates in real time
I use Flot JS library to display a graph. So I configure it like in this example:
let plot = $.plot('.heart-rate-placeholder', [ buildData() ], {
series: {
shadowSize: 0 // Drawing is faster without shadows
},
yaxis: {
min: 0,
max: 250
},
xaxis: {
show: false
}
});
plot.setupGrid();
plot.draw();
I built the data like this:
const buildData = () => {
if (data.length > 0) {
data = data.slice(1);
}
if (heartRate && !isNaN(heartRate.value)) {
data.push(heartRate.value);
}
while (data.length < totalPoints) {
data.push(0);
}
let res = [];
for (let i = 0; i < data.length; ++i) {
res.push([i, data[i]])
}
return res;
}
To use the server-sent events in the front, we have to use the EventSource
from native JS:
let source = new EventSource(buildEventUrl(userId));
source.addEventListener('message', (event) => {
heartRate = JSON.parse(event.data);
console.info('read heart rate', heartRate);
});
Now, we need to update the data every second, so I use the native JS setTimeout
:
const update = () => {
plot.setData([buildData()]);
plot.draw();
setTimeout(update, updateInterval);
};
update();
You can check the whole file here.
When everything is up and running, it displays a nice graph that updates in real time:
Database Access
I use spring-boot-starter-data-jpa to read from the database. My configuration looks like this:
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/heart_monitor?applicationName=heart-rate-consumer
    username: postgres
    password: postgres
I need to create an @Entity
:
@Entity
// the table name must be escaped because having a hyphen in a table name is not standard in the Postgres world
// the table is auto-generated by heart-rate-connector (i.e. Kafka Connect)
@Table(name = "\"heart-rates\"")
public class HeartRateEntity {
@EmbeddedId
private HeartRateId heartRateId;
private long userId;
private double value;
private Instant timestamp;
private boolean isReset;
// getters & setters
}
Since I configured Kafka Connect to auto-generate the primary key, it has created 3 columns that serve as the primary key:
- __connect_topic
- __connect_partition
- __connect_offset
We do not use this primary key, but it's necessary to have one when using Spring Data JPA. So I implemented the HeartRateId like this:
public class HeartRateId implements Serializable {
@Column(name = "__connect_topic")
private String connectTopic;
@Column(name = "__connect_partition")
private int connectPartition;
@Column(name = "__connect_offset")
private long connectOffset;
// getters & setters
}
Then, the repository looks like this:
public interface HRRepository extends CrudRepository<HeartRateEntity, Void> {
@Query(value = "SELECT DISTINCT user_id FROM \"heart-rates\" ORDER BY user_id", nativeQuery = true)
List<Long> findUserIds();
List<HeartRateEntity> findByUserIdAndTimestampIsAfterOrderByTimestampDesc(long userId, Instant timestampRef);
}
I need to fetch the list of user IDs so the user can select which heart rate graph (s)he wants to see.
The other method is used to fetch the heart rates of the last N seconds.
Router
Now, let’s implement the router that will serve the last N seconds heart rates:
@Component
public class HRRouter {
@Bean
RouterFunction<ServerResponse> hrRoute(HRRepository hrRepository) {
return route(
GET("/users/{userId}/heart-rates").and(accept(MediaType.APPLICATION_JSON)),
request -> heartRateHandler(request, hrRepository)
);
}
private Mono<ServerResponse> heartRateHandler(ServerRequest request, HRRepository hrRepository) {
var userId = Long.parseLong(request.pathVariable("userId"));
var seconds = Integer.parseInt(request.queryParam("lastNSeconds").orElse("60"));
var lastNSeconds = Instant.now().minus(seconds, ChronoUnit.SECONDS);
var hearRates = hrRepository.findByUserIdAndTimestampIsAfterOrderByTimestampDesc(userId, lastNSeconds);
var hearRatesDTO = hearRates.stream()
.map(hr -> new HeartRateDTO(
hr.getUserId(),
hr.getValue(),
hr.getTimestamp(),
hr.isReset()
))
.collect(Collectors.toList());
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(hearRatesDTO));
}
}
This is quite straightforward.
Front to display the last N seconds
I simply use the native XMLHttpRequest
to fetch the heart rates, then use the JS library Flot to
render the graph:
const oReq = new XMLHttpRequest();
oReq.addEventListener('load', function() {
if (this.status !== 200) {
console.error('Server did not return the user id. Response status was', this.status);
return;
}
const heartRates = JSON.parse(this.responseText);
if (heartRates.length === 0) {
$('.heart-rate-placeholder').text('No heart rate registered during this period');
} else {
const data = heartRatesToFlotData(heartRates);
$.plot('.heart-rate-placeholder', [heartRatesToFlotData(heartRates)], {
xaxis: {
autoScale: 'none',
mode: 'time',
minTickSize: [1, 'second'],
min: data[0][0],
max: data[data.length - 1][0],
timeBase: 'milliseconds'
}
});
}
});
oReq.open('GET', buildUrl(userId, lastNSeconds));
oReq.send();
Mapping the received heart rates into data understandable by Flot is performed by this function:
const heartRatesToFlotData = (heartRates) => {
let data = [];
for (let i = 0; i < heartRates.length; i++) {
let hr = heartRates[i];
data.push([new Date(hr.timestamp).getTime(), hr.value]);
}
return data;
};
Flot accepts an array of arrays, where the first element of each inner array is the X-axis value and the second element is the heart rate value.
So it will display something like this:
Running heart-rate-consumer
Using docker-compose:
docker-compose up -d
Wait until Kafka connect is fully started, then execute the script to initialize the environment:
./scripts/setup.sh
Then use the heart-rate-consumer
webapp:
firefox http://localhost
Final notes
- The heart-beat-validator could completely be replaced by a KSQL query.
- Using a time window in the heart-rate-computor could be another way to perform the aggregation and heart rate computation.
You can run the whole system and scale the applications to check how it behaves under heavy load:
- change the value of nb-users of the heart-smartwatch-simulator to perform more requests per second
- use the docker-compose scale flag to mount multiple instances of the services:
docker-compose up -d \
--scale heart-beat-producer=3 \
--scale heart-beat-validator=3 \
--scale heart-rate-computor=3
:warning: you will need lots of resources. On my laptop with 4 CPUs and 16GB of RAM:
Resources
Kafka
- How to transform a batch pipeline into real time one
- Confluent on testing streaming application
- The internal of Kafka Gitbook
- The internal of Kafka Streams Gitbook
- Example of using Kafka Streams API
- Exploring Kafka Streams
- Reactor Kafka
- Introduction to reactor Kafka
- How to choose number of topic partition
- Kafka replication explained
Avro
- Avro and the schema registry
- Avro specifications
- CodeNotFound Spring Kafka - Apache Avro serializer / deserializer example
- Confluent documentation on schema registry
- Stackoverflow question on testing Kafka consumer with Avro schema
- Bakdata fluent-kafka-streams-test to test Kafka streams
Spring
- Spring Kafka
- Using Apache Kafka and Spring platform to build event-driven micro-services
- Spring Kafka Avro without registry for unit tests
- Spring Kafka Avro Streams example
- Spring official documentation on Kafka Streams brancher
- Spring example to use KafkaStreamBrancher
- Spring tutorial for building interactive web app using websocket
- Spring tutorial for accessing data with JPA
- Serialize Instant in epoch milliseconds
Reactor
- Spring Webflux and SSE
- Yong Mook Kim Spring Webflux and SSE tutorial
- Okta tutorial on getting started with Reactive programming in Spring
- Stream realtime data the reactive way with Angular+Spring boot+Kafka
- Sample reactive app in Spring
- Samples from official Reactor Kafka project
- Another sample using Reactor Kafka
- WebFluxTest with WebTestClient
- Testing with reactor-test in JUnit
KSQL
Front