MiniMRYarnCluster,本地运行MR



我正在尝试使用MiniMRYarnCluster在本地运行MR作业。我使用的是旧的MapReduce(不是YARN)和MapReduce API v2。相关的类可以在下面这个Maven依赖中找到:

<!-- Test-scoped test-jar of hadoop-mapreduce-client-jobclient (version 2.0.0-cdh4.1.1).
     According to the surrounding text, this is the artifact the MiniMRYarnCluster
     test utilities are taken from. -->
<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.0.0-cdh4.1.1</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

这是log的一部分:

--127.0.1.1-58175-1358256748215, blockid: BP-1072059606-127.0.1.1-1358256746988:blk_6137856716359201843_1008, duration: 229871
13/01/15 17:32:34 INFO localizer.LocalizedResource: Resource hdfs://localhost:50123/apps_staging_dir/ssa/.staging/job_1358256748507_0001/job.xml transitioned from DOWNLOADING to LOCALIZED
13/01/15 17:32:34 INFO container.Container: Container container_1358256748507_0001_01_000001 transitioned from LOCALIZING to LOCALIZED
13/01/15 17:32:34 INFO container.Container: Container container_1358256748507_0001_01_000001 transitioned from LOCALIZED to RUNNING
13/01/15 17:32:34 INFO nodemanager.DefaultContainerExecutor: launchContainer: [bash, /home/ssa/devel/POIClusterMapreduceTest/ru.mrjob.poi.POIClusterMapreduceTest-localDir-nm-0_1/usercache/ssa/appcache/application_1358256748507_0001/container_1358256748507_0001_01_000001/default_container_executor.sh]
13/01/15 17:32:34 WARN nodemanager.DefaultContainerExecutor: Exit code from task is : 1
13/01/15 17:32:34 INFO nodemanager.ContainerExecutor: 
13/01/15 17:32:34 WARN launcher.ContainerLaunch: Container exited with a non-zero exit code 1

下面是抛出的异常:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/service/CompositeService
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.service.CompositeService
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    ... 12 more
Could not find the main class: org.apache.hadoop.mapreduce.v2.app.MRAppMaster.  Program will exit.

我使用了org.apache.hadoop.mapreduce.v2.TestMRJobs作为我自己测试的基础。有人遇到过这个问题吗?

下面是我的代码,它是一个抽象基类,用于在CI服务器或开发人员机器上本地测试MR作业:

/**
 * Abstract base class for running a MapReduce job against in-process mini clusters
 * (a {@link MiniDFSCluster} plus a {@link MiniMRYarnCluster}) on a CI server or a
 * developer machine.
 *
 * <p>Subclasses supply the job, mapper, reducer, format and key/value classes as well
 * as the name of the input dataset. They are expected to call {@link #setupEnv()}
 * before and {@link #tearDown()} after {@link #createAndSubmitJob()}.
 */
public abstract class AbstractClusterMapReduceTest {
    private static final Log LOG = LogFactory.getLog(AbstractClusterMapReduceTest.class);
    public static final String DEFAULT_LOG_CATALOG = "local-mr-logs";
    private static final int DEFAULT_NAMENODE_PORT = 50123;
    private static final int ONE_DATANODE = 1;
    private static final int DEFAULT_REDUCE_NUM_TASKS = 1;
    private static final String SLASH = "/";
    private static final String DEFAULT_MR_INPUT_DATA_FILE = "mr-input-data-file";
    private MiniMRYarnCluster mrCluster;
    private MiniDFSCluster dfsCluster;
    /** Local scratch directory, qualified against the local file system (layout borrowed from the Cloudera example). */
    private static final Path TEST_ROOT_DIR = new Path("target",
            AbstractClusterMapReduceTest.class.getName() + "-tmpDir").makeQualified(getLocalFileSystem());
    /** Location the MR application jar is copied to by {@link #setupEnv()}. */
    static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");

    /**
     * @return the local file system built from a default {@link Configuration}
     * @throws Error if the local file system can't be obtained
     */
    private static FileSystem getLocalFileSystem(){
        try {
            return FileSystem.getLocal(new Configuration());
        } catch (IOException e) {
            throw new Error("Can't access local file system. MR cluster can't be started", e);
        }
    }

