如何使用Apex进行批处理处理



如何使用Apache Apex创建批处理处理应用程序?

我发现的所有示例都是流媒体应用程序,这意味着它们没有结束,我希望我的应用程序在处理所有数据后关闭。

谢谢

您的用例是什么?本地支持批处理在路线图上,现在正在使用。

交替,直到那时,一旦您确定处理完成,输入操作员就可以将信号发送为shutdownexception(),并且它将通过dag传播并关闭dag。

让我们知道您是否需要更多详细信息。

您可以在运行应用程序之前添加出口条件。例如

public void testMapOperator() throws Exception
{
   LocalMode lma = LocalMode.newInstance();
   DAG dag = lma.getDAG();
   NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator());
   FunctionOperator.MapFunctionOperator<Integer, Integer> mapper
    = dag.addOperator("mapper", new  FunctionOperator.MapFunctionOperator<Integer, Integer>(new Square()));
   ResultCollector collector = dag.addOperator("collector", new ResultCollector());
   dag.addStream("raw numbers", numGen.output, mapper.input);
   dag.addStream("mapped results", mapper.output, collector.input);
// Create local cluster
   LocalMode.Controller lc = lma.getController();
   lc.setHeartbeatMonitoringEnabled(false);
 //Condition to exit the application
  ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
  {
     @Override
     public Boolean call() throws Exception
    {
       return TupleCount == NumTuples;
    }
  });
  lc.run();
  Assert.assertEquals(sum, 285);
}

有关完整代码,请参阅https://github.com/apache/apacex-malhar/blob/blob/master/stream/src/src/src/src/java/java/org/apache/apache/malhar/malhar/malhar/functionoperator/functionoperator/functionoperatortest.java

相关内容

  • 没有找到相关文章

最新更新