使用 pySpark 迭代数据帧的每一行



我需要使用 pySpark 迭代一个dataframe,就像我们可以使用 for 循环迭代一组值一样。下面是我编写的代码。此代码的问题是

  1. 我必须使用会破坏并行性的集合
  2. 我无法在函数funcRowIter中打印数据帧中的任何值
  3. 一旦找到匹配项,我就无法打破循环。

我必须在pySpark中执行此操作,并且不能为此使用熊猫:

from pyspark.sql.functions import *
from pyspark.sql import HiveContext
from pyspark.sql import functions
from pyspark.sql import DataFrameWriter
from pyspark.sql.readwriter import DataFrameWriter
from pyspark import SparkContext
sc = SparkContext()
hive_context = HiveContext(sc)
tab = hive_context.sql("select * from update_poc.test_table_a")
tab.registerTempTable("tab")
print type(tab)
df = tab.rdd
def funcRowIter(rows):
    print type(rows)
        if(rows.id == "1"):
            return 1
df_1 = df.map(funcRowIter).collect()
print df_1

与其使用df_1 = df.map(funcRowIter).collect()不如尝试UDF。希望这会有所帮助。

from pyspark.sql.functions import struct
from pyspark.sql.functions import *
def funcRowIter(rows):
    print type(rows)
    if(row is nor None and row.id is not None)
        if(rows.id == "1"):
            return 1
A = udf(funcRowIter, ArrayType(StringType()))
z = df.withColumn(data_id, A(struct([df[x] for x in df.columns])))
z.show()

collect()永远不会是非常大数据(即数百万条记录)的好选择

似乎您的目标是显示特定行。您可以使用.filter然后使用.collect.

例如

row_1 = rdd.filter(lambda x: x.id==1).collect()

但是,尝试以这种方式迭代数据帧效率不高。

相关内容

  • 没有找到相关文章

最新更新