由于值序列化程序为null,Kafka使用者失败



我有一个Kafka使用者从生产者那里读取,但有时在从表中删除记录时,使用者会拾取空值,从而在值序列化程序中产生NoneType错误消息。如何重写此"value_serializer=lambda x loads(x.decode('utf-8'(("以检查None或Nulls。请注意,这是使用来自postgresql数据库的Kafka流。

consumer = KafkaConsumer(
topic,
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
value_serializer=lambda x loads(x.decode('utf-8'))
)

您可以在value_serializer和中执行null检查

lambda x: None if not x else loads(x.decode('utf-8')

整个修改代码:

consumer = KafkaConsumer(
topic,
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
value_serializer=lambda x: None if not x else loads(x.decode('utf-8'))
)

最新更新