2009-04-15 9 views
14

Проблема: Я хочу реализовать несколько php-рабочих процессов, которые прослушивают очередь MQ-сервера для асинхронных заданий. Проблема в том, что просто запуск этих процессов в качестве демонов на сервере не дает мне никакого контроля над экземплярами (Load, Status, locked) ... кроме, возможно, для сброса ps-aux. Из-за этого я ищу среду выполнения, которая позволяет мне контролировать и контролировать экземпляры на уровне системы (процесса) или на более высоком уровне (какой-то сервер приложений в стиле Java)PHP Daemon/рабочая среда

Любой указатели?

+0

Также смотрите: http://symfony.com/doc/master/components/process.html –

ответ

12

Вот код, который может пригодиться.

<? 
define('WANT_PROCESSORS', 5); 
define('PROCESSOR_EXECUTABLE', '/path/to/your/processor'); 
set_time_limit(0); 
$cycles = 0; 
$run = true; 
$reload = false; 
declare(ticks = 30); 

function signal_handler($signal) { 
    switch($signal) { 
    case SIGTERM : 
     global $run; 
     $run = false; 
     break; 
    case SIGHUP : 
     global $reload; 
     $reload = true; 
     break; 
    } 
} 

pcntl_signal(SIGTERM, 'signal_handler'); 
pcntl_signal(SIGHUP, 'signal_handler'); 

function spawn_processor() { 
    $pid = pcntl_fork(); 
    if($pid) { 
     global $processors; 
     $processors[] = $pid; 
    } else { 
     if(posix_setsid() == -1) 
      die("Forked process could not detach from terminal\n"); 
     fclose(stdin); 
     fclose(stdout); 
     fclose(stderr); 
     pcntl_exec(PROCESSOR_EXECUTABLE); 
     die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n"); 
    } 
} 

function spawn_processors() { 
    global $processors; 
    if($processors) 
     kill_processors(); 
    $processors = array(); 
    for($ix = 0; $ix < WANT_PROCESSORS; $ix++) 
     spawn_processor(); 
} 

function kill_processors() { 
    global $processors; 
    foreach($processors as $processor) 
     posix_kill($processor, SIGTERM); 
    foreach($processors as $processor) 
     pcntl_waitpid($processor); 
    unset($processors); 
} 

function check_processors() { 
    global $processors; 
    $valid = array(); 
    foreach($processors as $processor) { 
     pcntl_waitpid($processor, $status, WNOHANG); 
     if(posix_getsid($processor)) 
      $valid[] = $processor; 
    } 
    $processors = $valid; 
    if(count($processors) > WANT_PROCESSORS) { 
     for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) 
      posix_kill($processors[$ix], SIGTERM); 
     for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) 
      pcntl_waitpid($processors[$ix]); 
    } elseif(count($processors) < WANT_PROCESSORS) { 
     for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++) 
      spawn_processor(); 
    } 
} 

spawn_processors(); 

while($run) { 
    $cycles++; 
    if($reload) { 
     $reload = false; 
     kill_processors(); 
     spawn_processors(); 
    } else { 
     check_processors(); 
    } 
    usleep(150000); 
} 
kill_processors(); 
pcntl_wait(); 
?> 
+0

Где вы это взяли? Проект с открытым исходным кодом или собственный код? Любая документация или объяснение того, что именно происходит здесь? – leek

+0

Мой собственный код. Я не склонен это объяснять, нет. – chaos

+0

Также https://github.com/kvz/system_daemon. – HyderA

1

Вам действительно нужно, чтобы он работал непрерывно?

Если вы хотите только создать новый процесс по запросу, вы можете зарегистрировать его как услугу в xinetd.

+0

Нерезидентный аспект не является большой проблемой imho, потому что количество рабочих зависит от производительности системы, которая обычно является постоянной. Более важным был бы мониторинг аспекта статуса отдельного работника (разбился, что угодно). Один инструмент, который я только что открыл для этого, может быть DJBs deamontools – Sebastian

+0

Это один из вариантов. Для мониторинга вы также можете использовать файлы flock() - ed PID. После сбоя все блокировки освобождаются. – vartec

4

Похоже, что у вас уже есть MQ и работает в системе * nix, и вы просто хотите управлять людьми.

