Рубрики
Uncategorized

Пример реализации асинхронной очереди задач в реальном времени с помощью расширенного свула PHP

Автор оригинала: David Wong.

В примере этой статьи описывается, как расширить swool в PHP для реализации асинхронной очереди задач в реальном времени. Чтобы поделиться с вами для вашей справки, следующим образом:

Если хотите отправить 100 писем, ибо тиражируется 100 раз, пользователь напрямую встает, какой ломается сайт!

Но на самом деле у нас, скорее всего, будет более 10000 электронных писем. Как справиться с этой задержкой?

Ответ – асинхронность. Инкапсулируйте операцию “отправить почту”, а затем выполните ее 10000 раз асинхронно в фоновом режиме. Таким образом, когда пользователь отправляет веб-страницу, время, которое он ждет, – это только время, чтобы “отправить запрос на задание по электронной почте в очередь”. И наш закулисный сервис будет работать там, где пользователи не смогут его увидеть.

При реализации “асинхронной очереди” некоторые люди используют таблицу MySQL или redis для хранения отправляемых сообщений, а затем регулярно каждую минуту читают список отправляемых сообщений, а затем обрабатывают его. Это синхронизированная асинхронная очередь задач. Однако в настоящее время отправленные задачи могут быть выполнены только через минуту, что не является быстрым в некоторых сценариях приложений в реальном времени. В некоторых сценариях требуется, чтобы задача выполнялась сразу после ее отправки, но пользователю не нужно ждать возвращенного результата.

В этой статье мы обсудим реализацию асинхронной очереди задач в реальном времени с помощью PHP extended swool.

На стороне сервера

Создайте новый сервер. php в каталоге, где вы планируете разместить скрипт (вы также можете создать его самостоятельно). Код выглядит следующим образом

serv = new swoole_server("0.0.0.0", 9501);
    $this->serv->set(array(
      'worker_num' = > 1, // usually set to 1-4 times the number of CPU in the server
      'daemonize' = > 1, // executed as a Daemons
      'max_request' => 10000,
      'dispatch_mode' => 2,
      'task \ worker \ num' = > 8, // number of task processes
      "Task? IPC? Mode" = > 3, // use message queuing to communicate and set to scramble mode
      //"Log file" = > log / taskqueueu. Log ", // log
    ));
    $this->serv->on('Receive', array($this, 'onReceive'));
    // bind callback
    $this->serv->on('Task', array($this, 'onTask'));
    $this->serv->on('Finish', array($this, 'onFinish'));
    $this->serv->start();
  }
  public function onReceive(swoole_server $serv, $fd, $from_id, $data)
  {
    //echo "Get Message From Client {$fd}:{$data}\n";
    // send a task to task worker.
    $serv->task($data);
  }
  public function onTask($serv, $task_id, $from_id, $data)
  {
    $array = json_decode($data, true);
    if ($array['url']) {
      return $this->httpGet($array['url'], $array['param']);
    }
  }
  public function onFinish($serv, $task_id, $data)
  {
    //echo "Task {$task_id} finish\n";
    //echo "Result: {$data}\n";
  }
  protected function httpGet($url, $data)
  {
    if ($data) {
      $url .= '?' . http_build_query($data);
    }
    $curlobj = curl init(); // initialize curl,
    Curl_setopt ($curlobj, curlopt_url, $URL); // set URL
    Curl_setopt ($curlobj, curlopt_returntransfer, 1); // returns the result of curl_exec
    curl_setopt($curlObj, CURLOPT_SSL_VERIFYPEER, FALSE);
    curl_setopt($curlObj, CURLOPT_SSL_VERIFYHOST, FALSE);
    Curl_setopt ($curlobj, curlopt_header, 0); // output return header information
    $response = curl_exec ($curlobj); // execute
    Curl_close ($curlobj); // close the session
    return $response;
  }
}
$server = new Server();

Клиент

После запуска службы давайте посмотрим, как ее вызвать. Создайте новый тестовый файл client_test.php

client = new swoole_client(SWOOLE_SOCK_TCP);
  }
  public function connect()
  {
    if (!$this->client->connect("127.0.0.1", 9501, 1)) {
      throw new Exception(sprintf('Swoole Error: %s', $this->client->errCode));
    }
  }
  public function send($data)
  {
    if ($this->client->isConnected()) {
      if (!is_string($data)) {
        $data = json_encode($data);
      }
      return $this->client->send($data);
    } else {
      throw new Exception('Swoole Server does not connected.');
    }
  }
  public function close()
  {
    $this->client->close();
  }
}
$data = array(
  "url" => "http://192.168.10.19/send_mail",
  "param" => array(
    "username" => 'test',
    "password" => 'test'
  )
);
$client = new Client();
$client->connect();
if ($client->send($data)) {
  echo 'success';
} else {
  echo 'fail';
}
$client->close();

В приведенном выше коде URL-адрес является адресом задачи, а параметр-обязательным параметром.

Сохраните код, выполните client_test.php в командной строке или браузере, и реализуется асинхронная очередь задач. URL – адрес, который вы заполняете, будет выполняться асинхронно HTTP get после отправки каждой асинхронной задачи.

Для получения дополнительной информации о PHP вы можете ознакомиться со следующими разделами: Учебник по расширенной разработке PHP, краткое описание навыков сетевого программирования PHP, краткое описание использования PHP curl, набор навыков работы с массивами PHP, учебник по структуре данных и алгоритмам PHP, краткое описание алгоритмов программирования PHP и краткое описание использования строк PHP

Я надеюсь, что эта статья будет полезна для программирования на PHP.