How to keep the same uuid in two separate DataFrames



I have an input DataFrame df with 10 columns (col1-col10). I add a new column, uuid, to it using the UDF below, which produces another DataFrame, newdf.

Next, from newdf, I create two separate DataFrames that contain only the listed columns: df1 (uuid, col1-col5) and df2 (uuid, col6-col10).

This is where the problem arises: I want the uuid column to be the same in df1 and df2.

Because Spark evaluates lazily, the UDF runs again each time I write df1 and df2. As a result, the same row gets a different uuid value in df1 than in df2.

My current workaround is to first write newdf to a temporary path and read it back. This does not work well for large volumes of data.
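The recomputation described above can be illustrated without Spark. The sketch below is only an analogy that uses plain Scala collections. A collection view is lazy and re-runs its map function on every traversal, much as an unmaterialized DataFrame re-runs a random UDF for every action. Materializing the result once corresponds to the write-and-read-back workaround. The object and method names are hypothetical.

```scala
import java.util.UUID

// Analogy with plain Scala collections, NOT Spark: a view is lazy and re-runs
// its map function on every traversal, much as an unmaterialized DataFrame
// re-runs a random UDF for every action (write, show, ...).
object LazyRecomputeDemo {
  val rows: Seq[String] = Seq("A", "B", "C")

  // Lazy "newdf": a fresh random UUID is generated per row on EVERY traversal.
  val lazyNewdf = rows.view.map(r => (UUID.randomUUID().toString, r))

  // Two separate "actions" over the lazy data, like writing df1 and df2.
  def lazyIds(): (List[String], List[String]) =
    (lazyNewdf.toList.map(_._1), lazyNewdf.toList.map(_._1))

  // Materialize once (like writing newdf to a temp path and reading it back),
  // then derive both projections from the stored result.
  def materializedIds(): (List[String], List[String]) = {
    val stored = lazyNewdf.toList
    (stored.map(_._1), stored.map(_._1))
  }

  def main(args: Array[String]): Unit = {
    val (l1, l2) = lazyIds()
    val (m1, m2) = materializedIds()
    println(s"lazy ids match:         ${l1 == l2}")
    println(s"materialized ids match: ${m1 == m2}")
  }
}
```

The lazy version is expected to report a mismatch and the materialized version a match. The catch is that materializing means storing the whole dataset, which is why the temp-path approach becomes expensive at scale.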

Here is a code snippet:

df.show(false)
+------+------+------+------+------+------+------+------+------+-------+
| col1 | col2 | col3 | col4 | col5 | col6 | col7 | col8 | col9 | col10 |
+------+------+------+------+------+------+------+------+------+-------+
| A1   | A2   | A3   | A4   | A5   | A6   | A7   | A8   | A9   |  A10  |
| B1   | B2   | B3   | B4   | B5   | B6   | B7   | B8   | B9   |  B10  |
| C1   | C2   | C3   | C4   | C5   | C6   | C7   | C8   | C9   |  C10  |
+------+------+------+------+------+------+------+------+------+-------+

import org.apache.spark.sql.functions.udf

// Random UDF: every evaluation produces a new UUID
val uuid = udf(() => java.util.UUID.randomUUID().toString)
val newdf = df.withColumn("uuid", uuid())
val df1 = newdf.select("uuid", "col1", "col2", "col3", "col4", "col5")
val df2 = newdf.select("uuid", "col6", "col7", "col8", "col9", "col10")
df1.write.format("parquet").save("/df1/")
df2.write.format("parquet").save("/df2/")
df1.show()
+-----------------+------+------+------+------+------+
|     uuid        | col1 | col2 | col3 | col4 | col5 |
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A1   | A2   | A3   | A4   | A5   |
|1dbcecf-1304-4a4e| B1   | B2   | B3   | B4   | B5   |
|1vbdecf-8406-4a4e| C1   | C2   | C3   | C4   | C5   |
+-----------------+------+------+------+------+------+
df2.show()
+-----------------+------+------+------+------+------+
|     uuid        | col6 | col7 | col8 | col9 | col10|
+-----------------+------+------+------+------+------+
|2aodecf-3303-6n5e| A6   | A7   | A8   | A9   | A10  |
|2docecf-6305-6n5e| B6   | B7   | B8   | B9   | B10  |
|2vodecf-1406-6n5e| C6   | C7   | C8   | C9   | C10  |
+-----------------+------+------+------+------+------+

Expected output: each row has the same uuid in both DataFrames

df1.show()
+-----------------+------+------+------+------+------+
|     uuid        | col1 | col2 | col3 | col4 | col5 |
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A1   | A2   | A3   | A4   | A5   |
|1dbcecf-1304-4a4e| B1   | B2   | B3   | B4   | B5   |
|1vbdecf-8406-4a4e| C1   | C2   | C3   | C4   | C5   |
+-----------------+------+------+------+------+------+
df2.show()
+-----------------+------+------+------+------+------+
|     uuid        | col6 | col7 | col8 | col9 | col10|
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A6   | A7   | A8   | A9   | A10  |
|1dbcecf-1304-4a4e| B6   | B7   | B8   | B9   | B10  |
|1vbdecf-8406-4a4e| C6   | C7   | C8   | C9   | C10  |
+-----------------+------+------+------+------+------+

What is the best way to solve this problem?

Try this:

The solution uses the Spark Scala API.

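Use UUID.nameUUIDFromBytes in the UDF. The UUID is then derived from the row's values instead of being random, so every time the UDF is re-evaluated it returns the same UUID for the same row.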
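The property this answer relies on can be checked with the JDK alone, without Spark. UUID.nameUUIDFromBytes builds a type 3 (MD5, name-based) UUID that depends only on its input bytes, while UUID.randomUUID returns a new type 4 UUID on every call. The object name and the sample key are hypothetical. The key imitates one row's values joined with ":".

```scala
import java.nio.charset.StandardCharsets
import java.util.UUID

// Plain-JDK check: nameUUIDFromBytes is a pure function of its input bytes
// (type 3, MD5-based), whereas randomUUID yields a new type 4 UUID per call.
object NameUuidDemo {
  def nameUuid(s: String): UUID =
    UUID.nameUUIDFromBytes(s.getBytes(StandardCharsets.UTF_8))

  def main(args: Array[String]): Unit = {
    val key = "A1:A2:A3:A4:A5:A6:A7:A8:A9:A10" // hypothetical row key
    val a = nameUuid(key)
    val b = nameUuid(key)
    println(s"same key, same UUID: ${a == b} (version ${a.version()})")
    println(s"two random UUIDs equal: ${UUID.randomUUID() == UUID.randomUUID()}")
  }
}
```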

Load the provided test data:

df.show(false)
df.printSchema()
/**
* +----+----+----+----+----+----+----+----+----+-----+
* |col1|col2|col3|col4|col5|col6|col7|col8|col9|col10|
* +----+----+----+----+----+----+----+----+----+-----+
* |A1  |A2  |A3  |A4  |A5  |A6  |A7  |A8  |A9  |A10  |
* |B1  |B2  |B3  |B4  |B5  |B6  |B7  |B8  |B9  |B10  |
* |C1  |C2  |C3  |C4  |C5  |C6  |C7  |C8  |C9  |C10  |
* +----+----+----+----+----+----+----+----+----+-----+
*
* root
* |-- col1: string (nullable = true)
* |-- col2: string (nullable = true)
* |-- col3: string (nullable = true)
* |-- col4: string (nullable = true)
* |-- col5: string (nullable = true)
* |-- col6: string (nullable = true)
* |-- col7: string (nullable = true)
* |-- col8: string (nullable = true)
* |-- col9: string (nullable = true)
* |-- col10: string (nullable = true)
*/

Create matching UUIDs:


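import java.nio.charset.StandardCharsets
import java.util.UUID
import org.apache.spark.sql.functions.{col, concat_ws, udf}
// Deterministic UDF: the UUID is derived from all of the row's values joined with ":"
val uuid = udf((s: String) => UUID.nameUUIDFromBytes(s.getBytes(StandardCharsets.UTF_8)).toString)
val newdf = df.withColumn("uuid", uuid(concat_ws(":", df.columns.map(col): _*)))
val df1 = newdf.select("uuid", "col1", "col2", "col3", "col4", "col5")
val df2 = newdf.select("uuid", "col6", "col7", "col8", "col9", "col10")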
df1.show(false)
/**
* +------------------------------------+----+----+----+----+----+
* |uuid                                |col1|col2|col3|col4|col5|
* +------------------------------------+----+----+----+----+----+
* |0c26ece0-708a-3105-896f-e70d18b67766|A1  |A2  |A3  |A4  |A5  |
* |0e19058c-3c14-3d2f-8c71-b7308f63b0d6|B1  |B2  |B3  |B4  |B5  |
* |eef9969b-3650-31f5-b877-d5e86ce7b1b7|C1  |C2  |C3  |C4  |C5  |
* +------------------------------------+----+----+----+----+----+
*/
df2.show(false)
/**
* +------------------------------------+----+----+----+----+-----+
* |uuid                                |col6|col7|col8|col9|col10|
* +------------------------------------+----+----+----+----+-----+
* |0c26ece0-708a-3105-896f-e70d18b67766|A6  |A7  |A8  |A9  |A10  |
* |0e19058c-3c14-3d2f-8c71-b7308f63b0d6|B6  |B7  |B8  |B9  |B10  |
* |eef9969b-3650-31f5-b877-d5e86ce7b1b7|C6  |C7  |C8  |C9  |C10  |
* +------------------------------------+----+----+----+----+-----+
*/
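With this approach the UUID depends only on the row's contents, which is why it stays stable across df1 and df2. The same property has consequences that are worth checking against your data. The sketch below models the key construction in plain Scala, not Spark. Its concatWs helper is hypothetical and only imitates Spark's documented concat_ws behaviour, which skips null values. The sketch shows that identical rows share a UUID. It also shows that distinct rows can collide when a value contains the separator, or when nulls sit in different columns.

```scala
import java.nio.charset.StandardCharsets
import java.util.UUID

// Plain-Scala model (not Spark) of the key fed to the answer's UDF.
object RowKeyDemo {
  // Hypothetical helper imitating concat_ws: joins with sep and skips nulls.
  def concatWs(sep: String, values: Seq[String]): String =
    values.filter(_ != null).mkString(sep)

  def rowUuid(values: Seq[String]): String =
    UUID.nameUUIDFromBytes(concatWs(":", values).getBytes(StandardCharsets.UTF_8)).toString

  def main(args: Array[String]): Unit = {
    // Identical rows get identical UUIDs.
    println(rowUuid(Seq("A1", "A2")) == rowUuid(Seq("A1", "A2")))
    // Distinct rows collide when a value contains the separator...
    println(rowUuid(Seq("a:b", "c")) == rowUuid(Seq("a", "b:c")))
    // ...or when nulls sit in different columns.
    println(rowUuid(Seq("x", null)) == rowUuid(Seq(null, "x")))
  }
}
```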
