左外 在 pyspark 中加入并选择存在于左侧表中的列



我必须编写一个pyspark连接查询。我的要求是: 我只需要选择仅存在于左侧表中的记录。

对此的SQL解决方案是:

select Left.* 
FROM LEFT LEFT_OUTER_JOIN RIGHT
where RIGHT.column1 is NULL and Right.column2 is NULL

对我来说,挑战是,这两个表是数据帧。我正在运行时创建它们。所以我不知道正确的数据帧列详细信息或其编号。我必须在右侧数据帧的每一列上运行此"是否为空"检查。

我需要您的帮助来解决此问题,任何类型的运行时功能都会有所帮助。

到目前为止,我拥有的代码 -

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import sha2,udf
from pyspark.sql.types import StructType
import csv
import ConfigParser
import collections
import hashlib
import sys
import pandas as pd
import datetime
from datetime import datetime,timedelta
from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row
emp = [('Ram',25,12,1),('Jalfaizy',22,13,2),('saurabh',20,14,3),('Bala',26,15,4)]
rddemp = sc.parallelize(emp)
emp1 = rddemp.map(lambda x: Row(name=x[0], dept=int(x[1]),col=x[2], ign=x[2]))
empDF = sqlContext.createDataFrame(emp1)
dept = [('Ram',25,12,1),('Jalfaizy',22,16,3),('Kukarm',50,17,4)]
rdddept = sc.parallelize(dept)
dept1 = rdddept.map(lambda x: Row(name=x[0], dept=int(x[1]), col=x[2], ign=x[2]))
deptDF = sqlContext.createDataFrame(dept1)
empDF1=empDF.drop("ign")
deptDF1=deptDF.drop("ign")
make_sha = udf(lambda row: hashlib.pbkdf2_hmac('sha512', str(row), b'salt', 100000))
src_sha = empDF1.withColumn("sha512", make_sha(struct([empDF1[x] for x in empDF1.columns])))
tgt_sha = deptDF1.withColumn("sha512", make_sha(struct([deptDF1[x] for x in deptDF1.columns])))
tblPrimaryKeyList="dept|name".split('|')
stgActiveDF = src_sha.alias('STG').join(tgt_sha.alias('TGT'), "sha512",'left_outer').where("TGT.name").isNull())
.select("STG.*").drop("sha512").dropDuplicates()

问题区域,我需要帮助的地方如下。我必须用"TGT.columns"之类的东西替换 TGT.name:

stgActiveDF = src_sha.alias('STG').join(tgt_sha.alias('TGT'),"sha512",'left_outer').where(col("TGT.name").isNull()).select("STG.*").drop("sha512").dropDuplicates()

提前谢谢。

一种幼稚的方法可能是使用基本的字符串组合将查询拼凑在一起:

where_clause = f' and '.join([f'TGT.{col}' for col in tgt_sha.columns])
joined_df = df1.join(df2,"join_col",'left_outer').where(where_clause)

最新更新