Я реализую систему для распределенного выполнения cronjob (так называемый кластер cron computing). Cronjobs должны быть поставлены в очередь в очередь сообщений (RabbitMQ), когда время действия есть. С другой стороны (узлы/рабочие кластера) является демоном Perl, использующим AnyEvent::RabbitMQ
, чтобы получить ровно одну cronjob/задачу/сообщение из очереди сообщений, обработать задачу и запросить еще одну команду cronjob/task/message из очереди сообщений и так далее.Как бороться с AnyEvent, RabbitMQ (сердцебиение) и длительными работами на Perl?
Я использую функцию Heartbeat RabbitMQ, которая реализована с помощью AnyEvent::RabbitMQ
, чтобы помочь RabbitMQ идентифицировать сломанные соединения.
Не обращайте внимания на фактическое значение интервала сердцебиения! У меня также очень длительные рабочие места, которые занимают несколько дней. Таким образом, установка интервала в несколько самый длинный cronjob будет не вариантом.
См. Следующий фрагмент для выполнения фактической cronjob внутри рабочего демона Perl. Он реализован в таймере «AnyEvent->», а не для DoSing RabbitMQ для сообщения. Этот метод был использован из-за того, что RabbitMQ's consume
был запрещен (руководством).
sub _timer_tick {
$rabbitmq_channel->get(
queue => 'job_queue',
on_success => sub {
my ($amqp_method) = @_;
if (not $amqp_method->{empty}) {
pause_timer();
progress_job($amqp_method);
resume_timer();
}
},
on_failure => sub { $quit_programm->send('RABBITMQ_ERROR', @_) },
);
return;
}
progress_job()
где сообщение получает разобраны и задание будет выполнено. pause_timer()
и resume_timer()
управляет AnyEvent->timer
, который запускает _timer_tick()
.
use Capture::Tiny 'capture';
sub progress_job {
my ($amqp_method) = @_;
my $job = decode_json($amqp_method->{body}->to_raw_payload());
my ($stdout, $stderr, $exit) = capture {
system $job->{execute};
};
return;
}
Первые длительные рабочие задания вошли в систему, и система «сработала» с различными сообщениями об ошибках. Иногда он выдает «Неизвестный идентификатор канала: 1», иначе он бросает «Канал уже закрыт». Таким образом, я сделал «тупой отлаживать» (пытаясь испортить конфигурацию) и выяснил, что, когда интервал heartbeat
короче времени, принятого в пределах progress_job()
, эти ошибки будут выброшены. После некоторых размышлений это имеет смысл. progress_job()
является блокирующей подпрограммой, и AnyEvent не может продолжать отправку пакетов пульса в RabbitMQ.
Мое первое соображение по решению проблемы блокирующего теплового удара состояло в том, чтобы развить вилку и сделать progress_job()
в процессе ребенка. AnyEvents documentation on FORK указывает, что при использовании в системе fork
сохраняется доступ к системе событий (например, через AnyEvent). Следующая мысль: ОК, доступ к системе событий отсутствует, поэтому я могу сделать вилку. НО: Таймер должен возобновиться (resume_timer()
) ПОСЛЕ progress_job()
вернулся. Теоретически resume_timer()
будет называться сразу после fork()
, а не после progress_job()
. Поэтому я прекратил свою реализацию.
Мой вопрос: как решить последний бит? Как resume_timer()
после progress_job()
(или, другими словами, раздвоенный ребенок) возвращается? Я не могу поставить resume_timer()
внутри ребенка из-за разветвления, и система событий не является потокобезопасной.
'$ run_cmd_cv-> recv' дает мне' 256' вместо ожидаемого '1'. Команда 'perl -E 'exit 1'; echo $? 'также echos' 1'. Как получить фактический код ошибки выполненной команды? – burnersk
bash '$?' Отличается от ''? 'Perl. См. ['System'] (http://perldoc.perl.org/functions/system.html). – ikegami
Спасибо, что указали это.Я добавил '$ exit = $ exit >> 8, если $ exit && $ exit> 255;' и теперь он работает. – burnersk