在 Keyed State 中,Flink 提供了多种数据结构,用于存储不同类型的状态数据。下面以 ValueState 为例:
/**
 * Keeps a running, per-key total of the lengths of the second tuple field ({@code f1}).
 *
 * <p>Generic parameters of {@code RichMapFunction}: the first is the input type
 * ({@code Tuple2<String, String>}), the second is the output type ({@code Long}).
 * The running total is held in a {@link ValueState}, so this function is intended to run
 * on a keyed stream (one total per key).
 */
class MapWithCounterFunction extends RichMapFunction<Tuple2<String, String>, Long> {

    /**
     * Intermediate state to keep: the running total for the current key.
     * Declared {@code transient} because the handle is obtained in {@link #open(Configuration)}
     * from the runtime context, not shipped as part of the serialized function.
     */
    private transient ValueState<Long> totalLengthByKey;

    /**
     * Registers the {@code "sum of length"} value state, serialized with {@code LongSerializer}.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the superclass {@code open} or state registration fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Describe the state: its name and the serializer for its values.
        ValueStateDescriptor<Long> stateDescriptor =
                new ValueStateDescriptor<>("sum of length", LongSerializer.INSTANCE);
        // Register the state with the runtime and keep the handle for use in map().
        totalLengthByKey = getRuntimeContext().getState(stateDescriptor);
    }

    /**
     * Adds the length of {@code value.f1} to the current key's total, stores the new total,
     * and returns it.
     *
     * @param value input pair; only {@code f1} is read
     * @return the updated running total for the current key
     * @throws NullPointerException if {@code value.f1} is null
     * @throws Exception if reading or updating the state fails
     */
    @Override
    public Long map(Tuple2<String, String> value) throws Exception {
        Long length = totalLengthByKey.value(); // read the current total
        // The state can hold null (nothing stored yet for this key): start from zero.
        if (length == null) {
            length = 0L;
        }
        long newTotalLength = length + value.f1.length();
        totalLengthByKey.update(newTotalLength); // persist the new total
        return newTotalLength;
    }
}
在 keyBy 之后的数据流上调用该 RichMapFunction:
// Input stream of (key, text) pairs; the source is elided in this example.
DataStream<Tuple2<String, String>> strings = ...;
// Key by the first field (f0) with a KeySelector; positional keyBy(int...) is deprecated.
// Note: this makes the key type String instead of Tuple, which matters when restoring
// state from a savepoint taken with the positional variant.
DataStream<Long> lengs = strings.keyBy(value -> value.f0).map(new MapWithCounterFunction());