对于 Operator State :

三种状态类型,这里状态类型主要针对并行度变化时的不同处理:

两种定义方式:

ListState 调用案例

// 实现 BufferingSink 的发送数据,在队列满时输出
// 在此基础上实现状态数据的持久化,避免未发送的队列数据丢失
class BufferingSinkFunction
        implements SinkFunction<Tuple2<String, Integer>>,
        CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;  // 存储队列数据

    public BufferingSinkFunction(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element : bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 保存当前队列数据的快照
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>(
                        "buffered-elements",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
                        }));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // 需要恢复数据
        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

Broadcast State

Broadcast State 使得 Flink 用户能够以容错、一致、可扩缩容地将来自广播的低吞吐的事件流数据存储下来,被广播到某个 operator 的所有并发实例中,然后与另一条流数据连接进行计算。

广播状态与其他 operator state 之间有三个主要区别:

  1. Map 格式类型