我想知道在融合中心使用数据生成连接器的过程,这样我就可以为测试主题生成随机数据。
我试着用一些设置连接datagen连接器,但总是失败。
更新:我尝试连接在主题中创建的我自己的架构,但它没有拾取,datagen连接器也无法启动。
以下是datagen连接的配置:
{
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schemas.enable": "false",
"name": "datagen-protobuf-userprofile",
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"transforms": [
"SetSchemaMetadata"
],
"transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetSchemaMetadata.schema.name": "value_User_PROFILE",
"kafka.topic": "User_PROFILE",
"max.interval": "1000",
"iterations": "10000000",
"schema.filename": "value_User_PROFILE",
"schema.keyfield": "userid",
"quickstart": "value_User_PROFILE"
}
新主题中定义的架构:
{
"doc": "Sample schema to help you get started.",
"fields": [
{
"doc": "The int type is a 32-bit signed integer.",
"name": "userid",
"type": "int"
},
{
"doc": "The string is a unicode character sequence.",
"name": "firstname",
"type": "string"
},
{
"doc": "The string is a unicode character sequence.",
"name": "lastname",
"type": "string"
},
{
"doc": "The string is a unicode character sequence.",
"name": "countrycode",
"type": "string"
},
{
"doc": "this si double which store floting value as well",
"name": "rating",
"type": "double"
}
],
"name": "value_User_PROFILE",
"namespace": "com.mycorp.mynamespace",
"type": "record"
}
您可以尝试以下步骤:
- 将datagen连接器目录放在配置的插件路径中——如果是分布式工作程序,则需要为所有工作程序执行此操作
- 重新启动所有工作人员
- 连接器应出现在控制中心的连接器列表页面上(点击
connect-ip:8083/connector-plugins
以确认连接器是否正确加载( - 填写连接器属性——您可以参考以下repo获取示例配置值:https://github.com/confluentinc/kafka-connect-datagen/tree/master/config(您也可以在控制中心上传属性文件(
- 测试并继续,稍后启动连接器