我有两个版本(旧/新)的数据库表,大约有100,000,000条记录。它们在文件中:
trx-old
trx-new
结构为:
id date amount memo
1 5/1 100 slacks
2 5/1 50 wine
id为简单主键,其他字段为非键。我想生成三个文件:
trx-removed (ids of records present in trx-old but not in trx-new)
trx-added (records from trx-new whose ids are not present in trx-old)
trx-changed (records from trx-new whose non-key values have changed since trx-old)
我需要每天在一个短的批处理窗口中执行此操作。实际上,我需要为多个表和多个模式(为每个表生成三个文件)执行此操作,因此实际的应用程序更复杂一些。但是我认为这个例子抓住了问题的关键。
这感觉像是mapreduce的一个明显的应用程序。我从来没有写过mapreduce应用程序,我的问题是:
- 是否有一些EMR应用程序已经这样做了? 是否有一个明显的猪或级联解决方案?
- 是否有其他开源的例子非常接近这个?
PS我看到了表格之间的差异问题,但那里的解决方案看起来不具有可扩展性。
这里有一个小Ruby玩具来演示算法:Ruby dbdiff我认为编写自己的作业将是最简单的,主要是因为当典型的reducer只写入一个文件时,您将希望使用MultipleOutputs从单个reduce步骤写入三个独立的文件。您需要使用MultipleInputs为每个表指定一个映射器。
这似乎是在级联中解决的完美问题。您提到过您从未编写过MR应用程序,如果您的目的是快速入门(假设您熟悉Java),那么层叠是一种选择。我一会儿会详细介绍这个
可以使用Pig或Hive,但是如果你想在这些列上执行额外的分析或更改模式,它们就不那么灵活了,因为你可以在Cascading中通过读取列头或从你创建的映射文件中读取Schema来动态构建Schema。
在Cascading
中,您将:
- 设置您的传入
Taps
:点击trxOld和Tap trxNew(这些指向您的源文件) - 将水龙头连接到
Pipes
: Pipe oldPipe和Pipe newPipe - 设置您的出站
Taps
:点击trxRemoved,点击trxAdded和点击trxChanged - 构建您的管道分析(这是乐趣(伤害)发生的地方)
trx-removed:trx-added
Pipe trxOld = new Pipe ("old-stuff");
Pipe trxNew = new Pipe ("new-stuff");
//smallest size Pipe on the right in CoGroup
Pipe oldNnew = new CoGroup("old-N-new", trxOld, new Fields("id1"),
trxNew, new Fields("id2"),
new OuterJoin() );
外部连接为我们提供了null,其中id在其他管道(您的源数据)中缺失,因此我们可以在随后的逻辑中使用FilterNotNull
或FilterNull
来获得我们最终的管道,然后我们连接到Tap trxRemoved和Tap trxAdded。
trx-changed
在这里,我将首先连接您正在使用FieldJoiner
寻找更改的字段,然后使用ExpressionFilter
给我们僵尸(因为它们更改了),例如:
Pipe valueChange = new Pipe("changed");
valueChange = new Pipe(oldNnew, new Fields("oldValues", "newValues"),
new ExpressionFilter("oldValues.equals(newValues)", String.class),
Fields.All);
它的作用是过滤掉具有相同值的字段并保留差异。此外,如果上面的表达式为真,它将删除该记录。最后,将valueChange管道连接到Tap trxChanged,您将有三个输出,其中包含您正在寻找的所有数据,以及允许添加一些分析的代码。
正如@ChrisGerken所建议的,您将不得不使用MultipleOutputs
和MultipleInputs
来生成多个输出文件,并将自定义映射器关联到每个输入文件类型(旧/新)。
映射器将输出:
- key:主键(id)
- 值:从带有附加标志(新/旧取决于输入)的输入文件记录
对于每个键,reducer将遍历所有记录R
并输出:
- 删除文件:如果只存在带有标志old的记录。
- 添加的文件:如果只有一个标记为new的记录存在。
- 修改文件:如果
R
记录不一致。
由于该算法随着reducer的数量而扩展,您很可能需要第二个作业,该作业将把结果合并到单个文件中以作为最终输出。
我想到的是:
考虑你的表是这样的:
Table_old
1 other_columns1
2 other_columns2
3 other_columns3
Table_new
2 other_columns2
3 other_columns3
4 other_columns4
添加table_old的元素"a"和table_new的元素"b"
当合并两个文件时,如果一个元素在第一个文件中存在,而在第二个文件中不存在,则删除
table_merged
1a other_columns1
2a other_columns2
2b other_columns2
3a other_columns3
3b other_columns3
4a other_columns4
从那个文件你可以很容易地做你的操作。
同样,假设你的id是n个数字,你有10个集群+1个主机。您的键将是id的第一个数字,因此,您将数据均匀地划分到集群中。您可以进行分组+分区,这样您的数据将被排序。
,
table_old
1...0 data
1...1 data
2...2 data
table_new
1...0 data
2...2 data
3...2 data
你的键是第一个数字,你根据这个数字分组,你的分区是根据id的其余部分。那么你的数据就会以
的形式到达你的集群worker1
1...0b data
1...0a data
1...1a data
worker2
2...2a data
2...2b data and so on.
注意,a, b不需要排序。
编辑合并将是这样的:
FileInputFormat.addInputPath(job, new Path("trx-old"));
FileInputFormat.addInputPath(job, new Path("trx-new"));
MR将得到两个输入,两个文件将被合并,
对于附加部分,您应该在Main MR之前再创建两个作业,其中只有Map
。第一个Map
将append "a"
赋给第一个列表中的所有元素,第二个append "b"
赋给第二个列表中的所有元素。第三个作业(我们现在使用的那个/主地图)将只有reduce作业来收集它们。所以你会得到Map-Map-Reduce
。
可以这样添加
//you have key:Text
new Text(String.valueOf(key.toString()+"a"))
但是我认为可能有不同的附加方式,其中一些可能更有效(文本hadoop)
希望对大家有所帮助,