如何使用MongoDB API将Azure Databricks连接到Cosmos DB



我已经使用MongoDB API创建了一个azure CosmosDB帐户。我需要将CosmosDB(MongoDB API(连接到Azure Databricks集群,以便从cosmos读取和写入数据。

如何将Azure Databricks集群连接到CosmosDB帐户?

以下是我使用Azure Databricks(5.2 ML Beta(包括Apache Spark 2.4.0、Scala 2.11(的MongoDB API和MongoDB连接器:org.MongoDB.Spark:mongo-Spark-connector_2.11:2.4.0连接到CosmosDB数据库时使用的pyspark代码):

from pyspark.sql import SparkSession
my_spark = SparkSession 
.builder 
.appName("myApp") 
.getOrCreate()
df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource") 
.option("uri", CONNECTION_STRING) 
.load()

使用看起来像这样的CONNECTION_STRING:"mongodb://USERNAME:PASSWORD@testgp.documents.azure.com:10255/DATABASE_NAME。COLLECTION_NAME?ssl=真&replicaSet=globaldb">

我尝试了很多不同的其他选项(添加数据库和集合名称作为SparkSession的选项或配置(,但都没有成功。告诉我它是否适合你。。。

添加org.mongodb.spark:mongo-spark-connector_2.11:2.4.0包后,这对我有效:

import json
query = {
'$limit': 100,
}
query_config = {
'uri': 'myConnectionString'
'database': 'myDatabase',
'collection': 'myCollection',
'pipeline': json.dumps(query),
}
df = spark.read.format("com.mongodb.spark.sql") 
.options(**query_config) 
.load()

然而,我确实在一些集合中遇到了这个错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 10.139.64.6, executor 0): com.mongodb.MongoInternalException: The reply message length 10168676 is less than the maximum message length 4194304

以与我自己的问题相同的方式回答。

使用MAVEN作为源,我使用路径将正确的库安装到集群中

org.mongodb.spark:mongo-spark-connector_2.11:2.4.0

Spark 2.4

我使用的代码示例如下(对于那些想尝试的人(:

# Read Configuration
readConfig = {
"URI": "<URI>",
"Database": "<database>",
"Collection": "<collection>",
"ReadingBatchSize" : "<batchSize>"
}

pipelineAccounts = "{'$sort' : {'account_contact': 1}}"
# Connect via azure-cosmosdb-spark to create Spark DataFrame 
accountsTest = (spark.read.
format("com.mongodb.spark.sql").
options(**readConfig).
option("pipeline", pipelineAccounts).
load())
accountsTest.select("account_id").show()

最新更新