Is there any way to integrate Apache Spark Structured Streaming with Apache Hive and Apache Kafka in a single application?
I am using collectAsList to collect the stream into a list and store it, and I get the error below.
Can anyone help me solve this problem?
Thanks in advance.
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
public class DatasetKafka {
    public static void main(String[] args) throws IOException {
        SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark Hive Example").master("yarn")
                .config("spark.sql.warehouse.dir", "hdfs://localhost:54310/user/hive/warehouse")
                .enableHiveSupport()
                .getOrCreate();
        Logger.getRootLogger().setLevel(Level.ERROR);

        // Streaming Dataset built from the Kafka topic "test"
        Dataset<String> lines = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "test")
                .load().selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING());

        // This is the line that fails (DatasetKafka.java:48 in the stack trace):
        // collectAsList() is a batch action and cannot be run on a streaming Dataset.
        List<String> line = lines.collectAsList();
        for (String li : line) {
            String[] values = li.split(",");
            String query = "insert into table match values(" + Integer.parseInt(values[0]) +
                    "," + values[1] +
                    "," + Integer.parseInt(values[2]) +
                    "," + Integer.parseInt(values[3]) +
                    "," + Integer.parseInt(values[4]) +
                    "," + values[5] +
                    "," + Integer.parseInt(values[6]) +
                    "," + values[7] +
                    "," + Integer.parseInt(values[8]) +
                    "," + Integer.parseInt(values[9]) +
                    "," + Integer.parseInt(values[10]) +
                    "," + values[11] +
                    "," + Integer.parseInt(values[12]) +
                    "," + Integer.parseInt(values[13]) +
                    "," + Integer.parseInt(values[14]) +
                    "," + Integer.parseInt(values[15]) +
                    "," + Integer.parseInt(values[16]) +
                    "," + values[17] +
                    "," + values[18] + ")";
            spark.sql(query);
        }
        // List<String> values=ll.collectAsList();

        // Word count over the comma-separated values, written to the console sink
        Dataset<String> words = lines.map((MapFunction<String, String>) k -> {
            return k;
        }, Encoders.STRING());
        Dataset<Row> wordCounts = words.flatMap(
                (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(",")).iterator(),
                Encoders.STRING()).groupBy("value").count();

        StreamingQuery query = wordCounts.writeStream()
                .outputMode("complete")
                .format("console")
                .start();
        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3360)
at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2794)
at com.ges.kafka.DatasetKafka.main(DatasetKafka.java:48)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
The exception is expected: collectAsList() is a batch action, and a Dataset that reads from a streaming source can only be executed by starting a streaming query with writeStream.start(). To integrate Kafka and Hive in one Structured Streaming application, you can use the libraries
- spark-sql-kafka to read the data from Kafka; see the Spark Structured Streaming + Kafka Integration Guide, and
- spark-llap to write the data to Hive; see https://github.com/hortonworks-spark/spark-llap/.
Both libraries are available on Maven.
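As a rough sketch of the build setup (sbt syntax; the Spark version is a placeholder, and the spark-llap coordinates depend on your Hortonworks release, so take them from that repository's documentation rather than from here):

// build.sbt (sketch) -- the Spark version below is a placeholder, match it to your cluster
val sparkVersion = "2.3.2"

// Kafka source for Structured Streaming, published on Maven Central
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion

// spark-llap (Hive Warehouse Connector): the group, artifact, version and the
// resolver to add depend on your HDP release -- take the exact coordinates from
// https://github.com/hortonworks-spark/spark-llap/

The Kafka source can also be pulled in at submit time with spark-submit --packages instead of being bundled into the application jar.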
A simple example of such a Spark Structured Streaming application looks like the following. Make sure the Hive table is created in advance (a sketch of the DDL follows the example).
import scala.concurrent.duration._                              // provides the .seconds extension used in the trigger
import org.apache.spark.sql.streaming.Trigger
import com.hortonworks.spark.sql.hive.llap.HiveWarehouseSession // from spark-llap (package name may differ between versions)

// `config` below is the application's own configuration (e.g. a Typesafe Config instance)

// Read the Kafka topic as a streaming Dataset of (key, value) strings
val ds = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.getString("broker.list"))
  .option("kafka.security.protocol", config.getString("security.protocol"))
  .option("subscribe", config.getString("kafka.topic.in"))
  .option("startingOffsets", config.getString("kafka.starting.offset"))
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING)")

// Stream the result into the pre-created Hive table via spark-llap
val query = ds.writeStream
  .format(HiveWarehouseSession.STREAM_TO_STREAM)
  .option("database", "my_database")
  .option("table", "my_table")
  .option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
  .option("checkpointLocation", config.getString("spark.checkpoint.dir"))
  .trigger(Trigger.ProcessingTime(config.getLong("spark.batchWindowSizeSecs").seconds))
  .start()

query.awaitTermination()
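The target table has to exist before the query starts. A minimal sketch of the DDL is below, kept as a Scala string so you can run it through beeline or whatever Hive client you use: the database and table names match the example above, the key/value columns match the projection, and the rest is an assumption to adapt, since Hive streaming ingest generally needs a transactional ORC table and older Hive versions additionally require it to be bucketed.

// Sketch of the DDL for the pre-created target table. Names, columns and table
// properties are placeholders -- adjust them to your schema and Hive version
// (older Hive streaming also requires e.g. CLUSTERED BY (key) INTO n BUCKETS).
// Execute it with beeline / the Hive CLI before starting the streaming query.
val createTargetTableDdl: String =
  """CREATE TABLE my_database.my_table (
    |  key   STRING,
    |  value STRING
    |)
    |STORED AS ORC
    |TBLPROPERTIES ('transactional' = 'true')""".stripMargin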