Changing the number of splits for a Hadoop job



I am currently writing code to process a single image with Hadoop, so my input is just one file (a .png). I have working code that will run the job, but instead of running several sequential mappers, it only ever runs one mapper and never spawns any others.

I created my own extensions of the FileInputFormat and RecordReader classes in order to create (what I thought would be) "n" custom splits -> "n" map tasks.

I have been searching the web like crazy for examples of this kind to learn from, but all I have been able to find are examples that either treat an entire file as one split (meaning exactly one mapper) or give each map task a fixed number of text files (e.g., 3).

What I am trying to do is send a pair of coordinates ((x1, y1), (x2, y2)) to each mapper, where the coordinates correspond to the top-left/bottom-right pixels of some rectangle in the image.

Any advice/guidance/examples/links to examples would be greatly appreciated.

Custom FileInputFormat

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class FileInputFormat1 extends FileInputFormat<KeyChunk1, NullWritable> {

    @Override
    public RecordReader<KeyChunk1, NullWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        return new RecordReader1();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // allow the single image file to be divided into multiple splits
        return true;
    }
}

Custom RecordReader

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class RecordReader1 extends RecordReader<KeyChunk1, NullWritable> {

    private KeyChunk1 key;
    private NullWritable value;
    private ImagePreprocessor IMAGE;

    public RecordReader1() {
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return IMAGE.getProgress();
    }

    @Override
    public KeyChunk1 getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public NullWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        boolean gotNextValue = IMAGE.hasAnotherChunk();
        if (gotNextValue) {
            if (key == null) {
                key = new KeyChunk1();
            }
            if (value == null) {
                value = NullWritable.get();
            }
            int[] data = IMAGE.getChunkIndicesAndIndex();
            key.setChunkIndex(data[2]);
            key.setStartRow(data[0]);
            key.setStartCol(data[1]);
            key.setChunkWidth(data[3]);
            key.setChunkHeight(data[4]);
        } else {
            key = null;
            value = null;
        }
        return gotNextValue;
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        Configuration config = taskAttemptContext.getConfiguration();
        IMAGE = new ImagePreprocessor(
                config.get("imageName"),
                config.getInt("v_slices", 1),
                config.getInt("h_slices", 1),
                config.getInt("kernel_rad", 2),
                config.getInt("grad_rad", 1),
                config.get("hdfs_address"),
                config.get("local_directory"));
    }
}
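
(The KeyChunk1 class is not included in the question. For context, the following is only a rough reconstruction based on the setters called in nextKeyValue() above; the Writable plumbing and compareTo are assumptions, not the original code.)

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Reconstructed key describing one image chunk; only the setters are known from the question.
public class KeyChunk1 implements WritableComparable<KeyChunk1> {
    private int chunkIndex;
    private int startRow, startCol;
    private int chunkWidth, chunkHeight;

    public void setChunkIndex(int chunkIndex)   { this.chunkIndex = chunkIndex; }
    public void setStartRow(int startRow)       { this.startRow = startRow; }
    public void setStartCol(int startCol)       { this.startCol = startCol; }
    public void setChunkWidth(int chunkWidth)   { this.chunkWidth = chunkWidth; }
    public void setChunkHeight(int chunkHeight) { this.chunkHeight = chunkHeight; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(chunkIndex);
        out.writeInt(startRow);
        out.writeInt(startCol);
        out.writeInt(chunkWidth);
        out.writeInt(chunkHeight);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        chunkIndex = in.readInt();
        startRow = in.readInt();
        startCol = in.readInt();
        chunkWidth = in.readInt();
        chunkHeight = in.readInt();
    }

    @Override
    public int compareTo(KeyChunk1 other) {
        // order chunks by their linear index
        return Integer.compare(chunkIndex, other.chunkIndex);
    }
}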

ImagePreprocessor class (used in the custom RecordReader; only the relevant parts are shown)

import java.awt.image.BufferedImage;
import java.io.IOException;
public class ImagePreprocessor {

    private String filename;
    private int num_v_slices;
    private int num_h_slices;
    private int minSize;
    private int width, height;
    private int chunkWidth, chunkHeight;
    private int indexI, indexJ;
    String hdfs_address, local_directory;

    public ImagePreprocessor(String filename, int num_v_slices, int num_h_slices, int kernel_radius, int gradient_radius,
                             String hdfs_address, String local_directory) throws IOException {
        this.hdfs_address = hdfs_address;
        this.local_directory = local_directory;
        // all "validate" methods throw errors if input data is invalid
        checkValidFilename(filename);
        checkValidNumber(num_v_slices, "vertical strips");
        this.num_v_slices = num_v_slices;
        checkValidNumber(num_h_slices, "horizontal strips");
        this.num_h_slices = num_h_slices;
        checkValidNumber(kernel_radius, "kernel radius");
        checkValidNumber(gradient_radius, "gradient radius");
        this.minSize = 1 + 2 * (kernel_radius + gradient_radius);
        getImageData(); // loads image and saves width/height to class variables
        validateImageSize();
        chunkWidth = validateWidth((int) Math.ceil(((double) width) / num_v_slices));
        chunkHeight = validateHeight((int) Math.ceil(((double) height) / num_h_slices));
        indexI = 0;
        indexJ = 0;
    }

    public boolean hasAnotherChunk() {
        return indexI < num_h_slices;
    }

    public int[] getChunkIndicesAndIndex() {
        int[] ret = new int[5];
        ret[0] = indexI;
        ret[1] = indexJ;
        ret[2] = indexI * num_v_slices + indexJ;
        ret[3] = chunkWidth;
        ret[4] = chunkHeight;
        indexJ += 1;
        if (indexJ >= num_v_slices) {
            indexJ = 0;
            indexI += 1;
        }
        return ret;
    }
}
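
For reference, a minimal driver sketch of how a job like this might be configured and submitted. ImageDriver and Mapper1 are placeholder names used only for this sketch; the configuration keys are the ones read in RecordReader1.initialize().

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ImageDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // configuration keys read by RecordReader1.initialize()
        conf.set("imageName", args[0]);                     // path of the single .png input
        conf.setInt("v_slices", 4);                         // number of vertical strips
        conf.setInt("h_slices", 4);                         // number of horizontal strips
        conf.setInt("kernel_rad", 2);
        conf.setInt("grad_rad", 1);
        conf.set("hdfs_address", "hdfs://localhost:9000");  // placeholder value
        conf.set("local_directory", "/tmp/image_chunks");   // placeholder value

        Job job = Job.getInstance(conf, "single image processing");
        job.setJarByClass(ImageDriver.class);
        job.setInputFormatClass(FileInputFormat1.class);    // custom input format from above
        job.setMapperClass(Mapper1.class);                  // placeholder; its input types would be KeyChunk1 / NullWritable
        job.setNumReduceTasks(0);                           // map-only in this sketch
        job.setOutputKeyClass(Text.class);                  // depends on the (not shown) mapper's output types
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}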

Thank you for your time!

You should override the getSplits method in your FileInputFormat1 class. Since your code uses the new org.apache.hadoop.mapreduce API, the signature to override is public List<InputSplit> getSplits(JobContext context) (in the old mapred API it was public InputSplit[] getSplits(JobConf job, int numSplits)). Create your own class based on InputSplit that carries the rectangle coordinates, so that within your input format you can hand the right key/value pairs to the mapper. Looking at how getSplits is implemented in FileInputFormat may also help you here.
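
Below is a minimal sketch of that suggestion against the new org.apache.hadoop.mapreduce API (the one the question's code uses). RectangleSplit and the image_width/image_height configuration keys are names invented for this sketch, not part of the original code.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

// A split describing one rectangle of the image; the framework serializes it,
// so it must implement Writable and have a no-arg constructor.
public class RectangleSplit extends InputSplit implements Writable {
    private int x1, y1, x2, y2;  // top-left and bottom-right pixel coordinates

    public RectangleSplit() { }

    public RectangleSplit(int x1, int y1, int x2, int y2) {
        this.x1 = x1; this.y1 = y1; this.x2 = x2; this.y2 = y2;
    }

    public int getX1() { return x1; }
    public int getY1() { return y1; }
    public int getX2() { return x2; }
    public int getY2() { return y2; }

    @Override
    public long getLength() {
        return (long) (x2 - x1) * (y2 - y1);  // "size" of the split in pixels
    }

    @Override
    public String[] getLocations() {
        return new String[0];  // no data-locality hint for an image chunk
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(x1); out.writeInt(y1); out.writeInt(x2); out.writeInt(y2);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        x1 = in.readInt(); y1 = in.readInt(); x2 = in.readInt(); y2 = in.readInt();
    }
}

// The override that would go into FileInputFormat1:
// one RectangleSplit per chunk -> one mapper per chunk.
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    Configuration conf = context.getConfiguration();
    int vSlices = conf.getInt("v_slices", 1);
    int hSlices = conf.getInt("h_slices", 1);
    int width  = conf.getInt("image_width", 0);   // hypothetical keys set by the driver
    int height = conf.getInt("image_height", 0);
    int chunkWidth  = (int) Math.ceil((double) width  / vSlices);
    int chunkHeight = (int) Math.ceil((double) height / hSlices);

    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (int i = 0; i < hSlices; i++) {
        for (int j = 0; j < vSlices; j++) {
            int x1 = j * chunkWidth;
            int y1 = i * chunkHeight;
            int x2 = Math.min(x1 + chunkWidth,  width);
            int y2 = Math.min(y1 + chunkHeight, height);
            splits.add(new RectangleSplit(x1, y1, x2, y2));
        }
    }
    return splits;
}

The RecordReader's initialize() would then cast its InputSplit argument to RectangleSplit and read the rectangle from it instead of recomputing the chunk layout itself, so each mapper processes exactly one rectangle.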
