我正在从java
应用程序中执行spark
批处理应用程序的应用程序。
有一个主要类启动线程以启动spark
应用程序。它使用zookeeper
在将启动spark
应用程序的计算机中找到leader
。Main
方法看起来像这样:
public static void main(String[] args) throws IOException {
final int id = Integer.valueOf(args[0]);
final String zkURL = args[1];
final ExecutorService service = Executors.newSingleThreadExecutor();
final Future<?> status = service.submit(new ProcessNode(id, zkURL));
try {
status.get();
} catch (InterruptedException | ExecutionException e) {
LOG.fatal(e.getMessage(), e);
service.shutdown();
}
选择了leader
后,以下代码将在其上运行以启动spark
应用程序。
protected Boolean executeCommand() {
try {
final Runtime rt = Runtime.getRuntime();
final Process proc = rt.exec("sh start-sparkapp.sh");
final int exitVal = proc.waitFor();
BufferedReader buf = new BufferedReader(new InputStreamReader(proc.getInputStream()));
String line = "";
while ((line=buf.readLine())!=null) {
System.out.println(line);
}
System.out.println(" commandToExecute exited with code: " + exitVal);
proc.destroy();
} catch (final Exception e) {
System.out.println("Exception occurred while Launching process : " + e.getMessage());
return Boolean.FALSE;
}
return Boolean.TRUE;
}
但这开始了很长的spark
作业。因此,我相信,仅当spark
作业完成时,代码的下一部分才能执行。我的要求是,一旦启动了spark
应用程序,控件将转到代码的下一部分,我正在监视同一spark
应用程序的状态。即我启动spark
应用程序,并从同一java
应用程序监视spark
应用程序的状态。假设我有一种方法montior
,可以监视应用程序的状态
public String monitor(ApplicationId id)
有什么建议如何实现这一目标?
由于您将使用方法public String monitor(ApplicationId id)
监视Spark应用程序,因此我假设您不希望当前线程使用proc.waitFor()
等待该过程。此外,您不想将过程的正常输出打印到控制台。这两个操作都使您的线程在产卵过程中等待。此外,您的监视方法应采用产卵过程的过程ID,而不是Spark Application ID作为输入。因此,修改的代码看起来像:
protected Boolean executeCommand() {
try {
final Runtime rt = Runtime.getRuntime();
final Process proc = rt.exec("sh start-sparkapp.sh");
/*
*Call to method monitor(ProcessId id)
*/
} catch (final Exception e) {
System.out.println("Exception occurred while Launching process : " + e.getMessage());
return Boolean.FALSE;
}
return Boolean.TRUE;
}