Is it possible to list all the files in a given S3 path (e.g. s3://my-bucket/my-folder/*.extension) using a SparkSession object?
You can access files on S3 with the Hadoop API (which Spark uses as well):
import java.net.URI
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
val path = "s3://somebucket/somefolder"
val fileSystem = FileSystem.get(URI.create(path), new Configuration())
val it = fileSystem.listFiles(new Path(path), true)
while (it.hasNext()) {
...
}
Method 1
For PySpark users, I've translated Michael Spector's answer (I'll leave it to you to decide whether using it is a good idea):
sc = spark.sparkContext
myPath = f's3://my-bucket/my-prefix/'
javaPath = sc._jvm.java.net.URI.create(myPath)
hadoopPath = sc._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem.get(javaPath, sc._jvm.org.apache.hadoop.conf.Configuration())
iterator = hadoopFileSystem.listFiles(hadoopPath, True)
s3_keys = []
while iterator.hasNext():
    s3_keys.append(iterator.next().getPath().toUri().getRawPath())
s3_keys now holds all the file keys found under my-bucket/my-prefix.
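If you also want the *.extension filter from the question, a minimal follow-up sketch (the extension string is just a placeholder) is to filter the collected keys on the driver:
# Hypothetical follow-up: keep only the keys that end with the desired extension
extension = '.extension'  # placeholder taken from the question
matching_keys = [key for key in s3_keys if key.endswith(extension)]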
Method 2
Here is an alternative approach I found (hat tip to @forgetso):
myPath = 's3://my-bucket/my-prefix/*'
hadoopPath = sc._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFs = hadoopPath.getFileSystem(sc._jvm.org.apache.hadoop.conf.Configuration())
statuses = hadoopFs.globStatus(hadoopPath)
for status in statuses:
    status.getPath().toUri().getRawPath()
    # Alternatively, you can get file names only with:
    # status.getPath().getName()
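Since globStatus accepts glob patterns, the *.extension filter from the question can go straight into the path. A minimal sketch under that assumption (bucket, prefix and extension are placeholders):
# Sketch: put the extension filter directly into the glob pattern
globPath = sc._jvm.org.apache.hadoop.fs.Path('s3://my-bucket/my-prefix/*.extension')
globFs = globPath.getFileSystem(sc._jvm.org.apache.hadoop.conf.Configuration())
# "or []" guards against the case where nothing matches the pattern
matching = [status.getPath().toString() for status in (globFs.globStatus(globPath) or [])]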
Method 3 (incomplete!)
The two methods above do not make use of the Spark parallelism that would be applied to a distributed read, though; that logic appears to be private. See parallelListLeafFiles here. I have not found a way to force PySpark to do a distributed ls on S3 without also reading the file contents. I tried to instantiate an InMemoryFileIndex via Py4J but couldn't get the incantation right. Here is what I have so far, in case someone wants to pick it up from here:
myPath = f's3://my-bucket/my-path/'
paths = sc._gateway.new_array(sc._jvm.org.apache.hadoop.fs.Path, 1)
paths[0] = sc._jvm.org.apache.hadoop.fs.Path(myPath)
emptyHashMap = sc._jvm.java.util.HashMap()
emptyScalaMap = sc._jvm.scala.collection.JavaConversions.mapAsScalaMap(emptyHashMap)
# Py4J is not happy with this:
sc._jvm.org.apache.spark.sql.execution.datasources.InMemoryFileIndex(
spark._jsparkSession,
paths,
emptyScalaMap,
sc._jvm.scala.Option.empty() # Optional None
)
Lou Zell was very close! The code below ultimately works against ADLS2, but I'm putting it here because of the Py4J magic. Note that the noop cache causes the job to run twice: once when the index is created and once when listFiles is called. I also wrote a blog post about this: https://www.perceptivebits.com/a-comprehensive-guide-to-finding-files-via-spark/
import os
base_path = "/mnt/my_data/"
glob_pattern = "*"
sc = spark.sparkContext
hadoop_base_path = sc._jvm.org.apache.hadoop.fs.Path(base_path)
paths = sc._jvm.PythonUtils.toSeq([hadoop_base_path])
#noop_cache_clazz = sc._jvm.java.lang.Class.forName("org.apache.spark.sql.execution.datasources.NoopCache$")
#ff = noop_cache_clazz.getDeclaredField("MODULE$")
#noop_cache = ff.get(None)
file_status_cache_clazz = sc._jvm.java.lang.Class.forName(
"org.apache.spark.sql.execution.datasources.FileStatusCache$"
)
ff = file_status_cache_clazz.getDeclaredField("MODULE$")
jvm_spark_session = spark._jsparkSession
file_status_cache = ff.get(None).getOrCreate(jvm_spark_session)
in_memory_file_index = sc._jvm.org.apache.spark.sql.execution.datasources.InMemoryFileIndex(
jvm_spark_session,
paths,
sc._jvm.PythonUtils.toScalaMap({}),
sc._jvm.scala.Option.empty(),
file_status_cache,  # or use noop_cache if you need to save memory
sc._jvm.scala.Option.empty(),
sc._jvm.scala.Option.empty()
)
glob_path = sc._jvm.org.apache.hadoop.fs.Path(os.path.join(base_path, glob_pattern))
glob_paths = sc._jvm.PythonUtils.toSeq([glob_path])
# SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))),
status_list = in_memory_file_index.listLeafFiles(glob_paths)
path_list = []
iter = status_list.iterator()
while iter.hasNext():
    path_status = iter.next()
    path_list.append(str(path_status.getPath().toUri().getRawPath()))
path_list.sort()
print(path_list)
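As a follow-up sketch (assuming the listed paths are loadable as-is and that the files happen to be, say, Parquet; the format is purely an assumption), the collected paths can be handed straight back to the DataFrameReader, which accepts a list of paths:
# Hypothetical follow-up: read only the files that were just listed
df = spark.read.format("parquet").load(path_list)  # "parquet" is an assumption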
Use input_file_name with the DataFrame and it will give you the absolute file path for each row.
The following code will give you all the file paths:
spark.read.table("zen.intent_master").select(input_file_name).distinct.collect
I assume that, for your use case, you just want to read data from a subset of files matching some regex, so you can apply it in a filter.
For example:
val df = spark.read.table("zen.intent_master").filter(input_file_name.rlike("your regex string"))
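For PySpark, a minimal equivalent sketch (reusing the table name and the placeholder regex from the answer above):
from pyspark.sql.functions import input_file_name

# Distinct source file paths behind the table
paths = [row[0] for row in spark.read.table("zen.intent_master")
         .select(input_file_name()).distinct().collect()]

# Keep only the rows coming from files whose path matches a regex
df = spark.read.table("zen.intent_master").filter(input_file_name().rlike("your regex string"))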