Mapreduce Table Diff



我有两个版本(旧/新)的数据库表,大约有100,000,000条记录。它们在文件中:

trx-old
trx-new

结构为:

id date amount memo
1  5/1     100 slacks
2  5/1      50 wine

id为简单主键,其他字段为非键。我想生成三个文件:

trx-removed (ids of records present in trx-old but not in trx-new)
trx-added   (records from trx-new whose ids are not present in trx-old)
trx-changed (records from trx-new whose non-key values have changed since trx-old)

我需要每天在一个短的批处理窗口中执行此操作。实际上,我需要为多个表和多个模式(为每个表生成三个文件)执行此操作,因此实际的应用程序更复杂一些。但是我认为这个例子抓住了问题的关键。

这感觉像是mapreduce的一个明显的应用程序。我从来没有写过mapreduce应用程序,我的问题是:

  1. 是否有一些EMR应用程序已经这样做了?
  2. 是否有一个明显的猪或级联解决方案?
  3. 是否有其他开源的例子非常接近这个?

PS我看到了表格之间的差异问题,但那里的解决方案看起来不具有可扩展性。

这里有一个小Ruby玩具来演示算法:Ruby dbdiff

我认为编写自己的作业将是最简单的,主要是因为当典型的reducer只写入一个文件时,您将希望使用MultipleOutputs从单个reduce步骤写入三个独立的文件。您需要使用MultipleInputs为每个表指定一个映射器。

这似乎是在级联中解决的完美问题。您提到过您从未编写过MR应用程序,如果您的目的是快速入门(假设您熟悉Java),那么层叠是一种选择。我一会儿会详细介绍这个

可以使用Pig或Hive,但是如果你想在这些列上执行额外的分析或更改模式,它们就不那么灵活了,因为你可以在Cascading中通过读取列头或从你创建的映射文件中读取Schema来动态构建Schema。

Cascading中,您将:

  1. 设置您的传入Taps:点击trxOld和Tap trxNew(这些指向您的源文件)
  2. 将水龙头连接到Pipes: Pipe oldPipe和Pipe newPipe
  3. 设置您的出站Taps:点击trxRemoved,点击trxAdded和点击trxChanged
  4. 构建您的管道分析(这是乐趣(伤害)发生的地方)

trx-removed:trx-added

Pipe trxOld = new Pipe ("old-stuff");
Pipe trxNew = new Pipe ("new-stuff");
//smallest size Pipe on the right in CoGroup
Pipe oldNnew = new CoGroup("old-N-new", trxOld, new Fields("id1"), 
                                       trxNew, new Fields("id2"), 
                                       new OuterJoin() ); 

外部连接为我们提供了null,其中id在其他管道(您的源数据)中缺失,因此我们可以在随后的逻辑中使用FilterNotNullFilterNull来获得我们最终的管道,然后我们连接到Tap trxRemoved和Tap trxAdded。

trx-changed

在这里,我将首先连接您正在使用FieldJoiner寻找更改的字段,然后使用ExpressionFilter给我们僵尸(因为它们更改了),例如:

Pipe valueChange = new Pipe("changed");
valueChange = new Pipe(oldNnew, new Fields("oldValues", "newValues"), 
            new ExpressionFilter("oldValues.equals(newValues)", String.class),
            Fields.All);

它的作用是过滤掉具有相同值的字段并保留差异。此外,如果上面的表达式为真,它将删除该记录。最后,将valueChange管道连接到Tap trxChanged,您将有三个输出,其中包含您正在寻找的所有数据,以及允许添加一些分析的代码。

正如@ChrisGerken所建议的,您将不得不使用MultipleOutputsMultipleInputs来生成多个输出文件,并将自定义映射器关联到每个输入文件类型(旧/新)。

映射器将输出:

  • key:主键(id)
  • 值:从带有附加标志(新/旧取决于输入)的输入文件记录

对于每个键,reducer将遍历所有记录R并输出:

  • 删除文件:如果只存在带有标志old的记录。
  • 添加的文件:如果只有一个标记为new的记录存在。
  • 修改文件:如果R记录不一致。

由于该算法随着reducer的数量而扩展,您很可能需要第二个作业,该作业将把结果合并到单个文件中以作为最终输出。

我想到的是:

考虑你的表是这样的:

Table_old
1    other_columns1
2    other_columns2
3    other_columns3
Table_new 
2    other_columns2
3    other_columns3
4    other_columns4

添加table_old的元素"a"和table_new的元素"b"

当合并两个文件时,如果一个元素在第一个文件中存在,而在第二个文件中不存在,则删除

table_merged
1a    other_columns1
2a    other_columns2
2b    other_columns2
3a    other_columns3
3b    other_columns3
4a    other_columns4

从那个文件你可以很容易地做你的操作。

同样,假设你的id是n个数字,你有10个集群+1个主机。您的键将是id的第一个数字,因此,您将数据均匀地划分到集群中。您可以进行分组+分区,这样您的数据将被排序。

,

table_old
1...0 data
1...1 data
2...2 data
table_new
1...0 data
2...2 data
3...2 data

你的键是第一个数字,你根据这个数字分组,你的分区是根据id的其余部分。那么你的数据就会以

的形式到达你的集群
worker1
1...0b data
1...0a data
1...1a data
worker2 
2...2a data
2...2b data and so on.

注意,a, b不需要排序。

编辑合并将是这样的:

FileInputFormat.addInputPath(job, new Path("trx-old"));
FileInputFormat.addInputPath(job, new Path("trx-new"));

MR将得到两个输入,两个文件将被合并,

对于附加部分,您应该在Main MR之前再创建两个作业,其中只有Map。第一个Mapappend "a"赋给第一个列表中的所有元素,第二个append "b"赋给第二个列表中的所有元素。第三个作业(我们现在使用的那个/主地图)将只有reduce作业来收集它们。所以你会得到Map-Map-Reduce

可以这样添加

//you have key:Text
new Text(String.valueOf(key.toString()+"a"))

但是我认为可能有不同的附加方式,其中一些可能更有效(文本hadoop)

希望对大家有所帮助,

相关内容

  • 没有找到相关文章

最新更新