Apache Kafka Example: How Rollbar Removed Technical Debt - Part 1

March 10th, 2020 • By Jon de Andrés Frías

In this two-part series of blog posts, we’ll explain how Kafka has helped us in removing parts of our architecture that we consider to be “legacy”.

During the development of a project sometimes we need to take decisions on our architecture or software design that may not be the best decisions from a pure and perfectionist technical perspective. However, the compromise between the business needs and the engineering solution might push you to adopt a particular solution. That’s how companies create technical debt, that at some stage is considered legacy software or legacy design.

Eventually, a company can decide to address that technical debt and invest resources in removing it. At Rollbar we are using Kafka to improve our service that ingests data in our databases.

How does the current ingestion service work?

The first stage of our events processing pipeline, and the ingestion service, is our API. These are few of the main responsibilities of the API:

  • Validates the schema of the payload
  • Identifies the project for the received access token
  • Checks that the access token passes the rate limiting configuration
  • Serialize the occurrence payload before it can be written to database

You can see that storing the occurrences in database is not API’s responsibility. Instead, our API writes the received payloads to disk using a Node library we wrote a while ago, batchelor. This library allow us to stream the received payloads into a file that rotates after a defined period of time or maximum file size. We call these files the offline files. Each line in the offline file represents a payload with its output table and row column values:

{"table":"raw_item","row":{"project_id":1,"timestamp":1574948156,"body":"eJyNUsGSnCAQ/ZUtzqOIio7e9jtSKQoQHTJKE0Q3U1v+e5pxapNLqsKp6X7d7/WDT6JgeJD+kyxmXeVkUnimyLv3bzOAt27K85xcyGp+kr5sq+44LmTb7IAgJYdKs6bJ+JWPWd1qlSldmYyztmkHXupSltg6Szdtz/HEP+IN3FuZtzlrU8nsZsa8dSPgNVpUEuXiSc94W3f1lfEmcYfdhKQuAESE0xsshv4ARwPMs5KBLvAL+2sdkjfhskf+wpvIrmSEgm6WP4LHmk2QcybriQmSYdtJ/I9hGVx2sjysFZwSM40p1MDIaEVGKGGSUYoQg9iX3j0QRo+8pnUHLOdH116IoqPSWMorlseaK1bI1dcnqouw6XVTMqKHVXFVSNQhhxZ+TruQ7OmrcbgO4xbi0gA8wbDpacAhwEO1oz/3RhjVle1LkrM6ThUslkdfjdf4up7OvrQlOW0yUSXlq0dsa0a0g/mHuV1os69+VruPok9b4NUSEu0mk/7EdOiHO9xLJnpe6slFK62fTFUGDUdt0vueGNqcIJYT41PglouB1c0WjdsfhsfjdjYXcfEE8zhT2GgW0le8OY70G8eAJB8skjdfhsi0ubuDD0eO34El5ys=","version":2,"billable":1,"uuid":"bad3a155-585f-47cb-bc3e-51767d52c2a2"},"counter":356}
{"table":"raw_item","row":{"project_id":1,"timestamp":1574948157,"body":"eJztVk1v2zAM/SuGTl3hbzt2ktMuO+w3FIUgy3SixpY8SU4TFP7vI50sbQ8FOgy7NZcoIvn4yEcReWGNac9s+8K8FRLo0FkxgGPbhxfWKw3asO2mDpk0LZpZp+/uhd25MLi/Pzx/YyEbwO9NiybOpeh7zvGuUz1ohMHbwZwSJ60avUuMBm66ziXSgvDAPdkfjhsdkjfz2wOX/NmeXFLbMFPVgdPpuF+bwHujqIPoiB7R+Fm/eLwxeGLw//g8D4tOTxgxsd/Aw4Z7ZQRJK4dhnDsMXxTbPVBJb3BheNoZxEchmIUBW+zecYjnCSMXhlNHrjSnNgRt145Hyjdwikwkw9MF1ihd4Qne+EQjv0k4w9rjWXzPIdsmhSlXot0XchNGlV5sdfkjhsdfTotRSMqUXYlsUK06ZJqPCNrHeRxHWc1meAISI/Bgh0yr5CWF8OIlFd1uSnX+BVib39NeE+0J0v+e+9Ht00SOKFvD7E0Q4LhkwPLFQazrM7jPI3TOEONGF4fwVK4NQZhWLI3AyRPRifW9H0jbIICIcDeUBZ2vYzQIerF6M2ItpFqRkrZJl3UOZI0f68rUdfksjdfkzQRo0oyKqErV01WihrKPCvTfLORaZFB09Zy1RSiqdAlS18/9JM9Yomgj8oaPYCmAkZr2kkuUiNxsO6iOfXnOpDS4DihcREyxwMMQvV/DN+vHaDGUgO18apTlxZiJ90yQwy7W8Yk4xVzPF/DKAQHVFymEJ/I5Dw23PIP9L1d88G9taQFPmsdlkfjsdfcJm8ovknDp8ZZdXTSPlErRGpxaaaXcZicnD8miQgvULxxuJdFVW64p2y6Q9Fk5uI2Jy5WFg22JVz8uLWP4jPBt7wESTPmjzrNn8G62fyCU=","version":2,"billable":1,"uuid":"8a014c90-6223-46e2-b9a2-c704aba6a4f4"},"counter":357}