    /**
     * Always provide path to log catalog.
     * Default is: ${project.build.directory}/{@link AbstractClusterMapReduceTest#DEFAULT_LOG_CATALOG}
     *
     * @return path used as the {@code hadoop.log.dir} system property
     */
    protected String getPathToLogCatalog(){
        return getPathToOutputDirectory()+ SLASH + DEFAULT_LOG_CATALOG;
    }

    /**
     * @return value of the {@code project.build.directory} system property;
     *         {@code null} when the property is not set
     */
    private String getPathToOutputDirectory(){
        return System.getProperty("project.build.directory");
    }

    /**
     * Verifies that the MR application jar referenced by {@link MiniMRYarnCluster#APPJAR} exists.
     *
     * @throws Error if the jar is missing
     */
    private void checkAppJar(){
        if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
            throw new Error("MRAppJar " + MiniMRYarnCluster.APPJAR+ " not found. Not running test.");
        }else{
            LOG.info(MiniMRYarnCluster.APPJAR + " is at the right place. Can continue to setup Env...");
        }
    }

    /**
     * Starts the mini DFS cluster and the mini MR YARN cluster and copies the
     * MR application jar to {@link #APP_JAR}.
     *
     * @throws IOException if a cluster can't be built or the jar can't be copied
     */
    public void setupEnv() throws IOException{
        checkAppJar();
        System.setProperty("hadoop.log.dir", getPathToLogCatalog());
        System.setProperty("javax.xml.parsers.SAXParserFactory",
                "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");
        dfsCluster = buildMiniDFSCluster();
        mrCluster = new MiniMRYarnCluster(this.getClass().getName(), 1);
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", getFileSystem().getUri().toString());   // use HDFS
        conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
        mrCluster.init(conf);
        mrCluster.start();
        // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
        // workaround the absent public discache (taken from the Cloudera example).
        getLocalFileSystem().copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
        getLocalFileSystem().setPermission(APP_JAR, new FsPermission("700"));
    }

    /** Stops the MR cluster first and the DFS cluster second; safe to call when either was never started. */
    public void tearDown() {
        if (mrCluster != null) {
            mrCluster.stop();
            mrCluster = null;
        }
        if (dfsCluster != null) {
            dfsCluster.shutdown();
            dfsCluster = null;
        }
    }

    /**
     * Builds a job from the subclass-provided classes, submits it to the mini
     * cluster and blocks until it finishes.
     *
     * @return {@code true} if the job completed successfully
     * @throws IOException if the input data can't be staged or the job can't be submitted
     * @throws ClassNotFoundException propagated from job submission
     * @throws InterruptedException if the wait for completion is interrupted
     */
    public boolean createAndSubmitJob() throws IOException, ClassNotFoundException, InterruptedException{
        LOG.info("createAndSubmitJob: enter");
        checkAppJar();
        LOG.info("MRAppJar has been found. Can start to create Job");
        Configuration configuration = mrCluster.getConfig();
        configuration.set(MRConfig.MASTER_ADDRESS, "local");
        Job job = Job.getInstance(configuration);
        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
        job.setJarByClass(getMRJobClass());
        // The job is named after the MR job class; an earlier, immediately
        // overwritten setJobName call was removed as dead code.
        job.setJobName(getMRJobClass().getSimpleName());
        job.setNumReduceTasks(getReduceNumTasks());
        job.setOutputKeyClass(getOutputKeyClass());
        job.setOutputValueClass(getOutputValueClass());
        job.setMapperClass(getMapperClass());
        job.setReducerClass(getReducerClass());
        job.setInputFormatClass(getInputFormat());
        job.setOutputFormatClass(getOutputFormat());
        FileInputFormat.setInputPaths(job, getHDFSPathToInputData());
        FileOutputFormat.setOutputPath(job, createPath(getOutputPath()));
        job.setSpeculativeExecution(false);
        job.setMaxMapAttempts(1); // speed up failures
        LOG.info("Submitting job...");
        job.submit();
        LOG.info("Job has been submitted.");
        String trackingUrl = job.getTrackingURL();
        String jobId = job.getJobID().toString();
        LOG.info("trackingUrl:" +trackingUrl);
        LOG.info("jobId:" +jobId);
        return job.waitForCompletion(true);
    }

    /**
     * @return file system of the mini DFS cluster
     * @throws IOException if the file system can't be obtained
     */
    protected FileSystem getFileSystem() throws IOException {
        return dfsCluster.getFileSystem();
    }

    /**
     * @return number of reduce tasks for the job; {@link #DEFAULT_REDUCE_NUM_TASKS} unless overridden
     */
    protected int getReduceNumTasks(){
        return DEFAULT_REDUCE_NUM_TASKS;
    }

    /**
     * Looks the dataset up on the classpath as
     * {@code <ConcreteTestSimpleName>/<getInputDatasetName()>}.
     *
     * @return InputStream instance to file you want to run with your MR job,
     *         or {@code null} if the resource is not on the classpath
     */
    protected InputStream getInputStreamForInputData() {
        return this.getClass().getClassLoader().getResourceAsStream(this.getClass().getSimpleName()+"/"+getInputDatasetName());
    }

    /**
     * Copies the input dataset into the mini DFS cluster under
     * {@link #DEFAULT_MR_INPUT_DATA_FILE}.
     *
     * @return the HDFS path (as a string) the data was written to
     * @throws IOException if the input stream is missing or the copy fails
     */
    protected String getHDFSPathToInputData() throws IOException{
        InputStream inputStream = getInputStreamForInputData();
        if (inputStream == null) {
            // Fail with a descriptive message instead of a bare NullPointerException.
            throw new IOException("Input data stream is null. By default it is looked up on the classpath as: "
                    + this.getClass().getSimpleName() + SLASH + getInputDatasetName());
        }
        Path hdfsInputPath = new Path(DEFAULT_MR_INPUT_DATA_FILE);
        // Nested try/finally so both streams are closed even when create() or the copy fails.
        try {
            FSDataOutputStream fsDataOutputStream = getFileSystem().create(hdfsInputPath);
            try {
                copyStream(inputStream, fsDataOutputStream);
            } finally {
                fsDataOutputStream.close();
            }
        } finally {
            inputStream.close();
        }
        return hdfsInputPath.toString();
    }

    /**
     * Copies everything readable from {@code input} to {@code output}; closes neither stream.
     */
    private void copyStream(InputStream input, OutputStream output) throws IOException {
        byte[] buffer = new byte[1024]; // Adjust if you want
        int bytesRead;
        while ((bytesRead = input.read(buffer)) != -1)
        {
            output.write(buffer, 0, bytesRead);
        }
    }

    /**
     * Dataset should be placed in resources/ConcreteClusterMapReduceTest
     * @return a name of a file from catalog.
     * */
    protected abstract String getInputDatasetName();

    /**
     * @return path for the reducer output; {@code "mr-data-output"} unless overridden
     * */
    protected String getOutputPath(){
        return "mr-data-output";
    }

    /**
     * Creates a {@link Path} from a path string pointing to some FS resource.
     * @param pathToFSResource path string to wrap
     * @return new Path instance.
     * */
    protected Path createPath(String pathToFSResource){
        return new Path(pathToFSResource);
    }

    /**
     * Builds new instance of MiniDFSCluster
     * Default: name node port {@link #DEFAULT_NAMENODE_PORT}, data nodes {@link #ONE_DATANODE}
     * @return MiniDFSCluster instance.
     * @throws IOException if the cluster can't be built
     * */
    protected MiniDFSCluster buildMiniDFSCluster() throws IOException {
        return new MiniDFSCluster.Builder(new Configuration())
                    .nameNodePort(DEFAULT_NAMENODE_PORT)
                    .numDataNodes(ONE_DATANODE)
                    .build();
    }

    /** @return class used to locate the job jar and to name the job */
    protected abstract Class<? extends Configured> getMRJobClass();
    /** @return mapper class of the job */
    protected abstract Class<? extends Mapper> getMapperClass();
    /** @return reducer class of the job */
    protected abstract Class<? extends Reducer> getReducerClass();
    /** @return input format class of the job */
    protected abstract Class<? extends InputFormat> getInputFormat();
    /** @return output format class of the job */
    protected abstract Class<? extends OutputFormat> getOutputFormat();
    /** @return output key class of the job */
    protected abstract Class<?> getOutputKeyClass();
    /** @return output value class of the job */
    protected abstract Class<?> getOutputValueClass();
}

