PySpark-将行与其他行关联以进行筛选



在PySpark中,如何将一行中的数据关联到另一行,然后基于此进行筛选?

简化示例,

我有一个pyspark数据帧,每天有9行数据,我有很多天的数据,在下面的例子中,我给出了2天,18行的数据。

我有一个加压容器,一天可以自动补充3次。首先,我想要数据有效的第一个实例的最终结束压力,其次是过滤该值高于阈值3000psi的所有行。在某些情况下,数据一天内有效两次,但我只想要与第一个有效实例对应的结束压力。

数据帧:

2022年9月3日,p_01_start,2600

2022年9月3日,p_01_end,3100

2022年9月3日,p_02_start,2700

2022年9月3日,p_02_end,2900

2022年9月3日,p_03_start,2700

2022年9月3日,p_03_end,3050

Sep_3_2022,p_01_有效性,错误

Sep_3_2022,p_02_有效性,真实

Sep_3_2022,p_03_有效性,真实

2022年9月4日,p_01_start,2600

2022年9月4日,p_01_end,3100

2022年9月4日,p_02_start,2700

2022年9月4日,p_02_end,3050

2022年9月4日,p_03_start,2700

2022年9月4日,p_03_end,3050

2022年9月4日,p_01_有效性,真实

Sep_4_2022,p_02_有效性,真实

Sep_4_2022,p_03_有效性,错误


预期结果1(当天的第一个有效数据(:

2022年9月3日,p_02_start,2700

2022年9月3日,p_02_end,2900

Sep_3_2022,p_02_有效性,真实

2022年9月4日,p_01_start,2600

2022年9月4日,p_01_end,3100

2022年9月4日,p_01_有效性,真实


预期结果2(当天第一个有效数据,高于阈值,仅限终点压力(:

2022年9月4日,p_01_end,3100


我曾考虑过使用substring((来确定字符串的01、02部分,以使它们相互关联,但我不知道如何进一步。

感谢您的任何建议

解决问题的一个好方法是使用Fugue库。我首先将您的数据转换为Pandas DataFrame来创建解决方案,因为它比pyspark数据框架更简单、更快。

https://fugue-tutorials.readthedocs.io/

import pandas as pd
from fugue import transform
df = pd.DataFrame(
[
("Sep_3_2022"," p_01_start","2600"),
("Sep_3_2022"," p_01_end","3100"),
("Sep_3_2022"," p_02_start","2700"),
("Sep_3_2022"," p_02_end","2900"),
("Sep_3_2022"," p_03_start","2700"),
("Sep_3_2022"," p_03_end","3050"),
("Sep_3_2022"," p_01_validity","False"),
("Sep_3_2022"," p_02_validity","True"),
("Sep_3_2022"," p_03_validity","True"),
], columns = ["index", "txt", "result"])
def logic(df: pd.DataFrame) -> pd.DataFrame:
x = df.loc[(df['result'] == "True")].drop_duplicates(subset=['result'],keep='first')
search = x['txt'].str[:5].to_string()[-4:]
return df[df['txt'].str.contains(search)]

第一个函数过滤DataFrame,使其仅包括有效样本的第一个实例。

def logicII(df: pd.DataFrame) -> pd.DataFrame:
x = df.loc[(df['result'] == "True")].drop_duplicates(subset=['result'],keep='first')
search = x['txt'].str[:5].to_string()[-4:]
valid = df[df['txt'].str.contains(search+"_end")]
valid['result'] = valid['result'].apply(pd.to_numeric)
return valid[valid['result'] > 3000]

第二个函数的作用与第一个函数相同,但我们添加了一个条件,该条件将删除结果低于3000psi的任何行。

设置好以上功能后,我们将使用赋格的Transform来获得所需的数据。

  1. df是包含我们的数据的数据帧
  2. 逻辑是我们创造的功能
  3. Schema是您希望结束项为的数据类型
  4. 分区将起到分组机制和核心分布的作用
transform(df, 
logic,
schema="index:str, txt:str, result:str",
partition={"by":"index"}
)
transform(df, 
logicII,
schema="index:str, txt:str, result:int",
partition={"by":"index"}
)

以上仅在Pandas上运行。下面是一个将在pyspark上运行的示例,它将生成一个pyspark DataFrame,并将基于transform((的partition参数进行分区。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sdf = transform(df,
logic,
schema="index:str, txt:str, result:int",
partition={"by":"index"},
engine=spark)
sdf_II = transform(df,
logicII,
schema="index:str, txt:str, result:int",
partition={"by":"index"},
engine=spark)

最新更新