Spark-如果已经存在,则更新记录(在Parquet文件中)



我正在写一份火花作业来读取JSON文件并将其写入Parquet文件,以下是示例代码:

    DataFrame dataFrame = new DataFrameReader(sqlContext).json(textFile);
    dataFrame = dataFrame.withColumn("year", year(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp"))));
    dataFrame = dataFrame.withColumn("month", month(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp"))));
    dataFrame.write().mode(SaveMode.Append).partitionBy("year", "month").parquet("<some_path>");

JSON文件由许多JSON记录组成,如果已经存在,我希望该记录在Parquet中进行更新。我尝试过Append模式,但似乎在文件级别而不是记录级别上工作(即,如果文件已经存在,则最终写入)。因此,为同一文件运行此作业会重复记录。

有什么方法可以将数据框架行ID指定为唯一键并要求Spark更新记录是否已经存在?所有保存模式似乎都在检查文件而不是记录。

parquet是文件格式而不是数据库,为了通过ID实现更新,您需要读取文件,更新内存中的值,而不是重写数据到新文件(或覆盖现有文件)。

如果这是经常发生的用例,您可能会更好地使用数据库。

您可以看一下Apache orc文件格式,请参阅:

https://orc.apache.org/docs/acid.html

根据您的用例,如果您想留在HDF的顶部,则HBase。

,但请记住,HDFS是一个写入文件系统,如果这不适合您的需要,请选择其他内容(也许是Elasticsearch,MongoDB)。

其他,在HDFS中,您必须每次创建新文件,必须设置一个增量过程才能构建" delta"文件,然后合并旧 delta =new_data。

您还可以查看Apache Hudi(https://hudi.apache.org/),该

提供了支持更新的支持。

相关内容

  • 没有找到相关文章

最新更新