2016-07-10 3 views
0

Я пытаюсь сделать массовое upsert с библиотекой mgo. Я читал documentation относительно массовых upserts, так как это первый раз, когда я когда-либо использовал MongoDB, и похоже, что я должен предоставить пару документов для обновления.Mgo неправильный тип для поля

В моей функции я выполняю поиск всего запроса, а затем используя результаты запроса для использования в качестве существующей части пары для операции bulk.Upsert(). Я не уверен, что это правильный путь, но я должен делать upserts на ~ 65k документов за раз.

Вот структуры типов и функция рабочего пула, которые читают из канала для выполнения вышеупомянутых операций MongoDB.

// types from my project's `lib` package. 
type Auctions struct { 
    Auc   int `json:"auc" bson:"_id"` 
    Item   int `json:"item" bson:"item"` 
    Owner  string `json:"owner" bson:"owner"` 
    OwnerRealm string `json:"ownerRealm" bson:"ownerRealm"` 
    Bid   int `json:"bid" bson:"bid"` 
    Buyout  int `json:"buyout" bson:"buyout"` 
    Quantity  int `json:"quantity" bson:"quantity"` 
    TimeLeft  string `json:"timeLeft" bson:"timeLeft"` 
    Rand   int `json:"rand" bson:"rand"` 
    Seed   int `json:"seed" bson:"seed"` 
    Context  int `json:"context" bson:"context"` 
    BonusLists []struct { 
     BonusListID int `json:"bonusListId" bson:"bonusListId"` 
    } `json:"bonusLists,omitempty" bson:"bonusLists,omitempty"` 
    Modifiers []struct { 
     Type int `json:"type" bson:"type"` 
     Value int `json:"value" bson:"value"` 
    } `json:"modifiers,omitempty" bson:"modifiers,omitempty"` 
    PetSpeciesID int `json:"petSpeciesId,omitempty" bson:"petSpeciesId,omitempty"` 
    PetBreedID int `json:"petBreedId,omitempty" bson:"petBreedId,omitempty"` 
    PetLevel  int `json:"petLevel,omitempty" bson:"petLevel,omitempty"` 
    PetQualityID int `json:"petQualityId,omitempty" bson:"petQualityId,omitempty"` 
} 

type AuctionResponse struct { 
    Realms []struct { 
     Name string `json:"name"` 
     Slug string `json:"slug"` 
    } `json:"realms"` 
    Auctions []Auctions `json:"auctions"` 
} 

func (b *Blizzard) RealmAuctionGrabber(realms chan string, db *mgo.Database, wg *sync.WaitGroup) { 
    defer wg.Done() 
    for i := range realms { 
     NewReq, err := http.NewRequest("GET", fmt.Sprintf("%s/auction/data/%s", b.Url, i), nil) 
     if err != nil { 
      fmt.Printf("Cannot create new request for realm %s: %s", i, err) 
     } 

     // Update the request with the default parameters and grab the files links. 
     Request := PopulateDefaultParams(NewReq) 
     log.Debugf("downloading %s auction locations.", i) 
     Response, err := b.Client.Do(Request) 
     if err != nil { 
      fmt.Printf("Error request realm auction data: %s\n", err) 
     } 
     defer Response.Body.Close() 

     Body, err := ioutil.ReadAll(Response.Body) 
     if err != nil { 
      fmt.Printf("Error parsing request body: %s\n", err) 
     } 

     var AuctionResp lib.AuctionAPI 
     err = json.Unmarshal(Body, &AuctionResp) 
     if err != nil { 
      fmt.Printf("Error marshalling auction repsonse body: %s\n", err) 
     } 

     for _, x := range AuctionResp.Files { 
      NewDataReq, err := http.NewRequest("GET", x.URL, nil) 
      if err != nil { 
       log.Error(err) 
      } 

      AuctionResponse, err := b.Client.Do(NewDataReq) 
      if err != nil { 
       fmt.Printf("Error request realm auction data: %s\n", err) 
      } 
      defer Response.Body.Close() 

      AuctionBody, err := ioutil.ReadAll(AuctionResponse.Body) 
      if err != nil { 
       fmt.Printf("Error parsing request body: %s\n", err) 
      } 

      var AuctionData lib.AuctionResponse 
      err = json.Unmarshal(AuctionBody, &AuctionData) 

      // grab all the current records, then perform an Upsert! 
      var existing []lib.Auctions 
      col := db.C(i) 
      err = col.Find(nil).All(&existing) 
      if err != nil { 
       log.Error(err) 
      } 

      log.Infof("performing bulk upsert for %s", i) 

      auctionData, err := bson.Marshal(AuctionData.Auctions) 
      if err != nil { 
       log.Error("error marshalling bson: %s", err) 
      } 

      existingData, _ := bson.Marshal(existing) 
      bulk := db.C(i).Bulk() 
      bulk.Upsert(existingData, auctionData) 
      _, err = bulk.Run() 
      if err != nil { 
       log.Error("error performing upsert! error: ", err) 
      } 
     } 
    } 
} 

