2014-01-25 3 views
1

Я пишу приложение, которое быстро записывает в mongodb. Слишком быстро для mongodb и mgo для обработки. Мой вопрос в том, есть ли способ определить, что монго не может идти в ногу и начинать блокировать? Но я также не хочу блокировать излишне. Вот пример кода, который эмулирует проблему:Golang Mgo pacing

package main 

import (
    "labix.org/v2/mgo" 
    "time" 
    "fmt" 
) 

// in database name is a string and age is an int 

type Dog struct{ 
    Breed string "breed" 
} 

type Person struct{ 
    Name string "name" 
    Pet Dog `bson:",inline"` 
    Ts  time.Time 
} 

func insert(session *mgo.Session, bob Person){ 
    err := session.DB("db_log").C("people").Insert(&bob) 
    if err != nil { 
    panic("Could not insert into database") 
    } 
} 

func main() { 
    session, _ := mgo.Dial("localhost:27017") 
    bob := Person{Name : "Robert", Pet : Dog{}} 
    i := 0 
    for { 
    time.Sleep(time.Duration(1) * time.Microsecond) 
    i++ 
    go insert(session, bob) 
    } 
} 

Я часто получаю ошибки как:

panic: Could not insert into database 

или

panic: write tcp 127.0.0.1:27017: i/o timeout 
+0

@EvanShaw, что вы рекомендуете вместо этого? – thwd

+1

Трудно сделать рекомендацию, не зная ничего о рассматриваемом приложении, но PostgreSQL часто является хорошим выбором по умолчанию. –

+2

Если это круто, чтобы писать мнения, я собираюсь предложить противоположное мнение @EvanShaw. Мы используем Mongo в производстве и любим его. Мы получаем отличную производительность. Не позволяйте ненавистникам возиться с вами. Мне еще предстоит услышать спор против Монго, на который нельзя ответить. Дэвид Миттон из Server Density написал [потрясающий контрапункт] (https://blog.serverdensity.com/does-everyone-hate-mongodb/) полтора года назад, который я рекомендую читать. – Tyson

ответ

6

Я подозреваю, что вы получите гораздо более высокую производительность, если вы allow Go to use multiple threads и Copy() then Close() ваши сессии.

Чтобы ответить на ваш вопрос, это, вероятно, идеальный вариант использования для канала. Подавайте предметы в канал в одном канате и потребляйте их/записывайте их в Mongo в другом. Вы можете настроить размер канала в соответствии с вашими потребностями. Поток производителя блокируется, когда канал будет заполнен, когда он попытается отправить его.

Вы также можете играть с настройками метода Safe(). Установка W: 0 заставит Mongo работать в режиме «огонь и забыть», что значительно ускорит работу, рискуя потерять некоторые данные. Вы также можете изменить время ожидания.

+0

Хорошо спасибо. Я попробую и вернусь к вам завтра – Gary

+0

Я попытался использовать Copy(), а затем Close(). У меня есть некоторые ошибки, которые выглядят так: 2014/01/27 18:28:36 http: Accept error: accept tcp [::]: 9090: слишком много открытых файлов; retrying in 1s – Gary

+0

Также я посмотрел на параметр Safe(), и я в очень опасном режиме, поэтому он должен быть очень быстрым – Gary

0

Я еще не тестировал, но я думаю, что этот код должен работать. Я получаю эту проблему после продолжительного сеанса, так что у меня есть таймер для возобновления сеанса каждый определенный раз.

package main 

import (
    "gopkg.in/mgo.v2" 
    "time" 
    "fmt" 
) 

// in database name is a string and age is an int 

type Dog struct{ 
    Breed string "breed" 
} 

type Person struct{ 
    Name string "name" 
    Pet Dog `bson:",inline"` 
    Ts  time.Time 
} 

func insert(session *mgo.Session, bob Person){ 
    err := session.DB("db_log").C("people").Insert(&bob) 
    if err != nil { 
    panic("Could not insert into database") 
    } 
} 

func main() { 
    current_session, _ := mgo.Dial("localhost:27017") 
    using_session := current_session 
    bob := Person{Name : "Robert", Pet : Dog{}} 

    /* 
    * this technical to prevent connect timeout after long time connection on mongodb from golang session 
    * Idea is simple: the session will be renew after certain time such as 1 hour 
    */ 
    //ticker := time.NewTicker(time.Hour * 1) 

    //Set 10 seconds for test 
    ticker := time.NewTicker(time.Second * 10) 

    go func() { 

    for t := range ticker.C { 
     fmt.Println("Tick at", t) 
     new_session := current_session.Copy() 
     fmt.Printf("Current session here %p\n", current_session) 
     fmt.Printf("New session here %p\n", new_session) 
     using_session = new_session 
     //setTimeout 30 second before close old sesion, to make sure current instance use current connection isn't affect 
     //time.AfterFunc(time.Second * 30, func() { 

     //Set 2 seconds for test 
     time.AfterFunc(time.Second * 2, func() { 

     //close previous session 

     current_session.Close() 
     current_session = new_session 

     //assign to new session 

     }) 

    } 
    }() 

    i := 0 
    for { 
    time.Sleep(time.Duration(1) * time.Microsecond) 
    i++ 
    go insert(using_session, bob) 
    } 

} 

 Смежные вопросы

  • Нет связанных вопросов^_^