我需要扫描 Hive 表并将序列中第一条记录的值添加到所有链接记录。
逻辑是:-
- 查找第一条记录(其中previous_id为空)。
- 查找下一条记录 (current_id = previous_id)。
- 重复此步骤,直到没有更多的链接记录。
- 将原始记录中的列添加到所有链接记录。
- 将结果输出到 Hive 表。
示例源数据:-
current_id previous_id start_date
---------- ----------- ----------
100 01/01/2001
200 100 02/02/2002
300 200 03/03/2003
示例输出数据:-
current_id start_date
---------- ----------
100 01/01/2001
200 01/01/2001
300 01/01/2001
我可以通过从源表创建两个数据帧并执行多个联接来实现此目的。但是,这种方法似乎并不理想,因为必须缓存数据以避免每次迭代时重新查询源数据。
关于如何处理这个问题的任何建议?
我认为您可以使用GraphFrames连接组件来实现这一点
它将帮助您避免自己编写检查点和循环逻辑。实质上,您从current_id
和previous_id
对创建一个图形,并将 GraphFrame 用于每个顶点的组件。然后,可以将生成的数据帧联接到原始数据帧以获取start_date
。
from graphframes import *
sc.setCheckpointDir("/tmp/chk")
input = spark.createDataFrame([
(100, None, "2001-01-01"),
(200, 100, "2002-02-02"),
(300, 200, "2003-03-03"),
(400, None, "2004-04-04"),
(500, 400, "2005-05-05"),
(600, 500, "2006-06-06"),
(700, 300, "2007-07-07")
], ["current_id", "previous_id", "start_date"])
input.show()
vertices = input.select(input.current_id.alias("id"))
edges = input.select(input.current_id.alias("src"), input.previous_id.alias("dst"))
graph = GraphFrame(vertices, edges)
result = graph.connectedComponents()
result.join(input.previous_id.isNull(), result.component == input.current_id)
.select(result.id.alias("current_id"), input.start_date)
.orderBy("current_id")
.show()
结果为以下输出:
+----------+----------+
|current_id|start_date|
+----------+----------+
| 100|2001-01-01|
| 200|2001-01-01|
| 300|2001-01-01|
| 400|2004-04-04|
| 500|2004-04-04|
| 600|2004-04-04|
| 700|2001-01-01|
+----------+----------+
这是一种我不确定是否适合Spark的方法。
缺少数据的分组 ID/键。
不确定 Catalyst 将如何优化这一点 - 将在以后的时间点查看。如果内存太大,则出现内存错误?
使数据更加复杂,这确实有效。这里是:
# No grouping key evident, more a linked list with asc current_ids.
# Added more complexity to the example.
# Questions open on performance at scale. Interested to see how well Catalyst handles this.
# Need really some grouping id/key in the data.
from pyspark.sql import functions as f
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
# Started from dataframe.
# Some more realistic data? At least more complex.
columns = ['current_id', 'previous_id', 'start_date']
vals = [
(100, None, '2001/01/01'),
(200, 100, '2002/02/02'),
(300, 200, '2003/03/03'),
(400, None, '2005/01/01'),
(500, 400, '2006/02/02'),
(600, 300, '2007/02/02'),
(700, 600, '2008/02/02'),
(800, None, '2009/02/02'),
(900, 800, '2010/02/02')
]
df = spark.createDataFrame(vals, columns)
df.createOrReplaceTempView("trans")
# Starting data. The null / None entries.
df2 = spark.sql("""
select *
from trans
where previous_id is null
""")
df2.cache
df2.createOrReplaceTempView("trans_0")
# Loop through the stuff based on traversing the list elements until exhaustion of data, and, write to dynamically named TempViews.
# May need to checkpoint? Depends on depth of chain of linked items.
# Spark not well suited to this type of processing.
dfX_cnt = 1
cnt = 1
while (dfX_cnt != 0):
tabname_prev = 'trans_' + str(cnt-1)
tabname = 'trans_' + str(cnt)
query = "select t2.current_id, t2.previous_id, t1.start_date from {} t1, trans t2 where t1.current_id = t2.previous_id".format(tabname_prev)
dfX = spark.sql(query)
dfX.cache
dfX_cnt = dfX.count()
if (dfX_cnt!=0):
#print('Looping for dynamic creation of TempViews')
dfX.createOrReplaceTempView(tabname)
cnt=cnt+1
# Reduce the TempViews all to one DF. Can reduce an array of DF's as well, but could not find my notes here in this regard.
# Will memory errors occur?
from pyspark.sql.types import *
fields = [StructField('current_id', LongType(), False),
StructField('previous_id', LongType(), True),
StructField('start_date', StringType(), False)]
schema = StructType(fields)
dfZ = spark.createDataFrame(sc.emptyRDD(), schema)
for i in range(0,cnt,1):
tabname = 'trans_' + str(i)
query = "select * from {}".format(tabname)
df = spark.sql(query)
dfZ = dfZ.union(df)
# Show final results.
dfZ.select('current_id', 'start_date').sort(col('current_id')).show()
返回:
+----------+----------+
|current_id|start_date|
+----------+----------+
| 100|2001/01/01|
| 200|2001/01/01|
| 300|2001/01/01|
| 400|2005/01/01|
| 500|2005/01/01|
| 600|2001/01/01|
| 700|2001/01/01|
| 800|2009/02/02|
| 900|2009/02/02|
+----------+----------+
感谢您在此处发布的建议。在尝试了各种方法之后,我采用了以下解决方案,该解决方案适用于多次迭代(例如 20 个循环),并且不会导致任何内存问题。
"物理计划"仍然很大,但缓存意味着跳过了大部分步骤,使性能保持可接受。
input = spark.createDataFrame([
(100, None, '2001/01/01'),
(200, 100, '2002/02/02'),
(300, 200, '2003/03/03'),
(400, None, '2005/01/01'),
(500, 400, '2006/02/02'),
(600, 300, '2007/02/02'),
(700, 600, '2008/02/02'),
(800, None, '2009/02/02'),
(900, 800, '2010/02/02')
], ["current_id", "previous_id", "start_date"])
input.createOrReplaceTempView("input")
cur = spark.sql("select * from input where previous_id is null")
nxt = spark.sql("select * from input where previous_id is not null")
cur.cache()
nxt.cache()
cur.createOrReplaceTempView("cur0")
nxt.createOrReplaceTempView("nxt")
i = 1
while True:
spark.sql("set table_name=cur" + str(i - 1))
cur = spark.sql(
"""
SELECT nxt.current_id as current_id,
nxt.previous_id as previous_id,
cur.start_date as start_date
FROM ${table_name} cur,
nxt nxt
WHERE cur.current_id = nxt.previous_id
""").cache()
cur.createOrReplaceTempView("cur" + str(i))
i = i + 1
if cur.count() == 0:
break
for x in range(0, i):
spark.sql("set table_name=cur" + str(x))
cur = spark.sql("select * from ${table_name}")
if x == 0:
out = cur
else:
out = out.union(cur)