
Poison Pills in Kafka (I)

What is a poison pill?

A “poison pill” is a record that always fails when consumed, no matter how many times it is attempted. Poison pills come in different forms:

  • Corrupted records.
  • Records that make your consumer deserializer fail (e.g., an Avro record whose writer schema is not compatible with the consumer reader schema).

The problem with a poison pill is that, unless the consumer eventually handles it, it blocks consumption of the topic/partition that contains it and halts the consumer's progress.

What can we do with poison pills?

There are many different strategies to deal with poison pills. The Kafka Streams FAQ describes some of them:

  • Log an error and shut down the application
  • Log an error and skip the poisonous record
  • Send the record to a separate topic (e.g., Dead Letter Queue) for further inspection (likely by a human operator)
  • Transform the poisonous record into a sentinel value that identifies a record that couldn’t be interpreted, downstream

Kafka Streams to the rescue

A year ago Kafka Streams added options to handle corrupt data during deserialization. This PR set the foundations for dealing with poison pills in Kafka Streams:

  • Added the new interface DeserializationExceptionHandler.
  • Introduced two default implementations: LogAndContinueExceptionHandler and LogAndFailExceptionHandler.
  • Added a new default.deserialization.exception.handler configuration value for StreamsConfig. Its default value is LogAndFailExceptionHandler.
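
As a minimal sketch, the handler can be selected through the configuration key named above. The example uses plain java.util.Properties so that it runs without the Kafka libraries on the classpath; the application id and bootstrap server are placeholder values, and in a real application these properties would be passed on to the Streams application.

```java
import java.util.Properties;

class PoisonPillConfig {

    // Builds Streams properties that skip records failing deserialization.
    static Properties skipCorruptRecords() {
        Properties props = new Properties();
        // Placeholder values, not taken from the original post.
        props.put("application.id", "poison-pill-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // Replace the default (log and fail) with the log-and-continue handler.
        props.put("default.deserialization.exception.handler",
                  "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return props;
    }

    public static void main(String[] args) {
        skipCorruptRecords().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```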

How does it work?

The key in this implementation is RecordDeserializer. This class executes whatever exception handling we have configured for our Streams application.

[Diagram: Kafka Streams poison pills]

In the diagram, the classes shaded in blue are exclusive to Kafka Streams, while the class in red is part of the common Kafka code base.

If the strategy is to continue, RecordDeserializer returns null, which bubbles up as a null ConsumerRecord to the layer above, RecordQueue. RecordQueue discards the record and continues with the next one.

At runtime, LogAndContinue logs a series of warnings and consumption continues. LogAndFail, on the other hand, throws a StreamsException.
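
The flow described above can be modelled in a few lines of plain Java. This is a simplified illustration, not the Kafka Streams source: the Response enum, the Handler interface and the deserializeOrSkip and consume methods are hypothetical stand-ins for DeserializationExceptionHandler, RecordDeserializer and RecordQueue, and the "deserializer" here just parses an integer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PoisonPillFlow {

    // Hypothetical stand-in for the handler's two possible decisions.
    enum Response { CONTINUE, FAIL }

    // Hypothetical stand-in for DeserializationExceptionHandler.
    interface Handler {
        Response handle(String rawRecord, Exception error);
    }

    // Plays the role of RecordDeserializer: returns null when the handler
    // decides to continue, throws when the handler decides to fail.
    static Integer deserializeOrSkip(String rawRecord, Handler handler) {
        try {
            return Integer.valueOf(rawRecord);
        } catch (NumberFormatException e) {
            if (handler.handle(rawRecord, e) == Response.FAIL) {
                throw new IllegalStateException("Deserialization failed: " + rawRecord, e);
            }
            return null;
        }
    }

    // Plays the role of RecordQueue: null records are dropped.
    static List<Integer> consume(List<String> rawRecords, Handler handler) {
        List<Integer> processed = new ArrayList<>();
        for (String raw : rawRecords) {
            Integer value = deserializeOrSkip(raw, handler);
            if (value != null) {
                processed.add(value);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        Handler logAndContinue = (raw, e) -> {
            System.err.println("Skipping record: " + raw);
            return Response.CONTINUE;
        };
        System.out.println(consume(Arrays.asList("1", "oops", "3"), logAndContinue));
    }
}
```

With the continue strategy the malformed record is logged and dropped while the records around it are still processed; swapping in a handler that returns FAIL stops consumption at the first bad record.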

There is also a set of metrics that can be monitored to make sure we are not skipping too many records.

Metric Name | Description | JMX path
skipped-records-rate | The average number of skipped records per second. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
skipped-records-total | The total number of skipped records. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
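
These metrics can also be read programmatically. The sketch below uses only the JDK's javax.management API and assumes it runs inside the same JVM as the Streams application; the class name and the wildcard on client-id are choices made for this example. Outside a Streams application the query simply matches nothing.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

class SkippedRecordsProbe {

    // Object name pattern matching the stream metrics of every client id.
    static final String PATTERN = "kafka.streams:type=stream-metrics,client-id=*";

    // Looks up all registered MBeans whose name matches the pattern.
    static Set<ObjectName> streamMetricBeans() {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            return server.queryNames(new ObjectName(PATTERN), null);
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException("Invalid pattern: " + PATTERN, e);
        }
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        for (ObjectName bean : streamMetricBeans()) {
            // Attribute names are the metric names from the table.
            Object total = server.getAttribute(bean, "skipped-records-total");
            Object rate = server.getAttribute(bean, "skipped-records-rate");
            System.out.println(bean + " total=" + total + " rate=" + rate);
        }
    }
}
```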

What about Kafka Consumer?

If your application is not using Kafka Streams, the plain KafkaConsumer offers no built-in support for handling poison pills.

However, not all is lost. In the next installment of this series, we will see how to implement a very similar strategy by leveraging the Deserializer interface that KafkaConsumer uses to deserialize records before delivering them to your application.

Timeouts in Kafka clients and Kafka Streams

IMPORTANT: This information is based on Kafka and Kafka Streams 1.0.0. Past or future versions may differ.

As with any distributed system, Kafka relies on timeouts to detect failures. These timeouts can be set by clients and brokers that need to detect each other's unavailability. The following describes the configuration values that control the timeouts that brokers and clients use to detect that a client is no longer available.

The original design for the poll() method in the Java consumer tried to kill two birds with one stone:

  • Guarantee consumer liveness
  • Guarantee progress as well, since a consumer could be alive but not moving forward

However, this design caused a few problems. The solution was to introduce separate configuration values and a heartbeat mechanism that runs on a background thread.

heartbeat.interval.ms

Since Kafka 0.10.1.0, the heartbeat is sent from a separate background thread, not from the thread where poll() runs. The description for the configuration value is:

The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

The default value is 3 seconds. This heartbeat will guarantee an early detection when the consumer goes down, maybe due to an unexpected exception killing the process. However, back pressure or slow processing will not affect this heartbeat.
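
The rule of thumb in the description (lower than session.timeout.ms, and typically no higher than a third of it) is easy to check before starting a consumer. The helper below is a hypothetical sketch, not part of the Kafka API; the values in main are the 3-second heartbeat default mentioned above and a 10-second session timeout.

```java
class HeartbeatCheck {

    // True when the heartbeat interval is strictly below the session timeout.
    static boolean isValid(long heartbeatMs, long sessionTimeoutMs) {
        return heartbeatMs > 0 && heartbeatMs < sessionTimeoutMs;
    }

    // True when the interval also follows the "no higher than 1/3" guidance.
    static boolean isRecommended(long heartbeatMs, long sessionTimeoutMs) {
        return isValid(heartbeatMs, sessionTimeoutMs)
                && heartbeatMs * 3 <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        long heartbeatMs = 3_000;        // heartbeat.interval.ms
        long sessionTimeoutMs = 10_000;  // session.timeout.ms
        System.out.println("valid=" + isValid(heartbeatMs, sessionTimeoutMs)
                + " recommended=" + isRecommended(heartbeatMs, sessionTimeoutMs));
    }
}
```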

max.poll.interval.ms

Also introduced in Kafka 0.10.1.0, this value compensates for the background heartbeating by placing a limit on the time between poll() calls. The description for the configuration value is:

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

The default value is 5 minutes (300000 ms), except for Kafka Streams, which increases it to Integer.MAX_VALUE.

When the timeout expires, the consumer will stop heartbeating and will leave the consumer group explicitly. With this new configuration value, we can set an upper limit on how long we expect a batch of records to take to process. Together with max.poll.records and appropriate timeouts for third-party calls, we should be able to determine fairly accurately how long an application may stay unresponsive while processing records.
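
As a back-of-the-envelope sketch of that reasoning: if every record triggers at most one third-party call with a known timeout, the worst-case time between two poll() calls is roughly the batch size multiplied by that timeout. The class and the figures below are illustrative assumptions: a batch size of 500 records, a 2-second call timeout invented for the example, and a 5-minute max.poll.interval.ms.

```java
class PollBudget {

    // Worst case: every record in the batch waits for the full call timeout.
    static long worstCaseBatchMs(int maxPollRecords, long perRecordTimeoutMs) {
        return maxPollRecords * perRecordTimeoutMs;
    }

    // The batch fits when its worst case stays below max.poll.interval.ms.
    static boolean fits(int maxPollRecords, long perRecordTimeoutMs, long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords, perRecordTimeoutMs) < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        int maxPollRecords = 500;          // batch size per poll (assumed)
        long callTimeoutMs = 2_000;        // assumed third-party call timeout
        long maxPollIntervalMs = 300_000;  // max.poll.interval.ms (assumed: 5 minutes)
        System.out.println("worst case ms: " + worstCaseBatchMs(maxPollRecords, callTimeoutMs));
        System.out.println("fits: " + fits(maxPollRecords, callTimeoutMs, maxPollIntervalMs));
    }
}
```

With these numbers the worst case is 1,000,000 ms, well above the interval, so either the batch size or the call timeout would have to come down, or the interval would have to go up.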

max.poll.interval.ms also plays a role in rebalances. Since it represents how long processing a batch can take, it is also implicitly the timeout for how long a client should be waited for in the event of a rebalance. Therefore, the client sends this value when it joins the consumer group. In the event of a rebalance, the broker will wait up to this timeout for a client to respond before kicking it out of the consumer group.

session.timeout.ms

Finally, while the previous values are used to get the client willingly out of the consumer group, this value controls when the broker can push it out itself. The description for this configuration value is:

The timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance.

The default is 10 seconds. Clients have to choose a value within the range delimited by group.min.session.timeout.ms and group.max.session.timeout.ms, both of which are configured on the broker side.

What does this all mean?

In a nutshell, it means that you have to configure two types of timeouts: a heartbeat timeout and a processing timeout. The former accounts for clients going down and the latter for clients taking too long to make progress.

Heartbeating will be controlled by the expected heartbeat.interval.ms and the upper limit defined by session.timeout.ms.

Processing will be controlled by max.poll.interval.ms. On the client side, it kicks the client out of the consumer group when the timeout expires. On the server side, it tells the broker what the expected rebalance timeout is.
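
Put together, a consumer configuration that sets both kinds of timeout explicitly could look like the sketch below. It uses plain java.util.Properties with the configuration keys discussed in this post; the group id and the concrete values are assumptions picked for illustration, and in a real application the properties would be handed to a KafkaConsumer.

```java
import java.util.Properties;

class TimeoutConfig {

    static Properties consumerTimeouts() {
        Properties props = new Properties();
        props.put("group.id", "timeouts-demo");  // placeholder group id
        // Heartbeat timeout: detects clients that went down.
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "10000");
        // Processing timeout: detects clients that stopped making progress.
        props.put("max.poll.interval.ms", "120000");
        props.put("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        consumerTimeouts().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```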

The Integer.MAX_VALUE Kafka Streams default

The max.poll.interval.ms default for Kafka Streams was changed to Integer.MAX_VALUE in Kafka 0.10.2.1 to strengthen its robustness in the scenario of large state restores. Fortunately, after changes to the library in 0.11 and 1.0, this large value is not necessary anymore.

In any case, it is still recommended to use a generous timeout in case of calls to external third parties from a stream topology. For a node that goes down, session.timeout.ms will quickly be triggered since the background heartbeat will stop.

For a node that is simply taking too long to process records, the assumption is that any other instance picking up those records would suffer the same delays with the third party.

Conclusion

Separating max.poll.interval.ms and session.timeout.ms allows tighter control over applications going down, through a shorter session.timeout.ms, while still giving them room for longer processing times through an extended max.poll.interval.ms.

This is especially useful for Kafka Streams applications, where we can hook complicated, long-running processing into every record.
