2016-01-18 12 views
3

Я реализую систему для распределенного выполнения cronjob (так называемый кластер cron computing). Cronjobs должны быть поставлены в очередь в очередь сообщений (RabbitMQ), когда время действия есть. С другой стороны (узлы/рабочие кластера) является демоном Perl, использующим AnyEvent::RabbitMQ, чтобы получить ровно одну cronjob/задачу/сообщение из очереди сообщений, обработать задачу и запросить еще одну команду cronjob/task/message из очереди сообщений и так далее.Как бороться с AnyEvent, RabbitMQ (сердцебиение) и длительными работами на Perl?

Я использую функцию Heartbeat RabbitMQ, которая реализована с помощью AnyEvent::RabbitMQ, чтобы помочь RabbitMQ идентифицировать сломанные соединения.

Не обращайте внимания на фактическое значение интервала сердцебиения! У меня также очень длительные рабочие места, которые занимают несколько дней. Таким образом, установка интервала в несколько самый длинный cronjob будет не вариантом.

См. Следующий фрагмент для выполнения фактической cronjob внутри рабочего демона Perl. Он реализован в таймере «AnyEvent->», а не для DoSing RabbitMQ для сообщения. Этот метод был использован из-за того, что RabbitMQ's consume был запрещен (руководством).

sub _timer_tick { 

    $rabbitmq_channel->get(
    queue  => 'job_queue', 
    on_success => sub { 
     my ($amqp_method) = @_; 
     if (not $amqp_method->{empty}) { 
     pause_timer(); 
     progress_job($amqp_method); 
     resume_timer(); 
     } 
    }, 
    on_failure => sub { $quit_programm->send('RABBITMQ_ERROR', @_) }, 
); 

    return; 
} 

progress_job() где сообщение получает разобраны и задание будет выполнено. pause_timer() и resume_timer() управляет AnyEvent->timer, который запускает _timer_tick().

use Capture::Tiny 'capture'; 
sub progress_job { 
    my ($amqp_method) = @_; 
    my $job = decode_json($amqp_method->{body}->to_raw_payload()); 
    my ($stdout, $stderr, $exit) = capture { 
    system $job->{execute}; 
    }; 
    return; 
} 

Первые длительные рабочие задания вошли в систему, и система «сработала» с различными сообщениями об ошибках. Иногда он выдает «Неизвестный идентификатор канала: 1», иначе он бросает «Канал уже закрыт». Таким образом, я сделал «тупой отлаживать» (пытаясь испортить конфигурацию) и выяснил, что, когда интервал heartbeat короче времени, принятого в пределах progress_job(), эти ошибки будут выброшены. После некоторых размышлений это имеет смысл. progress_job() является блокирующей подпрограммой, и AnyEvent не может продолжать отправку пакетов пульса в RabbitMQ.

Мое первое соображение по решению проблемы блокирующего теплового удара состояло в том, чтобы развить вилку и сделать progress_job() в процессе ребенка. AnyEvents documentation on FORK указывает, что при использовании в системе fork сохраняется доступ к системе событий (например, через AnyEvent). Следующая мысль: ОК, доступ к системе событий отсутствует, поэтому я могу сделать вилку. НО: Таймер должен возобновиться (resume_timer()) ПОСЛЕ progress_job() вернулся. Теоретически resume_timer() будет называться сразу после fork(), а не после progress_job(). Поэтому я прекратил свою реализацию.

Мой вопрос: как решить последний бит? Как resume_timer() после progress_job() (или, другими словами, раздвоенный ребенок) возвращается? Я не могу поставить resume_timer() внутри ребенка из-за разветвления, и система событий не является потокобезопасной.

ответ

3

AE не может обрабатывать события, если программа не заблокирована с помощью вызова, поддерживающего AE. system не поддерживает AE. Используйте вместо этого run_cmd от AnyEvent::Util.

+0

'$ run_cmd_cv-> recv' дает мне' 256' вместо ожидаемого '1'. Команда 'perl -E 'exit 1'; echo $? 'также echos' 1'. Как получить фактический код ошибки выполненной команды? – burnersk

+1

bash '$?' Отличается от ''? 'Perl. См. ['System'] (http://perldoc.perl.org/functions/system.html). – ikegami

+0

Спасибо, что указали это.Я добавил '$ exit = $ exit >> 8, если $ exit && $ exit> 255;' и теперь он работает. – burnersk