Kafka Spark Streaming consumer does not receive any messages from the Kafka console producer



I am trying to integrate Spark with Kafka to consume messages from Kafka. I have producer code that sends messages to the "temp" topic, and I am also using Kafka's console producer to publish messages to the same "temp" topic.
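
A minimal producer sketch along these lines (illustrative only, using the standard kafka-clients API; only the "temp" topic name and the localhost broker come from this setup) can be used to publish test messages:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Broker address for a local single-node setup.
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // KafkaProducer is Closeable, so try-with-resources flushes and closes it.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 10; i++) {
                    // Publish a few test messages to the "temp" topic.
                    producer.send(new ProducerRecord<>("temp", "message " + i));
                }
            }
        }
    }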

I wrote the following code to consume messages from that same "temp" topic, but it does not receive a single message.

Program:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.apache.commons.lang3.StringUtils.SPACE;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class ConsumerDemo {
    public void main() {
        String zkGroup = "localhost:2181";
        String group = "test";
        String[] topics = {"temp"};
        int numThreads = 1;
        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaKafkaWordCount")
                .setMaster("local[4]")
                .set("spark.ui.port", "7077")
                .set("spark.executor.memory", "1g");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        Map<String, Integer> topicMap = new HashMap<>();
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }
        System.out.println("topics : " + Arrays.toString(topics));
        JavaPairReceiverInputDStream<String, String> messages
                = KafkaUtils.createStream(jssc, zkGroup, group, topicMap);
        messages.print();
        JavaDStream<String> lines = messages.map(Tuple2::_2);
        //lines.print();
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(SPACE)).iterator());
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((i1, i2) -> i1 + i2);
        //wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
    public static void main(String[] args) {
        System.out.println("Started...");
        new ConsumerDemo().main();
        System.out.println("Ended...");
    }
}

I have added the following dependencies to my pom.xml file:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>0.9.0-incubating</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.3</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.anarres.lzo</groupId>
        <artifactId>lzo-core</artifactId>
        <version>1.0.5</version>
        <type>jar</type>
    </dependency>
    <dependency> 
        <groupId>com.fasterxml.jackson.core</groupId> 
        <artifactId>jackson-databind</artifactId> 
        <version>2.8.2</version> 
    </dependency> 
    <dependency> 
        <groupId>com.fasterxml.jackson.module</groupId> 
        <artifactId>jackson-module-scala_2.10</artifactId> 
        <version>2.8.2</version> 
    </dependency>
    <dependency>
        <groupId>com.msiops.footing</groupId>
        <artifactId>footing-tuple</artifactId>
        <version>0.2</version>
    </dependency>

Am I missing some dependency, or is there a problem in the code? Why does this code not receive any messages?

You are not calling the method that contains the code that connects to Kafka and consumes the messages. Either write that logic inside public static void main(), or call the method in which you wrote it.
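
A minimal sketch of what that means (startConsuming is a hypothetical name standing in for the method that holds the consumption logic):

    public class ConsumerDemo {
        // All of the Kafka connection and streaming logic lives here.
        public void startConsuming() {
            // ... build the JavaStreamingContext, call KafkaUtils.createStream,
            // then jssc.start() and jssc.awaitTermination()
        }

        public static void main(String[] args) {
            // The JVM only invokes the static main; the instance method
            // must be called explicitly or it never runs.
            new ConsumerDemo().startConsuming();
        }
    }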

When working with a Kafka consumer, especially while testing and debugging in a development environment, the producer may not be pushing messages to Kafka continuously. In that case you need to pay attention to the consumer parameter auto.offset.reset, which determines whether the consumer reads only the new messages written to the topic after it starts running, or starts reading from the beginning of the topic.

Here is the official description from the Kafka documentation:

auto.offset.reset
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  1. earliest: automatically reset the offset to the earliest offset
  2. latest: automatically reset the offset to the latest offset
  3. none: throw an exception to the consumer if no previous offset is found for the consumer's group
  4. anything else: throw an exception to the consumer

Here is a sample code snippet showing how to create a Kafka DStream with kafkaParams:

    // Imports needed by this snippet:
    //   import kafka.serializer.StringDecoder;
    //   import org.apache.spark.storage.StorageLevel;
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", "localhost:2181");
    // While testing the code on a development system, change this group.id
    // each time you run the consumer so that committed offsets do not hide old messages.
    kafkaParams.put("group.id", "test02");
    // The receiver-based (old high-level consumer) API expects "smallest"/"largest" here;
    // "earliest"/"latest" are the equivalent values for the new consumer API.
    kafkaParams.put("auto.offset.reset", "smallest");
    kafkaParams.put("metadata.broker.list", "localhost:9092");
    kafkaParams.put("bootstrap.servers", "localhost:9092");

    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("temp", 1);

    StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
    JavaPairDStream<String, String> messages =
        KafkaUtils.createStream(jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topics,
                storageLevel);
    messages.print();
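
Note that the snippet above only defines the stream; as in the question's code, jssc.start() and jssc.awaitTermination() still have to be called afterwards, otherwise the receiver never runs and messages.print() emits nothing.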
