For Operator State:
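There are three state types. They differ mainly in how state is redistributed when the parallelism changes:
- ListState: even-split redistribution. The lists from all parallel instances are merged and split evenly across the new instances.
- UnionListState: union redistribution. Every new instance receives the complete merged list.
- BroadcastState: every parallel instance holds an identical copy of the state, so after rescaling each new instance also gets a copy.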
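The difference between even-split (ListState) and union (UnionListState) redistribution can be shown with a small plain-Java model. This is a simplified illustration, not Flink's actual repartitioning code. The class and method names (RedistributionSketch, evenSplit, union) are hypothetical, each old instance's list state is modelled as a List<String>, and the exact order in which Flink hands out elements may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of operator ListState redistribution on rescaling.
// Not Flink's implementation; it only illustrates the two schemes.
final class RedistributionSketch {

    // Even-split (ListState): merge all old lists, then give each new
    // instance a contiguous chunk of roughly equal size.
    static List<List<String>> evenSplit(List<List<String>> oldStates, int newParallelism) {
        List<String> all = new ArrayList<>();
        for (List<String> state : oldStates) {
            all.addAll(state);
        }
        List<List<String>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0); // first `extra` instances get one more
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union (UnionListState): every new instance receives the full merged list.
    static List<List<String>> union(List<List<String>> oldStates, int newParallelism) {
        List<String> all = new ArrayList<>();
        for (List<String> state : oldStates) {
            all.addAll(state);
        }
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two old instances, rescaled to parallelism 3
        List<List<String>> old = List.of(List.of("a", "b"), List.of("c", "d", "e"));
        System.out.println(evenSplit(old, 3));    // [[a, b], [c, d], [e]]
        System.out.println(union(old, 3).get(2)); // [a, b, c, d, e]
    }
}
```

With five elements spread over two old instances and a new parallelism of 3, even-split gives each new instance one or two elements, while union hands all five elements to every instance, which is why union state should stay small.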
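There are two ways to define operator state: implementing the CheckpointedFunction interface (as in the example below) or implementing the older ListCheckpointed interface, which is deprecated in recent Flink versions.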
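import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// BufferingSinkFunction buffers incoming elements and sends them out once the buffer reaches the threshold.
// On top of that, it persists the buffer as operator state so that elements not yet sent are not lost on failure.
class BufferingSinkFunction
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    // Operator ListState holding the buffer contents as of the last checkpoint
    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    // In-memory buffer of elements that have not been sent yet
    private final List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSinkFunction(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        bufferedElements.add(value);
        // Use >= rather than ==: after a restore with lower parallelism, the buffer
        // may already hold more than threshold elements and == would never fire again
        if (bufferedElements.size() >= threshold) {
            for (Tuple2<String, Integer> element : bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous snapshot with the current buffer contents
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>(
                        "buffered-elements",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        // getListState uses even-split redistribution on rescaling;
        // getUnionListState would give every instance the full list instead
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // When recovering from a checkpoint or savepoint, refill the buffer from state
        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}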
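The checkpoint/restore cycle of this sink can be traced without a Flink cluster. The following plain-Java model is a sketch under assumptions: BufferingSinkModel is a hypothetical name, strings stand in for the Tuple2 elements, the `sent` list stands in for the external system, and the list returned by snapshot() stands in for the ListState checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of BufferingSinkFunction (no Flink dependency).
final class BufferingSinkModel {
    private final int threshold;
    private final List<String> buffered = new ArrayList<>();
    // Stands in for the external system the sink writes to
    final List<String> sent = new ArrayList<>();

    BufferingSinkModel(int threshold) {
        this.threshold = threshold;
    }

    // Mirrors invoke(): buffer the element, flush once the threshold is reached
    void invoke(String value) {
        buffered.add(value);
        if (buffered.size() >= threshold) {
            sent.addAll(buffered);
            buffered.clear();
        }
    }

    // Mirrors snapshotState(): copy the current buffer into the "checkpoint"
    List<String> snapshot() {
        return new ArrayList<>(buffered);
    }

    // Mirrors initializeState() with isRestored() == true:
    // a fresh instance refills its buffer from the checkpointed list
    static BufferingSinkModel restore(int threshold, List<String> checkpoint) {
        BufferingSinkModel model = new BufferingSinkModel(threshold);
        model.buffered.addAll(checkpoint);
        return model;
    }

    int bufferedCount() {
        return buffered.size();
    }

    public static void main(String[] args) {
        BufferingSinkModel sink = new BufferingSinkModel(3);
        sink.invoke("a");
        sink.invoke("b");
        List<String> checkpoint = sink.snapshot(); // checkpoint taken with [a, b] buffered
        // The task fails here: the original instance and its in-memory buffer are gone
        BufferingSinkModel recovered = BufferingSinkModel.restore(3, checkpoint);
        recovered.invoke("c");
        System.out.println(recovered.sent); // [a, b, c]
    }
}
```

Two elements are buffered when the checkpoint is taken. The restored instance still holds them, so the third element triggers a flush of all three instead of silently dropping a and b.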
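Broadcast State lets Flink users store data from a low-throughput broadcast event stream in a fault-tolerant, consistent, and rescalable way. That data is broadcast to all parallel instances of an operator and then connected with another stream for computation.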
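A small plain-Java model shows why every parallel instance ends up with the same broadcast data. This is a hypothetical sketch, not Flink's API. In Flink itself the broadcast side is handled by BroadcastProcessFunction.processBroadcastElement (read-write access to the broadcast state, declared with a MapStateDescriptor) and the other stream by processElement (read-only access). The class name BroadcastModel, the rule/threshold semantics, and routing regular events by key hash are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the broadcast state pattern (no Flink dependency).
final class BroadcastModel {
    // One map per parallel instance; each stands in for that instance's broadcast state
    private final List<Map<String, Integer>> instanceStates = new ArrayList<>();

    BroadcastModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instanceStates.add(new HashMap<>());
        }
    }

    // A broadcast element (e.g. a rule) is delivered to every parallel instance,
    // so every copy of the state receives the same update
    void onBroadcast(String ruleName, int threshold) {
        for (Map<String, Integer> state : instanceStates) {
            state.put(ruleName, threshold);
        }
    }

    // A regular element goes to one instance (chosen by key hash, an assumption)
    // and is only read against that instance's copy of the rules
    List<String> onEvent(String key, int value) {
        int idx = Math.floorMod(key.hashCode(), instanceStates.size());
        List<String> fired = new ArrayList<>();
        for (Map.Entry<String, Integer> rule : instanceStates.get(idx).entrySet()) {
            if (value > rule.getValue()) {
                fired.add(rule.getKey());
            }
        }
        return fired;
    }

    Map<String, Integer> stateOf(int instance) {
        return instanceStates.get(instance);
    }

    public static void main(String[] args) {
        BroadcastModel model = new BroadcastModel(4);
        model.onBroadcast("high-temperature", 100); // low-throughput rule stream
        System.out.println(model.onEvent("sensor-1", 150)); // [high-temperature]
        System.out.println(model.onEvent("sensor-2", 50));  // []
    }
}
```

Because every instance holds an identical copy of the rules, it does not matter which instance a regular event is routed to. This is also what makes the state easy to redistribute when the operator is rescaled.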
Broadcast state differs from other operator state in three main ways: