Несколько дней назад мы опубликовали пример mix PHP V2: пример асинхронной отправки демонов по почте в пуле совместной работы. На этот раз мы предоставляем большой заводской SDK для выполнения задачи параллельной отправки сообщений в сотрудничестве с swool hook. Эта статья является примером простого кода и высокой производительности ввода-вывода.
Пожалуйста, сначала обновитесь до mix framework.0.5 。
В этом примере очередь сообщений по-прежнему используется для получения задач отправки SMS. Промежуточное программное обеспечение сообщений использует:
- редис
Производитель
В общем случае при использовании redis в рамках будет установлена библиотека классов для использования. В этом случае для удобства понимания используется собственный код.
// connection
$redis = new \Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
throw new \Exception('Redis connect failed.');
}
$redis->auth('');
$redis->select(0);
//Delivery task
for($i = 0; $i < 3; $i++){
$data = [
'phone' => '***',
'templateCode' => 'SMS_***',
'templateParam' => ['code' => 123456],
];
$redis->lpush('queue:sms', serialize($data));
}Потребитель
Служба сообщений Ali cloud используется для просмотра официальных документов PHP SDK. Используемая библиотека является:
composer require alibabacloud/client
Посмотрев файл зависимостей библиотеки composer, мы узнаем, что библиотека разработана на основе guzzlehttp. Поскольку mix PHP предоставляет инструмент, который можно использовать в процессе сотрудничества без изменения кода, экология mix PHP V2: пусть guzzle поддерживает процесс взаимодействия с крючком swool, поэтому мы можем в основном определить, что библиотека может использоваться в процессе сотрудничества swool.
Во-первых, мы устанавливаем https://github.com/mix- php /жрут-крючок, чтобы позволить alibaba cloud/клиент Его можно использовать в процессе сотрудничества:
composer require mix/guzzle-hook
А затем в файле composer.json Добавьте дополнительный элемент конфигурации в файл, как показано ниже:
"extra": {
"include_files": [
"vendor/mix/guzzle-hook/src/functions_include.php"
]
}Автоматическая загрузка обновления:
composer dump-autoload
Затем мы используем демоны mix PHP V2 и пулы сотрудничества для завершения сверхвысокой производительности программы отправки SMS.
Прежде всего, мы настраиваем applications/console/config/main.php Зарегистрируйте команду в:
// command
'commands' => [
'smser' => [
'Smser',
'description' => "SMS send daemon demo.",
'options' => [
[['d', 'daemon'], 'description' => 'Run in the background'],
],
],
],Класс smsercommand, указанный в зарегистрированной команде. Далее мы напишем класс smsercommand:
applications/console/src/Commands/SmserCommand.php
[email protected]>
*/
class SmserCommand
{
const ACCESS_KEY = '***';
const ACCESS_SECRET = '***';
/**
* exit
* @var bool
*/
public $quit = false;
/**
* main function
*/
public function main()
{
//Guard processing
$daemon = Flag::bool(['d', 'daemon'], false);
if ($daemon) {
ProcessHelper::daemon();
}
//Acquisition signal
ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], function ($signal) {
$this->quit = true;
ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], null);
});
//Set global parameters of Ali cloud
AlibabaCloud::accessKeyClient(static::ACCESS_KEY, static::ACCESS_SECRET)->regionId('cn-hangzhou')->asDefaultClient();
//Manually close the file hook of swoole, because the UUID library that Ali cloud depends on has a file hook compatibility problem, which swoole 4.4 has adapted to
Coroutine::enableHook(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_FILE);
//Pool execution task
xgo(function () {
$maxWorkers = 20;
$maxQueue = 20;
$jobQueue = new Channel($maxQueue);
$dispatch = new Dispatcher([
'jobQueue' => $jobQueue,
'maxWorkers' => $maxWorkers,
]);
$dispatch->start(SmserWorker::class);
//Launch task
$redis = app()->redisPool->getConnection();
while (true) {
if ($this->quit) {
$dispatch->stop();
return;
}
try {
$data = $redis->brPop(['queue:sms'], 3);
} catch (\Throwable $e) {
$dispatch->stop();
return;
}
if (!$data) {
continue;
}
$data = array_pop ($data); // the last key of brpop command is the value
$jobQueue->push($data);
}
});
//Waiting for events
Event::wait();
}
}из $data = $redis->brPop(['очередь:sms'], 3); Запись внешних исключений показывает, что при сбое подключения redis, например, при перезапуске redis или ненормальном подключении, пул совместной работы завершится безопасно, то есть, когда процесс завершится ненормально, пользователю необходимо использовать супервизор 、 |/pm2 Дождаться инструментов для перезапуска демонов.
Выше приведен код использования пула совместной работы mix PHP, который можно скопировать напрямую. По умолчанию платформа содержит демонстрационную версию пула совместной работы. Этот экземпляр изменяет только работника пула совместной работы. Эта команда предназначена для получения сообщений из очереди redis и отправки их в очередь заданий. Данные в очереди заданий будут выполняться параллельно после того, как их вытеснит один из 20 рабочих экземпляров. Логика кода отправки в примере относится к классу smserworker:
applications/console/src/Libraries/SmserWorker.php
[email protected]>
*/
class SmserWorker extends AbstractWorker implements WorkerInterface
{
/**
*Mail sender
* @var Smser
*/
public $smser;
/**
*Initialize event
*/
public function onInitialize()
{
parent::onInitialize(); // TODO: Change the autogenerated stub
//Instantiate some objects to be reused
$this->smser = new Smser();
}
/**
* processing
* @param $data
*/
public function handle($data)
{
// TODO: Implement handle() method.
$data = unserialize($data);
if (empty($data)) {
return;
}
try {
$result = $this->smser->send($data['phone'], $data['templateCode'], $data['templateParam']);
app()->log->info("SMS sent successfully:phone {phone} templateCode {templateCode} result {result}", array_merge($data, ['result' => json_encode($result, JSON_UNESCAPED_UNICODE)]));
} catch (\Throwable $e) {
app()->log->error("SMS failed to send:phone {phone} templateCode {templateCode} error {error}", array_merge($data, ['error' => $e->getMessage()]));
}
}
}Из приведенного выше кода видно, что при инициализации рабочего добавляется новый атрибут класса семестра. Когда сообщение из очереди заданий будет доставлено, оно будет доставлено методу обработки. В этом методе экземпляр класса mailer используется для выполнения задачи отправки почты. Поэтому нам нужно написать отправителю sms:
applications/console/src/Libraries/Smser.php
[email protected]>
*/
class Smser
{
/**
*Configuration information
*/
const SIGN_NAME = '***';
/**
* Smser constructor.
*/
public function __construct()
{
//Open the process hook
Coroutine::enableHook();
}
/**
* send
* @param $phone
* @param $templateCode
* @param $templateParam
* @return array
* @throws ClientException
* @throws ServerException
*/
public function send($phone, $templateCode, $templateParam)
{
$result = AlibabaCloud::rpc()
->product('Dysmsapi')
// ->scheme('https') // https | http
->version('2017-05-25')
->action('SendSms')
->method('POST')
->options([
'query' => [
'PhoneNumbers' => $phone,
'SignName' => static::SIGN_NAME,
'TemplateCode' => $templateCode,
'TemplateParam' => json_encode($templateParam),
],
])
->request();
return $result->toArray();
}
}Вышесказанное завершает всю логику кода. Теперь давайте начнем тест и запустим демонов-потребителей:
[[email protected] bin]# ./mix-console smser
Назовите сценарий производителя выше push.php Затем выполните в cli (откройте новый терминал):
[[email protected] bin]# php /tmp/push.php
Результаты демонов потребителей:
[[email protected] bin]# ./mix-console smser [info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"OK","RequestId":"4071D031-6D9E-4F70-9269-6C1979080858","BizId":"939807358670612546^0","Code":"OK"} [info] 2019-05-24 12:03:32 < 101014 > [message] SMS send successfully: Phone * * * templatecode SMS * * result {"message": "trigger minute level flow control limits: 1", "requestid": "490b73d7-317e-4362-b2dd-5e2153a7b891", "code": "ISV. Business_limit_control"} [info] 2019-05-24 12:03:32 < 101014 > [message] SMS send successfully: Phone * * * templatecode SMS * * result {"message": "trigger minute level flow control limits: 1", "requestid": "1fd22edb-baa4-4416-8ff9-242edcf34359", "code": "ISV. Business_limit_control"}
Терминал командной строки печатает журнал успешной отправки, и отправка завершена.