Рубрики
Uncategorized

Система параллельной обработки очередей на основе опухших и красных

Автор оригинала: David Wong.

фон

PHP не поддерживает многопоточность, но, как идеальная система, многие операции должны выполняться асинхронно. Для выполнения этих асинхронных операций мы создали систему задач очередей на основе redis.

Как вы знаете, система обработки очереди сообщений в основном делится на две части: потребителей и производителей.

В нашей системе основной системой является производитель, а системой задач – потребитель.

Конкретный рабочий процесс выглядит следующим образом: 1. Основная система отправит имя задачи + параметры задачи, подлежащие обработке, в очередь. 2. Система задач может заполнять очередь задач в режиме реального времени. Задача из всплывающего окна разветвляет подпроцесс, и подпроцесс завершит определенную логику задачи.

Конкретный код выглядит следующим образом:

/**
 *Start Daemons
 */
public function runAction() {
    Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');
    while (true) {
        $this->fork_process();
    }
    exit;
}
 
/**
 *Create child process
 */
private function fork_process() {
    $ppid = getmypid();
    $pid = pcntl_fork();
    If ($PID = = 0) {// child process
        $pid = posix_getpid();
        //echo "* Process {$pid} was created \n\n";
        $this->mq_process();
        exit;
    }Else {// main process
        $PID = pcntl_wait ($status, untrached); // get the end status of the subprocess
        if (pcntl_wifexited($status)) {
            //echo "\n\n* Sub process: {$pid} exited with {$status}";
            //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );
        } else {
            Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');
        }
    }
}
 
/**
 *Business task queue processing
 */
private function mq_process() {
    $data_pop = $this->masterRedis->rPop($this->redis_list_key);
    $data = json_decode($data_pop, 1);
    if (!$data) {
        return FALSE;
    }
    $worker = '_task_' . $data['worker'];
    $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';
    $params = $data['params'];
    $class = new $class_name();
    $class->$worker($params);
    return TRUE;
}

Это простая система обработки задач.

Благодаря этой системе задач мы добились асинхронности, которая стабильно работает уже почти год.

К сожалению, это единая технологическая система. Это постоянная вилка. Если есть задача, она будет обработана. Если задачи нет, она будет пропущена.

Он очень стабилен.

Но есть две проблемы: одна из них заключается в том, что постоянная вилка и pop будут тратить ресурсы сервера впустую; другая заключается в том, что параллелизм не поддерживается!

С первым все в порядке, но со вторым все очень серьезно.

Когда основная система одновременно выполняет большое количество задач, время обработки задач будет бесконечно увеличиваться.

Новый дизайн

Чтобы решить проблему параллелизма, мы планируем создать более эффективную и сильную систему командной обработки.

Поскольку многопоточность не поддерживается до php 7, мы используем многопроцессорную обработку.

Из Интернета, чтобы найти много информации, большинство так называемых мультипроцессов-это n процессов, работающих в фоновом режиме одновременно.

Очевидно, что это неуместно.

Я ожидаю, что каждый pop разветвляет задачу, как только у него появится задача, и подпроцесс завершится после выполнения задачи.

Возникшие проблемы

1. Как контролировать максимальное количество процессов Эта проблема очень проста, то есть каждый подпроцесс вилки будет увеличиваться один раз. Когда подпроцесс будет завершен, он будет вычтен один раз.

С автоматическим приращением проблем нет. Мы закончим основной процесс. Так как же уменьшить?

Вы могли бы сказать, конечно, что это в подпроцессе. Но здесь вам нужно отметить: при форке он копирует ресурс из основного процесса в подпроцесс, что означает, что вы не можете управлять счетчиком в основном процессе в подпроцессе!

Итак, здесь нам нужно понять один момент знания: сигнал.

В частности, вы можете самостоятельно погуглить. Здесь вы можете непосредственно ознакомиться с кодом.

