I'm writing a MapReduce job to mine web server logs. The input comes from text files; the output goes to a MySQL database. The problem is that if a single record fails to insert for any reason, such as the data exceeding the column size, the whole job fails and nothing at all is written to the database. Is there any way to keep the good records? I suppose one way would be to validate the data first, but that couples the client to the database schema more than I'd like. (A middle ground, sketched below, would be to read the column sizes from the database's own metadata at runtime instead of hard-coding them.) I'm not posting code because this isn't particularly a code problem.
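A minimal sketch of that middle ground, with a helper class of my own invention (the name ColumnSizeValidator and the column names are made up): it reads each column's declared size from JDBC DatabaseMetaData once, then truncates values to fit, so nothing about the schema is hard-coded in the client.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: loads the declared size of each column from JDBC
// metadata once, so the client never hard-codes the schema.
public class ColumnSizeValidator {
    private final Map<String, Integer> sizes = new HashMap<>();

    public ColumnSizeValidator(Connection conn, String table) throws SQLException {
        try (ResultSet cols = conn.getMetaData().getColumns(null, null, table, null)) {
            while (cols.next()) {
                sizes.put(cols.getString("COLUMN_NAME"), cols.getInt("COLUMN_SIZE"));
            }
        }
    }

    // Truncates value to the declared size of the given column, if known.
    public String fit(String column, String value) {
        Integer max = sizes.get(column);
        return max == null ? value
                : value.substring(0, Math.min(value.length(), max));
    }
}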
Edit:

The reducer:
@Override
protected void reduce(SkippableLogRecord rec, Iterable<NullWritable> values,
        Context context) {
    String path = rec.getPath().toString();
    // Truncate to the column size so oversized values don't kill the insert.
    path = path.substring(0, Math.min(path.length(), 100));

    try {
        context.write(new DBRecord(rec), NullWritable.get());
        LOGGER.info("Wrote record {}.", path);
    } catch (IOException | InterruptedException e) {
        LOGGER.error("There was a problem when writing out {}.", path, e);
    }
}
The log:
15/03/01 14:35:06 WARN mapred.LocalJobRunner: job_local279539641_0001
java.lang.Exception: java.io.IOException: Data truncation: Data too long for column 'filename' at row 1
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.io.IOException: Data truncation: Data too long for column 'filename' at row 1
at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.close(DBOutputFormat.java:103)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.close(ReduceTask.java:550)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:629)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/03/01 14:35:06 INFO mapred.LocalJobRunner: reduce > reduce
15/03/01 14:35:07 INFO mapreduce.Job: Job job_local279539641_0001 failed with state FAILED due to: NA
Answering my own question: looking at this SO post, I saw that the database writes are done in batches, and that on an SQLException the transaction is rolled back. That explains my problem. I guess I just need to make the DB columns large enough, or validate first. I could also create a custom DBOutputFormat/DBRecordWriter, but unless I insert one record at a time there will always be the risk of one bad record rolling back the whole batch. For reference, here is the close() method of Hadoop's stock DBRecordWriter, which executes the batch and rolls it all back on failure:
public void close(TaskAttemptContext context) throws IOException {
    try {
        LOG.warn("Executing statement:" + statement);
        statement.executeBatch();
        connection.commit();
    } catch (SQLException e) {
        try {
            connection.rollback();
        } catch (SQLException ex) {
            LOG.warn(StringUtils.stringifyException(ex));
        }
        throw new IOException(e.getMessage());
    } finally {
        try {
            statement.close();
            connection.close();
        } catch (SQLException ex) {
            throw new IOException(ex.getMessage());
        }
    }
}
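For the one-record-at-a-time route, here is a rough sketch of what I have in mind, not a tested implementation: a DBOutputFormat subclass whose writer executes and commits each INSERT on its own and logs-and-skips rows that fail, instead of batching. The class names are mine; the Hadoop and JDBC calls (getRecordWriter, constructQuery, DBRecordWriter and its accessors) are the real API, as far as I can tell.

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical: inserts rows one at a time, skipping rows that fail
// instead of rolling back a whole batch.
public class SkippingDBOutputFormat<K extends DBWritable, V>
        extends DBOutputFormat<K, V> {
    private static final Logger LOGGER =
            LoggerFactory.getLogger(SkippingDBOutputFormat.class);

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException {
        DBConfiguration dbConf = new DBConfiguration(context.getConfiguration());
        try {
            Connection connection = dbConf.getConnection();
            PreparedStatement statement = connection.prepareStatement(
                    constructQuery(dbConf.getOutputTableName(),
                            dbConf.getOutputFieldNames()));
            return new SkippingDBRecordWriter(connection, statement);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    private class SkippingDBRecordWriter extends DBRecordWriter {
        SkippingDBRecordWriter(Connection connection, PreparedStatement statement)
                throws SQLException {
            super(connection, statement); // note: this turns autocommit off
        }

        @Override
        public void write(K key, V value) throws IOException {
            try {
                key.write(getStatement());      // bind this record's fields
                getStatement().executeUpdate(); // insert it on its own
                getConnection().commit();       // commit just this row
            } catch (SQLException e) {
                // One bad record only costs itself, not the whole batch.
                LOGGER.error("Skipping bad record", e);
                try {
                    getConnection().rollback(); // discard the failed row's work
                } catch (SQLException ex) {
                    LOGGER.warn("Rollback of skipped record failed", ex);
                }
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException {
            try {
                getStatement().close();
                getConnection().close(); // nothing batched, nothing to roll back
            } catch (SQLException e) {
                throw new IOException(e.getMessage());
            }
        }
    }
}

The obvious trade-off is one round trip per insert, which will be much slower than batching. JDBC can also report per-command failures through BatchUpdateException, but how much of a batch still executes after an error is driver-dependent, so for correctness I'd trust the per-row version more.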