Deprecated, see RFC: The WatermarkFilter and StreamSort Operator

Background

About three months ago, @Eric Fu wrote a design doc about watermarks in our streaming system. In that design, every watermark must be defined over a specified time window, which leaves many watermark use cases uncovered.

Design

Watermark definition and behavior

In the streaming context, a watermark guarantees the completeness and ordering of the incoming stream: all events are emitted after a specified latency, and late events are ignored.

In the database context, however, a watermark behaves like a filter:

  1. New events above the current watermark are filtered out for now (they become visible once the watermark passes them).
  2. Historical late events below their corresponding watermark (the watermark at the time they arrived) are filtered out permanently (ignored forever).
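The two filtering rules can be expressed as a tiny classification function. This is a minimal sketch, assuming the watermark never decreases and that an event exactly at the current watermark is still pending (consistent with the `time < watermark` check in the executor pseudocode). The function `classify` and its parameter names are hypothetical, for illustration only.

```python
def classify(event_time: int, watermark_at_arrival: int, current_watermark: int) -> str:
    """Return how a database-style watermark filter treats one event (hypothetical helper)."""
    if event_time < watermark_at_arrival:
        # Rule 2: the event was already late when it arrived, so it is ignored forever.
        return "dropped"
    if event_time >= current_watermark:
        # Rule 1: the event is above the current watermark and will become visible later.
        return "pending"
    # The watermark has since passed the event, so it is now visible.
    return "visible"


print(classify(5, 10, 20))   # dropped
print(classify(25, 10, 20))  # pending
print(classify(15, 10, 20))  # visible
```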

[Figure: watermark.drawio.png]

WatermarkFilterExecutor

WatermarkFilterExecutor { time_col, timeout }

state_table:

  - columns: all columns of the input
  - pk: time_col | input_pk (the time column followed by the input's primary key, so rows are ordered by time)
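Putting time_col first in the pk keeps the buffered rows ordered by time, so releasing completed records becomes one range scan. Below is a minimal in-memory sketch of that ordered buffer, assuming the scan range is half-open `[lo, hi)` and that ops are plain `"insert"`/`"delete"` strings. The class `OrderedState` and its method signatures are hypothetical (unlike the pseudocode's `state.apply(op, time, rec)`, it takes the input pk explicitly) and are not the real state table API.

```python
import bisect
from typing import Any


class OrderedState:
    """Hypothetical in-memory stand-in for the state table, ordered by pk = (time_col, input_pk)."""

    def __init__(self) -> None:
        self._keys: list[tuple[int, Any]] = []        # sorted (time, input_pk) keys
        self._rows: dict[tuple[int, Any], dict] = {}  # key -> full input row

    def __len__(self) -> int:
        return len(self._keys)

    def apply(self, op: str, time: int, pk: Any, row: dict) -> None:
        """Apply an "insert" or "delete" (hypothetical op names) to the buffer."""
        key = (time, pk)
        if op == "insert":
            if key not in self._rows:
                bisect.insort(self._keys, key)
            self._rows[key] = row
        elif op == "delete" and key in self._rows:
            del self._keys[bisect.bisect_left(self._keys, key)]
            del self._rows[key]

    def scan_and_delete(self, lo: int, hi: int) -> list[dict]:
        """Remove and return, in (time, pk) order, every row with lo <= time < hi."""
        # A 1-tuple (t,) sorts before every (t, pk), so these bisects land on time boundaries.
        start = bisect.bisect_left(self._keys, (lo,))
        end = bisect.bisect_left(self._keys, (hi,))
        done = self._keys[start:end]
        del self._keys[start:end]
        return [self._rows.pop(key) for key in done]
```

Because ties on time are broken by the input pk, two rows with the same timestamp never collide and are always released in a deterministic order.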

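```python
time_col, timeout = user_defined()
# An ordered in-memory structure (mirroring the state table, keyed by
# time_col | input_pk) buffers records the watermark has not yet passed.
state = recover_from_state().or_empty()
# Re-derive the watermark the same way process() advances it:
# max buffered time minus timeout (0 if the state is empty).
times = [rec[time_col] for rec in state]
watermark = max(times) - timeout if times else 0

def process(msg):
    global watermark
    match msg:
        case Checkpoint(epoch):
            dump(state, epoch)  # persist the buffered records for this epoch
        case Record(op, rec):
            time = rec[time_col]
            if time < watermark:
                return  # late event: ignored forever
            new_watermark = time - timeout
            if new_watermark > watermark:
                # Records in [watermark, new_watermark) are complete; release them in time order.
                completed_recs = state.scan_and_delete(watermark, new_watermark)
                watermark = new_watermark
                emit(completed_recs)
            # Buffer the current record until the watermark passes it.
            state.apply(op, time, rec)
```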
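To check the control flow end to end, here is a self-contained, runnable version of the executor loop. It is a sketch under simplifying assumptions: records carry their own `time` and `pk`, ops are the hypothetical strings `"insert"`/`"delete"`, checkpointing is omitted, and a plain dict sorted at release time stands in for the ordered state table. `WatermarkFilter` and `Record` are illustrative names, not existing types.

```python
from dataclasses import dataclass


@dataclass
class Record:
    op: str      # "insert" or "delete" (hypothetical op names)
    time: int    # value of time_col
    pk: int      # input primary key
    value: str


class WatermarkFilter:
    """Hypothetical runnable sketch of the WatermarkFilterExecutor loop (no checkpointing)."""

    def __init__(self, timeout: int) -> None:
        self.timeout = timeout
        self.watermark = 0                              # same initial value as the pseudocode
        self.state: dict[tuple[int, int], Record] = {}  # buffered records keyed by (time, pk)
        self.emitted: list[Record] = []                 # records released downstream, in order

    def process(self, rec: Record) -> None:
        if rec.time < self.watermark:
            return  # late event: ignored forever
        new_watermark = rec.time - self.timeout
        if new_watermark > self.watermark:
            # Every buffered key has time >= watermark, so this selects [watermark, new_watermark).
            done = sorted(key for key in self.state if key[0] < new_watermark)
            self.emitted.extend(self.state.pop(key) for key in done)
            self.watermark = new_watermark
        key = (rec.time, rec.pk)
        if rec.op == "insert":
            self.state[key] = rec
        elif rec.op == "delete":
            self.state.pop(key, None)


f = WatermarkFilter(timeout=5)
for time, pk, value in [(10, 1, "a"), (7, 2, "b"), (16, 3, "c"), (9, 4, "d"), (14, 5, "e")]:
    f.process(Record("insert", time, pk, value))

print([r.value for r in f.emitted])  # ['b', 'a']
print(f.watermark)                   # 11
print(sorted(f.state))               # [(14, 5), (16, 3)]
```

The record at time 9 is dropped because the watermark had already advanced to 11 when it arrived, while the record at time 14 stays buffered until a later record pushes the watermark past it.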