从kafka读取数据并在apache beam中打印



我是apache beam的超级新手,我正试图从kafka主题读取数据,然后在屏幕上打印。这是我正在尝试的,但我想我不确定打印部分

class LogProcessor(beam.DoFn):
def process(self, element):
print(element)

(p
| 'read' >> ReadFromKafka(cluster='mykafkacluster',topic='funny')
| 'print' >> beam.ParDo(LogProcessor())
)

有人可以帮助我与LogProcessor()部分,究竟是如何工作的?

您可以使用beam.Map与一个函数,该函数记录然后返回PCollection中的每个元素:

import logging
def log_element(elem):
logging.info(elem)
return elem

(p
| 'read' >> ReadFromKafka(cluster='mykafkacluster',topic='funny')
| 'print' >> beam.Map(log_element)
)

然后您将在DataflowUI控制台或Cloud Logging中看到日志。

最新更新