我有一个序列化的blob和一个将其转换为java Map的函数。我已经将该函数注册为UDF,并尝试在Spark SQL中使用它,如下所示:
sqlCtx.udf.register("blobToMap", Utils.blobToMap)
val df = sqlCtx.sql(""" SELECT mp['c1'] as c1, mp['c2'] as c2 FROM
(SELECT *, blobToMap(payload) AS mp FROM t1) a """)
我确实做到了,但由于某种原因,非常重的blobToMap
函数每行运行两次,实际上我提取了20个字段,每行运行20次。我在Spark DataFrame中的单个列派生多个列中看到了建议但它们确实是不可扩展的——我不想每次需要提取数据时都创建一个类。
我怎样才能强迫Spark做合理的事情?我试着分成两个阶段。唯一有效的方法是缓存内部select,但这也不可行,因为它确实是一个大blob,我只需要其中的几十个字段
我会自己回答,希望它能帮助任何人。。因此,经过几十次实验,我能够迫使spark对udf进行评估,并将其转换为Map一次,而不是对每个关键请求一遍又一遍地重新计算,方法是拆分查询并执行一个邪恶的丑陋把戏——将其转换成RDD并返回DataFrame:
val df1 = sqlCtx.sql("SELECT *, blobToMap(payload) AS mp FROM t1")
sqlCtx.createDataFrame(df.rdd, df.schema).registerTempTable("t1_with_mp")
val final_df = sqlCtx.sql("SELECT mp['c1'] as c1, mp['c2'] as c2 FROM t1_with_mp")