在Hadoop中实现作业结束通知监听器



我正试图在工作完成后获得通知。为此,我使用以下链接作为参考。我无法使它工作。我在网上找不到很多东西。我使用tomcat作为在使用servlet的地方建立通知url点的方法。

http://hadoopi.wordpress.com/2013/09/18/hadoop-get-a-callback-on-mapreduce-job-completion/

这是servlet,根据我不应该明显工作,因为我没有直接从另一个页面调用它,我应该提交参数。这里我需要一个监听器从hadoop获取调用,然后获取将提交给servlet的jobId和jobStatus,但我不知道如何实现。

import java.io.*;
import java.util.Enumeration;
import javax.servlet.*;
import javax.servlet.http.*;
import java.sql.*;
public class Serv extends HttpServlet
{ 
    private static final long serialVersionUID = 1L;
    public void doGet(HttpServletRequest req,HttpServletResponse res)throws ServletException,IOException
    {
        String jobId=req.getParameter("jobId");
        String jobStatus=req.getParameter("jobStatus");
        res.setContentType("text/html");
        PrintWriter pw=res.getWriter();
        pw.println("JobId: n"+jobId);
        pw.println("JobStatus: n"+jobStatus);
        pw.close();
    }
    public void doPost(HttpServletRequest req,HttpServletResponse res)throws ServletException,IOException
    {
        String jobId=req.getParameter("jobId");
        String jobStatus=req.getParameter("jobStatus");
        res.setContentType("text/html");
        PrintWriter pw=res.getWriter();
        pw.println("JobId: n"+jobId);
        pw.println("JobStatus: n"+jobStatus);
        pw.close();
    }
} 

这是我的Toolrunner运行函数中的通知代码的一部分。

conf.set("job.end.notification.url", "http://localhost:8080/Serv?jobId=$jobId&jobStatus=$jobStatus");                                 
conf.setInt("job.end.retry.attempts", 3);
conf.setInt("job.end.retry.interval", 1000);

我认为可能还有很多工作要做,让这个工作。

我对php也做了同样的尝试,并相应地更改了代码。

编辑我得到了它的一部分,我必须在一个文件或其他地方记录输出,以便当Servlet被调用时,它应该将输出记录到一个文件,即创建一个具有作业ID和作业状态的文件。所以我改变了我的Servlet代码如下,但仍然没有创建文件。

import java.io.*;
import java.util.Enumeration;
import javax.servlet.*;
import javax.servlet.http.*;
import java.sql.*;
public class Serv extends HttpServlet
{ 
    private static final long serialVersionUID = 1L;
    public void doGet(HttpServletRequest req,HttpServletResponse res)throws ServletException,IOException
    {
        String jobId=req.getParameter("jobId");
        String jobStatus=req.getParameter("jobStatus");
        res.setContentType("text/html");
        PrintWriter pw=res.getWriter();
        pw.println("JobId: n"+jobId);
        pw.println("JobStatus: n"+jobStatus);
        PrintWriter writer = new PrintWriter("log.txt", "UTF-8");
        writer.println("Job ID : "+jobId);
        writer.println("Job Status : "+jobStatus);
        writer.close();
        pw.close();
    }
    public void doPost(HttpServletRequest req,HttpServletResponse res)throws ServletException,IOException
    {
        String jobId=req.getParameter("jobId");
        String jobStatus=req.getParameter("jobStatus");
        res.setContentType("text/html");
        PrintWriter pw=res.getWriter();
        pw.println("JobId: n"+jobId);
        pw.println("JobStatus: n"+jobStatus);
        PrintWriter writer = new PrintWriter("log.txt", "UTF-8");
        writer.println("Job ID : "+jobId);
        writer.println("Job Status : "+jobStatus);
        writer.close();
        pw.close();
    }
}

这是我最后做的工作。我之前所做的有些愚蠢,我试图回显输出。对我来说,这显然是错误的,但它没有点击我,我必须在调用Servlet时将输出记录在某个地方,所以我将输出写入了一个有效的文件。

String path = getServletContext().getRealPath("/");行在这里很重要,因为它将路径设置为根目录,否则它授予拒绝写日志文件的权限。

import java.io.*;
import java.util.Enumeration;
import javax.servlet.*;
import javax.servlet.http.*;
import java.sql.*;
public class Serv extends HttpServlet
{ 
    private static final long serialVersionUID = 1L;
    public void doGet(HttpServletRequest req,HttpServletResponse res)throws ServletException,IOException
    {
        String jobId=req.getParameter("jobId");
        String jobStatus=req.getParameter("jobStatus");
        res.setContentType("text/html");
        PrintWriter pw=res.getWriter();
        pw.println("JobId: n"+jobId);
        pw.println("JobStatus: n"+jobStatus);
        String path = getServletContext().getRealPath("/");
        PrintWriter writer = new PrintWriter(path+"log.txt", "UTF-8");
        writer.println("Job ID : "+jobId);
        writer.println("Job Status : "+jobStatus);
        writer.close();
        pw.close();
    }
    public void doPost(HttpServletRequest req,HttpServletResponse res)throws ServletException,IOException
    {
        String jobId=req.getParameter("jobId");
        String jobStatus=req.getParameter("jobStatus");
        res.setContentType("text/html");
        PrintWriter pw=res.getWriter();
        pw.println("JobId: n"+jobId);
        pw.println("JobStatus: n"+jobStatus);
        String path = getServletContext().getRealPath("/");
        PrintWriter writer = new PrintWriter(path+"log.txt", "UTF-8");
        writer.println("Job ID : "+jobId);
        writer.println("Job Status : "+jobStatus);
        writer.close();
        pw.close();
    }
} 

最新更新