我想读取过去2小时内创建的Azure Blob Storage中的多个parquet文件。
我可以用下面的命令
读取多个文件df = sqlContext.read.parquet("/path/*.parquet")
这个查询从该文件夹中存在的所有parquet文件返回结果,现在我只想从过去2小时内创建的parquet文件中获取数据。
请帮我把这个命令弄对。
一种方法是使用Hadoop FS APIlistStatus
方法列出该文件夹下的所有文件,选择在过去两个小时内使用getModificationTime
修改的文件,并将过滤后的文件列表传递给Spark DataFrame阅读器。
在Pyspark中,你可以像这样通过JVM网关访问hadoop fs:
import datetime
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
conf = sc._jsc.hadoopConfiguration()
data_path = "/path/*.parquet"
fs = Path(data_path).getFileSystem(conf)
file_status = fs.listStatus(Path(data_path))
last_2hours_time = (datetime.datetime.now() - datetime.timedelta(hours=2)).timestamp() * 1000
created_last_2hours = [
f.getPath().toString() for f in file_status if f.getModificationTime() >= last_2hours_time
]
df = spark.read.parquet(*created_last_2hours)
您可能还需要查看Azure Blob Storage的Python包以列出文件。
标准的寻址方式是使用带有检查点的spark结构化流。
先在整个数据上运行&然后使用相同的检查点每2小时运行您的特定业务逻辑:
- 第一次运行将填充检查点数据,这样您就不会在第二次运行时遍历相同的文件。
- 您可以使用空逻辑进行第一次运行,例如下面的
my_function()
示例中什么都不做。
spark.readStream.parquet("/path/*.parquet")
.writeStream
.trigger(once=True)
.outputMode('append')
.format("delta")
.option("checkpointLocation", checkpoint_path)
.start(path)
.awaitTermination()
或类似
的内容def my_function(df, epochId):
# do anything here
spark.readStream.parquet("/path/*.parquet")
.writeStream
.trigger(once=True)
.foreachBatch(my_function)
.option('checkpointLocation', 'some_location')
.start()
.awaitTermination()
参见其他示例