We have introduced the file sink and its batching strategy (details in ‣ ). The last piece of the puzzle for the file sink is "exactly once". In the previous design, the file sink could batch across barriers (checkpoints) according to the batching strategy, finishing a file only when the batching condition was triggered; in other words, the file commit and the checkpoint are decoupled. However, the current implementation is "at least once": after cluster recovery, previously written data may be duplicated.
The file sink writer triggers try_commit once the batching conditions are reached (and forces a commit when scaling occurs, specifically on UpdateVnodeBitmap). Once a commit is needed, it immediately closes the writer, completing the file and making it visible to the downstream.
Basic interfaces
pub enum LogStoreReadItem {
    // A chunk of data read from the log store.
    StreamChunk {
        chunk: StreamChunk,
        chunk_id: ChunkId,
    },
    // A barrier; `is_checkpoint` marks a checkpoint barrier.
    Barrier {
        is_checkpoint: bool,
    },
    // Vnode bitmap update, emitted on scaling.
    UpdateVnodeBitmap(Arc<Bitmap>),
}
Writer interfaces {
    // write a chunk (aka upload one part)
    write_batch(&mut self, chunk: StreamChunk)
    // unconditionally close the writer, finishing the current file
    commit()
    // close the writer only if the batching condition has been reached
    try_commit()
}
// Consume log store
match log_store_read_item {
    LogStoreReadItem::StreamChunk { chunk, .. } => {
        write_batch(chunk);
        try_commit();
    }
    LogStoreReadItem::Barrier { .. } => {
        try_commit();
    }
    // scale: force finishing the current file
    LogStoreReadItem::UpdateVnodeBitmap(_) => {
        commit();
    }
}
This RFC proposes a framework for implementing an exactly-once sink, primarily targeting the file sink. At the end of the document, we discuss the possibility of implementing exactly-once semantics for other sinks using this framework.
To achieve "exactly once", we need to ensure that data sent to the external sink is written only once. The key is this: if we know where a batch of data is located in the log store, and the downstream sink provides an idempotent way to query whether that batch already exists downstream, then during recovery we can decide how to handle the data in the log store based on its downstream visibility: either rewind (replay) it or truncate it.
Based on this, implementing the idea roughly requires:
- metadata that identifies a batch of output data and can be used to idempotently query whether it already exists in the downstream sink;
- the batch's location in the log store, so that we know where to rewind to or truncate up to during recovery;
- persisting both of the above (a pre-commit) before the data becomes visible downstream.

Next, let's take the file sink as an example to introduce the details of this solution.
The essential issue here is to persist the output data's metadata and its location in the log store, i.e. to pre-commit (1) the metadata and (2) the location in the log store before the data becomes visible in the downstream sink.
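As a rough illustration only, the pre-commit step might look like the sketch below; FileSinkPreCommit, LogStoreLocation, PreCommitStore, and the method names are all hypothetical placeholders rather than the actual implementation (the struct this RFC actually proposes is given later):

// Hypothetical pre-commit record; the real struct is introduced later in this RFC.
struct FileSinkPreCommit {
    // Metadata: the name of the file that is about to become visible downstream.
    file_name: String,
    // Where the file's data lives in the log store (its fields are discussed below).
    location: LogStoreLocation,
}

// Placeholder for the log store location.
struct LogStoreLocation;

// Hypothetical durable store (e.g. a state table) for pre-commit records.
trait PreCommitStore {
    fn persist(&mut self, record: FileSinkPreCommit) -> std::io::Result<()>;
}

// Sketch of the commit path: persist the pre-commit record first, and only then
// close the writer so that the file becomes visible in the downstream sink.
fn commit_file(store: &mut dyn PreCommitStore, record: FileSinkPreCommit) -> std::io::Result<()> {
    store.persist(record)?;
    // writer.close()?;  // omitted: closing the writer makes the file visible downstream
    Ok(())
}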
For the file sink, the metadata is the file name: by querying with the file name, we can accurately check whether a specific batch of data has already been written to the downstream object store.
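For instance, a minimal sketch of such an idempotent check, assuming a hypothetical object-store client with an exists-style call (not any specific SDK's API):

// Hypothetical object-store client, for illustration only.
trait ObjectStore {
    // Returns true if an object with the given key already exists.
    fn exists(&self, key: &str) -> bool;
}

// Idempotently check whether the batch written as `file_name` is already visible downstream.
fn batch_is_visible(store: &impl ObjectStore, file_name: &str) -> bool {
    store.exists(file_name)
}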
For the location in the log store: during recovery, the order of chunks between two barriers may differ from the original run. This means the chunk_ids in the log store (log reader) are unreliable; only the epoch is globally consistent. Therefore, the finest reliable granularity for this location is the epoch.
In addition to epochs, it is also necessary to record the vnode. The reason is that during scaling or recovery, vnodes originally assigned to one parallelism may be scattered across different parallelisms. The internal implementation of the log store is similar: there are n queues consuming data, where n is the number of vnodes.
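As a hedged sketch of what this implies for the persisted state, the pre-committed records could be keyed by vnode, so that each vnode's position can be recovered independently after scaling (the types and names below are assumptions for illustration):

use std::collections::HashMap;

type VnodeId = u16;
type Epoch = u64;

// Hypothetical per-vnode view of pre-committed files: each vnode's queue in the
// log store advances independently, so its epoch position is tracked separately.
struct PerVnodePreCommits {
    // vnode -> (file name, epoch of the pre-committed file)
    records: HashMap<VnodeId, Vec<(String, Epoch)>>,
}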
Under the current file sink batching implementation, a successfully written file generally has its writer created at epoch A and closed at epoch B. Therefore, it is necessary to record both the start_epoch and the end_epoch: the start_epoch serves as the rewind point when the data has not yet been written downstream, while the end_epoch serves as the truncate point once the data has been written downstream.
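Putting these pieces together, a minimal sketch of the per-file recovery decision might look like the following; all names are hypothetical, and exists / truncate / rewind stand in for the actual object-store and log-store operations:

// Hypothetical pre-committed record for one file (the actual struct follows below).
struct PreCommittedFile {
    file_name: String,
    start_epoch: u64,
    end_epoch: u64,
}

// Hypothetical handles for the downstream object store and the log store.
trait ObjectStore {
    fn exists(&self, key: &str) -> bool;
}
trait LogStore {
    // Drop log entries up to and including this epoch.
    fn truncate(&mut self, epoch: u64);
    // Restart consumption from this epoch.
    fn rewind(&mut self, epoch: u64);
}

// On recovery: if the file is already visible downstream, truncate the log store
// up to its end_epoch; otherwise rewind to its start_epoch and rewrite the file.
fn recover_one(
    object_store: &impl ObjectStore,
    log_store: &mut impl LogStore,
    rec: &PreCommittedFile,
) {
    if object_store.exists(&rec.file_name) {
        log_store.truncate(rec.end_epoch);
    } else {
        log_store.rewind(rec.start_epoch);
    }
}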
Therefore, what to persist can be abstracted into the following struct: