将数据从一个Kafka主题移动到另一个主题在KafkaPython中是不起作用的



我是卡夫卡的新手。我目前正在尝试与Kafka合作,我想从一个名为";输入主题";它有5个分区,每个分区有一些随机整数;输出主题";其仅具有一个分区。

下面是我使用的代码

卡夫卡制作人

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: bytes(x))
topic_name = "input-topic"
num_partitions = 3
def send_data_to_topics():
print("sending...")
for i in range(num_partitions):
for j in range(5):
producer.send(topic_name, value=j, partition=i)
producer.flush()

if __name__ == "__main__":
send_data_to_topics()

卡夫卡消费者

topic_name_input  = "input-topic"
topic_name_output = "output-topic"
data = []
def bytes_to_int(bytes):
result = 0
for b in bytes:
result = result * 256 + int(b)
return result
consumer = KafkaConsumer(
topic_name_input,
bootstrap_servers=['localhost:9092'],
group_id='my-group',
value_deserializer=lambda x: bytes_to_int(x)
)
producer1 = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: bytes(x))
def read_topic_data():
print("received")
for message in consumer:
print(message)
data.append(message.value)
def send_data_to_topic():
data = sorted(data) 
for d in data:
producer1.send(topic_name_output, value=d)
producer1.flush()



if __name__ == "__main__":
read_topic_data()
send_data_to_topic()

另一个卡夫卡消费者

topic_name_input  = "input-topic"
topic_name_output = "output-topic"
def bytes_to_int(bytes):
result = 0
for b in bytes:
result = result * 256 + int(b)
return result
consumer2 = KafkaConsumer(
topic_name_output,
bootstrap_servers=['localhost:9092'],
group_id='my-group_new',
value_deserializer=lambda x: bytes_to_int(x)
)
def read_topic_data():
print("received")
for message in consumer2:
print(message)

if __name__ == "__main__":
read_topic_data()

然而,我没有收到任何关于";输出主题";。我是不是遗漏了什么?我如何才能有效地实现它?我需要使用Kafka Streams吗?如有任何帮助,我们将不胜感激。谢谢

我建议使用KafkaStreams,这对于这个用例是有效的。

KafkaConsumer调用本质上是阻塞调用。当KafkaConsumer启动时,它开始处理主题中已有的消息,并等待新消息进入。当新消息进入主题时,它会处理新消息,然后继续等待更多消息。

如果没有新消息,参数consumer_timeout_ms可以传递给KafkaConsumer构造函数,以便在X时间后停止处理。

从共享的示例代码来看,

if __name__ == "__main__":
read_topic_data()
send_data_to_topic()

由于KafkaConsumer的性质,调用了read_topic_data((函数,但控制不会返回到main。因此,send_data_to_topic((永远不会被调用。

为了避免这个问题,从一个主题读取数据和向另一个主题发送数据应该同时工作。这里可以使用线程。

if __name__ == "__main__":
read_thread = Thread(target=read_topic_data)
read_thread.start()
write_thread = Thread(target=send_data_to_topic)
write_thread.start()

同样需要注意的是,KafkaProducer将发送data列表中的所有消息并退出。它在退出后不等待/处理新消息。所以用户必须实现等待以保持它的运行。

def send_data_to_topic():
while True:
print("starting write thread")
sorted_data = sorted(data)
for d in sorted_data:
producer1.send(topic_name_output, value=d)
producer1.flush()
print("finishing write thread")
time.sleep(10) # wait for 10 seconds before starting next iteration

代码的另一个问题是消息重复。read_topic_data((从Kafka输入主题读取消息,并将它们附加到列表data中。send_data_to_pic((从该列表中读取消息并将其发送到输出主题。不过,已处理的邮件不会从列表中删除。队列可用于解决此问题。

from threading import Thread
from queue import Queue
from kafka import KafkaProducer
from kafka import KafkaConsumer
topic_name_input = "input-topic"
topic_name_output = "output-topic"
data = Queue()

def bytes_to_int(bytes):
result = 0
for b in bytes:
result = result * 256 + int(b)
return result

consumer = KafkaConsumer(
topic_name_input,
bootstrap_servers=['avelusamy-mbp15:9092'],
group_id='my-group',
value_deserializer=lambda x: bytes_to_int(x)
)
producer1 = KafkaProducer(
bootstrap_servers=['avelusamy-mbp15:9092'], value_serializer=lambda x: bytes(x))

def read_topic_data():
print("received")
for message in consumer:
print(message)
data.put(message.value)

def send_data_to_topic():
while True:
producer1.send(topic_name_output, value=data.get())
producer1.flush()

if __name__ == "__main__":
read_thread = Thread(target=read_topic_data)
read_thread.start()
write_thread = Thread(target=send_data_to_topic)
write_thread.start()

最新更新