和具体的测试子类:

/**
 * Concrete cluster test that runs the POI job ({@code Main} / {@code POIMapper} with the
 * identity {@code Reducer}) on the mini clusters provided by
 * {@link AbstractClusterMapReduceTest}.
 */
public class POIClusterMapreduceTest extends AbstractClusterMapReduceTest{
    private static final String INTEGRATION = "integration";
    /** Name of the output file written by the first reduce task inside the job output directory. */
    private static final String FIRST_REDUCER_OUTPUT_FILE = "part-r-00000";

    /** Starts the mini clusters once before the tests of this class. */
    @BeforeClass(groups = INTEGRATION)
    public void setup() throws IOException {
        super.setupEnv();
    }

    /**
     * Runs the job and checks that it succeeded and produced a non-empty first output line.
     * The {@code @Test} annotation below is commented out, as in the original.
     */
    //@Test(groups = INTEGRATION)
    public void runJob() throws InterruptedException, IOException, ClassNotFoundException {
        boolean result = createAndSubmitJob();
        MatcherAssert.assertThat(result, Matchers.is(true));
        // The job output path is a directory, so read the reducer's part file inside it.
        // TextOutputFormat writes plain text lines, hence a line reader rather than
        // DataInput#readUTF (which expects a 2-byte length prefix).
        String firstLine;
        java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.InputStreamReader(
                getFileSystem().open(createPath(getOutputPath() + "/" + FIRST_REDUCER_OUTPUT_FILE)), "UTF-8"));
        try {
            firstLine = reader.readLine();
        } finally {
            reader.close();
        }
        MatcherAssert.assertThat(firstLine, Matchers.notNullValue());
        MatcherAssert.assertThat(firstLine.length(), Matchers.greaterThan(0));

    }

    /** Stops the mini clusters once after the tests of this class. */
    @AfterClass(groups = INTEGRATION)
    public void tearDown(){
        super.tearDown();
    }
    @Override
    protected Class<Main> getMRJobClass() {
        return Main.class;
    }
    @Override
    protected Class<POIMapper> getMapperClass() {
        return POIMapper.class;
    }
    @Override
    protected  Class<Reducer> getReducerClass() {
        return Reducer.class;
    }
    @Override
    protected Class<TextInputFormat> getInputFormat() {
        return TextInputFormat.class;
    }
    @Override
    protected Class<TextOutputFormat> getOutputFormat() {
        return TextOutputFormat.class;
    }
    @Override
    protected Class<LongWritable> getOutputKeyClass() {
        return LongWritable.class;
    }
    @Override
    protected Class<XVLRDataWritable> getOutputValueClass() {
        return XVLRDataWritable.class;
    }
    @Override
    protected String getInputDatasetName() {
        return "mr-input-data";
    }
}

