I'm trying to run a sample Spark Streaming program, but I get this error:
16/06/02 15:25:42 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
at com.streams.spark_consumer.SparkConsumer.main(SparkConsumer.java:56)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
at com.streams.spark_consumer.SparkConsumer.main(SparkConsumer.java:56)
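My code is below. I know there are some unused imports: I was working on something else and got the same error, so I modified that same code to run the example program given on the Spark Streaming website: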
package com.streams.spark_consumer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import scala.Tuple2;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;
import org.apache.spark.api.java.JavaSparkContext;
public class SparkConsumer {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        System.out.println("Han chal raha hai"); //just to know if this part of the code is executed
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        System.out.println("Han bola na chal raha hau chutiye 1"); //just to know if this part of the code is executed
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        JavaDStream<String> words = lines.flatMap(
            new FlatMapFunction<String, String>() {
                public Iterable<String> call(String x) {
                    return Arrays.asList(x.split(" "));
                }
            });
        JavaPairDStream<String, Integer> pairs = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });
        jssc.start();
        jssc.awaitTermination();
    }
}
Can someone help me? I'm using a local master. Even so, I tried starting and stopping the master (and the slaves too). I don't know why that would help, but I tried it just in case.
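According to the Spark documentation:
"Since the output operations actually allow the transformed data to be consumed by external systems, they trigger the actual execution of all the DStream transformations (similar to actions for RDDs)."
Your code only defines transformations (flatMap, mapToPair, reduceByKey), so there is nothing for the context to execute. Call an output operation on the final DStream before jssc.start(), for example wordCounts.print(). The available output operations are: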
print()
foreachRDD(func)
saveAsObjectFiles(prefix, [suffix])
saveAsTextFiles(prefix, [suffix])
saveAsHadoopFiles(prefix, [suffix])
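
To make the quoted point concrete, here is a toy model in plain Java (no Spark dependency) of why start() rejects a graph that contains only transformations. ToyContext, map, foreach, and start are hypothetical names invented for this sketch. They are not Spark APIs, and the check only imitates the requirement message seen in the stack trace above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

public class LazyPipelineSketch {

    /** Hypothetical stand-in for a streaming context; NOT Spark's implementation. */
    public static final class ToyContext {
        private final List<UnaryOperator<String>> transformations = new ArrayList<>();
        private final List<Consumer<String>> outputOperations = new ArrayList<>();

        /** Registers a transformation; nothing is executed yet. */
        public ToyContext map(UnaryOperator<String> f) {
            transformations.add(f);
            return this;
        }

        /** Registers an output operation (the toy analogue of print() or foreachRDD). */
        public ToyContext foreach(Consumer<String> sink) {
            outputOperations.add(sink);
            return this;
        }

        /** Validates the graph, then pushes every record through it. */
        public void start(List<String> input) {
            if (outputOperations.isEmpty()) {
                // Imitates the check that fails in the stack trace.
                throw new IllegalArgumentException(
                    "requirement failed: No output operations registered, so nothing to execute");
            }
            for (String record : input) {
                String value = record;
                for (UnaryOperator<String> f : transformations) {
                    value = f.apply(value);
                }
                for (Consumer<String> sink : outputOperations) {
                    sink.accept(value);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("hello", "world");

        // Transformations only: start() throws, as in the question.
        try {
            new ToyContext().map(String::toUpperCase).start(input);
        } catch (IllegalArgumentException e) {
            System.out.println("Failed: " + e.getMessage());
        }

        // With an output operation registered, the records are processed.
        new ToyContext().map(String::toUpperCase).foreach(System.out::println).start(input);
    }
}
```

The second call in main succeeds only because foreach registered a consumer of the transformed data. In the question's program, the corresponding change would be a single line such as wordCounts.print() placed before jssc.start().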