我正在写一份火花作业来读取JSON文件并将其写入Parquet文件,以下是示例代码:
DataFrame dataFrame = new DataFrameReader(sqlContext).json(textFile);
dataFrame = dataFrame.withColumn("year", year(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp"))));
dataFrame = dataFrame.withColumn("month", month(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp"))));
dataFrame.write().mode(SaveMode.Append).partitionBy("year", "month").parquet("<some_path>");
JSON文件由许多JSON记录组成,如果已经存在,我希望该记录在Parquet中进行更新。我尝试过Append
模式,但似乎在文件级别而不是记录级别上工作(即,如果文件已经存在,则最终写入)。因此,为同一文件运行此作业会重复记录。
有什么方法可以将数据框架行ID指定为唯一键并要求Spark更新记录是否已经存在?所有保存模式似乎都在检查文件而不是记录。
parquet是文件格式而不是数据库,为了通过ID实现更新,您需要读取文件,更新内存中的值,而不是重写数据到新文件(或覆盖现有文件)。
如果这是经常发生的用例,您可能会更好地使用数据库。
您可以看一下Apache orc文件格式,请参阅:
https://orc.apache.org/docs/acid.html
根据您的用例,如果您想留在HDF的顶部,则HBase。
,但请记住,HDFS是一个写入文件系统,如果这不适合您的需要,请选择其他内容(也许是Elasticsearch,MongoDB)。
其他,在HDFS中,您必须每次创建新文件,必须设置一个增量过程才能构建" delta"文件,然后合并旧 delta =new_data。
您还可以查看Apache Hudi(https://hudi.apache.org/),该