Developing, testing and debugging Hadoop map/reduce jobs with Eclipse



What options do I have for developing Java map/reduce jobs in Eclipse? My ultimate goal is to run the map/reduce logic I develop on my Amazon Hadoop cluster, but I'd like to test the logic on my local machine first and set breakpoints in it before deploying it to the larger cluster.

I see that there is a Hadoop plugin for Eclipse, but it looks quite old (correct me if I'm wrong), and a company called Karmasphere used to have some Eclipse-and-Hadoop tooling, but I'm not sure it's still available.

How do you develop, test and debug your map/reduce jobs with Eclipse?

I develop my Cassandra/Hadoop applications in Eclipse by:

  1. Using maven (m2e) to gather and configure the dependencies (Hadoop, Cassandra, Pig, etc.) for my Eclipse projects

  2. Creating test cases (classes in src/test/java) that exercise my mappers and reducers. The trick is to build a context object on the fly using inner classes that extend RecordWriter and StatusReporter. If you do that, then after invoking setup/map/cleanup or setup/reduce/cleanup you can assert that the correct key/value pairs and context information were written by the mapper or reducer. The context constructors in both mapred and mapreduce look ugly, but you'll find the classes quite easy to instantiate (a sketch of such a context helper follows the sample test case below).

  3. Once these tests are written, maven invokes them automatically on every build.

  4. You can invoke the tests manually by selecting the project and doing Run --> Maven test. This turns out to be really handy, because the tests run in debug mode, so you can set breakpoints in your mappers and reducers and do all the cool things Eclipse lets you do while debugging.

  5. Once you're happy with the quality of your code, use Maven to build a jar-with-dependencies into the single jar that Hadoop likes so much.

As a side note, I've built a number of code generation tools based on the Eclipse M2T JET project. They generate the infrastructure for everything I mentioned above, and I just write the logic for my mappers, reducers and test cases. I think if you gave it some thought, you could come up with a set of reusable classes that you could extend to do pretty much the same thing.

Here's a sample test case class:

/*
 * 
 * This source code and information are provided "AS-IS" without 
 * warranty of any kind, either expressed or implied, including
 * but not limited to the implied warranties of merchantability
 * and/or fitness for a particular purpose.
 * 
 * This source code was generated using an evaluation copy 
 * of the Cassandra/Hadoop Accelerator and may not be used for
 * production purposes.
 *
 */
package com.creditco.countwords.ReadDocs;
// Begin imports 
import java.io.IOException;
import java.util.ArrayList;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.junit.Test;
// End imports 
public class ParseDocsMapperTest extends TestCase {
    @Test
    public void testCount() {
        TestRecordWriter    recordWriter    = new TestRecordWriter();
        TestRecordReader    recordReader    = new TestRecordReader();
        TestOutputCommitter outputCommitter = new TestOutputCommitter();
        TestStatusReporter  statusReporter  = new TestStatusReporter();
        TestInputSplit      inputSplit      = new TestInputSplit();
        try {
                // Begin test logic

                // Get an instance of the mapper to be tested and a context instance
            ParseDocsMapper mapper = new ParseDocsMapper();
            Mapper<LongWritable,Text,Text,IntWritable>.Context context = 
                mapper.testContext(new Configuration(), new TaskAttemptID(),recordReader,recordWriter,outputCommitter,statusReporter,inputSplit);
                // Invoke the setup, map and cleanup methods
            mapper.setup(context);
            LongWritable key = new LongWritable(30);
            Text value = new Text("abc def ghi");
            mapper.map(key, value, context);
            if (recordWriter.getKeys().length != 3) {
                fail("com.creditco.countwords:ParseDocsMapperTest.testCount() - Wrong number of records written ");
            }
            mapper.cleanup(context);
                // Validation:
                //
                // recordWriter.getKeys() returns the keys written to the context by the mapper
                // recordWriter.getValues() returns the values written to the context by the mapper
                // statusReporter returns the most recent status and any counters set by the mapper
                //
                // End test logic
        } catch (Exception e) {
            fail("com.creditco.countwords:ParseDocsMapperTest.testCount() - Exception thrown: "+e.getMessage());
        }
    }
    final class TestRecordWriter extends RecordWriter<Text, IntWritable> {
        ArrayList<Text> keys = new ArrayList<Text>();
        ArrayList<IntWritable> values = new ArrayList<IntWritable>();
        public void close(TaskAttemptContext arg0) throws IOException, InterruptedException { }
        public void write(Text key, IntWritable value) throws IOException, InterruptedException {
            keys.add(key);
            values.add(value);
        }
        public Text[] getKeys() {
            Text result[] = new Text[keys.size()];
            keys.toArray(result);
            return result;
        }
        public IntWritable[] getValues() {
            IntWritable[] result = new IntWritable[values.size()];
            values.toArray(result);
            return result;
        }
    };  
    final class TestRecordReader extends RecordReader<LongWritable, Text> {
        public void close() throws IOException { }
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            throw new RuntimeException("Tried to call RecordReader:getCurrentKey()");
        }
        public Text getCurrentValue() throws IOException, InterruptedException {
            throw new RuntimeException("Tried to call RecordReader:getCurrentValue()");
        }
        public float getProgress() throws IOException, InterruptedException {
            throw new RuntimeException("Tried to call RecordReader:getProgress()");
        }
        public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { }
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return false;
        }
    };
    final class TestStatusReporter extends StatusReporter {
        private Counters counters = new Counters();
        private String status = null;
        public void setStatus(String arg0) {
            status = arg0;
        }
        public String getStatus() {
            return status;
        }
        public void progress() { }
        public Counter getCounter(String arg0, String arg1) {
            return counters.getGroup(arg0).findCounter(arg1);
        }
        public Counter getCounter(Enum<?> arg0) {
            return null;
        }
    };
    final class TestInputSplit extends InputSplit {
        public String[] getLocations() throws IOException, InterruptedException {
            return null;
        }
        public long getLength() throws IOException, InterruptedException {
            return 0;
        }
    };
    final class TestOutputCommitter extends OutputCommitter {
        public void setupTask(TaskAttemptContext arg0) throws IOException { }
        public void setupJob(JobContext arg0) throws IOException { }
        public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
            return false;
        }
        public void commitTask(TaskAttemptContext arg0) throws IOException { }
        public void cleanupJob(JobContext arg0) throws IOException { }
        public void abortTask(TaskAttemptContext arg0) throws IOException { }
    };
}
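
The mapper under test (ParseDocsMapper) and the testContext(...) helper it relies on aren't shown above; in my setup they come out of the code generator. Purely as an illustration, here is a minimal sketch of what they might look like, assuming the Hadoop 0.20.x mapreduce API, in which Mapper.Context is an inner class with a public constructor taking a Configuration, TaskAttemptID, RecordReader, RecordWriter, OutputCommitter, StatusReporter and InputSplit (the tokenizing logic is just a guess at a word-count mapper):

package com.creditco.countwords.ReadDocs;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;

public class ParseDocsMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit (token, 1) for every whitespace-separated token in the input line
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            context.write(new Text(tokens.nextToken()), ONE);
        }
    }

    // Helper used by the test case to build a Context wired to the test doubles.
    // In the 0.20.x API, Mapper.Context has a public constructor, so this is just
    // a thin wrapper around it.
    public Context testContext(Configuration conf, TaskAttemptID taskId,
            RecordReader<LongWritable, Text> reader,
            RecordWriter<Text, IntWritable> writer,
            OutputCommitter committer,
            StatusReporter reporter,
            InputSplit split) throws IOException, InterruptedException {
        return new Context(conf, taskId, reader, writer, committer, reporter, split);
    }
}

With a helper like that, the test case above can drive setup/map/cleanup directly and then assert against the keys and values captured by TestRecordWriter.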

And here's a sample Maven POM. Note that the versions referenced are a bit out of date, but as long as those versions are still held in a maven repository somewhere, you'll be able to build this project.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.creditco</groupId>
  <artifactId>wordcount.example</artifactId>
  <version>0.0.1-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
  <dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>0.20.2</version>
        <type>jar</type>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.cassandra</groupId>
        <artifactId>cassandra-all</artifactId>
        <version>1.0.6</version>
        <type>jar</type>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.cassandraunit</groupId>
        <artifactId>cassandra-unit</artifactId>
        <version>1.0.1.1</version>
        <type>jar</type>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <artifactId>hamcrest-all</artifactId>
                <groupId>org.hamcrest</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.pig</groupId>
        <artifactId>pig</artifactId>
        <version>0.9.1</version>
        <type>jar</type>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20090211</version>
        <type>jar</type>
        <scope>compile</scope>
    </dependency>
  </dependencies>
</project>

I use MiniMRCluster, which ships with Apache Hadoop. You use it to start up a mini map/reduce cluster inside a unit test! HBase also has HBaseTestingUtility, which is great because it starts up HDFS and MapReduce in about two lines.
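
As a rough sketch of what that looks like, assuming MiniDFSCluster and MiniMRCluster from the Hadoop test artifact (hadoop-test in the 0.20.x/1.x line) are on the test classpath; the harness class name here is just illustrative and the exact constructor signatures vary between Hadoop versions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;

public class MiniClusterHarness {
    private MiniDFSCluster dfsCluster;
    private MiniMRCluster mrCluster;

    // Start an in-process HDFS (one datanode) and an in-process
    // map/reduce cluster (one tasktracker) pointed at it.
    public void start() throws Exception {
        Configuration conf = new Configuration();
        dfsCluster = new MiniDFSCluster(conf, 1, true, null);
        FileSystem fs = dfsCluster.getFileSystem();
        mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1);
    }

    // A JobConf pre-wired to submit jobs to the mini cluster.
    public JobConf createJobConf() {
        return mrCluster.createJobConf();
    }

    // Tear everything down so the build doesn't leak threads or temp directories.
    public void stop() {
        if (mrCluster != null) mrCluster.shutdown();
        if (dfsCluster != null) dfsCluster.shutdown();
    }
}

HBaseTestingUtility wraps the same idea: in the HBase versions of that era, startMiniCluster() brings up HDFS (plus ZooKeeper and HBase) and startMiniMapReduceCluster() adds MapReduce on top, which is where the "about two lines" comes from.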

@Chris Gerken - I'm trying to run the word count job in Eclipse by running the driver as a Java application, but I get a ClassNotFoundException on the mapper. It seems to me that, when run as a Java application, the Hadoop job doesn't get the jar it needs with the mapper and reducer classes.
