我有一个Spark Structured-Streaming应用程序,它从s3读取JSON数据并进行一些转换并将其写回s3。
在运行应用程序时,有时作业会出错并重新尝试(没有任何可见的丢失或数据损坏 - 因此一切看起来都很好),但提供的错误消息描述性不强
以下是错误消息:
pyspark.sql.utils.StreamingQueryException: u'assertion failed: Invalid batch: _ra_guest_gid#1883,_ra_sess_ts#1884,_ra_evt_ts#1885,event#1886,brand#1887,category#1888,funding_daysRemaining#1889,funding_dollarsRemaining#1890,funding_goal#1891,funding_totalBackers#1892L,funding_totalFunded#1893,id#1894,name#1895,price#1896,projectInfo_memberExclusive#1897,projectInfo_memberExclusiveHoursRemaining#1898,projectInfo_numberOfEpisodes#1899,projectInfo_projectState#1900,variant#1901 != _ra_guest_gid#2627,_ra_sess_ts#2628,_
我的猜测是这可能与列不匹配有关,其中
传入的 JSON 记录不符合架构。
或者,传入 JSON 记录的数据类型可能与架构中提供的数据类型不匹配。
但我不确定如何查明哪个记录或哪个特定字段导致错误。
此处有关错误含义或如何以更好的方式记录错误的任何帮助或建议。
谢谢
我已经解决了这个问题,它与架构不匹配无关。在我的情况下,我有两个并行运行的流操作。
1)从 S3 存储桶读取原始传入数据,然后执行一些操作并将其写回输出文件夹"a"中的 S3
2)从文件夹"a"(步骤1)读取处理后的流数据,然后再次执行一些操作并写回输出文件夹"b"中的S3
现在根据我的观察,如果我单独运行上述步骤,那么它工作正常,但是如果我一起运行它们,我会得到错误
'pyspark.sql.utils.StreamingQueryException: u'assertion failed: 无效批处理: '
所以我认为当它尝试从同一位置读取和写入时会遇到麻烦,即一个流的目的地是另一个流的源