2013-12-19 1 views
2

Моим вариантом использования является вызов запроса для извлечения записей из db с различными входными параметрами. После извлечения записей, сделайте некоторую обработку, а затем, наконец, напишите в файл. Мои значения входных параметров зависят от полной обработки предыдущего запроса. Моя проблема в том, как я узнаю в носике, что обработка предыдущего запроса завершена, то есть записи были успешно записаны в файл.вопрос об осуществлении транзакционной топологии в трезубце

Я пробовал реализовать ITridentSpout, но все равно не получаю никакого решения. Ниже мой код ITridentSpout:

TridentCoordinator.java

package com.TransactionlTopology; 

import java.util.concurrent.ConcurrentHashMap; 

import storm.trident.spout.ITridentSpout; 

public class TridentCoordinator implements ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>>{ 

    ConcurrentHashMap<Long,String> prevMetadata=new ConcurrentHashMap<Long, String>(); 
    boolean result=true; 

    @Override 
    public void success(long txid) { 
     System.out.println("inside success mehod with txid as "+txid); 
     if(prevMetadata.containsKey(txid)){ 
      prevMetadata.replace(txid, "SUCCESS"); 
     } 
    } 

    @Override 
    public boolean isReady(long txid) { 
     if(!prevMetadata.isEmpty()){ 
      result=true; 
     for(Long txId:prevMetadata.keySet()){ 
      System.out.println("txId:---- "+txId +" value"+prevMetadata.get(txId)); 
      if(prevMetadata.get(txId).equalsIgnoreCase("SUCESS")){ 
       prevMetadata.put(txid, "STARTED"); 
       result= true; 
      } 
     } 
     } 
     else{ 
      prevMetadata.put(txid, "STARTED"); 
      result= true; 
     } 

     System.out.println("inside isReady function with txid as:---- "+txid+"result value:-- "+result); 

     return result; 
    } 

    @Override 
    public void close() { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public ConcurrentHashMap<Long,String> initializeTransaction(long txid, ConcurrentHashMap<Long,String> prevMetadata, ConcurrentHashMap<Long,String> currMetadata) { 
     System.out.println("inside initialize transaction method with values as:----- "+txid+" "+prevMetadata+" "+currMetadata); 

     return prevMetadata; 
    } 
} 

TridentEmitterImpl.java

package com.TransactionlTopology; 

import java.util.concurrent.ConcurrentHashMap; 

import storm.trident.operation.TridentCollector; 
import storm.trident.spout.ITridentSpout; 
import storm.trident.topology.TransactionAttempt; 
import backtype.storm.tuple.Values; 

public class TridentEmitterImpl implements ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> { 

    @Override 
    public void emitBatch(TransactionAttempt tx, ConcurrentHashMap<Long,String> coordinatorMeta,TridentCollector collector) { 
     System.out.println("inside emitbatch of emitter class with values as:--- "+coordinatorMeta); 
     System.out.println("tx.getAttemptId() "+tx.getAttemptId()+"tx.getTransactionId() "+tx.getTransactionId()+"tx.getId() "+tx.getId().toString()); 
     collector.emit(new Values("preeti")); 
    } 

    @Override 
    public void success(TransactionAttempt tx) { 
     System.out.println("inside success of emitter with tx id as "+tx.getTransactionId()); 

    } 

    @Override 
    public void close() { 
     // TODO Auto-generated method stub 

    } 
} 

TridentSpoutImpl.java

package com.TransactionlTopology; 

import java.util.HashMap; 
import java.util.Map; 
import java.util.concurrent.ConcurrentHashMap; 

import storm.trident.spout.ITridentSpout; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.tuple.Fields; 

public class TridentSpoutImpl implements ITridentSpout<ConcurrentHashMap<Long,String>> { 

    @Override 
    public storm.trident.spout.ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>> getCoordinator(String txStateId, Map conf, TopologyContext context) { 

     return new TridentCoordinator(); 
    } 

    @Override 
    public storm.trident.spout.ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> getEmitter(String txStateId, Map conf, TopologyContext context) { 

     return new TridentEmitterImpl(); 
    } 

    @Override 
    public Map getComponentConfiguration() { 

     Map<String,String> newMap=new HashMap<String, String>(); 
     newMap.put("words","preeti"); 
     return newMap; 
    } 

    @Override 
    public Fields getOutputFields() { 

     return new Fields("word"); 
    } 

} 

также не в состоянии понять, какие ценности будут приходить в initializeTransaction как prevMetaData и curMetada. Пожалуйста, предоставьте некоторое решение

+0

предоставьте некоторое решение для вышеуказанного случая использования – user2435082

ответ

1

У вас есть множество доступных вам вариантов. Возможно, самый простой из них заключается в том, чтобы иметь окончательный болт в вашей топологии, после написания файла, уведомить носик, полезно начать следующий запрос через очередь сообщений, которую может наблюдать ваш носик. Когда носик принимает это уведомление, он может обработать следующий запрос.

Однако, говоря в целом, это похоже на спорный случай использования для Storm. Многие ресурсы вашей топологии, вероятно, будут простаивать много времени, так как у вас есть только одна транзакция за раз, проходя через нее. Очевидно, что я не знаю всех деталей вашей проблемы, но такая зависимость между транзакциями ограничивает ценность дополнительной сложности использования Storm.

+0

thanx для вашего rpl ... существует некоторое пороговое значение, основанное на том, что никакие запросы не будут решаться одновременно, поскольку db может управлять только некоторым запросом за раз. Есть ли способ получить эту функцию без реализации очереди сообщений. – user2435082

+0

Вы можете управлять количеством транзакций, которые вы выполняете через топологию Storm за один раз. Я не использовал Trident, но на обычном Storm, так как вы можете эффективно управлять этим, устанавливая 'maxSpoutPending'. –

+0

Мне нужно решение с трезубцем – user2435082