我用Java编写了一个非常简单的代码来读取文件并将这些记录发送到Kafka主题。一切都按预期工作。但是,我想使用Kafka文件连接器,而不是写入文件。我过去使用REST proxy(curl(命令这样做过,但从未在java中尝试过。我需要一些帮助来做到这一点。
我可以看到Maven存储库中有Kafka-connect api,我可以将其添加到我的pom.xml文件中。我的下一步应该把它集成到我的 java 代码中。
我的代码在没有卡夫卡连接的情况下读取文件:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Properties;
import java.util.Scanner;
public class SimpleProducer_ReadFile {
public static void main(String[] args) throws FileNotFoundException {
// System.out.println("Hello Kafka ");
// setting properties
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> produce = new KafkaProducer<String, String>(props);
//reading file
File read = new File("C:\Users\Desktop\TestFile.txt");
Scanner scan = new Scanner(read);
while(scan.hasNextLine()){
String data = scan.nextLine();
System.out.println(data);
//create the producer record
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-topic",data);
//send data
produce.send(record);
}
//flush and close
produce.flush();
produce.close();
}
}
您所需要的只是Kafka Connect和FileStreamSource
连接器,该连接器从文件中读取数据并将其发送到Kafka。
在您的情况下,配置应该是
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/path/to/file.txt
topic=test-topic
现在,等效的curl
命令将是:
curl -X POST
-H "Content-Type: application/json"
--data '{"name": "local-file-source", "config": {"connector.class":"FileStreamSource", "tasks.max":"1", "file":"path/to/file.txt", "topics":"test-topic" }}' http://localhost:8083/connectors
如果要以编程方式执行此操作,只需发送上述POST
请求即可。