Spark SQL UDF任务不可序列化



Cassandra&DataStax社区,我有一个问题,我希望有明智的人能帮助我。

我们正在将分析代码从Hadoop迁移到运行在Cassandra之上的Spark(通过DataStax Enterprise)。DSE 4.7在生产中,但4.8在开发中。

Java 7在生产中,或者Java 7/8在开发中。

我们需要几个DataFrame转换,我们认为通过Spark SQLContext针对内存中的DataFrame编写一个UDF就可以了。其中主要有:

  1. 我们数据的每一个文本值都以".即"some data"作为前缀和后缀。这非常令人讨厌,因此我们希望清除其中的每一项
  2. 我们想添加一个列,该列包含一个由其他列组成的散列键

我们的代码如下。这在sqlContext中没有包含UDF调用的情况下运行良好,但一旦添加它们,我们就会收到"Task is not Serializable"错误

线程"main"org.apache.spark.SparkException中出现异常:任务不可序列化

我曾尝试将"implements Serializable"作为这个(以及许多其他类)的基类,这会将错误类更改为链上的下一个错误类,但这会导致失败。Exception类不可序列化…这可能意味着我正朝着错误的方向前进。

我还尝试过将UDF实现为lambda,这也会导致同样的错误。

如果有人能指出我做错了什么,我将不胜感激!

public class entities implements Serializable{
    private spark_context m_spx = null;
    private DataFrame m_entities = null;
    private String m_timekey = null;
    public entities(spark_context _spx, String _timekey){
        m_spx = _spx;
        m_timekey = _timekey;
    }

    public DataFrame get_dimension(){
        if(m_entities == null) {
            DataFrame df = m_spx.get_flat_data(m_timekey).select("event", "url");
            //UDF to generate hashed ids
            UDF2 get_hashed_id = new UDF2<String, String, String>() {
                public String call(String o, String o2) throws Exception {
                    return o.concat(o2);
                }
            };

            //UDF to clean the " from strings
            UDF1 clean_string = new UDF1<String, String>() {
                public String call(String o) throws Exception {
                    return o.replace(""","");
                }
            };

            //Get the Spark SQL Context from SC.
            SQLContext sqlContext = new SQLContext(m_spx.sc());

            //Register the UDFs
            sqlContext.udf().register("getid", get_hashed_id, DataTypes.StringType);
            sqlContext.udf().register("clean_string", clean_string, DataTypes.StringType);

            //Register the DF as a table.
            sqlContext.registerDataFrameAsTable(df, "entities");
            m_entities = sqlContext.sql("SELECT getid(event, url) as event_key, clean_string(event) as event_cleaned, clean_string(url) as url_cleaned FROM entities");
        }
        return m_entities;
    }
}

您的entities类包含一个SparkContext成员,因此它不可序列化(SparkContexts在国际上是不可序列化的,您不应该序列化它们)。

由于entities是不可序列化的,它的任何非静态方法/成员/匿名内部类也不可序列化(因为它们将尝试序列化包含它们的entities实例)。

在这种情况下,最好的解决方法是将匿名UDF提取到类的静态成员中:

private final static UDF2 get_hashed_id = new UDF2<String, String, String>() {
   public String call(String o, String o2) throws Exception {
       return o.concat(o2);
   }
};
private final static UDF1 clean_string = new UDF1<String, String>() {
   public String call(String o) throws Exception {
       return o.replace(""","");
   }
};

然后您就可以在get_dimension中使用它们了。

相关内容

  • 没有找到相关文章

最新更新