During the COVID-19 shutdown, one way to “pass time” is to learn and play with new things.
I made a small project, heart-rate-calculator,
whose goal is to take a file containing heartbeat information as input, compute the heart rates
and write them to another file.
That was quite a nice project, but I was wondering what it would look like if I wanted to go
further. Let's say I want a whole SaaS solution, production-ready, that reads a stream of
data from smart-watches measuring heartbeats, then computes the heart rates and displays a graph
to the user. Of course, just that would be too simple, so what about adding the challenge of
dealing with a massive amount of data? Mmh, a bit trickier.
This is a good opportunity to play with Kafka and Kafka Streams! In this blog post, I will show how
to build a resilient system to compute and generate heart rates from heartbeat data.
Note that the micro-services are Java applications using the Spring Framework,
because that's what I'm most familiar with.
I will use docker-compose to spin up the whole environment.
The target architecture
I want to design a scalable, real-time and distributed pipeline to display the heart rate from a
continuous stream of heartbeats provided by smart-watches.
One way is to use Apache Kafka, a distributed streaming
platform, to help me achieve that (and that was the whole point of this project: learning and
playing with Kafka).
The target architecture is the following:
heart-beat-producer: when a smart-watch sends a heartbeat, it should end up in Kafka right away
heart-beat-validator: this service deals with the stream of heartbeats to validate them or flag
them as invalid
heart-rate-computor: now that we have valid heartbeats, we aggregate them and compute the
associated heart rates
heart-rate-connector: we need to sink the heart rates into a database, like PostgreSQL, so
that other services can pick them up and show the heart rates to the users
heart-rate-consumer: this application listens to the Kafka topic to stream the heart rates in
real time, as well as displaying a time window of a user's heart rates
Designing models with Apache Avro
Before diving into developing the micro-services, I need to define the models that will circulate
between the micro-services: HeartBeat and HeartRate.
It's important to define the models, especially in a micro-services oriented architecture, as each
micro-service has its own responsibility, and in a real-world company they can be handled by
different teams. So, in order to have coherent "data" circulating in the whole system, it's
important to be rigorous and define some sort of "contract" so that each micro-service is aware
of the data it receives and sends, and can manipulate it without worry.
I chose to use Apache Avro because I had already manipulated ProtoBuf and JSON Schema, and moreover
Confluent, whose community products I used for most of this project, advocates
using Avro as they offer Schema
Management.
With Avro, you define the model schemas that contain the fields of the data along with their types.
Once the Avro object is formed, it can easily be serialized as an array of bytes, which is the type
Kafka likes. Another point to raise is that any other programming language can take the Avro bytes and
deserialize them into an object specific to that language, which is nice because that
means every team can "communicate" data through Avro schemas.
Avro also supports schema evolution: each schema registered in the registry has its own
version. Backward and forward compatibility are enforced by strict verification in the
Schema Registry.
What is the Schema Registry? It's nice to design the Avro schemas, but they need to be stored and
available somewhere so that the micro-services are aware of them. That's what the Schema
Registry is all about.
Schema Registry
The Schema Registry is a
serving layer for the Avro metadata. It provides RESTful interfaces for storing and retrieving
Apache Avro schemas, stores a versioned history of all schemas based on a specified subject name
strategy, provides multiple compatibility settings and allows evolution of schemas according to the
configured compatibility settings and expanded Avro support. It provides serializers that plug into
Apache Kafka clients that handle schema storage and retrieval for Kafka messages that are sent in
the Avro format.
I configured the schema registry with the following docker-compose configuration:
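Something along these lines should do; this is a minimal sketch where the image version, the ports
and the kafka service name are assumptions:

```yaml
schema-registry:
  image: confluentinc/cp-schema-registry:5.4.1
  depends_on:
    - kafka
  ports:
    - "8081:8081"
  environment:
    SCHEMA_REGISTRY_HOST_NAME: schema-registry
    # Kafka cluster used by the Schema Registry to store the schemas
    SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
    SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
```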
Heartbeat Avro schema
The heartbeat model is quite simple:
userId: the ID of the user that uses the smart-watch
HRI: a heart rate instant value, detected by the smart-watch
I needed to have the declaration of those two models in a single schema file because the Schema
Registry does not support having a schema that depends on another schema file.
The heart rate model contains:
userId: the ID of the user associated to the heart rate
value: the computed heart rate from the heartbeats
timestamp: the time of the registered heart rate
isReset: Heart rate is reset if a gap is detected, a QRS type X is detected, HRI is out of range
or the new timestamp is prior to the last detected timestamp
Which gives the following:
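Here is a sketch of what the HeartRate schema could look like; the exact field types are
assumptions (a double for the value so it can hold NaN, and a timestamp-millis logical type so the
timestamp is generated as an Instant):

```json
{
  "namespace": "lin.louis.poc.models",
  "type": "record",
  "name": "HeartRate",
  "fields": [
    { "name": "userId", "type": "long" },
    { "name": "value", "type": "double" },
    { "name": "timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "isReset", "type": "boolean", "default": false }
  ]
}
```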
Avro Java object generation
I used the avro-maven-plugin to
generate the Java POJOs. Thus I created a Maven module,
heart-models, so that the other projects
can use the auto-generated Java POJOs.
The pom.xml looks like this:
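The interesting part is the avro-maven-plugin declaration; something like the following sketch,
where the plugin version and the directories are assumptions:

```xml
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.2</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <!-- where the .avsc schema files live -->
        <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
        <!-- where the Java POJOs are generated -->
        <outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
```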
So, by running a simple mvn generate-sources, the Java classes are generated in the folder
target/generated-sources.
It’s also possible to check if the Avro schema defined in this project is compatible with what’s
defined in the Schema Registry by using the
kafka-schema-registry-maven-plugin:
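A sketch of that plugin declaration, following the Confluent documentation (the subject name and
the schema path are assumptions):

```xml
<plugin>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-maven-plugin</artifactId>
  <version>5.4.1</version>
  <configuration>
    <schemaRegistryUrls>
      <param>http://localhost:8081</param>
    </schemaRegistryUrls>
    <subjects>
      <!-- subject name -> local Avro schema to check against the registry -->
      <heart-beats-value>src/main/avro/heart-beat.avsc</heart-beats-value>
    </subjects>
  </configuration>
  <goals>
    <goal>test-compatibility</goal>
  </goals>
</plugin>
```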
It can be useful to speed up the SDLC by catching potential issues when updating Avro schemas.
Heart beat producer
The smart-watches must send their data somewhere. Thus, I started with a classical spring-boot
MVC project, auto-generated from the Spring Initializr. The pom.xml
looks like this:
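Roughly the following dependencies (the heart-models coordinates are an assumption):

```xml
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <!-- module holding the Avro-generated POJOs -->
  <dependency>
    <groupId>lin.louis.poc</groupId>
    <artifactId>heart-models</artifactId>
    <version>${project.version}</version>
  </dependency>
</dependencies>
```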
Just a simple dependency on spring-boot-starter-web and we can create the controller.
REST endpoint to register a heartbeat
Nothing too complicated:
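A minimal sketch of such a controller; the class name, the endpoint path and the toHeartBeat()
mapping method are assumptions:

```java
@RestController
@RequestMapping("/heart-beats")
public class HeartBeatController {

    private final HBRepository repository;

    public HeartBeatController(HBRepository repository) {
        this.repository = repository;
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public void create(@RequestBody HeartBeatDTO dto) {
        // map the DTO to the Avro-generated HeartBeat before saving it
        repository.save(dto.toHeartBeat());
    }
}
```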
You may notice that I use a HeartBeatDTO. That's because the HeartBeat Java class, auto-generated
by the avro-maven-plugin, contains other methods, like getSchema(), which Jackson does not like to
serialize.
Sending the heartbeat to Kafka
The HBRepository is a simple interface that will save the heartbeat somewhere:
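Something like this (the method name is an assumption):

```java
public interface HBRepository {

    /** Persists the heartbeat somewhere (here, a Kafka topic). */
    void save(HeartBeat heartBeat);
}
```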
The implementation is the following:
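A sketch of a Kafka-backed implementation; the class name and the topic property are assumptions:

```java
@Repository
public class HBKafkaRepository implements HBRepository {

    private final KafkaTemplate<Long, HeartBeat> kafkaTemplate;
    private final String topic;

    public HBKafkaRepository(KafkaTemplate<Long, HeartBeat> kafkaTemplate,
                             @Value("${heart-beat.topic:heart-beats}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    @Override
    public void save(HeartBeat heartBeat) {
        // the user ID is the record key, so all heartbeats of a user land in the same partition
        kafkaTemplate.send(topic, heartBeat.getUserId(), heartBeat);
    }
}
```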
As you can see, I used Spring Kafka to help me easily configure the Kafka producer client
KafkaTemplate by providing the right properties in my application.yml file:
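For instance (broker and registry URLs are assumptions):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        # used by the Avro serializer to register/fetch the schemas
        schema.registry.url: http://localhost:8081
```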
Kafka likes byte arrays, so we need to specify the serializers to use when serializing the key and
the value of the message.
The Kafka client also needs a URL to the Schema Registry to fetch the Avro schemas.
However, the timestamp is not registered correctly. Indeed, the timestamps in the input are Long
values in milliseconds and are represented as an Instant, whereas Jackson reads timestamps as
nanoseconds.
I found this blog
post
that helped me find a simple solution by adding the right properties in Jackson:
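For example, turning off the nanosecond handling of java.time timestamps; this sketch assumes the
fix relies on the READ/WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS Jackson features:

```yaml
spring:
  jackson:
    deserialization:
      # read epoch timestamps as milliseconds, not nanoseconds
      read-date-timestamps-as-nanoseconds: false
    serialization:
      write-date-timestamps-as-nanoseconds: false
```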
However, there is lots of boilerplate code… And I got lots of Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class lin.louis.poc.models.HeartBeat (org.apache.avro.generic.GenericData$Record and lin.louis.poc.models.HeartBeat are in unnamed module of loader 'app'). That's because my
Avro schema is considered specific since I used a timestamp… I did not know this beforehand…
For me, it was a simple schema, so I naively used the GenericAvroSerializer (and I did not know
about the existence of the SpecificAvroSerializer…). What a mistake! I struggled for hours just to
find an innocent comment on Stackoverflow that led me
to an "AHA" moment…
The Kafka producer configuration can be set like this:
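Here is a sketch of such a test configuration, assuming an embeddedKafka broker field provided by
spring-kafka-test and the schemaRegistry mock described just below:

```java
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
// point the Avro serializer to the mocked Schema Registry
producerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry.getUrl());

KafkaTemplate<Long, HeartBeat> kafkaTemplate =
        new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
```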
As you may have noticed, I use a schemaRegistry instance. But since it's a unit test, I do not want
to mount a Schema Registry only for my tests.
After some searches on the internet, I found a library, from Bakdata,
schema-registry-mock
that can help mock the Schema Registry! It provides a nice JUnit5 extension to start up a Schema
Registry mock for each test. I need to add a new dependency:
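The dependency should look roughly like this (the version is an assumption):

```xml
<dependency>
  <groupId>com.bakdata.fluent-kafka-streams-tests</groupId>
  <artifactId>schema-registry-mock-junit5</artifactId>
  <version>2.1.0</version>
  <scope>test</scope>
</dependency>
```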
Now, all I have to do is to declare it like this:
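Following the library's documented usage:

```java
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistry = new SchemaRegistryMockExtension();
```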
Finally, I need a consumer to check if my messages are sent correctly to Kafka. Using the example
provided by Spring,
this is the consumer configuration:
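A sketch of it, adapted to Avro (the topic and group names are assumptions):

```java
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafka);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
consumerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry.getUrl());
// deserialize into the generated HeartBeat class, not a GenericRecord
consumerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

Consumer<Long, HeartBeat> consumer =
        new DefaultKafkaConsumerFactory<Long, HeartBeat>(consumerProps).createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "heart-beats");
ConsumerRecord<Long, HeartBeat> record = KafkaTestUtils.getSingleRecord(consumer, "heart-beats");
```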
I use the dockerfile-maven-plugin from Spotify to add the
docker image build to the maven build, so that mvn package builds both the JAR and the docker image,
which reduces the number of commands to execute. So in my pom.xml, I add this plugin:
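A sketch of the plugin declaration, based on the plugin's README (the version, repository name and
build args are assumptions):

```xml
<plugin>
  <groupId>com.spotify</groupId>
  <artifactId>dockerfile-maven-plugin</artifactId>
  <version>1.4.13</version>
  <executions>
    <execution>
      <id>default</id>
      <goals>
        <goal>build</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <repository>heart-beat-producer</repository>
    <tag>${project.version}</tag>
    <buildArgs>
      <JAR_FILE>${project.build.finalName}.jar</JAR_FILE>
    </buildArgs>
  </configuration>
</plugin>
```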
Then I declare the service in the docker-compose.yml file like this:
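Something along these lines; the environment variables are an assumption of how the Kafka and
Schema Registry URLs could be passed to the Spring application:

```yaml
heart-beat-producer:
  image: heart-beat-producer:latest
  depends_on:
    - kafka
    - schema-registry
  environment:
    SERVER_PORT: 8180
    SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    # map keys containing dots are easier to pass as JSON than as env vars
    SPRING_APPLICATION_JSON: '{"spring.kafka.producer.properties.schema.registry.url": "http://schema-registry:8081"}'
  ports:
    - "8180-8199:8180"
```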
As I want to simulate a "real world case", I need to be able to scale the services. That's why I set
a range of ports to use: "8180-8199:8180". I also need a load balancer to redirect the requests to
a heart-beat-producer app. I use Nginx to do that. The configuration in
nginx.conf is quite straightforward:
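A minimal sketch of such an nginx.conf, load-balancing on the Docker DNS name of the service:

```nginx
events {}

http {
  # docker-compose resolves this name to every heart-beat-producer replica
  upstream heart-beat-producer {
    server heart-beat-producer:8180;
  }

  server {
    listen 80;

    location / {
      proxy_pass http://heart-beat-producer;
    }
  }
}
```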
Then the declaration in the docker-compose.yml file:
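For example:

```yaml
nginx:
  image: nginx:1.17
  depends_on:
    - heart-beat-producer
  volumes:
    - ./nginx.conf:/etc/nginx/nginx.conf:ro
  ports:
    - "8080:80"
```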
Excellent, we now have an endpoint that registers heartbeats in a Kafka topic.
Heart smart-watch simulator
The idea of this project is to learn and play with Kafka, so I did not implement a real smart-watch.
However, I need something to inject data into the system.
Let's create a heart-smartwatch-simulator that will send random data to heart-beat-producer at
random intervals.
I use Golang as it’s easy to spawn lots of goroutines that will send HTTP requests.
We can go further by adding the possibility to simulate invalid heartbeats.
You can check out the
project if you are
interested in the implementation.
Heart beat validator
We can now plug in another service that will read the stream of heartbeats and apply a predicate to
figure out whether each heartbeat is valid or not. We could even plug in a machine learning model to
identify false positive heartbeats and handle them accordingly.
For this service, we will just use Kafka Streams.
It is made for real-time applications and micro-services that get data from Kafka, manipulate it and
send it back to Kafka. The heart-beat-validator is a simple Java application that uses Kafka
Streams to filter the valid heartbeats and register the invalid ones in another Kafka topic, in
case we want to study them in the future and perform other types of action.
For this project, I will need new libraries, especially ones that can manipulate Kafka Streams:
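Most likely something like the following (the kafka-streams-avro-serde version is an assumption):

```xml
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<!-- Avro serdes (SpecificAvroSerde & co) for Kafka Streams -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-streams-avro-serde</artifactId>
  <version>5.4.1</version>
</dependency>
```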
Kafka streams topology
To use Kafka streams, we need to define a Kafka Streams topology, which is basically a sequence of
actions.
First, I need to create the topics heart-beats-valid and heart-beats-invalid. So I use Spring
Kafka again to create them when the application starts up:
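With Spring Kafka, declaring NewTopic beans is enough, as KafkaAdmin creates them at startup; the
partition and replica counts below are arbitrary:

```java
@Configuration
public class TopicConfig {

    @Bean
    public NewTopic heartBeatsValidTopic() {
        return new NewTopic("heart-beats-valid", 3, (short) 1);
    }

    @Bean
    public NewTopic heartBeatsInvalidTopic() {
        return new NewTopic("heart-beats-invalid", 3, (short) 1);
    }
}
```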
My configuration looks like this (using Spring properties file):
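A sketch of such a properties file (the application id and URLs are assumptions):

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.streams.application-id=heart-beat-validator
# default serdes used by the topology
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde
spring.kafka.streams.properties.default.value.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.kafka.streams.properties.schema.registry.url=http://localhost:8081
```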
I'm not sure whether there is documentation with an exhaustive list of serdes (I did not find one);
I found those serdes directly in the source code.
I use the
KafkaStreamBrancher
from Spring to build in a convenient way conditional branches on top of KStream:
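A sketch of the branching; the input topic name and the isValid predicate are assumptions:

```java
@Bean
public KStream<Long, HeartBeat> heartBeatStream(StreamsBuilder streamsBuilder) {
    KStream<Long, HeartBeat> stream = streamsBuilder.stream("heart-beats");

    new KafkaStreamBrancher<Long, HeartBeat>()
            // valid heartbeats go to one topic...
            .branch((userId, heartBeat) -> isValid(heartBeat), kstream -> kstream.to("heart-beats-valid"))
            // ...everything else goes to the "invalid" topic
            .defaultBranch(kstream -> kstream.to("heart-beats-invalid"))
            .onTopOf(stream);

    return stream;
}

private boolean isValid(HeartBeat heartBeat) {
    // hypothetical validation rule: the HRI must be in a plausible range
    return heartBeat.getHri() > 0 && heartBeat.getHri() < 300;
}
```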
This code is quite straightforward: if the predicate returns true, then the record is sent
to the heart-beats-valid topic, otherwise to the heart-beats-invalid topic.
The fluent-kafka-streams-tests library from Bakdata (the same one providing the Schema Registry mock)
offers a JUnit5 extension, TestTopologyExtension. However, it needs the Kafka properties, which
I do not have before the execution of the tests since I'm using
spring-kafka-test
that provides an @EmbeddedKafka annotation and an EmbeddedKafkaBroker that start up a Kafka broker for the tests.
Thus, I have to initialize the TestTopologyExtension on my own, which gives something like this:
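Here is a sketch of that manual initialization, assuming a buildTopology(Properties) method that
builds the validator topology and an embeddedKafka broker from spring-kafka-test:

```java
private TestTopology<Long, HeartBeat> testTopology;

@BeforeEach
void setUp() {
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "heart-beat-validator-test");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    // even with the mocked registry, a syntactically valid URL is required
    properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");

    testTopology = new TestTopology<>(this::buildTopology, properties);
    testTopology.start();
}

@AfterEach
void tearDown() {
    testTopology.stop();
}
```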
I do not need to use the SchemaRegistryMockExtension in this test as the TestTopologyExtension
does it automatically.
Do not forget to set the property SCHEMA_REGISTRY_URL_CONFIG! Even if we are using a mocked schema
registry, it still needs to have a valid URL.
Heart rate computor
Everything we have done previously is just moving data across the network. It's nice, but it's not
the heart of the whole project. Now we are tackling the service that reads the heartbeats and computes
the heart rates.
This application is also a Kafka Streams application; however, it's different from the previous
services as it needs some sort of "state" to aggregate the heartbeats.
A heart rate is computed as follows (a small sketch is given after the list):
its value is the median of the HRI of the last 8 heartbeats
if there are fewer than 8 heartbeats, its value is NaN
The Kafka Streams configuration is the same as the previous Kafka Streams application
heart-beat-validator.
First, let’s read the Kafka topic heart-beats-valid:
We will use a
KTable
to aggregate the heartbeats by user IDs:
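Continuing the previous snippet, a sketch of the aggregation (heartBeatsSerde being a configured
SpecificAvroSerde<HeartBeats>):

```java
KTable<Long, HeartBeats> heartBeatsByUser = validHeartBeats
        .groupByKey()
        .aggregate(
                // initializer: an empty aggregate
                () -> new HeartBeats(new ArrayList<>()),
                // aggregator: append the new heartbeat to the user's aggregate
                (userId, heartBeat, heartBeats) -> {
                    heartBeats.getHeartBeats().add(heartBeat);
                    return heartBeats;
                },
                Materialized.with(Serdes.Long(), heartBeatsSerde));
```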
You may have noticed that I used a HeartBeats class, and not a Collection<HeartBeat>. I will
explain why later.
Now that we have the KTable, we can perform the heart rate computation:
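Roughly like this, using the HRValueMapper described further down (heartRateSerde being a
configured SpecificAvroSerde<HeartRate>):

```java
heartBeatsByUser
        .toStream()
        // turn the aggregated heartbeats of a user into heart rates
        .flatMapValues(new HRValueMapper())
        .to("heart-rates", Produced.with(Serdes.Long(), heartRateSerde));
```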
HeartBeats avro schema
It seems that, when aggregating, Kafka performs some serialization. Since HeartBeat is an Avro object,
it will use the Avro serializer, and using a Collection will not work: we might get a Caused by: java.lang.ClassCastException.
Just having a simple POJO is not enough either, otherwise we will still get the same error:
Caused by: java.lang.ClassCastException: class lin.louis.poc.hrc.model.HeartBeats cannot be cast to class org.apache.avro.specific.SpecificRecord (lin.louis.poc.hrc.model.HeartBeats and org.apache.avro.specific.SpecificRecord are in unnamed module of loader 'app')
So I had to add a new Avro schema. However, the Avro
specification is not clear on how to declare
an array… This does not work:
Neither does this work:
It’s not until I sawy this blog
post
that I have to wrap a type inside of the type: {"type": {"type": "array", "items": "string"}}.
If you start learning Avro, Kafka and the rest… there are so many gotchas and so much implicit
knowledge, it's quite frustrating for beginners…
So my Avro schema for the array of heartbeats comes down to this:
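Something along these lines, assuming the HeartBeat type is visible to the Avro compiler (declared
in the same file or imported at build time):

```json
{
  "namespace": "lin.louis.poc.models",
  "type": "record",
  "name": "HeartBeats",
  "fields": [
    {
      "name": "heartBeats",
      "type": {
        "type": "array",
        "items": "lin.louis.poc.models.HeartBeat"
      },
      "default": []
    }
  ]
}
```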
Heart rate value mapper
I map the HeartBeats into an Iterable<HeartRate> using the HRValueMapper:
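A simplified sketch of such a mapper, reusing the median helper sketched earlier (the real
implementation also handles the reset cases):

```java
public class HRValueMapper implements ValueMapper<HeartBeats, Iterable<HeartRate>> {

    @Override
    public Iterable<HeartRate> apply(HeartBeats heartBeats) {
        List<HeartBeat> beats = heartBeats.getHeartBeats();
        // the aggregate always contains at least the heartbeat that triggered the call
        HeartBeat last = beats.get(beats.size() - 1);
        HeartRate heartRate = HeartRate.newBuilder()
                .setUserId(last.getUserId())
                .setValue(computeHeartRateValue(beats)) // median of the last 8 HRIs
                .setTimestamp(last.getTimestamp())
                .setIsReset(false)                      // reset detection omitted in this sketch
                .build();
        return Collections.singletonList(heartRate);
    }
}
```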
If you are interested in the implementation, check out the source
code.
:warning: this will only work with Kafka in a cluster, as it needs the property
processing.guarantee=exactly_once to ensure we have the expected behavior, i.e. fetching the
values from the KTable and computing directly in "real time". When the property
"processing.guarantee" is set to "at_least_once", the mapping is not performed directly:
there is a small time buffer before it performs the mapping, hence weird behaviors, like
having too many heartbeats for a single heart rate, or having heartbeats with offset timestamps…
Heart rate connector
Most modern backend services use a database to store and read data. I chose PostgreSQL because
that's the kind of database I'm most familiar with. However, it's totally possible to use another
type of database, like a NoSQL one (choose this type if you are dealing with really large amounts of
data, like ScyllaDB), or a search index such as Elasticsearch.
There is lots of documentation on the Confluent
website;
however, it's more of a reference documentation than a tutorial. I honestly struggled to understand
which properties I needed to use.
My final configuration looks like this on my docker-compose.yml file:
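A sketch of a cp-kafka-connect service; the image version, internal topics and hostnames are
assumptions (note the Connect REST port 8083 exposed as 8082 on the host):

```yaml
kafka-connect:
  image: confluentinc/cp-kafka-connect:5.4.1
  depends_on:
    - kafka
    - schema-registry
    - postgres
  ports:
    - "8082:8083"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: kafka:9092
    CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
    CONNECT_GROUP_ID: heart-rate-connector
    CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
    CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
    CONNECT_STATUS_STORAGE_TOPIC: _connect-status
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    # the workers' default converters are Avro ones
    CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
```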
I set every property described in the documentation, but it does not create the table, nor does it
insert data…
When I call http :8082/connectors, it always returns an empty array []…
It’s until I saw this
example
that I knew I need to add the connector manually. The docker image does not initialize the connector
itself (like for example the PostgreSQL docker image where it’s possible to initialize the database
with some dump).
Thus, I created a small script to initialize the connector:
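Here is a sketch of such a script, posting a JDBC sink connector to the Connect REST API; the
connection settings and the transform names are assumptions:

```bash
#!/usr/bin/env bash
# Registers the JDBC sink connector that writes the heart-rates topic to PostgreSQL.
curl -X POST http://localhost:8082/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "heart-rate-connector",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "1",
      "topics": "heart-rates",
      "connection.url": "jdbc:postgresql://postgres:5432/heart_rate",
      "connection.user": "postgres",
      "connection.password": "postgres",
      "auto.create": "true",
      "insert.mode": "insert",
      "pk.mode": "kafka",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "transforms": "renameFields,insertTS",
      "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.renameFields.renames": "userId:user_id,isReset:is_reset",
      "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.insertTS.timestamp.field": "created_at"
    }
  }'
```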
Some properties to highlight:
key.converter: the default converter provided by the connect workers being Avro, it would
throw an error as the key type is a Long, so I use the StringConverter here instead (there
is no LongConverter, only StringConverter, JsonConverter and the Avro one).
transforms: by default, Kafka Connect will take the topic and its content as is, and create the
table and columns with the same names as the ones defined in Kafka, i.e. userId, isReset.
However, these field names are not "standard" for PostgreSQL (even if it works, it's not really
nice to query, as we always need to escape the column names when writing a SQL query). It's possible
to rename them with the "transforms" property by using
RenameField.
transforms: I also added a new column that contains the timestamp at which the record was
added to Kafka. This is just an example showing that Kafka Connect can add new fields.
Note: I did not find a configuration or a way to rename the table itself…
Heart rate consumer
Now, I need to display the heart rates in a nice graph in two ways:
in real time, by directly consuming the records from the Kafka topic heart-rates
a time-based snapshot of the heart rate graph, by reading from the database
To avoid making the user type a timestamp, I will just have a feature to show the heart rates of
the last N seconds, where N can be configured by the user (with 60 seconds as the default value).
Kafka consumer configuration
Using Spring Kafka, I configure the properties directly in my application.yml file:
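Something like this (broker and registry URLs are assumptions):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: heart-rate-consumer
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      schema.registry.url: http://localhost:8081
      # deserialize into the generated HeartRate class instead of a GenericRecord
      specific.avro.reader: true
```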
Some points to note:
I need to set the flag spring.kafka.properties.specific.avro.reader because, again, I'm using a
specific Avro object (and this is not written in the documentation; I had to search for some time
before I found it somewhere on Stackoverflow…)
since it's a real-time display, there is no need to replay the history, so we will just consume the
latest records by configuring the property spring.kafka.consumer.auto-offset-reset to latest.
Now I want the heart-rate-consumer to continuously stream the data. However, Kafka does not do it
natively (or maybe we need to tweak the code to make it work), but that's not necessary as we can
use Reactor Kafka. So let’s add the
dependency to the pom.xml:
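The coordinates are roughly (the version is an assumption):

```xml
<dependency>
  <groupId>io.projectreactor.kafka</groupId>
  <artifactId>reactor-kafka</artifactId>
  <version>1.2.2.RELEASE</version>
</dependency>
```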
Now I can consume a stream of heart rates continuously with the following KafkaReceiver usage:
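A sketch of a reader component; the class name, the read(userId) signature and the injected Spring
Boot KafkaProperties are assumptions:

```java
@Component
public class HRKafkaReader {

    private final KafkaProperties kafkaProperties;

    public HRKafkaReader(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    public Flux<HeartRate> read(long userId) {
        Map<String, Object> props = new HashMap<>(kafkaProperties.buildConsumerProperties());
        // random group ID so that every subscriber reads the topic independently (see below)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

        ReceiverOptions<Long, HeartRate> options = ReceiverOptions.<Long, HeartRate>create(props)
                .subscription(Collections.singleton("heart-rates"));

        return KafkaReceiver.create(options)
                .receive()
                .map(ConsumerRecord::value)
                .filter(heartRate -> heartRate.getUserId() == userId);
    }
}
```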
When I add another Kafka consumer, only the first consumer reads the data because they are in
the same group ("Kafka guarantees that a message is only ever read by a single consumer in the
group."). However, each time a user displays the real-time graph, it should also read from the
Kafka topic. That's why I "hacked" it by generating a random group ID using UUID.randomUUID().
Unit test Reactor Kafka
To test the Reactor Kafka consumer, I need to use the
reactor-test library, so in
my pom.xml:
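That is:

```xml
<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-test</artifactId>
  <scope>test</scope>
</dependency>
```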
I need to initialize the consumer as well as a producer to generate the Kafka records in the topic:
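A sketch of that setup, assuming the embedded Kafka broker and the Schema Registry mock used
previously:

```java
// producer used to push test records into the embedded Kafka
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
producerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry.getUrl());
KafkaTemplate<Long, HeartRate> kafkaTemplate =
        new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));

// options for the reactive consumer under test
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
consumerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry.getUrl());
consumerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
ReceiverOptions<Long, HeartRate> receiverOptions = ReceiverOptions.<Long, HeartRate>create(consumerProps)
        .subscription(Collections.singleton("heart-rates"));
```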
I have to set the property auto-offset-reset to earliest because I'm dealing with a single-threaded
test: I will first send the record to Kafka, then use my Kafka consumer to read the
record.
I use the
StepVerifier
to simulate the consumption and to test the record.
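A sketch of such a test, with buildHeartRate being a hypothetical helper creating a HeartRate for a
given user:

```java
kafkaTemplate.send("heart-rates", 42L, buildHeartRate(42L));

StepVerifier.create(
            KafkaReceiver.create(receiverOptions)
                    .receive()
                    .map(ConsumerRecord::value)
                    .take(1))
        .expectNextMatches(heartRate -> heartRate.getUserId() == 42L)
        .verifyComplete();
```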
Something to remember when dealing with reactive programming:
Nothing happens until you subscribe
That means that until step.verify* is called, nothing will happen.
Endpoint to read in real time
There are several possibilities for a frontend client to continuously get a stream of data:
polling
server-sent events
websockets
I read this excellent blog
post
which led me to develop the server-sent events approach.
To implement it, I use
spring-webflux.
So in my pom.xml:
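Via the Spring Boot starter:

```xml
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
```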
My controller looks like this:
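A sketch of the controller, reusing the HRKafkaReader sketched above; HeartRateDTO.from is a
hypothetical mapping method:

```java
@RestController
@RequestMapping("/heart-rates")
public class HRController {

    private final HRKafkaReader hrKafkaReader;

    public HRController(HRKafkaReader hrKafkaReader) {
        this.hrKafkaReader = hrKafkaReader;
    }

    /** Streams the heart rates of a user as server-sent events. */
    @GetMapping(path = "/{userId}/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<HeartRateDTO> stream(@PathVariable long userId) {
        return hrKafkaReader.read(userId)
                .map(HeartRateDTO::from);
    }
}
```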
You may notice I return a HeartRateDTO, not a HeartRate, because the auto-generated HeartRate
contains some getters, like getSchema(), that Jackson does not really like to serialize/deserialize.
Unit tests HRController
To test the controller, I simply use WebTestClient:
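For example, with a stubbed reader (fakeReader) injected into the controller:

```java
@Test
void shouldStreamHeartRates() {
    WebTestClient webTestClient = WebTestClient.bindToController(new HRController(fakeReader)).build();

    webTestClient.get()
            .uri("/heart-rates/42/stream")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .exchange()
            .expectStatus().isOk()
            .returnResult(HeartRateDTO.class)
            .getResponseBody()
            .take(1)
            .as(StepVerifier::create)
            .expectNextCount(1)
            .verifyComplete();
}
```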
When everything is up and running, it displays a nice graph that updates in real time:
Database Access
I use spring-boot-data-jpa to read in the
database. My configuration looks like this:
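Roughly like this (the connection settings are assumptions; the table is created by Kafka Connect,
not by Hibernate):

```yaml
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/heart_rate
    username: postgres
    password: postgres
  jpa:
    hibernate:
      # never let Hibernate touch the schema, Kafka Connect owns it
      ddl-auto: none
```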
I need to create an @Entity:
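A sketch of it; the column names follow the renames done by the connector, and the table name is
the topic name (hence the escaping):

```java
@Entity
@Table(name = "\"heart-rates\"")
@IdClass(HeartRateId.class)
public class HeartRateEntity {

    @Id
    @Column(name = "__connect_topic")
    private String connectTopic;

    @Id
    @Column(name = "__connect_partition")
    private Integer connectPartition;

    @Id
    @Column(name = "__connect_offset")
    private Long connectOffset;

    @Column(name = "user_id")
    private Long userId;

    private Double value;

    private LocalDateTime timestamp;

    @Column(name = "is_reset")
    private Boolean isReset;

    // getters and setters omitted
}
```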
Since I configured Kafka Connect to auto-generate the primary key, it has created 3 columns that
serve as the primary key:
__connect_topic
__connect_partition
__connect_offset
We do not use the primary key, but it's necessary to have one when using Spring Data JPA.
So I implemented the HeartRateId like this:
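A minimal composite key class:

```java
public class HeartRateId implements Serializable {

    private String connectTopic;
    private Integer connectPartition;
    private Long connectOffset;

    public HeartRateId() {
    }

    // equals and hashCode are required for a JPA composite key
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof HeartRateId)) return false;
        HeartRateId that = (HeartRateId) o;
        return Objects.equals(connectTopic, that.connectTopic)
                && Objects.equals(connectPartition, that.connectPartition)
                && Objects.equals(connectOffset, that.connectOffset);
    }

    @Override
    public int hashCode() {
        return Objects.hash(connectTopic, connectPartition, connectOffset);
    }
}
```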
Then, the repository looks like this:
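A sketch of it, with the two queries described below:

```java
public interface HeartRateRepository extends JpaRepository<HeartRateEntity, HeartRateId> {

    /** IDs of the users having heart rates, to populate the user selector. */
    @Query("SELECT DISTINCT hr.userId FROM HeartRateEntity hr")
    List<Long> findUserIds();

    /** Heart rates of a user registered after the given timestamp. */
    List<HeartRateEntity> findByUserIdAndTimestampAfterOrderByTimestampAsc(Long userId, LocalDateTime after);
}
```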
I need to fetch the list of user IDs so the user can select which heart rate graph they want to
see.
The other method is used to fetch the heart rates of the last N seconds.
Router
Now, let’s implement the router that will serve the last N seconds heart rates:
This is quite straightforward.
Front to display the last N seconds
I simply use the native XMLHttpRequest to fetch the heart rates, then use the JS library Flot to
render the graph:
Mapping the received heart rates into data understandable by Flot is performed by this function:
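A sketch of that mapping (it assumes each heart rate is serialized with timestamp and value
fields):

```javascript
// Maps heart rates (as returned by the API) into the [x, y] pairs expected by Flot.
function toFlotData(heartRates) {
  return heartRates.map(function (heartRate) {
    // x axis: the timestamp, y axis: the heart rate value
    return [new Date(heartRate.timestamp).getTime(), heartRate.value];
  });
}
```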
Flot accepts an array of arrays, in which the first element of each inner array is the X axis value
and the second element is the heart rate value.