I have an input DataFrame df with 10 columns (col1 to col10). Using the UDF below, I add a new column uuid, which produces another DataFrame, newdf.
Next, from newdf I create two separate DataFrames, df1 (uuid, col1 to col5) and df2 (uuid, col6 to col10), each containing only the listed columns.
This is where the problem occurs: I want the uuid column to have the same values in df1 and df2.
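Because Spark evaluates lazily, writing df1 and writing df2 each trigger a separate job that re-runs the UDF, so the same source row gets a different uuid value in df1 than in df2.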
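As a rough analogy in plain Scala (not Spark itself), an unmaterialized DataFrame behaves like a recipe that is re-run on every action. The names `LazyRecomputeDemo` and `plan` are hypothetical and only illustrate why two writes see two different sets of random UUIDs, while running the recipe once and deriving both outputs from that single result keeps them aligned.

```scala
import java.util.UUID

object LazyRecomputeDemo {
  val rows: Seq[String] = Seq("A", "B", "C")

  // Analogy for an unmaterialized DataFrame: a recipe re-run on every "action".
  val plan: () => Seq[(String, String)] =
    () => rows.map(r => (UUID.randomUUID().toString, r))

  def main(args: Array[String]): Unit = {
    val forDf1 = plan() // first "write" runs the recipe
    val forDf2 = plan() // second "write" runs it again, generating new random UUIDs
    println(forDf1.map(_._1) == forDf2.map(_._1)) // false

    // Running the recipe once and deriving both outputs from that result keeps UUIDs aligned.
    val materialized = plan()
    val df1Rows = materialized.map { case (id, r) => (id, r + "1") }
    val df2Rows = materialized.map { case (id, r) => (id, r + "6") }
    println(df1Rows.map(_._1) == df2Rows.map(_._1)) // true
  }
}
```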
My current workaround is to first write newdf to a temporary path and read it back. However, this approach does not scale well to large volumes of data.
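For reference, the write-and-read-back workaround can be sketched as below. This is a minimal sketch, assuming Spark SQL is on the classpath; `StagingWorkaround`, `addStableUuid` and `stagingPath` are hypothetical names. Once newdf has been read back from storage, the uuid values are stored data rather than a UDF call, so both selects see the same values, at the cost of an extra full write and read.

```scala
import java.util.UUID
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf

object StagingWorkaround {
  // Random UUID per evaluation, as in the question.
  val randomUuid = udf(() => UUID.randomUUID().toString)

  // Hypothetical helper: write newdf once to `stagingPath` and read it back,
  // so later selects read the stored uuid values instead of re-running the UDF.
  def addStableUuid(spark: SparkSession, df: DataFrame, stagingPath: String): DataFrame = {
    df.withColumn("uuid", randomUuid())
      .write.mode("overwrite").parquet(stagingPath)
    spark.read.parquet(stagingPath)
  }
}
```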
Here is a code snippet:
df.show(false)
+------+------+------+------+------+------+------+------+------+-------+
| col1 | col2 | col3 | col4 | col5 | col6 | col7 | col8 | col9 | col10 |
+------+------+------+------+------+------+------+------+------+-------+
| A1 | A2 | A3 | A4 | A5 | A6 | A7 | A8 | A9 | A10 |
| B1 | B2 | B3 | B4 | B5 | B6 | B7 | B8 | B9 | B10 |
| C1 | C2 | C3 | C4 | C5 | C6 | C7 | C8 | C9 | C10 |
+------+------+------+------+------+------+------+------+------+-------+
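import org.apache.spark.sql.functions.udf

// Generates a new random UUID every time the UDF is evaluated
val uuid = udf(() => java.util.UUID.randomUUID().toString)
val newdf = df.withColumn("uuid", uuid())
val df1 = newdf.select("uuid", "col1", "col2", "col3", "col4", "col5")
val df2 = newdf.select("uuid", "col6", "col7", "col8", "col9", "col10")
// Each write is a separate action, so the UDF is re-run for each one
df1.write.format("parquet").save("/df1/")
df2.write.format("parquet").save("/df2/")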
df1.show()
+-----------------+------+------+------+------+------+
| uuid | col1 | col2 | col3 | col4 | col5 |
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A1 | A2 | A3 | A4 | A5 |
|1dbcecf-1304-4a4e| B1 | B2 | B3 | B4 | B5 |
|1vbdecf-8406-4a4e| C1 | C2 | C3 | C4 | C5 |
+-----------------+------+------+------+------+------+
df2.show()
+-----------------+------+------+------+------+------+
| uuid | col6 | col7 | col8 | col9 | col10|
+-----------------+------+------+------+------+------+
|2aodecf-3303-6n5e| A6 | A7 | A8 | A9 | A10 |
|2docecf-6305-6n5e| B6 | B7 | B8 | B9 | B10 |
|2vodecf-1406-6n5e| C6 | C7 | C8 | C9 | C10 |
+-----------------+------+------+------+------+------+
Expected output: the uuid is the same for corresponding rows in both DataFrames.
df1.show()
+-----------------+------+------+------+------+------+
| uuid | col1 | col2 | col3 | col4 | col5 |
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A1 | A2 | A3 | A4 | A5 |
|1dbcecf-1304-4a4e| B1 | B2 | B3 | B4 | B5 |
|1vbdecf-8406-4a4e| C1 | C2 | C3 | C4 | C5 |
+-----------------+------+------+------+------+------+
df2.show()
+-----------------+------+------+------+------+------+
| uuid | col6 | col7 | col8 | col9 | col10|
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A6 | A7 | A8 | A9 | A10 |
|1dbcecf-1304-4a4e| B6 | B7 | B8 | B9 | B10 |
|1vbdecf-8406-4a4e| C6 | C7 | C8 | C9 | C10 |
+-----------------+------+------+------+------+------+
Please suggest the best way to overcome this problem.
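Try this:
The solution uses the Spark Scala API.
Use UUID.nameUUIDFromBytes inside the UDF. Because the UUID is derived from the row's own values instead of being generated randomly, re-running the plan for each write produces the same value for the same row.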
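A quick plain-Scala check of the property the answer relies on: `UUID.nameUUIDFromBytes` is a pure function of its input bytes and returns a version 3 (MD5-based) UUID. The object name `NameUuidDemo` is hypothetical, and the sample key only mimics the shape of what `concat_ws(":", ...)` would build for the first row; it is not claimed to reproduce the UUIDs shown in the output below.

```scala
import java.nio.charset.StandardCharsets
import java.util.UUID

object NameUuidDemo {
  // Same input string, same UUID: no randomness involved.
  def nameUuid(key: String): UUID =
    UUID.nameUUIDFromBytes(key.getBytes(StandardCharsets.UTF_8))

  def main(args: Array[String]): Unit = {
    val key = "A1:A2:A3:A4:A5:A6:A7:A8:A9:A10"
    println(nameUuid(key) == nameUuid(key)) // true
    println(nameUuid(key).version())        // 3
    println(nameUuid(key) == nameUuid("B1:B2:B3:B4:B5:B6:B7:B8:B9:B10")) // false
  }
}
```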
Load the provided test data:
df.show(false)
df.printSchema()
/**
* +----+----+----+----+----+----+----+----+----+-----+
* |col1|col2|col3|col4|col5|col6|col7|col8|col9|col10|
* +----+----+----+----+----+----+----+----+----+-----+
* |A1 |A2 |A3 |A4 |A5 |A6 |A7 |A8 |A9 |A10 |
* |B1 |B2 |B3 |B4 |B5 |B6 |B7 |B8 |B9 |B10 |
* |C1 |C2 |C3 |C4 |C5 |C6 |C7 |C8 |C9 |C10 |
* +----+----+----+----+----+----+----+----+----+-----+
*
* root
* |-- col1: string (nullable = true)
* |-- col2: string (nullable = true)
* |-- col3: string (nullable = true)
* |-- col4: string (nullable = true)
* |-- col5: string (nullable = true)
* |-- col6: string (nullable = true)
* |-- col7: string (nullable = true)
* |-- col8: string (nullable = true)
* |-- col9: string (nullable = true)
* |-- col10: string (nullable = true)
*/
Create the same UUID for both DataFrames by deriving it from each row's values:
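import java.nio.charset.StandardCharsets
import java.util.UUID
import org.apache.spark.sql.functions.{col, concat_ws, udf}

// Deterministic: the UUID is a hash of the row's concatenated values
val uuid = udf((s: String) => UUID.nameUUIDFromBytes(s.getBytes(StandardCharsets.UTF_8)).toString)
val newdf = df.withColumn("uuid", uuid(concat_ws(":", df.columns.map(col): _*)))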
val df1 = newdf.select("uuid", "col1", "col2", "col3", "col4", "col5")
val df2 = newdf.select("uuid", "col6", "col7", "col8", "col9", "col10")
df1.show(false)
/**
* +------------------------------------+----+----+----+----+----+
* |uuid |col1|col2|col3|col4|col5|
* +------------------------------------+----+----+----+----+----+
* |0c26ece0-708a-3105-896f-e70d18b67766|A1 |A2 |A3 |A4 |A5 |
* |0e19058c-3c14-3d2f-8c71-b7308f63b0d6|B1 |B2 |B3 |B4 |B5 |
* |eef9969b-3650-31f5-b877-d5e86ce7b1b7|C1 |C2 |C3 |C4 |C5 |
* +------------------------------------+----+----+----+----+----+
*/
df2.show(false)
/**
* +------------------------------------+----+----+----+----+-----+
* |uuid |col6|col7|col8|col9|col10|
* +------------------------------------+----+----+----+----+-----+
* |0c26ece0-708a-3105-896f-e70d18b67766|A6 |A7 |A8 |A9 |A10 |
* |0e19058c-3c14-3d2f-8c71-b7308f63b0d6|B6 |B7 |B8 |B9 |B10 |
* |eef9969b-3650-31f5-b877-d5e86ce7b1b7|C6 |C7 |C8 |C9 |C10 |
* +------------------------------------+----+----+----+----+-----+
*/
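One caveat, sketched in plain Scala below: a content-derived UUID is only as unique as the key it is hashed from. Exact duplicate rows always receive the same UUID, and because `concat_ws` skips null values, rows such as (x, null, y) and (x, y, null) produce the same key. `concatWsLike` is a hypothetical helper that imitates that null-skipping behaviour; it is not Spark's implementation.

```scala
import java.nio.charset.StandardCharsets
import java.util.UUID

object KeyCollisionDemo {
  // Imitates concat_ws(":", ...): null values are dropped, not rendered.
  def concatWsLike(values: Seq[String]): String =
    values.filter(_ != null).mkString(":")

  def nameUuid(key: String): String =
    UUID.nameUUIDFromBytes(key.getBytes(StandardCharsets.UTF_8)).toString

  def main(args: Array[String]): Unit = {
    // Two different rows collapse to the same key "x:y", hence the same UUID.
    val a = nameUuid(concatWsLike(Seq("x", null, "y")))
    val b = nameUuid(concatWsLike(Seq("x", "y", null)))
    println(a == b) // true
  }
}
```

If rows in the input can repeat or contain nulls in varying positions, the key needs to encode something that actually distinguishes them.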