在 Spark 中组合 Row()



看似简单的问题,却找不到答案。

问题:我创建了一个函数,我将传递给 map(),该函数获取单个字段并从中创建三个字段。我希望 map() 的输出给我一个新的 RDD,包括来自输入 RDD 和新/输出 RDD 的字段。我该怎么做?

我是否需要将数据的键添加到函数的输出中,以便我可以将更多输出RDD连接回原始RDD?这是正确/最佳实践吗?

def extract_fund_code_from_iv_id(holding):
    # Must include key of data for later joining
    iv_id = Row(iv_id_fund_code=holding.iv_id[:2], iv_id_last_code=holding.iv_id[-2:])
    return iv_id

更基本的是,我似乎无法将两个 Row 结合起来。

row1 = Row(name="joe", age="35")
row2 = Row(state="MA")
print row1, row2

这不会像我想要的那样返回一个新的 Row()。

谢谢

我真的建议使用UserDefinedFunction

假设您要从数据帧dfint 类型的列int_col中提取许多特征。假设这些功能只是modulo 3modulo 2所述列内容。

我们将导入函数的UserDefinedFunction和数据类型。

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

然后我们将实现我们的特征提取函数:

def modulo_three(col):
    return int(col) % 3
def modulo_two(col):
    return int(col) % 2

并将它们变成udf

mod3 = udf(modulo_three, IntegerType())
mod2 = udf(modulo_two, IntegerType())

现在我们将计算所有其他列并给它们起好听的名字(通过 alias):

new_columns = [
    mod3(df['int_col']).alias('mod3'),
    mod2(df['int_col']).alias('mod2'),
]

最后,我们选择这些列以及之前已经存在的所有列:

new_df = df.select(*df.columns+new_columns)

new_df现在将有另外两列mod3mod2

相关内容

  • 没有找到相关文章

最新更新