Problem with Spark + Kafka integration

I am reading a CSV file into a Dataset and then sending it to Kafka. The spark-submit job runs fine, but when the program writes the data to Kafka it throws an exception. Here is the exception:

FileStreamSource
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72)
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:71)
    at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:87)
    at org.apache.spark.sql.kafka010.KafkaSink.addBatch(KafkaSink.scala:38)

Here is my code:

System.setProperty("hadoop.home.dir", "C:\hadoop-2.7.3\");
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
SparkSession spark = SparkSession
.builder()
.config("spark.sql.session.timeZone", "UTC")
.config("spark.sql.streaming.checkpointLocation", "D:\Workspac\checkpoint")
.appName("StructuredStreamingAverage")
.master("local")
.getOrCreate();

StructType userSchema = new StructType().add("startdate", "string").add("accountname", "string").add("eventdate", "string")/*.add("u_lastlogin", "string")*//*.add("u_firstName", "string")*/;
Dataset<Row> dataset = spark.
readStream()
.option("header",true)
.option("sep",",")
.schema(userSchema)
.csv("D:\Workspac\sophos");
Dataset<Row> df_DateConverted = dataset.withColumn("eventdate", from_unixtime(col("eventdate").divide(1000)).cast(DataTypes.TimestampType));

if(df_DateConverted.isStreaming()) {
try {
df_DateConverted
.select("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "rawEventTopic")
.start().awaitTermination();
} catch (StreamingQueryException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
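
For debugging, the intermediate Dataset itself can be sanity-checked with Spark's built-in console sink before wiring up Kafka; a minimal sketch:

try {
    // Print each micro-batch of df_DateConverted to stdout instead of writing to Kafka
    df_DateConverted
            .writeStream()
            .format("console")
            .option("truncate", false) // show full column values
            .start().awaitTermination();
} catch (StreamingQueryException e) {
    e.printStackTrace();
}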

As the exception clearly says:

Caused by: org.apache.spark.sql.AnalysisException: Required attribute 'value' not found

the problem probably lies in the .select("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value") line, but I don't know what I should write there instead. Thanks.

I tried:

df_DateConverted
        .select(col("key").cast("string"), from_json(col("value").cast("string"), userSchema))
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "rawEventTopic")
        .start().awaitTermination();

but got the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`key`' given input columns: [startdate, accountname, eventdate];;
'Project [unresolvedalias(cast('key as string), None), jsontostructs(StructField(startdate,StringType,true), StructField(accountname,StringType,true), StructField(eventdate,StringType,true), cast('value as string), Some(UTC)) AS jsontostructs(CAST(value AS STRING))#10]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5f3ddc86,csv,List(),Some(StructType(StructField(startdate,StringType,true), StructField(accountname,StringType,true), StructField(eventdate,StringType,true))),List(),None,Map(sep -> ,, header -> true, path -> D:cybernetizWorkspacesophos),None), FileSource[D:cybernetizWorkspacesophos], [startdate#0, accountname#1, eventdate#2]

The output of df_DateConverted.printSchema() is below:

root
|-- startdate: string (nullable = true)
|-- accountname: string (nullable = true)
|-- eventdate: timestamp (nullable = true)

As you can see from the df_DateConverted schema, there is no key column, so you get an error when you execute col("key").cast("string"):

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`key`' given input columns: [startdate, accountname, eventdate];;

You can simply drop the key when writing to Kafka, because the key is optional when writing data to Kafka; see the code below. Reference: here

In your implementation above, the expression string "to_json(struct(*)) AS value" is wrong: select() interprets plain strings as column names, not as SQL expressions, which is why you get an error about value.
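
Alternatively, if you want to keep the SQL-expression style, selectExpr() does parse its arguments as SQL expressions; a minimal sketch that drops the non-existent key entirely:

// selectExpr() parses SQL expression strings, unlike select()
df_DateConverted
        .selectExpr("to_json(struct(*)) AS value")
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "rawEventTopic")
        .start().awaitTermination();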

You should do something like this:

// to_json, struct and col are static imports from org.apache.spark.sql.functions
df_DateConverted
        .select(to_json(struct(col("startdate"), col("accountname"), col("eventdate"))).alias("value"))
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "rawEventTopic")
        .start().awaitTermination();
