Background

We have introduced the file sink and its batching strategy (details in ‣). The last piece of the puzzle for the file sink is "exactly once". In the previous design, the file sink could batch across barriers (checkpoints): a file is finished only when the batching condition is triggered, so file commits are decoupled from checkpoints. However, the current implementation is "at least once", meaning that after cluster recovery, previously written data may be duplicated.

The file sink writer triggers try_commit once the batching condition is reached (a commit is forced when scaling occurs, specifically on UpdateVnodeBitmap). When a commit is needed, it immediately closes the writer, finishing the file and making it visible to downstream readers.

Basic interfaces

pub enum LogStoreReadItem {
    StreamChunk {
        chunk: StreamChunk,
        chunk_id: ChunkId,
    },
    Barrier {
        is_checkpoint: bool,
    },
    UpdateVnodeBitmap(Arc<Bitmap>),
}

Writer interface {
  // write a chunk (aka upload a part)
  write_batch(&mut self, chunk: StreamChunk)

  // unconditionally close the writer, finishing the current file
  commit()

  // close the writer only if the batching condition is met
  try_commit()
}

// Consume the log store
match log_store_read_item {
    LogStoreReadItem::StreamChunk { chunk, .. } => {
        write_batch(chunk);
        try_commit();
    }
    LogStoreReadItem::Barrier { .. } => {
        try_commit();
    }
    // scaling: force the writer to finish the current file
    LogStoreReadItem::UpdateVnodeBitmap(..) => {
        commit();
    }
}

This RFC proposes a framework for implementing an exactly-once sink, primarily targeting the file sink. At the end of the document, we discuss the possibility of implementing exactly once for other sinks using this framework.

Design

Basic idea

To achieve "exactly once", we need to ensure that data sent to the external sink is written only once. The key insight: if we know the data's location in the log store, and the downstream sink provides an idempotent way to query whether a batch of data already exists downstream, then during recovery we can decide how to handle that data in the log store based on its downstream visibility: either rewind to rewrite it, or truncate it.

Based on this, implementing this idea roughly requires:

  1. The downstream sink must provide idempotent access, i.e., we can accurately determine from metadata whether certain data has already been written downstream.
  2. RisingWave must persist the metadata from step 1, along with the data's location in the log store.

Next, let’s take the file sink as an example to walk through the details of this solution.

What to persist

The essential issue here is to persist the output data’s metadata together with its location in the log store, pre-committing both (1) the metadata and (2) the log store location before the data becomes visible in the downstream sink.

Under the current file sink batching implementation, a successfully written file generally has its writer created at epoch A and closed at epoch B. We therefore need to record both start_epoch and end_epoch: start_epoch serves as the rewind point when the data has not yet been written downstream, while end_epoch serves as the truncate point once the data has been written downstream.

Therefore, what to persist can be abstracted into the following struct: