当源代码为Kafka-Avro格式时,如何在Memsql中创建Transform



我能够将数据从Kafka推送到Memsql。

我正在尝试使用Transform进行推送。我用Python创建了Kafka Consumer,它使用Kafka Topic中的数据并转换为Json格式。

我不知道如何在Memsql中将其用作Transform。

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
import sys
c = AvroConsumer({
'bootstrap.servers': 'X.Y.Z.W:9092',
'group.id': 'groupid1112',
'schema.registry.url': 'http://X.Y.Z.W:8081',
'default.topic.config': {
'auto.offset.reset': 'smallest'
}
})
c.subscribe(['test_topic'])
count =0
while True:
try:
msg = c.poll(10)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(msg.error())
break
valueList = list(msg.value().values())
print(valueList)
c.close()

它优先考虑

[1518776144187, 1, 2, 103,'asas',asas'eer',None]

检查这些文档https://docs.memsql.com/memsql-pipelines/v6.0/transforms/

请继续关注即将发布的MemSQL中对本机avro的支持。

你会想做下面这样的事情,但我会画出avro的具体细节,因为我根本不知道avro库。

```

def input_stream():
"""
Consume STDIN and yield each record that is received from MemSQL
"""
while True:
byte_len = sys.stdin.read(8)
if len(byte_len) == 8:
byte_len = struct.unpack("L", byte_len)[0]
result = sys.stdin.read(byte_len)
yield result
else:
assert len(byte_len) == 0, byte_len
return
avro_context = WhateverYouNeed() # maybe connect to schema registry here if you need to
for msg in input_stream():
object = DeserializeAvro(avro_context, msg) # this is your code
sys.stdout.write(SerializeToTSV(object)) # also your code

```

使用模式注册表应该很好,但您不必担心在转换脚本中读取kafka的细节。我可以试着在周一给你一个更详细的脚本,但这是如何构建代码的。

最新更新