PySpark代码结构



我正在编写PySpark代码,其中我有10个查找表和每个查找表,我定义了一个结构体,然后是一个模式。然后,我为每个查找表创建一个DF,最后使用它们与一个主表连接。我知道如何编码,但请有人指导我如何构建代码?我是Python新手,所以不知道如何在PySpark中组织我的代码。也许可以给我分享一些样例生产PySpark代码?谢谢!

为了回收和管理代码,您可以在不同的类中定义不同的部分。我的方法是创建一个iniyaml作为参考。此外,您可以在测试和生产环境中处理输入变量。

例如:

主类:


from sparkSchema import SparkSchema
from configparser import ConfigParser 
if __name__ == "__main__":
    config_path = './config.ini'
    config = ConfigParser()
    config.optionxform = str
    config.read(config_path)
#initialize your app, create session and context, etc. 
#also you can handle this part with some class.
streamDFWithSchema  = SparkSchema(streamDF, config, 'Schema').getDFWithSchema()
# rest of the code

最好将每个输入变量(例如SparkSchema类中的Schema)定义为输入变量,您可以使用argpars库。

SparkSchema类:

from pyspark.sql.functions import to_timestamp
class SparkSchema:
    
    def __init__(self, DF, config, section):
        self.DF =DF
        self.config = config
        self.section = section

    def getDFWithSchema(self):
        
        self.DF  = self.DF 
      .selectExpr(
   'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
   +self.config[self.section]['grouped.column.index']+
   '] AS STRING) as '+self.config[self.section]['grouped.column.name'] 
   ,'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
   +self.config[self.section]['date.column.index']+
   '] AS STRING) as '+self.config[self.section]['date.column.name'])
        self.DF = self.DF
            .withColumn('EventDate',
            to_timestamp(self.config[self.section]['date.column.name']
            , self.config[self.section]['date.column.format']))
        self.DF.printSchema()
        return self.DF

.ini file:

.
.
.
[Schema]
message.delimiter=\|
grouped.column.name=EmployeeId
grouped.column.index=61
date.column.name=END_DATE
date.column.index=12
date.column.format=yyyyMMddHHmmss
[SchemaLocal]
message.delimiter=\|
grouped.column.name=EmployeeId
grouped.column.index=2
date.column.name=END_DATE
date.column.index=3
date.column.format=yyyy-MM-dd HHmmss
.
.
.

和你应该添加库configparser作为--py-files在你的spark-submit命令:

$ spark-submit --jars some.jar,jar.file
  --py-files configparser.zip,argpars.zip
  main_class.py

相关内容

  • 没有找到相关文章

最新更新