使用pyspark读取和处理存储在GCP中的镶木地板文件的最佳方式



我是使用GCS的新手。我正在使用它来存储一些镶木地板数据文件。在GCS之前,我将所有镶木地板文件本地存储在机器上,以测试一些代码,使用Spark将所有镶木地板文件读取到数据帧中。

以下是我在python中本地工作的设置示例:

source_path = '/mylocal/directory/files'

appName = "PySpark Parquet Example"
master = "local"
# Create Spark session
spark = SparkSession.builder 
.appName(appName) 
.master(master) 
.getOrCreate()
# Read parquet files
df = spark.read.parquet(
source_path)

现在,我已经将所有源数据存储到GCS的存储桶中,我有点不知道从哪里开始使用等效的方法来访问现在存储在GCS存储桶中文件夹中的文件。我已经研究了gsutil和其他库,但我愿意接受任何关于最简单方法的建议。有什么建议吗?

据我所知,您正试图从本地spark访问存储在gcs bucket中的镶木地板文件。如果是这种情况,请按照以下步骤

  1. 下载gcs-hadoop-connector.jar,并将其放在本地spark中的jar文件夹中。注意:请从以下链接下载正确的匹配版本(https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector)
  2. 创建并下载具有存储访问权限的服务帐户json文件,以将数据读/写到gcs桶中
  3. 在spark代码中更新hadoop配置,如下所示
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") 
.appName('readParquetData') 
.getOrCreate()
conf =spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl","com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
conf.set("fs.gs.project.id", projectId)
conf.set("fs.gs.auth.service.account.enable", "true")
conf.set("fs.gs.auth.service.account.json.keyfile", secretLocation)
conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
  1. 现在您可以使用以下代码使用spark读取gcs中的数据
df=spark.read.option("header",True).parquet(location)

完整代码:

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") 
.appName('readParquetData') 
.getOrCreate()
conf =spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl","com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
conf.set("fs.gs.project.id", projectId)
conf.set("fs.gs.auth.service.account.enable", "true")
conf.set("fs.gs.auth.service.account.json.keyfile", secretLocation)
conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
df=spark.read.option("header",True).parquet("gs://bucketName/folderName")
df.show()

如果答案有助于解决您的问题,请批准。感谢

相关内容

  • 没有找到相关文章

最新更新