如何将PcapPackets传递到Kafka队列中



使用下面的代码将PcapPackets传递到队列中,是否可以将其传递到Kafka队列中,以便Kafka消费者可以从Kafka生产者那里提取PcapPacket?

StringBuilder errbuf = new StringBuilder();  
Pcap pcap = Pcap.openOffline("tests/test-afs.pcap", errbuf);  
PcapPacketHandler<Queue<PcapPacket>> handler = new PcapPacketHandler<Queue<PcapPacket>>() {  
  public void nextPacket(PcapPacket packet, Queue<PcapPacket> queue) {  
    PcapPacket permanent = new PcapPacket(packet);  
    queue.offer(packet);  
  }  
}  
Queue<PcapPacket> queue = new ArrayBlockingQueue<PcapPacket>();  
pcap.loop(10, handler, queue);  
System.out.println("we have " + queue.size() + " packets in our queue");  
pcap.close(); 

Kafka支持将任意二进制数据存储为消息。在您的情况下,您只需要提供一个PcapPacket类二进制序列化程序(以及用于读取的反序列化程序)。

参见Kafka:编写自定义序列化程序的示例。

虽然我参加聚会迟到了,但如果有类似需求的人觉得它有用,我会在这里分享我的工具:Pcap Processor(GitHub URL)。我为我的研究开发了一个Python工具,用于读取原始pcap文件、处理它们并将它们提供给我的流处理器。由于我尝试了各种流协议,所以我在这个工具中实现了所有这些协议。当前支持的接收器:

  • CSV文件
  • Apache Kafka(编码为JSON字符串)
  • HTTP REST(JSON)
  • gRPC
  • 控制台(只需打印到终端)

例如,要读取input.pcap并将其发送到Kafka主题,需要调整Kafka_sink.py中的引导端点和主题名称。然后,从父目录执行以下命令将读取该文件并将数据包发送到Kavka队列。

python3 -m pcap_processor --sink kafka input.pcap

有关更多详细信息和安装说明,请查看GitHub自述文件,如果遇到任何问题,请随时打开GitHub问题。

最新更新