我有一个python脚本loader.py
,它由创建sparkSession
对象的主类组成,如下所示,并调用各种方法来执行不同的操作。从utils导入extract_kafka_data,do_some_transformation
def main():
try:
spark = SparkSession.builder.appName(config['kafka_transformations']).enableHiveSupport().getOrCreate()
kafka_df = extract_kafka_data(spark=spark, config=config, topic_name=topic_name)
do_some_transformation(kafka_df, spark)
except Exception as exc:
print(f'Failed with Exception:{exc}')
traceback.print_exc()
print('Stopping the application')
sys.exit(1)
if __name__ == '__main__':
main()
extract_kafka_data、do_some_transformation方法存在于不同的python脚本中:utils.py在我的utils.py文件中还有很多其他方法可以执行各种转换。下面是这个场景中需要解决的几个方法。
def extract_kafka_data(spark: SparkSession, config: dict, topic_name: str):
jass_config = config['jaas_config'] + " oauth.token.endpoint.uri=" + '"' + config['endpoint_uri'] + '"' + " oauth.client.id=" + '"' + config['client_id'] + '"' + " oauth.client.secret=" + '"' + config['client_secret'] + '" ;'
stream_df = spark.readStream
.format('kafka')
.option('kafka.bootstrap.servers', config['kafka_broker'])
.option('subscribe', topic_name)
.option('kafka.security.protocol', config['kafka_security_protocol'])
.option('kafka.sasl.mechanism', config['kafka_sasl_mechanism'])
.option('kafka.sasl.jaas.config', jass_config)
.option('kafka.sasl.login.callback.handler.class', config['kafka_sasl_login_callback_handler_class'])
.option('startingOffsets', 'earliest')
.option('fetchOffset.retryIntervalMs', config['kafka_fetch_offset_retry_intervalms'])
.option('fetchOffset.numRetries', config['retries'])
.option('failOnDataLoss', 'False')
.option('checkpointLocation', checkpoint_location)
.load()
.select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')
return stream_df
def do_some_transformation(spark: SparkSession, kafka_df: Dataframe):
kafka_df.writeStream
.format('kafka')
.foreachBatch(my_transformation_method)
.option('checkpointLocation', checkpoint_location)
.trigger(processingTime='10 minutes')
.start()
.awaitTermination()
def my_transformation_method(kafka_df: Dataframe, batch_id: int):
base_delta = DeltaTable.forPath(spark, config['delta_path'])
base_delta.alias("base")
.merge(source=kafka_df.alias("inc"), condition=build_update_condition(config['merge_keys'], config['inc_keys']))
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
我在这里面临的问题是使用方法:my_transformation_method
。方法内部:my_transformation_method
我正在执行kafka数据帧与delta表的合并。为了读取基表数据,我需要运行以下语句:base_delta=DeltaTable.forPath(spark,config['delta_path'](但这里的问题是do_some_transformation
方法中foreachBatch
调用的方法my_transformation_method
只能接收两个方法参数:1。数据帧2。batch_id按照spark流的语法。
我可以使spark会话对象全局化,但我不想这样做,因为这似乎不是标准的方式。当我从do_some_transformation
调用sparkSession对象spark
时,有什么方法可以让它对方法my_transformation_method
可用吗?非常感谢您的帮助。
DataFrame API ptovides可使用的sparkSession方法:
spark = kafka_df.sparkSession()