使用下面的代码将PcapPackets传递到队列中,是否可以将其传递到Kafka队列中,以便Kafka消费者可以从Kafka生产者那里提取PcapPacket?
StringBuilder errbuf = new StringBuilder();
Pcap pcap = Pcap.openOffline("tests/test-afs.pcap", errbuf);
PcapPacketHandler<Queue<PcapPacket>> handler = new PcapPacketHandler<Queue<PcapPacket>>() {
public void nextPacket(PcapPacket packet, Queue<PcapPacket> queue) {
PcapPacket permanent = new PcapPacket(packet);
queue.offer(packet);
}
}
Queue<PcapPacket> queue = new ArrayBlockingQueue<PcapPacket>();
pcap.loop(10, handler, queue);
System.out.println("we have " + queue.size() + " packets in our queue");
pcap.close();
Kafka支持将任意二进制数据存储为消息。在您的情况下,您只需要提供一个PcapPacket类二进制序列化程序(以及用于读取的反序列化程序)。
参见Kafka:编写自定义序列化程序的示例。
虽然我参加聚会迟到了,但如果有类似需求的人觉得它有用,我会在这里分享我的工具:Pcap Processor(GitHub URL)。我为我的研究开发了一个Python工具,用于读取原始pcap文件、处理它们并将它们提供给我的流处理器。由于我尝试了各种流协议,所以我在这个工具中实现了所有这些协议。当前支持的接收器:
- CSV文件
- Apache Kafka(编码为JSON字符串)
- HTTP REST(JSON)
- gRPC
- 控制台(只需打印到终端)
例如,要读取input.pcap
并将其发送到Kafka主题,需要调整Kafka_sink.py中的引导端点和主题名称。然后,从父目录执行以下命令将读取该文件并将数据包发送到Kavka队列。
python3 -m pcap_processor --sink kafka input.pcap
有关更多详细信息和安装说明,请查看GitHub自述文件,如果遇到任何问题,请随时打开GitHub问题。