如何在单个pyspark会话中使用多个输入流和多个输出流



我使用的是spark v2.4.0,我正在从kafka读取两个独立的流,并对其中的每一个进行不同的转换,现在我想持久化这两个流数据帧,但其中只有一个正在持久化,另一个似乎无法同时工作,我将非常感谢提供的任何帮助。

下面是我的代码,

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import from_json, col, to_date
# Created a SparkSession here, as it is an entry point to underlying Spark functionality
spark = SparkSession.builder 
.master('spark://yash-tech:7077') 
.appName('Streaming') 
.getOrCreate()
# Defined a schema for our data being streamed from kafka
schema = StructType([
StructField("messageId", StringType(), True),
StructField("type", StringType(), True),
StructField("userId", StringType(), True),
StructField('data', StringType(), True),
StructField("timestamp", StringType(), True),
])
profileDF = spark 
.readStream 
.format("kafka") 
.option("kafka.bootstrap.servers", "localhost:9092") 
.option("subscribe", 'test') 
.option("startingOffsets", "latest") 
.load() 
.select(from_json(col("value").cast("string"), schema).alias("value"))
# Using readStream on SparkSession to load a streaming Dataset from Kafka
clickStreamDF = spark 
.readStream 
.format("kafka") 
.option("kafka.bootstrap.servers", "localhost:9092") 
.option("subscribe", 'test_new') 
.option("startingOffsets", "latest") 
.load() 
.select(from_json(col("value").cast("string"), schema).alias("value"))
# Selecting every column from the DF
clickStreamDFToPersist = clickStreamDF.select("value.*")
profileDFToPersist = profileDF.select("value.*")
# Added a new column containing date(yyyy-MM-dd) parsed from timestamp column for day wise partitioning
clickStreamDFToPersist = clickStreamDFToPersist.withColumn(
"date", to_date(col("timestamp"), "yyyy-MM-dd"))
# Writing data on local disk as json files, partitioned by userId.
clickStream_writing_sink = clickStreamDFToPersist.repartition(1) 
.writeStream 
.partitionBy('userId', 'date') 
.format("json") 
.option("path", "/home/spark/data/") 
.outputMode("append") 
.option("checkpointLocation", "/home/spark/event_checkpoint/") 
.trigger(processingTime='20 seconds') 
.start()

profile_writing_sink = profileDFToPersist.repartition(1) 
.writeStream 
.partitionBy('userId') 
.format("json") 
.option("path", "/home/spark/data/") 
.outputMode("append") 
.option("checkpointLocation", "/home/spark/profile_checkpoint/") 
.trigger(processingTime='30 seconds') 
.start()
clickStream_writing_sink.awaitTermination()
profile_writing_sink.awaitTermination()

注意

  1. 我希望两个writeStreams在同一路径上写入
  2. 如果我在两个writeStreams中都给出了不同的数据路径,那么代码似乎可以工作,但数据被持久化在不同的位置上,有没有一种方法可以将两个流持久化在同一位置上,或者如果我可以同时进行这两个转换,并仅使用单个流持久化数据,因为两者的位置都相同
  3. 在一个流中,我只使用userId进行分区,而在另一个流则使用userId+date进行分区

Hi,因为我们为sink目录位置提供了相同的路径,所以输出被重写。

您无法更改";部分";前缀,同时使用任何标准输出格式。

如果您可以覆盖recordWriter((,这是可能的。

最新更新