查看和操作 kafka 存储数据的快速方法? #isoBlue #isoBus



以最快和最简单的方式,我希望 Kafka 消息日志中的二进制数据可被视为十六进制字符串值。

我有以下包含二进制CAN消息的数据,我想将其视为十六进制字符串

案例-8010-小麦-AULT-072018/├── 清洁偏移检查点 ├── 调试-0 │ ├── 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000 │ ├── 000000000000000000000000000000000000000000000000000000000000000000000000000000000.log 00000000000000 │ ├── 0000000000000000000000.timeindex │ ├── 00000000000000006972.首页 │ ├── 0000000000000006972.log │ ├── 00000000000000006972.快照 │ ├── 0000000000000006972.timeindex │ ├── 00000000000000079766.快照 │ └── 领导者-时代-检查点 ├── 全球定位系统-0 │ ├── 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000 │ ├── 000000000000000000000000000000000000000000000000000000000000000000000000000000000.log 00000000000000 │ ├── 0000000000000000000000.timeindex │ ├── 00000000000000003235.索引 │ ├── 00000000000000003235.log │ ├── 00000000000000003235.快照 │ ├── 00000000000000003235.时间索引 │ ├── 00000000000000029657.快照 │ └── 领导者-时代-检查点 ├── IMP-0 │ ├── 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000 │ ├── 000000000000000000000000000000000000000000000000000000000000000000000000000000000.log 00000000000000 │ ├── 0000000000000000000000.timeindex │ ├── 0000000000000004940.首页 │ ├── 00000000000000004940.log │ ├── 00000000000000004940.快照 │ ├── 00000000000000004940.时间索引 │ ├── 00000000000000915321.快照 │ └── 领导者-纪元-检查点

数据源提供了用于使用数据的文档,但可能是由于版本不匹配,使用 kafka_2.11-0.11.0.1.tgz 的步骤会失败。 https://www.isoblue.org/docs/data/data/

尝试直接查看数据,我有:

1. 玩 kafka.tools.DumpLogSegments

./kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log  --files ../../case-8010-wheat-ault-072018/imp-0/00000000000000004940.log | head -n 15

我试图操作 DumpLogSegments 输出,但数据看起来不像预期的那样。./kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files ../../case-8010-wheat-ault-072018/imp-0/00000000000000004940.log | awk -F'payload:' '{print $2}' | awk -F'offset:' '{print $1}' | od -A n -t x1 | head -n 10

2.我目前正在尝试编写基于以下内容的文件源连接器:https://docs.confluent.io/current/connect/devguide.html#connector-example

我无法让KafkaConsumer使用Python2 Kafka。isoBlue 脚本对我不起作用。

1. 编辑 $KAFKA_HOME/config/server.propertieslog.dirs=<KAFKA_DATA>

2. zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties

3.kafka-topics.sh --列表 --动物园管理员 本地主机:2181

4. kafka-server-start.sh $KAFKA_HOME/config/server.properties

5.使用 Python3:

#!/usr/bin/env python3
from kafka import KafkaConsumer
import sys
try:
bootstrap_servers = ['localhost:9092']
topicName = 'tra'
consumer = KafkaConsumer (topicName, group_id = 'can-test',bootstrap_servers = bootstrap_servers,auto_offset_reset='earliest');
for message in consumer:
wait = input("PRESS ENTER TO CONTINUE.")    
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
except KeyboardInterrupt:
sys.exit()

6.它确实帮助我观察服务器是否正在运行以及数据是否仍然存在。

watch -n 5 ls -alF <KAFKA_DATA> | head
watch -n 5 netstat -a | grep 9092

相关内容

最新更新