Pyspark和Joblib给了我模棱两可的结果



我正在尝试从teradata获取数据--

select ... from table1_1
union all
select .. from table_2
union all
select ... from table_3

注意:一个或多个select ..可能会失败,这不应导致整个联合失败

from .base import Base
from joblib import Parallel, delayed
import re
import pandas as pd
class TeradataWithSpark(Base):
def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
super().__init__(spark, host, port, database, username, password)
self._reader = self._spark.read.format("jdbc") 
.option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') 
.option("user", self._username) 
.option("password", self._password) 
.option("driver", "com.teradata.jdbc.TeraDriver")
def run_query(self, query, return_pandasDF=True):
spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
if return_pandasDF:
return spark_df.toPandas()
else:
return spark_df
def run_queries_and_union_all(self, queries, return_pandasDF=True):
def run(query):
try:
return self._reader.option('dbtable', f"({query}) as tbl").load().toPandas()
except Exception as e:
return None

dfs = Parallel(n_jobs=10, prefer='threads')(delayed(run)(q) for q in queries)
concat_df = pd.concat(dfs).reset_index(drop=True)
if return_pandasDF:
return concat_df
else:
return self._spark.createDataFrame(concat_df)
def split_query_and_run_individually(self, query, separator='union all', return_pandasDF=True):
queries = re.split(separator, query, flags=re.IGNORECASE)
return self.run_queries_and_union_all(queries, return_pandasDF)

如您所见,split_query_and_run_individually方法基于union all拆分查询,然后在并行线程n_jobs=10中运行所有子查询。

但我面临的问题是数据像一样被破坏了

n_jobs = 1
src_tbl  total_count   data_date
0     dsl_dim_mdm_....................           61  2022-03-17
1     dsl_agg_call....................      3992202  2022-03-27
2      dsl_call_ac....................       924719  2022-03-27
3      dsl_dim_acc....................         4762  2022-03-31
4                 ....................         6821  2022-03-31
5     dsl_dim_geo_....................      8610038  2022-04-05
6              dsl....................        67116  2022-03-31
7           dsl_rl....................      2087669  2022-04-06
8             dsl_....................          154  2022-04-01
9             dsl_....................        85630  2022-03-27
10  dsl_selling_da....................           53  2021-03-03
11  dsl_speaker_ev....................        17765  2022-03-31
12       dsl_speak....................        26269  2022-08-24
13   dsl_speaker_e....................         4202  2022-04-05
14              ds....................          268  2022-03-31
15      dsl_rltn_r....................       255794  2022-03-18
16     dsl_rltn_nr....................        12088  2022-03-18
17        dsl_rapp....................        81182  2022-01-01
18   dsl_dim_physi....................       109299  2022-03-31
19             dsl....................         4265  2022-02-01
20         dsl_fac....................       117978  2022-04-03
21      dsl_coachi....................          242  2022-03-31
22   dsl_speaker_e....................        16653  2022-03-31
23     dsl_dim_cal....................        17817  2099-12-31
24    dsl_rltn_nrt....................         3304  2022-02-01
Time took: 3.4742537260055544 minutes
-----------
n_jobs=10
src_tbl  total_count   data_date
0             dsl_sel................        85630  2022-03-27
1             dsl_sel................        85630  2022-03-27
2             dsl_sel................        85630  2022-03-27
3             dsl_sel................        85630  2022-03-27
4             dsl_sel................        85630  2022-03-27
5             dsl_sel................        85630  2022-03-27
6             dsl_sel................        85630  2022-03-27
7             dsl_sel................        85630  2022-03-27
8             dsl_sel................        85630  2022-03-27
9             dsl_sel................        85630  2022-03-27
10  dsl_speaker_event................        17765  2022-03-31
11   dsl_speaker_even................         4202  2022-04-05
12   dsl_speaker_even................         4202  2022-04-05
13              dsl_s................          268  2022-03-31
14        dsl_rapper_................        81182  2022-01-01
15        dsl_rapper_................        81182  2022-01-01
16     dsl_rltn_nrtl_................        12088  2022-03-18
17        dsl_rapper_................        81182  2022-01-01
18   dsl_dim_physicia................       109299  2022-03-31
19             dsl_cu................         4265  2022-02-01
20         dsl_fact_f................       117978  2022-04-03
21      dsl_coaching_................          242  2022-03-31
22   dsl_speaker_even................        16653  2022-03-31
23     dsl_dim_call_c................        17817  2099-12-31
24    dsl_rltn_nrtl_r................         3304  2022-02-01
Time took: 1.8048373858133953 minutes
-----------
n_jobs=-1
src_tbl  total_count   data_date
0   dsl_dim_acc....................         4762  2022-03-31
1   dsl_dim_acc....................         4762  2022-03-31
2   dsl_dim_acc....................         4762  2022-03-31
3   dsl_dim_acc....................         4762  2022-03-31
4   dsl_dim_acc....................         4762  2022-03-31
5   dsl_dim_acc....................         4762  2022-03-31
6   dsl_dim_acc....................         4762  2022-03-31
7   dsl_dim_acc....................         4762  2022-03-31
8   dsl_dim_acc....................         4762  2022-03-31
9   dsl_dim_acc....................         4762  2022-03-31
10  dsl_dim_acc....................         4762  2022-03-31
11  dsl_dim_acc....................         4762  2022-03-31
12  dsl_dim_acc....................         4762  2022-03-31
13  dsl_dim_acc....................         4762  2022-03-31
14  dsl_dim_acc....................         4762  2022-03-31
15  dsl_dim_acc....................         4762  2022-03-31
16  dsl_dim_acc....................         4762  2022-03-31
17  dsl_dim_acc....................         4762  2022-03-31
18  dsl_dim_acc....................         4762  2022-03-31
19  dsl_dim_acc....................         4762  2022-03-31
20  dsl_dim_acc....................         4762  2022-03-31
21  dsl_dim_acc....................         4762  2022-03-31
22  dsl_dim_acc....................         4762  2022-03-31
23  dsl_dim_acc....................         4762  2022-03-31
24  dsl_dim_acc....................         4762  2022-03-31
25  dsl_dim_acc....................         4762  2022-03-31
-----------

