我在Palantir Foundry中有一个80TB的日期分区数据集,它每3小时在一个增量Append事务中摄取300-450GB的数据。我想创建一个增量变换,使用它作为输入。
但是,数据集太大,无法立即读取初始快照。附加到数据集的数据将足够小,可以在初始快照之后处理每个增量构建。我如何从输入数据集中解析积压工作,并达到可以以增量模式运行转换的程度?
当从增量构建的大型输入数据集中读取时,Foundry中不可能从输入事务的某个子集中读取。您必须一次读取整个输入数据集(snapshot
模式),或者只读取自上次构建输出以来写入的输入事务(incremental
模式)。
为了解决这个问题,我们必须巧妙地解析输入。以下是转换:
from transforms.api import transform, Input, Output, incremental
from pyspark.sql import Row
from pyspark.sql import functions as F, types as T, SparkSession as S
import datetime
# set this value for the type of build:
# "first" for a snapshot run on a single date (sets placeholder_date, runs snapshot)
# "catchup" for subsequent runs on subsequent dates (reads from placeholder_date to decide what date to run, then runs update from full read)
# "continuing" for ongoing incremental runs
PHASE = 'first'
# Where data begins
START_DATE = datetime.date(2022, 7, 1)
# Where we want the automated rebuild process to stop.
# Set this value to less than the most recent date for reasons discussed in the accompanying post
END_DATE = datetime.date(2022, 7, 22)
DAYS_PER_RUN = 4 # How many days worth of data do we want each 'catchup' run to read
placeholder_date_schema = T.StructType([
T.StructField("date", T.DateType(), True)
])
@incremental(semantic_version=3)
@transform(
output=Output("output"),
placeholder_date=Output("placeholder_date"),
source=Input("input"),
)
def compute(source, output, placeholder_date):
# First and Catchup Builds
if((PHASE == 'first') | (PHASE == 'catchup')):
df = source.dataframe('current') # read the entire input dataset
# Continuing Builds
if(PHASE == 'continuing'):
df = source.dataframe() # read the latest incremental appends
# First Build: Build placeholder_date initially
if(PHASE == 'first'):
spark = S.builder.getOrCreate()
next_output_last_date = START_DATE + datetime.timedelta(days=(DAYS_PER_RUN-1))
most_recent_output_date = START_DATE - datetime.timedelta(days=1)
placeholder_date_df = spark.createDataFrame(data=[Row(next_output_last_date)], schema=placeholder_date_schema)
# Catchup Builds: Use placeholder_date to get the previous starting time
if(PHASE == 'catchup'):
placeholder_date_df = placeholder_date.dataframe('previous', placeholder_date_schema)
most_recent_output_date = placeholder_date_df.collect()[0][0] # noqa
next_output_last_date = most_recent_output_date + datetime.timedelta(days=DAYS_PER_RUN)
# Ensure that the time window doesn't go past the end date by curtailing the period if necessary
if next_output_last_date >= END_DATE:
next_output_last_date == END_DATE
# Ensure we don't run once we pass the end point
if most_recent_output_date >= END_DATE:
return True # this will result in the build completing without writing or reading any further data
placeholder_date_df = placeholder_date_df.withColumn("date", F.lit(next_output_last_date))
# First and Catchup Builds: Write the placeholder
# It's safe to write the placeholder because if the build fails the placeholder transaction will also be aborted
if((PHASE == 'first') | (PHASE == 'catchup')):
placeholder_date.set_mode('replace')
placeholder_date.write_dataframe(placeholder_date_df, output_format='csv')
# Filter the whole input dataset
df = df.where((F.col("date") > F.lit(most_recent_output_date)) & (F.col("date") <= F.lit(next_output_last_date)))
# Transform the data as required
df = transform_data(df)
# Write the output
output.write_dataframe(df, partition_cols=["date"])
# Define whatever transformations you want to perform here
def transform_data(df):
return df
变换具有三个";相位"-first
、catchup
和continuing
。在first
阶段运行一次转换,然后在catchup
阶段根据需要运行多次转换,直到解析完整个现有输入数据集。最后,完成后,将其切换到continuing
阶段,并安排它在每次输入更新时(增量)运行。
该构建将状态存储在placeholder_date
数据集中,该数据集在first
构建中创建,并在catchup
构建期间从中读取,以确定catchup
进程的运行情况。catchup
模式有一个额外的故障保护,如果构建继续超过END_DATE
,它将不会写出空事务。这允许您在catchup
阶段设置(强制构建)时间表(例如,每10分钟一次),然后简单地离开它,定期回来检查,而无需仔细计时catchup
阶段的终点。完成catchup
阶段后,可以将转换设置为continuing
模式,它将切换到完全增量行为。
票据
在上面的示例代码中,使用由date
进行蜂窝分区的输入数据集是非常有用的。这将使过滤更便宜、更容易。然而,如果没有蜂窝分区的输入,这将起作用(尽管速度要慢得多)。也就是说,通过date
对增量数据集进行hive分区是一种很好的做法,本例中的输出通过date
对输出进行分区,以便于将来使用。
注意:此过程假设摄入的数据在时间上是连续的,即来自后续增量附加的数据将具有与上次增量附加中的最近日期相同或更晚的日期值。如果你的输入数据集不是随时间单调增加的,如果不小心管理,这种技术可能会导致数据丢失。例如,假设您对前4天的数据(一次一天的数据)运行catchup
模式。当您运行第3天的构建时,数据会被摄入到包含第1天数据的输入中。catchup
模式不会解析此数据,因为它已经获取了第1天的数据,随后的catchup
构建将过滤掉第1天中的数据。此外,转换的continuing
阶段将看不到在最终成功构建catchup
之前发生的对输入的任何附加,因为它们不是自上次成功构建以来摄入的新数据。如果这种情况发生在您身上,您可以通过识别行为并对其进行说明来确保数据的完整性:假设输入数据集的每个后续附加都可以包含最多3天前的数据,并且假设您希望从第1天到第30天(今天)。因此,您知道在运行catchup
构建的第1-27天内不会发布新数据。如果将END_DATE
设置为第27天,那么您的第一个continuing
构建将有一个相当大的增量构建,但不会出现数据丢失。
注意:我选择在first
、catchup
和continuing
阶段之间进行手动切换,原因有两个:
首先,您可以通过将从placeholder_date
数据集读取的过程封装在try/catch
中,将first
和catchup
阶段合并为一个阶段,但这会使您处于依赖错误处理控制流的位置,这通常是不明智的。
其次,一旦catchup
阶段结束,continuing
阶段就放弃不再适用的placeholder_date
数据集(因为continuing
阶段是从可能是盘中或混合日期的交易中读取的)。因此,不可能根据现有的已知状态安全地确定下一个构建应该是catchup
还是continuing
。