在GCP Dataproc上使用Spark,我成功地将整个RDD写入GCS,如下所示:
rdd.saveAsTextFile(s"gs://$path")
产品是同一路径中每个分区的文件。
如何为每个分区编写文件(具有基于分区信息的唯一路径(
下面是一个发明的非工作一厢情愿的代码示例
rdd.mapPartitionsWithIndex(
(i, partition) =>{
partition.write(path = s"gs://partition_$i", data = partition_specific_data)
}
)
当我从Mac上的分区内调用下面的函数时,它会写入本地磁盘,在Dataproc上,我收到错误,无法将gs识别为有效路径。
def writeLocally(filePath: String, data: Array[Byte], errorMessage: String): Unit = {
println("Juicy Platform")
val path = new Path(filePath)
var ofos: Option[FSDataOutputStream] = null
try {
println(s"nTrying to write to $filePathn")
val conf = new Configuration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
// conf.addResource(new Path("/home/hadoop/conf/core-site.xml"))
println(conf.toString)
val fs = FileSystem.get(conf)
val fos = fs.create(path)
ofos = Option(fos)
fos.write(data)
println(s"nWrote to $filePathn")
}
catch {
case e: Exception =>
logError(errorMessage, s"Exception occurred writing to GCS:n${ExceptionUtils.getStackTrace(e)}")
}
finally {
ofos match {
case Some(i) => i.close()
case _ =>
}
}
}
这是错误:
java.lang.IllegalArgumentException: Wrong FS: gs://path/myFile.json, expected: hdfs://cluster-95cf-m
如果在 Dataproc 集群上运行,则无需在配置中显式填充"fs.gs.impl";new Configuration()
应该已经包含必要的映射。
这里的主要问题是val fs = FileSystem.get(conf)
正在使用 conf 的fs.defaultFS
属性;它无法知道你是要获取特定于 HDFS 还是 GCS 的文件系统实例。一般来说,在Hadoop和Spark中,FileSystem
实例基本上与单个URLscheme
相关联;您需要为每个不同的方案获取特定于方案的实例,例如hdfs://
或gs://
或s3://
。
解决您的问题的最简单方法是始终使用 Path.getFileSystem(Configuration( 而不是FileSystem.get(Configuration)
。并确保您的path
完全符合该计划:
...
val path = "gs://bucket/foo/data"
val fs = path.getFileSystem(conf)
val fos = fs.create(path)
ofos = Option(fos)
fos.write(data)
...