使用nifi验证官方提供的处理器,只支持put-也就是set方式.对其他方式目前支持.也不知道为啥 官方的处理器使用方式
nifi PutDistributedMapCache redis处理器
因为要完全重新创建一个一模一样的处理器,连接池和服务也都要创建,不然读取不到;
连接池服务是RedisConnectionPoolService
<aside> 💡 该连接池使用的底层是jedis连接池,使用的是springboot2.5.1的redis
</aside>
封装redis的方法服务是 RedisDistributedMapCacheClientService
需要复制的清单
//用到了3个接口,2个服务,2个工具类
//接口
RedisConnectionPool
DistributedMapCacheClient 改名
RedisAction
//服务
RedisConnectionPoolService 改名
RedisDistributedMapCacheClientService 改名
//工具类
RedisUtils
RedisType
新建一个处理器 PutDistributedHashMapRedis
开始改造
连接池原封不动,就能用
redis实现服务需要重新实现一个支持hashMap的redis方法
处理调用hashMap的方法即可;
大致思路确定
对DistributedMapCacheClient 接口新增一个hset方法
public interface DistributedMapCacheClient2 extends ControllerService {
void hset(String hkey,String key, byte[] value) throws IOException;
}
对RedisDistributedMapCacheClientService 同时实现hset方法
public class RedisDistributedMapCacheClientService2 extends AbstractControllerService implements DistributedMapCacheClient2 {
@Override
public void hset(String hkey, String key, byte[] value) throws IOException {
withConnection(redisConnection -> {
// final TupleC kv = new TupleC(hkey.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), value);
// redisConnection.hSet(kv.getKey(), kv.getField(), kv.getValue());
//验证不需要对象实例化可以存储,所以就去除了实体对象
redisConnection.hSet(hkey.getBytes(StandardCharsets.UTF_8), key.getBytes(), value);
return null;
});
}
}
PutDistributedHashMapRedis
处理器
public class PutDistributedHashMapRedis extends AbstractProcessor {
//新增一个key的选项
public static final PropertyDescriptor HASHKEY_CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
.name("Hash cache Entry Identifier")
.description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the cache key")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
//其他不动.但是Distributed Cache Service 选项卡需要改变一下接口
// Identifies the distributed map cache client
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.description("The Controller Service that is used to cache flow files")
.required(true)
.identifiesControllerService(**DistributedMapCacheClient2**.class)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
**descriptors.add(HASHKEY_CACHE_ENTRY_IDENTIFIER);**
descriptors.add(CACHE_ENTRY_IDENTIFIER);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(CACHE_UPDATE_STRATEGY);
descriptors.add(CACHE_ENTRY_MAX_BYTES);
return descriptors;
}
//实现方法
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
//从原处理器把代码都复制过来即可,原处理器是:PutDistributedMapCache
if (updateStrategy.equals(CACHE_UPDATE_REPLACE.getValue())) {
**cache.hset(hashKey, cacheKey, cacheValue);**
cached = true;
} else if (updateStrategy.equals(CACHE_UPDATE_KEEP_ORIGINAL.getValue())) {
// final byte[] oldValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, valueDeserializer);
// if (oldValue == null) {
// cached = true;
// }
}
}
}
改完把pom的jar改成nar,打包放到nifi项目的lib中,启动即可