使用批处理提取SQL Server CDC更改表不完整



基本问题:我有一个从"丢失"记录的CDC表中提取记录的过程。

我从MS SQL 2019(Data Center Ed(数据库中提取67张表上启用了CDC。其中一张桌子有3.23亿行,宽约125列。在夜间过程中,大约有1200万行被更新,因此在_CT表中生成了大约2000万行。在这个夜间过程中,CDC捕获仍在使用默认设置运行。它可能会"落后",但我们会对此进行检查。

在夜间过程完成后,我有一个Python 3.6提取器,它使用ODBC连接到SQL服务器。我有一个循环,它遍历67个源表中的每一个。在循环开始之前,我确保CDC的捕获被"捕获"。

对于每个表,提取器通过从Snowflake中的目标数据库读取最后一个成功加载的LSN来开始该过程。

Python脚本将表名、上次加载的LSN和表PKEY写入以下查询,以获取表的当前MAX_LSN:

def get_incr_count(self, table_name, pk, last_loaded_lsn):
try:
cdc_table_name = self.get_cdc_table(table_name)
max_lsn = self.get_max_lsn(table_name)
incr_count_query = """with incr as
(
select
row_number() over
(
partition by """ + pk + """
order by
__$start_lsn desc,
__$seqval desc
) as __$rn,
*
from """ + cdc_table_name + """
where
__$operation <> 3 and
__$start_lsn > """ + last_loaded_lsn + """ and
__$start_lsn <= """ + max_lsn + """
)
select COUNT(1) as count from incr where __$rn = 1 ;
"""
lsn_df = pd.read_sql_query(incr_count_query, self.cnxn)
incr_count = lsn_df['count'][0]
return incr_count
except Exception as e:
raise Exception('Could not get the count of the incremental load for ' + table_name + ': ' + str(e))

如果此查询找到要处理的记录,则会运行此函数。一次提取500000条记录的限制是运行此代码的虚拟机上的内存限制。超过这个数量会耗尽可用内存。

def get_cdc_data(self, table_name, pk, last_loaded_lsn, offset_iterator=0, fetch_count=500000):
try: 
cdc_table_name = self.get_cdc_table(table_name)
max_lsn = self.get_max_lsn(table_name)
#Get the lasst LSN loaded from the ODS.LOG_CDC table for the current table
last_lsn = last_loaded_lsn
incremental_pull_query = """with incr as
(
select
row_number() over
(
partition by """ + pk + """
order by
__$start_lsn desc,
__$seqval desc
) as __$rn,
*
from """ + cdc_table_name + """
where
__$operation <> 3 and
__$start_lsn > """ + last_lsn + """ and
__$start_lsn <= """ + max_lsn + """
)
select CONVERT(VARCHAR(max), __$start_lsn, 1) as __$conv_lsn, * 
from incr where __$rn = 1 
order by __$conv_lsn
offset """ + str(offset_iterator) + """ rows
fetch first """ + str(fetch_count) + """ rows only;
"""
# Load the incremental data into a dataframe, df, using the SQL Server connection and the incremental query
full_df = pd.read_sql_query(incremental_pull_query, self.cnxn)
# Trim all cdc columns except __$operation
df = full_df.drop(['__$conv_lsn', '__$rn', '__$start_lsn', '__$end_lsn', '__$seqval', '__$update_mask', '__$command_id'], axis=1)
return df

except Exception as e:
raise Exception('Could not get the incremental load dataframe for ' + table_name + ': ' + str(e))

然后将文件移动到snowflake中并合并到一个表中。如果每个导入循环都成功,我们将更新目标数据库中的MAX LSN以设置下一个起点。如果任何失败,我们将保留最大值,然后重试下一次。在下面的场景中,没有发现错误。

我们发现有证据表明,第二个查询在循环通过时并没有提取起始LSN和最大LSN之间的所有有效记录没有可辨别的模式会遗漏记录,除了如果遗漏了一个LSN,则会遗漏其中的所有更改。

我认为这可能与我们如何订购记录有关:通过__$conv_lsn订购。此值将二进制转换为VARCHAR(MAX(。。。所以我想知道订购一把更可靠的钥匙是否明智。如果不在这个过程中增加额外的工作,我想不出一种方法来审计这一点,因为这个过程非常耗时。这确实使故障排除更加困难。

我怀疑您的问题就在这里。

row_number() over
(
partition by """ + pk + """
order by
__$start_lsn desc,
__$seqval desc
) as __$rn,
...
from incr where __$rn = 1

如果给定的事务影响了多行,则它们将被枚举为1-N。即使这样也有点波浪形;我不确定如果一行在一个事务中被影响不止一次会发生什么(我需要设置一个测试……嗯……我很懒(。


但尽管如此,这个工作流程对我来说很奇怪。我过去曾与美国疾病控制与预防中心合作过,虽然无可否认我不是针对雪花,但提取部分应该是类似的,而且相当简单。

  1. 使用sys.fn_cdc_get_max_lsn();获取最大LSN(即无需查询CDC数据本身即可获得此值(
  2. 使用LSN端点从cdc.fn_cdc_get_all_changes_«capture_instance»()cdc.fn_cdc_get_net_changes_«capture_ instance»()中进行选择(最小值来自该表的上一次运行或第一次运行的sys.fn_cdc_get_min_lsn(«capture_ instance»);最大值来自上面(
  3. 将结果流式传输到任何位置(即,您不需要同时在内存中保存大量的更改记录(

最新更新