Keyed State 中可用的数据结构

在 Keyed State 中,提供了以下数据结构,以便存储不同类型的数据:

Keyed State 使用案例

// 这里 RichMapFunction 泛型,第一个参数是输入,第二个参数是输出
class MapWithCounterFunction extends RichMapFunction<Tuple2<String, String>, Long> {

    private ValueState<Long> totalLengthByKey;  // 注册需要保存的中间状态变量,这里是 ValueState 类型的数据

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
				// 创建State描述符
        ValueStateDescriptor<Long> stateDescriptor =
                new ValueStateDescriptor<Long>("sum of length", LongSerializer.INSTANCE);
        totalLengthByKey = getRuntimeContext().getState(stateDescriptor);  // 注册状态变量
    }

    @Override
    public Long map(Tuple2<String, String> value) throws Exception {
        Long length = totalLengthByKey.value();  // 读取数据
        if (length == null) {
            length = 0L;
        }
        long newTotalLength = length + value.f1.length();
        totalLengthByKey.update(newTotalLength);  // 更新数据
        return newTotalLength;
    }
}

调用 RichMapFunction

DataStream<Tuple<String,String> strings = ...
DataStream<Long> lengs = strings.keyBy(0).map(new MapWithCounterFunction());