Автор оригинала: David Wong.
// Kafka consumer bootstrap: configure the client, then handle every received
// message in a task started through mgo().
Runtime::enableCoroutine(false);
// mgo(function () {
// Create the logger
// $logger = new Logger('my_logger');
// // Now add some handlers
// $logger->pushHandler(new StdoutHandler());
// //$logger->pushHandler(new \Monolog\Handler\NullHandler());
// \Amp\Loop::set(new \Swoole\Driver\Amp());
$config = ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(500);
$config->setMetadataBrokerList('192.168.3.243:9092');
$config->setGroupId('swoole');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['save_user_travel_data_one']);
$config->setOffsetReset('earliest');

$consumer = new Consumer();
// $consumer->setLogger($logger);
$consumer->start(function ($topic, $part, $message): void {
    // NOTE(review): mgo() is defined outside this snippet; presumably it runs
    // the closure as a coroutine and `false` is an option flag - confirm.
    mgo(function () use ($message) {
        // The payload is decoded twice, i.e. the producer is expected to send
        // a JSON string that itself contains JSON.
        $data = json_decode(json_decode($message['message']['value'], true), true);

        // NOTE(review): the key names were reconstructed from the variable names
        // used further down; verify them against the producer's payload.
        $clearing_start_time = $data['clearing_start_time']; // clearing time
        $user_id = $data['user_id']; // clear user ID
        $total = $data['total']; // total number of users in this liquidation

        Console::debug('Start a task. ' . $user_id);

        // NOTE(review): $this only resolves when this snippet lives inside a
        // class method - confirm the surrounding context.
        $this->saveUserTravel($user_id, $clearing_start_time);

        // Per-clearing progress counter, keyed by the clearing start time.
        Redis::incr('save_user_travel_log:' . $clearing_start_time);
        Console::debug('finished the first ' . Redis::get('save_user_travel_log:' . $clearing_start_time));

        // Once the counter reaches the announced total, the whole batch is done.
        if (Redis::get('save_user_travel_log:' . $clearing_start_time) >= $total) {
            //do
            Console::debug('All tasks completed ');
        }
    }, false);
});
\Swoole\Event::wait();

Проверено на практике: этот способ приводит к переполнению памяти!!! При использовании расширения rdkafka такой проблемы не возникает.
Оригинал: “https://developpaper.com/swoole-kafka-advanced-consumer-case/”