Kafka 2.7.0
Book Details
Full Title: Kafka 2.7.0
Author: Various contributors to Kafka
ISBN/URL: N.A.
Reading Period: 2021.01.06–Ongoing
Source: One fine day I decided to read open-sourced code for fun.
General Review
- The source code around `Producer.java` covers a wide range of topics, including metrics management, user-configuration management, connection management, etc.
- Reading the code gives a good sense of the balance between clean code and building a successful product.
Specific Takeaways
KafkaProducer
- `Collections.singletonMap` can be used in place of `HashMap` when we know there will only be one key-value pair.
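A quick sketch of the idiom (the key and value here are made up for illustration):

```java
import java.util.Collections;
import java.util.Map;

public class SingletonMapExample {
    // Build a one-entry, immutable map without allocating a full HashMap.
    public static Map<String, String> clientTag() {
        return Collections.singletonMap("client.id", "producer-1");
    }

    public static void main(String[] args) {
        Map<String, String> tag = clientTag();
        System.out.println(tag.get("client.id")); // prints producer-1
    }
}
```

Note that the returned map is immutable: calling `put()` on it throws `UnsupportedOperationException`.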
- The `KafkaProducer` uses a JMX MBean to expose app info.
  - JMX stands for Java Management Extensions, and is a way to allow remote management of live Java applications (including monitoring).
  - For more information on JMX and MBeans, refer to this Baeldung article: https://www.baeldung.com/java-management-extensions
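A minimal sketch of the mechanism using only the standard library; the `AppInfo` bean, domain name, and version string below are made up for illustration and are not Kafka's actual registration code:

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxExample {
    // By JMX convention, the management interface name must be the
    // implementing class's name plus the "MBean" suffix.
    public interface AppInfoMBean {
        String getVersion();
    }

    public static class AppInfo implements AppInfoMBean {
        @Override
        public String getVersion() {
            return "2.7.0";
        }
    }

    // Register the MBean, then read an attribute back through the server,
    // the same way a remote JMX client (e.g., JConsole) would.
    public static Object registerAndRead() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("com.example:type=app-info");
        if (!server.isRegistered(name)) {
            server.registerMBean(new AppInfo(), name);
        }
        return server.getAttribute(name, "Version");
    }

    public static void main(String[] args) throws Exception {
        System.out.println(registerAndRead()); // prints 2.7.0
    }
}
```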
- Some of the underlying classes used are as follows:
  - `org.apache.kafka.common.network.Selector`: for doing non-blocking, multi-connection network I/O.
    - For context, `select` is a system call in Linux that provides a way to do non-blocking I/O. In gist, we pass `select` one or more file descriptors, and `select` will let us know which files are ready for what kind of I/O operations.
    - `Selector` in Java provides analogous functionality to Linux's `select` system call. Some of the differences are:
      - Instead of selecting over file descriptors, `Selector` selects over `SocketChannel`s.
    - From the code, it seems like the way to use `Selector` is as follows:
      - Instantiate a `SocketChannel`.
      - Configure the `SocketChannel` (e.g., setting it to non-blocking, setting send and receive buffer sizes).
      - Connect the `SocketChannel` to a particular address.
      - Register the `SocketChannel` to a `Selector`, specifying the kind of operations to check for (e.g., `OP_CONNECT` for when the connection is established). This will return a `SelectionKey`.
      - An attachment is then added to the `SelectionKey` for some reason.
    - For more, see the Javadoc on `java.nio.channels.Selector`: https://docs.oracle.com/javase/8/docs/api/java/nio/channels/Selector.html
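The steps above can be sketched with plain `java.nio` (no Kafka classes involved; the local throwaway server and the `"node-1"` attachment value are made up so the example is self-contained):

```java
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class SelectorExample {
    public static boolean connectNonBlocking() throws Exception {
        // A local server to connect to, bound to an ephemeral port.
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));

            try (Selector selector = Selector.open();
                 SocketChannel channel = SocketChannel.open()) {
                // 1. + 2. Instantiate and configure the channel.
                channel.configureBlocking(false);
                // 3. Connect; in non-blocking mode this usually returns
                //    before the connection is actually established.
                boolean connected = channel.connect(server.getLocalAddress());
                if (!connected) {
                    // 4. Register with the selector, asking to be told when the
                    //    connection completes; the attachment carries context
                    //    that is handed back with the SelectionKey later.
                    SelectionKey key =
                        channel.register(selector, SelectionKey.OP_CONNECT, "node-1");
                    selector.select(5000); // block until some key is ready
                    if (key.isConnectable()) {
                        connected = channel.finishConnect(); // complete handshake
                    }
                }
                return connected;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(connectNonBlocking()); // prints true
    }
}
```

In a real client, one selector multiplexes many such channels, and the attachment is how per-connection state is recovered when a key fires.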
- `IdleExpiryManager` is used to close idle connections.
- `NetworkClient` is the class responsible for asynchronous network I/O. It is not thread-safe.
  - It is instantiated, put inside a `Sender`, and run in a `KafkaThread` (which is a super thin wrapper around `Thread`).
- Notes on usage of Java standard objects:
  - The `compareAndSet()` methods on the atomic objects in the `java.util.concurrent.atomic` package can be used to set the first available value using `.compareAndSet(null, targetValue)`.
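A small sketch of the idiom (the `clientId` name is made up): the first thread to call `compareAndSet(null, …)` wins, and later calls leave the value untouched.

```java
import java.util.concurrent.atomic.AtomicReference;

public class CompareAndSetExample {
    // Set the value only if nothing has been set yet; the first caller wins.
    public static String initOnce(AtomicReference<String> ref, String candidate) {
        ref.compareAndSet(null, candidate);
        return ref.get();
    }

    public static void main(String[] args) {
        AtomicReference<String> clientId = new AtomicReference<>();
        System.out.println(initOnce(clientId, "producer-1")); // prints producer-1
        System.out.println(initOnce(clientId, "producer-2")); // still producer-1
    }
}
```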
  - `LinkedHashMap` can be used in place of `HashMap` when a consistent iteration order is desired (based on insertion order). There is also a constructor to create a `LinkedHashMap` from another `Map` implementation, and it will have the same iteration order as the original `Map`. This may be useful when we are writing a library function that accepts a `Map` and wishes to return a corresponding `Map` with the same order.
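A sketch of that library-function pattern (the `normalize` name and the topic keys are made up):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LinkedHashMapExample {
    // Return a copy whose iteration order matches the caller's map:
    // the copy constructor walks the source map in its own iteration order.
    public static Map<String, Integer> normalize(Map<String, Integer> input) {
        return new LinkedHashMap<>(input);
    }

    public static void main(String[] args) {
        Map<String, Integer> offsets = new LinkedHashMap<>();
        offsets.put("topic-a", 10);
        offsets.put("topic-b", 20);
        System.out.println(normalize(offsets).keySet()); // prints [topic-a, topic-b]
    }
}
```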
KafkaConsumer
From Javadoc:
- Remember that open TCP connections are a network resource that must be released to avoid leakage: "The consumer maintains TCP connections to the necessary brokers to fetch data. Failure to close the consumer after use will leak these connections."
- Is this true? "Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the subscribe APIs"
- Kafka consumers each have two offsets:
  - *position* gives the offset of the next record that will be given out
  - *committed position* is the last offset that has been stored securely
- The liveness of a Kafka consumer is checked in two ways:
  - Regular heartbeats from the consumer
  - Regular `poll()` calls from the consumer (this is to avoid livelocks where heartbeats are sent, but the consumer is not otherwise processing the records). The `max.poll.interval.ms` and `max.poll.records` configuration options are relevant in controlling the behavior of the poll loop.
- Kafka consumer supports manual partition assignment.
- For use cases where message processing time varies unpredictably, the recommended way is to move message processing to another thread, which allows the consumer to continue calling `poll()` concurrently. Typically, we must also disable automatic commits and manually commit processed offsets after the thread is done handling them.
- Kafka consumer supports saving the offset on a system external to Kafka (e.g., a relational database, to ensure that the processing of records is truly exactly-once). The steps to do this are generally as follows:
  - Configure `enable.auto.commit=false`.
  - Use the offset provided with each `ConsumerRecord` to save our position.
  - On restart, restore the position of the consumer using `seek(TopicPartition, long)`.
- Kafka consumer supports dynamic consumption flow control, i.e., it is possible to pause consumption on a particular topic-partition and resume it later. The relevant methods are `pause(Collection)` and `resume(Collection)`, where the parameter is a `Collection<TopicPartition>`.
- The `wakeup()` method can be used to shut down a Kafka consumer from another thread (see Javadoc for example).
- Kafka consumer supports two general threading models:
  - One Consumer Per Thread
    - Pros:
      - Easy to implement
      - Usually the fastest because no inter-thread co-ordination is needed
      - Makes in-order processing on a per-partition basis very easy to implement
    - Cons:
      - More consumers mean more TCP connections to the cluster (one per thread). But Kafka in general handles connections very efficiently, so this is generally a small cost.
      - Multiple consumers mean more requests being sent to the server and slightly less batching of data, which can cause some drop in I/O throughput.
      - The total number of threads across all processes will be limited by the total number of partitions.
  - Decouple Consumption and Processing (e.g., having one or more consumer threads that do all data consumption and hand off `ConsumerRecords` instances to a blocking queue consumed by a pool of processor threads)
    - Pros:
      - Allows independently scaling the number of consumers and processors.
    - Cons:
      - Guaranteeing order across the processors requires particular care. This would not be an issue if there is no ordering requirement.
      - Manually committing the position becomes harder, as it requires that all threads co-ordinate to ensure that processing is complete for that partition.
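A minimal sketch of the second model, with plain strings standing in for `ConsumerRecords` and a made-up poison-pill value to stop the workers (a real implementation would enqueue whatever `poll()` returns and handle commits separately):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DecoupledProcessingExample {
    public static List<String> run(List<String> records) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = Collections.synchronizedList(new ArrayList<>());

        // Processor pool: drains the queue; "POISON" tells a worker to stop.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            pool.submit(() -> {
                try {
                    String record;
                    while (!(record = queue.take()).equals("POISON")) {
                        processed.add(record.toUpperCase()); // simulate processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // "Consumer" side: in real code this loop would call poll() and
        // enqueue the returned records; here we enqueue a fixed list.
        for (String record : records) {
            queue.put(record);
        }
        queue.put("POISON"); // one pill per worker
        queue.put("POISON");

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return processed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(List.of("a", "b", "c")).size()); // prints 3
    }
}
```

Note how ordering is already lost here: with two workers, the `processed` list order is nondeterministic, which is exactly the con listed above.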
Constructor
- The `group.instance.id` configuration parameter can be used to enable static membership. This may be useful in a cloud setup (e.g., using Kubernetes), so each instance is associated with a different id, and the cloud automation tool responsible for health monitoring can restart an instance with the same id if necessary. The Kafka coordinator will recognize the instance id, and thus assign the same topic-partitions.
- Refer to the `AbstractConfig` and `ConsumerConfig` classes for examples of how to manage configurations that are both numerous and inter-related.
- Refer to `ConfigTransformer::transform(Map<String, String> configs)` for an example (not necessarily a good one) of handling nested maps in Java.
- Refer to `Metrics` and its hierarchy of classes for an example of how metrics collection might be achieved in Java (spoilers: rather clunky). In gist, one usage pattern is as follows:
  - We first create a `Metrics` instance by calling `new Metrics()` (i.e., no arguments); this `Metrics` instance represents some metric we are interested in.
  - Then we call `metrics.sensor(<sensor-name>)` to obtain a sensor, which is used to take readings by calling `sensor.record(<reading>)`.
  - But prior to calling `sensor.record()`, we need to "prime" the `sensor` instance using `MetricName` and `MeasurableStat` instances.
  - `MetricName` should be obvious. But `MeasurableStat` is an object with a `record()` method that knows how to handle the actual recording of the metrics (e.g., recording the max, the average, the rate, etc.) based on the `MetricConfig` instance in the `Sensor` instance (this `MetricConfig` is constructed together with the `Sensor` instance when we call `metrics.sensor(<sensor-name>)`, and can contain configurations like the `RecordingLevel`, the time window, etc.). `Sensor.record()` ultimately delegates to this `record()` on `MeasurableStat`.
- For an example of how to instantiate classes using either a `String` or `Class<?>` type, see `AbstractConfig.getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs)`.
  - This might be useful because it is common in Java for configuration to specify a particular class, and for that class to be provided (somehow) on the classpath.
- For an example of how the Java Authentication and Authorization Service (JAAS) is used to support the Simple Authentication and Security Layer (SASL), refer to `ChannelBuilder.create()`, under the switch cases for `SASL_SSL` and `SASL_PLAINTEXT`.
- For an example of how state machines can be implemented, refer to the `SubscriptionState.FetchState#transitionTo` interface method, which calls the `validTransitions()` method, which in turn returns a list of valid transitions. This is coupled with the concrete implementation of the `SubscriptionState.FetchState` interface by the enum `FetchStates`.
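A stripped-down sketch of that pattern; the states and transitions below are invented for illustration, not Kafka's actual `FetchStates`:

```java
import java.util.EnumSet;
import java.util.Set;

public class FetchStateExample {
    public enum State {
        INITIALIZING, FETCHING, PAUSED;

        // Each state enumerates the states it may legally move to.
        public Set<State> validTransitions() {
            switch (this) {
                case INITIALIZING: return EnumSet.of(FETCHING);
                case FETCHING:     return EnumSet.of(PAUSED);
                case PAUSED:       return EnumSet.of(FETCHING);
                default:           return EnumSet.noneOf(State.class);
            }
        }

        // Stay in the current state if the requested transition is invalid.
        public State transitionTo(State next) {
            return validTransitions().contains(next) ? next : this;
        }
    }

    public static void main(String[] args) {
        State s = State.INITIALIZING;
        s = s.transitionTo(State.FETCHING);
        System.out.println(s); // prints FETCHING
        s = s.transitionTo(State.INITIALIZING); // invalid, ignored
        System.out.println(s); // prints FETCHING
    }
}
```

Keeping the transition table on the enum itself means adding a state forces you to say, in one place, where it fits in the machine.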
- Refer to `Cluster(String, boolean, Collection<Node>, Collection<PartitionInfo>, Set<String>, Set<String>, Set<String>, Node)` for an example of how to index a collection of objects by various keys. In this case, `PartitionInfo` is indexed by topic (i.e., `String`), by `TopicPartition`, and by node ID (i.e., `Integer`).
  - Various `Map`s of `List`s are created in overlapping loops, and ultimately converted into maps of `Collections.unmodifiableList()`.
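A small sketch of the indexing idea, with a made-up `Partition` class standing in for `PartitionInfo` and only the topic index shown:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiIndexExample {
    // Stand-in for PartitionInfo: a partition belongs to a topic and a node.
    public static final class Partition {
        public final String topic;
        public final int nodeId;
        public Partition(String topic, int nodeId) {
            this.topic = topic;
            this.nodeId = nodeId;
        }
    }

    // Build the topic index in one pass, then freeze every bucket,
    // mirroring the Map-of-List + unmodifiableList conversion in Cluster.
    public static Map<String, List<Partition>> byTopic(List<Partition> partitions) {
        Map<String, List<Partition>> index = new HashMap<>();
        for (Partition p : partitions) {
            index.computeIfAbsent(p.topic, k -> new ArrayList<>()).add(p);
        }
        index.replaceAll((topic, list) -> Collections.unmodifiableList(list));
        return index;
    }

    public static void main(String[] args) {
        List<Partition> parts = List.of(
            new Partition("a", 0), new Partition("a", 1), new Partition("b", 0));
        System.out.println(byTopic(parts).get("a").size()); // prints 2
    }
}
```

The node-ID and `TopicPartition` indexes would be built the same way in the same loop, which is why the loops in `Cluster` overlap.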
Java language notes
- The `Map.computeIfAbsent()` method can be used to achieve a similar effect as Python's `defaultdict()` and `setdefault()`.
- For enums with limited possible values, consider using `byte` for the enum's value. See for example the `IsolationLevel` enum in Kafka.
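A sketch of the `computeIfAbsent` idiom doing what Python's `defaultdict(list)` would (the grouping task is made up):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentExample {
    // Group words by first letter; computeIfAbsent creates the bucket
    // on first access, like defaultdict(list) in Python.
    public static Map<Character, List<String>> groupByFirstLetter(List<String> words) {
        Map<Character, List<String>> groups = new HashMap<>();
        for (String word : words) {
            groups.computeIfAbsent(word.charAt(0), k -> new ArrayList<>()).add(word);
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<Character, List<String>> groups =
            groupByFirstLetter(List.of("apple", "avocado", "banana"));
        System.out.println(groups.get('a')); // prints [apple, avocado]
    }
}
```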
Dev Notes
- `METRICS_CONTEXT_PREFIX` in `CommonClientConfigs` is not documented.
To Internalize Now
- The `Map.computeIfAbsent()` method in Java can be used in situations where I would have used `setdefault()` in Python.
To Learn/Do Soon
- Find other open-sourced repositories to read.
To Revisit When Necessary
Producer class
- Refer to the code surrounding `org.apache.kafka.common.network.Selector` on how to do asynchronous network I/O.
  - Refer to the method `org.apache.kafka.common.network.Selector::poll()` to see the intricate logic required to handle polling data from multiple `KafkaChannel`s (each with an underlying `SocketChannel`), each of which may have data buffered.
- Refer to the `runOnce()` method in `org.apache.kafka.clients.producer.internals.Sender` to see how transactions are handled.
  - In gist:
    - `Sender` has a `TransactionManager`, which in turn has a `PriorityQueue<TransactionManager.TxnRequestHandler>`.
    - In each call to `runOnce()` in `Sender`, the `maybeSendAndPollTransactionalRequest()` method on the `TransactionManager` is called.
    - The `maybeSendAndPollTransactionalRequest()` method retrieves the next `TxnRequestHandler`, and uses it to:
      - Find the `targetNode` to send the request to
      - Retrieve the `requestBuilder`
      - Call `newClientRequest()` on the network client, passing in the `targetNode`, the `requestBuilder`, and also the `TxnRequestHandler` itself (which also implements the interface required for it to act as the callback).
Other Resources Referred To
- N.A. (This is source code; it would be weird if source code made reference to general learning resources.)
Progress
- Paused at line 759 in `KafkaConsumer.java`, before reading about `ConsumerNetworkClient`.