2016-04-05 3 views
4

Я использовал ZMQ в некоторых приложениях Python некоторое время, но только совсем недавно я решил переопределить один из них в Go, и я понял, что сокеты ZMQ не являются потокобезопасными.Как справиться с гнездами ZMQ недостатком безопасности потока?

Оригинальная реализация Python использует цикл событий, который выглядит следующим образом:

while running: 
    socks = dict(poller.poll(TIMEOUT)) 
    if socks.get(router) == zmq.POLLIN: 
     client_id = router.recv() 
     _ = router.recv() 
     data = router.recv() 
     requests.append((client_id, data)) 

    for req in requests: 
     rep = handle_request(req) 
     if rep: 
      replies.append(rep) 
      requests.remove(req) 

    for client_id, data in replies: 
     router.send(client_id, zmq.SNDMORE) 
     router.send(b'', zmq.SNDMORE) 
     router.send(data) 
     del replies[:] 

Проблема заключается в том, что ответ не может быть готов на первом проходе, поэтому каждый раз, когда я в ожидании запросов, я должен опрашивать с очень коротким таймаутом или клиенты будут ждать больше, чем нужно, и приложение заканчивается использованием большого количества CPU для опроса.

Когда я решил переопределять его в Go, я думал, что это будет так просто, как это, избегая проблемы, связанные с использованием бесконечного таймаута опроса:

for { 
    sockets, _ := poller.Poll(-1) 
    for _, socket := range sockets { 
     switch s := socket.Socket; s { 
     case router: 
      msg, _ := s.RecvMessage(0) 
      client_id := msg[0] 
      data := msg[2] 
      go handleRequest(router, client_id, data)     
     } 
    } 
} 

Но это идеальное выполнение работ только тогда, когда у меня есть подключен один клиент или легкая нагрузка. При большой нагрузке я получаю ошибки случайного утверждения внутри libzmq. Я попытался следующие:

  1. После zmq4 docs я попытался добавить sync.Mutex и блокировки/разблокировки всех операций с сокетами. Это не удается. Я предполагаю, что это потому, что ZMQ использует свои потоки для промывки.

  2. Создание одного goroutine для опроса/приема, а также для отправки и использования каналов таким же образом, что и я использовал очереди req/rep в версии Python. Он терпит неудачу, поскольку я все еще использую сокет.

  3. То же, что и 2, но установка GOMAXPROCS=1. Он терпит неудачу, и пропускная способность была очень ограниченной, потому что ответы сдерживались до тех пор, пока не вернется звонок Poll().

  4. Используйте каналы req/rep как в 2, но используйте runtime.LockOSThread, чтобы все операции сокета выполнялись в том же потоке, что и сокет. Имеет ту же проблему, что и выше. Это не подводит, но пропускная способность была очень ограниченной.

  5. То же, что и 4, но с использованием стратегии тайм-аута опроса из версии Python. Он работает, но имеет ту же проблему, что и версия Python.

  6. Разделите контекст вместо сокета и создайте один сокет для отправки и один для приема в отдельных goroutines, обмениваясь данными с каналами. Он работает, но мне придется переписать клиентские библиотеки, чтобы использовать два сокета вместо одного.

  7. Избавьтесь от zmq и используйте необработанные TCP-сокеты, которые являются потокобезопасными. Он отлично работает, но мне также придется переписать клиентские библиотеки.

Итак, это выглядит как 6, как ZMQ был действительно предназначен для использования, так как это единственный способ, которым я получил его легко работать с goroutines, но мне интересно, есть ли другой способ, которым я не пробовал , Есть идеи?


Update

С ответами здесь я понял, что я могу просто добавить гнездо inproc PULL к Poller и имеют goroutine подключения и нажмите байт вырваться из бесконечного ожидания.Это не так универсально, как предлагаемые здесь решения, но он работает, и я могу даже выполнить резервное копирование его на версию Python.

+0

Какую библиотеку ZMQ вы используете на стороне Go? – sberry

+0

@sberry github.com/pebbe/zmq4, связанный на # 1 –

+0

Я новичок в мире Go - есть ли причина, отличная от последовательности, которую вы использовали бы в сокетах inproc, а не в гибридной системе, которая использует ZMQ для внешних коммуникации и каналы для внутренней связи? – Jason

ответ

4

I opened an issue a 1,5 года назад ввести порт https://github.com/vaughan0/go-zmq/blob/master/channels.go в pebbe/zmq4. В конечном счете автор решил отказаться от этого, но мы использовали это в производстве (в ОЧЕНЬ тяжелых тяжелых нагрузках) уже давно.

Это файл gist файла, который необходимо добавить в пакет pebbe/zmq4 (так как он добавляет методы в Socket). Это может быть переписано таким образом, что методы на приемнике Socket вместо этого приняли аргумент Socket, но поскольку мы все равно продаем наш код, это был простой путь вперед.

Основное применение заключается в создании вашего Socket как нормальный (назовем его s, например), то вы можете:

channels := s.Channels() 
outBound := channels.Out() 
inBound := channels.In() 

Теперь у вас есть два канала типа [][]byte, которые можно использовать между goroutines, но один goroutine - управляется в рамках абстракции каналов, отвечает за управление Poller и связь с сокетом.

+0

Забудьте мой предыдущий комментарий. Я понял. По сути, это то, что он использует «inproc», чтобы вырваться из бесконечного таймаута и обернуть все в каналы, чтобы сделать его более плавным. Хороший. Благодарю. –

1

Благословенный способ сделать это с помощью pebbe/zmq4 с помощью Reactor. Реакторы имеют возможность прослушивать каналы Go, но вы не хотите делать это, потому что они делают это путем периодического опроса канала, используя тайм-аут опроса, который повторяет ту же самую точную проблему, что и в вашей версии Python. Вместо этого вы можете использовать гнезда zmq inproc, один конец которого находится в реакторе, а другой конец - горутин, который передает данные из канала. Это сложно, многословно и неприятно, но я использовал его успешно.

+0

Я понял, но я просто понял, что могу просто использовать сокет 'inproc', чтобы вырваться из вызова poller.Poll (-1)', мне даже не нужно использовать реактор. Благодарю. –

+0

@PedroWerneck умный. Рад, что я могу помочь. – hobbs

 Смежные вопросы

  • Нет связанных вопросов^_^