Предисловие
вводить
- вводить
// NOTE(review): the start of this listing is cut off in the visible text. Judging by
// the trailing "];" and by the "$conf['host']" lookup in the connection class, this
// nested array is presumably the value of an outer 'host' key — confirm against the
// original file before using it.
[
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/',
],
// Exchange name
'exchange' => 'word',
// Routing keys
'routes' => [],
];
- вводить
// NOTE(review): the listing is cut off before this point — the opening PHP tag, the
// class header, the property declarations and the start of the constructor (everything
// up to and including "$this->") are not visible here. The fragment is kept verbatim.
conf = $conf['host'];
        $this->exchange = $conf['exchange'];
        $this->AMQPConnection = new \AMQPConnection($this->conf);

        // Refuse to continue without a broker connection.
        $connected = $this->AMQPConnection->connect();
        if (!$connected) {
            throw new \AMQPConnectionException("Cannot connect to the broker!\n");
        }
    }

    /**
     * Disconnects from the broker.
     */
    public function close()
    {
        $this->AMQPConnection->disconnect();
    }

    /**
     * Returns the channel, creating it on the connection the first time it is requested.
     *
     * @return \AMQPChannel
     * @throws \AMQPConnectionException
     */
    public function channel()
    {
        if ($this->AMQPChannel) {
            return $this->AMQPChannel;
        }

        $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);

        return $this->AMQPChannel;
    }

    /**
     * Returns the exchange object, creating it on the channel and giving it the
     * configured exchange name the first time it is requested.
     *
     * @return \AMQPExchange
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    public function exchange()
    {
        if ($this->AMQPExchange) {
            return $this->AMQPExchange;
        }

        $this->AMQPExchange = new \AMQPExchange($this->channel());
        $this->AMQPExchange->setName($this->exchange);

        return $this->AMQPExchange;
    }

    /**
     * Returns the queue object, creating it on the channel the first time it is requested.
     *
     * @return \AMQPQueue
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     */
    public function queue()
    {
        if ($this->AMQPQueue) {
            return $this->AMQPQueue;
        }

        $this->AMQPQueue = new \AMQPQueue($this->channel());

        return $this->AMQPQueue;
    }

    /**
     * Returns the envelope object, creating it the first time it is requested.
     *
     * @return \AMQPEnvelope
     */
    public function envelope()
    {
        if ($this->AMQPEnvelope) {
            return $this->AMQPEnvelope;
        }

        $this->AMQPEnvelope = new \AMQPEnvelope();

        return $this->AMQPEnvelope;
    }
}
- вводить
// NOTE(review): the listing is cut off before this point — the opening PHP tag, the
// class header and the start of run() (everything up to and including "$this->") are
// not visible here. The statement below is presumably the tail of
// "$channel = $this->channel();" — confirm against the original file.
channel();
        // Exchange object
        $ex = $this->exchange();
        // Message content
        $message = 'product message '.rand(1,99999);

        try {
            // Publish to every route inside one transaction.
            $channel->startTransaction();

            // Stays true only while every publish reports success, so an early
            // failure is not masked by a later successful publish.
            $sendEd = true;
            foreach ($this->routes as $route) {
                $published = $ex->publish($message, $route);
                echo "Send Message:".$published."\n";
                // NOTE(review): this relies on publish() returning a truthy value on
                // success, as the original code did — confirm for the installed
                // version of the amqp extension.
                if (!$published) {
                    $sendEd = false;
                }
            }

            // Commit only when nothing failed; the original committed even right
            // after rolling back.
            if ($sendEd) {
                $channel->commitTransaction();
            } else {
                $channel->rollbackTransaction();
            }
        } finally {
            // Release the connection even when publishing throws.
            $this->close();
        }

        die;
    }
}

try {
    (new ProductMQ())->run();
} catch (\Exception $exception) {
    var_dump($exception->getMessage());
}
- вводить
// NOTE(review): the listing is cut off before this point — the opening PHP tag, the
// class header and the start of run() (everything up to and including "$this->") are
// not visible here. The statement below is presumably the tail of
// "$ex = $this->exchange();" — confirm against the original file.
exchange();
        $ex->setType(AMQP_EX_TYPE_DIRECT); // direct exchange type
        $ex->setFlags(AMQP_DURABLE); // durable
        //echo "Exchange Status:".$ex->declare()."\n";

        // Create the queue object
        $q = $this->queue();
        //var_dump($q->declare());exit();
        $q->setName($this->q_name);
        $q->setFlags(AMQP_DURABLE); // durable
        //echo "Message Total:".$q->declareQueue()."\n";

        // Bind the queue to the exchange with the routing key
        echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n";

        // Receive messages in blocking mode
        echo "Message:\n";
        try {
            while (true) {
                $q->consume(function ($envelope, $queue) {
                    // Process the message: print its body on its own line.
                    $msg = $envelope->getBody();
                    echo $msg."\n";
                    // Acknowledge manually once the message has been handled.
                    $queue->ack($envelope->getDeliveryTag());
                });
                // $q->consume('processMessage', AMQP_AUTOACK); // automatic ACK response
            }
        } finally {
            // The loop above never ends normally, so this is the only place the
            // connection gets closed — it runs when consuming throws.
            $this->close();
        }
    }
}

try {
    (new ConsumerMQ)->run();
} catch (\Exception $exception) {
    var_dump($exception->getMessage());
}