2013-05-30 3 views
11

Я играю с Спарком. Это стандартное, предварительно построенное распределение (0.7.0) с веб-сайта, с конфигурацией по умолчанию, режимом кластера, одним рабочим (мой localhost). Я читал документы об установке, и все кажется прекрасным.Спайк-кластер не работает на большом входе, хорошо работает для небольших

У меня есть файл CSV (различные размеры, 1000 - 1 миллион строк). Если я запустил приложение с небольшим входным файлом (например, 1000 строк), все в порядке, программа выполняется за считанные секунды и выдает ожидаемый результат. Но когда я поставляю больший файл (100 000 строк или 1 миллион), выполнение не выполняется. Я пытался копаться в журналах, но не очень помог (он повторяет весь процесс примерно в 9-10 раз и exitst с fail после этого. Также есть некоторая ошибка, связанная с извлечением из некоторого нулевого источника).

Результат Iterable, возвращенный первым JavaRDD, является подозрительным для меня. Если я верну жесткокодированный одноэлементный список (например, res.add («something»), return res;), все в порядке, даже с миллионом строк. Но если я добавлю все свои ключи, я хочу (28 строк длиной 6-20 символов), процесс завершится неудачно с большим входом. Проблема в том, что мне нужны все эти ключи, это настоящая бизнес-логика.

Я использую Linux amd64, четырехъядерный ядро, 8 ГБ оперативной памяти. Последняя версия Oracle Java7 JDK. Свеча конфигурации:

SPARK_WORKER_MEMORY=4g 
SPARK_MEM=3g 
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar 

я должен отметить, что при запуске программы, она говорит:

13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1) 
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address 

Вот моя программа. Он основан на примере JavaWordCount, минимально модифицированном.

public final class JavaWordCount 
{ 
    public static void main(final String[] args) throws Exception 
    { 
     final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount", 
      System.getenv("SPARK_HOME"), new String[] {"....jar" }); 

     final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() { 

      @Override 
      public Iterable<String> call(final String s) 
      { 
       // parsing "s" as the line, computation, building res (it's a List<String>) 
       return res; 
      } 
     }); 

     final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() { 

      @Override 
      public Tuple2<String, Integer> call(final String s) 
      { 
       return new Tuple2<String, Integer>(s, 1); 
      } 
     }); 
     final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { 

      @Override 
      public Integer call(final Integer i1, final Integer i2) 
      { 
       return i1 + i2; 
      } 
     }); 

     counts.collect(); 

     for (Tuple2<?, ?> tuple : counts.collect()) { 
      System.out.println(tuple._1 + ": " + tuple._2); 
     } 
    } 
} 
+0

Перед изменением свойства системы Spark, с какими исключениями/ошибками работала ваша работа? –

+0

В группе искроберителей я получил ответ, что .collect() вызовет сборку всех (временных) RDD. Это была настоящая проблема. Тема с решением здесь: http://stackoverflow.com/questions/16832429/spark-cluster-fails-on-bigger-input-works-well-for-small?noredirect1_comment24468201_16832429 – gyorgyabraham

+1

Я искал googled для возрастов и возрастов чтобы найти решение моей проблемы, ответ на этот вопрос решает мою проблему, поэтому, пожалуйста, отредактируйте свой вопрос, включив в свой вопрос «org.apache.spark.SparkException: сообщение об ошибке с MapOutputTracker», чтобы сделать googling проще для других в будущем , – samthebest

ответ

13

мне удалось это исправить, установив свойство spark.mesos.coarse к истине. Больше информации here.

Обновление: Я пару раз работаю с Спарком. Эти настройки немного помогли мне, но, похоже, почти невозможно обработать ~ 10 миллионов строк текста на одной машине.

System.setProperty("spark.serializer", "spark.KryoSerializer"); // kryo is much faster 
System.setProperty("spark.kryoserializer.buffer.mb", "256"); // I serialize bigger objects 
System.setProperty("spark.mesos.coarse", "true"); // link provided 
System.setProperty("spark.akka.frameSize", "500"); // workers should be able to send bigger messages 
System.setProperty("spark.akka.askTimeout", "30"); // high CPU/IO load 

Примечание: Увеличение размера кадра представляется особенно полезным в предотвращении: org.apache.spark.SparkException: Error communicating with MapOutputTracker

+1

'spark.akka.frameSize' также решила мою проблему org.apache.spark.SparkException: ошибка, связанная с проблемой MapOutputTracker. – samthebest

+0

Система.setProperty() также работает в искровой оболочке? Я не могу получить набор frameSize –