How to keep a Hadoop MapReduce job from failing because one database insert failed



I'm writing a MapReduce job to mine web server logs. The input comes from text files and the output goes into a MySQL database. The problem is that if a single record fails to insert for any reason (such as the data exceeding a column size), the whole job fails and nothing at all gets written to the database. Is there a way to keep the good records? I suppose one option is to validate the data first, but that couples the client too tightly to the database schema for my taste. I'm not posting code because this isn't really a problem with any particular piece of code.

Edit:

The reducer:

@Override
protected void reduce(SkippableLogRecord rec,
        Iterable<NullWritable> values, Context context) {
    // Shorten the path so the log lines stay readable.
    String path = rec.getPath().toString();
    path = path.substring(0, min(path.length(), 100));
    try {
        // Intended to catch and skip any record that fails to write.
        context.write(new DBRecord(rec), NullWritable.get());
        LOGGER.info("Wrote record {}.", path);
    } catch (IOException | InterruptedException e) {
        LOGGER.error("There was a problem when writing out {}.", path, e);
    }
}

The log:

15/03/01 14:35:06 WARN mapred.LocalJobRunner: job_local279539641_0001
java.lang.Exception: java.io.IOException: Data truncation: Data too long for column 'filename' at row 1
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.io.IOException: Data truncation: Data too long for column 'filename' at row 1
    at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.close(DBOutputFormat.java:103)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.close(ReduceTask.java:550)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:629)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/01 14:35:06 INFO mapred.LocalJobRunner: reduce > reduce
15/03/01 14:35:07 INFO mapreduce.Job: Job job_local279539641_0001 failed with state FAILED due to: NA

Answering my own question: after looking at this SO post, I see that the database writes are done in batches, and that on an SQLException the transaction is rolled back. That explains my problem, and also why the try/catch in my reducer never fires: DBRecordWriter.write() only adds each record to a PreparedStatement batch, and the actual insert happens when close() calls executeBatch(). I guess I just need to make the DB columns big enough, or do the validation first. I could also create a custom DBOutputFormat/DBRecordWriter (both ideas are sketched after the code below), but unless I insert one record at a time there will always be the risk of one bad record rolling back the entire batch. Here is the close() method of DBOutputFormat's DBRecordWriter:

public void close(TaskAttemptContext context) throws IOException {
  try {
    LOG.warn("Executing statement:" + statement);
    statement.executeBatch();
    connection.commit();
  } catch (SQLException e) {
    try {
      connection.rollback();
    } catch (SQLException ex) {
      LOG.warn(StringUtils.stringifyException(ex));
    }
    throw new IOException(e.getMessage());
  } finally {
    try {
      statement.close();
      connection.close();
    } catch (SQLException ex) {
      throw new IOException(ex.getMessage());
    }
  }
}
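
If per-record inserts are acceptable, the batch-rollback risk goes away entirely: a custom output format can execute each INSERT as it arrives and catch the SQLException for just that row. The sketch below is only an illustration of the idea, not anything Hadoop ships. SkippingDBOutputFormat is a hypothetical name, autocommit-per-insert trades throughput for fault isolation, and the connection/query setup simply mirrors what the stock DBOutputFormat.getRecordWriter() does with DBConfiguration and constructQuery().

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

/**
 * Sketch (hypothetical class): executes each INSERT as it arrives so a
 * bad record is logged and skipped instead of poisoning a batch that
 * close() would roll back. Trades batch throughput for fault isolation.
 */
public class SkippingDBOutputFormat<K extends DBWritable>
    extends DBOutputFormat<K, NullWritable> {

  private static final Log LOG = LogFactory.getLog(SkippingDBOutputFormat.class);

  @Override
  public RecordWriter<K, NullWritable> getRecordWriter(TaskAttemptContext context)
      throws IOException {
    DBConfiguration dbConf = new DBConfiguration(context.getConfiguration());
    String[] fieldNames = dbConf.getOutputFieldNames();
    if (fieldNames == null) {
      fieldNames = new String[dbConf.getOutputFieldCount()];
    }
    try {
      final Connection connection = dbConf.getConnection();
      connection.setAutoCommit(true); // each INSERT commits on its own
      final PreparedStatement statement = connection.prepareStatement(
          constructQuery(dbConf.getOutputTableName(), fieldNames));
      return new RecordWriter<K, NullWritable>() {
        @Override
        public void write(K key, NullWritable value) throws IOException {
          try {
            key.write(statement);      // bind this record's fields
            statement.executeUpdate(); // insert immediately, not in a batch
          } catch (SQLException e) {
            // One bad record: log it and keep going instead of failing the job.
            LOG.warn("Skipping record that failed to insert", e);
          }
        }

        @Override
        public void close(TaskAttemptContext ctx) throws IOException {
          try {
            statement.close();
            connection.close();
          } catch (SQLException e) {
            throw new IOException(e);
          }
        }
      };
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
}

Wiring it in would just be job.setOutputFormatClass(SkippingDBOutputFormat.class) in place of DBOutputFormat.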

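As for the validation option, the schema coupling I was worried about can be softened by reading the column widths from JDBC metadata instead of hard-coding them. Again just a sketch: ColumnSizeValidator is a hypothetical helper, built once per task, and it only covers the string-truncation case that bit me here.

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch (hypothetical helper): caches the declared column widths from
 * JDBC metadata so values can be truncated to fit without hard-coding
 * the schema into the job.
 */
public class ColumnSizeValidator {

  private final Map<String, Integer> columnSizes = new HashMap<String, Integer>();

  public ColumnSizeValidator(Connection connection, String table) throws SQLException {
    DatabaseMetaData meta = connection.getMetaData();
    ResultSet rs = meta.getColumns(null, null, table, null);
    try {
      while (rs.next()) {
        columnSizes.put(rs.getString("COLUMN_NAME"), rs.getInt("COLUMN_SIZE"));
      }
    } finally {
      rs.close();
    }
  }

  /** Returns the value truncated to the column's declared width, if known. */
  public String fit(String column, String value) {
    Integer size = columnSizes.get(column);
    if (value == null || size == null || value.length() <= size) {
      return value;
    }
    return value.substring(0, size);
  }
}

The reducer could then call something like validator.fit("filename", path) when building the DBRecord, instead of relying on a hard-coded limit.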