我正试图在KafkaConsumer对象中实现(刚刚开始使用Java和Flink(一个无键状态,因为在这个阶段没有调用keyBy((。这个对象是前端,也是处理Kafka消息的第一个模块。SourceOutput是表示消息的原型文件。
我有KafkaConsumer对象:
public class KafkaSourceFunction extends ProcessFunction<byte[], SourceOutput> implements Serializable
{
@Override
public void processElement(byte[] bytes, ProcessFunction<byte[], SourceOutput>.Context
context, Collector<SourceOutput> collector) throws Exception
{
// Here, I want to call to sorting method
collector.collect(output);
}
}
我有一个对象(KafkaSourceSort(,它完成所有的排序,应该将无序的消息保持在priorityQ状态,如果消息以正确的顺序通过收集器,它还负责传递消息。
class SessionInfo
{
public PriorityQueue<SourceOutput> orderedMessages = null;
public void putMessage(SourceOutput Msg)
{
if(orderedMessages == null)
orderedMessages = new PriorityQueue<SourceOutput>(new SequenceComparator());
orderedMessages.add(Msg);
}
}
public class KafkaSourceState implements Serializable
{
public TreeMap<String, SessionInfo> Sessions = new TreeMap<>();
}
我读到我需要使用一个非键控状态(ListState(,它应该包含会话的映射,而每个会话都包含一个priorityQ,其中包含与该会话相关的所有消息。
我找到了一个例子,所以我实现了这个:
public class KafkaSourceSort implements SinkFunction<KafkaSourceSort>,
CheckpointedFunction
{
private transient ListState<KafkaSourceState> checkpointedState;
private KafkaSourceState state;
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception
{
checkpointedState.clear();
checkpointedState.add(state);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception
{
ListStateDescriptor<KafkaSourceState> descriptor =
new ListStateDescriptor<KafkaSourceState>(
"KafkaSourceState",
TypeInformation.of(new TypeHint<KafkaSourceState>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored())
{
state = (KafkaSourceState) checkpointedState.get();
}
}
@Override
public void invoke(KafkaSourceState value, SinkFunction.Context contex) throws Exception
{
state = value;
// ...
}
}
我发现我需要实现一个调用消息,该消息可能会从processElement((调用,但invoke((的签名不包含收集器,我不知道如何做到这一点,甚至到目前为止我是否还可以。
请帮忙,我们将不胜感激。谢谢
SinkFunction
是作为作业图的DAG中的终端节点。它的接口中没有Collector
,因为它不能向下游发射任何东西。它应该连接到外部服务或数据存储,并在那里发送数据。
如果你更多地分享你正在努力实现的目标,也许我们可以提供更多的帮助。也许有一种更简单的方法可以解决这个问题。