PySpark:执行相同的操作,多个列



我是Spark/PySpark的新手,一直在努力保持以下"最佳实践"的一致性,但不幸的是,有时我会回到下面这样的破解解决方案。

我正在开发一个推特数据集,其中包含基础推文、转发内容和引用内容。如果tweet不是retweeted_status或quoted_status,则根据数据类型,它的每个子功能都设置为None或空列表或其他变体。

我想做的是为这些功能中的每一个创建新的列,如果不是转发或引用状态,则使用基本功能中的内容,或者使用转发的内容,或使用基本功能+引用的内容。

def _shared_content(feature, df):
return df.withColumn(f'tweet_{feature.replace("entities.", "")}',
when((col(f'`retweeted_status.{feature}`').isNotNull()),
df[f'`retweeted_status.{feature}`'])
.when((col(f'`quoted_status.{feature}`').isNotNull()),
concat(
df[f'`{feature}`'],
lit(" "),
df[f'`quoted_status.{feature}`']))
.otherwise(df[f'`{feature}`']))
common_features = [
'text',
'entities.hashtags',
'entities.media',
'entities.urls',
]
for f in common_features:
df = df.transform(lambda df, f=f: _shared_content(f, df))

正如你所看到的,这有点混乱,所以我写了一些伪代码来提高可读性。我在这里执行以下功能:

  • 对于常见功能中的每个功能:
    • 如果转发状态。[FFEATURE]不是None,请将new col tweet_[FFEATTURE]设置为retweet_status。[功能]
    • 如果引用状态。[FFEATURE]不是None,将new col tweet_[FFEATTURE]设置为[FFEATULE]+"+quoted_status。[功能]
    • 否则,将tweet_[FFEATURE]设置为基础[FFEATTURE]

这个解决方案目前有效,但感觉非常粗糙,坦率地说难以辨认。我想知道是否有一种更类似Spark的方法,可以消除很多冗余代码?我试图将列表中的某种映射应用到函数中,但有点迷失了方向。

作为最后的澄清,我正在执行相同的转换,但唯一改变的是我正在开发的功能

编辑:我用使这个稍微更清晰

def _shared_content(f, df):
new_col = f'tweet_{f.replace("entities.", "")}'
retweet_cond = ((
col(f'`retweeted_status.{f}`').isNotNull()),
df[f'`retweeted_status.{f}`'])
quoted_cond = ((
col(f'`quoted_status.{f}`').isNotNull()),
concat(df[f'`{f}`'], lit(" "), df[f'`quoted_status.{f}`']))
return df.withColumn(
new_col,
when(*retweet_cond)
.when(*quoted_cond)
.otherwise(df[f'`{f}`'])
)

我会写这样的东西:

def _shared_content(feature, df):
feat_col = col(feature)
retweet = col(f"retweeted_status.{feature}")
quoted = col(f"quoted_status.{feature}")
new_feat_name = f'tweet_{feature.replace("entities.", "")}'
return df.withColumn(
new_feat_name,
(
when(retweet.isNotNull(), retweet)
.when(quoted.isNotNull(), concat(feat_col, lit(" "), quoted))
.otherwise(feat_col)
),
)

使用Pyspark(或其他任何东西(编写代码时,我通常遵循的一些原则:

  • 避免代码重复(在多个位置重复了列名(
  • 在有助于可读性的情况下,用变量名替换原始值(如新功能名称(
  • 使用Spark列对象,而不是不必要地使用df["<column name>"]引用相同的DataFrame

PS:我不知道你为什么使用反引号。

最新更新