2016-12-18 7 views
5

Я пытаюсь построить пример приложения, используя Apache Flink, который делает следующее:Apache Flink - использовать значения из потока данных, чтобы динамически создать источник потоковых данных

  1. Считывает поток символов акций (например, 'CSCO', 'FB') из очереди Kafka.
  2. Для каждого символа в реальном времени просматриваются текущие цены и потоки значений для последующей обработки.

* Обновление исходного сообщение *

я переместил функцию карты в отдельный класс и не получить сообщение об ошибке во время выполнения "Реализация MapFunction не является сериализуемой больше. Объект, вероятно, содержит или ссылается на несериализуемые поля ".

Проблема, с которой я сейчас сталкиваюсь, заключается в том, что тема Кафки «Запасы», которую я пытаюсь написать, не получает их. Я пытаюсь бежать и публиковать любые обновления.

public class RetrieveStockPrices { 
    @SuppressWarnings("serial") 
    public static void main(String[] args) throws Exception { 
     final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("group.id", "stocks"); 

     DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties)); 

     DataStream<String> stockPrice = 
      streamOfStockSymbols 
      //get unique keys 
      .keyBy(new KeySelector<String, String>() { 
       @Override 
       public String getKey(String trend) throws Exception { 
        return trend; 
       } 
       }) 
      //collect events over a window 
      .window(TumblingEventTimeWindows.of(Time.seconds(60))) 
      //return the last event from the window...all elements are the same "Symbol" 
      .apply(new WindowFunction<String, String, String, TimeWindow>() { 
       @Override 
       public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { 
        out.collect(input.iterator().next().toString()); 
       } 
      }) 
      .map(new StockSymbolToPriceMapFunction()); 

     streamExecEnv.execute("Retrieve Stock Prices"); 
    } 
} 

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> { 
    @Override 
    public String map(String stockSymbol) throws Exception { 
     final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 
     System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol); 

     DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol)); 
     stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema())); 

     return "100000"; 
    } 

    private static class CustomKeySelector implements KeySelector<String, String> { 
     @Override 
     public String getKey(String arg0) throws Exception { 
      return arg0.trim(); 
     } 
    } 
} 


public class LookupStockPrice extends RichSourceFunction<String> { 
    public String stockSymbol = null; 
    public boolean isRunning = true; 

    public LookupStockPrice(String inSymbol) { 
      stockSymbol = inSymbol; 
    } 

    @Override 
    public void open(Configuration parameters) throws Exception { 
      isRunning = true; 
    } 


    @Override 
    public void cancel() { 
      isRunning = false; 
    } 

    @Override 
    public void run(SourceFunction.SourceContext<String> ctx) 
        throws Exception { 
      String stockPrice = "0"; 
      while (isRunning) { 
       //TODO: query Google Finance API 
       stockPrice = Integer.toString((new Random()).nextInt(100)+1); 
       ctx.collect(stockPrice); 
       Thread.sleep(10000); 
      } 
    } 
} 

ответ

4

StreamExecutionEnvironment не отступа, которые будут использоваться внутри операторов потокового приложения. Не предназначенные средства, это не проверено и не поощряется. Он может работать и что-то делать, но, скорее всего, не будет хорошо себя вести и, вероятно, убить ваше приложение.

StockSymbolToPriceMapFunction в вашей программе определяет для каждой входящей записи совершенно новое и независимое новое потоковое приложение. Однако, поскольку вы не вызываете streamExecEnv.execute(), программы не запускаются, и метод map возвращается без каких-либо действий.

Если вы бы вызов streamExecEnv.execute(), функция будет начать новый локальный кластер Flink в рабочих JVM и запустить приложение на этом локальном кластере FLiNK. Местный экземпляр Flink займет много места для кучи, и после того, как несколько кластеров будут запущены, рабочий, вероятно, умрет из-за OutOfMemoryError, который не является тем, что вы хотите.

+0

Возможно ли вообще динамически создавать потоки в ответ на поступающие данные? –

+0

Вы можете реализовать функцию FlatMapFunction, которая динамически считывает и испускает данные на основе поступающих записей. Например, если у вас есть поток с именами файлов, «FlatMapFunction», вы можете открыть эти файлы и испустить их данные. Однако типы вывода всех записей должны быть одинаковыми. Кроме того, было бы сложно получить семантику обработки событий и времени, но это более общая проблема динамически добавленных источников. –

+0

@FabianHueske Я решаю аналогичный вариант использования. Поэтому, если мне нужно использовать FlatMapFunction, нам нужно будет прочитать файл, используя обычные API файлов из scala/Java и не используя readTextFile от Flink. Причина в том, что мы не можем использовать StreamExecutionEnvironment внутри flatMap. Правильно ли я понимаю? –