Мы используем asio в производстве уже много лет, а недавно мы достигли критической точки, когда наши серверы загружаются достаточно, чтобы заметить загадочную проблему.boost :: asio рассуждает за num_implementations для io_service :: strand
В нашей архитектуре каждый отдельный объект, который работает независимо, использует персональный объект strand
. Некоторые из объектов могут выполнять долгую работу (чтение из файла, выполнение запроса MySQL и т. Д.). Очевидно, что работа выполняется в обработчиках, завернутых цепочкой. Все звучит красиво и красиво и должно работать безупречно, пока мы не начнем замечать невозможные вещи, такие как таймеры, истекающие секунды после их появления, даже если потоки «ждут работы» и работа останавливается без видимых причин. Похоже, что долгая работа, выполненная внутри пряди, оказала влияние на другие несвязанные нити, не все из них, но большинство.
Бесчисленное количество часов было потрачено, чтобы выявить проблему. Дорожка привела к способу strand
объект создан: strand_service::construct
(here).
По какой-то причине разработчики решили ограничить количество реализаций strand
. Это означает, что некоторые полностью несвязанные объекты будут использовать единую реализацию и, следовательно, будут узкими.
В автономном режиме (non-boost) asio библиотека аналогичный подход используется. Но вместо общих реализаций каждая реализация теперь независима, но может совместно использовать объект mutex
с другими реализациями (here).
В чем дело? Я никогда не слышал о ограничениях на количество мьютексов в системе. Или любые накладные расходы, связанные с их созданием/уничтожением. Хотя последняя проблема может быть легко решена путем утилизации мьютексов вместо их уничтожения.
У меня есть простейший тест, чтобы показать, насколько драматичным является снижение производительности:
#include <boost/asio.hpp>
#include <atomic>
#include <functional>
#include <iostream>
#include <thread>
std::atomic<bool> running{true};
std::atomic<int> counter{0};
struct Work
{
Work(boost::asio::io_service & io_service)
: _strand(io_service)
{ }
static void start_the_work(boost::asio::io_service & io_service)
{
std::shared_ptr<Work> _this(new Work(io_service));
_this->_strand.get_io_service().post(_this->_strand.wrap(std::bind(do_the_work, _this)));
}
static void do_the_work(std::shared_ptr<Work> _this)
{
counter.fetch_add(1, std::memory_order_relaxed);
if (running.load(std::memory_order_relaxed)) {
start_the_work(_this->_strand.get_io_service());
}
}
boost::asio::strand _strand;
};
struct BlockingWork
{
BlockingWork(boost::asio::io_service & io_service)
: _strand(io_service)
{ }
static void start_the_work(boost::asio::io_service & io_service)
{
std::shared_ptr<BlockingWork> _this(new BlockingWork(io_service));
_this->_strand.get_io_service().post(_this->_strand.wrap(std::bind(do_the_work, _this)));
}
static void do_the_work(std::shared_ptr<BlockingWork> _this)
{
sleep(5);
}
boost::asio::strand _strand;
};
int main(int argc, char ** argv)
{
boost::asio::io_service io_service;
std::unique_ptr<boost::asio::io_service::work> work{new boost::asio::io_service::work(io_service)};
for (std::size_t i = 0; i < 8; ++i) {
Work::start_the_work(io_service);
}
std::vector<std::thread> workers;
for (std::size_t i = 0; i < 8; ++i) {
workers.push_back(std::thread([&io_service] {
io_service.run();
}));
}
if (argc > 1) {
std::cout << "Spawning a blocking work" << std::endl;
workers.push_back(std::thread([&io_service] {
io_service.run();
}));
BlockingWork::start_the_work(io_service);
}
sleep(5);
running = false;
work.reset();
for (auto && worker : workers) {
worker.join();
}
std::cout << "Work performed:" << counter.load() << std::endl;
return 0;
}
Построить его, используя следующую команду:
g++ -o asio_strand_test_case -pthread -I/usr/include -std=c++11 asio_strand_test_case.cpp -lboost_system
Тестовый прогон в обычном порядке:
time ./asio_strand_test_case
Work performed:6905372
real 0m5.027s
user 0m24.688s
sys 0m12.796s
Испытание с длительной блокировкой:
time ./asio_strand_test_case 1
Spawning a blocking work
Work performed:770
real 0m5.031s
user 0m0.044s
sys 0m0.004s
Разница драматична. Что происходит, каждая новая неблокирующая работа создает новый объект strand
до тех пор, пока он не будет иметь одну и ту же реализацию с strand
работы блокировки. Когда это происходит, это тупик, пока не закончится работа.
Edit: Уменьшенных параллельная работа до числа рабочих потоков (от 1000
до 8
) и обновленного выхода тестового прогона. Это потому, что, когда оба числа близки, проблема более заметна.
* «Мое предположение для того, чтобы делать такую вещь, было бы запретить перегрузку ОС, создав много-много-много мьютексов. Это было бы плохо». * Почему? Какие накладные расходы есть в стороне от небольшой постоянной памяти (на мьютекс)? –
@yurikilochek Это мьютексы. По определению они бесполезны, если они не используются для синхронизации. Это делает одновременным ожидание больших коллекций примитивов синхронизации. ':: WaitForMultipleObjectsEx' может не возражать, но это контекстный переключатель, это не просто несколько байт памяти. На linux нет такого звонка AFAIK. – sehe
@Arunmu Независимо от количества реализаций проблема будет сохраняться, потому что она находится в дизайне. Увеличение числа может выиграть некоторое время, но только в некоторой степени. В режиме реального времени это никогда не будет работать. Попробуйте мой пример с «рабочими объектами», равными числу потоков, то есть '8' вместо' 1000'. В этом случае реализация '1024' едва помогает (' Выполненная работа: 8331'). – GreenScape