Scala:列中值时的过程数据帧满足条件



我必须处理一个巨大的数据框架,通过数据框的id列从服务下载文件。要下载和所有更改的逻辑都已准备好,但是我不确定围绕此循环的最佳方法是什么。我在 databricks 上运行此操作,这就是为什么我需要在块中执行过程。

数据框具有" status "列,可以保持以下值:

" todo","处理","失败","成功"

在段循环中,我想执行以下任务:

while (there are rows with status "todo") {
   - get the first 10 rows if status is todo (DONE)
   - start processing the dataframe, update status to processing (DONE)
   - download files (call UDF), update status to succeeded or failed
     (DONE, not in the code here)
}

我想运行此,直到所有行的status都是其他todo !问题在于,循环尚未完成,因为数据帧本身没有更新。它需要将其分配给另一个数据框,但是如何将新的框架添加到循环中?

我的代码现在:

while(statusDoc.where("status == 'todo'").count > 0) {
  val todoDF = test.filter("status == 'todo'")
  val processingDF = todoDF.limit(10).withColumn("status", when(col("status") === "todo", "processing")
                           .otherwise(col("status")))
 statusDoc.join(processingDF, Seq("id"), "outer")
      .select($"id", 
       statusDoc("fileUrl"), 
       coalesce(processingDF("status"), statusDoc("status")).alias("status"))
}

加入应该像这样:

val update = statusDoc.join(processingDF, Seq("id"), "outer")
                          .select($"id", statusDoc("fileUrl"),
    coalesce(processingDF("status"), statusDoc("status")).alias("status"))

那么,应将此新的update数据框用于下一轮循环。

在这里要记住的一件事是,数据框(Spark)是不可变的,因为它们是分布的。您不能保证如果您做了一些,则给定的修改将在所有执行者网络中正确传播。而且您也无法保证尚未在其他地方使用的一个给定数据(例如,在另一个节点中)。

您可以做的一件事是添加带有更新值的另一列并删除旧列。

val update = statusDoc.
    .withColumnRenamed("status", "status_doc")
    .join(processingDF, Seq("id"), "outer")
    .withColumn("updated_status", udf((stold: String, stold: String) => if (stnew != null) stnew else stold).apply(col("status"), col("status_doc"))
    .drop("status_doc", "status")
    .withColumnRenamed("updated_status", "status")
    .select("id", "fileUrl", "status")

然后确保将" statusdoc"替换为"更新"数据框。不要忘记使数据框成为" var"而不是" val"。我很惊讶您的IDE尚未大喊大叫。

另外,我敢肯定,您可以想到一种分发问题的方法,以避免使用时循环 - 我可以帮助您做到这一点,但是我需要对您的问题进行更清晰的描述。如果您使用一个时循环,则不会使用群集的完整功能,因为while循环仅在主机上执行。然后,每次您一次只对待10行。我敢肯定,您可以将所有数据附加到单个地图操作中的整个数据框中。

最新更新