Очень простой способ сделать это - использовать экран GNU. Для того, чтобы начать 10 работников вы можете использовать:

#!/bin/sh 
for x in `seq 1 10` ; do 
screen -dmS worker_$x php /path/to/script.php worker$x 
end 

Это запустит 10 рабочих в фоновом режиме, используя экраны с именем worker_1,2,3 и так далее.

Вы можете подключиться к экранам, запустив экран -r worker_ и перечислив работающих сотрудников с помощью списка экранов.

Для получения дополнительной информации данное руководство может оказаться полезным: http://www.kuro5hin.org/story/2004/3/9/16838/14935

Попробуйте также:

  • экрана --help
  • человек экран
  • или google.

Для производственных серверов я обычно рекомендую использовать обычные сценарии запуска системы, но в течение многих лет я запускал команды экрана из сценариев запуска без проблем.

0

пыльник нашей рабочей реализации @chaos ответа. Код для обработки сигналов был удален, так как этот скрипт живет обычно всего несколько миллисекунд.

Кроме того, в коде мы добавили 2 функции для сохранения сообщений между вызовами: restore_processors_state() и save_processors_state(). Мы использовали redis, но вы можете решить использовать реализацию в файлах.

Мы запускаем этот скрипт каждую минуту, используя cron. Cron проверяет, все ли процессы активны. Если нет - он снова запускает их и затем умирает. Если мы хотим убить существующие процессы, мы просто запускаем этот скрипт с аргументом kill: php script.php kill.

Очень удобный способ запускать рабочих без ввода скриптов в init.d.

<?php 

include_once dirname(__FILE__) . '/path/to/bootstrap.php'; 

define('WANT_PROCESSORS', 5); 
define('PROCESSOR_EXECUTABLE', '' . dirname(__FILE__) . '/path/to/worker.php'); 
set_time_limit(0); 

$run = true; 
$reload = false; 
declare(ticks = 30); 

function restore_processors_state() 
{ 
    global $processors; 

    $redis = Zend_Registry::get('redis'); 
    $pids = $redis->hget('worker_procs', 'pids'); 

    if(!$pids) 
    { 
     $processors = array(); 
    } 
    else 
    { 
     $processors = json_decode($pids, true); 
    } 
} 

function save_processors_state() 
{ 
    global $processors; 

    $redis = Zend_Registry::get('redis'); 
    $redis->hset('worker_procs', 'pids', json_encode($processors)); 
} 

function spawn_processor() { 
    $pid = pcntl_fork(); 
    if($pid) { 
     global $processors; 
     $processors[] = $pid; 
    } else { 
     if(posix_setsid() == -1) 
      die("Forked process could not detach from terminal\n"); 
     fclose(STDIN); 
     fclose(STDOUT); 
     fclose(STDERR); 
     pcntl_exec('/usr/bin/php', array(PROCESSOR_EXECUTABLE)); 
     die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n"); 
    } 
} 

function spawn_processors() { 
    restore_processors_state(); 

    check_processors(); 

    save_processors_state(); 
} 

function kill_processors() { 
    global $processors; 
    foreach($processors as $processor) 
     posix_kill($processor, SIGTERM); 
    foreach($processors as $processor) 
     pcntl_waitpid($processor, $trash); 
    unset($processors); 
} 

function check_processors() { 
    global $processors; 
    $valid = array(); 
    foreach($processors as $processor) { 
     pcntl_waitpid($processor, $status, WNOHANG); 
     if(posix_getsid($processor)) 
      $valid[] = $processor; 
    } 
    $processors = $valid; 
    if(count($processors) > WANT_PROCESSORS) { 
     for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) 
      posix_kill($processors[$ix], SIGTERM); 
     for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--) 
      pcntl_waitpid($processors[$ix], $trash); 
    } 
    elseif(count($processors) < WANT_PROCESSORS) { 
     for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++) 
      spawn_processor(); 
    } 
} 

if(isset($argv) && count($argv) > 1) { 
    if($argv[1] == 'kill') { 
     restore_processors_state(); 
     kill_processors(); 
     save_processors_state(); 

     exit(0); 
    } 
} 

spawn_processors();