2016-12-23 1 views
0

Я пытаюсь Потоковые данные из Кафки СпаркНе удается получить доступ к данным в Кафки Спарк Потоковые глобально

JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, 
       String.class, 
       String.class, 
       StringDecoder.class, 
       StringDecoder.class, 
       kafkaParams, topics); 

Здесь я итерация по JavaPairInputDStream для обрабатывать РДД-х.

directKafkaStream.foreachRDD(rdd ->{ 
      rdd.foreachPartition(items ->{ 
       while (items.hasNext()) { 
        String[] State = items.next()._2.split("\\,"); 
        System.out.println(State[2]+","+State[3]+","+State[4]+"--"); 
       }; 
      });   
     }); 

я могу иметь возможность получать данные в foreachRDD и мое требование иметь доступ к государству массива в глобальном масштабе. Когда я пытаюсь получить доступ к State Array во всем мире, я получаю исключение

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 

Любые предложения? Благодарю.

+0

'для доступа к массиву состояний по всему миру' означает? вне 'directKafkaStream.foreachRDD'? Я не думаю, что это возможно. Если бы вы могли объяснить ваш фактический прецедент (что вы хотите сделать с глобальным массивом состояний), я мог бы предложить лучшие решения. – code

+0

YEs точно @Darshan Мне нужно «получить доступ к массиву состояний по всему миру». Моя учетная запись: у меня была таблица поиска с 4 кулаками [Код, Определение, Manhattan_Area, Other_states] в Hive. В потоковых данных у меня было 3 столбца [State, IssueDate, ViolationCode]. Я хочу выполнить операцию, например, рассчитать сумму суммы, которую правительство генерирует за каждый код нарушения за каждый день, исходя из состояния. –

+0

В каком поле в улье находится поле поиска? и какое поле в потоковых данных вы ищете? И как ваше состояние [], как глобальная переменная, помогает вам? Кажется, что вы используете искрообразование неправильно. – code

ответ

1

Это больше похоже на вашу таблицу поиска с потоковым RDD, чтобы получить все элементы, имеющие соответствующие поля «code» и «exceptionCode».

Поток должен быть таким.

  1. Создание РДД из улья таблицы поиска => lookupRdd
  2. Создание DStream от Кафки потока
  3. Для каждого RDD в Dstream, присоединиться к lookupRDD с streamRdd, процесс соединяемых элементов (подсчитать сумму суммы ...) и сохраните этот обработанный результат.

Примечание Ниже код не заполнен. Пожалуйста, заполните все комментарии TODO.

JavaPairDStream<String, String> streamPair = directKafkaStream.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() { 
     @Override 
     public Tuple2<String, String> call(Tuple2<String, String> tuple2) throws Exception { 
      System.out.println("Tuple2 Message is----------" + tuple2._2()); 
      String[] state = tuple2._2.split("\\,"); 
      return new Tuple2<>(state[4], tuple2._2()); //pair <ViolationCode, data> 
     } 
    }); 

    streamPair.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() { 
     JavaPairRDD<String, String> hivePairRdd = null; 
     @Override 
     public Void call(JavaPairRDD<String, String> stringStringJavaPairRDD) throws Exception { 
      if (hivePairRdd == null) { 
       hivePairRdd = initHiveRdd(); 
      } 
      JavaPairRDD<String, Tuple2<String, String>> joinedRdd = stringStringJavaPairRDD.join(hivePairRdd); 
      System.out.println(joinedRdd.take(10)); 
      //todo process joinedRdd here and save the results. 
      joinedRdd.count(); //to trigger an action 
      return null; 
     } 
    }); 
} 

public static JavaPairRDD<String, String> initHiveRdd() { 
    JavaRDD<String> hiveTableRDD = null; //todo code to create RDD from hive table 
    JavaPairRDD<String, String> hivePairRdd = hiveTableRDD.mapToPair(new PairFunction<String, String, String>() { 
     @Override 
     public Tuple2<String, String> call(String row) throws Exception { 
      String code = null; //TODO process 'row' and get 'code' field 
      return new Tuple2<>(code, row); 
     } 
    }); 
    return hivePairRdd; 
} 
+0

так вы сказали? 'JavaDStream lines = directKafkaStream.карта (новая функция , String>() { \t \t \t \t \t \t \t \t \t \t частный статический окончательный долго serialVersionUID = 1L; \t \t \t \t \t @Override \t \t \t \t \t public String call (Tuple2 tuple2) { \t \t \t \t \t \t \t \t \t \t \t \t System.out.println ("Сообщение Tuple2 является ----------" + tuple2._2()); \t \t \t \t \t \t возвращение tuple2._2(); \t \t \t \t \t} \t \t \t \t}); ' –

+0

Я добавил фрагмент кода. Пожалуйста, отредактируйте его в соответствии с вашими потребностями. – code