在 java 中使用 kafka 文件连接器读取文件



我用Java编写了一个非常简单的代码来读取文件并将这些记录发送到Kafka主题。一切都按预期工作。但是,我想使用Kafka文件连接器,而不是写入文件。我过去使用REST proxy(curl(命令这样做过,但从未在java中尝试过。我需要一些帮助来做到这一点。

我可以看到Maven存储库中有Kafka-connect api,我可以将其添加到我的pom.xml文件中。我的下一步应该把它集成到我的 java 代码中。

我的代码在没有卡夫卡连接的情况下读取文件

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Properties;
import java.util.Scanner;
public class SimpleProducer_ReadFile {
public static void main(String[] args) throws FileNotFoundException {
// System.out.println("Hello Kafka ");
// setting properties
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> produce = new KafkaProducer<String, String>(props);

//reading file
File read = new File("C:\Users\Desktop\TestFile.txt");
Scanner scan = new Scanner(read);
while(scan.hasNextLine()){
String data = scan.nextLine();
System.out.println(data);
//create the producer record
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-topic",data);
//send data
produce.send(record);
}
//flush and close
produce.flush();
produce.close();
}
}

您所需要的只是Kafka Connect和FileStreamSource连接器,该连接器从文件中读取数据并将其发送到Kafka。


在您的情况下,配置应该是

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/path/to/file.txt
topic=test-topic

现在,等效的curl命令将是:

curl -X POST 
-H "Content-Type: application/json" 
--data '{"name": "local-file-source", "config": {"connector.class":"FileStreamSource", "tasks.max":"1", "file":"path/to/file.txt", "topics":"test-topic" }}' http://localhost:8083/connectors

如果要以编程方式执行此操作,只需发送上述POST请求即可。

相关内容

  • 没有找到相关文章

最新更新