2017-02-14 6 views
1

Я столкнулся с запутанным поведением с помощью Net :: AMQP :: RabbitMQ и fork(). Если бы я ...Публикация в RabbitMQ бесшумно терпит неудачу в родительском процессе после разветвления дочернего элемента

  1. Установление соединения с RabbitMQ в родительском процессе
  2. Опубликовать сообщение
  3. Вилка ребенка и ждать его, чтобы выйти (ребенок спит)
  4. Опубликовать сообщение

... второе сообщение на самом деле не отправлено в RabbitMQ (и никаких ошибок не выбрасывается). Я провел много тестов, включая проверку $connection->is_connected() перед отправкой. Несколько интересные лакомые кусочки из моих экспериментов:

  • Если я пытаюсь открыть новый канал перед публикацией второго сообщения, то $connection->open_channel($newChannelId) зависает вызов.
  • Если я разрешаю ребенку продолжать работать, пока родитель публикует второе сообщение (и дождитесь окончания до waitpid), он успешно отправлен.

Я ищу способ обнаружить, что это соединение больше не действует, когда раздвоенный ребенок выходит, и принудительно отключить/снова подключиться. Я кэширую соединение в модуле perl, который используется различными другими модулями в системе, и я не знаю, если/когда эти другие модули fork() будут работать параллельно. Я не могу надежно установить обработчик $SIG{CHLD} и отказаться от соединения, когда сигнал получен, потому что другой код может перезаписать мой обработчик. Единственная пуленепробиваемая опция, которую я имею, - это отбросить кеш и подключиться для каждого сообщения, но это замедляет публикацию ставок значительно (в 30 раз).

Этот сценарий демонстрирует проблему (публикации на тему обмена называется 'вещать'):

#!/usr/bin/perl 
use strict; 
use Net::AMQP::RabbitMQ; 
use JSON -support_by_pp; 

my $connection; 
my $channelId = 0; 

sub sendToRabbit { 
    my ($message) = @_; 
    print "Sending message $message->{message}\n"; 
    my $contentType = 'application/json'; 
    my $payload = encode_json $message; 
    $connection->publish($channelId, 'test.route', $payload, { exchange => 'broadcast', force_utf8_in_header_strings => 1 }, { 'content_type' => $contentType }); 
    print "Sent!\n"; 
} 

sub main { 
    print "Starting...\n"; 

    $connection = Net::AMQP::RabbitMQ->new(); 
    $connection->connect('localhost', { user => 'guest', password => 'guest', port => 5672 }); 
    $connection->channel_open(++$channelId); 
    print "Connected!\n"; 

    # send first message 
    sendToRabbit({ message => 'body 1' }); 

    # fork child 
    my $child = fork(); 
    if(!$child) { 
    # child 
    sleep(1); 
    print "child exiting...\n"; 
    exit(0); 
    } 
    else { 
    # parent 
    waitpid($child, 0); 
    } 
    print "parent continuing...\n"; 

    # send second message - this will not be actually sent. 
    sendToRabbit({ message => 'body 2' }); 

    # allow I/O to settle... 
    sleep(1); 
} 

main(); 

EDIT: Раствор

Благодаря Ikegami пролить свет на решение!

В моем объекте управления RabbitMQ я ввел некоторый код в процедуру connect(), которая позволяет мне выборочно пропускать деструктор для раздвоенных детей, которые сами не вызывают connect(). Это, кажется, имеет желаемый эффект.

# Connect to RabbitMQ and create a channel 
sub connect { 
    my ($self) = @_; 

    $self->{pid} = $$; 

    # if we redefined the destructor and connect is called, we need to revert 
    # it so it can be redefined again properly 
    no warnings qw(redefine); 
    if($self->{original_destructor}) { 
    # reset original destructor 
    *Net::AMQP::RabbitMQ::DESTROY = $self->{original_destructor}; 
    delete $self->{original_destructor}; 
    } 

    # define alternate destructor so forked children that do not call "connect" do 
    # not destroy our connection 
    { 
    $self->debug("Overridding constructor..."); 
    $self->{original_destructor} = Net::AMQP::RabbitMQ->can('DESTROY'); 
    # only destroy the connection if the current pid is the owner's pid 
    my $new_destructor = sub { if($self->{pid} eq $$) { $self->debug("Destroying $_[0]!\n"); $self->{original_destructor}->(); } }; 
    *Net::AMQP::RabbitMQ::DESTROY = $new_destructor; 
    } 

    my $connection = Net::AMQP::RabbitMQ->new(); 
    $connection->connect('localhost', { user => $self->{username}, password => $self->{password}, port => $PORT, vhost => $VHOST }); 
    $self->{connection} = $connection; 
    $self->{channel} = $self->createChannel(); 

    1; 
} 

ответ

2

Ребенок является клоном родителя, а файл обрабатывает родительский объект совместно с дочерним элементом. В качестве копии родителя у ребенка есть копия $connection. Когда ребенок выходит, этот объект уничтожается, вызывая его деструктор, отправляя команду RabbitMQ, чтобы закрыть соединение.

Вы можете увидеть это, добавив

{ 
    my $old_destructor = Net::AMQP::RabbitMQ->can('DESTROY'); 
    my $new_destructor = sub { print("Destroying $_[0]!\n"); $old_destructor->(); }; 
    no warnings qw(redefine); 
    *Net::AMQP::RabbitMQ::DESTROY = $new_destructor; 
} 

Возможные решения:

  • Переместить код ребенка в отдельный файл, и exec этот файл.
  • Передвиньте дочерний код в подпункт, который вызывается при вызове сценария с «секретным» параметром, и перезапустите его с помощью exec с этим параметром.
  • Создайте ребенка раньше. В частности, создайте его перед созданием соединения RabbitMQ.
  • Создайте ребенка, чтобы сделать материал RabbitMQ.
  • Используйте нить вместо дочернего процесса.

PS — Не пишите fork + exec код самостоятельно. По крайней мере используйте open3.

sub spawn { 
    open(local *CHILD_STDIN, '<', '/dev/null') or die $!; 
    return open3('<&CHILD_STDIN', '>&STDOUT', '>&STDERR', @_); 
} 

sendToRabbit({ message => 'body 1' }); 

my $pid = spawn('child.pl');  
waitpid($pid, 0); 

sendToRabbit({ message => 'body 2' }); 
+0

Благодарим за быстрый и подробный ответ ... Я понял, что это что-то вроде этого, но не знал, как его проверить (спасибо за фрагмент). +1. – Voluntari