I can't access a file on Object Storage from my local standalone Spark cluster. Here is the code -
from pyspark.sql import SQLContext

sqlCxt = SQLContext(sc)

# Point the Hadoop Swift driver at the Object Storage service credentials.
prefix = "fs.swift.service." + creds['name']
hconf = sc._jsc.hadoopConfiguration()
hconf.set(prefix + ".auth.url", creds['auth_url'] + '/v2.0/tokens')
hconf.set(prefix + ".auth.endpoint.prefix", "endpoints")
hconf.set(prefix + ".tenant", creds['project_id'])
hconf.set(prefix + ".username", creds['user_id'])
hconf.set(prefix + ".password", creds['password'])
hconf.setInt(prefix + ".http.port", 8080)
hconf.set(prefix + ".region", creds['region'])
hconf.setBoolean(prefix + ".public", True)

# Read a JSON object from the "notebooks" container.
weather = sqlCxt.read.json("swift://notebooks." + creds['name'] + "/repo_local.json")
weather.show()
And here is the exception I get:
16/04/21 17:31:11 INFO JSONRelation: Listing swift://notebooks.pac/repo_local.json on driver
16/04/21 17:31:11 WARN HttpMethodDirector: Unable to respond to any of these challenges: {keystone=Keystone uri="https://identity.open.softlayer.com"}
16/04/21 17:31:33 INFO SparkContext: Created broadcast 0 from json at NativeMethodAccessorImpl.java:-2
Traceback (most recent call last):
File "C:UsersMY_PCDesktopPACsrcunittestpythonPACObjectStorage_tests.py", line 18, in <module>
weather = sqlCxt.read.json("swift://notebooks.pac/config-repo_local.json")
File "C:Python27libpysparksqlreadwriter.py", line 176, in json
return self._df(self._jreader.json(path))
File "C:Python27libsite-packagespy4jjava_gateway.py", line 813, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:Python27libpysparksqlutils.py", line 45, in deco
return f(*a, **kw)
File "C:Python27libsite-packagespy4jprotocol.py", line 308, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o22.json.
: java.io.IOException: No input paths specified in job
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:201)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1115)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1113)
at org.apache.spark.sql.execution.datasources.json.InferSchema$.infer(InferSchema.scala:65)
at org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$4.apply(JSONRelation.scala:114)
at org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$4.apply(JSONRelation.scala:109)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.execution.datasources.json.JSONRelation.dataSchema$lzycompute(JSONRelation.scala:109)
at org.apache.spark.sql.execution.datasources.json.JSONRelation.dataSchema(JSONRelation.scala:108)
at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)
at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:37)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:244)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:56)
at java.lang.reflect.Method.invoke(Method.java:620)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:801)
Please note - when I run this from a notebook in Bluemix, or via spark-submit, I am able to access the file.
I am also able to access the file through the swift CLI.
Swift needs an authentication token, obtained via Keystone authentication, to connect to Object Storage from a local environment.
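Before changing drivers, it is worth confirming that the credentials themselves work from the local machine. Below is a minimal diagnostic sketch (not from the original post) that requests a scoped Keystone v3 token directly, using the standard OpenStack Identity API and the same creds dict as in the question; it assumes creds['auth_url'] exposes a Keystone v3 endpoint and that the requests library is installed. If this fails, the problem is the credentials or network, not the Spark configuration.

# Diagnostic sketch (an assumption, not part of the original post):
# request a Keystone v3 token directly to verify the credentials
# outside of Spark.
import json
import requests

def get_keystone_token(creds):
    """Request a scoped Keystone v3 token for the project in `creds`."""
    body = {
        "auth": {
            "identity": {
                "methods": ["password"],
                "password": {
                    "user": {"id": creds['user_id'], "password": creds['password']},
                },
            },
            "scope": {"project": {"id": creds['project_id']}},
        }
    }
    resp = requests.post(creds['auth_url'] + '/v3/auth/tokens',
                         data=json.dumps(body),
                         headers={'Content-Type': 'application/json'})
    resp.raise_for_status()
    # Keystone returns the token in the X-Subject-Token response header.
    return resp.headers['X-Subject-Token']

print(get_keystone_token(creds))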
I suggest trying the Stocator connector to access Bluemix Object Storage; it has worked consistently for me.
https://github.com/SparkTC/stocator
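For reference, here is a sketch of what the Spark-side configuration might look like with Stocator's swift2d scheme. The property names and the keystoneV3 auth method follow the Stocator README of that era, and the Stocator jar must be on the driver and executor classpath (for example via spark-submit --jars); treat the exact key names as assumptions to verify against the README for your Stocator version.

# Sketch of reading the same object through Stocator's swift2d scheme.
# Key names and the keystoneV3 auth method are taken from the Stocator
# docs and may differ between versions - verify against the README.
prefix = "fs.swift2d.service." + creds['name']
hconf = sc._jsc.hadoopConfiguration()
hconf.set("fs.swift2d.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
hconf.set(prefix + ".auth.url", creds['auth_url'] + '/v3/auth/tokens')
hconf.set(prefix + ".auth.method", "keystoneV3")
hconf.set(prefix + ".tenant", creds['project_id'])
hconf.set(prefix + ".username", creds['user_id'])
hconf.set(prefix + ".password", creds['password'])
hconf.set(prefix + ".region", creds['region'])
hconf.setBoolean(prefix + ".public", True)

weather = sqlCxt.read.json("swift2d://notebooks." + creds['name'] + "/repo_local.json")
weather.show()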
Thanks, Charles.