Streaming Audio: Apache Kafka® & Real-Time Data

Improving Apache Kafka Scalability and Elasticity with Tiered Storage

November 22, 2022 | Confluent, founded by the original creators of Apache Kafka® | Season 1 Episode 244

What happens when you need to store more than a few petabytes of data? Rittika Adhikari (Software Engineer, Confluent) discusses how her team implemented tiered storage, a method for improving the scalability and elasticity of data storage in Apache Kafka®. She also explores the motivating factors for building it in the first place: cost, performance, and manageability. 

Before Tiered Storage, there was no real way to retain Kafka data indefinitely. Because of the tight coupling between compute and storage, users were forced to use different tools to access cold and hot data. Additionally, the cost of re-replication was prohibitive because brokers had to copy large amounts of data rather than just a small hot set.

As a member of the Kafka Storage Foundations team, Rittika explains to Kris Jenkins how her team initially considered a Kafka data lake but settled on a more cost-effective method: tiered storage. With tiered storage, one tier handles elasticity and throughput for long-term storage, while the other is dedicated to high-cost, low-latency, short-term storage. Previously, re-replication impacted all brokers and slowed down performance because it required more replication cycles. By decoupling compute and storage, they now replicate only the hot set rather than weeks of data.

Ultimately, this tiered storage method broke down the barrier between compute and storage by separating data into multiple tiers across the cloud. This allowed for better scalability and elasticity that reduced operational toil. 

In preparation for a broader rollout to customers who rely heavily on compacted topics, Rittika’s team will be implementing tiered compaction to support tiering of compacted topics. The goal is to have the partition leader perform compaction. This will substantially reduce compaction costs (CPU/disk) because significantly fewer replicas are compacting. It also protects broker resources through a new compaction algorithm and throttling.


Kris Jenkins: (00:00)
In this week's Streaming Audio, we are talking about storage. Now I'm going to kick this off with a clue as to my age. When I was a young teenager, I bought my first memory expansion board at a trade show. Yes, I was the kind of teenager that went to computing trade shows. And it was a circuit board, it was about the size of a piece of toast, and it had a massive half a megabyte of extra memory for my computer. Fast forward to today and you can tell people that you've just bought a one terabyte SSD and they won't bat an eyelid. I mean, how things change. And yet even with all that extra storage, we can easily find enough data to fill discs like that up. So the question arises, how does Apache Kafka cope when even the largest disc size isn't enough? And equally, are there reasons to think about how it copes with disc sizes even when your data isn't that big?

Kris Jenkins: (00:59)
The answer is, yes, and the hint is recovery times. Joining me to talk about all this is Rittika Adhikari who works on the Kafka Storage Foundations team here at Confluent, and she's going to take us through Tiered Storage, what it is, how it works, what difference it'll make to you, and why it matters, no matter how large or small your data sets are. Before we begin, Streaming Audio is brought to you by our education site, Confluent Developer, and our cloud service for Apache Kafka, Confluent Cloud. More about those at the end, but for now, I'm your host, Kris Jenkins, this is Streaming Audio, let's get into it.

Kris Jenkins: (01:44)
Joining me today to talk about Tiered Storage is Rittika Adhikari. Rittika, how are you?

Rittika Adhikari: (01:50)
I'm good. How are you?

Kris Jenkins: (01:52)
Very well, thank you. So, you're with us at Confluent and you work on our something storage team. Remind me?

Rittika Adhikari: (02:01)
I work on the Kafka Storage Foundations team. So we work on building out the Tiered Storage feature and ensuring that it remains performant, even under cloud storage failures, et cetera.

Kris Jenkins: (02:13)
Oh God. So that sounds like one of those things where under the hood it's a lot more work than I predict.

Rittika Adhikari: (02:21)
For sure.

Kris Jenkins: (02:22)
So you can take me through the details of that, but go back to the start. Why is Tiered Storage worth having?

Rittika Adhikari: (02:30)
So there were three motivating factors that inspired us to build Tiered Storage in the first place: cost, performance, and manageability. First of all, before we created Tiered Storage we didn't really have any means of infinite retention, we weren't able to use the same tooling to play back historical and recent data, and we would need to use SSDs if we wanted to store data in Kafka forever.

Rittika Adhikari: (03:06)
So we kind of thought about, well, we ideally want Kafka to be a data lake, so how can we improve the costs of actually keeping data in Kafka? And what we came up with was creating one tier for elasticity and throughput for long-term storage and one high-cost, low-latency tier for short-term storage. Prior to Tiered Storage, we also noticed that re-replication was super expensive, because you might have two weeks of data or so on each of the brokers and you would need to replicate all of that data. And this ends up impacting all the brokers, since you're replicating a lot more. So by decoupling compute and storage, we're only replicating the hot set as opposed to two weeks' worth of data. It's a much smaller amount of data, which makes the process a lot quicker.

Kris Jenkins: (03:58)
So if I've got this right, topics can get really huge, properly huge. I know Honeycomb is taking in something like 50 million events a second. So you can easily get into petabytes if you want?

Rittika Adhikari: (04:14)
Yeah, you really could.

Kris Jenkins: (04:17)
And you really don't want to buy a petabyte solid state disc. Not yet.

Rittika Adhikari: (04:22)
That would end up being pretty expensive. At least at current prices.

Kris Jenkins: (04:25)
Give it 10 years. But for now, the idea is that you split off part of a topic as the hot set, is that the term?

Rittika Adhikari: (04:35)
Yeah. So we take our data, and after some point in time, your data's eligible for tiering. Once it's flushed from the page cache to disc, it's eligible for tiering. And then after that point, we tier that eligible data to the remote object store. So instead of persisting on the disc, it ends up persisting in remote object storage.

Kris Jenkins: (05:01)
Something like S3?

Rittika Adhikari: (05:03)
Yeah, exactly. S3, Azure or GCP.

Kris Jenkins: (05:07)
Right. Okay. So that's going to create two problems as I see it. One is, how do you access the old data as transparently as the new data? And the other is, how do you manage replication when a node completely dies and has to be brought back up? That's what you were talking about.

Rittika Adhikari: (05:28)
Great. So for your first question, how do we still access the data transparently: nothing really changes at the user level. When you're fetching data, you're still able to fetch data from the remote object store through your consumers. Internally, a lot changes, but maybe I'll quickly walk us through how Tiered Storage works to give a baseline understanding of what's happening under the hood.

Kris Jenkins: (06:05)
Give us the best description of the algorithm you can without diagrams.

Rittika Adhikari: (06:12)
So at the core, we're just moving bytes from one storage tier to another. We can divide this up into tiering any eligible data to a remote store, retrieving tiered data whenever the user requests it, and garbage collecting any data that has reached its retention period or has been deleted. A Kafka partition is made up of multiple segment files. Data produced to a partition is first written to the page cache, and then periodically it's flushed to the local disc, and we repeat the same process for any replicas that we have.
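To make the tiering and garbage-collection steps concrete, here is a minimal Python sketch of one partition's segment lifecycle: flushed segments become eligible and are uploaded, and remote copies are deleted once all of their records fall behind the retention boundary. The names (Segment, Partition, tier_eligible, garbage_collect) are hypothetical, the "remote store" is just a dictionary, and this illustrates the idea rather than Confluent's implementation. Retrieval is left out of this sketch.

```python
from dataclasses import dataclass, field


@dataclass
class Segment:
    base_offset: int
    last_offset: int
    flushed: bool = False  # True once the page cache has been flushed to local disc
    tiered: bool = False   # True once a copy has been uploaded to the remote store


@dataclass
class Partition:
    segments: list[Segment] = field(default_factory=list)
    # Stand-in for the remote object store: base_offset -> last_offset.
    remote_store: dict[int, int] = field(default_factory=dict)

    def tier_eligible(self) -> list[int]:
        """Upload every flushed, not-yet-tiered segment; return the uploaded base offsets."""
        uploaded = []
        for seg in self.segments:
            if seg.flushed and not seg.tiered:
                self.remote_store[seg.base_offset] = seg.last_offset
                seg.tiered = True
                uploaded.append(seg.base_offset)
        return uploaded

    def garbage_collect(self, retention_start_offset: int) -> list[int]:
        """Delete remote segments whose records all fall before the retention boundary."""
        expired = [base for base, last in self.remote_store.items()
                   if last < retention_start_offset]
        for base in expired:
            del self.remote_store[base]
        return expired
```

In this model only flushed data is ever uploaded, which mirrors the rule that data becomes eligible for tiering once it reaches the disc.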

Rittika Adhikari: (06:55)
So once the data is on the disc, like I was saying, it's eligible to be tiered, or archived, to remote storage and can then be deleted from the disc. The leader of the partition is responsible for tiering this data. And we end up deleting data on the disc based on the hot set. The hot set is a configuration that determines how much data we want to keep on the disc at a time for quick access. It's configurable either by time, using confluent.tier.local.hotset.ms, or by number of bytes, using confluent.tier.local.hotset.bytes. And we typically configure this to be the size of the page cache on the broker. This way, real-time reads by consumers near the tail of the log will come from the page cache, and otherwise we'll be reading from the remote store. Yeah. And, oh yeah.
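The hot set rule can be sketched as a small eviction function. This is a hypothetical model. It assumes the time and byte limits behave like Kafka's retention.ms and retention.bytes (exceeding either limit triggers deletion, and -1 disables a limit), that eviction proceeds oldest first, and that only segments already copied to remote storage may be removed locally. It is not taken from the Confluent code base.

```python
from dataclasses import dataclass


@dataclass
class LocalSegment:
    base_offset: int
    size_bytes: int
    max_timestamp_ms: int
    tiered: bool  # True once a copy exists in the remote object store


def hotset_evictions(segments, hotset_bytes, hotset_ms, now_ms):
    """Return base offsets of local segments that fall outside the hot set.

    segments must be ordered oldest first; the last (active) segment is never evicted.
    A limit of -1 disables that check (assumption, mirroring retention.bytes/retention.ms).
    """
    remaining = sum(s.size_bytes for s in segments)
    evicted = []
    for seg in segments[:-1]:
        if not seg.tiered:
            break  # the local copy is the only copy, so it must stay
        too_old = hotset_ms >= 0 and now_ms - seg.max_timestamp_ms > hotset_ms
        too_big = hotset_bytes >= 0 and remaining - seg.size_bytes >= hotset_bytes
        if not (too_old or too_big):
            break  # stop at the first segment still inside the hot set (oldest-first eviction)
        evicted.append(seg.base_offset)
        remaining -= seg.size_bytes
    return evicted
```

Sizing the byte limit to the page cache, as described above, means the segments that survive eviction are roughly the ones a tailing consumer would read from memory anyway.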

Kris Jenkins: (08:01)
Just to quantify that, how large does the page cache tend to be?

Rittika Adhikari: (08:09)
Not sure.

Kris Jenkins: (08:10)
Are we talking megabytes? Gigabytes? Kilobytes? Just give me an order of magnitude.

Rittika Adhikari: (08:17)
I would guess...

Kris Jenkins: (08:22)
Sorry, I didn't say there was going to be a numeric quiz in this interview.

Rittika Adhikari: (08:27)
No worries.

Kris Jenkins: (08:28)
But something that's able to be fast enough to access for the leading edge of the topic, right?

Rittika Adhikari: (08:36)
Yeah, exactly. Yeah. Cool.

Kris Jenkins: (08:42)
So, under the hood, the broker is keeping track, and a consumer connects and says offset earliest, and the broker says, ah, that offset's over on S3?

Rittika Adhikari: (08:53)
Yes, exactly. So we actually have this internal topic called _confluent-tier-state, which acts as our source of truth for which data exists in remote storage. And our broker uses this to build a view of the log and where our data is actually located. So whenever we receive a request to consume from a certain offset, the broker determines if we should be fetching locally, from the hot or untiered part of the log, or if that data is actually present in remote storage. That's how we decide where we should be starting.
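A rough sketch of that routing decision: the broker materializes (base offset, last offset) ranges for tiered segments from the tier state topic, keeps the start offset of its local log, and checks each fetch offset against both. LogView and locate are hypothetical names, and preferring the local copy when a segment exists in both places is an assumption that matches the goal of serving tail reads from the page cache.

```python
import bisect


class LogView:
    """Hypothetical broker-side view of one partition, built from tier-state metadata."""

    def __init__(self, tiered_segments, local_log_start_offset, log_end_offset):
        # tiered_segments: (base_offset, last_offset) pairs, assumed non-overlapping.
        self.tiered = sorted(tiered_segments)
        self.bases = [base for base, _ in self.tiered]
        self.local_start = local_log_start_offset
        self.log_end = log_end_offset  # next offset to be written

    def locate(self, fetch_offset):
        """Return ("local", None), ("remote", segment_base_offset) or ("out_of_range", None)."""
        if self.local_start <= fetch_offset < self.log_end:
            return ("local", None)  # hot or untiered part of the log: serve from local disc
        # Find the last tiered segment whose base offset is <= fetch_offset.
        i = bisect.bisect_right(self.bases, fetch_offset) - 1
        if i >= 0 and fetch_offset <= self.tiered[i][1]:
            return ("remote", self.tiered[i][0])  # read that segment from object storage
        return ("out_of_range", None)
```

For example, with tiered segments covering offsets 0 to 299 and a local log covering 200 to 349, a consumer starting from offset 0 is routed to the remote segment at base offset 0, while a fetch at offset 250 is served locally even though that data has also been tiered.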

Kris Jenkins: (09:29)
So somewhere on S3 there's literally a segment file for each segment that would've been on disc or a combined segment file?

Rittika Adhikari: (09:42)
Yeah.

Kris Jenkins: (09:44)
Right. With apologies to Google and Azure, I always think in terms of S3 and AWS-

Rittika Adhikari: (09:49)
I think a lot of people do.

Kris Jenkins: (09:49)
Those are my biases.

Rittika Adhikari: (09:56)
That's all it was.

Kris Jenkins: (09:58)
And take me through how that affects replication again.

Rittika Adhikari: (10:02)
So now we have way less data on the brokers' individual discs. That makes replicating a lot quicker, because you only have to replicate the hot set as opposed to replicating, I don't know, two weeks of data, which could be terabytes or petabytes and could take a lot longer. And with the data already in remote object storage, you don't really have to change anything at that layer per se. You don't have to replicate that data anywhere else.
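A back-of-the-envelope calculation shows why replicating only the hot set matters. Every input below except the two-week window is a hypothetical number chosen for illustration, not a measurement from Confluent Cloud, and the estimate ignores produce traffic competing for the same bandwidth.

```python
MB = 1_000_000
SECONDS_PER_DAY = 86_400


def rereplication_seconds(bytes_to_copy: int, copy_rate_bytes_per_s: int) -> float:
    """Lower bound on the time to rebuild one replica, ignoring competing produce traffic."""
    return bytes_to_copy / copy_rate_bytes_per_s


ingest_rate = 10 * MB       # hypothetical: bytes per second landing on this broker
retention_days = 14         # the "two weeks of data" from the conversation
copy_rate = 100 * MB        # hypothetical: bytes per second available for re-replication
hotset_bytes = 50_000 * MB  # hypothetical: a 50 GB hot set

full_log_bytes = ingest_rate * retention_days * SECONDS_PER_DAY

print(f"full log: {full_log_bytes / 1e12:.1f} TB, "
      f"{rereplication_seconds(full_log_bytes, copy_rate) / 3600:.1f} h")
# full log: 12.1 TB, 33.6 h
print(f"hot set:  {hotset_bytes / 1e9:.0f} GB, "
      f"{rereplication_seconds(hotset_bytes, copy_rate) / 60:.1f} min")
# hot set:  50 GB, 8.3 min
```

With these assumed inputs, rebuilding a replica drops from more than a day to a few minutes. The ratio, not the absolute numbers, is the point.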

Kris Jenkins: (10:35)
Right. So you are also relying on the fact that something like S3 is replicating that data for you?

Rittika Adhikari: (10:42)
Yeah. We are storing our data in S3 and relying on that data continuing to exist over there.

Kris Jenkins: (10:55)
Okay. Do you also use that to lean into multi availability zone type stuff or is that still handled by something like... What's it called? Cluster linking?

Rittika Adhikari: (11:14)
I think this is still handled by cluster linking, if I remember correctly.

Kris Jenkins: (11:16)
Okay. I'm going to grill you on lots of different things on this because I want to get all the details I can out of you. Yeah. Okay. I'm building up the picture in my head. And so the broker is managing all that, so it's still transparent to the end user. What kind of performance differences do you see as a reader of old data versus new data?

Rittika Adhikari: (11:41)
So for us, it was really important while we were building out this feature to make sure that we weren't seeing any degradation in performance. We should be seeing an improvement; that was our goal. So we were making sure that Kafka stays up even if our cloud storage goes down, et cetera. We were really considering these things while we were building out the Tiered Storage feature, and we were testing to make sure that we maintain availability and performance.

Kris Jenkins: (12:23)
Yeah. How did you go about that? Because it's a tricky thing to do. Because a lot of these topics that you're testing with are going to be eventually live production topics and you're suddenly splitting them into two views of the world under the hood.

Rittika Adhikari: (12:39)
So it was a lot of just improving our testing and adding more formal verification and simulations, et cetera. Just making sure that when we're designing and testing, we're thinking about what could go wrong, because Tiered Storage was kind of like changing the engine of an airplane while the airplane's still flying. We don't want the airplane to go down. We want to maintain feature parity and make sure we're not losing any data, and we kind of just assumed that what can go wrong will go wrong. So you've got to be really careful.

Kris Jenkins: (13:24)
Because this is the tricky thing when you're a cloud provider, you can't just say, here's the new release, you shut down your system and upgrade and bring it back up.

Rittika Adhikari: (13:32)
Yeah, exactly.

Kris Jenkins: (13:35)
Yeah. The plane, as you say, is always moving.

Rittika Adhikari: (13:38)
Yeah.

Kris Jenkins: (13:39)
Just to check, when you say formal verification, do you actually mean algorithmic... Did you pull out proof assistants and model it that way? Or do you mean lots of very rigorous testing?

Rittika Adhikari: (13:52)
Lots of very rigorous testing, lots of system testing, et cetera.

Kris Jenkins: (13:57)
What was your personal involvement in the Tiered Storage project? How did you get your hands dirty on this?

Rittika Adhikari: (14:04)
So the first time I was involved with Confluent and Tiered Storage was back in summer 2020, when I was actually an intern. Tiered Storage had been fully implemented for S3 by then, but we were still working on making it compatible with GCP and Azure. My intern project, at least, was based on the fact that not all customers will want to use these cloud providers, and some might actually want to use their own on-prem storage. And a lot of these on-prem object stores actually implement the S3 API because, well, it's one of the most commonly known object stores-

Kris Jenkins: (14:46)
Really. So it's become an informal standard for object storage.

Rittika Adhikari: (14:53)
Yeah. A lot of object stores do end up implementing the S3 API. So theoretically, both at that point and currently, we should be compatible with any of those on-prem stores. My project was about building an object store compatibility framework to verify correctness and performance guarantees with these on-prem stores that implement this API. So, just making sure: is it working correctly? Are we still maintaining all of the guarantees that we're promising? Is this truly compatible? Is this something that we can verify, so we can say, yes, you can use this on-prem object store, we know that this works?
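The kind of correctness check such a compatibility framework runs can be sketched against a minimal object store interface. Everything here is hypothetical: the ObjectStore protocol, the in-memory reference store, and the three checks (read-after-write, list-after-write, delete). A real harness would presumably wrap an S3 client pointed at the on-prem endpoint (for example boto3 with a custom endpoint_url) and would also need to cover performance, ranged reads, and failure behavior, which this sketch does not.

```python
import os
from typing import Protocol


class ObjectStore(Protocol):
    """Hypothetical minimal interface a tiering backend needs from an object store."""
    def put(self, key: str, data: bytes) -> None: ...
    def get(self, key: str) -> bytes: ...
    def delete(self, key: str) -> None: ...
    def list_keys(self, prefix: str) -> list[str]: ...


class InMemoryStore:
    """Reference implementation used to exercise the checks below."""

    def __init__(self):
        self._objects: dict[str, bytes] = {}

    def put(self, key, data):
        self._objects[key] = bytes(data)

    def get(self, key):
        return self._objects[key]

    def delete(self, key):
        self._objects.pop(key, None)

    def list_keys(self, prefix):
        return sorted(k for k in self._objects if k.startswith(prefix))


def check_compatibility(store: ObjectStore, segment_size: int = 1 << 20) -> list[str]:
    """Run basic correctness checks; return a list of failure descriptions."""
    failures = []
    prefix = "compat-check/"
    key = prefix + "00000000000000000000.log"  # stand-in for a segment file name
    payload = os.urandom(segment_size)

    store.put(key, payload)
    if store.get(key) != payload:
        failures.append("read-after-write returned different bytes")
    if key not in store.list_keys(prefix):
        failures.append("list-after-write did not include the new object")

    store.delete(key)
    if key in store.list_keys(prefix):
        failures.append("deleted object still listed")
    return failures
```

Returning a list of failures rather than raising on the first one makes it easier to report everything a candidate store gets wrong in a single run.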

Kris Jenkins: (15:35)
All right. So you can actually certify someone's on premises object store as being good enough for the job?

Rittika Adhikari: (15:42)
Exactly.

Kris Jenkins: (15:43)
Interesting. God, that's a [inaudible 00:15:49]-

Rittika Adhikari: (15:49)
Yeah. That was the first thing that I worked on when I was at Confluent. And then I joined earlier-

Kris Jenkins: (15:55)
Officially. No longer an intern, being paid properly, I hope.

Rittika Adhikari: (16:00)
Yes. Yeah, so I joined as not an intern earlier this year. And currently, most of my projects have been around making sure that Tiered Storage/Kafka remains performant, even if we're facing any partial or full storage unavailability.

Kris Jenkins: (16:18)
Oh, does that actually come up often? Like S3's unavailable?

Rittika Adhikari: (16:23)
Yeah, it happens.

Kris Jenkins: (16:23)
Oh God.

Rittika Adhikari: (16:26)
Definitely happens-

Rittika Adhikari: (16:30)
And we don't want to see any produce or consume latency spikes. Ideally, we don't want to see too many spikes or take too much of a performance hit from something like that.

Kris Jenkins: (16:46)
And you've got three cloud providers to monitor for that?

Rittika Adhikari: (16:49)
Oh yeah.

Kris Jenkins: (16:51)
Out of interest, are the three providers very different in the way they do object storage, apart from the APIs?

Rittika Adhikari: (17:01)
I think there are some cloud providers that we're all a little bit more biased towards, in the sense of how things are performing under the hood and how we see things working.

Kris Jenkins: (17:24)
I'm not sure if I should ask you what the good and the bad ones are. That might be too controversial.

Rittika Adhikari: (17:30)
Yeah. I ...

Kris Jenkins: (17:33)
No. Okay, we're not going there.

Kris Jenkins: (17:38)
Yeah. So how are things evolving? So you say it's in Confluent Cloud, it's available for all the cloud providers?

Rittika Adhikari: (17:47)
Yes. All the major cloud providers. We've got S3, GCS, Azure. We've got ... Working for all three of those, officially.

Kris Jenkins: (17:57)
So a lot of your work is day to day, making sure it's all operating as normal?

Rittika Adhikari: (18:04)
Yeah, so making sure things are operating smoothly, make sure we're still improving performance where we can, improving availability where we can. Yeah, things like that.

Kris Jenkins: (18:18)
Okay. How long did that take to get to by the way? From inception of the idea that we should do this to it's available generally?

Rittika Adhikari: (18:27)
So I know Azure, I believe we released I think middle of this year if I remember correctly. Middle or maybe March or April, something like that. I might be getting the dates wrong. And S3 I think was released by the time I was interning. So I know at least for GCS and Azure combined, it took roughly two years, I would say. So maybe another year or so for S3 if I had to guess.

Kris Jenkins: (19:08)
That's the cost of, as you say, replacing the engine while the plane's moving.

Rittika Adhikari: (19:12)
Exactly. You got to be really careful.

Kris Jenkins: (19:17)
As long as they were careful. Yeah. Okay, but so with that in place, where is it starting to evolve, the design?

Rittika Adhikari: (19:25)
So for the future, one cool thing that's coming into play is tiered compaction, which will allow us to have tiered compacted topics, which we don't currently offer. We're starting to run that in production and we're getting ready for a broader rollout, so make sure to look out for that.

Kris Jenkins: (19:47)
Is there much demand for that, because I thought the whole point of a compacted topic is that it's going to take up a lot less space?

Rittika Adhikari: (19:56)
Right. But then it can ... Just decoupling the compaction from the brokers also will improve performance, et cetera.
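For readers new to compacted topics, the core rule of Kafka log compaction is "keep the latest value for each key." The function below is a simplified, hypothetical illustration of that rule, not the new tiered compaction algorithm mentioned in the show notes. Per the show notes, the plan is for the partition leader to perform compaction rather than every replica.

```python
def compact(records):
    """Keep only the latest record for each key, preserving offset order.

    records: iterable of (offset, key, value) tuples in offset order;
    a value of None is a tombstone (a delete marker for that key).
    Simplification: tombstones are dropped immediately, whereas Kafka keeps them
    for delete.retention.ms so consumers can observe deletions, and Kafka never
    compacts the active segment.
    """
    latest = {}
    for offset, key, value in records:
        latest[key] = (offset, value)  # later offsets overwrite earlier ones
    ordered = sorted(latest.items(), key=lambda item: item[1][0])
    return [(offset, key, value) for key, (offset, value) in ordered if value is not None]
```

Surviving records keep their original offsets, which is why a compacted topic has gaps in its offset sequence.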

Kris Jenkins: (20:06)
Oh, okay. Yeah, okay. I can see how that could come in. So here's another potentially controversial topic, but I have to ask: Tell me about the ... Because this is Confluent only right at the moment.

Rittika Adhikari: (20:16)
Right.

Kris Jenkins: (20:17)
It's in Confluent platform. So we have to talk about the open source Kafka version, which is under KIP-405 I believe.

Rittika Adhikari: (20:29)
Yep. That's right.

Kris Jenkins: (20:30)
Tell me everything you know about that, because I know people will ask.

Rittika Adhikari: (20:35)
So as for everything I know about that, we helped out with the design review and we've been helping them with reviews and so on as needed. I unfortunately don't have any idea of the KIP-405 release timeline. But yeah, it's definitely something to look out for and definitely exciting for the open source community.

Kris Jenkins: (21:05)
But it is an independent implementation of the same idea?

Rittika Adhikari: (21:11)
Yeah, it's the same idea roughly. I guess design wise, they are very similar concepts, but then under the hood, implementation is obviously ... They are very different ways of implementing things. Yeah.

Kris Jenkins: (21:33)
Right. Okay. I have to ask: How much does that mean that the code base of Confluent Kafka and open source Kafka are diverging?

Rittika Adhikari: (21:48)
I would say a significant amount, at least for features like Tiered Storage. Even though at a high level the designs and concepts are pretty similar, at a lower level there are things that we've done differently, based on learnings we've had, because we started Tiered Storage three-ish years ago. So in that sense, there's definitely a divergence. I do think we try to stay up to date with open source Kafka and make sure we're taking any improvements that are available in open source, and vice versa if we notice anything. Yeah.

Kris Jenkins: (22:42)
Yeah, I guess we're always going to be on the same page from a logical level, but when you're a cloud provider, the way you deal with the disc side of things, so the physical level, it's got to change and evolve partly.

Rittika Adhikari: (22:57)
Exactly, I think a lot of smaller things end up evolving. And then there are some features that are available on Confluent Cloud that aren't available in Apache Kafka. So then that also is obviously different. Yeah.

Kris Jenkins: (23:17)
Okay, so if someone has an existing cluster on Confluent Cloud, is it something that they can enable now? How do they get started with it? Can you change on a per topic basis or is it per cluster or what's the deal?

Rittika Adhikari: (23:29)
Yeah, so we actually do have a configuration that allows us to change it at the broker/cluster level. Whenever we enable that, the archiver will start uploading segments, and from then on it just continues the way Tiered Storage works, purging local data based on hot set retention.

Kris Jenkins: (23:57)
Okay, so you just flip a flag and eventually all your topics will be tiered?

Kris Jenkins: (24:02)
As long as they're not compacted-

Rittika Adhikari: (24:02)
Yes, they will stay untiered. Exactly. That is for later.

Kris Jenkins: (24:08)
Okay.

Rittika Adhikari: (24:10)
But yeah, there is also a topic level config that we can also enable to make certain topics tiered.
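The enablement rules described here (a broker or cluster level default, an optional per-topic setting, and compacted topics excluded until tiered compaction ships) can be summarized as a small resolution function. The function is hypothetical. In Confluent Platform the corresponding setting is documented as confluent.tier.enable at both the broker and topic level, but check the documentation for your version before relying on the name.

```python
from typing import Optional


def tiering_enabled(broker_default: bool,
                    topic_override: Optional[bool],
                    cleanup_policy: str) -> bool:
    """Decide whether a topic's segments should be tiered (hypothetical helper).

    broker_default: the broker/cluster-level switch (assumed: confluent.tier.enable).
    topic_override: the topic-level setting, or None if the topic does not set it.
    cleanup_policy: the topic's cleanup.policy, e.g. "delete" or "compact,delete".
    """
    policies = {p.strip() for p in cleanup_policy.split(",")}
    if "compact" in policies:
        return False  # compacted topics are not tiered until tiered compaction ships
    return broker_default if topic_override is None else topic_override
```

Letting an explicit topic setting win over the broker default is what allows the fine-grained control Kris mentions next.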

Kris Jenkins: (24:21)
That's good. So you can be that fine grained if you really need to be.

Rittika Adhikari: (24:24)
Exactly.

Kris Jenkins: (24:26)
But I think you have promised us that the only performance difference will be an improvement, so maybe you don't need to. In fact, I should ask you about that. What has been the impact of rolling this out to end users?

Rittika Adhikari: (24:41)
So, for example, banks have several use cases. Compliance might require you to store your data for X number of years, and Tiered Storage makes it a lot easier and more cost efficient to do so. And this also goes back to the idea of Kafka being our data lake. Kafka remains our source of truth. This is where we create our data, save our data, and access our data, whereas before we were only able to use Kafka as a source of truth for our newest data. Tiered Storage has made existing use cases much more manageable and feasible. Customers now get the flexibility to economically increase retention and don't need to rebuild their topics from scratch. They don't need to worry about adding additional compute just to store data. Re-replication is faster. Reading historical data is no longer a limitation, and they don't need to think about how much data they store because it doesn't affect their performance or costs that much.

Kris Jenkins: (25:50)
It's got to be much cheaper to store large amounts of data off SSD, right?

Rittika Adhikari: (25:55)
Exactly.

Kris Jenkins: (25:56)
Yeah. Because I know we use Kafka internally, has it had any effect on us?

Rittika Adhikari: (26:02)
Yeah, so prior to Tiered Storage, we had to use magnetic volumes. But with Tiered Storage we can use smaller SSDs as well.

Kris Jenkins: (26:16)
Oh. Because your broker machines can actually be on a smaller AWS EC2 instance, right?

Rittika Adhikari: (26:21)
Exactly. Yeah, so now we just have a lot more elasticity in how much data we can store and the number of operations needed on the cluster. And like I said, re-replication has become so much nicer than what it apparently used to be.

Kris Jenkins: (26:41)
Because failures do happen and when they happen you want recovery to be possible first and quick second, right?

Rittika Adhikari: (26:49)
Exactly.

Kris Jenkins: (26:50)
Yeah. Okay, that sounds like as complete a picture of Tiered Storage as we can get. Unless I can make you promise me a release date for the open source version.

Rittika Adhikari: (27:01)
Ah.

Kris Jenkins: (27:02)
You're not going to do that, are you?

Rittika Adhikari: (27:02)
That, I don't know.

Kris Jenkins: (27:06)
Okay. That's out of your hands. Fair enough.

Rittika Adhikari: (27:08)
It's out of my hands.

Kris Jenkins: (27:11)
I shall grill you no more. Rittika, thanks for joining us.

Rittika Adhikari: (27:15)
All right, thank you so much, Kris.

Kris Jenkins: (27:17)
Thank you, Rittika. I know some people out there have asked me for a timeline for open source Tiered Storage. I'm sorry, I did try to get a prediction, but as you heard, it's out of Rittika's hands. I'm looking forward to everybody being able to use it. In the meantime, I am glad that the cloud version that we have available now sounds like a pretty easy migration and operational path. I want to stay in user space as a programmer and I want that stuff to be transparent, so that was good to hear. If you enjoyed this discussion, if you learn something from it, please consider the Like button and the Review button and the Rating buttons and the Sharing buttons and all those things. You and I both know it feeds into algorithms which feed into us being able to make more interesting episodes of this podcast. So please take a moment. It also makes me feel really good, which is nice.

Kris Jenkins: (28:12)
Before we go, Streaming Audio is brought to you by Confluent Developer, which is our tech site that teaches you everything you need to know about Apache Kafka and building real time event based systems in general. We've got tutorials, guides to connectors, architectural guides. We've got complete build walkthroughs and we've got the back catalog for this podcast. So take a look at developer.confluent.io. And if you want to get your own Kafka cluster up and running with Tiered Storage enabled today, go and look at our cloud service, Confluent Cloud. You can sign up and have Kafka running reliably in minutes. And if you use the code PODCAST100 on your account, you'll get some extra free credit to run with. With that, it remains for me to thank Rittika Adhikari for joining us and you for listening. I've been your host, Kris Jenkins, and I will catch you next time.

Chapter Markers

Intro
Motivating factors behind Tiered Storage
What is Tiered Storage?
How does it work?
How does it impact performance?
Evolution of Confluent Tiered Storage
Tiered Compaction
Kafka Tiered Storage
Getting started with Confluent Tiered Storage
What is the impact for end users?
It's a wrap!