I'm trying to run an MR job locally using MiniMRYarnCluster. I'm using the old mapreduce (not YARN) together with the mapreduce API v2; these classes come from this dependency:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>2.0.0-cdh4.1.1</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>
Here is part of the log:
--127.0.1.1-58175-1358256748215, blockid: BP-1072059606-127.0.1.1-1358256746988:blk_6137856716359201843_1008, duration: 229871
13/01/15 17:32:34 INFO localizer.LocalizedResource: Resource hdfs://localhost:50123/apps_staging_dir/ssa/.staging/job_1358256748507_0001/job.xml transitioned from DOWNLOADING to LOCALIZED
13/01/15 17:32:34 INFO container.Container: Container container_1358256748507_0001_01_000001 transitioned from LOCALIZING to LOCALIZED
13/01/15 17:32:34 INFO container.Container: Container container_1358256748507_0001_01_000001 transitioned from LOCALIZED to RUNNING
13/01/15 17:32:34 INFO nodemanager.DefaultContainerExecutor: launchContainer: [bash, /home/ssa/devel/POIClusterMapreduceTest/ru.mrjob.poi.POIClusterMapreduceTest-localDir-nm-0_1/usercache/ssa/appcache/application_1358256748507_0001/container_1358256748507_0001_01_000001/default_container_executor.sh]
13/01/15 17:32:34 WARN nodemanager.DefaultContainerExecutor: Exit code from task is : 1
13/01/15 17:32:34 INFO nodemanager.ContainerExecutor:
13/01/15 17:32:34 WARN launcher.ContainerLaunch: Container exited with a non-zero exit code 1
And here is the exception:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/service/CompositeService
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.service.CompositeService
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
... 12 more
Could not find the main class: org.apache.hadoop.mapreduce.v2.app.MRAppMaster. Program will exit.
I used org.apache.hadoop.mapreduce.v2.TestMRJobs as the basis for my own test. Has anyone run into this problem?
Below is my code: an abstract base class for testing MR jobs locally, either on a CI server or on a developer machine:
public abstract class AbstractClusterMapReduceTest {

    private static final Log LOG = LogFactory.getLog(AbstractClusterMapReduceTest.class);

    public static final String DEFAULT_LOG_CATALOG = "local-mr-logs";

    private static final int DEFAULT_NAMENODE_PORT = 50123;
    private static final int ONE_DATANODE = 1;
    private static final int DEFAULT_REDUCE_NUM_TASKS = 1;
    private static final String SLASH = "/";
    private static final String DEFAULT_MR_INPUT_DATA_FILE = "mr-input-data-file";

    private MiniMRYarnCluster mrCluster;
    private MiniDFSCluster dfsCluster;

    /** Shitty code from base Cloudera example */
    private static Path TEST_ROOT_DIR = new Path("target",
            AbstractClusterMapReduceTest.class.getName() + "-tmpDir").makeQualified(getLocalFileSystem());
    static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");

    private static FileSystem getLocalFileSystem() {
        try {
            return FileSystem.getLocal(new Configuration());
        } catch (IOException e) {
            throw new Error("Can't access local file system. MR cluster can't be started", e);
        }
    }
    /**
     * Always provide path to log catalog.
     * Default is: ${project.build.directory}/{@link AbstractClusterMapReduceTest#DEFAULT_LOG_CATALOG}
     */
    protected String getPathToLogCatalog() {
        return getPathToOutputDirectory() + SLASH + DEFAULT_LOG_CATALOG;
    }

    private String getPathToOutputDirectory() {
        return System.getProperty("project.build.directory");
    }

    private void checkAppJar() {
        if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
            throw new Error("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
        } else {
            LOG.info(MiniMRYarnCluster.APPJAR + " is at the right place. Can continue to setup Env...");
        }
    }
    public void setupEnv() throws IOException {
        checkAppJar();
        System.setProperty("hadoop.log.dir", getPathToLogCatalog());
        System.setProperty("javax.xml.parsers.SAXParserFactory",
                "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");
        dfsCluster = buildMiniDFSCluster();
        //dfsCluster.getFileSystem().makeQualified(createPath(getHDFSPathToInputData()));
        //dfsCluster.getFileSystem().makeQualified(createPath(getOutputPath()));
        mrCluster = new MiniMRYarnCluster(this.getClass().getName(), 1);
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", getFileSystem().getUri().toString()); // use HDFS
        //conf.set(MRJobConfig.MR_AM_STAGING_DIR, getPathToOutputDirectory()+"/tmp-mapreduce");
        conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
        mrCluster.init(conf);
        mrCluster.start();
        // Cloudera tricks :)
        // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
        // workaround the absent public discache.
        getLocalFileSystem().copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
        getLocalFileSystem().setPermission(APP_JAR, new FsPermission("700"));
    }
    public void tearDown() {
        if (mrCluster != null) {
            mrCluster.stop();
            mrCluster = null;
        }
        if (dfsCluster != null) {
            dfsCluster.shutdown();
            dfsCluster = null;
        }
    }
    public boolean createAndSubmitJob() throws IOException, ClassNotFoundException, InterruptedException {
        LOG.info("createAndSubmitJob: enter");
        checkAppJar();
        LOG.info("MRAppJar has been found. Can start to create Job");
        Configuration configuration = mrCluster.getConfig();
        configuration.set(MRConfig.MASTER_ADDRESS, "local");
        Job job = Job.getInstance(configuration);
        job.setJobName(this.getClass().getSimpleName() + "-job");
        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
        job.setJarByClass(getMRJobClass());
        job.setJobName(getMRJobClass().getSimpleName());
        job.setNumReduceTasks(getReduceNumTasks());
        job.setOutputKeyClass(getOutputKeyClass());
        job.setOutputValueClass(getOutputValueClass());
        job.setMapperClass(getMapperClass());
        job.setReducerClass(getReducerClass());
        job.setInputFormatClass(getInputFormat());
        job.setOutputFormatClass(getOutputFormat());
        FileInputFormat.setInputPaths(job, getHDFSPathToInputData());
        FileOutputFormat.setOutputPath(job, createPath(getOutputPath()));
        job.setSpeculativeExecution(false);
        job.setMaxMapAttempts(1); // speed up failures
        LOG.info("Submitting job...");
        job.submit();
        LOG.info("Job has been submitted.");
        String trackingUrl = job.getTrackingURL();
        String jobId = job.getJobID().toString();
        LOG.info("trackingUrl:" + trackingUrl);
        LOG.info("jobId:" + jobId);
        return job.waitForCompletion(true);
    }
    protected FileSystem getFileSystem() throws IOException {
        return dfsCluster.getFileSystem();
    }

    protected int getReduceNumTasks() {
        return DEFAULT_REDUCE_NUM_TASKS;
    }

    /**
     * @return InputStream instance to file you want to run with your MR job
     */
    protected InputStream getInputStreamForInputData() {
        return this.getClass().getClassLoader()
                .getResourceAsStream(this.getClass().getSimpleName() + "/" + getInputDatasetName());
        //return getPathToOutputDirectory()+ SLASH + DEFAULT_INPUT_CATALOG+"/mr-input-data";
    }

    protected String getHDFSPathToInputData() throws IOException {
        InputStream inputStream = getInputStreamForInputData();
        Path hdfsInputPath = new Path(DEFAULT_MR_INPUT_DATA_FILE);
        FSDataOutputStream fsDataOutputStream = getFileSystem().create(hdfsInputPath);
        copyStream(inputStream, fsDataOutputStream);
        fsDataOutputStream.close();
        inputStream.close();
        return hdfsInputPath.toString();
    }

    private void copyStream(InputStream input, OutputStream output) throws IOException {
        byte[] buffer = new byte[1024]; // Adjust if you want
        int bytesRead;
        while ((bytesRead = input.read(buffer)) != -1) {
            output.write(buffer, 0, bytesRead);
        }
    }
    /**
     * Dataset should be placed in resources/ConcreteClusterMapReduceTest
     * @return a name of a file from catalog.
     */
    protected abstract String getInputDatasetName();

    /**
     * @return path to reducer output
     * default is: {@link AbstractClusterMapReduceTest#DEFAULT_OUTPUT_CATALOG}
     */
    protected String getOutputPath() {
        return "mr-data-output";
    }

    /**
     * Creates a {@link Path} using an absolute path to some FS resource
     * @return new Path instance.
     */
    protected Path createPath(String pathToFSResource) {
        return new Path(pathToFSResource);
    }

    /**
     * Builds a new instance of MiniDFSCluster.
     * Defaults: {@link #DEFAULT_NAMENODE_PORT}, {@link #ONE_DATANODE}
     * @return MiniDFSCluster instance.
     */
    protected MiniDFSCluster buildMiniDFSCluster() throws IOException {
        return new MiniDFSCluster.Builder(new Configuration())
                .nameNodePort(DEFAULT_NAMENODE_PORT)
                .numDataNodes(ONE_DATANODE)
                .build();
    }

    protected abstract Class<? extends Configured> getMRJobClass();

    protected abstract Class<? extends Mapper> getMapperClass();

    protected abstract Class<? extends Reducer> getReducerClass();

    protected abstract Class<? extends InputFormat> getInputFormat();

    protected abstract Class<? extends OutputFormat> getOutputFormat();

    protected abstract Class<?> getOutputKeyClass();

    protected abstract Class<?> getOutputValueClass();
}
And the concrete test subclass:
public class POIClusterMapreduceTest extends AbstractClusterMapReduceTest {

    private static final String INTEGRATION = "integration";

    @BeforeClass(groups = INTEGRATION)
    public void setup() throws IOException {
        super.setupEnv();
    }

    //@Test(groups = INTEGRATION)
    public void runJob() throws InterruptedException, IOException, ClassNotFoundException {
        boolean result = createAndSubmitJob();
        MatcherAssert.assertThat(result, Matchers.is(true));
        String outputResultAsString = getFileSystem().open(createPath(getOutputPath())).readUTF();
        MatcherAssert.assertThat(outputResultAsString.length(), Matchers.greaterThan(0));
    }

    @AfterClass(groups = INTEGRATION)
    public void tearDown() {
        super.tearDown();
    }

    @Override
    protected Class<Main> getMRJobClass() {
        return Main.class;
    }

    @Override
    protected Class<POIMapper> getMapperClass() {
        return POIMapper.class;
    }

    @Override
    protected Class<Reducer> getReducerClass() {
        return Reducer.class;
    }

    @Override
    protected Class<TextInputFormat> getInputFormat() {
        return TextInputFormat.class;
    }

    @Override
    protected Class<TextOutputFormat> getOutputFormat() {
        return TextOutputFormat.class;
    }

    @Override
    protected Class<LongWritable> getOutputKeyClass() {
        return LongWritable.class;
    }

    @Override
    protected Class<XVLRDataWritable> getOutputValueClass() {
        return XVLRDataWritable.class;
    }

    @Override
    protected String getInputDatasetName() {
        return "mr-input-data";
    }
}
The problem turned out to be a nasty Maven dependency conflict. Here are the correct Maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>${hadoop.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.common.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.common.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <classifier>tests</classifier>
        <version>${hadoop.common.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-test</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
And the properties:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.0.0-mr1-cdh4.1.1</hadoop.version>
    <hadoop.common.version>2.0.0-cdh4.1.1</hadoop.common.version>
</properties>
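Since the original failure was a classpath conflict (NoClassDefFoundError for org.apache.hadoop.yarn.service.CompositeService), a quick way to check which jar actually provides a given Hadoop class on the test classpath is a small probe like the sketch below. This is only an illustration (the class name ClasspathProbe is mine, not part of any Hadoop API); it only uses standard ClassLoader calls:

// Hypothetical diagnostic: print the jar each Hadoop class is loaded from,
// so conflicting hadoop-core / hadoop-common artifacts are easy to spot.
public final class ClasspathProbe {
    public static void main(String[] args) {
        String[] classNames = {
                "org.apache.hadoop.yarn.service.CompositeService",
                "org.apache.hadoop.mapreduce.v2.app.MRAppMaster"
        };
        for (String name : classNames) {
            // Look the class up as a resource to find its physical location.
            String resource = name.replace('.', '/') + ".class";
            java.net.URL location = ClasspathProbe.class.getClassLoader().getResource(resource);
            System.out.println(name + " -> " + (location == null ? "NOT FOUND on classpath" : location));
        }
    }
}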
And here is the base class for the tests:
public abstract class AbstractClusterMRTest {

    private static final Log LOG = LogFactory.getLog(AbstractClusterMRTest.class);

    public static final String DEFAULT_LOG_CATALOG = "local-mr-logs";
    public static final String SLASH = "/";
    public static final String MR_DATA_OUTPUT = "mr-data-output";
    public static final String DEFAULT_OUTPUT_FILE_NAME = "part-r-00000";
    private static final int DEFAULT_REDUCE_NUM_TASKS = 1;

    private Configuration configuration;
    private FileSystem localFileSystem;
    private MiniMRCluster mrCluster;
    private JobConf mrClusterConf;

    /**
     * Always provide path to log catalog.
     */
    protected String getPathToLogCatalog() {
        File logCatalog = new File(getPathToOutputDirectory() + SLASH + DEFAULT_LOG_CATALOG);
        if (!logCatalog.exists()) {
            logCatalog.mkdir();
        }
        LOG.info("Path to log catalog is: " + logCatalog.getAbsolutePath());
        return logCatalog.getAbsolutePath();
    }

    private String getPathToOutputDirectory() {
        return System.getProperty("project.build.directory");
    }
    public void setup() throws IOException {
        System.setProperty("hadoop.log.dir", getPathToLogCatalog());
        configuration = new Configuration(true);
        localFileSystem = FileSystem.get(configuration);
        mrCluster = new MiniMRCluster(1, localFileSystem.getUri().toString(), 1, null, null, new JobConf(configuration));
        mrClusterConf = mrCluster.createJobConf();
    }

    public void tearDown() {
        if (mrCluster != null) {
            mrCluster.shutdown();
            mrCluster = null;
        }
    }

    /**
     * Use this method to get a JobBuilder configured for testing purposes.
     * @return JobBuilder instance ready for further configuration.
     */
    public JobBuilder createTestableJobInstance() throws IOException {
        return new JobBuilder(mrClusterConf, this.getClass().getSimpleName() + "-mrjob")
                .withNumReduceTasks(DEFAULT_REDUCE_NUM_TASKS);
    }
    /**
     * Pass a configured JobBuilder and wait for completion.
     * @param jobBuilder is a JobBuilder ready to submit
     * @return job completion result.
     */
    public boolean buildSubmitAndWaitForCompletion(JobBuilder jobBuilder)
            throws InterruptedException, IOException, ClassNotFoundException {
        String pathToInputFile = getPathToInputData();
        checkThatFileExists(pathToInputFile);
        Job job = jobBuilder.build();
        FileInputFormat.setInputPaths(job, pathToInputFile);
        FileOutputFormat.setOutputPath(job, createPath(getOutputPath()));
        LOG.info("Submitting job...");
        job.submit();
        LOG.info("Job has been submitted.");
        String trackingUrl = job.getTrackingURL();
        String jobId = job.getJobID().toString();
        LOG.info("trackingUrl:" + trackingUrl);
        LOG.info("jobId:" + jobId);
        return job.waitForCompletion(true);
    }

    /**
     * By declaration, input data should be stored in the test/resources folder:
     * ConcreteTestClassName/in/getInputDatasetName()
     * Don't forget to override {@link AbstractClusterMRTest#getInputDatasetName()}
     * @return path to input data
     */
    protected String getPathToInputData() {
        String pathFile = this.getClass().getSimpleName() + SLASH + "in" + SLASH + getInputDatasetName();
        LOG.info("Path for getting URL to file:" + pathFile);
        URL urlToFile = this.getClass().getClassLoader().getResource(pathFile);
        File file = FileUtils.toFile(urlToFile);
        return file.getAbsolutePath();
    }
    /**
     * Dataset should be placed in resources/ConcreteClusterMapReduceTest
     * @return a name of a file from catalog.
     */
    protected abstract String getInputDatasetName();

    /**
     * @return path to reducer output
     * default is: {@link AbstractClusterMapReduceTestOld#DEFAULT_OUTPUT_CATALOG}
     */
    protected String getOutputPath() {
        return getPathToOutputDirectory() + SLASH + MR_DATA_OUTPUT;
    }

    /**
     * @return text lines from reducer output file.
     */
    protected List<String> getLinesFromOutputFile() throws IOException {
        String pathToResult = getOutputPath() + SLASH + DEFAULT_OUTPUT_FILE_NAME;
        File resultFile = new File(pathToResult);
        return FileUtils.readLines(resultFile);
    }

    public abstract String getEtalonOutputFileName();

    protected List<String> getLinesFromEtalonOutputFile() throws IOException {
        String pathFile = this.getClass().getSimpleName() + SLASH + "out" + SLASH + getEtalonOutputFileName();
        LOG.debug("path to etalon file: " + pathFile);
        URL urlToFile = this.getClass().getClassLoader().getResource(pathFile);
        File file = FileUtils.toFile(urlToFile);
        return FileUtils.readLines(file);
    }

    /**
     * Creates a {@link Path} using an absolute path to some FS resource
     * @return new Path instance.
     */
    protected Path createPath(String pathToFSResource) {
        return new Path(pathToFSResource);
    }

    public void checkThatFileExists(String absolutePathToFile) {
        if (!new File(absolutePathToFile).exists()) {
            throw new Error("Path to input file is incorrect. Can't run MR job. Incorrect path is:" + absolutePathToFile);
        }
    }
}
Just extend this class and create your own concrete test class. Get a JobBuilder (a simple wrapper around Job construction) from createTestableJobInstance(), configure it, and call buildSubmitAndWaitForCompletion().
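For illustration, a minimal concrete test class might look like the sketch below. It assumes TestNG and Hamcrest as in the question, and that your own JobBuilder wrapper exposes setters for the mapper, reducer and formats (those setter names are not shown here, so they are left as a comment rather than invented):

import java.io.IOException;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

// Hypothetical concrete test; the class name and dataset names are placeholders.
public class WordCountClusterMRTest extends AbstractClusterMRTest {

    @BeforeClass
    public void init() throws IOException {
        setup(); // starts the MiniMRCluster and prepares the cluster JobConf
    }

    @Test
    public void runJob() throws Exception {
        JobBuilder jobBuilder = createTestableJobInstance();
        // Configure mapper/reducer/input/output formats on the builder here;
        // the exact JobBuilder setter names are specific to your own wrapper.
        boolean completed = buildSubmitAndWaitForCompletion(jobBuilder);
        MatcherAssert.assertThat(completed, Matchers.is(true));
        // Compare reducer output with the etalon file from resources/WordCountClusterMRTest/out
        MatcherAssert.assertThat(getLinesFromOutputFile(),
                Matchers.equalTo(getLinesFromEtalonOutputFile()));
    }

    @AfterClass
    public void shutdown() {
        tearDown(); // stops the MiniMRCluster
    }

    @Override
    protected String getInputDatasetName() {
        return "mr-input-data"; // resources/WordCountClusterMRTest/in/mr-input-data
    }

    @Override
    public String getEtalonOutputFileName() {
        return "mr-etalon-output"; // resources/WordCountClusterMRTest/out/mr-etalon-output
    }
}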