Я столкнулся с запутанным поведением с помощью Net :: AMQP :: RabbitMQ и fork(). Если бы я ...Публикация в RabbitMQ бесшумно терпит неудачу в родительском процессе после разветвления дочернего элемента
- Установление соединения с RabbitMQ в родительском процессе
- Опубликовать сообщение
- Вилка ребенка и ждать его, чтобы выйти (ребенок спит)
- Опубликовать сообщение
... второе сообщение на самом деле не отправлено в RabbitMQ (и никаких ошибок не выбрасывается). Я провел много тестов, включая проверку $connection->is_connected()
перед отправкой. Несколько интересные лакомые кусочки из моих экспериментов:
- Если я пытаюсь открыть новый канал перед публикацией второго сообщения, то
$connection->open_channel($newChannelId)
зависает вызов. - Если я разрешаю ребенку продолжать работать, пока родитель публикует второе сообщение (и дождитесь окончания до
waitpid
), он успешно отправлен.
Я ищу способ обнаружить, что это соединение больше не действует, когда раздвоенный ребенок выходит, и принудительно отключить/снова подключиться. Я кэширую соединение в модуле perl, который используется различными другими модулями в системе, и я не знаю, если/когда эти другие модули fork()
будут работать параллельно. Я не могу надежно установить обработчик $SIG{CHLD}
и отказаться от соединения, когда сигнал получен, потому что другой код может перезаписать мой обработчик. Единственная пуленепробиваемая опция, которую я имею, - это отбросить кеш и подключиться для каждого сообщения, но это замедляет публикацию ставок значительно (в 30 раз).
Этот сценарий демонстрирует проблему (публикации на тему обмена называется 'вещать'):
#!/usr/bin/perl
use strict;
use Net::AMQP::RabbitMQ;
use JSON -support_by_pp;
my $connection;
my $channelId = 0;
sub sendToRabbit {
my ($message) = @_;
print "Sending message $message->{message}\n";
my $contentType = 'application/json';
my $payload = encode_json $message;
$connection->publish($channelId, 'test.route', $payload, { exchange => 'broadcast', force_utf8_in_header_strings => 1 }, { 'content_type' => $contentType });
print "Sent!\n";
}
sub main {
print "Starting...\n";
$connection = Net::AMQP::RabbitMQ->new();
$connection->connect('localhost', { user => 'guest', password => 'guest', port => 5672 });
$connection->channel_open(++$channelId);
print "Connected!\n";
# send first message
sendToRabbit({ message => 'body 1' });
# fork child
my $child = fork();
if(!$child) {
# child
sleep(1);
print "child exiting...\n";
exit(0);
}
else {
# parent
waitpid($child, 0);
}
print "parent continuing...\n";
# send second message - this will not be actually sent.
sendToRabbit({ message => 'body 2' });
# allow I/O to settle...
sleep(1);
}
main();
EDIT: Раствор
Благодаря Ikegami пролить свет на решение!
В моем объекте управления RabbitMQ я ввел некоторый код в процедуру connect()
, которая позволяет мне выборочно пропускать деструктор для раздвоенных детей, которые сами не вызывают connect()
. Это, кажется, имеет желаемый эффект.
# Connect to RabbitMQ and create a channel
sub connect {
my ($self) = @_;
$self->{pid} = $$;
# if we redefined the destructor and connect is called, we need to revert
# it so it can be redefined again properly
no warnings qw(redefine);
if($self->{original_destructor}) {
# reset original destructor
*Net::AMQP::RabbitMQ::DESTROY = $self->{original_destructor};
delete $self->{original_destructor};
}
# define alternate destructor so forked children that do not call "connect" do
# not destroy our connection
{
$self->debug("Overridding constructor...");
$self->{original_destructor} = Net::AMQP::RabbitMQ->can('DESTROY');
# only destroy the connection if the current pid is the owner's pid
my $new_destructor = sub { if($self->{pid} eq $$) { $self->debug("Destroying $_[0]!\n"); $self->{original_destructor}->(); } };
*Net::AMQP::RabbitMQ::DESTROY = $new_destructor;
}
my $connection = Net::AMQP::RabbitMQ->new();
$connection->connect('localhost', { user => $self->{username}, password => $self->{password}, port => $PORT, vhost => $VHOST });
$self->{connection} = $connection;
$self->{channel} = $self->createChannel();
1;
}
Благодарим за быстрый и подробный ответ ... Я понял, что это что-то вроде этого, но не знал, как его проверить (спасибо за фрагмент). +1. – Voluntari