2016-08-18 6 views
0

У меня есть поток кафки, входящий в тему ввода. Это код, который я написал для принятия потока кафки.Как объединить два DStreams (pyspark)?

conf = SparkConf().setAppName(appname) 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc) 
kvs = KafkaUtils.createDirectStream(ssc, topics,\ 
      {"metadata.broker.list": brokers}) 

Затем я создаю два DStreams ключей и значений исходного потока.

keys = kvs.map(lambda x: x[0].split(" ")) 
values = kvs.map(lambda x: x[1].split(" ")) 

Затем я выполняю вычисления в значениях DStream. Для примера,

val = values.flatMap(lambda x: x*2) 

Теперь, мне нужно объединить ключи и VAL DStream и возвращает результат в виде Кафки потока.

Как совместить val с corressponding ключом?

ответ

0

Вы можете просто использовать оператор join на 2 DStreams, чтобы объединить их. Когда вы делаете карту, вы по существу создаете другой поток. Таким образом, объединение поможет вам объединить их вместе.

Например:

Joined_Stream = keys.join(values).(any operation like map, flatmap...) 
+0

я не получил эту часть '(любые операции, как карта, flatmap ...)', Вы можете уточнить больше. – vidhan

+0

Я не понимаю то, что вы хотите на самом деле (я дал общий ответ о слиянии 2 DStreams). Дело в том, что если вы делаете плоскую карту значений, нет возможности сопоставить их с ключами, так как вывод этого будет одним сплющенным списком .... Слиянием 2 Dstreams вы можете создавать RDD, каждый из которых имеет элементы оба ключа и значения, просто чтобы не было сопоставления один к одному ... –

 Смежные вопросы

  • Нет связанных вопросов^_^