1

Я использую pyspark с приемником Kafka для обработки потока твитов. Один из шагов моего приложения включает вызов API Google Natural Language для получения оценки настроения за твит. Тем не менее, я вижу, что API получает несколько вызовов за обработанный твит (я вижу числовые вызовы в Google Cloud Console).Spark streaming duplicate network calls

Кроме того, если я печатаю tweetID (внутри отображаемой функции), я получаю тот же ID 3 или 4 раза. В конце моего приложения твиты отправляются на другую тему в Kafka, и там я получаю правильное количество твитов (без повторных идентификаторов), поэтому в принципе все работает правильно, но я не знаю, как избежать вызова Google API более одного раза за твит.

Это связано с некоторыми параметрами конфигурации в Spark или Kafka?

Вот пример моей консоли вывода:

TIME 21:53:36: Google Response for tweet 801181843500466177 DONE! 
TIME 21:53:36: Google Response for tweet 801181854766399489 DONE! 
TIME 21:53:36: Google Response for tweet 801181844808966144 DONE! 
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE! 
TIME 21:53:37: Google Response for tweet 801181843500466177 DONE! 
TIME 21:53:37: Google Response for tweet 801181854766399489 DONE! 
TIME 21:53:37: Google Response for tweet 801181844808966144 DONE! 
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE! 

Но в приемнике Кафки я получаю только 4 обработанных твиты (что является правильным, что нужно получить, так как они только 4 уникальными твиты).

Код, который делает это:

def sendToKafka(rdd,topic,address): 
    publish_producer = KafkaProducer(bootstrap_servers=address,\ 
          value_serializer=lambda v: json.dumps(v).encode('utf-8')) 
    records = rdd.collect() 
    msg_dict = defaultdict(list) 
    for rec in records: 
     msg_dict["results"].append(rec) 
    publish_producer.send(resultTopic,msg_dict) 
    publish_producer.close() 


kafka_stream = KafkaUtils.createStream(ssc, zookeeperAddress, "spark-consumer-"+myTopic, {myTopic: 1}) 

dstream_tweets=kafka_stream.map(lambda kafka_rec: get_json(kafka_rec[1]))\ 
       .map(lambda post: add_normalized_text(post))\ 
       .map(lambda post: tagKeywords(post,tokenizer,desired_keywords))\ 
       .filter(lambda post: post["keywords"] == True)\ 
       .map(lambda post: googleNLP.complementTweetFeatures(post,job_id)) 

dstream_tweets.foreachRDD(lambda rdd: sendToKafka(rdd,resultTopic,PRODUCER_ADDRESS)) 
+0

Что вы уже сделали? Не могли бы вы вставить свой код в вопрос? –

+0

Я обновил вопрос с помощью кода. Функция googleNLP.complementTweetFeatures() выполняет один запрос API Google и возвращает ответ. –

ответ

1

я уже нашел решение этого! Я просто должен был кэшировать DStream с:

dstream_tweets.cache() 

Множественных вызовы сети произошли потому, что Спарк пересчитал РД внутри этого DStream перед выполнением операций в последних моем сценарии. Когда я кеширую() DStream, нужно только вычислить его один раз; и поскольку он сохраняется в памяти, более поздние функции могут получить доступ к этой информации без повторных вычислений (в этом случае для повторного расчета требуется повторный вызов API, поэтому стоит заплатить больше за использование памяти).