Error when importing a UDF from a module -> SparkContext should only be created and accessed on the driver



I have a problem with my SparkContext. This is my project structure:

dependencies | 
-------------|spark.py
etl.py
shared       |
-------------|tools.py

In dependencies.spark.py I have a function that creates the Spark session:

# dependencies.spark.py
from pyspark.sql import SparkSession

def get_or_create_session(app_name, master="local[*]"):
    spark_builder = SparkSession.builder.master(master).appName(app_name)
    session = spark_builder.getOrCreate()
    return session

In etl.py I have my main(), where I import a function defined in shared.tools.py that makes use of a Pandas UDF.

# etl.py
from dependencies.spark import get_or_create_session
from shared.tools import cleanup_pob_column

def main():
    spark = get_or_create_session(app_name="my_app")
    data = get_data(input_file)
    transformed_data = transform_data(data)
    transformed_data.printSchema()
    transformed_data.show(truncate=False)

def get_data(input_file):
    ...
    return data

def transform_data(data):
    return (
        data
        .transform(cleanup_pob_column)
    )

if __name__ == "__main__":
    main()

# shared.tools.py
import pandas as pd
from pyspark.sql import functions as F

def extract_iso(x):
    ...  # from x to iso_string
    return iso_string

@F.pandas_udf("string")
def cleanup_geo_column_udf(col: pd.Series) -> pd.Series:
    return col.apply(lambda x: extract_iso(x=x))

def cleanup_pob_column(df):
    return df.withColumn("pob_cln", cleanup_geo_column_udf(F.col("place_of_birth")))

Now I'm stuck in a loop of errors that I don't understand.

If I don't get the session in shared.tools (that is, if I omit the code below):

from dependencies.spark import get_or_create_session
spark = get_or_create_session(app_name="my_app")

I get this error (which seems to be caused by the context being None):

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/gsimeone/PycharmProjects/assignment/shared/geographic_tools.py", line 39, in <module>
def cleanup_geo_column_udf(col: pd.Series) -> pd.Series:
File "/Users/gsimeone/PycharmProjects/sayaritest/sayari_test/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py", line 450, in _create_pandas_udf
return _create_udf(f, returnType, evalType)
File "/Users/gsimeone/PycharmProjects/sayaritest/sayari_test/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 74, in _create_udf
return udf_obj._wrapped()
File "/Users/gsimeone/PycharmProjects/sayaritest/sayari_test/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 286, in _wrapped
wrapper.returnType = self.returnType  # type: ignore[attr-defined]
File "/Users/gsimeone/PycharmProjects/sayaritest/sayari_test/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 134, in returnType
self._returnType_placeholder = _parse_datatype_string(self._returnType)
File "/Users/gsimeone/PycharmProjects/sayaritest/sayari_test/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1010, in _parse_datatype_string
assert sc is not None
AssertionError

But if I include the snippet above, I get a different error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/gsimeone/PycharmProjects/assignment/shared/geographic_tools.py", line 15, in <module>
spark = get_or_create_session(app_name=config.get("app_name"))
File "/Users/gsimeone/PycharmProjects/assignment/dependencies/spark.py", line 22, in get_or_create_session
session = spark_builder.getOrCreate()
File "/Users/gsimeone/PycharmProjects/sayaritest/sayari_test/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/session.py", line 277, in getOrCreate
return session
File "/Users/gsimeone/PycharmProjects/sayaritest/sayari_test/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 485, in getOrCreate
return SparkContext._active_spark_context
File "/Users/gsimeone/PycharmProjects/sayaritest/sayari_test/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 186, in __init__
SparkContext._assert_on_driver()
File "/Users/gsimeone/PycharmProjects/sayaritest/sayari_test/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 1533, in _assert_on_driver
raise RuntimeError("SparkContext should only be created and accessed on the driver.")
RuntimeError: SparkContext should only be created and accessed on the driver.

Any help?

Update:

If I take the entire content of shared.tools.py and paste it into etl.py, the application runs without problems.
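A possible explanation for this, offered as an assumption since nothing in the thread confirms it: PySpark serializes UDFs with cloudpickle, which ships functions defined in the `__main__` script by value but functions from importable modules by reference. A worker would then have to import shared.tools itself, so any module-level statement (the `@F.pandas_udf("string")` decorator, or the `get_or_create_session` call) runs on the worker, where no SparkContext exists. The sketch below is not PySpark code; it uses only the standard-library `pickle` module, with `json.dumps` as a stand-in function, to show what "by reference" means.

```python
import json
import pickle
import pickletools

# A function that lives in an importable module is pickled by reference:
# the payload stores only the module name and the function name.
payload = pickle.dumps(json.dumps, protocol=4)

# Collect every string argument found in the pickle opcodes.
referenced_names = [
    arg
    for _opcode, arg, _pos in pickletools.genops(payload)
    if isinstance(arg, str)
]
print(referenced_names)

# Unpickling imports the module again and looks the name up, so the
# receiving process executes that module's top-level code on import.
restored = pickle.loads(payload)
print(restored is json.dumps)
```

If this is what happens here, it would also explain why code pasted into etl.py behaves differently: it then lives in `__main__` and is sent to the workers as a whole instead of being re-imported.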

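I had a similar issue and resolved it by changing the UDF's return type from a "human-readable string" to a Spark built-in data type object. I suggest changing that specific part. Judging by the first traceback, the string form goes through `_parse_datatype_string`, which asserts that a SparkContext exists, whereas a `DataType` instance needs no parsing.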

# before
@F.pandas_udf("string")

# after
from pyspark.sql.types import StringType
@F.pandas_udf(StringType())
