I have written what I believe is a basic implementation of a Kafka producer in Java.
package com.mycompany.kafkaexample.KafkaExample;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Main {
    public static void main(String[] args) {
        System.out.println("Hello Kafka");
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:29092");
        properties.setProperty("default.topic", "hello-world-topic");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        String topicName = "hello-world-topic";
        String key = "";
        String message = "Hello Kafka! (Message String)";
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, key, message);
        producer.send(producerRecord);
        producer.close();
    }
}
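Please note: I'm not entirely sure whether this will work, because I haven't been able to compile it yet.
The reason is the following error: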
The type org.slf4j.Logger cannot be resolved. It is indirectly referenced from required type org.apache.kafka.clients.producer.KafkaProducer
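This suggests to me that something inside Kafka references slf4j. But why?
- I haven't included any code that has anything to do with slf4j, at least not intentionally.
- So why am I getting an error message about it?
- If something inside Kafka references slf4j, why can't the relevant Kafka code sort out its own import ...slf4j? (Why do I have to do anything?)
- What is causing this error?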
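I am using Maven to build this project.
My pom.xml: (Please also note that I know very little about what it does. It was mostly generated for me, although I added the Kafka dependency myself.)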
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mycompany.kafkaexample</groupId>
  <artifactId>KafkaExample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>KafkaExample</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.13</artifactId>
      <version>3.4.0</version>
    </dependency>
  </dependencies>
  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
Side note: I wouldn't recommend learning Kafka, Java, and Maven all at the same time. Kafka clients exist for many different (simpler) languages.
First, use kafka-clients instead of kafka_2.13. The latter is the broker server library, not the producer/consumer API.
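A minimal sketch of the swapped dependency, assuming you keep the same 3.4.0 version that is already in the POM (kafka-clients is published under the same group ID):

```xml
<!-- Replaces the kafka_2.13 dependency; the version is assumed to match the original POM -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
```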
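kafka-clients does depend on slf4j-api, but only in Maven's runtime scope, so it is not on the compile classpath and you need to provide it yourself. (This guide uses Gradle, but the same concept applies.) slf4j-api is only a facade library, and it requires you to add a logging implementation such as reload4j, logback, or log4j2 (recommended).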
So you need to add one of these to the POM. For example, see the log4j2 documentation:
https://logging.apache.org/log4j/2.x/manual/migration.html migrating-from-log4j-1-x-to-2-x
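As a rough sketch, the log4j2 route could look like the fragment below. The version number is an assumption (pick a current release), and log4j-slf4j-impl is the binding for the SLF4J 1.x API, which I believe is what kafka-clients 3.4.0 pulls in. That binding should also bring slf4j-api onto the compile classpath transitively, but check `mvn dependency:tree` to be sure.

```xml
<!-- Versions below are assumptions; use whichever current 2.x release you prefer -->
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.20.0</version>
</dependency>
<dependency>
    <!-- Routes SLF4J 1.x API calls to log4j2 -->
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.20.0</version>
</dependency>
```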
Once that is added, you also need to create a src/main/resources/log4j2.xml file, for example:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
    </Console>
  </Appenders>
  <Loggers>
    <Root level="DEBUG">
      <AppenderRef ref="Console"/>
    </Root>
    <Logger name="org.apache.kafka" level="INFO"/>
  </Loggers>
</Configuration>
And, if this works, you should be able to run the code and get to the next problem…
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Invalid value null for configuration key.serializer: must be non-null.
at org.apache.kafka.clients.producer.ProducerConfig.appendSerializerToConfig(ProducerConfig.java:579)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:290)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:317)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:302)
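You need to add these two lines before constructing the producer (and import org.apache.kafka.clients.producer.ProducerConfig and org.apache.kafka.common.serialization.StringSerializer):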
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
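To make the resulting configuration concrete, here is a small sketch that builds the same properties using plain string keys, so it runs without Kafka on the classpath. `ProducerProps` and `build` are hypothetical names for illustration, not part of Kafka. The string keys are the values behind the `ProducerConfig` constants used above.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): builds the minimal producer configuration.
final class ProducerProps {
    // Fully qualified name of Kafka's built-in String serializer.
    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    static Properties build(String bootstrapServers) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        // Same keys as ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        // and ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG.
        properties.setProperty("key.serializer", STRING_SERIALIZER);
        properties.setProperty("value.serializer", STRING_SERIALIZER);
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("localhost:29092");
        // With kafka-clients on the classpath, this object would be passed to
        // new KafkaProducer<String, String>(properties).
        for (String name : properties.stringPropertyNames()) {
            System.out.println(name + "=" + properties.getProperty(name));
        }
    }
}
```

The sketch leaves out the `default.topic` entry from the question on purpose: as far as I know it is not a producer setting, and the topic is already supplied through the `ProducerRecord`.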
Ideally, you would replace the use of System.out with an slf4j logger.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(Main.class); // field inside the Main class
// LOG.info("Hello world")