使用 Kafka Kusto Sink 从 Kafka 解析记录 (PCF)



我已经根据本指南使用 docker 设置了我的环境。

在 kafka-console-producer 上,我将发送这一行:

Hazriq|27|Undegrad|UNITEN

我希望像这样将这些数据引入 Kusto:

+--------+-----+----------------+------------+
| Name   | Age | EducationLevel | University |
+--------+-----+----------------+------------+
| Hazriq | 27  | Undegrad       | UNITEN     |
+--------+-----+----------------+------------+

Kusto 可以使用映射(我仍在尝试理解(来处理这个问题,还是应该由 Kafka 来满足?


尝试@daniel建议:

.create table ParsedTable (name: string, age: int, educationLevel: string, univ:string)
.create table ParsedTable ingestion csv mapping 'ParsedTableMapping' '[{ "Name" : "name", "Ordinal" : 0},{ "Name" : "age", "Ordinal" : 1 },{ "Name" : "educationLevel", "Ordinal" : 2},{ "Name" : "univ", "Ordinal" : 3}]'
kusto.tables.topics_mapping=[{'topic': 'kafkatopiclugiaparser','db': 'kusto-test', 'table': 'ParsedTable','format': 'psv', 'mapping':'ParsedTableMapping'}]
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

但得到这个:

+----------------------------+-----+----------------+------+
| Name                       | Age | EducationLevel | Univ |
+----------------------------+-----+----------------+------+
| Hazriq|27|Undergrad|UNITEN |     |                |      |
+----------------------------+-----+----------------+------+

目前,连接器在数据传入时传递数据(客户端上不对其进行操作(,任何分析都留给 Kusto。

因此,kusto 支持psv格式,通过将格式设置为 psv 并提供映射引用,应该可以实现。

按照所述添加插件时,您应该能够像这样设置它:

kusto.tables.topics_mapping=[{'topic': 'testing1','db': 'testDB', 'table': 'KafkaTest','format': 'psv', 'mapping':'KafkaMapping'}]

映射可以在 Kusto 中定义,如 Kusto 文档中定义

的那样

支持使用 psv 格式显示的数据引入(见下文( - 这可能只是调试为什么客户端调用基础命令没有产生预期结果的问题。 如果您可以共享完整的流程和代码(包括参数(,可能会有所帮助。

.create table ParsedTable (name: string, age: int, educationLevel: string, univ:string)
.ingest inline into table ParsedTable with(format=psv) <| Hazriq|27|Undegrad|UNITEN

ParsedTable:
| name   | age | educationLevel | univ   |
|--------|-----|----------------|--------|
| Hazriq | 27  | Undegrad       | UNITEN |

最新更新