Apache Spark 中的高效数据帧查找



我想有效地查找许多ID。我有一个看起来像这样dataframedf_sourcedataframe,但有几百万条记录分发给 10 个工人:

+-------+----------------+
|    URI|     Links_lists|
+-------+----------------+
|  URI_1|[URI_8,URI_9,...|
|  URI_2|[URI_6,URI_7,...|
|  URI_3|[URI_4,URI_1,...|
|  URI_4|[URI_1,URI_5,...|
|  URI_5|[URI_3,URI_2,...|
+-------+----------------+

我的第一步是用df_source做一个RDD

rdd_source = df_source.rdd

出于rdd_source我想创建一个仅包含带有 ID 的 URI 的RDD。我是这样做的:

rdd_index = rdd_source.map(lambda x: x[0]).zipWithUniqueId()

现在,我还.flatMap()rdd_source到包含所有关系的RDD中。到目前为止,仅包含在Links_list列中。

rdd_relations = rdd_source.flatMap(lamda x: x)

现在我将rdd_indexrdd_relations都转换回dataframes,因为我想做连接,我认为(我可能错了)dataframes的连接速度更快。

schema_index = StructType([
StructField("URI", StringType(), True),
StructField("ID", IntegerType(), True))
df_index = sqlContext.createDataFrame(rdd_index, schema=schema_index)

schema_relation = StructType([
StructField("URI", StringType(), True),
StructField("LINK", StringType(), True))
df_relations = sqlContext.createDataFrame(rdd_relations, schema=schema_relation )

生成的dataframes应如下所示:

df_index:
+-------+-------+
|    URI|     ID|
+-------+-------+
|  URI_1|      1|
|  URI_2|      2|
|  URI_3|      3|
|  URI_4|      4|
|  URI_5|      5|
+-------+-------+
df_relations:
+-------+-------+
|    URI|   LINK|
+-------+-------+
|  URI_1|  URI_5|
|  URI_1|  URI_8|
|  URI_1|  URI_9|
|  URI_2|  URI_3|
|  URI_2|  URI_4|
+-------+-------+

现在要替换df_relations中的长字符串 URI,我将在df_index上进行连接,第一个连接:

df_relations =
df_relations.join(df_index, df_relations.URI == df_index.URI,'inner')
.select(col(ID).alias(URI_ID),col('LINK'))

这应该给我一个看起来像这样的dataframe

df_relations:
+-------+-------+
| URI_ID|   LINK|
+-------+-------+
|      1|  URI_5|
|      1|  URI_8|
|      1|  URI_9|
|      2|  URI_3|
|      2|  URI_4|
+-------+-------+

第二个连接:

df_relations =
df_relations.join(df_index, df_relations.LINK == df_index.URI,'inner')
.select(col(URI_ID),col('ID').alias(LINK_ID))

这应该会导致我需要的最终dataframe。看起来像这样

df_relations:
+-------+-------+
| URI_ID|LINK_ID|
+-------+-------+
|      1|      5|
|      1|      8|
|      1|      9|
|      2|      3|
|      2|      4|
+-------+-------+

其中所有 URI 都替换为df_index中的 ID。

这是在关系表的两列上查找所有 URI 的 ID 的有效方法,还是有更有效的方法?

我正在使用Apache Spark 2.1.0和Python 3.5

您不需要将RDD用于您描述的操作。使用RDD可能非常昂贵。其次,你不需要做两个连接,你可以只做一个:

import pyspark.sql.functions as f
# add a unique id for each URI
withID = df_source.withColumn("URI_ID", f.monotonically_increasing_id())
# create a single line from each element in the array
exploded = withID.select("URI_ID", f.explode("Links_lists").alias("LINK")
linkID = withID.withColumnRenamed("URI_ID", "LINK_ID").drop("Links_lists")
joined= exploded.join(linkID, on=exploded.LINK==linkID.URI).drop("URI").drop("LINK")

最后,如果 linkID(基本上是df_source替换了一列)相对较小(即可以完全包含在单个 worker 中),则可以广播它。 在联接之前添加以下内容:

linkID = f.broadcast(linkID)

相关内容

  • 没有找到相关文章

最新更新