Streaming Audio: Apache Kafka® & Real-Time Data

Apache Kafka 3.5 - Kafka Core, Connect, Streams, & Client Updates

June 15, 2023 Confluent, founded by the original creators of Apache Kafka® Season 1 Episode 265

Apache Kafka® 3.5 is here with the capability of previewing migrations from ZooKeeper clusters to KRaft mode. Follow along as Danica Fine highlights key release updates.

Kafka Core:

  • KIP-833 provides an updated timeline for KRaft.
  • KIP-866 is now in preview and allows migration from an existing ZooKeeper cluster to KRaft mode.
  • KIP-900 introduces a way to bootstrap the KRaft controllers with SCRAM credentials.
  • KIP-903 prevents a data loss scenario by preventing replicas with stale broker epochs from joining the ISR list. 
  • KIP-915 streamlines the process of downgrading Kafka's transaction and group coordinators by introducing tagged fields.


Kafka Connect:

  • KIP-710 provides the option to use a REST API for internal server communication that can be enabled by setting `dedicated.mode.enable.internal.rest` equal to true. 
  • KIP-875 offers support for native offset management in Kafka Connect. Connect cluster administrators can now read offsets for both source and sink connectors. This KIP adds a new STOPPED state for connectors, enabling users to shut down connectors and maintain connector configurations without utilizing resources.
  • KIP-894 makes the `IncrementalAlterConfigs` API available for use in MirrorMaker 2 (MM2), adding a new `use.incremental.alter.configs` configuration which takes the values “requested,” “never,” and “required.”
  • KIP-911 adds a new source tag for metrics generated by the `MirrorSourceConnector` to help monitor mirroring deployments.


Kafka Streams:

  • KIP-399 improves Kafka Streams' error-handling capabilities by addressing serialization errors that occur before message production and extending the interface for custom error handling.
  • KIP-889 introduces versioned state stores in Kafka Streams for temporal join semantics in stream-to-table joins. 
  • KIP-904 simplifies table aggregation in Kafka by proposing a change in serialization format to enable one-step aggregation and reduce noise from events with old and new keys/values. 
  • KIP-914 modifies how versioned state stores are used in Kafka Streams. Versioned state stores may impact different DSL processors in varying ways, see the documentation for details.


Kafka Client:

  • KIP-881 is now complete and introduces new client-side assignor logic for rack-aware consumer balancing for Kafka Consumers. 
  • KIP-887 adds the `EnvVarConfigProvider` implementation to Kafka so custom configurations stored in environment variables can be injected into the system by providing the map returned by `System.getenv()`.
  • KIP-641 introduces the `RecordReader` interface to Kafka's clients module, replacing the deprecated `MessageReader` Scala trait.



Danica Fine (00:00):

Welcome to Streaming Audio. I'm Danica Fine, developer advocate at Confluent. In today's episode, I have the honor of announcing the Apache Kafka 3.5 release, on behalf of the Kafka community. There are so many great KIPs, highlights, and updates in this release. So let's get to it.


(00:21)

KIP-833 provides an updated timeline for KRaft, and since all we want are happy clusters, I think it's worth it. Apache Kafka 3.5, this release, is the first bridge release between ZooKeeper and KRaft modes, meaning that you can preview migrating from ZooKeeper mode clusters to KRaft mode clusters. Along with that, we're officially marking ZooKeeper mode as deprecated. With Apache Kafka 3.6, migrating from ZooKeeper to KRaft clusters will be generally available. You'll also see support for JBOD and delegation tokens in KRaft mode. Apache Kafka 3.7 will be the final bridge release, to smoothly transition from ZooKeeper to a KRaft-only cluster. But don't worry, you have more than enough time to wrap things up and say your goodbyes because ZooKeeper won't officially be removed until Apache Kafka version 4.0.


(01:08)

Building on KIP-833, we have KIP-866, which introduces feature-complete migrations from ZooKeeper-based clusters to KRaft mode clusters that you're encouraged to test. As an added bonus, after enabling KRaft migration, the cluster will use dual-write mode so that you can safely transition control to KRaft while also allowing fail-back to ZooKeeper just in case. Isn't it fantastic that you can change your mind and still safely work with all these happy, innovative things?


(01:34)

Next up we have KIP-881, which has to do with rack-aware partition assignments for Kafka consumers. And no, that's not a mistake. You may recall that I mentioned KIP-881 as part of the Apache Kafka 3.4 release. But at that time, the KIP was only partially complete. The KIP is meant to be a bridge between existing client protocols and the new consumer group protocol, as outlined by KIP-848. So in this release, you'll see new client-side assignor logic for rack-aware consumer balancing. Check out the release notes for more details.
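
To make the idea of rack-aware balancing concrete, here is a minimal sketch in plain Java. It is not Kafka's assignor: the class name `RackAwareSketch`, the greedy heuristic, and the input maps are all assumptions chosen for illustration. Each partition goes to a consumer in a rack that hosts one of its replicas when possible, with ties broken by current load.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy rack-aware assignment (hypothetical; only illustrates the idea). */
final class RackAwareSketch {

    /**
     * @param consumerRacks  consumer id -> rack id of that consumer
     * @param partitionRacks partition number -> racks hosting a replica of it
     * @return consumer id -> partitions assigned to it
     */
    static Map<String, List<Integer>> assign(Map<String, String> consumerRacks,
                                             Map<Integer, Set<String>> partitionRacks) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumerRacks.keySet()) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (Map.Entry<Integer, Set<String>> partition : partitionRacks.entrySet()) {
            String best = null;
            boolean bestSameRack = false;
            for (Map.Entry<String, String> consumer : consumerRacks.entrySet()) {
                boolean sameRack = partition.getValue().contains(consumer.getValue());
                int load = assignment.get(consumer.getKey()).size();
                // Prefer a same-rack consumer; among equals, prefer the lighter load.
                boolean better = best == null
                        || (sameRack && !bestSameRack)
                        || (sameRack == bestSameRack && load < assignment.get(best).size());
                if (better) {
                    best = consumer.getKey();
                    bestSameRack = sameRack;
                }
            }
            if (best != null) {
                assignment.get(best).add(partition.getKey());
            }
        }
        return assignment;
    }
}
```

This greedy version favors locality over strict balance, which is a simplification; the assignors shipped with Kafka have their own balancing guarantees, so treat the release notes as the authority.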


(02:05)

KIP-900 brings support for SASL/SCRAM to KRaft mode clusters. If you're not familiar, SASL/SCRAM is a popular authentication mechanism for setting up access control lists, or ACLs, within Kafka. These ACLs are pretty important, as they ensure that the brokers in the cluster have the right credentials to interact with each other, as well as the controller. In ZooKeeper mode Kafka clusters, this Kafka metadata was added to and maintained by the ZooKeeper cluster itself. But it's a little different with KRaft. KIP-900 adds the ability to bootstrap your KRaft controllers with your SCRAM ACLs during cluster creation with the `kafka-storage.sh` script on the command line using the `--add-scram` parameter.


(02:46)

Moving on, we have KIP-903. The motivation behind this KIP is interesting and involves a very specific scenario. So let's imagine together that a broker reboots, but some logs haven't yet been flushed from the page cache. Well, in that case, the data on the broker is lost. Most of the time this is okay though because the controller will have removed the rebooting broker from the list of in-sync replicas. So any real damage is limited and the rebooting broker has a chance to catch up later.


(03:14)

But what if a leader of one of the partitions crashes at the same time? Then the rebooting broker could be marked as a partition leader, making the data loss a very real concern. KIP-903 eliminates this issue by ensuring that replicas with a stale broker epoch won't be able to join the ISR list. Keep in mind that since ZooKeeper mode will be removed in Apache Kafka 4.0, the scope of KIP-903 is only for KRaft mode clusters.
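
The stale-epoch check can be pictured with a small model in plain Java. This is a sketch under stated assumptions, not the controller's actual code: the class `IsrEpochCheckSketch` and its methods are hypothetical. The only idea it carries over is that a request to add a replica to the ISR is honored only if it carries the broker epoch from that broker's latest registration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the KIP-903 idea (hypothetical names throughout). */
final class IsrEpochCheckSketch {
    // broker id -> epoch assigned at that broker's latest registration
    private final Map<Integer, Long> currentEpochs = new HashMap<>();

    /** Called whenever a broker registers, for example after a reboot. */
    void register(int brokerId, long epoch) {
        currentEpochs.put(brokerId, epoch);
    }

    /**
     * @param epochSeenByLeader broker epoch the partition leader observed for the follower
     * @return true only if that epoch matches the latest registration
     */
    boolean mayJoinIsr(int brokerId, long epochSeenByLeader) {
        Long current = currentEpochs.get(brokerId);
        return current != null && current == epochSeenByLeader;
    }
}
```

In this model, a broker that rebooted and registered again has a newer epoch, so a request built from what the leader saw before the reboot is rejected.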


(03:41)

Next up, let me introduce you to KIP-915. But first a question. What happens when you want to downgrade transaction and group coordinators after new fields have been added to the `__transaction_state` or `__consumer_offsets` topics? If you've ever had to do that, you know it's tough. Since major changes have been made to the consumer group coordinator under KIP-848, KIP-915 seeks to make this downgrade process easier by laying a generic foundation for both coordinators and opening the door to backward-compatible changes to the record schema through the use of tagged fields. Any future tagged fields won't require a version bump and older brokers can simply ignore the tagged fields that they don't recognize.
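
Why tagged fields make downgrades easier can be sketched with a toy encoding in plain Java. The format here is an assumption made for brevity (one byte for the tag, one byte for the length, then the value); Kafka's real wire format differs, and `TaggedFieldsSketch` is a hypothetical name. The point is only that a length prefix lets an older reader step over tags it does not know.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Toy tagged-field codec (simplified; not Kafka's serialization). */
final class TaggedFieldsSketch {

    /** Appends one field as [tag][length][bytes]; values must be under 256 bytes. */
    static void writeField(ByteArrayOutputStream out, int tag, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        out.write(tag);
        out.write(bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    /** Reads every field, keeping known tags and skipping unknown ones. */
    static Map<Integer, String> read(byte[] data, Set<Integer> knownTags) {
        Map<Integer, String> fields = new LinkedHashMap<>();
        ByteBuffer buf = ByteBuffer.wrap(data);
        while (buf.hasRemaining()) {
            int tag = buf.get() & 0xFF;
            int length = buf.get() & 0xFF;
            byte[] value = new byte[length];
            buf.get(value); // consuming the bytes is what skips an unknown field
            if (knownTags.contains(tag)) {
                fields.put(tag, new String(value, StandardCharsets.UTF_8));
            }
        }
        return fields;
    }
}
```

A reader that only knows tag 0 still parses a record that also carries tag 1, which mirrors how an older broker could ignore fields added by a newer one.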


(04:23)

KIP-641 creates a new Java interface to replace `kafka.common.MessageReader`. Why? Well, developers need a custom reader to produce and consume custom records, which means that they need a Java public interface in the clients module to implement for their custom logic. KIP-641 introduces a new `RecordReader` interface to replace the `MessageReader` Scala trait. Note that the `MessageReader` trait will be marked for deprecation.


(04:48)

Finally, for Kafka core updates, we have KIP-887. You may have noticed that it's not currently possible to inject additional custom configurations stored in environment variables and that's no good. So KIP-887 provides a new implementation of the `ConfigProvider` interface called `EnvVarConfigProvider`. This new `ConfigProvider` implementation gets around this injection issue by programmatically providing the map returned by `System.getenv()`. Now, every day is a good day when you use Kafka Streams.
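
The substitution that an environment-backed config provider enables can be sketched in plain Java. This is not the `EnvVarConfigProvider` source: the class `EnvPlaceholderSketch` is hypothetical, and the `${env:NAME}` placeholder form is assumed here from Kafka's general `${provider:key}` convention, so check the documentation for the exact syntax and settings.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy resolver for ${env:NAME} placeholders (hypothetical helper). */
final class EnvPlaceholderSketch {
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{env:([A-Za-z_][A-Za-z0-9_]*)\\}");

    /** Replaces each ${env:NAME} with env.get(NAME); unknown names are left as-is. */
    static String resolve(String configValue, Map<String, String> env) {
        Matcher m = PLACEHOLDER.matcher(configValue);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String replacement = env.getOrDefault(m.group(1), m.group());
            // quoteReplacement keeps '$' and '\' in the value literal.
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Passing `System.getenv()` as the second argument resolves placeholders against the real process environment; passing a plain map keeps the sketch easy to test.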


(05:16)

So next up we have a couple of really cool Kafka Streams KIPs. First we have KIP-889. As you may know, state stores currently only support storing the most recent value per key. And this is fine for a lot of use cases, but that implementation limits Kafka Streams' ability to leverage temporal join semantics for stream-to-table joins. KIP-889 provides the base implementation of versioned state stores, which add a temporal element to state stores in Kafka Streams.


(05:43)

When records are inserted into a state store that implements the new versioned state store interface, they need to include a timestamp. The get method to fetch from a versioned state store also includes an as-of timestamp. But it's not enough to just implement versioned state stores. We need to provide a means for these to be recognized and utilized by the Streams DSL. So we have to give KIP-889 a friend. Like I always say, everyone needs a friend. So let's introduce KIP-914.
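
The put-with-timestamp and get-as-of behavior described above can be modeled in a few lines of plain Java. This is an in-memory sketch, not the Kafka Streams interface: `VersionedStoreSketch` is a hypothetical class, and it leaves out history retention, tombstones, and persistence.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy versioned key-value store (hypothetical; in-memory only). */
final class VersionedStoreSketch<K, V> {
    // key -> (timestamp -> value that became valid at that timestamp)
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    /** Stores a value together with the timestamp at which it became valid. */
    void put(K key, V value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /** Returns the value with the latest timestamp <= asOfTimestamp, or null if none. */
    V get(K key, long asOfTimestamp) {
        TreeMap<Long, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

A stream-to-table join could then look up the table value as of each stream record's timestamp instead of always reading the latest value, which is the temporal semantics the KIP is after.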


(06:10)

KIP-914 doesn't make any changes to public interfaces to recognize versioned state stores, but it does affect how versioned state stores are handled behind the scenes. By default, the Streams DSL won't use versioned state stores. You'll have to explicitly instantiate and pass them via `Materialized.as()` in KTables. Versioned state stores may affect each of the DSL processors in different ways, so check the docs for more details.


(06:34)

Next, for Kafka Streams we have KIP-399, which improves the error-handling capabilities in Kafka Streams when Kafka Streams fails to serialize messages. It provides an interface to insert custom messaging for errors and indicate to Streams whether or not it should re-throw the exception, thus causing the application to fail over.


(06:54)

Our final Kafka Streams KIP is KIP-904, which affects table aggregation semantics. There's a lot going on for this KIP, so let's set the stage. Whenever you aggregate a table and conduct a group-by, events are sent to an internal repartition topic. Since you could potentially change the grouping key within the aggregation group-by, two events are sent downstream to processor nodes: one event with the old key and old value, and another event with a new key and new value. These events can then be subtracted from or added to the corresponding aggregate associated with the old and new keys, respectively. In that case, the aggregation is a two-step process. But sometimes the key doesn't change and sending two events downstream creates an unnecessary amount of noise. KIP-904 seeks to remove this noise by making aggregation happen in one step. This feature is on by default and cannot be disabled. If your topology contains a KTable aggregation operator, you'll need to do a two-round rolling bounce upgrade.
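
The difference between the two-step and one-step paths can be sketched with a running sum in plain Java. This is a toy model under stated assumptions, not Kafka Streams code: `TableAggregationSketch` and its counter are hypothetical, and a sum stands in for whatever adder and subtractor the aggregation uses.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy grouped sum that applies table updates (hypothetical model). */
final class TableAggregationSketch {
    final Map<String, Long> sums = new HashMap<>();
    int updatesApplied = 0; // how many aggregate updates were performed

    /** Applies a table update whose grouping key went from oldKey to newKey. */
    void update(String oldKey, long oldValue, String newKey, long newValue) {
        if (oldKey.equals(newKey)) {
            // Same group: subtract the old value and add the new one in one step.
            apply(newKey, newValue - oldValue);
        } else {
            // Group changed: the old group shrinks and the new group grows.
            apply(oldKey, -oldValue);
            apply(newKey, newValue);
        }
    }

    private void apply(String key, long delta) {
        sums.merge(key, delta, Long::sum);
        updatesApplied++;
    }
}
```

When the key is unchanged, the aggregate is touched once and no intermediate result is produced; only a genuine key change needs the separate subtract and add.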


(07:55)

Finally, let's hear about some Kafka Connect updates. First we have KIP-710, an update to MirrorMaker 2. Since it was introduced, MirrorMaker 2 has had the capability to run a dedicated cluster, allowing multiple Connect clusters and replication flows to run within it. But there were some limitations. Namely, dedicated MirrorMaker 2 clusters don't have a Connect REST server, meaning that follower-to-leader communication was impossible between nodes. Also, in dedicated mode, MirrorMaker 2 eagerly resolves configuration provider references in connector configs, which meant that you were unable to provide host-specific or sensitive configurations through the indirection of configuration providers. To get around these limitations, KIP-710 provides the option to use an internal-only REST server that can be enabled by setting `dedicated.mode.enable.internal.rest` equal to true, and it allows for the lazy evaluation of configuration references.


(08:49)

Moving on, KIP-875 offers first-class support for offsets in Kafka Connect. With the current implementation of this KIP, Connect cluster administrators can read offsets for both source and sink connectors. In the future, as part of Apache Kafka 3.6, administrators will also have the ability to alter and reset connector offsets. Also in this release, KIP-875 adds a new STOPPED state for connectors, which allows users to shut down connectors and maintain connector configurations without utilizing resources.


(09:20)

Next is KIP-894, which also affects MirrorMaker 2. When syncing topic configurations, for broker compatibility, MirrorMaker 2 uses the deprecated AlterConfigs API. If you wanted to use the AlterConfigs API to update a handful of configurations, any existing configuration parameters that aren't explicitly sent along as part of the configuration update will be reset to the default value for those configurations. And that's not always a good thing. In the context of MirrorMaker 2, this could result in remote topic configurations being removed unintentionally.


(09:53)

Since Apache Kafka version 2.3, the more flexible IncrementalAlterConfigs API has been available, and KIP-894 makes it available for use in MirrorMaker 2. The change adds a new `use.incremental.alter.configs` configuration, which takes the values requested, never, or required. Check out the docs to see how to use these parameters.
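
The practical difference between the two APIs can be shown with ordinary maps in plain Java. This sketch does not call the Kafka admin client: `AlterConfigsSketch` and both methods are hypothetical, and the incremental variant models only "set" operations, whereas the real API supports more.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy comparison of full-replace and incremental config updates (hypothetical). */
final class AlterConfigsSketch {

    /** Full-replace semantics: any key absent from the request falls back to its default. */
    static Map<String, String> alter(Map<String, String> defaults,
                                     Map<String, String> request) {
        Map<String, String> result = new HashMap<>(defaults);
        result.putAll(request);
        return result;
    }

    /** Incremental semantics: only the keys named in the request change. */
    static Map<String, String> incrementalAlter(Map<String, String> current,
                                                Map<String, String> request) {
        Map<String, String> result = new HashMap<>(current);
        result.putAll(request);
        return result;
    }
}
```

With a topic whose retention was customized, a full-replace request that only mentions the cleanup policy silently loses the retention override, while the incremental request keeps it.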


(10:14)

Our final Kafka Connect update is from KIP-911. To help monitor mirroring deployments, this KIP adds a new source tag for the metrics generated by the `MirrorSourceConnector`. By setting `add.source.alias.to.metrics` to true, a new tag with the name of the source cluster will be added to the front of the list of existing tags.


(10:33)

And those are the highlights from this latest Apache Kafka release. Thank you for taking the time to listen to this special episode. If you have any questions or would like to discuss, you can reach out to our community forum or Slack. Both are linked in the show notes. If you're listening on Apple Podcasts or other podcast platforms, please be sure to leave a review. I'd love to hear your feedback. If you're watching on YouTube, please subscribe so you'll be notified with updates you might be interested in. Thanks again for your support. See you next time.

Intro
KIP-833: KRaft Timeline Update
KIP-866: ZooKeeper to KRaft Cluster Migration
KIP-881: Rack-aware Partition Assignment for Kafka Consumers
KIP-900: SASL SCRAM Support in KRaft
KIP-903: Replicas with stale broker epoch should not be allowed to join the ISR
KIP-915: Transactions and Group Coordinator Downgrade Foundation
KIP-641: A new java interface to replace `kafka.common.MessageReader`
KIP-887: Add ConfigProvider to make use of environment variables
KIP-889: Add Versioned State Stores
KIP-914: DSL Processor Semantics for Versioned Stores
KIP-399: Extend `ProductionExceptionHandler` to Cover Serialization Errors
KIP-904: Improve Table Aggregation Semantics
KIP-710: Full Support for Distributed Mode in MM2 Clusters
KIP-875: First-class offsets support in Kafka Connect
KIP-894: Use incrementalAlterConfigs API for syncing topic configurations
KIP-911: Add source tag to MirrorSourceConnector metrics
It’s a wrap!