Моим вариантом использования является вызов запроса для извлечения записей из db с различными входными параметрами. После извлечения записей, сделайте некоторую обработку, а затем, наконец, напишите в файл. Мои значения входных параметров зависят от полной обработки предыдущего запроса. Моя проблема в том, как я узнаю в носике, что обработка предыдущего запроса завершена, то есть записи были успешно записаны в файл.вопрос об осуществлении транзакционной топологии в трезубце
Я пробовал реализовать ITridentSpout
, но все равно не получаю никакого решения. Ниже мой код ITridentSpout
:
TridentCoordinator.java
package com.TransactionlTopology;
import java.util.concurrent.ConcurrentHashMap;
import storm.trident.spout.ITridentSpout;
public class TridentCoordinator implements ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>>{
ConcurrentHashMap<Long,String> prevMetadata=new ConcurrentHashMap<Long, String>();
boolean result=true;
@Override
public void success(long txid) {
System.out.println("inside success mehod with txid as "+txid);
if(prevMetadata.containsKey(txid)){
prevMetadata.replace(txid, "SUCCESS");
}
}
@Override
public boolean isReady(long txid) {
if(!prevMetadata.isEmpty()){
result=true;
for(Long txId:prevMetadata.keySet()){
System.out.println("txId:---- "+txId +" value"+prevMetadata.get(txId));
if(prevMetadata.get(txId).equalsIgnoreCase("SUCESS")){
prevMetadata.put(txid, "STARTED");
result= true;
}
}
}
else{
prevMetadata.put(txid, "STARTED");
result= true;
}
System.out.println("inside isReady function with txid as:---- "+txid+"result value:-- "+result);
return result;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public ConcurrentHashMap<Long,String> initializeTransaction(long txid, ConcurrentHashMap<Long,String> prevMetadata, ConcurrentHashMap<Long,String> currMetadata) {
System.out.println("inside initialize transaction method with values as:----- "+txid+" "+prevMetadata+" "+currMetadata);
return prevMetadata;
}
}
TridentEmitterImpl.java
package com.TransactionlTopology;
import java.util.concurrent.ConcurrentHashMap;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import backtype.storm.tuple.Values;
public class TridentEmitterImpl implements ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> {
@Override
public void emitBatch(TransactionAttempt tx, ConcurrentHashMap<Long,String> coordinatorMeta,TridentCollector collector) {
System.out.println("inside emitbatch of emitter class with values as:--- "+coordinatorMeta);
System.out.println("tx.getAttemptId() "+tx.getAttemptId()+"tx.getTransactionId() "+tx.getTransactionId()+"tx.getId() "+tx.getId().toString());
collector.emit(new Values("preeti"));
}
@Override
public void success(TransactionAttempt tx) {
System.out.println("inside success of emitter with tx id as "+tx.getTransactionId());
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}
TridentSpoutImpl.java
package com.TransactionlTopology;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import storm.trident.spout.ITridentSpout;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
public class TridentSpoutImpl implements ITridentSpout<ConcurrentHashMap<Long,String>> {
@Override
public storm.trident.spout.ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>> getCoordinator(String txStateId, Map conf, TopologyContext context) {
return new TridentCoordinator();
}
@Override
public storm.trident.spout.ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> getEmitter(String txStateId, Map conf, TopologyContext context) {
return new TridentEmitterImpl();
}
@Override
public Map getComponentConfiguration() {
Map<String,String> newMap=new HashMap<String, String>();
newMap.put("words","preeti");
return newMap;
}
@Override
public Fields getOutputFields() {
return new Fields("word");
}
}
также не в состоянии понять, какие ценности будут приходить в initializeTransaction
как prevMetaData
и curMetada
. Пожалуйста, предоставьте некоторое решение
предоставьте некоторое решение для вышеуказанного случая использования – user2435082