一次生成多个筛选器的架构



问题我有趣的火花1.5.0。我有一个名为events的不同json字符串对象的RDD。每个json对象都有一个字段type。我想为type字段的每个值创建一个不同的数据帧。然后我想为每个数据帧生成模式(df.printSchema(((

样本输入

{"a": 1, "b": 2, "type": "x"}
{"a": 1, "c": 2, "type": "x"}
{"c": 1, "d": 2, "type": "y"}
{"d": 1, "e": 2, "type": "y"}

所以我的模式应该看起来像:

for type "x":
root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
for type "y":
 root
 |-- c: string (nullable = true)
 |-- d: string (nullable = true)
 |-- e: string (nullable = true)

我尝试了什么:

final String[] event_types = {"x", "y"};
for (final String event_type: event_types) {
    JavaRDD<String> filtered_events = events.filter(
        new Function<String, Boolean>() {
            public Boolean call(String s) {
                String event_type_t = null;
                try {
                    JSONObject json_data = new JSONObject(s);
                    event_type_t = json_data.getString("type").toString();
                }
                catch (JSONException e) {
                    return false;
                }
                if (event_type.equalsIgnoreCase(event_type_t)) {
                    return true;
                }
                else {
                    return false;
                }
            }
        }
    );
    DataFrame df = sqlContext.read().option("header", "true").json(filtered_events);
    System.out.println(event_type);
    df.printSchema();
}

现有解决方案的问题它为每个过滤器运行多次。如果有多个事件类型,那么处理需要花费大量时间。我想一次性完成。

我猜最昂贵的部分实际上是JSON解析。把它推到过滤逻辑之外是有意义的。使用Scala:

val events: RDD[String] = ???
val event_types = List("x", "y")
val df: DataFrame  = sqlContext.read.json(events)
val dfs = event_types.map(t => (t -> df.where($"type" <=> t))).toMap

相关内容

  • 没有找到相关文章

最新更新