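Stateful Stream Processing is the lowest-level API in Flink. The higher-level APIs, such as the SQL API, Table API, and DataStream API, are built as abstractions on top of it. Its core building block is the ProcessFunction, which gives you the most flexibility and fine-grained control.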

Common use cases include:

The following practical example shows how to use a ProcessFunction.

Requirements:

  1. Keep a count of the elements received for each key.
  2. If a key has not received any element in the last 100 ms (event time), emit the key/count pair.

Implementation approach:

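  1. Store the count, the key, and the last-modified timestamp in a ValueState. The ValueState is implicitly scoped by the current key.
  2. For each record: increment the count, set the last-modified timestamp to the record's event time, and register an event-time timer 100 ms after that timestamp.
  3. When a timer fires: compare the timer's timestamp with the stored last-modified timestamp. If no element has arrived for that key in the last 100 ms, emit the key/count pair and clear the state.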
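The steps above can be sketched without Flink as a small, single-threaded simulation. This is a plain-Java imitation, not Flink's API: the class `TimeoutSimulation`, its `processElement`/`advanceWatermark` methods, and the `Timer` record are hypothetical names chosen for illustration. A `HashMap` stands in for the per-key `ValueState`, and a sorted set of (timestamp, key) pairs stands in for the event-time timer service, whose timers fire once the watermark reaches them. It assumes Java 16+ (for `record`).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Plain-Java imitation of keyed ValueState plus event-time timers (hypothetical, NOT Flink's API).
public class TimeoutSimulation {

    // Per-key state, mirroring CountWithTimestamp.
    static final class CountWithTimestamp {
        String key;
        long count;
        long lastModified;
    }

    // A registered timer: when it should fire and which key it belongs to.
    record Timer(long timestamp, String key) {}

    // Stand-in for keyed ValueState: one entry per key.
    private final Map<String, CountWithTimestamp> state = new HashMap<>();
    // Timers ordered by timestamp, then key; the set keeps one timer per (timestamp, key), like Flink.
    private final TreeSet<Timer> timers = new TreeSet<Timer>(
            Comparator.comparingLong(Timer::timestamp).thenComparing(Timer::key));
    final List<String> output = new ArrayList<>();

    // Mirrors processElement: update the key's state and register a timer 100 ms later.
    void processElement(String key, long eventTime) {
        CountWithTimestamp current = state.get(key);
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = key;
        }
        current.count++;
        current.lastModified = eventTime;
        state.put(key, current);
        timers.add(new Timer(eventTime + 100, key));
    }

    // Mirrors a watermark arriving: fire every timer with timestamp <= watermark, in time order.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first().timestamp() <= watermark) {
            Timer t = timers.pollFirst();
            onTimer(t.timestamp(), t.key());
        }
    }

    // Mirrors onTimer: emit only if the key saw no element in the last 100 ms.
    private void onTimer(long timestamp, String key) {
        CountWithTimestamp result = state.get(key);
        if (result == null) {
            return; // state already cleared by an earlier timer
        }
        if (timestamp >= result.lastModified + 100) {
            output.add(result.key + "=" + result.count);
            state.remove(key); // like state.clear()
        }
    }

    public static void main(String[] args) {
        TimeoutSimulation sim = new TimeoutSimulation();
        sim.processElement("a", 0);
        sim.processElement("a", 50);
        sim.processElement("b", 60);
        sim.advanceWatermark(120); // timer a@100 fires, but a was updated at 50: no output
        sim.processElement("a", 130);
        sim.advanceWatermark(300); // a@150 no output; b@160 emits b=1; a@230 emits a=3
        System.out.println(sim.output); // prints [b=1, a=3]
    }
}
```

Only the timer registered by a key's most recent element produces output. Earlier timers fire but find a newer `lastModified` and do nothing. The `null` check matters when elements arrive out of order: an earlier timer can emit and clear the state before a later timer for the same key fires.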

Implementation code:

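import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// the data type stored in the state (a POJO: public fields, default constructor)
public class CountWithTimestamp {
	public String key;
	public long count;
	public long lastModified;
}

// apply the process function onto a keyed stream
// (stream is a DataStream<Tuple2<String, String>> with event-time timestamps and watermarks assigned)
DataStream<Tuple2<String, Long>> result = stream
	.keyBy(value -> value.f0)
	.process(new CountWithTimeoutFunction());

// keyed by String, reads Tuple2<String, String>, emits Tuple2<String, Long>
public class CountWithTimeoutFunction
		extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {

	private ValueState<CountWithTimestamp> state;

	@Override
	public void open(Configuration parameters) throws Exception {
		// register our state with the state backend (Flink 1.x signature)
		state = getRuntimeContext()
			.getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
	}

	@Override
	public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
		// retrieve the current count for this key, creating it on first use
		CountWithTimestamp current = state.value();
		if (current == null) {
			current = new CountWithTimestamp();
			current.key = value.f0;
		}
		// update the count and the last-modified event time
		// (ctx.timestamp() is null if no timestamps were assigned upstream)
		current.count++;
		current.lastModified = ctx.timestamp();
		state.update(current);
		// schedule a timer 100 ms (event time) after this record
		ctx.timerService().registerEventTimeTimer(current.lastModified + 100);
	}

	@Override
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
		// get the state for the key that scheduled the timer
		CountWithTimestamp result = state.value();
		if (result == null) {
			// state was already cleared by an earlier timer
			return;
		}
		if (timestamp >= result.lastModified + 100) { // no update for this key in the last 100 ms
			out.collect(new Tuple2<>(result.key, result.count));
			state.clear();
		}
	}
}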