Когда я звоню bulk.Upsert(existingData,auctionData), все в порядке. Однако, когда я называю bulk.Run(), я получаю это регистрируется сообщение об ошибке:

{"level":"error","msg":"error performing upsert! error: wrong type for 'q' field, expected object, found q: BinData(0, 0500000000)","time":"2016-07-09T16:53:45-07:00"} 

Я предполагаю, что это связано с тем, как я делаю BSON мечение в Auction структуры, но я не уверен, потому что это первый раз, когда я работал с MongoDB. Прямо сейчас в базе данных нет коллекций,

Это сообщение об ошибке, связанное с тегами BSON, и как его исправить?

ответ

1

Не знаю, действительно ли это актуально. Вот код:

package main 

import (
    "gopkg.in/mgo.v2" 
    "log" 
    "io/ioutil" 
    "encoding/json" 
    "gopkg.in/mgo.v2/bson" 
) 

type Auctions struct { 
    Auc int `json:"auc" bson:"_id"` 
    Owner string `json:"owner" bson:"owner"` 
} 

type AuctionResponse struct { 
    Auctions []Auctions `json:"auctions"` 
} 

func main() { 
    session, err := mgo.Dial("mongodb://127.0.0.1:27017/aucs") 

    if err != nil { 
     panic(err) 
    } 
    defer session.Close() 
    session.SetMode(mgo.Monotonic, true) 

    db := session.DB("aucs") 

    var AuctionData AuctionResponse 
    AuctionBody, _ := ioutil.ReadFile("auctions.json") 
    err = json.Unmarshal(AuctionBody, &AuctionData) 

    log.Println("performing bulk upsert for %s", "realm") 

    bulk := db.C("realm").Bulk() 
    for _, auc := range AuctionData.Auctions { 
     bulk.Upsert(bson.M{"_id": auc.Auc}, auc) 
    } 
    _, err = bulk.Run() 
    if err != nil { 
     log.Panic("error performing upsert! error: ", err) 
    } 
} 

Есть некоторые изменения, которые я сделал, чтобы проверить, был ли я право на реальных данных, но я надеюсь, что это не трудно понять, что происходит. Теперь позвольте мне немного пояснить часть Монго.

  1. Вам не нужно получать existing документов. Было бы странно запрашивать все существующие документы перед их обновлением или вставкой, особенно в огромные базы данных.
  2. mgo «s upsert функция требует два параметра:
    • selector документ (я предпочитаю думать об этом как о WHERE состоянии в терминах SQL)
    • document, что вы на самом деле есть, и что будет вставлена ​​в БД если запрос не удается селектор
  3. в случае объемных upsert существует неограниченное количество selector - document пар, доступных для толкать в одном запросе, как bulk.Upsert(sel1, doc1, sel2, doc2, sel3, doc3), но в в вашем случае нет смысла использовать эту функцию.
  4. upsert влияет только на один документ, поэтому даже если вы selector подходит только для нескольких документов, сначала будет обновляться. Другими словами, upsert больше похож на MySQL INSERT ... ON DUPLICATE KEY UPDATE, чем на UPDATE ... WHERE. В вашем случае (уникальные идентификаторы аукциона) одного upsert выглядит как запрос MySQL:

    INSERT INTO realm (auc, owner) VALUES ('1', 'Hogger') ON DUPLICATE KEY UPDATE owner = 'Hogger'; 
    
  5. С bulk мы можем просто сделать несколько upserts в цикле, а затем запустить их сразу.

+0

А, я вижу, как это работает, что имеет больше смысла. Благодаря! – mxplusb

+0

@mynameismevin приветствуется :-) –

+0

Я получаю максимальную ошибку размера партии. Я делаю 'bulk.Upsert()' партиями по 100, а затем вызывает 'bulk.Run()', чтобы нажимать эти upserts. это должен быть правильный способ обойти это, правильно? – mxplusb

 Смежные вопросы

  • Нет связанных вопросов^_^