I am trying to connect to IBM Cloud Object Storage from IBM Data Science Experience:
access_key = 'XXX'
secret_key = 'XXX'
bucket = 'mybucket'
host = 'lon.ibmselect.objstor.com'
service = 'mycos'
sqlCxt = SQLContext(sc)
hconf = sc._jsc.hadoopConfiguration()
hconf.set('fs.cos.myCos.access.key', access_key)
hconf.set('fs.cos.myCos.endpoint', 'http://' + host)
hconf.set('fs.cos.myCos.secret.key', secret_key)
hconf.set('fs.cos.service.v2.signer.type', 'false')
obj = 'mydata.tsv.gz'
rdd = sc.textFile('cos://{0}.{1}/{2}'.format(bucket, service, obj))
print(rdd.count())
This returns:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: No FileSystem for scheme: cos
I'm guessing I need to use the cos scheme based on the Stocator documentation. However, the error suggests that Stocator isn't available, or is an old version?
Any ideas?
Update 1:
I have also tried the following:
sqlCxt = SQLContext(sc)
hconf = sc._jsc.hadoopConfiguration()
hconf.set('fs.cos.impl', 'com.ibm.stocator.fs.ObjectStoreFileSystem')
hconf.set('fs.stocator.scheme.list', 'cos')
hconf.set('fs.stocator.cos.impl', 'com.ibm.stocator.fs.cos.COSAPIClient')
hconf.set('fs.stocator.cos.scheme', 'cos')
hconf.set('fs.cos.mycos.access.key', access_key)
hconf.set('fs.cos.mycos.endpoint', 'http://' + host)
hconf.set('fs.cos.mycos.secret.key', secret_key)
hconf.set('fs.cos.service.v2.signer.type', 'false')
service = 'mycos'
obj = 'mydata.tsv.gz'
rdd = sc.textFile('cos://{0}.{1}/{2}'.format(bucket, service, obj))
print(rdd.count())
However, this time the response was:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: No object store for: cos
at com.ibm.stocator.fs.ObjectStoreVisitor.getStoreClient(ObjectStoreVisitor.java:121)
...
Caused by: java.lang.ClassNotFoundException: com.ibm.stocator.fs.cos.COSAPIClient
The latest Stocator version (v1.0.9) that supports the fs.cos scheme is not yet deployed on Spark aaService (it is coming soon). Please use the Stocator scheme fs.s3d to connect to your COS.
Example:
endpoint = 'endpointXXX'
access_key = 'XXX'
secret_key = 'XXX'
prefix = "fs.s3d.service"
hconf = sc._jsc.hadoopConfiguration()
hconf.set(prefix + ".endpoint", endpoint)
hconf.set(prefix + ".access.key", access_key)
hconf.set(prefix + ".secret.key", secret_key)
bucket = 'mybucket'
obj = 'mydata.tsv.gz'
rdd = sc.textFile('s3d://{0}.service/{1}'.format(bucket, obj))
rdd.count()
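If you want a DataFrame rather than a raw RDD, here is a minimal follow-on sketch, assuming the same hconf settings above and the spark session object that Spark 2.x notebooks predefine:

df = spark.read.csv('s3d://{0}.service/{1}'.format(bucket, obj),
                    sep='\t',
                    header=True)  # header=True is an assumption about mydata.tsv.gz
df.show(5)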
Alternatively, you can use ibmos2spark. The lib is already installed on our service. Example:
import ibmos2spark
credentials = {
    'endpoint': 'endpointXXXX',
    'access_key': 'XXXX',
    'secret_key': 'XXXX'
}
configuration_name = 'os_configs' # any string you want
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name)
bucket = 'mybucket'
obj = 'mydata.tsv.gz'
rdd = sc.textFile(cos.url(obj, bucket))
rdd.count()
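As a short follow-on sketch (assuming the file is tab-separated, as the .tsv.gz name suggests), you can split the RDD into fields:

# Split each line on tabs; gzip decompression is handled transparently by textFile
rows = rdd.map(lambda line: line.split('\t'))
print(rows.first())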
Stocator is on the classpath for the Spark 2.0 and 2.1 kernels, but the cos scheme is not configured. You can access the configuration by executing the following in a Python notebook:
!cat $SPARK_CONF_DIR/core-site.xml
Look for the property fs.stocator.scheme.list. What I currently see is:
<property>
<name>fs.stocator.scheme.list</name>
<value>swift2d,swift,s3d</value>
</property>
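You can also read the same property from the live Hadoop configuration rather than the static file; a minimal sketch, assuming sc is the notebook's SparkContext as in the question:

hconf = sc._jsc.hadoopConfiguration()
print(hconf.get('fs.stocator.scheme.list'))  # e.g. swift2d,swift,s3d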
I suggest you raise a feature request against DSX to support the cos scheme.
It looks like the COS driver is not initialized properly. Try this configuration:
hconf.set('fs.cos.impl', 'com.ibm.stocator.fs.ObjectStoreFileSystem')
hconf.set('fs.stocator.scheme.list', 'cos')
hconf.set('fs.stocator.cos.impl', 'com.ibm.stocator.fs.cos.COSAPIClient')
hconf.set('fs.stocator.cos.scheme', 'cos')
hconf.set('fs.cos.mycos.access.key', access_key)
hconf.set('fs.cos.mycos.endpoint', 'http://' + host)
hconf.set('fs.cos.mycos.secret.key', secret_key)
hconf.set('fs.cos.service.v2.signer.type', 'false')
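Before re-running the full job, you can check that the cos:// scheme now resolves; this is a hedged py4j sketch, assuming the settings above and the bucket/service names from the question:

uri = sc._jvm.java.net.URI('cos://{0}.mycos/'.format(bucket))
# FileSystem.get raises an IOException if the Stocator classes are missing
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(uri, hconf)
print(fs.getClass().getName())  # expect com.ibm.stocator.fs.ObjectStoreFileSystem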
Update 1:
You also need to make sure the Stocator classes are on the classpath. You can use the packages system by launching pyspark in the following way:
./bin/pyspark --packages com.ibm.stocator:stocator:1.0.24
This works with both the swift2d and cos schemes.
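If you cannot control how pyspark is launched (for example, in a hosted notebook), setting PYSPARK_SUBMIT_ARGS before the first SparkContext is created can have the same effect; a sketch under that assumption:

import os
# Must run before any SparkContext exists; the trailing 'pyspark-shell' is required
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.ibm.stocator:stocator:1.0.24 pyspark-shell'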
Update 2:
Just follow the Stocator documentation (https://github.com/codait/stocator). It contains all the details of how to install it, which branch to use, and so on.
I ran into the same issue, and to solve it I simply changed environments:
In IBM Watson Studio, if you start a Jupyter notebook in an environment without a pre-configured Spark cluster, you will get this error. Installing PySpark alone is not enough.
Instead, if you start a notebook with a Spark cluster available, you will be fine.
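A quick way to tell which kind of environment you are in is to check whether the notebook predefines a SparkContext; a minimal sketch:

try:
    print(sc.version)  # defined automatically in Spark-backed notebooks
except NameError:
    print('No SparkContext: this environment has no Spark cluster')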
You have to set .config("spark.hadoop.fs.stocator.scheme.list", "cos") along with some other fs.cos... configurations. Here is an end-to-end snippet code example that works (tested with pyspark==2.3.2 and Python 3.7.3):
from pyspark.sql import SparkSession
stocator_jar = '/path/to/stocator-1.1.2-SNAPSHOT-IBM-SDK.jar'
cos_instance_name = '<myCosInstanceName>'
bucket_name = '<bucketName>'
s3_region = '<region>'
cos_iam_api_key = '*******'
iam_service_id = 'crn:v1:bluemix:public:iam-identity::<****************>'
spark_builder = (
    SparkSession
    .builder
    .appName('test_app'))
spark_builder.config('spark.driver.extraClassPath', stocator_jar)
spark_builder.config('spark.executor.extraClassPath', stocator_jar)
spark_builder.config(f"fs.cos.{cos_instance_name}.iam.api.key", cos_iam_api_key)
spark_builder.config(f"fs.cos.{cos_instance_name}.endpoint", f"s3.{s3_region}.cloud-object-storage.appdomain.cloud")
spark_builder.config(f"fs.cos.{cos_instance_name}.iam.service.id", iam_servicce_id)
spark_builder.config("spark.hadoop.fs.stocator.scheme.list", "cos")
spark_builder.config("spark.hadoop.fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
spark_builder.config("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
spark_builder.config("fs.stocator.cos.scheme", "cos")
spark_sess = spark_builder.getOrCreate()
dataset = spark_sess.range(1, 10)
dataset = dataset.withColumnRenamed('id', 'user_idx')
dataset.repartition(1).write.csv(
    f'cos://{bucket_name}.{cos_instance_name}/test.csv',
    mode='overwrite',
    header=True)
spark_sess.stop()
print('done!')
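As a hedged round-trip check, you can read the object back through the same cos:// URL (run this before the spark_sess.stop() call above):

readback = spark_sess.read.csv(
    f'cos://{bucket_name}.{cos_instance_name}/test.csv',
    header=True)
readback.show()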