Kafka 2.7.0

Book Details

  • Full Title: Kafka 2.7.0

  • Author: Various contributors to Kafka

  • ISBN/URL: N.A.

  • Reading Period: 2021.01.06–Ongoing

  • Source: One fine day I decided to read open-sourced code for fun.

General Review

  • The source code around Producer.java covers a wide range of topics, including metrics management, user-configuration management, and connection management.

    • Reading the code gives a good sense of the balance between writing clean code and building a successful product.

Specific Takeaways


  • Collections.singletonMap can be used in place of HashMap when we know there will only be one key-value pair.
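
A minimal sketch of the singletonMap takeaway, using only the JDK. The class name, method names, and the "client-id" key are made up for illustration; the point is only that Collections.singletonMap returns an immutable one-entry map.

```java
import java.util.Collections;
import java.util.Map;

final class SingletonMapSketch {
    /** Builds a one-entry tag map; the key "client-id" is only an illustrative choice. */
    static Map<String, String> clientIdTag(String clientId) {
        return Collections.singletonMap("client-id", clientId);
    }

    /** Returns true if the map rejects modification, as the result of singletonMap does. */
    static boolean isImmutable(Map<String, String> map) {
        try {
            map.put("another-key", "value");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

Besides being shorter than building a HashMap, the immutability signals to readers that the map is never meant to grow.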

  • The KafkaProducer uses a JMX MBean to expose app info.

  • Some of the underlying classes used are as follows:

    • org.apache.kafka.common.network.Selector: For doing non-blocking multi-connection network I/O.

      • For context, select is a system call in Linux that provides a way to do non-blocking I/O—in gist, we pass select one or more file descriptors, and select will let us know which files are ready for what kind of I/O operations.

      • Selector in Java provides analogous functionality to Linux's select system call. Some of the differences are:

        • Instead of selecting over file descriptors, Selector selects over SocketChannel instances (more generally, any SelectableChannel).

        • From the code, it seems like the way to use Selector is as follows:

          1. Instantiate a SocketChannel

          2. Configure the SocketChannel (e.g., setting it to non-blocking, setting send and receive buffer sizes)

          3. Connect the SocketChannel to a particular address

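          4. Register the SocketChannel to a Selector, specifying the kind of operations to check for (e.g., OP_CONNECT for when the connection is established). This will return a SelectionKey.

          5. An attachment is then added to the SelectionKey. In Kafka's Selector this is the KafkaChannel wrapping the connection, so that it can be retrieved from the key once the key is selected.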

      • For more details, see the Javadoc on java.nio.channels.Selector: https://docs.oracle.com/javase/8/docs/api/java/nio/channels/Selector.html
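
The five steps listed for Selector can be sketched with plain java.nio, without any Kafka classes. This is an assumption-laden toy, not Kafka's code: the throwaway loopback server, the 64 KiB buffer sizes, the 5-second deadline, and the "connection-1" attachment are all invented for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class SelectorSketch {
    /** Connects to a throwaway loopback server; returns the key's attachment once connected, else null. */
    static String connectAndGetAttachment() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {              // step 1
            server.bind(new InetSocketAddress("127.0.0.1", 0));            // port 0 = any free port

            channel.configureBlocking(false);                              // step 2
            channel.socket().setSendBufferSize(64 * 1024);
            channel.socket().setReceiveBufferSize(64 * 1024);

            // Step 3: a local connection may complete immediately, in which case this is true.
            boolean connected = channel.connect(server.getLocalAddress());

            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT); // step 4
            key.attach("connection-1");                                    // step 5

            long deadline = System.currentTimeMillis() + 5000;
            while (!connected && System.currentTimeMillis() < deadline) {
                selector.select(100);                                      // wait up to 100 ms
                for (SelectionKey ready : selector.selectedKeys()) {
                    if (ready.isConnectable()) {
                        connected = ((SocketChannel) ready.channel()).finishConnect();
                    }
                }
                selector.selectedKeys().clear();                           // keys are not removed automatically
            }
            return connected ? (String) key.attachment() : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The attachment is what lets the polling loop go from a ready SelectionKey back to per-connection state without a separate lookup table.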

    • IdleExpiryManager is used to close idle connections.

    • NetworkClient is the class responsible for asynchronous network I/O. It is not thread-safe.

      • It is instantiated, put inside a Sender, and run in a KafkaThread (which is a super thin wrapper around Thread).

  • Notes on usage of Java standard objects:

    • The compareAndSet() methods on the atomic objects in the java.util.concurrent.atomic package can be used to keep only the first value set, using .compareAndSet(null, targetValue): the call succeeds only while the current value is still null.

    • LinkedHashMap can be used in place of HashMap when a consistent iteration order (insertion order) is desired. There is also a constructor that creates a LinkedHashMap from another Map implementation; the copy has the same iteration order as the original Map. This may be useful when writing a library function that accepts a Map and wishes to return a corresponding Map with the same order.
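
Both notes on standard objects can be illustrated with a short JDK-only sketch. The class and method names here are hypothetical; only AtomicReference.compareAndSet and the LinkedHashMap(Map) copy constructor are the real APIs being shown.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

final class StandardObjectsSketch {
    private final AtomicReference<String> firstError = new AtomicReference<>();

    /** Records the error only if none has been recorded yet; returns whether this call won. */
    boolean recordFirstError(String error) {
        return firstError.compareAndSet(null, error);
    }

    String firstError() {
        return firstError.get();
    }

    /** Returns a copy with upper-cased values that keeps the iteration order of the input map. */
    static Map<String, String> upperCaseValues(Map<String, String> input) {
        // The copy constructor inserts entries in the input's iteration order.
        Map<String, String> result = new LinkedHashMap<>(input);
        result.replaceAll((key, value) -> value.toUpperCase(Locale.ROOT));
        return result;
    }
}
```

With compareAndSet(null, ...), later callers lose the race silently, which suits cases like remembering only the first failure seen by several threads.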


From Javadoc:

  • Remember that open TCP connections are a network resource that must be released to avoid leakage:

    The consumer maintains TCP connections to the necessary brokers to fetch data. Failure to close the consumer after use will leak these connections.

  • Is this true?

    Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the subscribe APIs

  • For each partition, a Kafka consumer tracks two offsets:

    1. position gives the offset of the next record that will be given out

    2. committed position is the last offset that has been stored securely

  • The liveness of a Kafka consumer is checked in two ways:

    1. Regular heartbeat from the consumer

    2. Regular poll() call from the consumer (this is to avoid livelocks where heartbeats are sent, but the consumer is not otherwise processing the records). The max.poll.interval.ms and max.poll.records configuration options are relevant in controlling the behavior of the poll loop.

  • Kafka consumer supports manual partition assignments

    • For use cases where message processing time varies unpredictably, the recommended way is to move message processing to another thread, which allows the consumer to continue calling poll() concurrently. Typically, we must also disable automatic commits and manually commit processed offsets after the thread is done handling them.

  • Kafka consumer supports saving the offset on a system external to Kafka (e.g., a relational database, to ensure that the processing of each record is truly exactly-once). The steps to do this are generally as follows:

    1. Configure enable.auto.commit=false

    2. Use the offset provided with each ConsumerRecord to save our position

    3. On restart, restore the position of the consumer using seek(TopicPartition, long).
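
The idea behind the three steps can be sketched without a broker. Everything below is a stand-in: Store plays the external database, the List<String> plays a single partition's log, and the "processed:" prefix plays the real processing. The sketch assumes the result and the offset are saved in one atomic step, which is what makes the restart safe.

```java
import java.util.ArrayList;
import java.util.List;

final class ExternalOffsetSketch {
    /** Stand-in for an external store that commits a result and an offset together. */
    static final class Store {
        private final List<String> results = new ArrayList<>();
        private long nextOffset = 0L;

        /** Saves the result and the position to resume from in one atomic step. */
        synchronized void saveAtomically(String result, long offsetOfRecord) {
            results.add(result);
            nextOffset = offsetOfRecord + 1; // resume from the record after this one
        }

        synchronized long nextOffset() {
            return nextOffset;
        }

        synchronized List<String> results() {
            return new ArrayList<>(results);
        }
    }

    /** Processes up to maxRecords entries of the log, starting where the store says we left off. */
    static void runOnce(List<String> log, Store store, int maxRecords) {
        long start = store.nextOffset(); // plays the role of seek(TopicPartition, long) on restart
        for (long offset = start; offset < log.size() && offset < start + maxRecords; offset++) {
            String record = log.get((int) offset);
            store.saveAtomically("processed:" + record, offset);
        }
    }
}
```

Calling runOnce twice simulates a stop and restart: the second call resumes at the stored offset, so no record is processed twice or skipped.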

  • Kafka consumer supports dynamic consumption flow control, i.e., it is possible to pause consumption on particular topic-partitions and resume it later. The relevant methods are pause(Collection) and resume(Collection), where the parameter is a Collection<TopicPartition>.

  • The wakeup() method can be used to shut down a Kafka consumer from another thread (see Javadoc for example).

  • Kafka consumer supports two general threading models:

    1. One Consumer Per Thread

      • Pros:

        • Easy to implement

        • Usually the fastest because no inter-thread co-ordination is needed

        • Makes in-order processing on a per-partition basis very easy to implement

      • Cons:

        • More consumers means more TCP connections to the cluster (one per thread). But Kafka in general handles connections very efficiently, so this is generally a small cost.

        • Multiple consumers means more requests being sent to the server and slightly less batching of data, which can cause some drop in I/O throughput.

        • The total number of threads across all processes will be limited by the total number of partitions.

    2. Decouple Consumption and Processing (e.g., having one or more consumer threads that do all data consumption and hand off ConsumerRecords instances to a blocking queue consumed by a pool of processor threads)

      • Pros:

        • Allows independently scaling the number of consumers and processors.

      • Cons:

        • Guaranteeing order across the processors requires particular care. This would not be an issue if there is no ordering requirement.

        • Manually committing the position becomes harder, as it requires that all threads co-ordinate to ensure that processing is complete for that partition.
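
The second threading model can be sketched with a BlockingQueue and a thread pool from the JDK. This is a toy under stated assumptions: plain strings stand in for ConsumerRecords, the calling thread stands in for the consumer thread, and the poison-pill marker and queue capacity of 16 are invented for the example.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class DecoupledProcessingSketch {
    private static final String POISON_PILL = "__stop__"; // hypothetical end-of-input marker

    /** One "consumer" feeds a bounded queue drained by a pool of processors; returns the processed count. */
    static int consumeAndProcess(List<String> records, int processorCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(16);
        Queue<String> processed = new ConcurrentLinkedQueue<>();
        ExecutorService processors = Executors.newFixedThreadPool(processorCount);
        try {
            for (int i = 0; i < processorCount; i++) {
                processors.execute(() -> {
                    try {
                        for (String r = queue.take(); !r.equals(POISON_PILL); r = queue.take()) {
                            processed.add("done:" + r); // placeholder for real processing
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The calling thread plays the consumer: it only hands records off.
            for (String record : records) {
                queue.put(record);
            }
            for (int i = 0; i < processorCount; i++) {
                queue.put(POISON_PILL); // one marker per processor so that each one stops
            }
            processors.shutdown();
            if (!processors.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("processors did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            processors.shutdownNow();
        }
        return processed.size();
    }
}
```

Note that the order in which records finish is not deterministic here, which is exactly the ordering concern listed under the cons of this model.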


  • The group.instance.id configuration parameter can be used to enable static membership. This may be useful in a cloud setup (e.g., using Kubernetes), so each instance is associated with a different id, and the cloud automation tool responsible for health monitoring can restart an instance with the same id if necessary. The Kafka coordinator will recognize the instance id, and thus assign the same topic-partitions.
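
As a rough sketch, the configuration options mentioned in these notes could be collected into a java.util.Properties object like this. The config keys are Kafka's; every value (the broker address, the group name, the numbers) is a placeholder assumption, and the helper class itself is hypothetical.

```java
import java.util.Properties;

final class ConsumerPropsSketch {
    /** Builds consumer settings for static membership; every value is an illustrative placeholder. */
    static Properties staticMemberProps(String instanceId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "example-group");            // assumed group name
        props.setProperty("group.instance.id", instanceId);        // enables static membership
        props.setProperty("enable.auto.commit", "false");          // offsets are committed manually
        props.setProperty("max.poll.interval.ms", "300000");       // max allowed gap between poll() calls
        props.setProperty("max.poll.records", "100");              // cap on records returned per poll()
        return props;
    }
}
```

In a Kubernetes-style setup, instanceId would typically come from something stable across restarts, such as the pod name.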

  • Refer to AbstractConfig and ConsumerConfig classes for examples of how to manage configurations that are both numerous and inter-related.

  • Refer to ConfigTransformer::transform(Map<String, String> configs) for an example (not necessarily good) of handling nested maps in Java.

  • Refer to Metrics and its hierarchy of classes for an example of how metrics collection might be achieved in Java (spoilers: rather clunky). In gist, one usage pattern is as follows:

    1. We first create a Metrics instance by calling new Metrics() (i.e., no arguments). The Metrics instance is the registry for the sensors and metrics we are interested in.

    2. Then we call metric.sensor(<sensor-name>) to obtain a sensor, which is used to take readings by calling sensor.record(<reading>).

    3. But prior to calling sensor.record(), we need to "prime" the sensor instance using MetricName and MeasurableStat instances.

    4. MetricName should be obvious. MeasurableStat, on the other hand, is an object with a record() method that knows how to handle the actual recording of the metric (e.g., the max, the average, or the rate), based on the MetricConfig instance in the Sensor instance. This MetricConfig is constructed together with the Sensor instance when we call metric.sensor(<sensor-name>), and can contain configuration such as the RecordingLevel and the time window. Sensor.record() ultimately delegates to this record() on MeasurableStat.
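
The sensor-and-stat pattern described in the steps above can be mimicked with a few toy classes. These are NOT Kafka's classes: ToyStat, Max, Avg, and ToySensor are hypothetical stand-ins that ignore MetricConfig, time windows, and recording levels, and keep only the "prime a sensor with stats, then record readings" shape.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ToyMetricsSketch {
    /** Toy counterpart of a stat: it decides how each reading is folded into a value. */
    interface ToyStat {
        void record(double value);

        double measure();
    }

    static final class Max implements ToyStat {
        private double max = Double.NEGATIVE_INFINITY;

        public void record(double value) { max = Math.max(max, value); }

        public double measure() { return max; }
    }

    static final class Avg implements ToyStat {
        private double sum;
        private long count;

        public void record(double value) { sum += value; count++; }

        public double measure() { return count == 0 ? 0.0 : sum / count; }
    }

    /** Toy counterpart of a sensor: it fans each reading out to the stats it was primed with. */
    static final class ToySensor {
        private final Map<String, ToyStat> stats = new LinkedHashMap<>();

        void add(String metricName, ToyStat stat) { stats.put(metricName, stat); }

        void record(double value) { stats.values().forEach(stat -> stat.record(value)); }

        double value(String metricName) { return stats.get(metricName).measure(); }
    }
}
```

A single record() call updates every stat attached to the sensor, which is why the sensor has to be primed before the first reading is taken.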

  • For an example of how to instantiate classes using either String or Class<?> type, see AbstractConfig.getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs).

    • This might be useful because it is common in Java for configuration to specify a particular class, and for that class to be provided (somehow) on the classpath.
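
A minimal sketch of instantiating a class given either a String or a Class<?>, using plain reflection. It is not Kafka's implementation: the helper name is hypothetical, it assumes the target class has a public no-argument constructor, and it skips the follow-up configuration step.

```java
final class ConfiguredInstanceSketch {
    /** Creates an instance of expectedType from either a class name (String) or a Class object. */
    static <T> T newInstance(Object klass, Class<T> expectedType) {
        try {
            Class<?> resolved;
            if (klass instanceof String) {
                resolved = Class.forName(((String) klass).trim());
            } else if (klass instanceof Class<?>) {
                resolved = (Class<?>) klass;
            } else {
                throw new IllegalArgumentException("Expected a String or Class but got " + klass);
            }
            Object instance = resolved.getDeclaredConstructor().newInstance();
            return expectedType.cast(instance); // throws ClassCastException on a type mismatch
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Could not instantiate " + klass, e);
        }
    }
}
```

Accepting Object lets the same code path serve values parsed from a properties file (strings) and values set programmatically (Class objects).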

  • For an example of how Java Authentication and Authorization Service (JAAS) is used to support Simple Authentication and Security Layer (SASL), refer to ChannelBuilders.create(), under the switch cases for SASL_SSL and SASL_PLAINTEXT.

  • For an example of how state machines can be implemented, refer to the SubscriptionState.FetchState#transitionTo interface method, which calls the validTransitions() method, which in turn returns a list of valid transitions. This is coupled with the concrete implementation of the SubscriptionState.FetchState interface by the enum FetchStates.
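
The same interface-plus-enum shape can be sketched on a made-up state machine. The State interface and the ConnectionState enum below are hypothetical, not Kafka's types, and the choice to stay in the current state on an invalid transition is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

final class StateMachineSketch {
    interface State {
        Collection<State> validTransitions();

        /** Moves to newState if allowed; otherwise stays in the current state. */
        default State transitionTo(State newState) {
            return validTransitions().contains(newState) ? newState : this;
        }
    }

    enum ConnectionState implements State {
        DISCONNECTED {
            @Override
            public Collection<State> validTransitions() {
                return Collections.singletonList(CONNECTING);
            }
        },
        CONNECTING {
            @Override
            public Collection<State> validTransitions() {
                return Arrays.asList(CONNECTED, DISCONNECTED);
            }
        },
        CONNECTED {
            @Override
            public Collection<State> validTransitions() {
                return Collections.singletonList(DISCONNECTED);
            }
        }
    }
}
```

Keeping the transition table next to each enum constant makes the allowed moves easy to audit in one place.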

  • Refer to Cluster(String, boolean, Collection<Node>, Collection<PartitionInfo>, Set<String>, Set<String>, Set<String>, Node) for an example of how to index a collection of objects by various keys. In this case, PartitionInfo is indexed by topic (i.e., String), TopicPartition, and by node ID (i.e., Integer).

    • Various maps of lists are built up in overlapping loops, and ultimately converted into maps whose values are wrapped with Collections.unmodifiableList().
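
A simplified sketch of indexing one collection by several keys. Part is a hypothetical stand-in for PartitionInfo with only three fields, and just two of the indexes (by topic and by leader node id) are built; the real constructor does considerably more.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MultiIndexSketch {
    /** Simplified stand-in for PartitionInfo: a topic, a partition number, and a leader node id. */
    static final class Part {
        final String topic;
        final int partition;
        final int leaderId;

        Part(String topic, int partition, int leaderId) {
            this.topic = topic;
            this.partition = partition;
            this.leaderId = leaderId;
        }
    }

    final Map<String, List<Part>> byTopic;
    final Map<Integer, List<Part>> byLeader;

    MultiIndexSketch(List<Part> parts) {
        Map<String, List<Part>> topicIndex = new HashMap<>();
        Map<Integer, List<Part>> leaderIndex = new HashMap<>();
        for (Part p : parts) { // a single pass fills every index
            topicIndex.computeIfAbsent(p.topic, k -> new ArrayList<>()).add(p);
            leaderIndex.computeIfAbsent(p.leaderId, k -> new ArrayList<>()).add(p);
        }
        this.byTopic = freeze(topicIndex);
        this.byLeader = freeze(leaderIndex);
    }

    /** Wraps every list, and the map itself, in unmodifiable views. */
    private static <K> Map<K, List<Part>> freeze(Map<K, List<Part>> index) {
        Map<K, List<Part>> frozen = new HashMap<>();
        index.forEach((key, list) -> frozen.put(key, Collections.unmodifiableList(list)));
        return Collections.unmodifiableMap(frozen);
    }
}
```

Building mutable lists first and freezing them at the end keeps the construction code simple while still handing callers read-only views.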

Java language notes

  • The Map.computeIfAbsent() method can be used to achieve a similar effect to Python's defaultdict() and setdefault().

  • For enums with limited possible values, consider using byte for the enum's value. See for example IsolationLevel enum in Kafka.
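
A sketch of an enum carrying a byte id, loosely in the spirit of the note above. The Level enum, its constants, and the forId helper are hypothetical names chosen for illustration rather than a copy of Kafka's code.

```java
final class ByteEnumSketch {
    enum Level {
        LOW((byte) 0),
        HIGH((byte) 1);

        private final byte id;

        Level(byte id) {
            this.id = id;
        }

        /** The compact value that would be written to the wire or to disk. */
        byte id() {
            return id;
        }

        /** Looks up the constant for an id; unknown ids are rejected rather than guessed. */
        static Level forId(byte id) {
            for (Level level : values()) {
                if (level.id == id) {
                    return level;
                }
            }
            throw new IllegalArgumentException("Unknown level id: " + id);
        }
    }
}
```

An explicit id also decouples the serialized value from ordinal(), so reordering the constants later does not change what is written.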

Dev Notes

  • METRICS_CONTEXT_PREFIX in CommonClientConfigs is not documented.

To Internalize Now

  • The Map.computeIfAbsent() method in Java can be used in situations where I would have used setdefault() in Python.
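
For reference, a small sketch of the computeIfAbsent idiom, roughly what d.setdefault(key, []).append(word) does in Python. The grouping task and the class name are made up, and the method assumes every word is non-empty.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ComputeIfAbsentSketch {
    /** Groups non-empty words by their first letter. */
    static Map<Character, List<String>> groupByFirstLetter(List<String> words) {
        Map<Character, List<String>> groups = new HashMap<>();
        for (String word : words) {
            // Creates the list on first sight of a key, then returns it either way.
            groups.computeIfAbsent(word.charAt(0), key -> new ArrayList<>()).add(word);
        }
        return groups;
    }
}
```

Unlike Python's setdefault, the default value is only constructed when the key is actually missing, because it is supplied as a function.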

To Learn/Do Soon

  • Find other open-sourced repositories to read.

To Revisit When Necessary

Producer class

  • Refer to the code surrounding org.apache.kafka.common.network.Selector on how to do asynchronous network I/O.

    • Refer to the method org.apache.kafka.common.network.Selector::poll() to see the intricate logic required to handle polling data from multiple KafkaChannel (with underlying SocketChannel), each of which may have data buffered.

  • Refer to the runOnce() method in org.apache.kafka.clients.producer.internals.Sender to see how transactions are handled.

    • In gist:

      1. Sender has a TransactionManager, which in turn has a PriorityQueue<TransactionManager.TxnRequestHandler>

      2. In each call to runOnce() in Sender, the maybeSendAndPollTransactionalRequest() on the TransactionManager is called

      3. The maybeSendAndPollTransactionalRequest() method retrieves the next TxnRequestHandler, and uses it to:

        1. Find the targetNode to send the request

        2. Retrieve the requestBuilder

        3. Call newClientRequest() on the network client, passing in the targetNode, the requestBuilder, and also the TxnRequestHandler itself (which also implements the interface required for it to act as the callback).
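
The flow above can be imitated with a toy priority queue of handlers. None of this is Kafka's code: ToyHandler, its integer priority, the placeholder target-node lookup, and the immediate callback are all assumptions made to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class TxnQueueSketch {
    /** Toy handler: carries what is needed to build a request and doubles as its own callback. */
    static final class ToyHandler {
        final String name;
        final int priority; // lower value = handled first (an assumption of this sketch)
        boolean completed;

        ToyHandler(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }

        String targetNode() {
            return "node-for-" + name; // placeholder for a real coordinator lookup
        }

        void onComplete() {
            completed = true;
        }
    }

    private final PriorityQueue<ToyHandler> pending =
        new PriorityQueue<>(Comparator.comparingInt((ToyHandler h) -> h.priority));
    final List<String> sent = new ArrayList<>();

    void enqueue(ToyHandler handler) {
        pending.add(handler);
    }

    /** One loop iteration: take the most urgent handler, "send" its request, fire its callback. */
    boolean runOnce() {
        ToyHandler next = pending.poll();
        if (next == null) {
            return false; // nothing transactional to do in this iteration
        }
        sent.add(next.name + "@" + next.targetNode());
        next.onComplete(); // a real client would invoke this when the response arrives
        return true;
    }
}
```

Passing the handler along as the callback means the object that built the request is also the one that interprets the response, so no separate correlation table is needed.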

Other Resources Referred To

  • N.A. (This is source code; it would be odd for source code to refer to general learning resources.)


  • <2021-02-21 Sun>

    • Paused at line 759 in KafkaConsumer.java, before reading about ConsumerNetworkClient.