我正在尝试使用kafka-jdbc-connect API编写一个独立的Java程序,将数据从oracle-table流式传输到kafka主题。
使用的API:我目前正在尝试使用Kafka Connectors,准确地说是JdbcSourceConnector类。
约束:使用Confluent Java API,而不是通过CLI或执行提供的shell脚本来执行此操作。
我所做的:创建一个 JdbcSourceConnector .java 类的实例,并通过提供 Properties 对象作为参数来调用该类的 start(Properties) 方法。此属性对象具有数据库连接属性、表白名单属性、主题前缀等。
启动线程后,我无法从"主题前缀表名"主题中读取数据。我不确定如何将Kafka Broker详细信息传递给JdbcSourceConnector。在 JdbcSourceConnector 启动线程上调用 start() 方法,但不执行任何操作。 是否有一个简单的Java API教程页面/示例代码我可以参考,因为我看到的所有示例都使用CLI/shell脚本?
任何帮助不胜感激
法典:
public static void main(String[] args) {
Map<String, String> jdbcConnectorConfig = new HashMap<String, String>();
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "<DATABASE_URL>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "<DATABASE_USER>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, "<DATABASE_PASSWORD>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.POLL_INTERVAL_MS_CONFIG, "300000");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.BATCH_MAX_ROWS_CONFIG, "10");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.MODE_CONFIG, "timestamp");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "<TABLE_NAME>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TIMESTAMP_COLUMN_NAME_CONFIG, "<TABLE_COLUMN_NAME>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-oracle-jdbc-");
JdbcSourceConnector jdbcSourceConnector = new JdbcSourceConnector ();
jdbcSourceConnector.start(jdbcConnectorConfig);
}
假设您正在尝试在独立模式下执行此操作。
在应用程序运行配置中,主类应该是"org.apache.kafka.connect.cli.ConnectStandalone",并且需要传递两个属性文件作为程序参数。
您还应该使用"org.apache.kafka.connect.source.SourceConnector"类扩展"your-custom-JdbcSourceConnector"类。
主类:org.apache.kafka.connect.cli.ConnectStandalone
程序参数: .\path-to-config\connect-standalone.conf .\path-to-config\connetcor.properties
"connect-standalone.conf">文件将包含所有Kafka代理详细信息。
// Example connect-standalone.conf
bootstrap.servers=<comma seperated brokers list here>
group.id=some_loca_group_id
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=connect.offset
offset.flush.interval.ms=100
offset.flush.timeout.ms=180000
buffer.memory=67108864
batch.size=128000
producers.acks=1
"连接器.属性"文件将包含创建和启动连接器所需的所有详细信息
// Example connector.properties
name=some-local-connector-name
connector.class=your-custom-JdbcSourceConnector
tasks.max=3
topic=output-topic
fetchsize=10000
更多信息在这里 : https://docs.confluent.io/current/connect/devguide.html#connector-example