假设我有一个有3个分区的RDD,我想在一个序列中运行每个执行器/工作者,这样,在计算了分区1之后,就可以计算分区2,在计算完分区2之后,最后就可以计算分区3。我需要这种同步的原因是,每个分区都依赖于前一个分区的某些计算。如果我错了,请纠正我,但这种类型的同步似乎不太适合Spark框架。
我考虑过在每个工作任务节点中打开JDBC连接,如下所示:
rdd.foreachPartition( partition => {
// 1. open jdbc connection
// 2. poll database for the completion of dependent partition
// 3. read dependent edge case value from computed dependent partition
// 4. compute this partition
// 5. write this edge case result to database
// 6. close connection
})
我甚至考虑过使用累加器,在驱动程序中获取acc值,然后重新广播一个值,这样合适的工作人员就可以开始计算,但显然广播不是这样工作的,即,一旦您通过foreachPartition发送了广播变量,就无法重新广播不同的值。
同步并不是一个真正的问题。问题是,您希望使用并发层来实现这一点,结果您得到了完全顺序的执行。更不用说,将更改推送到数据库只是为了在另一个工作线程上获取这些更改,这意味着您无法从内存处理中获得好处。在当前的形式中,使用Spark根本没有意义。
一般来说,如果你想在Spark中实现同步,你应该考虑转换。你的问题很粗略,但你可以试试这样的方法:
- 使用第一个分区中的数据创建第一个RDD。并行处理并可选择将结果推送到外部
- 计算差分缓冲区
- 使用第二个分区中的数据创建第二个RDD。与来自2的差异缓冲区合并,处理,并可选择将结果推送到数据库
- 回到2。并重复
你在这里收获了什么?首先,您可以利用整个集群。此外,部分结果保存在内存中,不必在工作程序和数据库之间来回传输。