我想有效地查找许多ID。我有一个看起来像这样dataframe
df_source
的dataframe
,但有几百万条记录分发给 10 个工人:
+-------+----------------+
| URI| Links_lists|
+-------+----------------+
| URI_1|[URI_8,URI_9,...|
| URI_2|[URI_6,URI_7,...|
| URI_3|[URI_4,URI_1,...|
| URI_4|[URI_1,URI_5,...|
| URI_5|[URI_3,URI_2,...|
+-------+----------------+
我的第一步是用df_source
做一个RDD
:
rdd_source = df_source.rdd
出于rdd_source
我想创建一个仅包含带有 ID 的 URI 的RDD
。我是这样做的:
rdd_index = rdd_source.map(lambda x: x[0]).zipWithUniqueId()
现在,我还.flatMap()
rdd_source
到包含所有关系的RDD
中。到目前为止,仅包含在Links_list
列中。
rdd_relations = rdd_source.flatMap(lamda x: x)
现在我将rdd_index
和rdd_relations
都转换回dataframes
,因为我想做连接,我认为(我可能错了)dataframes
的连接速度更快。
schema_index = StructType([
StructField("URI", StringType(), True),
StructField("ID", IntegerType(), True))
df_index = sqlContext.createDataFrame(rdd_index, schema=schema_index)
和
schema_relation = StructType([
StructField("URI", StringType(), True),
StructField("LINK", StringType(), True))
df_relations = sqlContext.createDataFrame(rdd_relations, schema=schema_relation )
生成的dataframes
应如下所示:
df_index:
+-------+-------+
| URI| ID|
+-------+-------+
| URI_1| 1|
| URI_2| 2|
| URI_3| 3|
| URI_4| 4|
| URI_5| 5|
+-------+-------+
df_relations:
+-------+-------+
| URI| LINK|
+-------+-------+
| URI_1| URI_5|
| URI_1| URI_8|
| URI_1| URI_9|
| URI_2| URI_3|
| URI_2| URI_4|
+-------+-------+
现在要替换df_relations
中的长字符串 URI,我将在df_index
上进行连接,第一个连接:
df_relations =
df_relations.join(df_index, df_relations.URI == df_index.URI,'inner')
.select(col(ID).alias(URI_ID),col('LINK'))
这应该给我一个看起来像这样的dataframe
:
df_relations:
+-------+-------+
| URI_ID| LINK|
+-------+-------+
| 1| URI_5|
| 1| URI_8|
| 1| URI_9|
| 2| URI_3|
| 2| URI_4|
+-------+-------+
第二个连接:
df_relations =
df_relations.join(df_index, df_relations.LINK == df_index.URI,'inner')
.select(col(URI_ID),col('ID').alias(LINK_ID))
这应该会导致我需要的最终dataframe
。看起来像这样
df_relations:
+-------+-------+
| URI_ID|LINK_ID|
+-------+-------+
| 1| 5|
| 1| 8|
| 1| 9|
| 2| 3|
| 2| 4|
+-------+-------+
其中所有 URI 都替换为df_index
中的 ID。
这是在关系表的两列上查找所有 URI 的 ID 的有效方法,还是有更有效的方法?
我正在使用Apache Spark 2.1.0和Python 3.5
您不需要将RDD用于您描述的操作。使用RDD可能非常昂贵。其次,你不需要做两个连接,你可以只做一个:
import pyspark.sql.functions as f
# add a unique id for each URI
withID = df_source.withColumn("URI_ID", f.monotonically_increasing_id())
# create a single line from each element in the array
exploded = withID.select("URI_ID", f.explode("Links_lists").alias("LINK")
linkID = withID.withColumnRenamed("URI_ID", "LINK_ID").drop("Links_lists")
joined= exploded.join(linkID, on=exploded.LINK==linkID.URI).drop("URI").drop("LINK")
最后,如果 linkID(基本上是df_source替换了一列)相对较小(即可以完全包含在单个 worker 中),则可以广播它。 在联接之前添加以下内容:
linkID = f.broadcast(linkID)