Deprecated, see RFC: The WatermarkFilter and StreamSort Operator
About three months ago, @Eric Fu wrote a design doc about the watermark in our streaming system. In that design, every watermark must be assigned over a specified time window. However, the design leaves many watermark use cases uncovered.
In the streaming context, the watermark guarantees that the incoming stream is complete and ordered. With a watermark, every event is emitted after a specified latency, and events that arrive later than that are ignored.
In the database context, however, the watermark behaves like a filter:

WatermarkExecutor{ time_col, timeout }
    state_table:
        columns: all columns of input
        pk: time_col | input_pk
time_col, timeout = user_defined()
# an ordered memory data structure buffers the records
state = recover_from_state().or_empty()
# the newest buffered event defines the watermark, which trails it by the timeout
watermark = max((rec[time_col] - timeout for rec in state)).or(0)
def process(msg):
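    match msg:
        case Checkpoint(epoch):
            # persist the buffered records at the checkpoint
            dump(state, epoch)
        case Record(op, rec):
            time = rec[time_col]
            # late event: older than the watermark, ignore it
            if time < watermark:
                return
            new_watermark = time - timeout
            if new_watermark > watermark:
                # records in [watermark, new_watermark) are now complete
                completed_recs = state.scan_and_delete(watermark, new_watermark)
                watermark = new_watermark
                emit(completed_recs)
            # buffer the current record until the watermark passes it
            state.apply(op, time, rec)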
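
To make the behavior above concrete, here is a minimal runnable Python sketch of the same logic. It is an illustration under assumptions, not the actual executor: the class name `WatermarkBuffer`, the `pk_col` field, and the `"insert"`/`"delete"` op strings are hypothetical, a plain dict stands in for the state table, and checkpointing and recovery are left out.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class WatermarkBuffer:
    """Hypothetical in-memory stand-in for the WatermarkExecutor pseudocode."""
    time_col: str
    pk_col: str
    timeout: int
    watermark: int = 0
    # Keyed by (time, pk), mirroring the `time_col | input_pk` state-table key.
    state: Dict[Tuple[int, int], dict] = field(default_factory=dict)

    def process(self, op: str, rec: dict) -> List[dict]:
        """Apply one change and return the records that became complete."""
        time = rec[self.time_col]
        if time < self.watermark:
            return []  # late event: ignored
        completed: List[dict] = []
        new_watermark = time - self.timeout
        if new_watermark > self.watermark:
            # The buffer only ever holds times >= watermark, so this selects
            # exactly the range [watermark, new_watermark), in key order.
            done = sorted(k for k in self.state if k[0] < new_watermark)
            completed = [self.state.pop(k) for k in done]
            self.watermark = new_watermark
        key = (time, rec[self.pk_col])
        if op == "insert":
            self.state[key] = rec
        else:  # treat any other op as a delete in this sketch
            self.state.pop(key, None)
        return completed


buf = WatermarkBuffer(time_col="ts", pk_col="id", timeout=5)
out = []
for ts, pk in [(10, 1), (12, 2), (8, 3), (3, 4), (18, 5)]:
    emitted = buf.process("insert", {"ts": ts, "id": pk})
    out.append([r["id"] for r in emitted])
print(out)  # [[], [], [], [], [3, 1, 2]]
```

The record with `ts=3` arrives after the watermark has already advanced to 7, so it is dropped. The other three early records stay buffered until `ts=18` pushes the watermark to 13, at which point they are emitted in time order.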