У меня есть поток кафки, входящий в тему ввода. Это код, который я написал для принятия потока кафки.Как объединить два DStreams (pyspark)?
conf = SparkConf().setAppName(appname)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc)
kvs = KafkaUtils.createDirectStream(ssc, topics,\
{"metadata.broker.list": brokers})
Затем я создаю два DStreams ключей и значений исходного потока.
keys = kvs.map(lambda x: x[0].split(" "))
values = kvs.map(lambda x: x[1].split(" "))
Затем я выполняю вычисления в значениях DStream. Для примера,
val = values.flatMap(lambda x: x*2)
Теперь, мне нужно объединить ключи и VAL DStream и возвращает результат в виде Кафки потока.
Как совместить val с corressponding ключом?
я не получил эту часть '(любые операции, как карта, flatmap ...)', Вы можете уточнить больше. – vidhan
Я не понимаю то, что вы хотите на самом деле (я дал общий ответ о слиянии 2 DStreams). Дело в том, что если вы делаете плоскую карту значений, нет возможности сопоставить их с ключами, так как вывод этого будет одним сплющенным списком .... Слиянием 2 Dstreams вы можете создавать RDD, каждый из которых имеет элементы оба ключа и значения, просто чтобы не было сопоставления один к одному ... –