Writing the received payloads to disk allows us to have low latencies when inserting them in batches to DB since there is no network transmission. The offline loaders processes will read those files in order of creation time, get a batch of payloads, and insert them into one of our databases. We run one offline loader process per GCE instance, which is enough to handle all our traffic and we rarely observe contention at this stage.

Why the migration to Kafka?

As explained above, we are using a very custom solution for our ingestion stage. Over time we've detected few pain points with this solution:

  1. By design we can only have one offline loader per host, or file-system, which is not a problem right now but might be in the future
  2. We are moving our architecture to Kubernetes and we'd prefer avoiding Persistent Volumes
  3. Because of point 1 above, using Persistent Volumes on Kubernetes would allow us to run one single offline loader for the whole Volume

We thought a queue or distributed log system would help us on moving our API to Kubernetes and get rid of our very custom solution to ingest occurrences in database, so we started evaluating Kafka to replace our offline files. Along with Kafka, we were also evaluating Cloud Pub/Sub since we run Rollbar in Google Cloud Platform(GCP). However, we decided not to use GCP storage or jobs brokers solutions in order to simplify the infrastructure for our on-premises solution.

Kafka fits very well in our processing model for next reasons:

  • Provides us the way to process the stream of data in the order that we receive it
  • Ability to configure Kafka to retain data for the time period you decide, allowing to reprocess old data
  • Process the same stream with different speeds or start points at the same time.
  • Define the maximum grade of parallelization you need but add the number of consumers you need at each moment

Kafka is a very mature technology with an important community behind and many stream processing projects around or supporting it: Kafka Streams, KSQL, Spark Streaming, Apache Beam and more.

Let's go deeper into the engineering solution we adopted.

Sizing the Kafka Cluster

When deciding the size and configuration of the Kafka cluster, it's important to be able to save the data for the retention period in the product that an engineering team needs. Sizing is not the same when the retention period is one day or one week. At same time you need to understand the nature of the data, the number of events per second the system receives, and the size of those events.

As example, we'll assume the system receives 10000 events per second and the payload average size is around 2.5KB. With the payload average size and and the events per second we receive, we can calculate the ingestion throughput as 10,000 * 2500 = 24,400 KB/s, around 24MB/s of input throughput.

We want to keep a whole week of data in our Kafka cluster, that means we need to store at least 24,400KB * 60 seconds * 60 minutes * 24 hours * 7 days = 14757120000KB = 13.74TB. So the topic size will be 13.74TB at the end of a week and that'd be the needed disk size if we don't use replication at all.

For high availability production systems, the recommended replication factor is at least 3, that means our topic will use 3 * 13.74TB = 41.22TB. Since the number of nodes should be higher than the replication factor we can use 4 nodes for now.

Assuming the replicas are equally distributed in the cluster, we'll need at least 41.22TB / 4 nodes = 10.3TB in each broker. That'd be enough for the topic we've described, however in the system we may have more topics that will increase the needed disk size in each broker. In that case, we'd have 2 options:

  1. Add more brokers with same disk size so the cluster can store more data
  2. Increase disk size on each broker

