Kafka: message does not start with magic byte



I am trying to consume data from a topic that KSQL has processed, but it doesn't work.

I set up a table named api_table with KSQL. Here are the details of my topics:

ksql> show topics;
Kafka Topic   | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups 
-------------------------------------------------------------------------------------------
_schemas      | false      | 1          | 1                  | 0         | 0              
api_log       | true       | 1          | 1                  | 1         | 1              
API_STREAM    | false      | 1          | 1                  | 0         | 0              
API_STREAM_KEY| true       | 1          | 1                  | 1         | 1              
API_TABLE     | true       | 1          | 1                  | 0         | 0              
mysql-config  | false      | 1          | 1                  | 0         | 0              
mysql-offsets | false      | 25         | 1                  | 0         | 0              
mysql-status  | false      | 5          | 1                  | 0         | 0              
-------------------------------------------------------------------------------------------

And here is my table's schema:

ksql> describe extended api_table;
Name                 : API_TABLE
Type                 : TABLE
Key field            : 
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : API_TABLE (partitions: 1, replication: 1)
Field      | Type                      
----------------------------------------
ROWTIME    | BIGINT           (system) 
ROWKEY     | VARCHAR(STRING)  (system) 
KSQL_COL_0 | BIGINT                    
COUNT      | BIGINT                    
----------------------------------------
Queries that write into this TABLE
-----------------------------------
CTAS_API_TABLE_2 : CREATE TABLE API_TABLE WITH (REPLICAS = 1, PARTITIONS = 1, KAFKA_TOPIC = 'API_TABLE') AS SELECT
WINDOWSTART() "KSQL_COL_0"
, COUNT(*) "COUNT"
FROM API_STREAM_KEY API_STREAM_KEY
WINDOW TUMBLING ( SIZE 5 MINUTES ) 
GROUP BY API_STREAM_KEY.METRIC;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec:      0.10   total-messages:       249     last-message: 2019-08-13T07:07:39.325Z
(Statistics of the local KSQL server interaction with the Kafka topic API_TABLE)

Everything works fine; I can even print out the messages with KSQL.

However, when I try to consume the messages with Python:

from confluent_kafka import KafkaError
import io
from confluent_kafka import Consumer
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

consumer = AvroConsumer({
    'bootstrap.servers': 'localhost:9021',
    'schema.registry.url': 'http://localhost:8081',
    'group.id': 'abcd'
})
consumer.subscribe(['API_TABLE'])

while True:
    try:
        msg = consumer.poll(10)
    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break
    if msg is None:
        continue
    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue
    print(msg.value())

consumer.close()

It throws this error. Why?

Traceback (most recent call last):
File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/__init__.py", line 149, in poll
decoded_key = self._serializer.decode_message(message.key(), is_key=True)
File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 225, in decode_message
raise SerializerError("message does not start with magic byte")
confluent_kafka.avro.serializer.SerializerError: message does not start with magic byte
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/ylee/PycharmProjects/Bokeh/consumer.py", line 18, in <module>
msg = consumer.poll(10)
File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/__init__.py", line 156, in poll
e))
confluent_kafka.avro.serializer.SerializerError: Message deserialization failed for message at API_TABLE [0] offset 110: message does not start with magic byte
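
To see what is actually on the topic, you can poll with the plain (non-Avro) Consumer and inspect the raw bytes. A minimal diagnostic sketch, reusing the connection settings from the question (the debug-peek group ID is made up):

from confluent_kafka import Consumer

# Plain consumer: hands back raw bytes, so nothing tries to Avro-decode the key
c = Consumer({
    'bootstrap.servers': 'localhost:9021',  # same broker setting as above
    'group.id': 'debug-peek',               # hypothetical group ID
    'auto.offset.reset': 'earliest',
})
c.subscribe(['API_TABLE'])

msg = c.poll(30)
if msg is not None and msg.error() is None:
    # See which of the two actually carries the Confluent magic byte (0x00)
    print('key starts with 0x00?  ', msg.key()[:1] == b'\x00')
    print('value starts with 0x00?', msg.value()[:1] == b'\x00')
c.close()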

I tried to solve it myself and modified things a little to fit my needs. Here is a workaround:

import io
import struct

from avro.io import BinaryDecoder, DatumReader
from confluent_kafka import Consumer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer import SerializerError

# Please adjust the broker and Schema Registry URLs to your setup
consumer = Consumer({
    'bootstrap.servers': 'localhost:9021',
    'group.id': 'abcd'
})
register_client = CachedSchemaRegistryClient(url="http://localhost:8081")
consumer.subscribe(['YOUR TOPIC'])

MAGIC_BYTES = 0

def unpack(payload):
    magic, schema_id = struct.unpack('>bi', payload[:5])
    # Confluent-framed Avro value: look the writer schema up in the Schema Registry
    if magic == MAGIC_BYTES:
        schema = register_client.get_by_id(schema_id)
        reader = DatumReader(schema)
        decoder = BinaryDecoder(io.BytesIO(payload[5:]))
        return reader.read(decoder)
    # Plain string key
    else:
        # For a windowed KSQL key, strip the 8-byte window-start timestamp
        # appended to the key: payload[:-8].decode()
        return payload.decode()

def get_data():
    msg = None
    while True:
        try:
            msg = consumer.poll(10)
        except SerializerError as e:
            print("Message deserialization failed for {}: {}".format(msg, e))
            raise
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            return
        key, value = unpack(msg.key()), unpack(msg.value())
        print(key, value)

if __name__ == '__main__':
    get_data()

More details on why this happens can be read on my blog.

The problem is that while KSQL writes the value as Avro, the key is a plain STRING, so the AvroConsumer fails as soon as it tries to Avro-decode the key.
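
If that is the case, one option is to deserialize the key and value independently. A minimal sketch using the newer serialization API in confluent-kafka-python (available from roughly 1.4 onward; constructor signatures have changed between versions, so check your release):

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

schema_registry = SchemaRegistryClient({'url': 'http://localhost:8081'})

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9021',                     # same broker as in the question
    'group.id': 'abcd',
    'key.deserializer': StringDeserializer('utf_8'),           # key is a plain string
    'value.deserializer': AvroDeserializer(schema_registry),   # value is Confluent Avro
})
consumer.subscribe(['API_TABLE'])

msg = consumer.poll(10)
if msg is not None and msg.error() is None:
    print(msg.key(), msg.value())
consumer.close()

Note that for a windowed table like this one the key also carries the window start, so even a string deserializer may need the trimming shown in the workaround above.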

This issue looks like the same one, and there is a proposed PR to fix it: https://github.com/confluentinc/confluent-kafka-python/pull/650

Confluent Avro is not byte-compatible with plain Avro, so libraries that assume standard Avro cannot read the format.
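
For reference, the "magic byte" in the error comes from Confluent's wire format: a 5-byte header in front of the Avro-encoded body. A sketch of the framing (the schema ID value here is made up):

import struct

MAGIC_BYTE = 0  # Confluent wire format: [0x00][4-byte big-endian schema ID][Avro body]

def frame(schema_id, avro_body):
    """Prepend the Confluent header to an already-encoded Avro body."""
    return struct.pack('>bi', MAGIC_BYTE, schema_id) + avro_body

def unframe(payload):
    """Split a framed message back into (schema_id, avro_body)."""
    magic, schema_id = struct.unpack('>bi', payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError('message does not start with magic byte')
    return schema_id, payload[5:]

framed = frame(42, b'...')   # hypothetical schema ID 42
print(unframe(framed))       # -> (42, b'...')

A plain string key has no such header, which is exactly why decoding the key fails.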

See https://github.com/confluentinc/ksql/issues/1282; you can use the Confluent library to encode/decode: https://github.com/confluentinc/confluent-kafka-python
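
In that spirit, instead of hand-rolling the header parsing, you can reuse the library's own MessageSerializer (the same class that raised the error above) for the Avro value and handle the string key separately. A sketch, assuming msg was polled with the plain Consumer:

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

registry = CachedSchemaRegistryClient(url='http://localhost:8081')
serializer = MessageSerializer(registry)

# Decode only the value through the Schema Registry; the key is not framed,
# so just decode it as a string (trimming the window bytes if needed).
value = serializer.decode_message(msg.value())
key = msg.key().decode('utf-8', errors='replace')
print(key, value)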
