如何修复阿帕奇风暴三叉戟拓扑中的错误'Component: [x] subscribes from non-existent component [y]'



我刚刚实现了一个三叉戟DRPC函数来处理传入的消息,我正在尝试将拓扑的最后阶段处理的元组计数保留为三叉戟状态。这是我的拓扑:

topology.newDRPCStream("portfolio")
    .map(parseMapFunction,
        new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
            "portfolioTimestamp", "portfolioPayload"))
    .filter(new FilterNull())
    .flatMap(splitMapFunction,
        new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
            "portfolioTimestamp", "strategyCode"))
    .parallelismHint(1)
    .shuffle()
    .each(new Fields("strategyCode"), findMongoTradesFunction,
        new Fields("uitid", "id", "sourceSystem", "sourceTransactionTime", "publicationTime",
            "tradeVersion", "urn", "riskViewFrom", "riskViewTo", "authorized"))
    .parallelismHint(10)
    .shuffle()
    .filter(tradeFilterFunction)
    .parallelismHint(150)
    .groupBy(new Fields("uitid"))
    .aggregate(
        new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
            "sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
            "riskViewTo", "uitid"), reduceAggregateFunction,
        new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
            "sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
            "riskViewTo"))
    .parallelismHint(200)
    .groupBy(new Fields("portfolioUrn"))
    .persistentAggregate(stateFactory, new Count(), new Fields("count"));

当我尝试将此拓扑提交给 Storm 时,我遇到了此错误:

Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
at com.citi.tm.portfolio.tps.PortfolioLauncher.main(PortfolioLauncher.java:34)
Caused by: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
... 3 more

如果我从该拓扑中删除最后 2 个函数,我可以成功提交拓扑,即:

.groupBy(new Fields("portfolioUrn"))
.persistentAggregate(stateFactory, new Count(), new Fields("count"));

在我运行我的聚合函数(agregate((((之后,我想使用"portfolioUrn"字段对元组进行分组,并将计数保存到mongoDB中。我不明白为什么最后一个groupBy((.persistentAggregate((部分会导致此错误。你能帮忙找到原因吗?

经过一番研究,我发现了这个对我来说似乎类似的页面。Nathan Marz 表示 DRPC 拓扑不支持分区持久性(截至 2013 年(,我相信我的情况也是如此。我认为(未完全验证(Storm 1.2.1 DRPC 拓扑可能根本不支持状态持久性。

最新更新