If your application is not using Kafka Streams, the basic KafkaConsumer offers no built-in support for handling poison pills.
However, not all is lost. In the next installment in this series, we will see how we can implement a very similar strategy leveraging the Deserializer interface that KafkaConsumer uses to deserialize records before delivering them to your application.
IMPORTANT: This information is based on Kafka and Kafka Streams 1.0.0. Past or future versions may differ.
As with any distributed system, Kafka relies on timeouts to detect failures. Those timeouts can be set by clients and brokers that want to detect each other's unavailability. The following is a description of the configuration values that control the timeouts that both brokers and clients use to detect clients not being available.
The original design for the poll() method in the Java consumer tried to kill two birds with one stone:
Guarantee consumer liveness
Guarantee progress as well, since a consumer could be alive but not moving forward
Since Kafka 0.10.1.0, heartbeats are sent from a separate background thread, not from the thread where poll() runs. The description for the heartbeat.interval.ms configuration value is:
The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
The default value is 3 seconds. This heartbeat will guarantee an early detection when the consumer goes down, maybe due to an unexpected exception killing the process. However, back pressure or slow processing will not affect this heartbeat.
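As a rough illustration of the rule quoted above, the following sketch sets both values as plain string properties and checks the one-third guideline. The class and method names are hypothetical and not part of Kafka; only the two property keys come from the consumer configuration. The numbers used are the 3 second and 10 second defaults mentioned in this article.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: builds consumer properties and
// checks the "no higher than 1/3 of session.timeout.ms" guideline.
final class HeartbeatSettings {

    // Rejects combinations where the heartbeat interval is not lower than the session timeout.
    static Properties build(int heartbeatIntervalMs, int sessionTimeoutMs) {
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            throw new IllegalArgumentException(
                "heartbeat.interval.ms must be lower than session.timeout.ms");
        }
        Properties props = new Properties();
        props.setProperty("heartbeat.interval.ms", Integer.toString(heartbeatIntervalMs));
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }

    // True when the heartbeat interval is at most one third of the session timeout.
    static boolean followsOneThirdGuideline(int heartbeatIntervalMs, int sessionTimeoutMs) {
        return (long) heartbeatIntervalMs * 3 <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        Properties props = build(3_000, 10_000);
        System.out.println(props.getProperty("heartbeat.interval.ms")); // 3000
        System.out.println(followsOneThirdGuideline(3_000, 10_000));    // true
    }
}
```

The resulting Properties object could be merged into whatever configuration is passed to the consumer. The check is only a guideline, so a real application may prefer to log a warning rather than fail.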
The max.poll.interval.ms setting, also introduced with Kafka 0.10.1.0, compensates for the background heartbeating by introducing a limit on the time between poll() calls. The description for the configuration value is:
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
The default value is 5 minutes (300000 ms), except for Kafka Streams, which increases it to Integer.MAX_VALUE.
When the timeout expires, the consumer will stop heartbeating and will leave the consumer group explicitly. With this new configuration value, we can set an upper limit on how long we expect a batch of records to take to process. Together with max.poll.records and appropriate timeouts for third-party calls, we should be able to determine fairly accurately how long an application may stay unresponsive while processing records.
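The arithmetic behind that estimate can be sketched as follows. This is a minimal illustration, assuming records are processed one at a time and that each record takes at most a known worst-case time (for example, a third-party call timeout). The class name, method names and all numbers are hypothetical.

```java
// Hypothetical helper: estimates whether one poll() batch fits in max.poll.interval.ms.
final class PollBudget {

    // Worst case: every record in a full batch takes the slowest expected processing time.
    static long worstCaseBatchMillis(int maxPollRecords, long maxMillisPerRecord) {
        return Math.multiplyExact((long) maxPollRecords, maxMillisPerRecord);
    }

    // True when a full batch can be processed before the poll interval expires.
    static boolean fitsWithin(int maxPollRecords, long maxMillisPerRecord, long maxPollIntervalMs) {
        return worstCaseBatchMillis(maxPollRecords, maxMillisPerRecord) <= maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Assumed numbers: 500 records per batch, 200 ms worst case per record,
        // and a max.poll.interval.ms of 300000 ms.
        System.out.println(worstCaseBatchMillis(500, 200));   // 100000
        System.out.println(fitsWithin(500, 200, 300_000));    // true
        System.out.println(fitsWithin(500, 1_000, 300_000));  // false, 500000 > 300000
    }
}
```

When the check fails, the options are to lower max.poll.records, tighten the per-record timeout, or raise max.poll.interval.ms.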
Also, max.poll.interval.ms plays a role in rebalances. Since we know it represents how long processing a batch can take, it is also implicitly the timeout for how long a client should be awaited in the event of a rebalance. Therefore, the client sends this value when it joins the consumer group. In the event of a rebalance, the broker will wait up to this timeout for a client to respond before kicking it out of the consumer group.
Finally, while the previous values are used to get the client willingly out of the consumer group, session.timeout.ms controls when the broker can push it out itself. The description for this configuration value is:
The timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance.
The default is 10 seconds. Clients have to choose a value within the range defined by group.min.session.timeout.ms and group.max.session.timeout.ms, which are configured on the broker side.
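The range constraint can be checked before the consumer starts. The sketch below is only an illustration: the class and method are hypothetical, and the broker bounds passed in main (6000 ms and 300000 ms) are assumed example values that should be verified against the actual broker configuration.

```java
// Hypothetical pre-flight check for session.timeout.ms against the broker-side range.
final class SessionTimeoutRange {

    // True when the client value lies within [groupMinMs, groupMaxMs], both inclusive.
    static boolean isAccepted(int sessionTimeoutMs, int groupMinMs, int groupMaxMs) {
        return sessionTimeoutMs >= groupMinMs && sessionTimeoutMs <= groupMaxMs;
    }

    public static void main(String[] args) {
        // Assumed broker settings for group.min.session.timeout.ms and
        // group.max.session.timeout.ms; check your own brokers.
        int groupMinMs = 6_000;
        int groupMaxMs = 300_000;

        System.out.println(isAccepted(10_000, groupMinMs, groupMaxMs)); // true
        System.out.println(isAccepted(2_000, groupMinMs, groupMaxMs));  // false, below the minimum
    }
}
```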
What does this all mean?
In a nutshell, it means that you have to configure two types of timeouts: a heartbeat timeout and a processing timeout. The former accounts for clients going down and the latter for clients taking too long to make progress.
Heartbeating is controlled by the expected interval, heartbeat.interval.ms, and the upper limit defined by session.timeout.ms.
Processing is controlled by max.poll.interval.ms. On the client side, it kicks the client out of the consumer group when the timeout expires. On the server side, it tells the broker what rebalance timeout to expect.
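One way to picture the two kinds of timeout is a small factory that takes a failure-detection budget and a processing budget and maps them onto the three settings. This is a sketch under stated assumptions: the class and method names are hypothetical, and deriving the heartbeat interval as one third of the session timeout is just the guideline from the configuration description, not a requirement.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical factory mapping the two conceptual timeouts onto consumer settings.
final class ConsumerTimeouts {

    static Properties of(Duration failureDetection, Duration processing) {
        long sessionMs = failureDetection.toMillis();
        long heartbeatMs = sessionMs / 3; // guideline: at most 1/3 of the session timeout

        Properties props = new Properties();
        // Heartbeat timeout: how quickly a dead client is detected.
        props.setProperty("session.timeout.ms", Long.toString(sessionMs));
        props.setProperty("heartbeat.interval.ms", Long.toString(heartbeatMs));
        // Processing timeout: how long a batch may take between poll() calls.
        props.setProperty("max.poll.interval.ms", Long.toString(processing.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        Properties props = of(Duration.ofSeconds(10), Duration.ofMinutes(2));
        System.out.println(props.getProperty("session.timeout.ms"));    // 10000
        System.out.println(props.getProperty("heartbeat.interval.ms")); // 3333
        System.out.println(props.getProperty("max.poll.interval.ms"));  // 120000
    }
}
```

Keeping the two budgets as separate parameters mirrors the separation in the configuration itself: detection of dead clients can be tuned without touching the allowance for slow processing.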
The Integer.MAX_VALUE Kafka Streams default
The max.poll.interval.ms default for Kafka Streams was changed to Integer.MAX_VALUE in Kafka 0.10.2.1 to strengthen its robustness in the scenario of large state restores. Fortunately, after changes to the library in 0.11 and 1.0, this large value is no longer necessary.
In any case, it is still recommended to use a generous timeout in case of calls to external third parties from a stream topology. For a node that goes down, session.timeout.ms will quickly be triggered since the background heartbeat will stop.
For a node that is simply taking too long to process records, the assumption is that any other instance picking up those records would suffer the same delays with the third party.
Separating max.poll.interval.ms and session.timeout.ms allows tighter control over applications going down, with a shorter session.timeout.ms, while still giving them room for longer processing times with an extended max.poll.interval.ms.
This is especially useful for Kafka Streams applications, where we can hook complicated, long-running processing to every record.
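For a Kafka Streams application, that combination might look like the sketch below. It uses plain string keys so it has no Kafka dependency, and it assumes the "consumer." prefix that Kafka Streams accepts for settings meant for its embedded consumers. The application id, broker address and timeout values are hypothetical placeholders.

```java
import java.util.Properties;

// Hypothetical Streams configuration: short session timeout, generous processing timeout.
final class StreamsTimeouts {

    static Properties build() {
        Properties props = new Properties();
        props.setProperty("application.id", "example-app");       // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        // Detect a crashed instance quickly through the background heartbeat.
        props.setProperty("consumer.session.timeout.ms", "10000");
        // Leave room for slow per-record work, such as third-party calls.
        props.setProperty("consumer.max.poll.interval.ms", "600000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println(props.getProperty("consumer.session.timeout.ms"));   // 10000
        System.out.println(props.getProperty("consumer.max.poll.interval.ms")); // 600000
    }
}
```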