简单的Kafka Producer在Java中的实现依赖于SLF4J



我已经写了一个我认为是Kafka Producer在Java中的基本实现。

package com.mycompany.kafkaexample.KafkaExample;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Main {
public static void main(String[] args) {
System.out.println("Hello Kafka");

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:29092");
properties.setProperty("default.topic", "hello-world-topic");


KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

String topicName = "hello-world-topic";
String key = "";
String message = "Hello Kafka! (Message String)";

ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, key, message);

producer.send(producerRecord);

producer.close();
}
}

请注意:我不完全确定这是否会工作,因为我还没有能够编译它。

原因如下:

The type org.slf4j.Logger cannot be resolved. It is indirectly referenced from required type org.apache.kafka.clients.producer.KafkaProducer

这向我暗示Kafka中的某些东西引用了slf4j-但为什么?

  • 我没有包含任何与slf4j有关的代码,至少我没有故意这样做。
  • 那么为什么我得到关于它的错误消息?
  • 如果Kafka内部的东西引用了slf4j,那么为什么相关的Kafka代码不能自己排序import ...slf4j?(为什么我必须做些什么?)
  • 是什么导致这个错误?

我正在使用Maven构建这个项目。

我的pom.xml:(也请注意,我对这做什么了解很少。它主要是为我生成的,尽管Kafka依赖是我添加的。)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mycompany.kafkaexample</groupId>
<artifactId>KafkaExample</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>KafkaExample</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

旁注:我不建议同时学习Kafka、Java和Maven。Kafka客户端存在许多不同的(更简单的)语言。

首先,用kafka-clients代替kafka_2.13。前者是代理服务器库,而不是生产者/消费者API。

关于这个问题,是的,Kafka客户端使用slf4j-api,但它只在runtimeMaven范围内,这意味着你你需要自己提供。(本指南使用Gradle,但同样的概念适用)
这只是一个外观库,需要您添加实现日志记录器,如reload4j,logbacklog4j2(推荐)。

因此,需要将其中一个添加到POM中。例如,请参考log4j2文档

https://logging.apache.org/log4j/2.x/manual/migration.html migrating-from-log4j-1-x-to-2-x

添加后,您还需要创建一个src/main/resources/log4j2.xml文件,例如
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="DEBUG">
<AppenderRef ref="Console"/>
</Root>
<Logger name="org.apache.kafka" level="INFO"/>
</Loggers>
</Configuration>

并且,如果这有效,您应该能够运行代码并得到下一个问题…

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Invalid value null for configuration key.serializer: must be non-null.
at org.apache.kafka.clients.producer.ProducerConfig.appendSerializerToConfig(ProducerConfig.java:579)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:290)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:317)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:302)

需要添加这两行

properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

理想情况下,您可以用slf4j记录器替换System.out的使用。

private static final Logger LOG = LoggerFactory.getLogger(Main.class);
// LOG.info("Hello world")

相关内容

  • 没有找到相关文章

最新更新