我已经根据本指南使用 docker 设置了我的环境。
在 kafka-console-producer 上,我将发送这一行:
Hazriq|27|Undegrad|UNITEN
我希望像这样将这些数据引入 Kusto:
+--------+-----+----------------+------------+
| Name | Age | EducationLevel | University |
+--------+-----+----------------+------------+
| Hazriq | 27 | Undegrad | UNITEN |
+--------+-----+----------------+------------+
Kusto 可以使用映射(我仍在尝试理解(来处理这个问题,还是应该由 Kafka 来满足?
尝试@daniel建议:
.create table ParsedTable (name: string, age: int, educationLevel: string, univ:string)
.create table ParsedTable ingestion csv mapping 'ParsedTableMapping' '[{ "Name" : "name", "Ordinal" : 0},{ "Name" : "age", "Ordinal" : 1 },{ "Name" : "educationLevel", "Ordinal" : 2},{ "Name" : "univ", "Ordinal" : 3}]'
kusto.tables.topics_mapping=[{'topic': 'kafkatopiclugiaparser','db': 'kusto-test', 'table': 'ParsedTable','format': 'psv', 'mapping':'ParsedTableMapping'}]
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
但得到这个:
+----------------------------+-----+----------------+------+
| Name | Age | EducationLevel | Univ |
+----------------------------+-----+----------------+------+
| Hazriq|27|Undergrad|UNITEN | | | |
+----------------------------+-----+----------------+------+
目前,连接器在数据传入时传递数据(客户端上不对其进行操作(,任何分析都留给 Kusto。
因此,kusto 支持psv
格式,通过将格式设置为 psv
并提供映射引用,应该可以实现。
按照所述添加插件时,您应该能够像这样设置它:
kusto.tables.topics_mapping=[{'topic': 'testing1','db': 'testDB', 'table': 'KafkaTest','format': 'psv', 'mapping':'KafkaMapping'}]
映射可以在 Kusto 中定义,如 Kusto 文档中定义
支持使用 psv
格式显示的数据引入(见下文( - 这可能只是调试为什么客户端调用基础命令没有产生预期结果的问题。 如果您可以共享完整的流程和代码(包括参数(,可能会有所帮助。
.create table ParsedTable (name: string, age: int, educationLevel: string, univ:string)
.ingest inline into table ParsedTable with(format=psv) <| Hazriq|27|Undegrad|UNITEN
ParsedTable:
| name | age | educationLevel | univ |
|--------|-----|----------------|--------|
| Hazriq | 27 | Undegrad | UNITEN |