11

Я паркетные данные разделены date & hour, структуры папок:Свечи списков всех листьевы узлов даже в секционированных данных

events_v3 
    -- event_date=2015-01-01 
    -- event_hour=2015-01-1 
     -- part10000.parquet.gz 
    -- event_date=2015-01-02 
    -- event_hour=5 
     -- part10000.parquet.gz 

Я создал таблицу raw_events с помощью искры, но когда я пытаюсь запрос, он сканирует все каталоги для нижнего колонтитула и что замедляет первоначальный запрос, даже если я запрашиваю только данные за один день.

запрос: select * from raw_events where event_date='2016-01-01'

аналогичная проблема: http://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%[email protected].com%3E (но старый)

Log:

App > 16/09/15 03:14:03 main INFO HadoopFsRelation: Listing leaf files and directories in parallel under: s3a://bucket/events_v3/ 

, а затем она нерестится 350 задач, так как есть 350 дней стоит данных.

У меня отключен schemaMerge, а также указана схема для чтения как, поэтому она может просто перейти к разделу, на который я смотрю, почему он должен быть загружен во все листовые файлы? Листинг листьев файлы с 2-исполнителях займет 10 минут, а запрос фактическое исполнение берет на 20 секунд

Пример кода:

val sparkSession = org.apache.spark.sql.SparkSession.builder.getOrCreate() 
val df = sparkSession.read.option("mergeSchema","false").format("parquet").load("s3a://bucket/events_v3") 
    df.createOrReplaceTempView("temp_events") 
    sparkSession.sql(
     """ 
     |select verb,count(*) from temp_events where event_date = "2016-01-01" group by verb 
     """.stripMargin).show() 
+0

Связано: [Поддерживает ли Spark подрезку раздела с помощью паркетных файлов] (http://stackoverflow.com/q/37180073/1560062) – zero323

+0

Я вообще не использую куст. Просто искра и искра sql –

+1

@lostinoverflow Я до сих пор не нашел, почему мы читаем рекурсивно, но я могу сбить 10 минут первоначального сканирования на 1 мин сканирования. Эффективное сокращение запроса до менее 2 минут –

ответ

3

Как только искра дается каталог для чтения из него выдает вызов listLeafFiles (орг/Apache/искровые/SQL/выполнение// fileSourceInterfaces.scala источники данных). Это, в свою очередь, вызывает fs.listStatus, который делает api-вызов для получения списка файлов и каталогов. Теперь для каждого каталога этот метод вызывается снова. Это происходит рекурсивно, пока не останется никаких каталогов. Это по дизайну хорошо работает в HDFS-системе. Но работает плохо в s3, поскольку файл списка является вызовом RPC. У S3 на других есть поддержка, чтобы получить все файлы с помощью префикса, что именно то, что нам нужно.

Так, например, если бы у нас была структура каталогов с данными за 1 год с каждым каталогом в течение часа и 10 подкаталогов, мы бы получили 365 * 24 * 10 = 87 тыс. Api звонков, это может быть сокращено до 138 апи-вызовов учитывая, что всего 137000 файлов. Каждый вызов s3 api возвращает 1000 файлов.

Код: org/apache/hadoop/fs/s3a/S3AFileSystem.java

public FileStatus[] listStatusRecursively(Path f) throws FileNotFoundException, 
      IOException { 
     String key = pathToKey(f); 
     if (LOG.isDebugEnabled()) { 
      LOG.debug("List status for path: " + f); 
     } 

     final List<FileStatus> result = new ArrayList<FileStatus>(); 
     final FileStatus fileStatus = getFileStatus(f); 

     if (fileStatus.isDirectory()) { 
      if (!key.isEmpty()) { 
       key = key + "/"; 
      } 

      ListObjectsRequest request = new ListObjectsRequest(); 
      request.setBucketName(bucket); 
      request.setPrefix(key); 
      request.setMaxKeys(maxKeys); 

      if (LOG.isDebugEnabled()) { 
       LOG.debug("listStatus: doing listObjects for directory " + key); 
      } 

      ObjectListing objects = s3.listObjects(request); 
      statistics.incrementReadOps(1); 

      while (true) { 
       for (S3ObjectSummary summary : objects.getObjectSummaries()) { 
        Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir); 
        // Skip over keys that are ourselves and old S3N _$folder$ files 
        if (keyPath.equals(f) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) { 
         if (LOG.isDebugEnabled()) { 
          LOG.debug("Ignoring: " + keyPath); 
         } 
         continue; 
        } 

        if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) { 
         result.add(new S3AFileStatus(true, true, keyPath)); 
         if (LOG.isDebugEnabled()) { 
          LOG.debug("Adding: fd: " + keyPath); 
         } 
        } else { 
         result.add(new S3AFileStatus(summary.getSize(), 
           dateToLong(summary.getLastModified()), keyPath, 
           getDefaultBlockSize(f.makeQualified(uri, workingDir)))); 
         if (LOG.isDebugEnabled()) { 
          LOG.debug("Adding: fi: " + keyPath); 
         } 
        } 
       } 

       for (String prefix : objects.getCommonPrefixes()) { 
        Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); 
        if (keyPath.equals(f)) { 
         continue; 
        } 
        result.add(new S3AFileStatus(true, false, keyPath)); 
        if (LOG.isDebugEnabled()) { 
         LOG.debug("Adding: rd: " + keyPath); 
        } 
       } 

       if (objects.isTruncated()) { 
        if (LOG.isDebugEnabled()) { 
         LOG.debug("listStatus: list truncated - getting next batch"); 
        } 

        objects = s3.listNextBatchOfObjects(objects); 
        statistics.incrementReadOps(1); 
       } else { 
        break; 
       } 
      } 
     } else { 
      if (LOG.isDebugEnabled()) { 
       LOG.debug("Adding: rd (not a dir): " + f); 
      } 
      result.add(fileStatus); 
     } 

     return result.toArray(new FileStatus[result.size()]); 
    } 

