Inca — Message Tracing and Loss Detection For Streaming Data @Netflix
by Allen Wang, Real Time Data Infrastructure
At Netflix, our real time data infrastructure has embraced a multi-cluster Kafka architecture and a Flink-powered stream processing platform, which together enable us to deliver trillions of messages per day. This architecture is described in detail in the above blogs and is illustrated in the diagram below.
This architecture centers on the concept of a “stream” and enables us to offer tiers of service with different message delivery guarantees for different use cases.
Recently we launched a new effort to create a distributed and real time message tracing system inside this streaming data infrastructure. We named the tracing system Inca, which is a famous mountain trail in South America. This name — a departure from our convention to use mountain names like Everest and Rocky for data infrastructure — reflects our belief that this is a trail that can help us to climb higher.
Why is message tracing necessary?
Historically, for most Netflix data, we have optimized for availability in message publishing. We typically configure two replicas for each Kafka partition and enable unclean leader election to maximize Kafka’s availability. The cost is potential data loss on the broker due to log truncation, and duplicate messages for the consumer due to offset reset.
Unclean leader election is not the only cause of data inconsistency. Data loss can also happen due to some edge cases in Kafka’s replication protocol, according to this talk at Kafka Summit.
Most of our data analysis jobs can tolerate minor data loss and duplicates. However, it is paramount for us to provide precise measurements to justify this design trade-off. We can easily provide the success rate of message publishing, thanks to Kafka’s acknowledgement on producing. But keep in mind that messages can get lost after Kafka’s acknowledgement. Before the creation of Inca, for end-to-end measurement we could only rely on aggregated send/receive counts. These can be misleading because the counts may include duplicates.
More importantly, we have new use cases from studio production, where change data capture (CDC) plays an important role. This demands a higher level of message delivery guarantee and an audit capability for lost messages.
Essentially these are the questions we try to address with Inca:
- If any system in our infrastructure claims a certain data delivery guarantee (e.g., at-least-once), does it really keep its promise?
- Assuming data loss is acceptable for a certain configuration, can we identify the lost data so that there is a chance to recover it?
- Can we provide a good measurement for message loss rate, duplicate rate and latency as indications of the quality of the service?
One important difference between Inca and traditional tracing in inter-process communication is that our emphasis is on detecting message loss and duplicates. Inca is not intended to produce the data flow graph, or stream lineage. Instead, our managed streaming platform already provides stream lineage information, which we use as an input to Inca.
In a nutshell, we want to generate “traces” for messages being transported in our system and analyze those traces to detect loss and derive the metrics we need.
In our system, we identify each message with a unique ID (UUID) so we expect multiple traces for each message ID.
Let’s start with a simple but common case as shown below.
Assuming the stream processing job does a simple transformation or enrichment which does not alter the identification of the message, here are the types of traces that will be generated for each message being traced:
- A trace that represents a successful sent event from the event producer when it produces the message to Kafka Cluster A
- A trace that represents a successful received event in the stream processing job when it consumes the message from Kafka Cluster A
- A trace that represents a successful sent event in the stream processing job when it produces the message to Kafka Cluster B
Note that we do not generate any trace for unsuccessful message publishing. That metric is already available: applications can rely on Kafka’s acknowledgement to detect failures, apply retries and take action at their own discretion. This means Inca only traces messages that have received successful acknowledgements at publishing.
The trace messages will include the following attributes:
- ID of the message being traced
- Location where the trace is generated
- Corresponding Kafka cluster name
- Trace type: Sent or Received
- Kafka TopicPartition and offset for the message being traced
- Any custom attributes that can help to recover the message if it is identified as lost, for example, row ID in RDBMS
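As a rough illustration, the attributes above could be modeled as a small record type. This is a minimal Python sketch, not Inca's actual schema: the class and field names are assumptions, and the `timestamp_ms` field is not in the list above but is added here because latency is later derived from trace timestamps.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict


class TraceType(Enum):
    SENT = "Sent"
    RECEIVED = "Received"


@dataclass
class Trace:
    """Hypothetical shape of one trace message (field names are assumptions)."""
    message_id: str        # ID of the message being traced
    location: str          # where the trace was generated
    kafka_cluster: str     # Kafka cluster the message was sent to or read from
    trace_type: TraceType  # Sent or Received
    topic: str             # topic of the traced message
    partition: int         # partition of the traced message
    offset: int            # offset of the traced message in that partition
    timestamp_ms: int      # assumption: when the trace was generated
    custom: Dict[str, str] = field(default_factory=dict)  # recovery hints


example = Trace(
    message_id="3f2b6c1e-0d1a-4f7e-9a55-2c8f0e6b7a10",
    location="event-producer",
    kafka_cluster="cluster-a",
    trace_type=TraceType.SENT,
    topic="cdc-events",
    partition=3,
    offset=1024,
    timestamp_ms=1_560_000_000_000,
    custom={"row_id": "42"},  # e.g. row ID in an RDBMS
)
```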
We decided to use Kafka as the message broker for tracing. The reason is that Kafka has proven capable of handling a large volume of traffic with a small footprint on the client side. Once the data is in Kafka, it opens up all possibilities for stream processing, or even batch processing if the data is transported to a data warehouse.
The following diagram shows how tracing works with message flow:
Generating trace messages
Tracing is enabled at the source, which is our Event Producer. Our infrastructure allows the owner of a stream to specify the percentage of messages being traced, up to 100%. If there is a need to audit all lost messages for a stream, it is necessary to turn on tracing for 100% of the messages. This can potentially be very costly, but in practice such streams typically have small traffic volume. On the other hand, streams with millions of messages per second typically care about delivery metrics rather than the identities of individual lost messages.
The signal to indicate that the message must be traced along the data flow path is set as a Kafka record header, where the value of the header is the message ID. This record header is checked by every processing component and is guaranteed to propagate through all processors.
Here is how and when the trace messages are sent:
- From Kafka producers: we send the trace messages by hooking into the producer send callback. When a successful acknowledgement is received from the broker and the message is randomly chosen for tracing according to the sampling rate, we send the trace. To achieve this in our stream processing applications written in Flink, we modified the Flink source code so that its standard FlinkKafkaProducer can be constructed with an optional user callback factory, which is responsible for creating a producer send callback that generates and sends the trace.
- From Kafka consumers: we implemented a consumer interceptor to send trace messages. We chose to rely on a consumer interceptor because it is the lowest barrier for a consumer to participate in tracing: it is as simple as adding the interceptor class to the consumer configuration. A consumer interceptor has access to the whole record, including the headers, which makes the implementation of tracing possible.
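The flow described above (sample at the source, carry the message ID in a header, emit a Sent trace on acknowledgement and a Received trace on consumption) can be sketched without any Kafka client. Everything here is an assumption made for illustration: the header name `inca-trace-id`, the function names, and the dictionaries standing in for records and broker metadata. The real hooks would be a producer send callback and a consumer interceptor.

```python
import random
from typing import Callable, Dict, Optional

TRACE_HEADER = "inca-trace-id"  # hypothetical header name, not from Inca

Headers = Dict[str, str]
Emit = Callable[[dict], None]


def mark_for_tracing(headers: Headers, message_id: str, sample_rate: float,
                     rng: Callable[[], float] = random.random) -> Headers:
    """At the source: sample the message and, if chosen, attach its ID as a header."""
    if rng() < sample_rate:  # sample_rate=1.0 traces every message
        return {**headers, TRACE_HEADER: message_id}
    return headers


def make_send_callback(headers: Headers, cluster: str, location: str,
                       emit: Emit) -> Callable[[Optional[dict], Optional[Exception]], None]:
    """Build the callback that runs when the broker answers a send."""
    def on_completion(metadata: Optional[dict], error: Optional[Exception]) -> None:
        trace_id = headers.get(TRACE_HEADER)
        # Only acknowledged, sampled messages produce a Sent trace.
        if error is None and metadata is not None and trace_id is not None:
            emit({"id": trace_id, "type": "Sent", "cluster": cluster,
                  "location": location, **metadata})
    return on_completion


def on_consume(record: dict, cluster: str, location: str, emit: Emit) -> dict:
    """Interceptor-style hook: emit a Received trace, return the record unchanged."""
    trace_id = record["headers"].get(TRACE_HEADER)
    if trace_id is not None:
        emit({"id": trace_id, "type": "Received", "cluster": cluster,
              "location": location, "topic": record["topic"],
              "partition": record["partition"], "offset": record["offset"]})
    return record
```

Because the decision to trace travels with the record as a header, downstream components never need to know the sampling rate; they only check whether the header is present.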
There are a few design alternatives that are worth discussing. One of these is whether we should use Zipkin to generate traces. We chose not to go with Zipkin for the following reasons:
- As mentioned at the beginning, the goal of our tracing is simple: to detect data loss and duplicates and to generate the corresponding metrics, not to create the data flow graph. A full-blown tracing framework like Zipkin therefore seems like overkill.
- Existing Zipkin Kafka clients only interact with the Kafka record. This makes it difficult to leverage our own record abstraction designed for our data infrastructure.
Trace Data Processing
Inca uses Flink stream processing to analyze tracing data. Using stream processing enables us to publish the results in near real time so that we can immediately take action to improve the service, as opposed to waiting for hours or days.
Inca’s stream processing job groups the trace data by trace ID (which is the same as the message ID) so that traces of the same message are analyzed together. The job calls our data infrastructure’s control plane to obtain routing information for each stream. Loss can be detected if an expected trace on the route is missing. Likewise, duplicates can be detected if more than one trace of the same type is found at a particular point in the route. Latency metrics can be calculated by comparing the timestamps of trace messages at each sending/receiving point, barring the small clock drift in our distributed system.
For example, assuming a message route of (Event Producer → Kafka Cluster A → Stream Process → Kafka Cluster B) as illustrated previously, we can claim the following:
- The message is lost if the trace of type Received for remote location Kafka Cluster A is missing
- The message is duplicated if more than one trace of type Received for remote location Kafka Cluster A is found
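The two rules above amount to counting traces per point on the route. The following is a minimal sketch of that check for the example route, assuming each trace is reduced to a `(trace type, cluster)` pair; the names `EXPECTED_ROUTE` and `analyze` are made up for illustration, and in Inca the route would come from the control plane rather than a constant.

```python
from collections import Counter
from typing import Iterable, List, Optional, Tuple

# One point on the route: (trace type, Kafka cluster)
Checkpoint = Tuple[str, str]

# Event Producer -> Kafka Cluster A -> Stream Process -> Kafka Cluster B
EXPECTED_ROUTE: List[Checkpoint] = [
    ("Sent", "cluster-a"),
    ("Received", "cluster-a"),
    ("Sent", "cluster-b"),
]


def analyze(traces: Iterable[Checkpoint],
            route: List[Checkpoint] = EXPECTED_ROUTE
            ) -> Tuple[Optional[Checkpoint], List[Checkpoint]]:
    """Return (first missing checkpoint or None, checkpoints seen more than once)."""
    counts = Counter(traces)
    # The first missing checkpoint is where the message was lost;
    # later checkpoints are missing only as a consequence.
    lost_at = next((cp for cp in route if counts[cp] == 0), None)
    duplicated_at = [cp for cp in route if counts[cp] > 1]
    return lost_at, duplicated_at
```

For instance, a message with only a `("Sent", "cluster-a")` trace is reported as lost at `("Received", "cluster-a")`, while two Received traces at Cluster A mark that point as duplicated.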
The biggest challenge here is the choice of windowing. Intuitively, a session window seems to be a good fit for this use case. However, it is hard to determine the gap of the session window. If the gap is too large, we have to keep the state for a long time, which leads to a huge disk footprint for our jobs given the potentially high volume of messages being traced. On the other hand, if the gap is too small, the job will generate a lot of data loss signals that are false positives.
The reality of our streaming data infrastructure is that 99% of our data is processed and delivered to the final destination within a minute after it is produced. But for the remaining 1%, it can take as long as an hour or two. This can happen when a stream experiences a significant increase in data volume but the stream processing job has not scaled up yet and is therefore lagging significantly behind. Given this, applying a uniform window would be plainly inefficient.
So the question becomes this: is there a way to smartly determine if you should keep waiting for a trace message to arrive, or simply give up as early as possible? It is like catching a train which does not always keep to its schedule. When you rush to the platform and the train is not there, will you wonder if you are waiting for a train that may never arrive?
Luckily, there are external signals that can help us make a smart decision: Kafka consumer offsets. Our stream processing jobs commit their consumer offsets to Kafka. For each message, the first Sent trace from the producer tells us its Kafka offset, and we can compare the processing job’s current committed offset with it. If the committed offset is greater, the stream processing job should have already processed the message, and the trace messages should have arrived or be arriving very soon. Otherwise we should keep waiting patiently.
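The offset comparison can be written as a small decision function. This is a sketch under stated assumptions rather than Inca's logic: the names are hypothetical, and the `grace_ms` allowance for traces that are "arriving very soon" is an invented parameter with an arbitrary default. It relies on the Kafka convention that a committed offset points at the next record to be consumed.

```python
from enum import Enum


class Decision(Enum):
    KEEP_WAITING = "keep waiting"
    DECLARE_LOST = "declare lost"


def decide(message_offset: int, committed_offset: int,
           ms_since_offset_passed: int, grace_ms: int = 60_000) -> Decision:
    """Decide whether to keep waiting for a missing Received trace.

    message_offset: offset reported by the producer's Sent trace.
    committed_offset: the consuming job's committed offset for that partition
        (the next offset it will read, so "greater than" means processed).
    ms_since_offset_passed: time since the committed offset moved past the message.
    grace_ms: assumed allowance for a trace that is still in flight.
    """
    if committed_offset <= message_offset:
        # The job has not reached the message yet; it may simply be lagging.
        return Decision.KEEP_WAITING
    if ms_since_offset_passed < grace_ms:
        # The job is past the message; the trace should arrive very soon.
        return Decision.KEEP_WAITING
    return Decision.DECLARE_LOST
```

The point of the design is that the waiting time adapts to the job's actual progress: a lagging job extends the wait automatically, while a caught-up job lets state be dropped quickly.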
We eventually chose Flink’s GlobalWindow with a custom Trigger for its flexibility. As the job processes each trace message and each firing of event timers, the whole processing logic becomes a state machine. With the help of consumer offsets, we can take shortcuts to quickly reduce the state and save a lot of resources, especially disk space, and we are able to create a message loss signal within a few minutes after the loss happens.
The following code snippet shows the skeleton of the Flink job built on top of our Streaming Processing As A Platform (SpaaS) and record abstraction:
The following diagram shows the complete architecture of Inca:
What about lost traces?
Traces are sent to Kafka from different components in our data infrastructure. The delivery of traces is vital to the accuracy of our trace processing. To ensure trace delivery, we applied the following measures:
- Kafka cluster for traces is isolated from other clusters and over provisioned
- The cluster is configured for high durability and consistency:
  - three replicas with a minimum of two in-sync replicas
  - unclean leader election disabled
  - broker instances backed by AWS EBS
- acks=all for trace producers to ensure consistency
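Expressed as Kafka settings, the durability measures above might look like the following. The configuration keys are standard Kafka broker and producer properties, but the dictionaries and the `to_properties` helper are only an illustrative sketch, not Netflix's actual configuration files.

```python
from typing import Dict, Union

ConfigValue = Union[int, bool, str]

# Broker-side defaults for the isolated trace cluster.
TRACE_BROKER_CONFIG: Dict[str, ConfigValue] = {
    "default.replication.factor": 3,          # three replicas
    "min.insync.replicas": 2,                 # at least two in-sync replicas
    "unclean.leader.election.enable": False,  # never elect an out-of-sync leader
}

# Producer-side setting for every component that emits traces.
TRACE_PRODUCER_CONFIG: Dict[str, ConfigValue] = {
    "acks": "all",  # wait for all in-sync replicas before acknowledging
}


def to_properties(config: Dict[str, ConfigValue]) -> str:
    """Render a config dict in Java .properties style (booleans lowercased)."""
    def fmt(value: ConfigValue) -> str:
        return str(value).lower() if isinstance(value, bool) else str(value)
    return "\n".join(f"{key}={fmt(value)}" for key, value in config.items())


print(to_properties(TRACE_BROKER_CONFIG))
print(to_properties(TRACE_PRODUCER_CONFIG))
```

With three replicas and a minimum of two in sync, one broker can be unavailable while `acks=all` writes still succeed, and an acknowledged trace always exists on at least two brokers.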
During testing, we found that restarting stream processing jobs often led to loss of traces. The reason is that trace producers were not given a chance to flush their buffers when the job was shut down. The remedy is to leverage Flink’s checkpoint mechanism. When a Flink task is shut down, it has to make a checkpoint in which it commits Kafka consumer offsets for all the messages it has already successfully processed and sent to the sink. Adding a flush for the tracing producer during checkpointing ensures that tracing messages have the same delivery guarantee as the messages processed by Flink.
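The ordering that matters here, flushing buffered traces before offsets are committed, can be shown with a toy model. This sketch does not use Flink or Kafka APIs: `BufferedTraceProducer`, `TracingTask` and `snapshot_state` are hypothetical stand-ins for a buffering producer, a task, and its checkpoint hook.

```python
from typing import List


class BufferedTraceProducer:
    """Toy producer that buffers traces in memory until flush() is called."""

    def __init__(self) -> None:
        self.buffer: List[str] = []
        self.delivered: List[str] = []

    def send(self, trace: str) -> None:
        self.buffer.append(trace)

    def flush(self) -> None:
        self.delivered.extend(self.buffer)
        self.buffer.clear()


class TracingTask:
    """Toy task that emits one Received trace per processed message."""

    def __init__(self, producer: BufferedTraceProducer) -> None:
        self.producer = producer
        self.processed_offset = -1
        self.committed_offset = -1

    def process(self, offset: int, message_id: str) -> None:
        self.producer.send(f"Received:{message_id}")
        self.processed_offset = offset

    def snapshot_state(self) -> None:
        """Checkpoint hook: flush traces first, then commit consumer offsets."""
        self.producer.flush()
        # Committed offset is the next offset to read after a restart.
        self.committed_offset = self.processed_offset + 1


task = TracingTask(BufferedTraceProducer())
for offset, message_id in enumerate(["m-0", "m-1", "m-2"]):
    task.process(offset, message_id)
task.snapshot_state()
```

If the task were stopped without the flush, the offsets could be committed while the three traces were still sitting in the buffer, which is the loss pattern described above.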
However, keep in mind that Inca only plays a supporting role in our data infrastructure. When the system is under stress or the tracing cluster becomes unresponsive, we have no choice but to stop sending traces to free up resources for more critical functions. In fact, we spent significant effort to make sure tracing will never be the bottleneck. Therefore, losing trace messages is inevitable and even desired at certain times.
But does losing traces make Inca untrustworthy? Not at all. Assuming that our infrastructure will lose 0.01% of all messages and 0.005% of trace messages (because of its high durability configuration), here is a table that shows the probabilities of all scenarios:
There are some observations we can make:
- When a trace is lost but the message is delivered, we create a message loss signal that is a false positive. This represents ⅓ of all cases where message loss is declared and can potentially cause some unnecessary operational burden. The good news is that it only happens for 0.005% of all messages transported. Since this is a very small amount, and given that most of our systems are idempotent or can tolerate a small number of duplicates, one option is to simply resend these messages without analyzing them for false positives.
- If both the original message and the trace message go to the ether, we lose the opportunity to catch it and the message is truly lost without a chance to recover. But, assuming the two losses are independent, this happens with a probability of only 0.01% × 0.005% = 0.0000005%. That means even though tracing is not perfect, it can reduce the unrecoverable message loss rate by a factor of 20,000. Great achievement!
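The scenario probabilities can be checked with a few lines of exact arithmetic. This sketch assumes that message loss and trace loss are independent events and collapses the several traces of a message into a single "trace lost or not" outcome; both are simplifications introduced here, and the variable names are illustrative.

```python
from fractions import Fraction

p_msg_lost = Fraction(1, 10_000)     # 0.01% of messages are lost
p_trace_lost = Fraction(5, 100_000)  # 0.005% of trace messages are lost

# Joint probabilities under the independence assumption.
scenarios = {
    "message delivered, trace delivered (no signal)":
        (1 - p_msg_lost) * (1 - p_trace_lost),
    "message lost, trace delivered (true positive)":
        p_msg_lost * (1 - p_trace_lost),
    "message delivered, trace lost (false positive)":
        (1 - p_msg_lost) * p_trace_lost,
    "message lost, trace lost (undetected loss)":
        p_msg_lost * p_trace_lost,
}

true_positive = scenarios["message lost, trace delivered (true positive)"]
false_positive = scenarios["message delivered, trace lost (false positive)"]
undetected = scenarios["message lost, trace lost (undetected loss)"]

# Share of loss signals that are false alarms, and how much tracing
# shrinks the rate of unrecoverable loss.
false_positive_share = false_positive / (false_positive + true_positive)
reduction_factor = p_msg_lost / undetected

for name, probability in scenarios.items():
    print(f"{name}: {float(probability) * 100:.7f}%")
print(f"false positive share of loss signals: {float(false_positive_share):.4f}")
print(f"reduction in unrecoverable loss: {float(reduction_factor):,.0f}x")
```

The last scenario is simply the product of the two loss rates, so improving the durability of the trace cluster directly shrinks the share of losses that can never be detected.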
This is just the beginning
We have put Inca into production for a selected range of data streams, where it traces about 500 million messages and processes two billion trace messages per day. It soon proved to be effective. We found that most of the message loss is due to edge cases in Kafka replication where the replication factor is set to 2 and there is a change in the partition leader. In one interesting case, a broker had a hardware problem which led to significant clock drift, and it started to truncate logs at high frequency. Thanks to Inca, we were immediately alerted to the message loss and terminated the outlier broker to avoid further loss.
The following is a typical metric dashboard for a specific stream that shows the rate and ratio of message loss/duplicates and latency.
There are still quite a lot of to-do’s for Inca. For example, we have stream processing applications that consume from multiple streams and produce to multiple sinks. How can we trace this complicated data flow? Some applications would like to trace data flow not only in source and sink operators, but also intermediate processing operators. What would be the best way to achieve that? Can we further improve accuracy and reduce the resource utilization?
If you are interested in exploring challenges like these and being part of a great team, please reach out.