我最近通过从Plc4x的Github页面克隆源代码将Plc4x Kafka Connect插件从0.5.0版本更新到0.8.0,并使用Maven构建它,完全按照自述文件中的规定。构建源代码后,我收到了一个超级 JAR,其中包含 Kafka Connect 插件的所有必要库和依赖项。 然后,我创建一个连接器配置文件,类似于 Plc4x 的 Github 页面中的示例:
{
"name":"plc-source-test",
"config": {
"connector.class":"org.apache.plc4x.kafka.Plc4xSourceConnector",
"tasks.max":"1",
"file":"test.sink.txt",
"topics":"connect-test"
}
}
然后,我将配置推送到 REST 接口:
curl -X POST -H "Content-Type: application/json" --data config.json http://localhost:8083/connectors
REST 接口现在响应如下:
{"error_code":500,"message":null}
这就是我卡住的地方。我相信此错误与config.json文件中的以下行有关:
"connector.class":"org.apache.plc4x.kafka.Plc4xSourceConnector"
因为当我使用不同的连接器类时,例如:
"connector.class":"FileStreamSinkConnector"
一切正常,我可以成功地将我的连接器配置推送到 REST 接口。在 Plc4x 的 0.5.0 版本中也没有出现此问题。 我已经解压缩了包含所有依赖项的 uber JAR,并验证了 Plc4xSourceConnector 类确实存在。我不知道我做错了什么,因为我正在按照他们的 Github 页面中概述的步骤来构建和配置所有内容。 还有其他人遇到过这个问题吗?
我设法找到了我的问题的解决方案。我使用的连接器配置文件是为 PLC4x v.0.4.0 设计的,并且在 v.0.8.0 发布之前已经更新。我在PLC4X Github存储库中找到了一个示例配置文件,并将其用作布局。从他们的网站上,我发现该字段:
"sources.machineX.connectionString"
必须以特殊方式格式化。我将连接器配置文件更新为:
{"name": "plc-source-test",
"config": {
"connector.class": "org.apache.plc4x.kafka.Plc4xSourceConnector",
"default-topic": "test-topic",
"tasks.max": "1",
"sources": "machineA",
"sources.machineA.connectionString": "s7:<PLC_IP>?remote-rack=0&remote-slot=0",
"sources.machineA.jobReferences": "jobA",
"sources.machineA.jobReferences.jobA": "job-topic",
"jobs": "jobA",
"jobs.jobA.interval": "500",
"jobs.jobA.fields": "fieldA",
"jobs.jobA.fields.fieldA": "%DB1.DBD1:REAL"
}
并让一切正常!
使用 Plc4xSourceConnector 时,必须在连接器配置中指定所需的键:值:
"default-topic" //Required. The default name for the Kafka topic
"tasks.max" //Not quite sure what this does, but it is in the example config so lets use it
"sources" //Required. It must be a comma separated list of your source
"sources.machineA.connectionString" //Required. The connection string to the machine/PLC that you want to talk to
"sources.machineA.jobReferences" //Required. A comma separated list of all the jobs that you wish to create for this machine
"sources.machineA.jobReferences.jobA" //Required. The Kafka topic name for data produced by jobA
"jobs" //Required. A comma separated list of all jobs you wish to create
"jobs.jobA.interval" //Not sure if this is required. Determines the polling rate of your created job
"jobs.jobA.fields" //Required. A comma separated list of the fields belonging to this job. A field is a machine/PLC register containing a value
"jobs.jobA.fields.fieldA" //Required. The address to the resource on your machine/PLC
发布错误的连接器配置时显示的错误消息非常模糊,通常只是一些 NullPointerException。因此,我花了一些时间分析PLC4X的源代码,尤其是这个类,以确定需要哪些字段。