这里的问题是如何重用UDF的对象,同时避免竞争条件?
我在我的spark应用程序中使用UDF,由于竞争条件,单元测试似乎是不确定的。有时他们通过,有时他们失败。。。
为了提高效率,我试图通过创建对象并将其传递给UDF来强制重用对象。然而,共享相同spark上下文和JVM的单独"测试"似乎正在使用这些对象并导致错误。
def reformatDate(input:String,sdfIn:SimpleDateFormat,sdfOut:SimpleDateFormat): String ={
sdfOut.format(sdfIn.parse(input))
}
val datePartitionFormat = new SimpleDateFormat("yyyyMMdd")
val dTStampFormat = new SimpleDateFormat("yyyy/MM/dd")
val validDateFormat = new SimpleDateFormat("yyyy-MM-dd")
val partitionToDateUDF = udf(reformatDate(_:String,datePartitionFormat,validDateFormat))
val dTStampToDateUDF= udf(reformatDate(_:String,dTStampFormat,validDateFormat))
有时,当我运行单元测试时,我会用这个函数得到以下错误:
2013年1月17日11:45:45错误执行程序:2.0阶段任务0.0中出现异常(TID 2)java.lang.NumberFormatException:位于sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1890)在sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)java.lang.Double.parseDouble(Double.java:538)java.text.DigitList.getDouble(DigitList.java:169)java.text.DecimalFormat.parse(DecimalFormat.java:2056)java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1867)位于java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514)java.text.DateFormat.parse(DateFormat.java:364)位于com.baesystems.ai.engineering.threatanalytics.microbatch.processer.transformers.metric.mDnsPreviouslyeenDomainsStartOfDayDF$.reformatDate(mDnsPreviousySeenDomainsSStartOfDayDF.scala:22)
我使用这样的函数:
val df = df2
.filter(
datediff(
to_date(partitionToDateUDF($"dt"))
,to_date(dTStampToDate($"d_last_seen"))
) < 90
)
并且在调试时发现输入"df2"为:
+-----------+--------+-------------------------+--------------------------------+
|d_last_seen| dt|partitionToDateUDF($"dt")|dTStampToDateUDF($"d_last_seen")|
+-----------+--------+-------------------------+--------------------------------+
| 2016/11/02|20161102|2016-11-02 |2016-11-02 |
| 2016/11/01|20161102|2016-11-02 |2016-11-01 |
+-----------+--------+-------------------------+--------------------------------+
我使用conf.setMaster("local[2]"),可能是spark使用线程,因此在本地运行时共享相同的JVM吗?然而,当部署时不会发生这种情况,因为单独的执行器将有自己的JVM,因此也有自己的对象实例化?
SimpleDateFormat
不是线程安全的(请参阅例如"为什么Java';s SimpleDateFormat不是线程安全?")。这意味着,如果你在任何UDF中使用它(即使是在一个Spark作业中),你可能会得到意想不到的结果,因为Spark会在几个任务中使用你的UDF,这些任务在单独的线程上运行,最终导致多个线程同时访问它。对于本地模式和实际的分布式集群都是如此——每个执行器上的几个线程将使用一个副本。
要克服这一点,只需使用一个线程安全的不同格式化程序,例如Joda的DateTimeFormatter
。