Рубрики
Uncategorized

Свул + Кафка продвинутый потребительский кейс

Автор оригинала: David Wong.

Runtime::enableCoroutine(false);
//        mgo(function () {
// Create the logger
//        $logger = new Logger('my_logger');
//        // Now add some handlers
//        $logger->pushHandler(new StdoutHandler());
//        //$logger->pushHandler(new \Monolog\Handler\NullHandler());
//        \Amp\Loop::set(new \Swoole\Driver\Amp());
$config = ConsumerConfig::getInstance();

$config->setMetadataRefreshIntervalMs(500);
$config->setMetadataBrokerList('192.168.3.243:9092');
$config->setGroupId('swoole');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['save_user_travel_data_one']);
$config->setOffsetReset('earliest');
$consumer = new Consumer();
//        $consumer->setLogger($logger);
$consumer->start(function ($topic, $part, $message): void {
    mgo(function () use ($message) {
        $data                = json_decode(json_decode($message['message']['value'], true), true);
        $clearing'u start'u time = $data ['clearing'u start'u time ']; // clearing time
        $user id = $data ['user ID ']; // clear user ID
        $total = $data ['total ']; // total number of users in this liquidation
        Console:: debug ('Start a task. '$user? ID);
        $this->saveUserTravel($user_id, $clearing_start_time);
        Redis::incr('save_user_travel_log:' . $clearing_start_time);
        Console:: debug ('finished the first '.. redis:: get ('save ﹣ user ﹣ travel ﹣ log:'. $clearing ﹣ start ﹣ time));
        if (Redis::get('save_user_travel_log:' . $clearing_start_time) >= $total) {
            //do
            Console:: debug ('All tasks completed ');
        }
    }, false);
});

\Swoole\Event::wait();

Измерение этим методом приведет к переполнению памяти!!! , использование расширения rdkafka не приведет

Оригинал: “https://developpaper.com/swoole-kafka-advanced-consumer-case/”