2017-01-29 8 views
0

Я искал некоторое время теперь о том, как использовать настроенную Java оператор жгуты InfoSphere Streams Java APIс помощью Java пользовательского оператора в Java API в InfoSphere Streams

Что мне нужно после написания настроенного оператора, как показано ниже .. .

public class Test extends AbstractOperator { 
private int i; 
private int num; 
@Override 
public synchronized void initialize(OperatorContext context) throws Exception { 
super.initialize(context); 
i = 0; .... 

Я хочу, чтобы использовать его как ниже ....

 Topology topology = new Topology("toplogy_test"); 
     TStream<String> inDataFileName = ... 
//call the "Test" operator here 

ответ

0

Вы можете вызвать оператор Java оператор/C++ от топологии API, выполнив следующие действия:

  1. Добавить инструментарий оператора Java:

    SPL.addToolkit (топологии, новый файл ("/ главная/streamsadmin/myTk"));

  2. Преобразование входящего потока в поток SPL:

    StreamSchema rstringSchema = Type.Factory.getStreamSchema("tuple<rstring rstring_attr_name>"); 
    
    SPLStream splInputStream = SPLStreams.convertStream(inDataFileName, new BiFunction<String, OutputTuple, OutputTuple>(){ 
    
        @Override 
        public OutputTuple apply(String input_string, OutputTuple output_rstring) { 
        output_rstring.setString("rstring_attr_name", input_string); 
        return output_rstring; 
        }}, rstringSchema); 
    
  3. Вызов оператора:

    SPLStream splOutputStream = SPL.invokeOperator ("OperatorNamespace :: YourOperatorName", splInputStream, rstringSchema, новый HashMap ());

Вы можете найти более подробную информацию об этом здесь:

http://ibmstreams.github.io/streamsx.documentation/docs/4.2/java/java-appapi-devguide/#integrating-spl-operators-with-the-java-application-api

На стороне записки, если вы думаете об использовании топологии API для записи топологии Streams, то проще написать регулярный Java-класс и вызывать его непосредственно из API топологии.

Например:

MyJavaCode someObj = new MyJavaCode(); 

Topology topology = new Topology("MyTopology"); 
TStream<String> inDataFileName = ... 
inDataFileName.transform(new Function<String, String>(){ 
     @Override 
     public String apply(String word) { 
      return someObj.someFunction(word); 
     } 
    }); 

Единственное требование в том, что ваш класс Java должен реализовывать Serializable.

+0

someObj.someFunction (word), может ли эта функция быть вызовом веб-службы или вызовом вставки JDBC? –

+0

Да, функция может делать что угодно. Я использовал этот подход и обернул библиотеку GSON для потоковой обработки JSON с помощью API топологии. Главное - убедиться, что класс Serializable и объекты, переданные обратно в API топологии, также могут быть сериализованы. –