问题出在令人头疼的Maven依赖冲突上。以下是正确的Maven依赖项:

<!-- Dependency set reported to resolve the conflict. Two version properties are used:
     ${hadoop.version} for hadoop-core and hadoop-test, and
     ${hadoop.common.version} for hadoop-common and both hadoop-hdfs artifacts. -->
<dependencies>
        <!-- Provided by the runtime environment (scope: provided). -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Provided by the runtime environment (scope: provided). -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.common.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.common.version}</version>
        </dependency>
        <!-- "tests" classifier of hadoop-hdfs; no scope is declared, so the default scope applies. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <classifier>tests</classifier>
            <version>${hadoop.common.version}</version>
        </dependency>
        <!-- presumably the source of MiniMRCluster used by the test base class; verify against the artifact. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-test</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

上面用到的属性定义如下:

<!-- Version properties referenced by the dependency declarations. -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Version carrying the "-mr1-" marker. -->
    <hadoop.version>2.0.0-mr1-cdh4.1.1</hadoop.version>
    <!-- Version without the "-mr1-" marker. -->
    <hadoop.common.version>2.0.0-cdh4.1.1</hadoop.common.version>
</properties>
下面是测试的基类:
/**
 * Abstract base class for running a MapReduce job on an in-process {@link MiniMRCluster}.
 *
 * <p>Input data and the expected ("etalon") output are read from classpath resources
 * located under a folder named after the concrete test class; job output is written
 * below the directory given by the {@code project.build.directory} system property.
 */
