在Dataproc上使用Spark,如何从每个分区单独写入GCS?



在GCP Dataproc上使用Spark,我成功地将整个RDD写入GCS,如下所示:

rdd.saveAsTextFile(s"gs://$path")

产品是同一路径中每个分区的文件。

如何为每个分区编写文件(具有基于分区信息的唯一路径(

下面是一个发明的非工作一厢情愿的代码示例

rdd.mapPartitionsWithIndex(
(i, partition) =>{
partition.write(path = s"gs://partition_$i", data = partition_specific_data)
}
)

当我从Mac上的分区内调用下面的函数时,它会写入本地磁盘,在Dataproc上,我收到错误,无法将gs识别为有效路径。

def writeLocally(filePath: String, data: Array[Byte], errorMessage: String): Unit = {
println("Juicy Platform")
val path = new Path(filePath)
var ofos: Option[FSDataOutputStream] = null
try {
println(s"nTrying to write to $filePathn")
val conf = new Configuration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
//      conf.addResource(new Path("/home/hadoop/conf/core-site.xml"))

println(conf.toString)
val fs = FileSystem.get(conf)
val fos = fs.create(path)
ofos = Option(fos)
fos.write(data)
println(s"nWrote to $filePathn")
}
catch {
case e: Exception =>
logError(errorMessage, s"Exception occurred writing to GCS:n${ExceptionUtils.getStackTrace(e)}")
}
finally {
ofos match {
case Some(i) => i.close()
case _ =>
}
}
}

这是错误:

java.lang.IllegalArgumentException: Wrong FS: gs://path/myFile.json, expected: hdfs://cluster-95cf-m

如果在 Dataproc 集群上运行,则无需在配置中显式填充"fs.gs.impl";new Configuration()应该已经包含必要的映射。

这里的主要问题是val fs = FileSystem.get(conf)正在使用 conf 的fs.defaultFS属性;它无法知道你是要获取特定于 HDFS 还是 GCS 的文件系统实例。一般来说,在Hadoop和Spark中,FileSystem实例基本上与单个URLscheme相关联;您需要为每个不同的方案获取特定于方案的实例,例如hdfs://gs://s3://

解决您的问题的最简单方法是始终使用 Path.getFileSystem(Configuration( 而不是FileSystem.get(Configuration)。并确保您的path完全符合该计划:

...
val path = "gs://bucket/foo/data"
val fs = path.getFileSystem(conf)
val fos = fs.create(path)
ofos = Option(fos)
fos.write(data)
...

最新更新