Kafka Streams in Action - Real-time apps and microservices with the Kafka Streams API

Book Details

  • Full Title: Kafka Streams in Action - Real-time apps and microservices with the Kafka Streams API

  • Author: William P.Bejeck Jr.

  • ISBN/URL: 9781617294471

  • Reading Period: 2021.01.01–2021.01.06

  • Source: Googl-ing for books on Kafka

General Review

  • An rather good book:

    • Covers the topic in sufficient detail

    • Provides practical semi-realistic examples with code snippets are appropriately condensed (with irrelevant parts omitted, but nonetheless available on accompanying repository)

    • Where related concepts are required but that are not directly related to Kafka Streams, the author provides generally concise and yet accurate explanation of such topics, and also provide links for further exploration.

  • Chapter 2 provides a helpful overview into vanilla Kafka (i.e., Producers and Consumers API, not Streams), going into some details about common configuration properties and why we would want to change the defaults.

Specific Takeaways

Chapter 1 Welcome to Kafka Streams

  • Certain concepts in Kafka have their roots in the big data processing using traditional mapreduce approach:

    • Hot to distribute data across a cluster to achieve scale in processing

    • The use of key/value pairs and partitions to group distributed data together

    • Instead of avoiding failure, embracing failure by using replication

  • Some good use cases for stream processing include:

    • Credit card fraud

    • Intrusion detection

    • A large race, such as the New York City Marathon

    • The financial industry

  • Some situations where stream processing may not be a good fit (i.e., when the focus is on analyzing data over time, rather than just the most current data):

    • Economic forecasting

    • School curriculum changes

  • In Kafka Streams, we define a topology of porcessing nodes.

    • Records flow through the graph in a depth-first manner—that is, each record is processed in full by the entire topology before another record is forwarded through the topology.

    • As such, there is no need to have backpressure within Kafka Streams.

  • Steps in building a Kafka Streams application:

    1. Define a source. A source might be a single topic, multiple topics in a comma-separated list, or a regex that can match one or more topics.

    2. Add processor(s), such as:

      • A mapper to update an object (i.e., take in object A and outputs object

      • An output to another Kafka topic for other application

      • An output to another Kafka topic for persisting into data store

Chapter 2 Kafka Quickly

  • How logs work in Kafka

    • The log.dir setting specify where Kafka storers log data.

    • Each topic maps to a subdirectory under the specified log directory.

      • There will be as many subdirectories as there are topic partitions, with a format of partition-name_partition-number.

      • Inside each directory is the log file where incoming messages are appended. Once the log file reach a ceratin size (either a number of record or size on disk), or when a configured time difference between message timestamps is reached, the log file is "rolled", and Kafka appends incoming messages to a new log.

  • Partitioning in Kafka allows for (a) greater throughput and (b) spreading of a topic's message across multiple machines.

  • One reason for wanting to write a customer partitioner is when we have composite keys.

    • For example, we have purchase data flowing into Kafka, and the keys contain two values: a customer ID and transaction date. But we need to group values by customer ID, so taking a hash of the customer ID and the purchase date won't work.

  • Deterimining the correct number of partitions:

    • More partitions means higher throughput, which is often necessary when we have more data.

    • However, more partitions also means more TCP connections and open file handles.

    • Another thing that affects throughput is how long processing takes within the consumer.

  • There are two ways to deal with old log data: deletion and compaction.

    • Deletion

      • Log deletion involves two steps: first the logs are rolled into segments, and then the oldest segments are deleted.

      • Kafka can roll log files into segments when the timestamp of the latest incoming message is greater than the timestamp of the oldest message by certain duration, configurable by log.roll.ms or log.roll.hours.

      • Kafka removes old segments based on the timestamp of the messages in the segment, based on the following configuration: log.retention.ms, log.retention.minutes, log.retention.hours.

      • Other relevant configuration to consider includes: log.retention.bytes, and log.segnment.bytes.

      • Deletion of logs works well for non-keyed records, or records that stand alone. But if we have keyed data and expected updates, we might want to consider using log compaction.

    • Compaction

      • Problem solved by log compaction: An application is restarted and needs to rebuild the information for each key, but some of the less frequently updated keys exists only in deleted log segments, and the data is lost.

      • Log compaction deletes old records per key in a log, as opposed to deleting entire segments based on time or size.

      • At a high level, log compaction is achieved by have a pool of log cleaner threads running in the background that recopies log segments while removing records for keys that has a newer record.

      • By default, the log cleaner is enabled. To use compaction for a topic, we'll need to set the log.cleanup.policy=compact.

      • Compaction is used in Kakfa Streams when using state stores, but we don't need to create those logs/topics ourselves because its handled by the framework.

  • Kafka producers are thread-safe. All sends to Kafka are asynchronous—Producer.send returns immediately once the producer places the record in an internal buffer. Depending on the configuration, there might be some blocking if the buffer is full when attempting to send a message.

    • Some of the common properties when starting a KafkaProducer are as follows:

      • Bootstrap serversbootstrap.servers is a comman-separated list of host:post values. This list is used for initially connecting to the cluster.

      • Serializationkey.serializer and value.serializer instruct Kafka how to convert the keys and values into byte arrays.

      • actsacks specifiies the minimum number of acknowledgements from a broker that the producer will wait fro before considering a record send completed. Valid values are all, 0, and 1.

      • Retries—If sending a batch results in a failure, retries specifies the number of times to attempt to resend. If record order is important, we should consider setting max.in.flight.requests.per.connection to 1 to prevent the scenario of a second batch being sent successfully before a failed record being sent as the result of a retry.

      • Compression type

      • Partitioner class

    • When creating a ProducerRecord, we can manually specify the partiton and/or timestamp.

      • One reason for manually specifying the partition is as follows: the incoming records are keyed, but the downstream consumer doesn't can handle any value that the key may hold, and we want to ensure the distribution of records to each partition is roughly equal.

  • Kafka consumers are stateful, and manages state by periodically committing the offsets of messages consumed from Kafka.

    • Committing an offset has two implications for a consumer:

      1. Committing implies that the consumer has fully processed the message.

      2. Committing also represents the starting point for that consumer in the case of a failure or restart.

    • When there is a new consumer instance (either a completely new one or because or restarts), where the consumer starts from will depend on the configuration:

      • auto.offset.reset="earliest"

      • auto.offset.reset="latest"

      • auto.offset.reset="none"—No reset strategy; the broker throws an exception to the consumer.

    • Automatic offset commits are enabled by default, and they are represented by the enable.auto.commit property. The companion option auto.commit.interval.ms specifies how often the consumer will commit offsets (default is 5 seconds). Note that if the value is too small, it will increase network traffic; if it's too large, it could result in the consumer receiving large amounts of repeated data in the event of a failure or restart.

    • Manual offset commits:

      • There are two types of manually commited offsets—synchronous and asynchrous.

      • Examples of synchronous commits:

          // Blocks until all offsets return from the last retrieval
          // succeed. Applies to all subscribed topics and partitions.
          consumer.commitSync();
        
          // Commits only the offsets, partitions, and topics specified in the
          // map.
          consumer.commitSync(Map<TopicPartition, OffsetAndMetadata>);
    • Consumer is typically runned in a loop, where it polls for a period specified in milliseconds.

    • When running multiple consumer instances, the total thread count across all consumer instances shouldn't exceed the total number of partitions in the topic. This is because any threads in excess of the total partition count will be idle.

    • Rebalancing is the process of adding and removing topic-partition assignments to consumers.

    • It is possible for a KafkaConsumer to manually subscribe to a specific partition in the topic:

        TopicPartition fooTopicPartition_0 = new TopicPartition("foo", 0);
        TopicPartition barTopicPartition_0 = new TopicPartition("bar", 0);
        consumer.assign(Arrays.asList(fooTopicPartition_0, barTopicPartition_0);
      • There are however various tradeoff to be considered (e.g., failures will not result in topic partitions being reassigned, even for consumers with the same group ID).

Chapter 3 Developing Kafka Streams

  • Kafka Streams provides two APIs: the higher level Kafka Streams DSL, and the lower level Processor API.

  • A simple example of a Kafka Streams application using the Streams DSL is as follows:

      KStream<String, String> simpleFirstStream =
          builder.stream("src-topic", Consumed.with(stringSerde, stringSerde));
      KStream<String, String> upperCasedStream =
          simpleFirstStream.mapValues(String::toUpperCase);
      upperCasedStream.to("out-topic", Produced.with(stringSerde, stringSerde));
    
      // Equivalently, using a fluent approach:
      builder.stream("src-topic", Consumed.with(stringSerde, stringSerde))
          .mapValues(String::toUpperCase)
          .to("out-topic", Produced.with(stringSerde, stringSerde));
  • The general steps to creating a Kafka Streams application are as follows:

    1. Create a StreamsConfig instance

    2. Create a Serde object

    3. Construct a processing topology, for example

      • mapping the keys and/or values to different objects (with KStream.mapValues and/or KStream.map),

      • filtering and/or branching (with KStream.filter and KStream.branch),

      • deriving completely new keys from the values (i.e., rekeying with KStream.selectKey)

    4. Start the Kafka Streams program

  • The KStream.print and KStream.peek methods are useful during development to print the records flow through Kafka to STDOUT / file.

Chapter 4 Streams and State

  • A related method to KStream.mapValues() is KStream.transformValues(), which operate in the same way, save that the latter is able to use (i.e., maintain and update) local state.

  • When retrieving values from local state using a particular identifier (e.g., customer ID) on the record, it is crucial that that the records are partitioned by that same identifier to ensure all records relating to the same identifier is sent to the same process (or StreamTask to be more accurate).

  • KStream.through() can be used to transparently repartition record by a different key.

    • Under the hoods, the method is actually writing the records out to a different topic using the new key, and return a KStream that is reading from the new topic.

    • Implement StreamPartitioner and pass it the KStream.through() to customize how to determine which partition to send the record. Note: if we have already modified the key in an earlier step, it may not be necessary to implement a custom StreamPartitioner, and we can just use the DefaultPartitioner.

    • Note: Repartitioning comes at the cost of data duplication and additional overheads. As such, try to use mapValues(), transformValues(), or flatMapValues() whenever possible, because map(), transform(), and flatMap() can trigger automatic repartitioning. It's best to use repartitioning logic sparingly.

  • State stores are customised using either Materialized or StoreBuilder class. The former is typically used with the high-level Kafka Streams DSL, whereas the latter is typically used with the lower-level Processor API.

  • Some of the available state stores include:

    • Stores.persistentKeyValueStore

    • Stores.lruMap

    • Stores.persistentWindowStore

    • Stores.persistentSessionStore

  • By default, all the StateStoreSupplier have logging enabled—that is, a Kafka topic is used as a changelog to back up the values in the store and provide fault tolerance.

  • By default, changelog topics are compacted; however, the cleanup policy can be configured to be delete, compact, or both.

  • In Kakfa Streams, whenever we invoke a method that could result in generating a new key (selectKey, map, or transform), an internal Boolean flag is set to true, indicating that the new KStream instance requires repartitioning.

    • With this Boolean flag set, if we perform a join, reduce, or aggregation operation, the repartitioning is handled for us automatically.

  • Two streams can be joined in various ways:

    • inner join—where a record from the joined stream is emitted only if the record for the particular key is present in both source streams within the time window specified.

    • outer join—where if either side of the join isn't present when the time window expires, the joined stream will emit a record built using only the available side.

    • left outer join—where a record is emitted from the joined stream if the key is present in main stream, but may or may not be present in the second stream.

  • In an event-driven system, there are usually three different possible notion of timestamps:

    1. Event time—when the event occured, usually embedded within the event data

    2. Ingest time—when the record is appended to the log

    3. Processing time—when actual processing of the record starts

  • Kafka Stream provides a TimestampExtractor interface (with some concrete implementations) to allow choosing the correct type of timestamp as required by the application.

  • Timestamps affects the following functionalities:

    1. Joining streams (i.e., the window period)

    2. Upadating a changelog (i.e., KTable API)

    3. Deciding when the Processor.punctuate() method is triggered (i.e., Procesor API)

Chapter 5 The KTable API

  • Two important questions in relation to KTable are:

    1. Where are the data stored?

      • Ans: Using an internal state store.

    2. How often are records emitted from the KTable?

      • Ans: Depends on (a) rate of records entering the system, (b) how many distinct keys are in the data, and (c) the configuration parameters cache.max.bytes.buffering and commit.interval.ms.

      • A larger cache means less frequent records. When a cache reaches its limit, records will be sent.

      • The commit interval specifies how often the state of the processor should be saved, during which there will be a cache flush, and the KTable will send the latest updated, deduplicated records downstream.

  • KGroupedStream is an intermediate representation of the event stream after grouping by keys (KStream.groupBy or KStream.groupByKey) and is not meant to be worked with directly. It is used to perform aggregation operations.

    • The analogous KGroupedTable (produced by KTable.groupBy) is the intermediate representation of the update stream regrouped by key.

  • Aggregation operations can be performed on a rolling basis or windowed basis.

  • In Kafka Streams, three types of windows are available:

    1. Session windows (activity-based, i.e., combined if inactivity gap is small; SessionWindows.with(...).until(...))

    2. Tumbling windows (time-based, non-overlapping; TimeWindows.of(...))

    3. Sliding/hopping windows (time-based, overlapping; TimeWindows.of(...).advanceBy(...))

  • When performing a KStream-to-KTable join, there is no need to provide a JoinWindow because there is only one record per key in the KTable.

    • The join is unrelated to time, the record is either present in the KTable or not.

  • When using KTable to maintain certain state to enrich other KStream, there will likely be repartitioning involved due to the need to change the key to something appropriate for the joining. Such repartitioning comes with overheads.

    • When the lookup data for enriching the KStream is small enough to fit on every node, the GlobalKTable may be used to avoid the repartitioning. GlobalKTable also allows non-key joins.

  • The available joins (as of Kafka Streams 1.0.0) are as follows:

    Left Join Inner Join Outer Join
    KStream-KStream KStream-KStream KStream-KStream
    KStream-KTable KStream-KTable N/A
    KTable-KTable KTable-KTable KTable-KTable
    KStream-GlobalKTable KStream-GlobalKTable N/A
  • It is possible to make a state store directly queryable, providing benefits such as not needing to write to an external data store before being able to query the data.

Chapter 6 The Processor API

  • General steps for using the Processor API:

    1. Use Topology.addSource() to "subscribe" to a topic.

    2. Add one or more processors using Topology.addProcessor(), passing in (among others) the name of the parent source or processor, and also the ProcessorSupplier that supply the actual processor for processing records.

      • The Processor instance supplied is usually a subclass of AbstractProcessor, and we override the any, some or all of the init(), process(), punctuate(), and close() methods as required.

      • context().forward(<key>, <value>, <downstream-node>) is called within a processor to forward the recrod to a downstream node.

      • Override the init() method if we require additional setup in the processor, like configuring a state store.

    3. (Optional) Add one or more state stores using Topology.addStateStore().

    4. Add one of more sinks using Topology.addSink().

  • Keeping most of the business logic out of the processor is generally a good idea. One way to achieve this is for the processor to rely on another class which contains the actual business logic.

  • The punctuate() and processor() methods are never runned concurrently.

  • Review of Kafka Stream's architecture:

    • Each StreamTask has its own copy of a local state store, and StreamThread objects don't share tasks or data.

    • As records make their way through the topology, each node is visited in a depth-first manner, meaning there's never concurrent access to state stores from any given processor.

  • It is possible to combine the Processor API and Kafka Streams DSL.

    • The Kafka Streams DLS offers three methods for plugging in functionality using processor API: KStream.process, KStream.transform, and KStream.transformValues.

    • KStream.process creates a terminal node.

    • KStream.transform returns a KStream instance for addition chaining. KStream.transform is also stateful, so we'll need to provide a state store name.

      • To return multiple records from a KStream.transform step, return a List<KeyValue<K,V>>, and then attach a flapMap or flapMapValues to send individual records downstream.

Chapter 7 Monitoring and Performance

Measuring consumer and producer performance

  • The kafka-consumer-groups.sh script provide with Kafka allows us to check the consumer lag.

Intercepting the producer and consumer

  • Kafka Improvement Proposal 42 introduced interceptors for intercepting records between brokers and consumers/producers.

  • For example, the ConsumerInterceptor.onConsume() reads the record between the point where they're retrieved from the broker, and before the messages are return from the Consumer.poll() method.

    • Other interceptors include ConsumerInterceptor.onCommit(), ProducerInterceptor.onSend() and ProducerInterceptor.onAcknowledgement().

  • When interceptor(s) fail, errors will be logged, be the processing with continue without the changes that would have been made by the failed interceptor(s).

Application metrics

  • Some categories of metrics include:

    • Thread metrics

      • Average time for commits, poll, process operations

      • Tasks created per second, tasks closed per second

    • Task metrics

      • Average number of commits per second

      • Average commit time

    • Processor node metrics

      • Average and max processing time

      • Average number of process operations per second

      • Forward rate

    • State store metrics

      • Average execution time for put, get, and flush operations

      • Average number of put, get, and flush operations per second

  • Kafka Streams provides mechanisms to expose metrics to JMX, which then can be accessed via Java VisualVM, JConsole, and Java Mission Control.

  • Topology.described() can be used to print information regarding the structure of the program, including any internal topics created to support repartitioning.

    • A Topology object can be obtained even when using the Streams DSL by calling StreamsBuilder.build().

  • KafkaStreams.localThreadsMetadata() can be used to access StreamThread objects, which can then be used to obtained information about the stream threads.

  • A Kafka Streams application determines the number of tasks to create by taking the max partition size among all input topics.

    • The rebalance process then assigns the two tasks across various stream threads.

  • A Kafka Streams application can be in one of six states at any point in time:

    • Created (initial state)

    • Running

    • Rebalancing

    • Error (temporary state, leads to Pendig Shutdown)

    • Pending Shutdown (temporary state, leads to Not Running)

    • Not Running (terminal state)

  • KafkaStreams.setStateListener (used with StateListener) can be used to listen to state changes.

    • The related StateRestoreListener allows listening to state store restoration events.

  • KafkaStreams.setUncaughtExceptionHandler can be used to handle unexpected errors.

Chapter 8 Testing a Kafka Streams Application

Testing a topology

  • The ProcessorTopologyTestDriver can be used to test individual Topology objects without needing to run Kafka.

Building the test

  • The steps are as follows:

    1. Instantiate an instance of the Topology and configurations.

    2. Create an instance of ProcessorTopologyTestDriver using the objects in step 1 above.

    3. Call process() on the ProcessorTopologyTestDriver instance in step 2 above, passing in sample record.

    4. Call readOutput() on the same ProcessorTopologyTestDriver instance in step 3 above to retrieve an output from a topic (as specified in the arguments).

    5. Assert on the output in step 4 above.

Testing a state store in the topology

  • ProcessorTopologyTestDriver.getKeyValueStore() can be used to retrieve the state store from the topology, for performing assertions on the state store (or items retrieved from the state store).

Testing processors and transformers

  • Processor and Tranformer objects depend on ProcessorContext; when testing, the ProcessorContext can be replaced with a mock from Mockito or the MockProcessorContext object from Kafka, depending on the particular test scenario.

    • If we intend to verify the parameters passed to a mock, the value returned, or any other behavior, it would be better to use a mock object generated by a framework like Mockito.

Integration testing

  • EmbeddedKafkaCluster can be used to create a—wait for it—embedded Kafka cluster for integration testing purposes. Note that the @ClassRule annotation is required to ensure the set up and tear down methods in relation to EmbeddedKafkaCluster are runned. For example:

      private static final int NUM_BROKERS = 1;
    
      @ClassRule
      public static final EmbededKafkaCluster EMBEDDED_KAFKA =
          new EmbeddedKafkaCluster(NUM_BROKERS);
    
      @BeforeClass
      public static void setUpAll() throws Exception {
          EMBEDDED_KAFKA.createTopic(MY_IN_TOPIC);
          EMBEDDED_KAFKA.createTopic(MY_OUT_TOPIC);
          // Other overloads of createTopic are available if additional
          // configuration of the topic is required.
      }
  • The IntegrationTestUtils class provides useful helper methods like produceValuesSynchronously and waitUtilMinValuesRecordsReceived for testing.

Chapter 9 Advanced Applications with Kafka Streams

Integrating Kafka with other data sources

  • Kafka JDBC Connector can be used to transfer data to/from RDBMS and Kakfa. When used to import data into Kakfa topics, some of the important configurations for the JDBC Connector are as follows:

    • name

    • connector.class

    • tasks.max

    • connection.url

    • mode—How the connector will detect new rows it needs to load.

    • incrementing.column.name—When mode is incrementing, this option specify the particular column which has an incrementing value that the connector will use to detect new rows. (Note: when mode is incrementing, updates to existing rows will not be picked up by the connector.)

    • topic.prefix

  • Kakfa Connect itself also requires configuration. Some of the important ones are as follows:

    • bootstrap.servers

    • key.converter

    • value.converter—e.g., org.apache.kafka.connect.json.JsonConverter

    • value.converter.schemas.enable

    • plugin.path

    • offset.storage.file.filename

  • Kafka Connect has the concept of Transformations that let us perform lightweight transformations before Connect writes data to Kafka.

    • Some built-in transformations include:

      • ValueToKey (for extracting key from the value)

      • FlattenStruct (for flattening any struct outputs from earlier transformation steps, like when using ExtractKey for ValueToKey transformation)

      • TimestampConverter (for converting timestamp from database format to different format for Kafka)

      • ReplaceField (for mapping table column names to JSON field names)

Kicking your database to the curb

  • State stores within Kafka can be exposed directly to external services for read-only operations.

  • The store might retrieved via the following example, and exposed via REST:

      // Other store types include sessionStore and keyValueStore
      ReadOnlyWindowStore readOnlyStore =
          kafkaStreams.store(storeName, QueryableStoreTypes.windowStore());
  • Note however that Kafka Streams assigns a state store per task, so it is possible that thet state store that was queried may not contain the relevant key.

    • To enable interactive queries, we need to set the StreamsConfig.APPLICATION_SERVER_CONFIG parameter to hostname:port of the Kafka Streams application, and the port that a query service will listen on.

    • Several methods on the KafkaStreams object allow for retrieving information for all running instances with the same application ID and defining the APPLICATION_SERVER_CONFIG. For example:

      • KafkaStreams.allMetadata()

      • KafkaStreams.AllMetadataForStore(<store-name>)

      • KafkaStremas.AllMetadataForKey(Key, Serializer)

      • KafkaStreams.AllMetadataForKey(Key, StreamPartitioner)

KSQL

  • KSQL is made up of two components: a CLI and a server.

    • The CLI communicates with the server via REST interface, and the server will communicate with Kafka (only a different JVM).

  • Before a topic may be used with KSQL with a CLI, the topic needs to be registered via a CREATE STREAM statement (analogous to the CREATE TABLE in traditional SQL).

  • Some useful commands on the CLI are:

    • list topics—show a list of topics on the broker, and whether the topics are registered on the KSQL CLI.

    • show properties—shows the various properties in relation to the underlying Kafka Streams / Kafka.

Appendix A Additional Configuration Information

Limiting the number of rebalances on startup

  • The group.initial.rebalance.delay.ms option can be used to delay the initial consumer rebalance from the GroupCoordinator.

  • This option can be tuned to avoid the situation where the first instance of the Kafka Streams application gets assigned all partitions, and for each instance that is brought up shortly after, the partitions are rebalanced once more.

Resilience to broker outages

  • Some recommended settings to make the Kafka Streams application resilient in face of broker failures:

    • Set Producer.NUM_RETRIES to Integer.MAX_VALUE

    • Set Producer.REQUEST_TIMEOUT to 305000 (5 minutes)

    • Set Producer.BLOCK_MS_CONFIG to Integer.MAX_VALUE

    • Set Consumer.MAX_POLL_CONFIG to Integer.MAX_VALUE

  • For example:

      Properties props = new Properties();
      props.put(StreamsConfig.producerPrefix(
              ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
      props.put(StreamsConfig.producerPrefix(
              ProducerConfig.MAX_BLOCK_MS_CONFIG), Integer.MAX_VALUE);
      props.put(StreamsConfig.producerPrefix(
              ProducerConfig.RERQUEST_TIMEOUT_MS_CONFIG), 305000);
      props.put(StreamsConfig.producerPrefix(
              ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Integer.MAX_VALUE);
  • YJ: We might also need to look at delivery.timeout.ms for producer.

Handling deserialization errors

  • Kafka Streams provides the default.serialization.exception.handler and StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG configurations to specify how we want to handle these deserialization errors.

  • The default setting is org.apache.kafka.streams.errors.LogAndFailExceptionHandler.

  • There is also a org.apache.kafka.streams.errors.LogAndContinueExceptionHandler.

  • A custom deserialization exception handler can be implemented by implementing the DeserializationExceptionHandler interface.

Scaling up your application

RocksDB Configuration

Creating repartitioning topics ahead of time

  • In Kafka Streams, when performing an operation that may potentially change the map key (like tranform or groupBy), an internal flag is set in the StreamsBuilder class, and as soon as we add a further operation using the updated key, a repartitioning operation will be triggered.

    • However, if there are multiple downstream operations using the updated key, Kafka Streams may trigger multiple repartitioning although only one is really needed.

    • To avoid this, use the KStreams.through to manually repartition.

    • YJ: please note that the KStream.through might have been deprecated / renamed.

Configuring internal topics

  • It is generally a good idea to configure our internal topics to keep their size manageable. There are two options for doing so.

  • The first option is to provide configurations directly when creating state stores, using either StoreBuilder.withLoggingEnabled or Materialized.withLoggingEnabled.

  • The other option is to provide configurations with configuring the Kafka Streams application itself.

Resetting your Kafka Streams application

  • The kafka-streams-application-reset.sh script may be used to, among others:

    • reset input topics to the earliest available offset,

    • reset intermedadiate topics to the latest offset, and/or

    • delete any internal topics.

  • Note that we'll need to call KafkaStreams.cleanUp the next time we start the application, to delete any local state from previous runs.

Cleaning up local state

  • Call KafkaStreams.cleanUp either before KafkaStreams.start or after KafkaStreams.stop to clean out all previous state store.

Appendix B Exactly once semantics

To Internalize Now

  • Kafka Streams processes each record in a depth-first manner, meaning the a record must be completely processed by the processors and sent out via one or more sinks (if applicable) before the next record is sent down the processing pipeline.

  • Kafka Streams application creates x number of tasks based on the highest number of partitions across all topics the application is subscribed to. Each topic-partition is then assigned to a task (this assignment is never changed). The tasks are then distributed across all application with the same consumer group ID, rebalancing as required (e.g., when new consumer joins the cluster).

To Learn/Do Soon

To Revisit When Necessary

Chapter 1 Welcome to Kafka Streams

  • Refer to this chapter of an example of a Kafka Streams application for processing for a source purchase event to generate data relating to rewards program and purchase patterns analysis, and to persist the data.

Chapter 2 Kafka Quickly

  • Refer to this chapter for an example of using a custom partitioner.

Chapter 3 Developing Kafka Streams

  • Refer to this chapter for an excellent example of how to use Kafka Streams DSL to build a streaming application with various initial requirements, and also handling new requirements.

Chapter 4 Streams and State

  • Refer to this chapter for an example of using state store, including repartitioning by a specific key and joining two streams.

Chapter 5 The KTable API

  • Refer to this chapter for:

    • explanation of the difference between KStream and KTable

    • how to perform (rolling) aggregation on KStream and KTable (summation, and top-n ranking), with semi-realistic examples: summation of stock trading volumes by ticker, and ranking top-n industry by volumes traded

    • how to perform windowing aggregation

    • example of a join between a KStream and a KTable

Chapter 7 Monitoring and Performance

Chapter 8 Testing a Kafka Streams Application

  • Refer to this chapter for illustrative examples of how to do unit and integration tests with Kafka.

Chapter 9 Advanced Applications with Kafka Streams

  • Refer to this chapter for example configuration of Kafka JDBC Connector, together with explanation of the individual configuration settings.

  • Refer to this chapter for an example of setting up state stores for direct external read-only access.

Other Resources Referred To