How to correctly insert a PySpark DataFrame into Cosmos DB?



I am trying to insert a PySpark DataFrame into Cosmos DB. I downloaded the `com.microsoft.azure_azure-cosmosdb-spark_2.3.0_2.11-1.3.3.jar` file as shown in the documentation: https://github.com/Azure/azure-cosmosdb-spark/blob/2.4/README.md and then configured my SparkSession like this:

spark = SparkSession.builder \
    .config("spark.driver.extraClassPath", "C:/Users/Cris243/.ivy2/jars/") \
    .config("spark.jars", "com.microsoft.azure_azure-cosmosdb-spark_2.3.0_2.11-1.3.3.jar") \
    .appName('Spark') \
    .getOrCreate()

But when I try to write to Cosmos with my configuration, I get a "java.lang.ClassNotFoundException: com.microsoft.azure.cosmosdb.spark.DefaultSource", like this:

Py4JJavaError                             Traceback (most recent call last)
Cell In[16], line 13
1 writeConfig3 = {
2     
3     "Endpoint": "https://localhost:8081/",
(...)
8     
9 }
11 # df = test.coalesce(1)
---> 13 df.write.format("com.microsoft.azure.cosmosdb.spark").mode("overwrite").options(**writeConfig3).save()
File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\sql\readwriter.py:966, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
964     self.format(format)
965 if path is None:
--> 966     self._jwrite.save()
967 else:
968     self._jwrite.save(path)
File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\py4j\java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316     self.command_header +\
1317     args_command +\
1318     proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322     answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325     temp_arg._detach()
File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\sql\utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
188 def deco(*a: Any, **kw: Any) -> Any:
189     try:
--> 190         return f(*a, **kw)
191     except Py4JJavaError as e:
192         converted = convert_exception(e.java_exception)
File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
327         "An error occurred while calling {0}{1}{2}.\n".
328         format(target_id, ".", name), value)
329 else:
330     raise Py4JError(
331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332         format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o113.save.
: java.lang.ClassNotFoundException: 
Failed to find data source: com.microsoft.azure.cosmosdb.spark. Please find packages at
https://spark.apache.org/third-party-projects.html

at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:587)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:864)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
at java.base/java.lang.reflect.Method.invoke(Method.java:578)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: java.lang.ClassNotFoundException: com.microsoft.azure.cosmosdb.spark.DefaultSource
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:661)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:661)
at scala.util.Failure.orElse(Try.scala:224)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:661)
... 14 more

My code:

from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .config("spark.driver.extraClassPath", "C:/Users/Cris243/.ivy2/jars/") \
    .config("spark.jars", "com.microsoft.azure_azure-cosmosdb-spark_2.3.0_2.11-1.3.3.jar") \
    .appName('Spark') \
    .getOrCreate()
df = (
    spark.read.option("header", True).csv("crypto.csv")
    .select(
        col("rank"),
        col("symbol"),
        col("name"),
        col("priceUsd"),
        col("volumeUsd24Hr"))
)
writeConfig3 = {
    "Endpoint": "https://localhost:8081/",
    "Masterkey": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
    "Database": "cryptos",
    "Collection": "symbol",
    "Upsert": "true"
}
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("overwrite").options(**writeConfig3).save()

I also tried adding "spark.jars.ivy" like this: .config("spark.jars.ivy", "C:/Users/Cris243/.ivy2/jars/"), but I got the same error.

Can anyone help explain how to fix this? Much appreciated.

Failed to find data source: com.microsoft.azure.cosmosdb.spark. Please find packages at https://spark.apache.org/third-party-projects.html

This error occurs because the Spark application cannot find the connector library on its classpath.

Spark applications can depend on external libraries. Once an application has been built, it is submitted for execution in the Spark environment using the spark-submit command.

  • Using the --jars option: to add jars to a Spark job, use the --jars option to include them on both the driver and executor classpaths. If you need to include multiple jar files, separate them with commas.

For example:

spark-submit --jars /path/to/jar/file1.jar,/path/to/jar/file2.jar your_app.py
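
Applied to the setup in the question, the same fix can be made without spark-submit by setting the jar location when the SparkSession is created. Note that a bare filename in spark.jars only resolves relative to the current working directory, so passing the full path (or a Maven coordinate via spark.jars.packages) is safer. A minimal sketch, assuming the jar sits in the ivy cache path from the question and that the Maven coordinate inferred from the jar name matches your Spark/Scala versions:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CosmosWrite")
    # Option 1: point spark.jars at the full path of the downloaded jar,
    # not just the bare filename (which resolves against the working directory).
    .config(
        "spark.jars",
        "C:/Users/Cris243/.ivy2/jars/"
        "com.microsoft.azure_azure-cosmosdb-spark_2.3.0_2.11-1.3.3.jar",
    )
    # Option 2 (alternative): let Spark fetch the connector and its transitive
    # dependencies from Maven; coordinate inferred from the jar name above.
    # .config("spark.jars.packages",
    #         "com.microsoft.azure:azure-cosmosdb-spark_2.3.0_2.11:1.3.3")
    .getOrCreate()
)

Either setting must be in place before the session is created; a jar added to an already-running session will not be picked up by the data source lookup.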
