How to write to Google Cloud Storage with PySpark writeStream



I am trying to write from a PySpark stream to GCP storage.
Here is the code:

(df_test
    .writeStream.format("parquet")
    .option("path", "gs://{my_bucketname}/test")
    .option("checkpointLocation", "gs://{my_checkpointBucket}/checkpoint")
    .start()
    .awaitTermination())

But I get this error:

20/11/15 16:37:59 WARN CheckpointFileManager: Could not use FileContext API
for managing Structured Streaming checkpoint files at gs://name-bucket/test/_spark_metadata.
Using FileSystem API instead for managing log files. If the implementation
of FileSystem.rename() is not atomic, then the correctness and fault-tolerance
of your Structured Streaming is not guaranteed.
Traceback (most recent call last):
  File "testgcp.py", line 40, in <module>
    .option("checkpointLocation", "gs://check_point_bucket/checkpoint")
  File "/home/naya/anaconda3/lib/python3.6/site-packages/pyspark/sql/streaming.py", line 1105, in start
    return self._sq(self._jwrite.start())
  File "/home/naya/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/naya/anaconda3/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/naya/anaconda3/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o55.start.
: java.io.IOException: No FileSystem for scheme: gs

What is the correct syntax?

According to this documentation, it seems you first need to set up spark.conf with the correct authentication:

spark.conf.set("google.cloud.auth.service.account.enable", "true")
spark.conf.set("google.cloud.auth.service.account.email", "Your_service_email")
spark.conf.set("google.cloud.auth.service.account.keyfile", "path/to/your/files")
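Note that the java.io.IOException: No FileSystem for scheme: gs in the traceback suggests the GCS connector itself is missing from Spark's classpath, so credentials alone may not be enough. Below is a minimal sketch of a session setup, assuming the hadoop2 build of the connector and a hypothetical JSON key file path (adjust both to your environment):

from pyspark.sql import SparkSession

# Pull the GCS connector onto the classpath (assumed version; pick the
# build that matches your Hadoop distribution)
spark = (SparkSession.builder
    .appName("gcs-stream")
    .config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.1.1")
    .getOrCreate())

# Register the gs:// filesystem implementations and the service-account
# credentials on the Hadoop configuration
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
conf.set("google.cloud.auth.service.account.enable", "true")
conf.set("google.cloud.auth.service.account.json.keyfile", "/path/to/keyfile.json")  # hypothetical path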

Then you can access files in the bucket with the regular read functions:

df = spark.read.option("header",True).csv("gs://bucket_name/path_to_your_file.csv")
df.show() 
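Once the connector is configured, the streaming write from the question should go through unchanged. Here is a sketch under the same assumptions, with Spark's built-in rate source standing in for df_test and hypothetical bucket names:

# Toy streaming source standing in for df_test
df_test = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

query = (df_test.writeStream
    .format("parquet")
    .option("path", "gs://my_bucketname/test")
    .option("checkpointLocation", "gs://my_checkpointBucket/checkpoint")
    .start())
query.awaitTermination()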