The first option might be more expensive since you need to add more nodes to your cluster but at same time you'll improve its availability.

Choosing Number of Partitions

The number of partitions of the topic will define the grade of parallelization for the topic, that means, the number of processes for the same consumer group that will consume messages from the topic. If the topic has N partitions, you can run up to N processes for the same consumer group, one process per partition.

A way to choose the number of partitions is understanding the performance of the producer and consumer. Most of the times the consumer will be slower than the producer, so you'll want to do some processing for every batch you consume from the topic. kafka-producer-perf-test.sh will allow you to measure the performance of the cluster when producing messages:

./kafka-producer-perf-test.sh \
  --topic test \
  --num-records 10000 \
  -- payload-file ./payload.json \
  --throughput -1 \
  --producer-props acks=1 \
  bootstrap.servers=kafka:9092 \

In order to measure your consumer performance you can prepare a testing environment with one topic having a single partition. That means you'd have a single process consuming from a single partition and you can know the real performance of our consumer. You can do different tests for different batch sizes, example:

Batch Size Consumer Throughput
1000 2500 msgs/s
5000 2300 msgs/s
2000 2650 msgs/s

The consumer is able to process around 2650 msgs/s with a batch size of 2000, you can size the topic based on the number of incoming events, 10,000/s, but it's always a good idea to think about the future and prepare your topic for the moment your incoming traffic grows, ex: 20,000 incoming events per second. The number of processes needed for that throughput would be 20,000 / 2650 = 7.5.

Brokers Configuration

Kafka has tons of settings you can configure and it can be intimidating initially. Depending on your configuration, your Kafka cluster will provide more throughput, less latency, more availability. Also, as commented before, you can configure the retention period of your data, and depending on that setting you'll need to size your cluster differently. These are few of the settings we found important and didn't use their default values:

  • auto.create.topics.enable will allow the broker to create a new topic if it doesn't exist when receiving the first message. We set this one to false so we prevent to create topics with num.partitions default number of partitions. This forces you to define the number of partitions for your topic and avoid having a system working with the incorrect number of partitions.
  • min.insync.replicas defines the minimum number of in-sync replicas needed when a producer publishes a message. A higher value will provide higher availability and more latency, while a lower value will provide less latency since the partition leader will wait for less brokers to replicate the message. If the number of in-sync replicas doesn’t reach the configured value, the message will not be published in the topic.
  • log.retention.hours will define the number of hours Kafka will keep your incoming message in the broker. Setting a high value will allow you to process very old data but you'll need to size your cluster properly and that will be more expensive.

Producer

Our API was written using Node.js so we needed to chose between one of the existing Node.js libraries available for Kafka:

kafkajs is the one with more activity in GitHub but we decided to choose node-rdkafka for a few reasons:

  1. It's a binding for librdkafka, which is the library most of the libraries in other languages use
  2. It's the fastest producer
  3. Up-to-date with Kafka features
  4. It has a decent activity and Blizzard is behind its development

Our producing logic in our API is simple, we prepare a message with the ingestion schema we defined which is serialized as JSON and we publish it to the correct partition using the related account ID as message key, that means all messages for the same account are published to the same partition. Publishing all messages of an account will provide the correct events order within all the projects of the account and decrement the per-account parallel operations in next stages of the pipeline.

Beside the broker settings you'll also want to configure your producer depending on your needs. A couple of important settings for which we don't use their default values are:

  • request.required.acks sets the number brokers that acknowledge a produced message to considered it to be persisted in the cluster. Value -1 makes a message to be acknowledged by all brokers in the cluster. A high value will provide less throughput but ensure that a message is received and committed by all brokers.
  • message.max.bytes is the maximum allowed message size. Default is 1000012 which is too low for our requirements.

Conclusions

While Kafka is not a complicated technology to use, it's true that is important taking care of few broker and producer configuration settings so you achieve the behavior and performance your product expects. It's important to understand how your incoming traffic is and the throughput your system is expected to give.

In this post we’ve covered the broker configuration, topics design, and producer configuration. In next one, we’ll go through our consumer logic, how we monitor it and how we deployed the new system.

Get the latest updates delivered to your inbox.