我有两个来源,kafka和hbase。在卡夫卡中,只有24小时的数据流。在Hbase中,从一开始就有一个聚合的数据。我的目的是,当某个会话的流输入(Kafka(发生时,两个数据在流处理上合并。我尝试了几种方法,但由于性能原因,它不满意。
经过一些搜索,我对键控过程函数中的状态有了一个想法。这个想法在下面。(使用键控过程功能的状态进行缓存(
- 使用会话信息对键控过程函数进行输入
- 检查键控进程的状态
- 如果状态未初始化->然后从hbase查询并初始化到状态->转到5
- else(状态已初始化(->转到5
- 使用状态执行业务逻辑
在编写这个想法的过程中,我遇到了性能问题,即使用同步方式查询hbase的速度很慢。所以,我尝试了异步版本,但它很复杂。
我面临两个问题。其中一个是processElement和hbase Async工作线程之间的线程安全问题,另一个是在processElement函数结束后(而不是hbase Asyncworker结束后(进程函数的上下文过期。
val sourceStream = env.addsource(kafkaConsumer.setStartFromGroupOffsets())
sourceStream.keyBy(new KeySelector[InputMessage, KeyInfo]() {
override def getKey(v: InputMessage): KeyInfo = v.toKeyInfo()
})
.process(new KeyedProcessFunction[KeyInfo, InputMessage, OUTPUTTYPE]() {
var state: MapState[String, (String, Long)] = _
override def open(parameters: Configuration): Unit = {
val conn = ConnectionFactory.createAsyncConnection(hbaseConfInstance).join
table = conn.getTable(TableName.valueOf("tablename"))
state = getRuntimeContext.getMapState(stateDescripter)
}
def request(action: Consumer[CacheResult] ): Unit = {
if ( !state.isEmpty ) {
action.accept(new CacheResult(state))
}
else { // state is empty, so load from hbase
table.get(new Get(key)).thenAccept((hbaseResult: Result) => {
// this is called by worker thread
hbaseResult.toState(state) // convert from hbase result into state
action.accept(new CacheResult(state))
}
}
}
override def processElement(value: InputMessage
, ctx: KeyedProcessFunction[KeyInfo, InputMessage, OUTPUTTYPE]#Context
, out: Collector[OUTPUTTYPE]): Unit = {
val businessAction = new Consumer[CacheResult]() {
override def accept(t: CacheResult): Unit = {
// .. do business logic here.
out.collect( /* final result */ )
}
}
request(businessAction)
}
}).addSink()
有什么建议可以让KeyedProcessFunction在第三方的异步调用中可用吗?
或者在Flink中使用混合的Kafka和Hbase有其他想法吗?
我认为你的一般假设是错误的。我遇到了类似的问题,但涉及的问题完全不同,还没有解决。在程序中保持状态与异步函数是矛盾的,Flink通过其设计防止在异步代码中使用状态(这是一件好事(。如果你想让你的函数异步,那么你必须摆脱状态。为了实现您的目标,您可能需要重新设计您的解决方案。我不知道关于你的问题的所有细节,但你可以考虑把你的流程分成更多的管道。例如,您可以创建使用hbase中数据的管道,并将其传递到kafka主题中。然后,另一个管道可以使用从hbase收集数据的管道发送的数据。在这种方法中,您不必关心状态,因为每个管道都在做自己的事情,只是消耗数据并进一步传递数据。