正如你所看到的,随着线程数量的增加,结果变得模糊不清。现在的情况是,每个查询的结果相互重叠。

我还用teradatasql库实现了同一个类,它在n_jobs=-1时工作得很好。我认为self._reader.option('dbtable', f"({query}) as tbl").load()在线程中搞砸了。我尝试了ThreadpoolExecutor,但结果相似。有人知道如何解决这个问题吗?版本

Python 3.6.8
Spark 2.4.0-cdh6.3.4

感谢@pltc,这里有一个解决方案。尽管它与具有多线程的teradatasql库相比非常慢,尽管上的FAIR调度器

from .base import Base
import re
import pandas as pd
from pyspark.sql import DataFrame
from functools import reduce
class TeradataWithSpark(Base):
def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
super().__init__(spark, host, port, database, username, password)
self._reader = self._spark.read.format("jdbc") 
.option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') 
.option("user", self._username) 
.option("password", self._password) 
.option("driver", "com.teradata.jdbc.TeraDriver")
def run_query(self, query, return_pandasDF=True):
# spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
# if return_pandasDF:
#     return spark_df.toPandas()
# else:
#     return spark_df
return self.split_query_and_run_individually(query, r'union all', return_pandasDF)
def run_queries_and_union_all(self, queries, return_pandasDF=True):     
dataframes = []
for each_query in queries:
try:
spark_df = self._reader.option('dbtable', f"({each_query}) as tbl").load()
dataframes.append(spark_df)
except Exception as e:
# simply ignoring the query
print(f'Error while reading the query {each_query}')

concat_sparkDf = reduce(DataFrame.unionAll, dataframes)
if return_pandasDF:
return concat_sparkDf.toPandas()
else:
return concat_sparkDf
def split_query_and_run_individually(self, query, separator=r'union all', return_pandasDF=True):
queries = re.split(separator, query, flags=re.IGNORECASE)
return self.run_queries_and_union_all(queries, return_pandasDF)

最新更新