Я использую pyspark с приемником Kafka для обработки потока твитов. Один из шагов моего приложения включает вызов API Google Natural Language для получения оценки настроения за твит. Тем не менее, я вижу, что API получает несколько вызовов за обработанный твит (я вижу числовые вызовы в Google Cloud Console).Spark streaming duplicate network calls
Кроме того, если я печатаю tweetID (внутри отображаемой функции), я получаю тот же ID 3 или 4 раза. В конце моего приложения твиты отправляются на другую тему в Kafka, и там я получаю правильное количество твитов (без повторных идентификаторов), поэтому в принципе все работает правильно, но я не знаю, как избежать вызова Google API более одного раза за твит.
Это связано с некоторыми параметрами конфигурации в Spark или Kafka?
Вот пример моей консоли вывода:
TIME 21:53:36: Google Response for tweet 801181843500466177 DONE!
TIME 21:53:36: Google Response for tweet 801181854766399489 DONE!
TIME 21:53:36: Google Response for tweet 801181844808966144 DONE!
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE!
TIME 21:53:37: Google Response for tweet 801181843500466177 DONE!
TIME 21:53:37: Google Response for tweet 801181854766399489 DONE!
TIME 21:53:37: Google Response for tweet 801181844808966144 DONE!
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE!
Но в приемнике Кафки я получаю только 4 обработанных твиты (что является правильным, что нужно получить, так как они только 4 уникальными твиты).
Код, который делает это:
def sendToKafka(rdd,topic,address):
publish_producer = KafkaProducer(bootstrap_servers=address,\
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
records = rdd.collect()
msg_dict = defaultdict(list)
for rec in records:
msg_dict["results"].append(rec)
publish_producer.send(resultTopic,msg_dict)
publish_producer.close()
kafka_stream = KafkaUtils.createStream(ssc, zookeeperAddress, "spark-consumer-"+myTopic, {myTopic: 1})
dstream_tweets=kafka_stream.map(lambda kafka_rec: get_json(kafka_rec[1]))\
.map(lambda post: add_normalized_text(post))\
.map(lambda post: tagKeywords(post,tokenizer,desired_keywords))\
.filter(lambda post: post["keywords"] == True)\
.map(lambda post: googleNLP.complementTweetFeatures(post,job_id))
dstream_tweets.foreachRDD(lambda rdd: sendToKafka(rdd,resultTopic,PRODUCER_ADDRESS))
Что вы уже сделали? Не могли бы вы вставить свой код в вопрос? –
Я обновил вопрос с помощью кода. Функция googleNLP.complementTweetFeatures() выполняет один запрос API Google и возвращает ответ. –