Stateful Stream Processing 是 Flink 中最底层的 API，上层的 SQL API、Table API、DataStream API 等都是通过对它的封装实现的，其核心被称为 Process Function，可以为开发者提供更大的自主性和灵活性。
常见的应用场景有:
下面通过一个实际案例讲解 ProcessFunction 应该如何使用。
需求: 对每个 key 统计收到的元素个数；如果某个 key 在 100 ms（事件时间）内没有新的元素到达，则输出该 key 及其计数 (key, count)。
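实现思路: 把 key、计数和最后一次更新的时间戳保存在 ValueState 中；每处理一个元素就更新状态，并注册一个时间为“最后更新时间 + 100 ms”的事件时间定时器；定时器触发时，如果该 key 自最后一次更新起已超过 100 ms 没有新元素，则输出结果并清空状态。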
实现代码:
/**
 * The data type stored in the state: one record per key holding the key itself, the number of
 * elements counted for it, and the timestamp of its most recent update.
 *
 * <p>Fields are public and a public no-argument constructor is provided, presumably so Flink can
 * treat this class as a POJO type for state serialization.
 */
public class CountWithTimestamp {

    /** The key this record belongs to; {@code null} until assigned by the caller. */
    public String key;

    /** Number of elements counted so far; starts at 0. */
    public long count;

    /** Timestamp of the most recent update, as set by the caller; starts at 0. */
    public long lastModified;

    /** Creates an empty record: {@code null} key, zero count and zero timestamp. */
    public CountWithTimestamp() {
    }
}
// Apply the process function onto a keyed stream.
// NOTE(review): `stream` is assumed to be a DataStream<Tuple2<String, String>> (the input type
// CountWithTimeoutFunction declares) whose elements carry event-time timestamps, since the
// function reads ctx.timestamp() and registers event-time timers.
// Key by the first tuple field via a KeySelector; the positional keyBy(int) overload is deprecated.
DataStream<Tuple2<String, Long>> result = stream
        .keyBy(value -> value.f0)
        .process(new CountWithTimeoutFunction());
public class CountWithTimeoutFunction
extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
private ValueState<CountWithTimestamp> state;
@Override
public void open(Configuration parameters) throws Exception {
// register our state with the state backend
state = getRuntimeContext()
.getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));}}
}
@Override
public void processElement(Tuple2<String, Long> value, Context ctx,Collector<Tuple2<String, Long>> out) throws Exception {
// update our state and register a timer
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
current.count++;
current.lastModified = ctx.timestamp();
state.update(current);
ctx.timerService().registerEventTimeTimer(current.lastModified + 100);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx,Collector<Tuple2<String, Long>> out) throws Exception {
// check the state for the key and emit a result if needed
CountWithTimestamp result = state.value();
if (timestamp >= result.lastModified + 100){ // 说明超过 100ms 没有操作过了
out.collect(new Tuple2<String, Long>(result.key, result.count));
state.clear();
}
}
}