Я реализовал очень простой конвейер с перекодировкой и сложены с некоторыми проблемами.Как использовать Google App Engine MapReduce с большим объемом данных (> 1000000)
Сценарий: существует более 1000000 объектов в одном виде модели в облачном хранилище данных, и я хотел бы проверить все сущности, имеет ли каждый объект свойство несоответствия.
Вот мой фрагмент кода.
class User(ndb.model)
parent = ndb.KeyProperty(Group) # want to check if this key property actually exist
class CheckKeyExistencePipeline(pipeline.Pipeline):
def map(self, entity):
logging.info(entity.urlsafe()) # added for debug
prop = getattr(entity, 'parent')
if not prop.get():
yield 'parent does not exist: %s\n' % (entity.key.urlsafe())
def run(self, modelname, shards):
mapreduce_pipeline.MapperPipeline(
'parent check',
handler_spec='CheckKeyExistencePipeline.map',
input_reader_spec='mapreduce.input_readers.DatastoreInputReader',
output_writer_spec="mapreduce.output_writers.GoogleCloudStorageOutputWriter",
params={
'input_reader': {
'entity_kind': 'User',
},
'output_writer': {
'bucket_name': app_identity.get_default_gcs_bucket_name(),
'content_type': 'text/plain'
}
},
shards=10)
Проблема в том, что на самом деле она часто встречается с ошибкой.
Превышен мягкий частный предел памяти объемом 128 Мб с 133 МБ после обслуживание 2 запросов всего
Там нет никаких проблем, когда я запускаю этот код с данными около 10000 лиц. В чем проблема и как я могу правильно настроить этот конвейер для применения большого количества данных?
EDIT1
Я изменил не использовать Ndb кэш, но это не кажется, никакого улучшения. Я думаю, кэш уже отключен по умолчанию в соответствии с исходным кодом.
def _set_ndb_cache_policy():
"""Tell NDB to never cache anything in memcache or in-process.
This ensures that entities fetched from Datastore input_readers via NDB
will not bloat up the request memory size and Datastore Puts will avoid
doing calls to memcache. Without this you get soft memory limit exits,
which hurts overall throughput.
"""
ndb_ctx = ndb.get_context()
ndb_ctx.set_cache_policy(lambda key: False)
ndb_ctx.set_memcache_policy(lambda key: False)
я сделал дальнейшее расследование, чтобы выяснить проблему. Я установил один из параметров отображения processing_rate
на 10 и shards
на 100, так что он обрабатывает только 1 или 2 объекта для каждой задачи.
Адрес: mapreduce stat. Граф кажется разумным. (Трубопровод еще не завершен в это время.)
Но когда я проверяю журнал трассировки одной из задач рабочего, это действительно странно. Он показывает группу /datastore_v3.Next
и /datastore_v3.Get
, несмотря на то, что функция «map» вызывается только дважды (согласно моему журналу отладки). Поскольку я не менял batch_size, это должно быть 50. Таким образом, по моему мнению, /datastore_v3.Next
должен быть только один раз и /datastore_v3.Get
дважды.
Кто-нибудь знает, почему такие многие RPC вызовы на базе срабатывают?
EDIT2
Опять же, я сделал дальнейшее расследование и сделал код простым. Функция карты просто получает данные, используя ndb.Key
, позвонив в get
.
class CheckKeyExistencePipeline(pipeline.Pipeline):
def map(self, entity):
logging.info('start')
entity.parent.get()
logging.info('end')
def run(self):
mapreduce_pipeline.MapperPipeline(
'parent check',
handler_spec='CheckKeyExistencePipeline.map',
input_reader_spec='mapreduce.input_readers.DatastoreInputReader',
output_writer_spec="mapreduce.output_writers.GoogleCloudStorageOutputWriter",
params={
'input_reader': {
'entity_kind': 'User',
},
'output_writer': {
'bucket_name': app_identity.get_default_gcs_bucket_name(),
'content_type': 'text/plain'
}
},
shards=10)
Tracelog of Stackdriver выглядит следующим образом.
Это просто вызов get
, но это вызывает RPC вызовов много раз между «начала» и «конца». Это кажется немного странным и может быть одной из причин этого потребления памяти. Это регулярное поведение?
Иногда вы можете заполнить много памяти, используя для каждого экземпляра кэша, что 'ndb' предоставляет. Вы можете получить объекты без использования кеша: 'prop.get (use_cache = False)'. В этом случае, поскольку вы не делаете кучу попаданий на один и тот же ключ подряд, вы все равно ничего не получаете от кеша ... Вы также можете попробовать [увеличить класс экземпляра] (https://cloud.google.com/appengine/docs/about-the-standard-environment#instance_classes) для службы, обрабатывающей эти запросы ... – mgilson
Да, кэширование станет ключевым компонентом здесь (и вообще кэширует объекты ndb не поможет, если вы не знаете, что будете использовать их повторно). Возможно, вы захотите сделать итератор вместо ключевого, чтобы вы могли лучше контролировать поведение кэша. –
О, я понял. Я не думал о кеше. Я попробую. Благодаря! – Kunihiko