2012-06-14 5 views
0

Я использую RavenDB для хранения нескольких тысяч документов. Данные поступают из ежедневной XML-ленты, которую я буду обновлять, запустив консольное приложение C#. Ниже приведен код, который обрабатывает фид, чтобы синхронизировать базу данных с любыми изменениями. У меня было немало проблем с этим, поэтому мне интересно, выбрал ли я неправильную стратегию. Вот несколько важных замечаний.Как мне обновить несколько документов RavenDB из ежедневного фида?

  1. Новые элементы могут быть добавлены к сырью и существующие элементы могут изменились, поэтому каждый раз, когда он работает, я хочу добавить или обновить документ в зависимости от того, это или нет новый.
  2. Корзина xml не содержит ссылок на мои идентификаторы RavenDB, только ее внутренний ключ для каждого элемента. Поэтому при получении существующего документа для обновления я могу сделать это только путем изучения свойства «SourceID» в документе.
  3. Я использую «брать», чтобы работать только с 500 документами, частично из-за того, что мой db ограничен 1000 документами, а отчасти потому, что без Take() Кажется, я могу получить только 128 документов.
  4. Как указано выше, этот код падает с ошибкой «не может сделать более 30 обновлений в сеансе», я думаю, потому что каждый раз, когда я пытаюсь выполнить , вы можете получить существующую запись из dbItems, которая на самом деле попадает в базу данных снова ,
  5. Я могу исправить проблему в (4) выше, вызвав ToList() на элементах, но если я сделаю это, то существующий элемент не будет обновляться, когда я вызываю session.SaveChanges() (Я представляю это как a отключен recordset).

Может ли кто-нибудь дать мне несколько указателей?

 public void ProcessFeed(string rawXml) 
     { 
      XDocument doc = XDocument.Parse(rawXml); 
      var items = ExtractItemsFromFeed(doc).OrderBy(x => x.SourceId).Take(500); 
      using (IDocumentSession session = _store.OpenSession()) 
      { 
       var dbItems = session.Query<AccItem>().OrderBy(x => x.SourceId).Take(500); 
       foreach (var item in items) 
       { 
        var existingRecord = dbItems.SingleOrDefault(x => x.SourceId == item.SourceId); 
        if (existingRecord == null) 
        { 
         session.Store(item); 
         _logger.Info("Saved new item {0}.", item.ShortName); 
        } 
        else 
        { 
         // update just one field for now 
         existingRecord.Village = item.Village; 
         _logger.Info("Updated item {0}.", item.ShortName); 
        } 
       } 
       session.SaveChanges(); 
      }    
     } 
+0

В (4) вы пишете «не может делать более 30 обновлений в сеансе». Насколько я понимаю, записи переносятся, и поскольку вы только вызываете SaveChanges один раз за сеанс, так что это должно привести только к одному запросу на сервер для записи: https://ravendb.net/docs/theory/safe-by-default –

+0

Это именно то, что я думал, и это сделало результаты, которые я видел сбивчивыми. Однако я думаю, что это чтение, а не записи в этом случае, которые вызывают проблему (см. Ответ ниже). – centralscru

ответ

0

Ниже приведен код, в котором я оказался для этого. Я думаю, что первоначальная проблема с исходной версией была просто в том, что я пытался использовать один и тот же сеанс для каждого элемента, нарушая предел 30.

Настроенный некоторым кодом на экране в TekPub screencast Я исправил это, выполнив весь процесс в наборе из 15 (чтобы можно было читать один и один писать, поэтому 30 запросов в общей сложности за пакет). Это довольно медленно, но не так медленно, как я ожидал. Я ожидаю, возможно, 10 000 записей за раз, поэтому я просто оставлю это тикающим, пока это не будет сделано.

public void ProcessFeed(string rawXml) 
{ 
    XDocument doc = XDocument.Parse(rawXml); 
    var items = ExtractItemsFromFeed(doc).OrderBy(x => x.SourceId); 
    int numberOfItems = items.Count; 
    int batchSize = 15; 
    int numberOfBatchesRequired = numberOfItems/batchSize; 
    int numberOfBatchesProcessed = 0; 
    int numberOfItemsInLastBatch = numberOfItems - (numberOfBatchesRequired * batchSize); 
    for (var i = 0;i <= numberOfBatchesRequired;i++) 
    { 
     using (IDocumentSession session = _store.OpenSession()) 
     { 
      var numberOfItemsProcessedSoFar = numberOfBatchesProcessed * batchSize; 
      var numberOfItemsRemaining = numberOfItems - numberOfItemsProcessedSoFar; 
      int itemsToTake = 15; 
      if (numberOfItemsRemaining > 0 && numberOfItemsRemaining < 15) 
      itemsToTake = numberOfItemsRemaining; 
      foreach (var item in items.Skip(numberOfItemsProcessedSoFar).Take(itemsToTake)) 
      { 
      var existingRecords = session.Query<AccItem>().Where(x => x.SourceId == item.SourceId).ToList(); 
      if (!existingRecords.Any()) 
      { 
       session.Store(item); 
       _logger.Info("Saved new item {0}.", item.ShortName); 
      } 
      else 
      { 
       if (existingRecords.Count() > 1) 
       _logger.Warn("There's more than one item in the database with the sourceid {0}", item.SourceId); 
       existingRecords.First().Village = item.Village; 
       _logger.Info("Updated item {0}.", item.ShortName); 
      } 
      session.SaveChanges(); 
      } 
     }    
     numberOfBatchesProcessed++; 
    } 
}