java.lang.UnsupportedOperationException: Data source mongodb does not support microbatch processing



I am trying to stream data from a Cosmos DB API for MongoDB instance into Databricks PySpark (read and write), and I am getting the error `java.lang.UnsupportedOperationException: Data source mongodb does not support microbatch processing`.
Can anyone help with how to implement this data stream in PySpark?

from pyspark.sql import SparkSession

# Wrap the builder chain in parentheses; the original trailing-dot
# continuation is a Python syntax error.
spark = (
    SparkSession.builder
    .appName("streamingExampleRead")
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector:10.0.0')
    .getOrCreate()
)

sourceConnectionString = <primary connection string of Cosmos DB API for MongoDB instance>
sourceDb = <your database name>
sourceCollection = <your collection name>

dataStreamRead = (
    spark.readStream.format("mongodb")
    .option('spark.mongodb.connection.uri', sourceConnectionString)
    .option('spark.mongodb.database', sourceDb)
    .option('spark.mongodb.collection', sourceCollection)
    .option('spark.mongodb.change.stream.publish.full.document.only', 'true')
    .option("forceDeleteTempCheckpointLocation", "true")
    .load()
)


display(dataStreamRead)


query2 = (
    dataStreamRead.writeStream
    .outputMode("append")
    .option("forceDeleteTempCheckpointLocation", "true")
    .format("console")
    .trigger(processingTime='1 seconds')
    .start()
    .awaitTermination()
)

Getting the following error:
java.lang.UnsupportedOperationException: Data source mongodb does not support microbatch processing.
at org.apache.spark.sql.errors.QueryExecutionErrors$.microBatchUnsupportedByDataSourceError(QueryExecutionErrors.scala:1579)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:123)

Data source mongodb does not support microbatch processing.
=== Streaming Query ===
Identifier: [id = 78cfcef1-19de-40f4-86fc-847109263ee9, runId = d2212e1f-5247-4cd2-9c8c-3cc937e2c7c5]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: INITIALIZING
Thread State: RUNNABLE

Try using `trigger(continuous="1 second")` instead of `trigger(processingTime='1 seconds')`. The MongoDB Spark connector 10.0.x reads change streams with Spark's continuous processing engine, so a micro-batch trigger like `processingTime` raises this error.
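A minimal sketch of the write side with a continuous trigger, reusing the `dataStreamRead` DataFrame defined above (the checkpoint path here is a hypothetical placeholder — point it somewhere durable in your workspace):

```python
# Continuous-mode write: the connector's change-stream source works with
# Spark's continuous processing engine rather than micro-batch execution.
query2 = (
    dataStreamRead.writeStream
    .outputMode("append")
    .option("checkpointLocation", "/tmp/streaming-checkpoint")  # hypothetical path
    .format("console")
    .trigger(continuous="1 second")  # continuous trigger, not processingTime
    .start()
)
query2.awaitTermination()
```

Note that continuous processing mode supports only stateless (map-like) operations and a limited set of sinks, so keep the query free of aggregations and joins.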
