Kafka Streams in Action - Real-time apps and microservices with the Kafka Streams API
Book Details
Full Title: Kafka Streams in Action - Real-time apps and microservices with the Kafka Streams API
Author: William P. Bejeck Jr.
ISBN/URL: 9781617294471
Reading Period: 2021.01.01–2021.01.06
Source: Googling for books on Kafka
General Review
-
A rather good book:
-
Covers the topic in sufficient detail
-
Provides practical, semi-realistic examples with code snippets that are appropriately condensed (irrelevant parts are omitted but available in the accompanying repository)
-
Where related concepts are required but are not directly part of Kafka Streams, the author provides concise yet accurate explanations of such topics, and also provides links for further exploration.
-
-
Chapter 2 provides a helpful overview of vanilla Kafka (i.e., the Producer and Consumer APIs, not Streams), going into some detail about common configuration properties and why we would want to change the defaults.
Specific Takeaways
Chapter 1 Welcome to Kafka Streams
-
Certain concepts in Kafka have their roots in big data processing with the traditional MapReduce approach:
-
How to distribute data across a cluster to achieve scale in processing
-
The use of key/value pairs and partitions to group distributed data together
-
Instead of avoiding failure, embracing failure by using replication
-
-
Some good use cases for stream processing include:
-
Credit card fraud
-
Intrusion detection
-
A large race, such as the New York City Marathon
-
The financial industry
-
-
Some situations where stream processing may not be a good fit (i.e., when the focus is on analyzing data over time, rather than just the most current data):
-
Economic forecasting
-
School curriculum changes
-
-
In Kafka Streams, we define a topology of processing nodes.
-
Records flow through the graph in a depth-first manner—that is, each record is processed in full by the entire topology before another record is forwarded through the topology.
-
As such, there is no need to have backpressure within Kafka Streams.
-
-
Steps in building a Kafka Streams application:
-
Define a source. A source might be a single topic, multiple topics in a comma-separated list, or a regex that can match one or more topics.
-
Add processor(s), such as:
-
A mapper to update an object (i.e., takes in object A and outputs object B)
-
An output to another Kafka topic for other applications
-
An output to another Kafka topic for persisting into a data store
-
-
Chapter 2 Kafka Quickly
-
How logs work in Kafka
-
The log.dir setting specifies where Kafka stores log data.
-
Each topic maps to a subdirectory under the specified log directory.
-
There will be as many subdirectories as there are topic partitions, with names of the format topic-name_partition-number.
-
Inside each directory is the log file where incoming messages are appended. Once the log file reaches a certain size (either a number of records or a size on disk), or when a configured time difference between message timestamps is reached, the log file is "rolled", and Kafka appends incoming messages to a new log file.
-
-
-
Partitioning in Kafka allows for (a) greater throughput and (b) spreading a topic's messages across multiple machines.
-
One reason for wanting to write a custom partitioner is when we have composite keys.
-
For example, we have purchase data flowing into Kafka, and the keys contain two values: a customer ID and a transaction date. But we need to group values by customer ID, so taking a hash of the full key (customer ID plus purchase date) won't work.
-
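A sketch of such a custom partitioner, hashing only the customer-ID portion of a composite key (the "customerId:purchaseDate" key format and the class name are hypothetical assumptions, not from the book):

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;
    import java.util.Map;

    // Hypothetical composite key of the form "customerId:purchaseDate";
    // partition on the customer ID portion only so all purchases for a
    // customer land on the same partition.
    public class CustomerIdPartitioner implements Partitioner {

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            String customerId = ((String) key).split(":")[0];
            int numPartitions = cluster.partitionsForTopic(topic).size();
            return Utils.toPositive(Utils.murmur2(customerId.getBytes())) % numPartitions;
        }

        @Override
        public void configure(Map<String, ?> configs) { }

        @Override
        public void close() { }
    }

The producer would then be pointed at this class via the partitioner.class property.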
-
Determining the correct number of partitions:
-
More partitions means higher throughput, which is often necessary when we have more data.
-
However, more partitions also means more TCP connections and open file handles.
-
Another thing that affects throughput is how long processing takes within the consumer.
-
-
There are two ways to deal with old log data: deletion and compaction.
-
Deletion
-
Log deletion involves two steps: first the logs are rolled into segments, and then the oldest segments are deleted.
-
Kafka can roll log files into segments when the timestamp of the latest incoming message is greater than the timestamp of the oldest message by a certain duration, configurable by log.roll.ms or log.roll.hours.
-
Kafka removes old segments based on the timestamps of the messages in the segment, governed by the following configurations: log.retention.ms, log.retention.minutes, and log.retention.hours.
-
Other relevant configurations to consider include log.retention.bytes and log.segment.bytes.
-
Deletion of logs works well for non-keyed records, or records that stand alone. But if we have keyed data and expect updates, we might want to consider using log compaction.
-
-
Compaction
-
Problem solved by log compaction: An application is restarted and needs to rebuild the information for each key, but some of the less frequently updated keys exist only in deleted log segments, and that data is lost.
-
Log compaction deletes old records per key in a log, as opposed to deleting entire segments based on time or size.
-
At a high level, log compaction is achieved by having a pool of log-cleaner threads running in the background that recopy log segments while removing records whose keys have a newer record later in the log.
-
By default, the log cleaner is enabled. To use compaction for a topic, we'll need to set log.cleanup.policy=compact.
-
Compaction is used in Kafka Streams when using state stores, but we don't need to create those logs/topics ourselves because it's handled by the framework.
-
-
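A minimal sketch of creating a compacted topic with the AdminClient, tying together the cleanup and retention settings above (topic name, partition count, and setting values are hypothetical):

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class CreateCompactedTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumes a local broker

            Map<String, String> topicConfig = new HashMap<>();
            topicConfig.put("cleanup.policy", "compact");   // compact instead of delete
            topicConfig.put("segment.ms", "3600000");       // roll segments hourly (hypothetical value)

            NewTopic customerState =
                new NewTopic("customer-state", 3, (short) 1).configs(topicConfig);

            try (AdminClient admin = AdminClient.create(props)) {
                admin.createTopics(Collections.singleton(customerState)).all().get();
            }
        }
    }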
-
Kafka producers are thread-safe. All sends to Kafka are asynchronous—Producer.send returns immediately once the producer places the record in an internal buffer. Depending on the configuration, there might be some blocking if the buffer is full when attempting to send a message.
-
Some of the common properties when starting a KafkaProducer are as follows (a producer sketch follows this list):
-
Bootstrap servers—bootstrap.servers is a comma-separated list of host:port values. This list is used for initially connecting to the cluster.
-
Serialization—key.serializer and value.serializer instruct Kafka how to convert the keys and values into byte arrays.
-
Acks—acks specifies the minimum number of acknowledgments from the broker that the producer will wait for before considering a record send completed. Valid values are all, 0, and 1.
-
Retries—If sending a batch results in a failure, retries specifies the number of times to attempt to resend. If record order is important, we should consider setting max.in.flight.requests.per.connection to 1 to prevent the scenario where a second batch is delivered successfully before a failed batch succeeds on retry.
-
Compression type
-
Partitioner class
-
-
When creating a ProducerRecord, we can manually specify the partition and/or timestamp.
-
One reason for manually specifying the partition is as follows: the incoming records are keyed, but the downstream consumer doesn't depend on which partition a given key lands on, and we want to ensure the distribution of records across partitions is roughly equal.
-
-
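A minimal producer sketch tying the properties above together (topic name, key, and value are hypothetical):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class SimpleProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // assumes a local broker
            props.put("key.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put("acks", "all");        // wait for full acknowledgment
            props.put("retries", 3);
            props.put("max.in.flight.requests.per.connection", 1); // preserve ordering on retry

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                ProducerRecord<String, String> record =
                    new ProducerRecord<>("purchases", "customer-123", "some-purchase-payload");
                // send is asynchronous; the callback fires once the broker acknowledges (or fails)
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                });
            }
        }
    }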
-
Kafka consumers are stateful, and manage state by periodically committing the offsets of messages consumed from Kafka.
-
Committing an offset has two implications for a consumer:
-
Committing implies that the consumer has fully processed the message.
-
Committing also represents the starting point for that consumer in the case of a failure or restart.
-
-
When there is a new consumer instance (either a completely new one or one created by a restart), where the consumer starts reading from depends on the configuration:
-
auto.offset.reset="earliest"
-
auto.offset.reset="latest"
-
auto.offset.reset="none"—No reset strategy; the broker throws an exception to the consumer.
-
-
Automatic offset commits are enabled by default, and they are represented by the enable.auto.commit property. The companion option auto.commit.interval.ms specifies how often the consumer will commit offsets (default is 5 seconds). Note that if the value is too small, it will increase network traffic; if it's too large, it could result in the consumer receiving large amounts of repeated data in the event of a failure or restart.
-
Manual offset commits:
-
There are two types of manual offset commits—synchronous and asynchronous.
-
Examples of synchronous commits:
    // Blocks until all offsets returned from the last retrieval succeed.
    // Applies to all subscribed topics and partitions.
    consumer.commitSync();

    // Commits only the offsets, partitions, and topics specified in the map
    // (signature: commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)).
    consumer.commitSync(offsetsToCommit);
-
-
A consumer is typically run in a loop, where it polls with a timeout specified in milliseconds.
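A minimal poll-loop sketch with synchronous commits (topic and group names are hypothetical):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Collections;
    import java.util.Properties;

    public class SimpleConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // assumes a local broker
            props.put("group.id", "purchase-readers");          // hypothetical group ID
            props.put("enable.auto.commit", "false");           // we commit manually below
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("purchases"));
                while (true) {
                    // poll blocks up to the given timeout (in milliseconds) waiting for records
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("key=%s value=%s%n", record.key(), record.value());
                    }
                    consumer.commitSync();  // synchronous commit after processing the batch
                }
            }
        }
    }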
-
When running multiple consumer instances, the total thread count across all consumer instances shouldn't exceed the total number of partitions in the topic. This is because any threads in excess of the total partition count will be idle.
-
Rebalancing is the process of adding and removing topic-partition assignments to consumers.
-
It is possible for a KafkaConsumer to manually subscribe to specific partitions of a topic:

    TopicPartition fooTopicPartition_0 = new TopicPartition("foo", 0);
    TopicPartition barTopicPartition_0 = new TopicPartition("bar", 0);
    consumer.assign(Arrays.asList(fooTopicPartition_0, barTopicPartition_0));
-
There are, however, various tradeoffs to consider (e.g., failures will not result in topic partitions being reassigned, even for consumers with the same group ID).
-
-
Chapter 3 Developing Kafka Streams
-
Kafka Streams provides two APIs: the higher-level Kafka Streams DSL and the lower-level Processor API.
-
A simple example of a Kafka Streams application using the Streams DSL is as follows:
    KStream<String, String> simpleFirstStream =
        builder.stream("src-topic", Consumed.with(stringSerde, stringSerde));
    KStream<String, String> upperCasedStream =
        simpleFirstStream.mapValues(String::toUpperCase);
    upperCasedStream.to("out-topic", Produced.with(stringSerde, stringSerde));

    // Equivalently, using a fluent approach:
    builder.stream("src-topic", Consumed.with(stringSerde, stringSerde))
           .mapValues(String::toUpperCase)
           .to("out-topic", Produced.with(stringSerde, stringSerde));
-
The general steps to creating a Kafka Streams application are as follows:
-
Create a StreamsConfig instance
-
Create a Serde object
-
Construct a processing topology, for example
-
mapping the keys and/or values to different objects (with KStream.mapValues and/or KStream.map),
-
filtering and/or branching (with KStream.filter and KStream.branch),
-
deriving completely new keys from the values (i.e., rekeying with KStream.selectKey)
-
-
Start the Kafka Streams program
-
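A small sketch of the filtering and rekeying operators listed above (topic names, the Purchase value type, and its serde are hypothetical):

    // Assumes a KStream<String, Purchase> purchaseStream built from a source topic,
    // where Purchase exposes getPrice() and getCustomerId() (hypothetical type).
    KStream<String, Purchase> filtered =
        purchaseStream.filter((key, purchase) -> purchase.getPrice() > 5.00);

    // Rekey by customer ID so downstream stateful operations group records correctly
    KStream<String, Purchase> byCustomer =
        filtered.selectKey((key, purchase) -> purchase.getCustomerId());

    byCustomer.to("customer-purchases", Produced.with(stringSerde, purchaseSerde));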
-
The KStream.print and KStream.peek methods are useful during development for printing the records flowing through the topology to STDOUT or to a file.
Chapter 4 Streams and State
-
A related method to KStream.mapValues() is KStream.transformValues(); the two operate in the same way, save that the latter is able to use (i.e., maintain and update) local state.
-
When retrieving values from local state using a particular identifier (e.g., customer ID) on the record, it is crucial that the records are partitioned by that same identifier, to ensure all records relating to the same identifier are sent to the same process (or StreamTask, to be more accurate).
-
KStream.through() can be used to transparently repartition records by a different key (see the sketch after this list).
-
Under the hood, the method actually writes the records out to a different topic using the new key and returns a KStream that reads from the new topic.
-
Implement StreamPartitioner and pass it to KStream.through() to customize which partition each record is sent to. Note: if we have already modified the key in an earlier step, it may not be necessary to implement a custom StreamPartitioner, and we can just use the DefaultPartitioner.
-
Note: Repartitioning comes at the cost of data duplication and additional overhead. As such, try to use mapValues(), transformValues(), or flatMapValues() whenever possible, because map(), transform(), and flatMap() can trigger automatic repartitioning. It's best to use repartitioning logic sparingly.
-
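A sketch of the rekey-and-repartition pattern described in the list above (the topic name, the Purchase value type, and serdes are hypothetical):

    // purchaseStream is a KStream<String, Purchase> keyed by transaction ID, and
    // "customer-transactions" is a pre-created intermediate topic (hypothetical).
    KStream<String, Purchase> byCustomerId =
        purchaseStream.selectKey((key, purchase) -> purchase.getCustomerId())
                      .through("customer-transactions",
                               Produced.with(stringSerde, purchaseSerde));
    // byCustomerId now reads from the intermediate topic, partitioned by customer ID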
-
State stores are customized using either the Materialized or the StoreBuilder class. The former is typically used with the high-level Kafka Streams DSL, whereas the latter is typically used with the lower-level Processor API.
-
Some of the available state stores include:
-
Stores.persistentKeyValueStore
-
Stores.lruMap
-
Stores.persistentWindowStore
-
Stores.persistentSessionStore
-
-
By default, all the StateStoreSupplier instances have logging enabled—that is, a Kafka topic is used as a changelog to back up the values in the store and provide fault tolerance.
-
By default, changelog topics are compacted; however, the cleanup policy can be configured to be delete, compact, or both.
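A sketch of configuring a persistent key/value store and its changelog topic via StoreBuilder (the store name, serdes, and setting values are hypothetical):

    // Change-logging configuration for the backing changelog topic
    Map<String, String> changelogConfig = new HashMap<>();
    changelogConfig.put("cleanup.policy", "compact,delete");
    changelogConfig.put("retention.ms", "172800000");   // keep deleted segments for 2 days (hypothetical)

    StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("rewards-store"),
                                    Serdes.String(), Serdes.Long())
              .withLoggingEnabled(changelogConfig);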
-
In Kafka Streams, whenever we invoke a method that could result in generating a new key (selectKey, map, or transform), an internal Boolean flag is set to true, indicating that the new KStream instance requires repartitioning.
-
With this Boolean flag set, if we perform a join, reduce, or aggregation operation, the repartitioning is handled for us automatically.
-
-
Two streams can be joined in various ways (a sketch follows this list):
-
inner join—where a record from the joined stream is emitted only if the record for the particular key is present in both source streams within the time window specified.
-
outer join—where if either side of the join isn't present when the time window expires, the joined stream will emit a record built using only the available side.
-
left outer join—where a record is emitted from the joined stream if the key is present in the calling (primary) stream, but may or may not be present in the second stream.
-
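A sketch of the inner join from the list above, joining two streams on the same key within a 20-second window (stream names, value types, and serdes are hypothetical):

    KStream<String, String> joined = purchaseStream.join(
        coffeePurchaseStream,
        (purchase, coffee) -> purchase + "/" + coffee,   // ValueJoiner combines both sides
        JoinWindows.of(20 * 1000),                       // records must arrive within 20 seconds of each other
        Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));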
-
In an event-driven system, there are usually three different possible notions of timestamps:
-
Event time—when the event occurred, usually embedded within the event data
-
Ingest time—when the record is appended to the log
-
Processing time—when actual processing of the record starts
-
-
Kafka Streams provides a TimestampExtractor interface (with some concrete implementations) to allow choosing the correct type of timestamp as required by the application (a sketch follows the list below).
-
Timestamps affect the following functionality:
-
Joining streams (i.e., the window period)
-
Updating a changelog (i.e., the KTable API)
-
Deciding when the Processor.punctuate() method is triggered (i.e., the Processor API)
-
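A sketch of a custom TimestampExtractor, as referenced above, that pulls event time out of the record value (the Purchase type and its accessor are hypothetical):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class PurchaseTimestampExtractor implements TimestampExtractor {

        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            Object value = record.value();
            if (value instanceof Purchase) {
                // Event time embedded in the purchase itself (hypothetical accessor)
                return ((Purchase) value).getPurchaseDate().getTime();
            }
            // Fall back to the timestamp already set on the record
            return record.timestamp();
        }
    }

The extractor would then be registered via the default.timestamp.extractor setting (StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG).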
Chapter 5 The KTable API
-
Two important questions in relation to KTable are:
-
Where are the data stored?
-
Ans: Using an internal state store.
-
-
How often are records emitted from the KTable?
-
Ans: Depends on (a) rate of records entering the system, (b) how many distinct keys are in the data, and (c) the configuration parameters cache.max.bytes.buffering and commit.interval.ms.
-
A larger cache means records are emitted less frequently; when the cache reaches its limit, records are sent downstream.
-
The commit interval specifies how often the state of the processor should be saved; saving the state flushes the cache, and the KTable sends the latest updated, deduplicated records downstream.
-
-
-
KGroupedStream is an intermediate representation of the event stream after grouping by key (KStream.groupBy or KStream.groupByKey) and is not meant to be worked with directly. It is used to perform aggregation operations.
-
The analogous KGroupedTable (produced by KTable.groupBy) is the intermediate representation of the update stream regrouped by key.
-
-
Aggregation operations can be performed on a rolling basis or windowed basis.
-
In Kafka Streams, three types of windows are available (a sketch follows this list):
-
Session windows (activity-based, i.e., combined if the inactivity gap is small; SessionWindows.with(...).until(...))
-
Tumbling windows (time-based, non-overlapping; TimeWindows.of(...))
-
Sliding/hopping windows (time-based, overlapping; TimeWindows.of(...).advanceBy(...))
-
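A sketch of a windowed aggregation using the tumbling window described above, counting trades per ticker in one-minute windows (the stream name and serdes are hypothetical; the API shown is the 1.0-era DSL):

    KTable<Windowed<String>, Long> tradeCounts =
        stockTransactionStream
            .groupByKey(Serialized.with(Serdes.String(), transactionSerde))
            .windowedBy(TimeWindows.of(60 * 1000))   // one-minute tumbling windows
            .count();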
-
When performing a KStream-to-KTable join, there is no need to provide a JoinWindows because there is only one record per key in the KTable.
-
The join is unrelated to time; the record is either present in the KTable or not.
-
-
When using a KTable to maintain state for enriching a KStream, there will likely be repartitioning involved, due to the need to change the key to something appropriate for the join. Such repartitioning comes with overhead.
-
When the lookup data for enriching the KStream is small enough to fit on every node, the GlobalKTable may be used to avoid the repartitioning. GlobalKTable also allows non-key joins.
-
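A sketch of a KStream-GlobalKTable join that enriches purchases with customer details without a repartition (names and value types are hypothetical):

    // customerTable is a GlobalKTable<String, Customer> keyed by customer ID
    KStream<String, EnrichedPurchase> enriched = purchaseStream.join(
        customerTable,
        (purchaseKey, purchase) -> purchase.getCustomerId(),            // maps each stream record to the table key
        (purchase, customer) -> new EnrichedPurchase(purchase, customer));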
-
The available joins (as of Kafka Streams 1.0.0) are as follows:
| Left Join            | Inner Join           | Outer Join      |
| KStream-KStream      | KStream-KStream      | KStream-KStream |
| KStream-KTable       | KStream-KTable       | N/A             |
| KTable-KTable        | KTable-KTable        | KTable-KTable   |
| KStream-GlobalKTable | KStream-GlobalKTable | N/A             |
-
It is possible to make a state store directly queryable, providing benefits such as not needing to write to an external data store before being able to query the data.
Chapter 6 The Processor API
-
General steps for using the Processor API (a sketch follows this list):
-
Use Topology.addSource() to "subscribe" to a topic.
-
Add one or more processors using Topology.addProcessor(), passing in (among other things) the name of the parent source or processor, and also the ProcessorSupplier that supplies the actual processor for processing records.
-
The Processor instance supplied is usually a subclass of AbstractProcessor, and we override any, some, or all of the init(), process(), punctuate(), and close() methods as required.
-
context().forward(<key>, <value>, <downstream-node>) is called within a processor to forward the record to a downstream node.
-
Override the init() method if we require additional setup in the processor, like configuring a state store.
-
-
(Optional) Add one or more state stores using Topology.addStateStore().
-
Add one or more sinks using Topology.addSink().
-
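A sketch of these steps wired together (topic, node, and class names are hypothetical; default serdes are assumed to be configured):

    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.AbstractProcessor;

    public class UppercaseTopology {

        // Processor that upper-cases string values and forwards them downstream
        static class UppercaseProcessor extends AbstractProcessor<String, String> {
            @Override
            public void process(String key, String value) {
                context().forward(key, value.toUpperCase());
            }
        }

        public static Topology build() {
            Topology topology = new Topology();
            topology.addSource("Source", "src-topic");
            topology.addProcessor("Uppercase", UppercaseProcessor::new, "Source");
            topology.addSink("Sink", "out-topic", "Uppercase");
            return topology;
        }
    }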
-
Keeping most of the business logic out of the processor is generally a good idea. One way to achieve this is for the processor to rely on another class which contains the actual business logic.
-
The punctuate() and process() methods are never run concurrently.
-
Review of Kafka Streams' architecture:
-
Each StreamTask has its own copy of a local state store, and StreamThread objects don't share tasks or data.
-
objects don't share tasks or data. -
As records make their way through the topology, each node is visited in a depth-first manner, meaning there's never concurrent access to state stores from any given processor.
-
-
It is possible to combine the Processor API and Kafka Streams DSL.
-
The Kafka Streams DSL offers three methods for plugging in Processor API functionality: KStream.process, KStream.transform, and KStream.transformValues.
-
KStream.process creates a terminal node.
-
KStream.transform returns a KStream instance for additional chaining. KStream.transform is also stateful, so we'll need to provide a state store name.
-
To return multiple records from a KStream.transform step, return a List<KeyValue<K,V>>, and then attach a flatMap or flatMapValues to send the individual records downstream.
-
-
Chapter 7 Monitoring and Performance
Measuring consumer and producer performance
-
The kafka-consumer-groups.sh script provided with Kafka allows us to check consumer lag.
Intercepting the producer and consumer
-
Kafka Improvement Proposal 42 introduced interceptors for intercepting records between brokers and consumers/producers.
-
For example, ConsumerInterceptor.onConsume() reads the records between the point where they're retrieved from the broker and the point where they're returned from the Consumer.poll() method.
-
Other interceptors include ConsumerInterceptor.onCommit(), ProducerInterceptor.onSend(), and ProducerInterceptor.onAcknowledgement().
-
-
When interceptor(s) fail, errors will be logged, but processing will continue without the changes that would have been made by the failed interceptor(s).
Application metrics
-
Some categories of metrics include:
-
Thread metrics
-
Average time for commits, poll, process operations
-
Tasks created per second, tasks closed per second
-
-
Task metrics
-
Average number of commits per second
-
Average commit time
-
-
Processor node metrics
-
Average and max processing time
-
Average number of process operations per second
-
Forward rate
-
-
State store metrics
-
Average execution time for put, get, and flush operations
-
Average number of put, get, and flush operations per second
-
-
-
Kafka Streams provides mechanisms to expose metrics to JMX, which can then be accessed via Java VisualVM, JConsole, and Java Mission Control.
-
Topology.describe() can be used to print information regarding the structure of the program, including any internal topics created to support repartitioning.
-
A Topology object can be obtained even when using the Streams DSL by calling StreamsBuilder.build().
-
-
KafkaStreams.localThreadsMetadata() can be used to access StreamThread metadata, which can then be used to obtain information about the stream threads.
-
A Kafka Streams application determines the number of tasks to create by taking the maximum partition count across all input topics.
-
The rebalance process then assigns these tasks across the available stream threads.
-
-
A Kafka Streams application can be in one of six states at any point in time:
-
Created (initial state)
-
Running
-
Rebalancing
-
Error (temporary state, leads to Pending Shutdown)
-
Pending Shutdown (temporary state, leads to Not Running)
-
Not Running (terminal state)
-
-
KafkaStreams.setStateListener (used with a StateListener) can be used to listen to state changes.
-
The related StateRestoreListener allows listening to state store restoration events.
-
-
KafkaStreams.setUncaughtExceptionHandler can be used to handle unexpected errors.
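A small sketch of wiring up both listeners before starting the application (the logging behavior shown is illustrative, not from the book):

    kafkaStreams.setStateListener((newState, oldState) ->
        System.out.println("State changed from " + oldState + " to " + newState));

    kafkaStreams.setUncaughtExceptionHandler((thread, exception) ->
        System.err.println("Thread " + thread.getName() + " died: " + exception));

    kafkaStreams.start();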
Chapter 8 Testing a Kafka Streams Application
Testing a topology
-
The ProcessorTopologyTestDriver can be used to test individual Topology objects without needing to run Kafka.
Building the test
-
The steps are as follows (a sketch follows this list):
-
Instantiate the Topology under test and its configuration.
-
Create an instance of ProcessorTopologyTestDriver using the objects in step 1 above.
-
Call process() on the ProcessorTopologyTestDriver instance from step 2 above, passing in a sample record.
-
Call readOutput() on the same ProcessorTopologyTestDriver instance to retrieve an output from a topic (as specified in the arguments).
-
Assert on the output in step 4 above.
-
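A sketch of these steps using TopologyTestDriver, the public test driver later shipped in kafka-streams-test-utils (the book itself uses the internal ProcessorTopologyTestDriver; topic names, serdes, and the topology builder are hypothetical):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");      // never actually contacted

    Topology topology = buildMyTopology();                                // step 1 (hypothetical builder)
    TopologyTestDriver driver = new TopologyTestDriver(topology, props);  // step 2

    ConsumerRecordFactory<String, String> factory =
        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
    driver.pipeInput(factory.create("src-topic", "key", "value"));        // step 3

    ProducerRecord<String, String> output =
        driver.readOutput("out-topic", new StringDeserializer(), new StringDeserializer()); // step 4

    OutputVerifier.compareKeyValue(output, "key", "VALUE");               // step 5
    driver.close();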
Testing a state store in the topology
-
ProcessorTopologyTestDriver.getKeyValueStore() can be used to retrieve the state store from the topology, for performing assertions on the state store (or items retrieved from the state store).
Testing processors and transformers
-
Processor and Transformer objects depend on ProcessorContext; when testing, the ProcessorContext can be replaced with a mock from Mockito or the MockProcessorContext object from Kafka, depending on the particular test scenario.
-
If we intend to verify the parameters passed to a mock, the value returned, or any other behavior, it would be better to use a mock object generated by a framework like Mockito.
-
Integration testing
-
EmbeddedKafkaCluster can be used to create a—wait for it—embedded Kafka cluster for integration testing purposes. Note that the @ClassRule annotation is required to ensure the setup and teardown methods for the EmbeddedKafkaCluster are run. For example:

    private static final int NUM_BROKERS = 1;

    @ClassRule
    public static final EmbeddedKafkaCluster EMBEDDED_KAFKA =
        new EmbeddedKafkaCluster(NUM_BROKERS);

    @BeforeClass
    public static void setUpAll() throws Exception {
        EMBEDDED_KAFKA.createTopic(MY_IN_TOPIC);
        EMBEDDED_KAFKA.createTopic(MY_OUT_TOPIC);
        // Other overloads of createTopic are available if additional
        // configuration of the topic is required.
    }
-
The IntegrationTestUtils class provides useful helper methods like produceValuesSynchronously and waitUntilMinValuesRecordsReceived for testing.
Chapter 9 Advanced Applications with Kafka Streams
Integrating Kafka with other data sources
-
The Kafka JDBC Connector can be used to transfer data between an RDBMS and Kafka. When used to import data into Kafka topics, some of the important configurations for the JDBC Connector are as follows:
-
name
-
connector.class
-
tasks.max
-
connection.url
-
mode—How the connector will detect new rows it needs to load.
-
incrementing.column.name—When mode is incrementing, this option specifies the column with an incrementing value that the connector will use to detect new rows. (Note: when mode is incrementing, updates to existing rows will not be picked up by the connector.)
-
topic.prefix
-
-
Kafka Connect itself also requires configuration. Some of the important settings are as follows:
-
bootstrap.servers
-
key.converter
-
value.converter—e.g., org.apache.kafka.connect.json.JsonConverter
-
value.converter.schemas.enable
-
plugin.path
-
offset.storage.file.filename
-
-
Kafka Connect has the concept of Transformations that let us perform lightweight transformations before Connect writes data to Kafka.
-
Some built-in transformations include:
-
ValueToKey (for extracting the key from the value)
-
FlattenStruct (for flattening any struct outputs from earlier transformation steps, like when using ExtractKey for the ValueToKey transformation)
-
TimestampConverter (for converting timestamps from the database format to a different format for Kafka)
-
ReplaceField (for mapping table column names to JSON field names)
-
-
Kicking your database to the curb
-
State stores within Kafka Streams can be exposed directly to external services for read-only operations.
-
The store might be retrieved as in the following example, and exposed via REST:

    // Other store types include sessionStore and keyValueStore
    ReadOnlyWindowStore readOnlyStore =
        kafkaStreams.store(storeName, QueryableStoreTypes.windowStore());
-
Note, however, that Kafka Streams assigns a state store per task, so it is possible that the state store that was queried may not contain the relevant key.
-
To enable interactive queries, we need to set the StreamsConfig.APPLICATION_SERVER_CONFIG parameter to the hostname:port of the Kafka Streams application instance, i.e., the host and port that its query service will listen on.
-
Several methods on the KafkaStreams object allow for retrieving information about all running instances with the same application ID that define APPLICATION_SERVER_CONFIG. For example:
-
KafkaStreams.allMetadata()
-
KafkaStreams.allMetadataForStore(<store-name>)
-
KafkaStreams.metadataForKey(<store-name>, key, Serializer)
-
KafkaStreams.metadataForKey(<store-name>, key, StreamPartitioner)
-
-
KSQL
-
KSQL is made up of two components: a CLI and a server.
-
The CLI communicates with the server via a REST interface, and the server communicates with Kafka (only from a different JVM).
-
-
Before a topic may be used with KSQL from the CLI, the topic needs to be registered via a CREATE STREAM statement (analogous to CREATE TABLE in traditional SQL).
-
Some useful commands on the CLI are:
-
list topics—shows a list of topics on the broker, and whether the topics are registered in the KSQL CLI.
-
show properties—shows the various properties in relation to the underlying Kafka Streams / Kafka.
-
Appendix A Additional Configuration Information
Limiting the number of rebalances on startup
-
The group.initial.rebalance.delay.ms option can be used to delay the initial consumer rebalance from the GroupCoordinator.
-
This option can be tuned to avoid the situation where the first instance of the Kafka Streams application is assigned all partitions and the partitions are then rebalanced again as each additional instance is brought up shortly after.
Resilience to broker outages
-
Some recommended settings to make the Kafka Streams application resilient in the face of broker failures:
-
Set Producer.NUM_RETRIES to Integer.MAX_VALUE
-
Set Producer.REQUEST_TIMEOUT to 305000 (5 minutes)
-
Set Producer.BLOCK_MS_CONFIG to Integer.MAX_VALUE
-
Set Consumer.MAX_POLL_CONFIG to Integer.MAX_VALUE
-
-
For example:
    Properties props = new Properties();
    props.put(StreamsConfig.producerPrefix(
        ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
    props.put(StreamsConfig.producerPrefix(
        ProducerConfig.MAX_BLOCK_MS_CONFIG), Integer.MAX_VALUE);
    props.put(StreamsConfig.producerPrefix(
        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 305000);
    props.put(StreamsConfig.consumerPrefix(
        ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Integer.MAX_VALUE);
-
YJ: We might also need to look at delivery.timeout.ms for the producer.
Handling deserialization errors
-
Kafka Streams provides the default.deserialization.exception.handler property (StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) to specify how we want to handle these deserialization errors.
-
The default setting is org.apache.kafka.streams.errors.LogAndFailExceptionHandler.
-
There is also an org.apache.kafka.streams.errors.LogAndContinueExceptionHandler.
-
A custom deserialization exception handler can be created by implementing the DeserializationExceptionHandler interface.
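A minimal sketch of switching to the log-and-continue handler using the property named above:

    Properties props = new Properties();
    // Log and skip records that fail to deserialize instead of shutting the application down
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
              LogAndContinueExceptionHandler.class);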
Scaling up your application
RocksDB Configuration
-
Refer to the RocksDB Tuning Guide for how to tune RocksDB (which underlies the state stores):
Creating repartitioning topics ahead of time
-
In Kafka Streams, when performing an operation that may potentially change the key (like map, transform, or groupBy), an internal flag is set in the StreamsBuilder class, and as soon as we add a further operation using the updated key, a repartitioning operation will be triggered.
-
However, if there are multiple downstream operations using the updated key, Kafka Streams may trigger multiple repartitioning operations although only one is really needed.
-
To avoid this, use KStream.through to manually repartition.
-
YJ: please note that KStream.through might have been deprecated / renamed.
-
Configuring internal topics
-
It is generally a good idea to configure our internal topics to keep their size manageable. There are two options for doing so.
-
The first option is to provide configurations directly when creating state stores, using either StoreBuilder.withLoggingEnabled or Materialized.withLoggingEnabled.
-
The other option is to provide the configurations when configuring the Kafka Streams application itself.
Resetting your Kafka Streams application
-
The kafka-streams-application-reset.sh script may be used to, among other things:
-
reset input topics to the earliest available offset,
-
reset intermediate topics to the latest offset, and/or
-
delete any internal topics.
-
-
Note that we'll need to call KafkaStreams.cleanUp the next time we start the application, to delete any local state from previous runs.
Cleaning up local state
-
Call KafkaStreams.cleanUp either before KafkaStreams.start or after KafkaStreams.stop to clean out all previous state stores.
Appendix B Exactly once semantics
To Internalize Now
-
Kafka Streams processes each record in a depth-first manner, meaning that a record must be completely processed by the processors and sent out via one or more sinks (if applicable) before the next record is sent down the processing pipeline.
-
A Kafka Streams application creates a number of tasks based on the highest partition count across all topics the application is subscribed to. Each topic-partition is then assigned to a task (this assignment is never changed). The tasks are then distributed across all application instances with the same consumer group ID, rebalancing as required (e.g., when a new consumer joins the group).
To Learn/Do Soon
-
Although not referred to in the book, this article explains KTable.toStream() more clearly:
-
Refer to the Confluence page on KIP-138 (Change punctuate semantics) for some example uses of punctuation in stream processing, and the behavioral characteristics depending on which timestamp (event time or system time) is used:
-
How does auto commit work?
-
What offset is committed?
-
How does the auto commit logic know which offset to commit?
-
-
Learn about the Kafka JDBC Connector:
To Revisit When Necessary
Chapter 1 Welcome to Kafka Streams
-
Refer to this chapter for an example of a Kafka Streams application that processes a source purchase event to generate data for a rewards program and purchase-pattern analysis, and to persist the data.
Chapter 2 Kafka Quickly
-
Refer to this chapter for an example of using a custom partitioner.
Chapter 3 Developing Kafka Streams
-
Refer to this chapter for an excellent example of how to use Kafka Streams DSL to build a streaming application with various initial requirements, and also handling new requirements.
Chapter 4 Streams and State
-
Refer to this chapter for an example of using state stores, including repartitioning by a specific key and joining two streams.
Chapter 5 The KTable API
-
Refer to this chapter for:
-
explanation of the difference between KStream and KTable
-
how to perform (rolling) aggregations on KStream and KTable (summation and top-n ranking), with semi-realistic examples: summing stock trading volumes by ticker, and ranking the top-n industries by volume traded
-
(summation, and top-n ranking), with semi-realistic examples: summation of stock trading volumes by ticker, and ranking top-n industry by volumes traded -
how to perform windowing aggregation
-
example of a join between a KStream and a KTable
-
Chapter 7 Monitoring and Performance
-
For an example of programmatic access to Kafka Streams metrics, refer to the book's accompanying Git repository:
Chapter 8 Testing a Kafka Streams Application
-
Refer to this chapter for illustrative examples of how to do unit and integration tests with Kafka.
Chapter 9 Advanced Applications with Kafka Streams
-
Refer to this chapter for example configuration of Kafka JDBC Connector, together with explanation of the individual configuration settings.
-
Refer to this chapter for an example of setting up state stores for direct external read-only access.
Other Resources Referred To
-
Refer to the accompanying code repository for examples of various ways of using Kafka Streams:
-
Refer to Kafka's official documentation on how to monitor a full Kafka cluster:
-
For a list of Kafka Streams metrics to consider monitoring, see Kafka's official documentation: