关于在三叉戟中实现事务拓扑的问题



我的用例是调用查询以使用不同的输入参数从数据库获取记录。获取记录后,进行一些处理,最后将其写入文件。我的输入参数值取决于上一个查询的完整处理。我的问题是,我如何在喷口中知道先前查询的处理已完成,即记录已成功写入文件。

我尝试实施ITridentSpout但仍然没有得到任何解决方案。以下是我的ITridentSpout代码:

三叉戟协调员.java

package com.TransactionlTopology;
import java.util.concurrent.ConcurrentHashMap;
import storm.trident.spout.ITridentSpout;
public class TridentCoordinator implements ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>>{
    ConcurrentHashMap<Long,String> prevMetadata=new ConcurrentHashMap<Long, String>();
    boolean result=true;
    @Override
    public void success(long txid) {
        System.out.println("inside success mehod with txid as  "+txid);
        if(prevMetadata.containsKey(txid)){
            prevMetadata.replace(txid, "SUCCESS");
        }
    }
    @Override
    public boolean isReady(long txid) {
        if(!prevMetadata.isEmpty()){
            result=true;
        for(Long txId:prevMetadata.keySet()){
            System.out.println("txId:---- "+txId +"    value"+prevMetadata.get(txId) );
            if(prevMetadata.get(txId).equalsIgnoreCase("SUCESS")){
                prevMetadata.put(txid, "STARTED");
                result= true;
            }
        }
        }
        else{
            prevMetadata.put(txid, "STARTED");
            result= true;
        }
        System.out.println("inside isReady function with txid as:---- "+txid+"result value:--  "+result);
        return result;
    }
    @Override
    public void close() {
        // TODO Auto-generated method stub
    }
    @Override
    public ConcurrentHashMap<Long,String> initializeTransaction(long txid, ConcurrentHashMap<Long,String> prevMetadata, ConcurrentHashMap<Long,String> currMetadata) {
        System.out.println("inside initialize transaction method with values as:----- "+txid+"   "+prevMetadata+"   "+currMetadata);
        return prevMetadata;
    }
}

三叉戟发射器.java

package com.TransactionlTopology;
import java.util.concurrent.ConcurrentHashMap;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import backtype.storm.tuple.Values;
public class TridentEmitterImpl implements ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> {
    @Override
    public void emitBatch(TransactionAttempt tx, ConcurrentHashMap<Long,String> coordinatorMeta,TridentCollector collector) {
        System.out.println("inside emitbatch of emitter class with values as:--- "+coordinatorMeta);
        System.out.println("tx.getAttemptId()   "+tx.getAttemptId()+"tx.getTransactionId()  "+tx.getTransactionId()+"tx.getId()  "+tx.getId().toString());
        collector.emit(new Values("preeti"));
    }
    @Override
    public void success(TransactionAttempt tx) {
        System.out.println("inside success of emitter with tx id as   "+tx.getTransactionId());
    }
    @Override
    public void close() {
        // TODO Auto-generated method stub
    }
}

三叉戟喷口.java

package com.TransactionlTopology;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import storm.trident.spout.ITridentSpout;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
public class TridentSpoutImpl implements ITridentSpout<ConcurrentHashMap<Long,String>> {
    @Override
    public storm.trident.spout.ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>> getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return new TridentCoordinator();
    }
    @Override
    public storm.trident.spout.ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new TridentEmitterImpl();
    }
    @Override
    public Map getComponentConfiguration() {
        Map<String,String> newMap=new HashMap<String, String>();
        newMap.put("words","preeti");
        return newMap;
    }
    @Override
    public Fields getOutputFields() {
        return new Fields("word");
    }
}

也无法理解initializeTransaction prevMetaDatacurMetada时会出现哪些价值观.请提供一些解决方案

您有多种选择。不过,也许最简单的方法是在拓扑中设置最后一个螺栓,在写入文件后,通知 spout 最好通过您的 spout 可以监视的消息队列启动下一个查询。当喷口拾取此通知时,它可以处理下一个查询。

然而,更笼统地说,这似乎是 Storm 的一个可疑用例。许多拓扑资源在很多时候都可能处于空闲状态,因为一次只有一个事务通过它运行。显然,我不知道您问题的所有细节,但是事务之间的这种依赖性限制了使用 Storm 增加的复杂性的价值。

最新更新