/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala

def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = { 
    logTrace(s"Listing ${status.getPath}") 
    val name = status.getPath.getName.toLowerCase 
    if (shouldFilterOut(name)) { 
     Array.empty[FileStatus] 
    } 
    else { 
     val statuses = { 
     val stats = if(fs.isInstanceOf[S3AFileSystem]){ 
      logWarning("Using Monkey patched version of list status") 
      println("Using Monkey patched version of list status") 
      val a = fs.asInstanceOf[S3AFileSystem].listStatusRecursively(status.getPath) 
      a 
//   Array.empty[FileStatus] 
     } 
     else{ 
      val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory) 
      files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter)) 

     } 
     if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats 
     } 
     // statuses do not have any dirs. 
     statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map { 
     case f: LocatedFileStatus => f 

     // NOTE: 
     // 
     // - Although S3/S3A/S3N file system can be quite slow for remote file metadata 
     // operations, calling `getFileBlockLocations` does no harm here since these file system 
     // implementations don't actually issue RPC for this method. 
     // 
     // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not 
     // be a big deal since we always use to `listLeafFilesInParallel` when the number of 
     // paths exceeds threshold. 
     case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen)) 
     } 
    } 
    } 
2

Чтобы уточнить ответ Gaurav, в том, что код отрезала от Hadoop ветвей 2, вероятно, не будет не на поверхность до Hadoop 2.9 (см HADOOP-13208) ; и кому-то нужно обновить Spark, чтобы использовать эту функцию (которая не повредит код с использованием HDFS, просто не покажет никакого ускорения там).

Одна вещь, которую следует учитывать, - это то, что делает хороший макет файла для объектов-объектов.

  • Не имеют глубокие деревья каталогов с только несколько файлов в одном каталоге
  • ли у мелких деревьев с большим количеством файлов
  • Рассмотрите возможность использования нескольких первых символов файла для наиболее изменяющегося значения (например, день/час), а не последний. Зачем?В некоторых объектных хранилищах появляются, чтобы использовать лидирующие символы для их хеширования, а не для конечных ... если вы дадите своим именам больше уникальности, тогда они будут распределены по большему количеству серверов с лучшей пропускной способностью/меньшим риском дросселирования.
  • Если вы используете библиотеки Hadoop 2.7, переключитесь на s3a: // через s3n: //. Это уже быстрее и становится лучше каждую неделю, по крайней мере, в исходном дереве ASF.

Наконец, Apache Hadoop, Apache Spark и связанные с ними проекты - все с открытым исходным кодом. Взносы приветствуются. Это не просто код, это документация, тестирование, а для этого продукта производительности - тестирование ваших фактических наборов данных. Интересно даже дать нам подробные сведения о причинах проблем (и ваших макетов набора данных).

+0

Спасибо. Возможно, вы хотите ответить на http://stackoverflow.com/q/39898067/6022341 –

+0

, они внесли это исправление в 2.8.0, которое должно быть через пару недель :) –

+0

Не знаю о расписаниях; никто еще не начал этот процесс выпуска. Я действительно верю, что это доставка в HDP-2.5, и по мере того, как я получу призывы к поддержке, если он не работает, вы получите вызовы поддержки. Когда и когда начнется процесс 2.8 RC, тестирование поможет. В любом случае Spark не получает никакого ускорения, так как его также нужно настроить, и есть другие вещи, на которые нужно смотреть. Сделайте вашу жизнь проще, выложив данные в меньшее количество каталогов, например, в месяц, а не в день –

 Смежные вопросы

  • Нет связанных вопросов^_^