从Java应用程序启动和监视Spark应用程序



我正在从java应用程序中执行spark批处理应用程序的应用程序。

有一个主要类启动线程以启动spark应用程序。它使用zookeeper在将启动spark应用程序的计算机中找到leaderMain方法看起来像这样:

public static void main(String[] args) throws IOException {
        final int id = Integer.valueOf(args[0]);
        final String zkURL = args[1];
        final ExecutorService service = Executors.newSingleThreadExecutor();
        final Future<?> status = service.submit(new ProcessNode(id, zkURL));
        try {
            status.get();
        } catch (InterruptedException | ExecutionException e) {
            LOG.fatal(e.getMessage(), e);
            service.shutdown();
        }

选择了leader后,以下代码将在其上运行以启动spark应用程序。

protected Boolean executeCommand() {
    try {
        final Runtime rt = Runtime.getRuntime();
        final Process proc = rt.exec("sh start-sparkapp.sh");
        final int exitVal = proc.waitFor();
        BufferedReader buf = new BufferedReader(new InputStreamReader(proc.getInputStream()));
        String line = "";
        while ((line=buf.readLine())!=null) {
        System.out.println(line);
        }
        System.out.println(" commandToExecute exited with code: " + exitVal);
        proc.destroy();
    } catch (final Exception e) {
        System.out.println("Exception occurred while Launching process : " + e.getMessage());
        return Boolean.FALSE;
    }
         return Boolean.TRUE;
}

但这开始了很长的spark作业。因此,我相信,仅当spark作业完成时,代码的下一部分才能执行。我的要求是,一旦启动了spark应用程序,控件将转到代码的下一部分,我正在监视同一spark应用程序的状态。即我启动spark应用程序,并从同一java应用程序监视spark应用程序的状态。假设我有一种方法montior,可以监视应用程序的状态

public String monitor(ApplicationId id)

有什么建议如何实现这一目标?

由于您将使用方法public String monitor(ApplicationId id)监视Spark应用程序,因此我假设您不希望当前线程使用proc.waitFor()等待该过程。此外,您不想将过程的正常输出打印到控制台。这两个操作都使您的线程在产卵过程中等待。此外,您的监视方法应采用产卵过程的过程ID,而不是Spark Application ID作为输入。因此,修改的代码看起来像:

protected Boolean executeCommand() {
try {
    final Runtime rt = Runtime.getRuntime();
    final Process proc = rt.exec("sh start-sparkapp.sh");
    /* 
     *Call to method monitor(ProcessId id)
     */
    } catch (final Exception e) {
        System.out.println("Exception occurred while Launching process : " + e.getMessage());
        return Boolean.FALSE;
    }
     return Boolean.TRUE;
}

相关内容

  • 没有找到相关文章

最新更新