2017-01-27 9 views
1

Я реализовал очень простой конвейер с перекодировкой и сложены с некоторыми проблемами.Как использовать Google App Engine MapReduce с большим объемом данных (> 1000000)

Сценарий: существует более 1000000 объектов в одном виде модели в облачном хранилище данных, и я хотел бы проверить все сущности, имеет ли каждый объект свойство несоответствия.

Вот мой фрагмент кода.

class User(ndb.model) 
    parent = ndb.KeyProperty(Group) # want to check if this key property actually exist 


class CheckKeyExistencePipeline(pipeline.Pipeline): 

    def map(self, entity): 
     logging.info(entity.urlsafe()) # added for debug 
     prop = getattr(entity, 'parent') 
     if not prop.get(): 
      yield 'parent does not exist: %s\n' % (entity.key.urlsafe()) 

    def run(self, modelname, shards): 
     mapreduce_pipeline.MapperPipeline(
      'parent check', 
      handler_spec='CheckKeyExistencePipeline.map', 
      input_reader_spec='mapreduce.input_readers.DatastoreInputReader', 
      output_writer_spec="mapreduce.output_writers.GoogleCloudStorageOutputWriter", 
      params={ 
       'input_reader': { 
        'entity_kind': 'User', 
       }, 
       'output_writer': { 
        'bucket_name': app_identity.get_default_gcs_bucket_name(), 
        'content_type': 'text/plain' 
       } 
      }, 
      shards=10) 

Проблема в том, что на самом деле она часто встречается с ошибкой.

Превышен мягкий частный предел памяти объемом 128 Мб с 133 МБ после обслуживание 2 запросов всего

Там нет никаких проблем, когда я запускаю этот код с данными около 10000 лиц. В чем проблема и как я могу правильно настроить этот конвейер для применения большого количества данных?

EDIT1

Я изменил не использовать Ndb кэш, но это не кажется, никакого улучшения. Я думаю, кэш уже отключен по умолчанию в соответствии с исходным кодом.

https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/6e103ac52855c3214de3ec3721d6ec0e7edd5f77/python/src/mapreduce/util.py#L381-L383

def _set_ndb_cache_policy(): 
    """Tell NDB to never cache anything in memcache or in-process. 
    This ensures that entities fetched from Datastore input_readers via NDB 
    will not bloat up the request memory size and Datastore Puts will avoid 
    doing calls to memcache. Without this you get soft memory limit exits, 
    which hurts overall throughput. 
    """ 
    ndb_ctx = ndb.get_context() 
    ndb_ctx.set_cache_policy(lambda key: False) 
    ndb_ctx.set_memcache_policy(lambda key: False) 

я сделал дальнейшее расследование, чтобы выяснить проблему. Я установил один из параметров отображения processing_rate на 10 и shards на 100, так что он обрабатывает только 1 или 2 объекта для каждой задачи.

Адрес: mapreduce stat. Граф кажется разумным. (Трубопровод еще не завершен в это время.)

mapreduce detail

Но когда я проверяю журнал трассировки одной из задач рабочего, это действительно странно. Он показывает группу /datastore_v3.Next и /datastore_v3.Get, несмотря на то, что функция «map» вызывается только дважды (согласно моему журналу отладки). Поскольку я не менял batch_size, это должно быть 50. Таким образом, по моему мнению, /datastore_v3.Next должен быть только один раз и /datastore_v3.Get дважды.

tracelog

Кто-нибудь знает, почему такие многие RPC вызовы на базе срабатывают?

EDIT2

Опять же, я сделал дальнейшее расследование и сделал код простым. Функция карты просто получает данные, используя ndb.Key, позвонив в get.

class CheckKeyExistencePipeline(pipeline.Pipeline): 

    def map(self, entity): 
     logging.info('start') 
     entity.parent.get() 
     logging.info('end') 


    def run(self): 
     mapreduce_pipeline.MapperPipeline(
      'parent check', 
      handler_spec='CheckKeyExistencePipeline.map', 
      input_reader_spec='mapreduce.input_readers.DatastoreInputReader', 
      output_writer_spec="mapreduce.output_writers.GoogleCloudStorageOutputWriter", 
      params={ 
       'input_reader': { 
        'entity_kind': 'User', 
       }, 
       'output_writer': { 
        'bucket_name': app_identity.get_default_gcs_bucket_name(), 
        'content_type': 'text/plain' 
       } 
      }, 
      shards=10) 

Tracelog of Stackdriver выглядит следующим образом.

trace log

Это просто вызов get, но это вызывает RPC вызовов много раз между «начала» и «конца». Это кажется немного странным и может быть одной из причин этого потребления памяти. Это регулярное поведение?

+1

Иногда вы можете заполнить много памяти, используя для каждого экземпляра кэша, что 'ndb' предоставляет. Вы можете получить объекты без использования кеша: 'prop.get (use_cache = False)'. В этом случае, поскольку вы не делаете кучу попаданий на один и тот же ключ подряд, вы все равно ничего не получаете от кеша ... Вы также можете попробовать [увеличить класс экземпляра] (https://cloud.google.com/appengine/docs/about-the-standard-environment#instance_classes) для службы, обрабатывающей эти запросы ... – mgilson

+0

Да, кэширование станет ключевым компонентом здесь (и вообще кэширует объекты ndb не поможет, если вы не знаете, что будете использовать их повторно). Возможно, вы захотите сделать итератор вместо ключевого, чтобы вы могли лучше контролировать поведение кэша. –

+0

О, я понял. Я не думал о кеше. Я попробую. Благодаря! – Kunihiko

ответ

0

Я думаю, что, наконец, я нашел, в чем проблема.

Дело в том, что MapReduce использует ndb.query.iter и использует eventloops для управления вызовами async RPC. В моем вызове MapReduce он вызывает два типа вызова RPC, один из которых запускается библиотекой MapReduce для извлечения записей базы данных (A), а другой - моей функцией map (B).

Если я не вызываю никакого вызова RPC внутри моей функции map, нет возможности запуска следующего вызова RPC. Это означает, что следующий (A) запускается только после того, как выложили 50 записей. Однако попытка для (B) запускает следующий вызов RPC, и поскольку вызовы RPC не запускаются серийно (это означает, что это не очередь FIFO), может быть очень возможно, чтобы (A) запускался непрерывно, пока он не выберет все объекты ,

Я установил shards в 100, но все же один осколок отвечает за 10000 записей. Таким образом, это превышает предел мягкой памяти.

И когда я увеличиваю shards до 10000, другая ошибка происходит ...

В заключение, Там нет никакого способа использовать MapReduce с большими данными с низкой инстанции памяти. Я полагаю.

Подробнее см. Ниже.

https://code.google.com/p/googleappengine/issues/detail?id=11648 https://code.google.com/p/googleappengine/issues/detail?id=9610 (оригинальный выпуск)

 Смежные вопросы

  • Нет связанных вопросов^_^