public abstract class AbstractClusterMRTest {
    private static final Log LOG = LogFactory.getLog(AbstractClusterMRTest.class);
    public static final String DEFAULT_LOG_CATALOG = "local-mr-logs";
    public static final String SLASH = "/";
    public static final String MR_DATA_OUTPUT = "mr-data-output";
    public static final String DEFAULT_OUTPUT_FILE_NAME =  "part-r-00000";
    private static final int DEFAULT_REDUCE_NUM_TASKS = 1;

    private Configuration configuration;
    // Obtained via FileSystem.get(configuration), i.e. whatever file system that configuration resolves to.
    private FileSystem localFileSystem;
    private MiniMRCluster mrCluster;
    private JobConf mrClusterConf;

    /**
     * Always provide path to log catalog. The directory (including missing parent
     * directories) is created if it does not exist yet.
     *
     * @return absolute path of the log catalog
     * */
    protected String getPathToLogCatalog(){
        File logCatalog = new File(getPathToOutputDirectory()+ SLASH + DEFAULT_LOG_CATALOG);
        // mkdirs() instead of mkdir(): the build directory itself may not exist yet,
        // and a failed creation should not go unnoticed.
        if(!logCatalog.exists() && !logCatalog.mkdirs()){
            LOG.warn("Can't create log catalog: "+logCatalog.getAbsolutePath());
        }
        LOG.info("Path to log catalog is: "+logCatalog.getAbsolutePath());
        return logCatalog.getAbsolutePath();
    }

    /**
     * @return value of the {@code project.build.directory} system property;
     *         {@code null} when the property is not set
     */
    private String getPathToOutputDirectory(){
        return System.getProperty("project.build.directory");
    }

    /**
     * Starts the mini MR cluster and prepares the job configuration derived from it.
     *
     * @throws IOException if the file system or the cluster can't be created
     */
    public void setup() throws IOException{
        System.setProperty("hadoop.log.dir", getPathToLogCatalog());
        configuration = new Configuration(true);
        localFileSystem = FileSystem.get(configuration);
        mrCluster = new MiniMRCluster(1, localFileSystem.getUri().toString(), 1, null, null, new JobConf(configuration));
        mrClusterConf = mrCluster.createJobConf();
    }

    /** Shuts the mini MR cluster down; safe to call when it was never started. */
    public void tearDown() {
        if (mrCluster != null) {
            mrCluster.shutdown();
            mrCluster = null;
        }
    }

    /**
     * Use this method to get JobBuilder configured for testing purposes.
     * @return JobBuilder instance ready for further configuration.
     * @throws IOException propagated from the JobBuilder construction
     * */
    public JobBuilder createTestableJobInstance() throws IOException{
        return new JobBuilder(mrClusterConf, this.getClass().getSimpleName()+"-mrjob")
                .withNumReduceTasks(DEFAULT_REDUCE_NUM_TASKS);
    }

    /**
     * Pass configured JobBuilder and wait for completion
     * @param jobBuilder is a JobBuilder ready to submit
     * @return job completion result.
     * @throws IllegalStateException if the input dataset is not found on the classpath
     * */
    public boolean buildSubmitAndWaitForCompletion(JobBuilder jobBuilder)
                        throws InterruptedException, IOException, ClassNotFoundException {
        String pathToInputFile = getPathToInputData();
        checkThatFileExists(pathToInputFile);
        Job job = jobBuilder.build();
        FileInputFormat.setInputPaths(job, pathToInputFile);
        FileOutputFormat.setOutputPath(job, createPath(getOutputPath()));
        LOG.info("Submitting job...");
        job.submit();
        LOG.info("Job has been submitted.");
        String trackingUrl = job.getTrackingURL();
        String jobId = job.getJobID().toString();
        LOG.info("trackingUrl:" +trackingUrl);
        LOG.info("jobId:" +jobId);
        return job.waitForCompletion(true);
    }

