2017-02-13 7 views
0

Это является продолжением вопрос оPyspark - управления раздаточной из искрового сессии (СБН)

Pyspark filter operation on Dstream

Чтобы сохранить счетчик, сколько сообщений об ошибках/предупреждения пришел через для скажем в день, час - как вы проектируете работу.

То, что я пробовал:

from __future__ import print_function 

import sys 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 


    def counts(): 
      counter += 1 
      print(counter.value) 

    if __name__ == "__main__": 

      if len(sys.argv) != 3: 
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr) 
      exit(-1) 


      sc = SparkContext(appName="PythonStreamingNetworkWordCount") 
      ssc = StreamingContext(sc, 5) 
      counter = sc.accumulator(0) 

      lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) 
      errors = lines.filter(lambda l: "error" in l.lower()) 
      errors.foreachRDD(lambda e : e.foreach(counts)) 
      errors.pprint() 

      ssc.start() 
      ssc.awaitTermination() 

это, однако, есть несколько вопросов, чтобы начать с печатью не работает (не выводит на стандартный вывод, я прочитал об этом, лучших, что я могу использовать здесь является Ведение журнала). Могу ли я сохранить вывод этой функции в текстовый файл и вместо этого записать этот файл?

Я не знаю, почему программа просто выходит, нет ошибки/свалка никуда дальше смотреть в (1.6.2) искры

Как один сохранить состояние? То, что я пытаюсь это агрегатными журналы по серверу и тяжести, другой случай использования, чтобы подсчитать, сколько сделок были обработаны с помощью поиска определенных ключевых слов

псевдокода для того, что я хочу попробовать:

foreachRDD(Dstream): 
    if RDD.contains("keyword1 | keyword2 | keyword3"): 
    dictionary[keyword] = dictionary.get(keyword,0) + 1 //add the keyword if not present and increase the counter 
    print dictionary //or send this dictionary to else where 

последняя часть словаря для отправки или печати требует перехода из контекста искрообразования - может ли кто-нибудь объяснить концепцию, пожалуйста?

ответ

0

печать не работает

Я бы рекомендовал чтение design patterns section документации Спарк. Я думаю, что примерно то, что вы хотите что-то вроде этого:

def _process(iter): 
    for item in iter: 
     print item 

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) 
errors = lines.filter(lambda l: "error" in l.lower()) 
errors.foreachRDD(lambda e : e.foreachPartition(_process)) 

Это позволит получить ваш вызов print работать (хотя стоит отметить, что оператор печати будет выполняться на рабочих, а не водителей, так что если вам «Запустив этот код в кластере, вы увидите его только в рабочих журналах».

Однако, он не будет решить вторую проблему:

Как один сохранить состояние?

Для этого взгляните на updateStateByKey и на номер related example.

+0

Будет ли это напечатанный товар или печать (элемент). По какой-то причине, когда я использую «печать» в любом месте программы, он выдает любые ошибки - больше похоже на «сбой» без какой-либо информации или ошибки отладки – GreenThumb