读取Pypark中最近2小时创建的parquet文件



我想读取过去2小时内创建的Azure Blob Storage中的多个parquet文件。

我可以用下面的命令

读取多个文件
df = sqlContext.read.parquet("/path/*.parquet")

这个查询从该文件夹中存在的所有parquet文件返回结果,现在我只想从过去2小时内创建的parquet文件中获取数据。

请帮我把这个命令弄对。

一种方法是使用Hadoop FS APIlistStatus方法列出该文件夹下的所有文件,选择在过去两个小时内使用getModificationTime修改的文件,并将过滤后的文件列表传递给Spark DataFrame阅读器。

在Pyspark中,你可以像这样通过JVM网关访问hadoop fs:

import datetime

Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
conf = sc._jsc.hadoopConfiguration()
data_path = "/path/*.parquet"
fs = Path(data_path).getFileSystem(conf)
file_status = fs.listStatus(Path(data_path))
last_2hours_time = (datetime.datetime.now() - datetime.timedelta(hours=2)).timestamp() * 1000
created_last_2hours = [
f.getPath().toString() for f in file_status if f.getModificationTime() >= last_2hours_time
]
df = spark.read.parquet(*created_last_2hours)

您可能还需要查看Azure Blob Storage的Python包以列出文件。

标准的寻址方式是使用带有检查点的spark结构化流。

先在整个数据上运行&然后使用相同的检查点每2小时运行您的特定业务逻辑:

  • 第一次运行将填充检查点数据,这样您就不会在第二次运行时遍历相同的文件。
  • 您可以使用空逻辑进行第一次运行,例如下面的my_function()示例中什么都不做。
spark.readStream.parquet("/path/*.parquet")
.writeStream 
.trigger(once=True) 
.outputMode('append') 
.format("delta") 
.option("checkpointLocation", checkpoint_path) 
.start(path) 
.awaitTermination()

或类似

的内容
def my_function(df, epochId): 
# do anything here

spark.readStream.parquet("/path/*.parquet") 
.writeStream 
.trigger(once=True) 
.foreachBatch(my_function) 
.option('checkpointLocation', 'some_location') 
.start() 
.awaitTermination()

参见其他示例

相关内容

  • 没有找到相关文章

最新更新