Spark Streaming + Kafka - SparkSession API



I would appreciate help running a Spark streaming program with Spark 2.0.2.

It fails at runtime with "java.lang.ClassNotFoundException: Failed to find data source: kafka". The modified POM file is shown below.

The Spark session is created, but the error occurs when load() is called on the Kafka source.

Creating the Spark session:

    val spark = SparkSession
      .builder()
      .master(master)
      .appName("Apache Log Analyzer Streaming from Kafka")
      .config("hive.metastore.warehouse.dir", hiveWarehouse)
      .config("fs.defaultFS", hdfs_FS)
      .enableHiveSupport()
      .getOrCreate()

Creating the Kafka stream:

    val logLinesDStream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:2181")
      .option("subscribe", topics)
      .load()

Error message:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark-packages.org

pom.xml:

    <scala.version>2.10.4</scala.version>
    <scala.compat.version>2.10</scala.compat.version>
    <spark.version>2.0.2</spark.version>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
    </dependencies>

You are referencing the V1.5.1 Kafka artifact for Spark when you actually need the one for 2.0.2. You also need to use sql-kafka for Structured Streaming:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
    <version>2.0.2</version>
</dependency>

Note that the SparkSession API is only supported for Kafka >= 0.10.
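
As a minimal sketch of what the read could look like once the `spark-sql-kafka-0-10` artifact is on the classpath: the object name `KafkaReadSketch`, the helper `kafkaOptions`, the topic `apache-logs`, and the `local[*]` master are all hypothetical. The broker address `localhost:9092` is also an assumption; `kafka.bootstrap.servers` should point at a Kafka broker, and 2181 is conventionally ZooKeeper's default port, not a broker port.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaReadSketch {
  // Assumed values: replace with your own broker address and topic.
  val brokers = "localhost:9092"
  val topic = "apache-logs"

  // Options for the "kafka" source, built separately so they are easy to inspect.
  def kafkaOptions(brokers: String, topic: String): Map[String, String] =
    Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> topic)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("KafkaReadSketch")
      .getOrCreate()

    // format("kafka") resolves only if spark-sql-kafka-0-10 is on the classpath.
    val raw: DataFrame = spark.readStream
      .format("kafka")
      .options(kafkaOptions(brokers, topic))
      .load()

    // Kafka records arrive as binary key/value columns; cast the value to text.
    val lines = raw.selectExpr("CAST(value AS STRING) AS line")

    // Print each micro-batch to the console and block until the query stops.
    val query = lines.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}
```

The result of `load()` is a streaming DataFrame, not a DStream, so the older `spark-streaming-kafka-0-10` artifact is not what provides this source.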

I faced the same issue. I upgraded Spark from 2.0.0 to 2.2.0 and added the spark-sql-kafka dependency, and it works fine for me. Here are the dependencies:

<spark.version>2.2.0</spark.version>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

I fixed it by changing pom.xml to include:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
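
One way to confirm that a dependency change took effect is to check whether the Kafka source provider class can be loaded at all. The sketch below assumes the provider is `org.apache.spark.sql.kafka010.KafkaSourceProvider`, which is the class shipped in the `spark-sql-kafka-0-10` artifact as far as I know; the object name `KafkaSourceCheck` and the helper `isOnClasspath` are hypothetical.

```scala
import scala.util.Try

object KafkaSourceCheck {
  // Assumed provider class name from the spark-sql-kafka-0-10 artifact.
  val providerClass = "org.apache.spark.sql.kafka010.KafkaSourceProvider"

  // True if the named class can be located, without initializing it.
  def isOnClasspath(className: String): Boolean =
    Try(Class.forName(className, false, getClass.getClassLoader)).isSuccess

  def main(args: Array[String]): Unit = {
    if (isOnClasspath(providerClass))
      println("Kafka source provider found; format(\"kafka\") should resolve.")
    else
      println("Kafka source provider missing; add spark-sql-kafka-0-10 to the build.")
  }
}
```

The `_2.10` or `_2.11` suffix on the artifact has to match the Scala binary version of the project. The POM in the question targets Scala 2.10, so there the artifact would be `spark-sql-kafka-0-10_2.10`, not the `_2.11` one.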
