Tag Archives: kafka

Poison Pills in Kafka (I)

What is a poison pill?

A “poison pill” is a record that always fails when consumed, no matter how many times it is attempted. They come in different forms:

  • Corrupted records.
  • Records that make your consumer deserializer fail (e.g., an Avro record whose writer schema is not compatible with the consumer reader schema).

The problem with a poison pill is that, unless the consumer eventually handles it, it blocks consumption of the topic/partition that contains it, halting the consumer’s progress.

What can we do with poison pills?

There are many different strategies to deal with poison pills. The Kafka Streams FAQ describes some of them:

  • Log an error and shut down the application
  • Log an error and skip the poisonous record
  • Send the record to a separate topic (e.g., Dead Letter Queue) for further inspection (likely by a human operator)
  • Transform the poisonous record into a sentinel value that downstream consumers can recognize as a record that couldn’t be interpreted

Kafka Streams to the rescue

A year ago Kafka Streams added options to handle corrupt data during deserialization. This PR set the foundations for dealing with poison pills in Kafka Streams:

  • Added the new interface DeserializationExceptionHandler.
  • Introduced two default implementations for LogAndContinue and LogAndFail.
  • Added a new default.deserialization.exception.handler configuration value for StreamsConfig. Its default value is LogAndFail.
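
Wiring the handler in is a one-line configuration change. The snippet below is a minimal sketch of a Streams configuration that opts into the LogAndContinue behaviour (class LogAndContinueExceptionHandler); the application id and bootstrap servers are placeholders.

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class PoisonPillTolerantConfig {

    public static Properties build() {
        Properties props = new Properties();
        // Placeholder application id and brokers.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "poison-pill-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Skip records that fail deserialization instead of dying (the default is LogAndFail).
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        return props;
    }
}

The same property can also point to LogAndFailExceptionHandler, or to a custom implementation of the interface.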

How does it work?

The key in this implementation is RecordDeserializer. This class executes whatever exception handling we have configured for our Streams application.

[Diagram: Kafka Streams poison pill handling classes]

In the diagram, the classes shaded in blue are exclusive to Kafka Streams, while the class in red is part of the common Kafka code base.

If the strategy is to continue, returning null bubbles up a null ConsumerRecord to the layer above, RecordQueue, which discards the record and continues with the next one.

At runtime, LogAndContinue logs a series of warnings and consumption continues. LogAndFail, on the other hand, throws a StreamsException.
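
For the dead-letter-queue strategy listed earlier, the same interface can be implemented by hand. The class below is a hypothetical sketch, not part of Kafka: it assumes the Streams configuration (which already contains bootstrap.servers) can be reused to build a producer, and that a made-up dead.letter.topic entry has been added to that configuration.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical handler: forwards the raw bytes of a poison pill to a dead-letter
// topic and tells Streams to continue with the next record.
public class DeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {

    private KafkaProducer<byte[], byte[]> producer;
    private String deadLetterTopic;

    @Override
    public void configure(final Map<String, ?> configs) {
        // "dead.letter.topic" is a made-up configuration key for this example.
        deadLetterTopic = String.valueOf(configs.get("dead.letter.topic"));
        producer = new KafkaProducer<>(new HashMap<String, Object>(configs),
                                       new ByteArraySerializer(), new ByteArraySerializer());
    }

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Keep the raw bytes for later inspection, then move on.
        producer.send(new ProducerRecord<>(deadLetterTopic, record.key(), record.value()));
        return DeserializationHandlerResponse.CONTINUE;
    }
}

It would be registered through the same default.deserialization.exception.handler property shown above.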

There is also a set of metrics that can be monitored to make sure we are not skipping too many records.

Both metrics live under the JMX path kafka.streams:type=stream-metrics,client-id=([-.\w]+):

  • skipped-records-rate: the average number of skipped records per second.
  • skipped-records-total: the total number of skipped records.
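
Since the metrics are exposed through JMX, they can also be read programmatically. The probe below is a small sketch that assumes it runs inside the same JVM as the Streams application and that the default JmxReporter is enabled; it simply queries the MBeans matching the path above.

import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

// Minimal sketch: read the skipped-records metrics of a Kafka Streams application
// running in this JVM.
public class SkippedRecordsProbe {

    public static void main(final String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName pattern = new ObjectName("kafka.streams:type=stream-metrics,client-id=*");
        for (ObjectName name : server.queryNames(pattern, null)) {
            Object total = server.getAttribute(name, "skipped-records-total");
            System.out.println(name.getKeyProperty("client-id") + " skipped so far: " + total);
        }
    }
}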

What about Kafka Consumer?

If your application is not using Kafka Streams, the basic KafkaConsumer offers no built-in support for handling poison pills.

However, not all is lost. In the next installment in this series, we will see how we can implement a very similar strategy leveraging the Serde interface that KafkaConsumer uses to deserialize records before delivering them to your application.
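
As a teaser, the idea boils down to wrapping the deserializer half of the Serde and converting failures into a sentinel value instead of an exception. The class below is an illustrative sketch (the name, and the choice of null as the sentinel, are mine), not the final implementation of the next post.

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

// Wraps the real deserializer and turns any failure into a sentinel value (here, null)
// instead of letting the exception kill the poll loop.
public class SafeDeserializer<T> implements Deserializer<T> {

    private final Deserializer<T> delegate;

    public SafeDeserializer(final Deserializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        delegate.configure(configs, isKey);
    }

    @Override
    public T deserialize(final String topic, final byte[] data) {
        try {
            return delegate.deserialize(topic, data);
        } catch (final Exception e) {
            // Log and skip: the application sees null and can discard or route the record.
            return null;
        }
    }

    @Override
    public void close() {
        delegate.close();
    }
}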

Kafka defaults that you should re-consider (I)

Image taken from http://www.htbdpodcast.com

There is a vast number of configuration options for Apache Kafka, mostly because the product can be fine-tuned to perform in various scenarios (e.g., low latency, high throughput, durability). These defaults span across brokers, producers and consumers (plus other sidecar products like Connect or Streams).

The guys at Kafka do their best to provide a comprehensive set of defaults that will just work, but some of them can be relatively dangerous if used blindly, as they might have unexpected side effects, or be optimized for a use case different to yours.

In this post, I’d like to review the most obvious ones on the broker side, explain what they do, why their defaults can be problematic, and propose alternative values.

Change these defaults

auto.create.topics.enable

Defaults to ‘true’. You definitely want to change this one to false. Applications should be responsible for creating their topics, with the correct configuration settings for their various use cases (see the sketch after the list below).

If you keep it set to true, some other configuration values kick in to provide the default topic configuration:

  • log.retention.hours: by default, logs are retained for 7 days. Think carefully about whether this default is good enough: any data older than that will not be available when replaying the topic.

  • min.insync.replicas: defaults to 1. As the documentation mentions, a typical configuration is replication factor minus 1, meaning that with a replication factor of 3, min.insync.replicas should be 2. The problem with 1 is that it puts you in a dangerous position, where the cluster accepts messages for which you only have 1 copy. On the other hand, a value equal to the replication factor means that losing one node temporarily stops your cluster from accepting writes until the missing replica is back in sync on a healthy node.

  • default.replication.factor: defaults to 1. This is a bad value, since it effectively keeps only one copy of an auto-created topic. If the disk that stores a partition of that topic dies, the data is lost. Even if there are backups, consumers don’t benefit from automatic failover to other brokers holding copies of the partition, resulting in consumption interruptions. I would suggest a value like 3, and then fine-tuning topics that require more or fewer copies individually.

  • num.partitions: defaults to 1. Another bad value. If a topic only has one partition, it can be consumed by only one instance of an application at a time, hindering any parallelization we might hope to achieve with Kafka. While partitions are not free and Kafka clusters have a limit on how many they can handle, a minimum of 3 partitions per topic seems like a safer and more sensible default.
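
As mentioned above, the alternative to auto-creation is creating topics explicitly with the settings you actually want. The snippet below is a sketch of doing that with the AdminClient API; the topic name, bootstrap servers and the concrete values are placeholders.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class ExplicitTopicCreation {

    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        AdminClient admin = AdminClient.create(props);
        try {
            Map<String, String> topicConfig = new HashMap<>();
            topicConfig.put("retention.ms", String.valueOf(30L * 24 * 60 * 60 * 1000)); // 30 days
            topicConfig.put("min.insync.replicas", "2");

            // 3 partitions, replication factor 3, plus the per-topic overrides above.
            NewTopic orders = new NewTopic("orders", 3, (short) 3).configs(topicConfig);
            admin.createTopics(Collections.singleton(orders)).all().get();
        } finally {
            admin.close();
        }
    }
}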

offsets.retention.minutes

Defaults to 1440 minutes (24 hours). This is a dangerous default. Some applications might be idle over the weekend, meaning they neither publish to Kafka nor commit new offsets during that period.

The morning after the weekend, if they restart, the new instances don’t find any committed offsets for their consumer group, since those offsets have expired.

At that point, the consumer’s auto.offset.reset configuration kicks in, sending the application back to the earliest message, jumping to the latest one, or failing outright. In any case, this is probably not what you want.

The recommendation is to increase this value to something like 7 days for extra safety.
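
Whatever retention you settle on, it is also worth choosing the auto.offset.reset fallback consciously rather than inheriting the default (‘latest’). A minimal sketch, with placeholder group id and servers:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerResetPolicy {

    public static Properties build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "weekend-idle-app");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // "earliest" replays old data, "latest" skips to the tip, "none" fails fast.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}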

Keep these defaults

auto.leader.rebalance.enable

Defaults to true. Unless you know what you’re doing, you don’t want to rebalance partitions manually. Let Kafka do it for you.

delete.topic.enable

Defaults to true. If you find yourself in a highly regulated environment, you might not be allowed to delete anything, ever. Otherwise, allowing topic deletion guarantees that you can get rid of data quickly and easily.

That is especially useful in development clusters. Don’t set this to false there; you will shoot yourself in the foot.

log.flush.scheduler.interval.ms

Defaults to ‘never’ (represented as a ridiculously long number of ms). Kafka is so performant because it enables zero-copy data transfers from producers to consumers.

While that is a fantastic mechanism for moving tons of data quickly, the durability aspect can be a concern. To account for that, Kafka proposes using replication across nodes to guarantee the information is not lost, instead of explicitly flushing messages to disk as they come. The result is a lack of certainty about when the messages are actually written to the disk.

You could effectively force Kafka to flush to disk using this and other configuration properties. However, you would most likely kill Kafka performance in the process. Hence, the recommendation is to keep the default value.

offsets.commit.required.acks

Defaults to ‘-1’, which means offset commits are not acknowledged by the leader until the min.insync.replicas value for the topic is honored.

That is a safe default, falling on the side of durability rather than lower latency. You should consider particular configurations at the topic level, depending on the nature of the stored information (e.g., a ‘logs’ topic getting a lower value than an ‘orders’ topic).

offsets.topic.num.partitions

Defaults to ’50’. Kafka automatically creates the topic __consumer_offsets with this number of partitions. Since this is likely to be the busiest topic in your cluster, it’s a good idea to keep the number of partitions high so that the load is spread across as many nodes as possible.

The partition count of __consumer_offsets cannot be changed for the lifetime of the cluster, so even if you are not planning to have 50 brokers in your cluster, it falls on the safe side to keep this number as it is.

offsets.topic.replication.factor

Defaults to ‘3’. Similar to the previous value, but it configures how many copies of __consumer_offsets you want. Three copies is a safe default and should probably only be changed to a higher number.

More copies of the topic make your cluster more resilient in the event of broker failure, since there would be more followers ready to take over the role of the fallen leader.

unclean.leader.election.enable

Defaults to ‘false’. It used to default to ‘true’ because Kafka was originally optimized for availability. If a leader dies without any follower being up to date, setting this value to ‘true’ allows the cluster to continue operating. Unfortunately, data loss can result.

However, after Aphyr roasted Kafka for this data loss scenario, Kafka introduced this configuration value and eventually changed it to ‘false’ to prevent data loss. With this default, the affected partitions stop operating until a follower that was up to date with the fallen leader comes back (potentially the recovered leader itself), preventing any loss.

Summary

There are many more configuration values that play essential roles on the broker side, and we haven’t even mentioned any of the values on the client side (e.g., consumers, producers). In following posts, I’ll jump into those and describe which defaults are sensible and which ones you should think twice about before blindly embracing.

Timeouts in Kafka clients and Kafka Streams


IMPORTANT: This information is based on Kafka and Kafka Streams 1.0.0. Past or future versions may differ.

As with any distributed system, Kafka relies on timeouts to detect failures. These timeouts can be used by clients and brokers to detect each other’s unavailability. The following is a description of the configuration values that control the timeouts both brokers and clients use to detect clients that are not available.

The original design for the Poll() method in the Java consumer tried to kill two birds with one stone:

  • Guarantee consumer liveness
  • Guarantee progress as well, since a consumer could be alive but not moving forward

However, this design caused a few problems. The solution was to introduce separate configuration values and a background-thread-based heartbeat mechanism.

heartbeat.interval.ms

Since Kafka 0.10.1.0, the heartbeat happens on a separate, background thread, different from the thread where poll() runs. The description for the configuration value is:

The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

The default value is 3 seconds. This heartbeat will guarantee an early detection when the consumer goes down, maybe due to an unexpected exception killing the process. However, back pressure or slow processing will not affect this heartbeat.

max.poll.interval.ms

Introduced in Kafka 0.10.1.0 as well, it compensates for the background heartbeating by introducing a limit on the time between poll() calls. The description for the configuration value is:

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

The default value is 5 minutes (300000 ms), except for Kafka Streams, which increases it to Integer.MAX_VALUE.

When the timeout expires, the consumer stops heartbeating and leaves the consumer group explicitly. With this new configuration value, we can set an upper limit on how long we expect a batch of records to be processed. Together with max.poll.records and appropriate timeouts for third-party calls, we should be able to determine fairly accurately how long an application may stay unresponsive while processing records.
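
To make that calculation concrete, the loop below is a sketch of a consumer where max.poll.records bounds the batch size, so that max.poll.records multiplied by the worst-case per-record time stays below max.poll.interval.ms. Topic, group and the numbers themselves are placeholders.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BoundedPollLoop {

    public static void main(final String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-processor");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);         // at most 100 records per batch
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000); // 5 minutes to process a batch

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Each record must stay well under max.poll.interval.ms / max.poll.records.
                    process(record);
                }
            }
        }
    }

    private static void process(final ConsumerRecord<String, String> record) {
        // Placeholder for the real work, e.g. a call to a third party with its own timeout.
    }
}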

Also, max.poll.interval.ms has a role in rebalances. Since we know it represents how long processing a batch can take, it is also, implicitly, the timeout for how long a client should be waited for in the event of a rebalance. Therefore, the client sends this value to the broker when it joins the consumer group. In the event of a rebalance, the broker will wait this long for a client to rejoin before kicking it out of the consumer group.

session.timeout.ms

Finally, while the previous values are used to get the client willingly out of the consumer group, this value controls when the broker can push it out itself. The description for this configuration value is:

The timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance.

The default is 10 seconds. Clients have to choose a value within the range defined by group.min.session.timeout.ms and group.max.session.timeout.ms, which are configured on the broker side.

What does this all mean?

In a nutshell, it means that you have to configure two types of timeouts: a heartbeat timeout and a processing timeout. The former accounts for clients going down, and the latter for clients taking too long to make progress.

Heartbeating will be controlled by the expected heartbeat.interval.ms and the upper limit defined by session.timeout.ms.

Processing is controlled by max.poll.interval.ms: on the client side, it gets the client kicked out of the consumer group when the timeout expires; on the server side, it communicates to the broker the expected rebalancing timeout.
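
Put together, a plain consumer configuration for these two concerns could look like the sketch below: heartbeat and session timeouts kept tight for fast failure detection, and a generous (but finite) processing timeout. The concrete numbers are only illustrative.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class TimeoutSettings {

    public static Properties timeouts() {
        Properties props = new Properties();
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3_000);   // expected heartbeat cadence
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10_000);     // detects dead clients quickly
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);  // worst-case batch processing time
        return props;
    }
}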

The Integer.MAX_VALUE Kafka Streams default

The max.poll.interval.ms default for Kafka Streams was changed to Integer.MAX_VALUE in Kafka 0.10.2.1 to strengthen its robustness in the scenario of large state restores. Fortunately, after changes to the library in 0.11 and 1.0, this large value is not necessary anymore.

In any case, it is still recommended to use a generous timeout in case of calls to external third parties from a stream topology. For a node that goes down, session.timeout.ms will quickly be triggered since the background heartbeat will stop.

For a node that is simply taking too long to process records, the assumption is any other instance picking up those records would suffer the same delays with the third party.
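
If you prefer a generous but finite limit instead of Integer.MAX_VALUE, the embedded consumer of a Kafka Streams application can be tuned through the consumer prefix. The snippet below is a sketch with placeholder values.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsTimeoutOverride {

    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-topology");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Generous, but finite: enough for the slowest external call the topology makes.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 900_000);
        return props;
    }
}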

Conclusion

Separating max.poll.interval.ms and session.timeout.ms allows a tighter control over applications going down with shorter session.timeout.ms, while still giving them room for longer processing times with an extended max.poll.interval.ms.

This is especially useful for Kafka Streams applications, where we can hook complicated, long-running processing for every record.


Incompatible AVRO schema in Schema Registry

My company uses Apache Kafka as the spine for its next-generation architecture. Kafka is a distributed append-only log that can be used as a pub-sub mechanism. We use Kafka to publish events once business processes have completed successfully, allowing a high degree of decoupling between producers and consumers.

These events are encoded using Avro schemas. Avro is a binary serialization format that enables a compact representation of data, much more compact than, for instance, JSON. Given the high volume of events we publish to Kafka, using a compact format is critical.

In combination with Avro we use Confluent’s Schema Registry to manage our schemas. The registry provides a RESTful API to store and retrieve schemas.

Compatibility modes

The Schema Registry can control which schemas get registered, ensuring a certain level of compatibility between existing and new schemas. This compatibility can be set to one of the following four modes (a local compatibility check is sketched after the list):

  • BACKWARD: a new schema is allowed if it can be used to read all data ever published into the corresponding topic.
  • FORWARD: a new schema is allowed if it can be used to write data that all previous schemas would be able to read.
  • FULL: a new schema is allowed only if it satisfies both the BACKWARD and FORWARD requirements.
  • NONE: a schema is allowed as long as it is valid Avro.
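
These checks can also be run locally with Avro’s own compatibility API before pushing anything to the registry. The sketch below uses two made-up schemas; the new one adds a field without a default, which breaks BACKWARD compatibility.

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;

public class CompatibilityCheck {

    public static void main(final String[] args) {
        Schema oldSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Order\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"}]}");
        Schema newSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Order\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"amount\",\"type\":\"double\"}]}");

        // BACKWARD: can the new schema (as reader) read data written with the old one?
        SchemaCompatibilityType result = SchemaCompatibility
            .checkReaderWriterCompatibility(newSchema, oldSchema)
            .getType();
        System.out.println("BACKWARD compatible? " + (result == SchemaCompatibilityType.COMPATIBLE));
    }
}

Swapping the reader and writer arguments gives the FORWARD check.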

By default, Schema Registry sets BACKWARD compatibility, which is most likely your preferred option in a production environment, unless you want to have a hard time with consumers not quite understanding events published with a newer, incompatible version of the schema.

Incompatible schemas

During the development phase it is perfectly fine to replace schemas with others that are incompatible. However, Schema Registry will prevent updating the existing schema to an incompatible newer version unless we change its default setting.

Fortunately, Schema Registry offers a complete API that allows not only registering and retrieving schemas, but also changing some of its configuration. More specifically, it offers a /config endpoint to PUT new values for the compatibility setting.

The following command would change the compatibility setting to NONE for all schemas in the Registry:

curl -X PUT http://your-schema-registry-address/config \
     -d '{"compatibility": "NONE"}' \
     -H "Content-Type: application/json"

This way, the next registration would be allowed by the Registry as long as the newer schema is valid Avro. The configuration can be set for a specific subject too, simply by appending its name (i.e., /config/subject-name).

Once the incompatible schema has been registered, the setting should be set back to a more cautious value.

Summary

The combination of Kafka, Avro and Schema Registry is a great way to store your events in the most compact way possible, while still retaining the ability to evolve the corresponding schemas.

However, some of the limitations that the Schema Registry imposes make less sense in a development environment. On some occasions, making incompatible changes in a simple way is necessary and recommendable.

The Schema Registry API allows changing the compatibility setting to accept schemas that, otherwise, would be rejected.