看似简单的问题,却找不到答案。
问题:我创建了一个函数,我将传递给 map(),该函数获取单个字段并从中创建三个字段。我希望 map() 的输出给我一个新的 RDD,包括来自输入 RDD 和新/输出 RDD 的字段。我该怎么做?
我是否需要将数据的键添加到函数的输出中,以便我可以将更多输出RDD连接回原始RDD?这是正确/最佳实践吗?
def extract_fund_code_from_iv_id(holding):
# Must include key of data for later joining
iv_id = Row(iv_id_fund_code=holding.iv_id[:2], iv_id_last_code=holding.iv_id[-2:])
return iv_id
更基本的是,我似乎无法将两个 Row 结合起来。
row1 = Row(name="joe", age="35")
row2 = Row(state="MA")
print row1, row2
这不会像我想要的那样返回一个新的 Row()。
谢谢
我真的建议使用UserDefinedFunction
。
假设您要从数据帧df
的 int
类型的列int_col
中提取许多特征。假设这些功能只是modulo 3
和modulo 2
所述列内容。
我们将导入函数的UserDefinedFunction
和数据类型。
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
然后我们将实现我们的特征提取函数:
def modulo_three(col):
return int(col) % 3
def modulo_two(col):
return int(col) % 2
并将它们变成udf
:
mod3 = udf(modulo_three, IntegerType())
mod2 = udf(modulo_two, IntegerType())
现在我们将计算所有其他列并给它们起好听的名字(通过 alias
):
new_columns = [
mod3(df['int_col']).alias('mod3'),
mod2(df['int_col']).alias('mod2'),
]
最后,我们选择这些列以及之前已经存在的所有列:
new_df = df.select(*df.columns+new_columns)
new_df
现在将有另外两列mod3
和mod2
。