如何将SparkSession对象传递给Kafka-Spark流的foreachBatch方法?



我有一个python脚本loader.py,它由创建sparkSession对象的主类组成,如下所示,并调用各种方法来执行不同的操作。从utils导入extract_kafka_data,do_some_transformation

def main():
try:
spark = SparkSession.builder.appName(config['kafka_transformations']).enableHiveSupport().getOrCreate()
kafka_df = extract_kafka_data(spark=spark, config=config, topic_name=topic_name)
do_some_transformation(kafka_df, spark)
except Exception as exc:
print(f'Failed with Exception:{exc}')
traceback.print_exc()
print('Stopping the application')
sys.exit(1)

if __name__ == '__main__':
main()

extract_kafka_data、do_some_transformation方法存在于不同的python脚本中:utils.py在我的utils.py文件中还有很多其他方法可以执行各种转换。下面是这个场景中需要解决的几个方法。

def extract_kafka_data(spark: SparkSession, config: dict, topic_name: str):
jass_config = config['jaas_config'] + " oauth.token.endpoint.uri=" + '"' + config['endpoint_uri'] + '"' + " oauth.client.id=" + '"' + config['client_id'] + '"' + " oauth.client.secret=" + '"' + config['client_secret'] + '" ;'
stream_df = spark.readStream 
.format('kafka') 
.option('kafka.bootstrap.servers', config['kafka_broker']) 
.option('subscribe', topic_name) 
.option('kafka.security.protocol', config['kafka_security_protocol']) 
.option('kafka.sasl.mechanism', config['kafka_sasl_mechanism']) 
.option('kafka.sasl.jaas.config', jass_config) 
.option('kafka.sasl.login.callback.handler.class', config['kafka_sasl_login_callback_handler_class']) 
.option('startingOffsets', 'earliest') 
.option('fetchOffset.retryIntervalMs', config['kafka_fetch_offset_retry_intervalms']) 
.option('fetchOffset.numRetries', config['retries']) 
.option('failOnDataLoss', 'False') 
.option('checkpointLocation', checkpoint_location) 
.load() 
.select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')
return stream_df
def do_some_transformation(spark: SparkSession, kafka_df: Dataframe):
kafka_df.writeStream 
.format('kafka') 
.foreachBatch(my_transformation_method) 
.option('checkpointLocation', checkpoint_location) 
.trigger(processingTime='10 minutes') 
.start()
.awaitTermination()
def my_transformation_method(kafka_df: Dataframe, batch_id: int):
base_delta = DeltaTable.forPath(spark, config['delta_path'])
base_delta.alias("base") 
.merge(source=kafka_df.alias("inc"), condition=build_update_condition(config['merge_keys'], config['inc_keys'])) 
.whenMatchedUpdateAll() 
.whenNotMatchedInsertAll() 
.execute()

我在这里面临的问题是使用方法:my_transformation_method。方法内部:my_transformation_method我正在执行kafka数据帧与delta表的合并。为了读取基表数据,我需要运行以下语句:base_delta=DeltaTable.forPath(spark,config['delta_path'](但这里的问题是do_some_transformation方法中foreachBatch调用的方法my_transformation_method只能接收两个方法参数:1。数据帧2。batch_id按照spark流的语法。

我可以使spark会话对象全局化,但我不想这样做,因为这似乎不是标准的方式。当我从do_some_transformation调用sparkSession对象spark时,有什么方法可以让它对方法my_transformation_method可用吗?非常感谢您的帮助。

DataFrame API ptovides可使用的sparkSession方法:

spark = kafka_df.sparkSession()

最新更新