Я использовал ZMQ в некоторых приложениях Python некоторое время, но только совсем недавно я решил переопределить один из них в Go, и я понял, что сокеты ZMQ не являются потокобезопасными.Как справиться с гнездами ZMQ недостатком безопасности потока?
Оригинальная реализация Python использует цикл событий, который выглядит следующим образом:
while running:
socks = dict(poller.poll(TIMEOUT))
if socks.get(router) == zmq.POLLIN:
client_id = router.recv()
_ = router.recv()
data = router.recv()
requests.append((client_id, data))
for req in requests:
rep = handle_request(req)
if rep:
replies.append(rep)
requests.remove(req)
for client_id, data in replies:
router.send(client_id, zmq.SNDMORE)
router.send(b'', zmq.SNDMORE)
router.send(data)
del replies[:]
Проблема заключается в том, что ответ не может быть готов на первом проходе, поэтому каждый раз, когда я в ожидании запросов, я должен опрашивать с очень коротким таймаутом или клиенты будут ждать больше, чем нужно, и приложение заканчивается использованием большого количества CPU для опроса.
Когда я решил переопределять его в Go, я думал, что это будет так просто, как это, избегая проблемы, связанные с использованием бесконечного таймаута опроса:
for {
sockets, _ := poller.Poll(-1)
for _, socket := range sockets {
switch s := socket.Socket; s {
case router:
msg, _ := s.RecvMessage(0)
client_id := msg[0]
data := msg[2]
go handleRequest(router, client_id, data)
}
}
}
Но это идеальное выполнение работ только тогда, когда у меня есть подключен один клиент или легкая нагрузка. При большой нагрузке я получаю ошибки случайного утверждения внутри libzmq. Я попытался следующие:
После zmq4 docs я попытался добавить sync.Mutex и блокировки/разблокировки всех операций с сокетами. Это не удается. Я предполагаю, что это потому, что ZMQ использует свои потоки для промывки.
Создание одного goroutine для опроса/приема, а также для отправки и использования каналов таким же образом, что и я использовал очереди req/rep в версии Python. Он терпит неудачу, поскольку я все еще использую сокет.
То же, что и 2, но установка
GOMAXPROCS=1
. Он терпит неудачу, и пропускная способность была очень ограниченной, потому что ответы сдерживались до тех пор, пока не вернется звонокPoll()
.Используйте каналы req/rep как в 2, но используйте
runtime.LockOSThread
, чтобы все операции сокета выполнялись в том же потоке, что и сокет. Имеет ту же проблему, что и выше. Это не подводит, но пропускная способность была очень ограниченной.То же, что и 4, но с использованием стратегии тайм-аута опроса из версии Python. Он работает, но имеет ту же проблему, что и версия Python.
Разделите контекст вместо сокета и создайте один сокет для отправки и один для приема в отдельных goroutines, обмениваясь данными с каналами. Он работает, но мне придется переписать клиентские библиотеки, чтобы использовать два сокета вместо одного.
Избавьтесь от zmq и используйте необработанные TCP-сокеты, которые являются потокобезопасными. Он отлично работает, но мне также придется переписать клиентские библиотеки.
Итак, это выглядит как 6, как ZMQ был действительно предназначен для использования, так как это единственный способ, которым я получил его легко работать с goroutines, но мне интересно, есть ли другой способ, которым я не пробовал , Есть идеи?
Update
С ответами здесь я понял, что я могу просто добавить гнездо inproc
PULL к Poller и имеют goroutine подключения и нажмите байт вырваться из бесконечного ожидания.Это не так универсально, как предлагаемые здесь решения, но он работает, и я могу даже выполнить резервное копирование его на версию Python.
Какую библиотеку ZMQ вы используете на стороне Go? – sberry
@sberry github.com/pebbe/zmq4, связанный на # 1 –
Я новичок в мире Go - есть ли причина, отличная от последовательности, которую вы использовали бы в сокетах inproc, а не в гибридной системе, которая использует ZMQ для внешних коммуникации и каналы для внутренней связи? – Jason