从数据湖中带有时间戳的csv的外部表中具体化的ad-hoc缓慢变化的维度



问题

主要问题

如何从每日摘录的文件夹中短暂地实现缓慢更改维度类型2,其中每个csv是源系统中表的一个完整摘录?

基本原理

我们正在将短暂的数据仓库设计为最终用户的数据集市,这些数据集市可以在没有后果的情况下被旋转和烧毁。这需要我们在一个湖/斑点/桶中拥有所有数据。

我们每天都在翻录完整的摘录,因为:

  1. 我们无法可靠地仅提取变更集(由于我们无法控制的原因),并且
  2. 我们希望维护一个数据湖;最原始的";可能的数据

挑战问题

有没有一个解决方案可以给我特定日期的状态,而不仅仅是";最新的";状态

存在问题

我是不是完全向后想,有一种更简单的方法可以做到这一点?

可能的方法

自定义dbt物化

dbt.utils包中有一个insert_by_perioddbt实体化,我想这可能正是我想要的?但我很困惑,因为它是dbt snapshot,但是:

  1. 一次为每个文件递增地运行dbt snapshot;并且
  2. 直接由外部表构建

三角洲湖

我对Databricks的Delta Lake了解不多,但似乎使用Delta Tables是可能的?

修复提取作业

如果我们能让我们的摘录只包含自上一次摘录以来发生的变化,我们的问题就解决了吗?

示例

假设以下三个文件位于数据湖的文件夹中。(Gist,带有3个csv,所需表格结果为csv)。我添加了Extracted列,以防从文件名解析时间戳太麻烦。

2020-09-14_CRM_extract.csv

| OppId | CustId | Stage       | Won | LastModified | Extracted |
|-------|--------|-------------|-----|--------------|-----------|
| 1     | A      | 2 - Qualify |     | 9/1          | 9/14      |
| 2     | B      | 3 - Propose |     | 9/12         | 9/14      |

2020-09-15_CRM_extract.csv

| OppId | CustId | Stage       | Won | LastModified | Extracted |
|-------|--------|-------------|-----|--------------|-----------|
| 1     | A      | 2 - Qualify |     | 9/1          | 9/15      |
| 2     | B      | 4 - Closed  | Y   | 9/14         | 9/15      |
| 3     | C      | 1 - Lead    |     | 9/14         | 9/15      |

2020-09-16_CRM_extract.csv

| OppId | CustId | Stage       | Won | LastModified | Extracted |
|-------|--------|-------------|-----|--------------|-----------|
| 1     | A      | 2 - Qualify |     | 9/1          | 9/16      |
| 2     | B      | 4 - Closed  | Y   | 9/14         | 9/16      |
| 3     | C      | 2 - Qualify |     | 9/15         | 9/16      |

最终结果

以下是截至9/16的三个文件的SCD-II。截至9/15的SCD-II将是相同的,但OppId=3只有来自valid_from=9/15valid_to=null的一个

| OppId | CustId | Stage       | Won | LastModified | valid_from | valid_to |
|-------|--------|-------------|-----|--------------|------------|----------|
| 1     | A      | 2 - Qualify |     | 9/1          | 9/14       | null     |
| 2     | B      | 3 - Propose |     | 9/12         | 9/14       | 9/15     |
| 2     | B      | 4 - Closed  | Y   | 9/14         | 9/15       | null     |
| 3     | C      | 1 - Lead    |     | 9/14         | 9/15       | 9/16     |
| 3     | C      | 2 - Qualify |     | 9/15         | 9/16       | null     |

这是一个有趣的概念,当然,要充分了解您的业务、利益相关者、数据等,这将是一个比在这个论坛上更长的对话。我可以看到,如果您的数据量相对较小,您的源系统很少更改,您的报告需求(以及数据集市)也很少发生变化,您只需要很少地启动这些数据集市。

我担心的是:

  1. 如果您的源或目标需求发生变化,您将如何处理?您需要启动您的数据集市,对其进行全面的回归测试,应用您的更改,然后对其进行测试。如果你在已知更改的情况下这样做,那么对于一个没有被使用的Datamart来说,需要付出很多努力——尤其是如果你需要在两次使用之间多次这样做;如果你在需要数据集市时这样做,那么你就没有达到让数据集市可用于";即时";使用

您的声明";我们有一个DW作为代码,它可以被删除、更新和重新创建,而不会像传统的DW更改管理那样复杂;我不确定这是真的。在不启动数据集市和使用数据进行标准测试周期的情况下,您将如何测试代码的更新?这与传统的DW更改管理有何不同?

  1. 如果源系统中存在损坏/意外的数据,会发生什么?在一个";正常的";DW如果你每天都在加载数据,这通常会在当天被注意到并修复。在您的解决方案中,不可靠的数据可能发生在几天/几周前,假设它加载到您的数据集市中,而不是在加载时出错,您将需要适当的流程来发现它,然后可能需要解开几天的SCD记录来解决问题
  2. (只有当你有大量数据时才相关)考虑到存储成本低,我不确定在需要时启动数据集市的好处,而不是仅仅保存数据以便随时使用。每次启动数据集市时都要加载大量数据,这既耗时又昂贵。可能的混合方法可能是只在需要数据集市时运行增量加载,而不是每天运行它们——因此,您可以随时准备好上次使用数据集市时的数据,只需添加自上次加载以来创建/更新的记录

我不知道这是否是最好的,但我已经看到了。构建初始SCD-II表时,添加一列,该列是记录所有值的存储HASH()值(可以排除主键)。然后,您可以在每天传入的完整数据集上创建一个外部表,其中包括相同的HASH()函数。现在,您可以根据主键和HASH值是否已更改,对SCD-II执行MERGEINSERT/UPDATE

这样做的主要优势是避免了每天将所有数据加载到Snowflake中进行比较,但这样执行会更慢。您还可以使用COPY INTO语句中包含的HASH()函数加载到临时表,然后更新SCD-II,然后删除临时表,这实际上可能会更快。

相关内容

  • 没有找到相关文章

最新更新