I am trying to stream data from a Cosmos DB API for MongoDB instance into Databricks PySpark and am getting the error java.lang.UnsupportedOperationException: Data source mongodb does not support microbatch processing.
Can anyone help with how to implement this data stream in PySpark?
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.streaming import *
from pyspark.sql.types import StringType, BooleanType, DateType, StructType, LongType, IntegerType

spark = (
    SparkSession.builder
    .appName("streamingExampleRead")
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector:10.0.0')
    .getOrCreate()
)
sourceConnectionString = <primary connection string of Cosmos DB API for MongoDB instance>
sourceDb = <your database name>
sourceCollection = <your collection name>
dataStreamRead=(
spark.readStream.format("mongodb")
.option('spark.mongodb.connection.uri', sourceConnectionString)
.option('spark.mongodb.database', sourceDb)
.option('spark.mongodb.collection', sourceCollection)
.option('spark.mongodb.change.stream.publish.full.document.only','true')
.option("forceDeleteTempCheckpointLocation", "true")
.load()
)
display(dataStreamRead)
query2=(dataStreamRead.writeStream
.outputMode("append")
.option("forceDeleteTempCheckpointLocation", "true")
.format("console")
.trigger(processingTime='1 seconds')
.start().awaitTermination());
I am getting the following error:
java.lang.UnsupportedOperationException: Data source mongodb does not support microbatch processing.
at org.apache.spark.sql.errors.QueryExecutionErrors$.microBatchUnsupportedByDataSourceError(QueryExecutionErrors.scala:1579)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:123)
Data source mongodb does not support microbatch processing.
=== Streaming Query ===
Identifier: [id = 78cfcef1-19de-40f4-86fc-847109263ee9, runId = d2212e1f-5247-4cd2-9c8c-3cc937e2c7c5]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: INITIALIZING
Thread State: RUNNABLE
Try using trigger(continuous="1 second") instead of trigger(processingTime='1 seconds'). The connector's streaming source works with Spark's continuous processing mode rather than the default micro-batch mode, which is why the micro-batch trigger raises UnsupportedOperationException.
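A minimal sketch of the corrected write side, reusing dataStreamRead and the options from the question (the connection placeholders must still be filled in, and this needs a live cluster and Cosmos DB instance to actually run):

```python
# Write the streaming DataFrame with a continuous trigger instead of
# a micro-batch processingTime trigger.
# dataStreamRead is the streaming DataFrame built via spark.readStream
# .format("mongodb") as shown in the question.
query2 = (
    dataStreamRead.writeStream
    .outputMode("append")
    .option("forceDeleteTempCheckpointLocation", "true")
    .format("console")                 # console sink supports continuous mode
    .trigger(continuous="1 second")    # continuous processing, not micro-batch
    .start()
)
query2.awaitTermination()
```

Keeping start() and awaitTermination() on separate statements also makes it easier to inspect query2.status or stop the query interactively in a Databricks notebook.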