问题我有趣的火花1.5.0
。我有一个名为events
的不同json字符串对象的RDD。每个json对象都有一个字段type
。我想为type
字段的每个值创建一个不同的数据帧。然后我想为每个数据帧生成模式(df.printSchema(((
样本输入
{"a": 1, "b": 2, "type": "x"}
{"a": 1, "c": 2, "type": "x"}
{"c": 1, "d": 2, "type": "y"}
{"d": 1, "e": 2, "type": "y"}
所以我的模式应该看起来像:
for type "x":
root
|-- a: string (nullable = true)
|-- b: string (nullable = true)
|-- c: string (nullable = true)
for type "y":
root
|-- c: string (nullable = true)
|-- d: string (nullable = true)
|-- e: string (nullable = true)
我尝试了什么:
final String[] event_types = {"x", "y"};
for (final String event_type: event_types) {
JavaRDD<String> filtered_events = events.filter(
new Function<String, Boolean>() {
public Boolean call(String s) {
String event_type_t = null;
try {
JSONObject json_data = new JSONObject(s);
event_type_t = json_data.getString("type").toString();
}
catch (JSONException e) {
return false;
}
if (event_type.equalsIgnoreCase(event_type_t)) {
return true;
}
else {
return false;
}
}
}
);
DataFrame df = sqlContext.read().option("header", "true").json(filtered_events);
System.out.println(event_type);
df.printSchema();
}
现有解决方案的问题它为每个过滤器运行多次。如果有多个事件类型,那么处理需要花费大量时间。我想一次性完成。
我猜最昂贵的部分实际上是JSON解析。把它推到过滤逻辑之外是有意义的。使用Scala:
val events: RDD[String] = ???
val event_types = List("x", "y")
val df: DataFrame = sqlContext.read.json(events)
val dfs = event_types.map(t => (t -> df.where($"type" <=> t))).toMap