    /**
     * By declaration input data should be stored in test/resources folder:
     * ConcreteTestClassName/in/getInputDatasetName()
     * Don't forget to override {@link AbstractClusterMRTest#getInputDatasetName()}
     * @return absolute path to input data
     * @throws IllegalStateException if the resource is missing or is not a plain file
     * */
    protected String getPathToInputData(){
        String pathFile = this.getClass().getSimpleName() + SLASH + "in" + SLASH+ getInputDatasetName();
        LOG.info("Path for getting URL to file:" + pathFile);
        URL urlToFile = this.getClass().getClassLoader().getResource(pathFile);
        File file = FileUtils.toFile(urlToFile);
        if (file == null) {
            // toFile yields null for a missing resource or a non-file URL;
            // report that explicitly instead of failing with a NullPointerException.
            throw new IllegalStateException("Input data resource is not available as a file on the classpath: " + pathFile);
        }
        return file.getAbsolutePath();
    }

    /**
     * Dataset should be placed in resources/ConcreteClusterMapReduceTest
     * @return a name of a file from catalog.
     * */
    protected abstract String getInputDatasetName();

    /**
     * @return path for the reducer output:
     *         ${project.build.directory}/{@link #MR_DATA_OUTPUT} unless overridden
     * */
    protected String getOutputPath(){
        return getPathToOutputDirectory()+ SLASH + MR_DATA_OUTPUT;
    }

    /**
     * @return text lines from reducer output file ({@link #DEFAULT_OUTPUT_FILE_NAME}
     *         inside {@link #getOutputPath()}).
     * */
    protected List<String> getLinesFromOutputFile() throws IOException{
        String pathToResult = getOutputPath()+SLASH+DEFAULT_OUTPUT_FILE_NAME;
        File resultFile = new File(pathToResult);
        return FileUtils.readLines(resultFile);
    }

    /**
     * @return name of the expected ("etalon") output file, looked up under
     *         ConcreteTestClassName/out/ on the classpath
     */
    public abstract String getEtalonOutputFileName();

    /**
     * @return text lines of the expected ("etalon") output file
     * @throws IOException if the resource is missing, is not a plain file, or can't be read
     */
    protected List<String> getLinesFromEtalonOutputFile() throws IOException{
        String pathFile = this.getClass().getSimpleName() + SLASH + "out" + SLASH+ getEtalonOutputFileName();
        LOG.debug("path to etalon file: "+ pathFile);
        URL urlToFile = this.getClass().getClassLoader().getResource(pathFile);
        File file = FileUtils.toFile(urlToFile);
        if (file == null) {
            // Same null guard as for the input data: fail with a descriptive message.
            throw new IOException("Etalon output resource is not available as a file on the classpath: " + pathFile);
        }
        return FileUtils.readLines(file);
    }

    /**
     * Creates a {@link Path} from a path string pointing to some FS resource.
     * @param pathToFSResource path string to wrap
     * @return new Path instance.
     * */
    protected Path createPath(String pathToFSResource){
        return new Path(pathToFSResource);
    }

    /**
     * @param absolutePathToFile path checked on the local disk
     * @throws Error if nothing exists at the given path
     */
    public void checkThatFileExists(String absolutePathToFile){
        if(! new File(absolutePathToFile).exists()){
            throw new Error("Path to input file is incorrect. Can't run MR job. Incorrect path is:"+absolutePathToFile);
        }
    }
}

只需继承这个基类,创建您自己的具体测试类即可。通过createTestableJobInstance获取JobBuilder(它是对Job的一个简单构建器封装),完成配置后调用buildSubmitAndWaitForCompletion。

相关内容

  • 没有找到相关文章

最新更新