2016-09-05 9 views
2

Я пытаюсь написать процесс, который прослушивает ActiveMQ и на основе сообщения, выходит и захватывает данные из веб-службы, выполняет некоторую обработку, а затем помещает данные процесса в другой веб-сервис. (REST/JSON)AnyEvent :: STOMP :: Client + AnyEvent :: ForkManger = прерывистая ошибка

Модуль ниже работает нормально, пока один из неуклюжих веб-сервисов, с которыми я общаюсь, не вернет ошибку. Я пробовал много вещей, чтобы поймать ошибку, но пока не помогло. После того, как ошибка вебсервиса происходит, хотя я получаю следующее сообщение:

необработанного исключения обратного вызова по событию (MESSAGE, AnyEvent :: ТОПАЙТЕ :: Client = HASH (0x3ad5e48), HASH (0x3a6bbb0) { "действий": "создал", "данные": { "ID": 40578737, "Тип": "тревога", "кто": нулевая}, "справ": "ADCCEE0C-73A7-11E6-8084-74B346D1CA67", "Имя хоста": «MyServer», «PID»: 48632}): $ fork_manager-> старт() должен быть вызван в процессе менеджера

Хорошо, я понимаю, что концептуально дочерний процесс пытается запустить другой процесс, и что вилка менеджер говорит, что это нет. Но, учитывая приведенный ниже модуль, каков надлежащий способ запуска нового процесса для обработки длительной обработки. Или почему процесс ребенок умирает в результате чего это исключение, и как я могу предотвратить это

Вот модуль (усеченную)

package consumer; 

use AnyEvent::ForkManager; 
use AnyEvent::STOMP::Client; 
use JSON; 
use Data::Dumper; 
use v5.18; 
use Moose; 

sub run { 
    my $self = shift; 
    my $pm  = AnyEvent::ForkManager->new(max_workers => 20); 
    my $stomp = AnyEvent::STOMP::Client->new(); 

    $stomp->connect(); 
    $stomp->on_connected(sub { 
     my $stomp = shift; 
     $stomp->subscribe('/topic/test'); 
     say "Connected to STOMP"; 
    }); 

    $pm->on_start(sub { 
     my ($pm,$pid,@params) = @_; 
     say "Starting $pid worker"; 
    }); 

    $pm->on_finish(sub { 
     my ($pm, $pid,@params) = @_; 
     say "Finished $pid worker"; 
    }); 

    $pm->on_error(sub { 
     say Dumper(\@_); 
    }); 

    $stomp->on_message(sub { 
     my ($stomp, $header, $body) = @_; 
     my $href = decode_json $body; 
     $pm->start(cb => sub { 
      my ($pm, @params) = @_; 
      $self->process(@params); 
     }, 
     args => [ $href->{id}, $href->{data}->{type}, $href->{data}->{who} ], 
     ); 
    }); 

    my $cv = AnyEvent->condvar; 
    $cv->recv; 
} 

sub process { 
    say "Processing ".Dumper(\@_); 
    sleep 5; 
    if (int(rand(10)) < 5) { 
     die "OOPS"; # this triggers the error message above 
    } 
    say "Done Processing $_[1]"; 
} 

1; 

Heres драйвер для модуля выше:

#!/usr/bin/env perl 

use v5.18; 
use lib '.'; 
use consumer; 

my $c = consumer->new(); 
$c->run; 

Наконец генератор трафика, который вы можете использовать, чтобы увидеть это в действии:

#!/usr/bin/env perl 

use lib '../lib'; 
use lib '../../lib'; 
use v5.18; 

use Data::Dumper; 
use JSON; 
use Net::STOMP::Client; 

$ENV{'scot_mode'} = "testing"; 

my $stomp = Net::STOMP::Client->new(
    host => "127.0.0.1", 
    port => 61613 
); 
$stomp->connect(); 

for (my $i = 1; $i < 1000000; $i++) { 
    my $href = { 
     id  => $i, 
     type => "event", 
     what => "foo", 
    }; 
    my $json = encode_json $href; 
    say "Sending ".Dumper($href); 
    $stomp->send(
     destination => "/topic/test", 
     body  => $json, 
    ); 
} 

$stomp->disconnect(); 

ответ

1

Я был в состоянии ольве это с помощью Try :: Catch and wrapping call to self-> process with try catch:

$stomp->on_message(sub { 
     my ($stomp, $header, $body) = @_; 
     my $href = decode_json $body; 
     $pm->start(cb => sub { 
      my ($pm, @params) = @_; 
      try { 
       $self->process(@params); 
      } 
      catch { 
       # error handling stuff 
      }; 
     }, 
     args => [ $href->{id}, $href->{data}->{type}, $href->{data}->{who} ], 
     ); 
    } 
);