这是我的源代码,我从服务器端获取一些数据,服务器端不断生成数据流。然后,对于每个RDD,我都应用SQL模式,一旦创建了这个表,我就试图从这个DStream中选择一些东西。
List<String> males = new ArrayList<String>();
JavaDStream<String> data = streamingContext.socketTextStream("localhost", (port));
data.print();
System.out.println("Socket connection established to read data from Subscriber Server");
JavaDStream<SubscriberData> streamData = data
.map(new Function<String, SubscriberData>() {
public SubscriberData call(String record) {
String[] stringArray = record.split(",");
SubscriberData subscriberData = new SubscriberData();
subscriberData.setMsisdn(stringArray[0]);
subscriberData.setSubscriptionType(stringArray[1]);
subscriberData.setName(stringArray[2]);
subscriberData.setGender(stringArray[3]);
subscriberData.setProfession(stringArray[4]);
subscriberData.setMaritalStatus(stringArray[5]);
return subscriberData;
}
});
streamData.foreachRDD(new Function<JavaRDD<SubscriberData>,Void>(){
public Void call(JavaRDD<SubscriberData> rdd){
JavaSQLContext sqlContext = new JavaSQLContext(sc);
JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd,SubscriberData.class);
subscriberSchema.registerAsTable("SUBSCRIBER_DIMENSION");
System.out.println("all data");
JavaSchemaRDD names = sqlContext.sql("SELECT msisdn FROM SUBSCRIBER_DIMENSION WHERE GENDER='Male'");
System.out.println("afterwards");
List<String> males = new ArrayList<String>();
males = names.map(new Function<Row, String>() {
public String call(Row row) {
return row.getString(0);
}
}).collect();
System.out.println("before for");
for (String name : males) {
System.out.println(name);
}
return null;
}
});
streamingContext.start();
但它抛出了这个Serializable Exception,尽管我使用的类确实实现了序列化。
14/11/06 12:55:20 ERROR scheduler.JobScheduler: Error running job streaming job 1415258720000 ms.1
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:75)
at org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD.scala:42)
at com.hp.tbda.rta.SubscriberClient$2.call(SubscriberClient.java:206)
at com.hp.tbda.rta.SubscriberClient$2.call(SubscriberClient.java:1)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 20 more
SparkContext
不可序列化,因为它只能在驱动程序上使用,不应包含在任何闭包中。恐怕目前对Spark Streaming上SQL的支持还只是研究层面的。有关详细信息,请参阅Spark峰会上的此演示。
要创建男性用户ID的预期RDD,您可以使用地图和过滤器:
maleSubscribers = subscribers.filter(subsc => subcs.getGender == "Male")
.map(subsc => subsc.getMsisdn)