我试图在Spark SQL数据框架中创建新列,比较数据框架内的两列,如果它们相等,则返回True,否则为False。对于包含数千列的数据集,我必须这样做。作为一个示例问题,我在这里包含了我的所有代码。然而,重要的问题出现在代码组末尾的第二个for
循环中。
from pyspark.sql import SQLContext
from pyspark.sql.types import *
data = sc.parallelize([[1, None, 'BND'], [2, None, 'RMP'], [3, None, 'SWP'], [4, None, "IRS"], [5, None, "SWP"], [6, None, "IRS"]])
match = sc.parallelize([[1, 2, 100], [3, 5, 101], [4, 6, 102]])
trade_schema_string = 'trade_id,match_id,product'
trade_fields = [StructField(field_name, StringType(), True) for field_name in trade_schema_string.split(',')]
trade_fields[0].dataType = IntegerType()
trade_fields[1].dataType = IntegerType()
trade_schema = StructType(trade_fields)
match_schema_string = "pri_netting_id,sec_netting_id,match_id"
match_fields = [StructField(field_name, IntegerType(), True) for field_name in match_schema_string.split(',')]
match_schema = StructType(match_fields)
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(data, trade_schema)
odf = sqlContext.createDataFrame(match, match_schema)
df.registerTempTable("trade")
odf.registerTempTable("match")
# Get match_ids so you can match up front office and back office records
# Change column names for fo and bo dataframes so that they say "bo_product" and "fo_product", etc.
fo = sqlContext.sql("SELECT t.trade_id,t.product,m.match_id FROM trade t INNER JOIN match m WHERE t.trade_id = m.pri_netting_id")
bo = sqlContext.sql("SELECT t.trade_id,t.product,m.match_id FROM trade t INNER JOIN match m WHERE t.trade_id = m.sec_netting_id")
col_names = fo.columns
for x in range(0, len(col_names)):
col_name = col_names[x]
fo = fo.withColumnRenamed(col_name, "fo_" + col_name)
bo = bo.withColumnRenamed(col_name, "bo_" + col_name)
fo.registerTempTable("front_office")
bo.registerTempTable("back_office")
fobo = sqlContext.sql("SELECT f.fo_trade_id,f.fo_product,b.bo_trade_id,b.bo_product FROM front_office f INNER JOIN back_office b WHERE f.fo_match_id = b.bo_match_id")
fobo = fobo.repartion(5)
# How to create diff columns
num_cols = len(fobo.columns)
fobo_names = fobo.columns
first = fobo.first()
for x in range(0, num_cols / 2):
new_name = "'diff_" + fobo_names[x][3:] + "'"
old_column_fo = "fobo." + fobo_names[x]
old_column_bo = "fobo." + fobo_names[x + (num_cols / 2)]
fobo = fobo.withColumn(new_name, old_column_fo == old_column_bo)
我得到的错误是:
Traceback(最近一次调用):文件",第8行文件"/opt/cloudera/packages/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/spark/python/pyspark/sql/dataframe.py",第695行,在withColumn中回归自我。选择(‘*’,col.alias (colName))AttributeError: 'bool'对象没有属性'alias'
那么,奇怪的是,如果我手动执行以下命令:
fobo = fobo.withColumn("diff_product", fobo.fo_product == fobo.bo_product)
和
fobo = fobo.withColumn("diff_trade_id", fobo.fo_trade_id == fobo.bo_trade_id)
这一切都很完美。但是,对于我的实际用例来说,这是不实用的,因为它有许多列。
old_column_fo = "fobo." + fobo_names[x]
old_column_bo = "fobo." + fobo_names[x + (num_cols / 2)]
fobo = fobo.withColumn(new_name, old_column_fo == old_column_bo)
old_column_fo
和old_column_bo
将是仅仅看起来像您试图访问的属性名称的字符串,但它们不会是实际的属性。试着用getattr
代替。
old_column_fo = getattr(fobo, fobo_names[x])
old_column_bo = getattr(fobo, fobo_names[x + (num_cols / 2)])
fobo = fobo.withColumn(new_name, old_column_fo == old_column_bo)