我是使用GCS的新手。我正在使用它来存储一些镶木地板数据文件。在GCS之前,我将所有镶木地板文件本地存储在机器上,以测试一些代码,使用Spark将所有镶木地板文件读取到数据帧中。
以下是我在python中本地工作的设置示例:
source_path = '/mylocal/directory/files'
appName = "PySpark Parquet Example"
master = "local"
# Create Spark session
spark = SparkSession.builder
.appName(appName)
.master(master)
.getOrCreate()
# Read parquet files
df = spark.read.parquet(
source_path)
现在,我已经将所有源数据存储到GCS的存储桶中,我有点不知道从哪里开始使用等效的方法来访问现在存储在GCS存储桶中文件夹中的文件。我已经研究了gsutil和其他库,但我愿意接受任何关于最简单方法的建议。有什么建议吗?
据我所知,您正试图从本地spark访问存储在gcs bucket中的镶木地板文件。如果是这种情况,请按照以下步骤
- 下载gcs-hadoop-connector.jar,并将其放在本地spark中的jar文件夹中。注意:请从以下链接下载正确的匹配版本(https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector)
- 创建并下载具有存储访问权限的服务帐户json文件,以将数据读/写到gcs桶中
- 在spark代码中更新hadoop配置,如下所示
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]")
.appName('readParquetData')
.getOrCreate()
conf =spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl","com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
conf.set("fs.gs.project.id", projectId)
conf.set("fs.gs.auth.service.account.enable", "true")
conf.set("fs.gs.auth.service.account.json.keyfile", secretLocation)
conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
- 现在您可以使用以下代码使用spark读取gcs中的数据
df=spark.read.option("header",True).parquet(location)
完整代码:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]")
.appName('readParquetData')
.getOrCreate()
conf =spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl","com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
conf.set("fs.gs.project.id", projectId)
conf.set("fs.gs.auth.service.account.enable", "true")
conf.set("fs.gs.auth.service.account.json.keyfile", secretLocation)
conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
df=spark.read.option("header",True).parquet("gs://bucketName/folderName")
df.show()
如果答案有助于解决您的问题,请批准。感谢