如何将一个事件拆分为多个事件,以将它们发送到多路复用扇出流



我们计划使用 kafka flume-ng integration(Flafka),其中 flume 是 kafka 队列的消费者。Flume 代理将收到列出命令及其输出的文件,如下所示:

root@host> [Command1]
[Output1]
root@host> [Command2]
[Output2]

该文件可能包含多个命令,并且命令的输出可能很大。我们需要拦截事件(即文件数据)并根据命令将事件拆分为多个事件。然后,源会将流扇出到多个通道,将每个子事件发送到通道(使用多路复用),每个接收器会将命令信息存储到相应的 Hive 表中。是否可以使用扇出流将一个事件拆分为多个事件?或者如果我用其他方式问,我们可以在拦截器中将一个事件拆分为多个事件吗?

我已经阅读了有关正则表达式提取器拦截器和序列化程序的信息,但不确定它是否对这种情况有任何帮助。

如果我理解得很好,你需要将从 Kafka 队列中获取的原始事件拆分为几个子事件。你想知道哪一块Flume可以做到这一点。

我认为拦截器

不适合这个目的,因为拦截器被"放置"在源和通道之间,它们旨在添加、删除或修改有关 Flume 事件的标头,然后再将其放入通道; 同样,它们可以删除整个事件。但是他们无法基于其他现有事件生成多个事件。

我认为您正在寻找类似于附加到源的处理程序之类的东西,能够解释从 Kafka 获取的事件并在源输出中生成多个 Flume 事件。此概念类似于可以附加到HTTPSoure的处理程序(此处提供了更多详细信息)。如果您的源可以做到这一点,那么您很可能必须开发自己的自定义处理程序,因为您需要的功能非常具体。

感谢您的回复 frb。

我想将传入的事件拆分为多个子事件,并将它们发送到相应的通道。因此,拓扑中的第一个水槽节点会将每个子事件(使用多路复用)路由到可以处理此类信息的特定跃点。

根据您的回复,我知道使用拦截器无法完成。你能分享任何处理程序的例子或文档吗?

是的,水槽不能将事件拆分为多个事件。这是我对这种方法的替代解决方案,以 Kafka 源代码为例。

首先实现一个扩展 Kafka 源代码的源类,替换默认的 ChannelProcessor 对象。

public class XXXSplitSource extends KafkaSource {
    @Override
    public synchronized ChannelProcessor getChannelProcessor()
    {
        return new XXXYourChannelProcessorProxy(super.getChannelProcessor());
    }
}

然后,在通道处理器代理实现中,您可以使用自定义功能拆分事件。

public class XXXYourChannelProcessorProxy  extends ChannelProcessor {
    public ChannelProcessor  m_downstreamChannelProcessor = null;
    public XXXYourChannelProcessorProxy (ChannelSelector selector) {
        super(selector);
    }
    public XXXYourChannelProcessorProxy (ChannelProcessor processor) {
        super(null);
        m_downstreamChannelProcessor = processor;
    }
    @Override
    public void processEventBatch(List<Event> events) {
        List<Event> generatedEvents = YOUR_SPLIT_FUNCTION_HERE(events);
        m_downstreamChannelProcessor.processEventBatch(generatedEvents);    
    }
}

最新更新