我正在编写PySpark代码,其中我有10个查找表和每个查找表,我定义了一个结构体,然后是一个模式。然后,我为每个查找表创建一个DF,最后使用它们与一个主表连接。我知道如何编码,但请有人指导我如何构建代码?我是Python新手,所以不知道如何在PySpark中组织我的代码。也许可以给我分享一些样例生产PySpark代码?谢谢!
为了回收和管理代码,您可以在不同的类中定义不同的部分。我的方法是创建一个ini
或yaml
作为参考。此外,您可以在测试和生产环境中处理输入变量。
主类:
from sparkSchema import SparkSchema
from configparser import ConfigParser
if __name__ == "__main__":
config_path = './config.ini'
config = ConfigParser()
config.optionxform = str
config.read(config_path)
#initialize your app, create session and context, etc.
#also you can handle this part with some class.
streamDFWithSchema = SparkSchema(streamDF, config, 'Schema').getDFWithSchema()
# rest of the code
最好将每个输入变量(例如SparkSchema类中的Schema)定义为输入变量,您可以使用argpars
库。
SparkSchema类:
from pyspark.sql.functions import to_timestamp
class SparkSchema:
def __init__(self, DF, config, section):
self.DF =DF
self.config = config
self.section = section
def getDFWithSchema(self):
self.DF = self.DF
.selectExpr(
'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
+self.config[self.section]['grouped.column.index']+
'] AS STRING) as '+self.config[self.section]['grouped.column.name']
,'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
+self.config[self.section]['date.column.index']+
'] AS STRING) as '+self.config[self.section]['date.column.name'])
self.DF = self.DF
.withColumn('EventDate',
to_timestamp(self.config[self.section]['date.column.name']
, self.config[self.section]['date.column.format']))
self.DF.printSchema()
return self.DF
.ini
file:
.
.
.
[Schema]
message.delimiter=\|
grouped.column.name=EmployeeId
grouped.column.index=61
date.column.name=END_DATE
date.column.index=12
date.column.format=yyyyMMddHHmmss
[SchemaLocal]
message.delimiter=\|
grouped.column.name=EmployeeId
grouped.column.index=2
date.column.name=END_DATE
date.column.index=3
date.column.format=yyyy-MM-dd HHmmss
.
.
.
和你应该添加库configparser
作为--py-files
在你的spark-submit命令:
$ spark-submit --jars some.jar,jar.file
--py-files configparser.zip,argpars.zip
main_class.py