Microsoft Azure Spark Kusto connector -- is it possible to get files from Azure Storage outside of Databricks?



I am trying to read and write files in Azure Storage. Here is what I have tried so far:

Creating the Spark session:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

sparkOptions = {"executor_memory": "1G", "driver_memory": "1G", "max_results_size": "1G"}
conf = pyspark.SparkConf().setAppName(app)
conf = (conf.setMaster("local[*]")
        .set('spark.executor.memory', sparkOptions["executor_memory"])
        .set('spark.driver.memory', sparkOptions["driver_memory"])
        .set('spark.driver.maxResultSize', sparkOptions["max_results_size"])
        .set('spark.sql.crossJoin.enabled', "true")
        .set('spark.jars.packages', 'com.microsoft.azure.kusto:spark-kusto-connector:1.0.0')
        .set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        .set("fs.azure.account.auth.type", "OAuth")
        .set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
        .set("fs.azure.account.oauth2.client.id", id)
        .set("fs.azure.account.oauth2.client.secret", secret)
        .set("fs.azure.account.oauth2.client.endpoint", endpoint)
        .set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
        )
sparkContext = pyspark.SparkContext(conf=conf)
sparkSession = SparkSession(sparkContext)
sqlContext = SQLContext(sparkContext)

Trying to read a CSV from Azure Storage:

df = sparkSession.read.option("header", "true").csv("wasbs://container@account.blob.core.windows.net/archive.csv")
df.show()

Error:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-3-975f978e0f66> in <module>()
----> 1 df = sparkSession.read.option("header", "true").csv("wasbs://container@account.blob.core.windows.net/archive.csv")
2 df.show()
~/anaconda3/lib/python3.6/site-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue)
474             path = [path]
475         if type(path) == list:
--> 476             return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
477         elif isinstance(path, RDD):
478             def func(iterator):
~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255         answer = self.gateway_client.send_command(command)
1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
1258 
1259         for temp_arg in temp_args:
~/anaconda3/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
61     def deco(*a, **kw):
62         try:
---> 63             return f(*a, **kw)
64         except py4j.protocol.Py4JJavaError as e:
65             s = e.java_exception.toString()
~/anaconda3/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326                 raise Py4JJavaError(
327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
329             else:
330                 raise Py4JError(
Py4JJavaError: An error occurred while calling o68.csv.
: java.io.IOException: No FileSystem for scheme: wasbs
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:355)
at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:618)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)

Trying with abfss:

df = sparkSession.read.option("header", "true").csv("abfss://container@account.blob.core.windows.net/archive.csv")
df.show()

Error:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-02abec06890e> in <module>()
----> 1 df = sparkSession.read.option("header", "true").csv("abfss://container@account.blob.core.windows.net/archive.csv")
2 df.show()
~/anaconda3/lib/python3.6/site-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue)
474             path = [path]
475         if type(path) == list:
--> 476             return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
477         elif isinstance(path, RDD):
478             def func(iterator):
~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255         answer = self.gateway_client.send_command(command)
1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
1258 
1259         for temp_arg in temp_args:
~/anaconda3/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
61     def deco(*a, **kw):
62         try:
---> 63             return f(*a, **kw)
64         except py4j.protocol.Py4JJavaError as e:
65             s = e.java_exception.toString()
~/anaconda3/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326                 raise Py4JJavaError(
327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
329             else:
330                 raise Py4JError(
Py4JJavaError: An error occurred while calling o104.csv.
: java.io.IOException: No FileSystem for scheme: abfss
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:355)
at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:618)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)

Searching for examples that use the kusto-spark connector, I have only found Databricks examples that rely on dbutils. I would like to know whether the connector can be used outside of Databricks, and what I am doing wrong in my code. Thanks.

This is not really about Kusto. Are you using Azure Databricks? If so, just follow their documentation. If not, try adding this dependency:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-azure</artifactId>
    <version>2.7.0</version>
</dependency>
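
Outside of Databricks the same jar can be pulled in when the session is built, through spark.jars.packages, instead of through Maven. A minimal sketch, assuming Hadoop 2.7.x; the hadoop-azure and azure-storage versions below are assumptions and should be matched to your local Hadoop build:

import pyspark

# Sketch: add hadoop-azure (which provides the wasbs:// filesystem class) and its
# azure-storage companion next to the Kusto connector when creating the SparkConf.
conf = (pyspark.SparkConf()
        .setAppName("azure-storage-test")
        .setMaster("local[*]")
        .set("spark.jars.packages",
             "com.microsoft.azure.kusto:spark-kusto-connector:1.0.0,"
             "org.apache.hadoop:hadoop-azure:2.7.3,"
             "com.microsoft.azure:azure-storage:2.2.0"))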

If that does not help, download the connector code from GitHub and change this dependency to 2.7 (the connector uses 3.2). By the way, not sure whether you already did, but you also have to set an account key or a SAS token for this container through the Spark conf, as sketched below.
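
A minimal sketch of that last point with a plain PySpark session; the account and container names plus the storage_account_key and sas_token variables are placeholders, and the credential is set on the SparkContext's Hadoop configuration (equivalently, with a spark.hadoop. prefix on the SparkConf) so the wasbs filesystem can actually see it:

# Placeholders: replace account/container and the credential variables with your own values.
# The fs.azure.* settings must reach the Hadoop configuration, not only the SparkConf.
sparkContext._jsc.hadoopConfiguration().set(
    "fs.azure.account.key.account.blob.core.windows.net", storage_account_key)
# or, for a SAS token scoped to a single container:
# sparkContext._jsc.hadoopConfiguration().set(
#     "fs.azure.sas.container.account.blob.core.windows.net", sas_token)

df = sparkSession.read.option("header", "true").csv(
    "wasbs://container@account.blob.core.windows.net/archive.csv")
df.show()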