// install signal handler for dead kids
pcntl_signal(SIGCHLD, array($this, "sig_handler"));

При этом устанавливается сигнальный процессор. Конечно, кое-чего еще не хватает.

declare(ticks = 1);

Declare-это оператор структуры управления. Пожалуйста, перейдите в Google для конкретного использования.

Этот код означает, что сигнальный процессор вызывается каждый раз, когда выполняется оператор низкого уровня.

Таким образом, сигнальный процессор будет вызываться в конце каждого подпроцесса, и мы сможем выполнить вычитание в сигнальном процессоре.

2. Как решить проблему остатков процесса

При разработке нескольких процессов, если их не обрабатывать должным образом, это приведет к остаткам процесса.

Чтобы устранить остатки процесса, подпроцесс должен быть переработан.

Итак, как переработать подпроцессы-это технический момент.

В демо-версии pcntl, включая множество сообщений в блоге, говорится, что подпроцессы перерабатываются в основном процессе.

Но мы основываемся на redis brpop, который заблокирован.

Это приводит к проблеме: при выполнении n задач основной процесс блокируется, когда система задач простаивает, а подпроцессы все еще выполняются при возникновении блокировки, поэтому повторная обработка последних подпроцессов не может быть завершена…

Здесь всегда была неразбериха, но это легко, когда я получаю сигнальный процессор.

Переработка процесса также вводится в сигнальный процессор.

Оценка новой системы

Pcntl является расширением процесса обработки, но его поддержка многопроцессорной обработки очень слаба.

Итак, вот процесс во всем расширении.

Конкретный код выглядит следующим образом:

declare(ticks = 1);
class JobDaemonController extends Yaf_Controller_Abstract{
 
    use Trait_Redis;
 
    private $maxProcesses = 800;
    private $child;
    private $masterRedis;
    Private $redis_task_wing ='task: Wing '; // queue to be processed
 
    public function init(){
        // install signal handler for dead kids
        pcntl_signal(SIGCHLD, array($this, "sig_handler"));
        set_time_limit(0);
        Ini set ('default socket timeout ', - 1); // the queue processing does not timeout. Resolve the redis error: read error on connection
    }
 
    private function redis_client(){
        $rds = new Redis();
        $rds->connect('redis.master.host',6379);
        return $rds;
    }
 
    Public function process (swoole [process $worker) {// first process
        $GLOBALS['worker'] = $worker;
        swoole_event_add($worker->pipe, function($pipe) {
            $worker = $GLOBALS['worker'];
            $recv = $worker->read();            //send data to master
 
            sleep(rand(1, 3));
            echo "From Master: $recv\n";
            $worker->exit(0);
        });
        exit;
    }
 
    public function testAction(){
        for ($i = 0; $i < 10000; $i++){
            $data = [
                'abc' => $i,
                'timestamp' => time().rand(100,999)
            ];
            $this->masterRedis->lpush($this->redis_task_wing, json_encode($data));
        }
        exit;
    }
 
    public function runAction(){
        while (1){
//            echo "\t now we de have $this->child child processes\n";
            if ($this->child < $this->maxProcesses){
                $rds = $this->redis_client();
                $data_pop = $RDS - > brpop ($this - > redis_task_wing, 3); // when there is no task, block waiting
                if (!$data_pop){
                    continue;
                }
                echo "\t Starting new child | now we de have $this->child child processes\n";
                $this->child++;
                $process = new swoole_process([$this, 'process']);
                $process->write(json_encode($data_pop));
                $pid = $process->start();
            }
        }
    }
 
    private function sig_handler($signo) {
//        echo "Recive: $signo \r\n";
        switch ($signo) {
            case SIGCHLD:
                while($ret = swoole_process::wait(false)) {
//                    echo "PID={$ret['pid']}\n";
                    $this->child--;
                }
        }
    }
}

Наконец, после тестирования одноядерный сервер 1g может выполнять 800 одновременных задач за 1-3 секунды.