PHP 简单实现延时操作-php教程

资源魔 40 0


场景

正在营业中有时会碰着提早操作,以下单后半小时未领取则勾销定单、下单后十五分钟未领取则发短信提示等等。那这样的需要若何去完成呢。

相干学习保举:PHP编程从入门到通晓

完成形式

  • 第一个简略的形式就是用一个后盾过程死轮回去查定单,依据下单工夫去做没有同的操作
  • 第二种就是应用音讯行列步队的按时音讯,下单之后发送按时音讯,没有同的按时行列步队行止理没有同的逻辑
  • 第三种能够应用框架提供的一些既有性能去做

完成代码

咱们以定单创立15分钟后未领取,给用户发送邮件为场景进行学习

预备工作:

  1. 简略的定单表:order
  2. 各类需求的composer包
  3. rabbitMq内陆效劳
  4. 守旧阿里云RocketMq效劳

第一种

  • 代码逻辑很简略就间接死轮回就好了
  • 启动这个剧本过程,能够用supervisor设置装备摆设
  • 局部代码
//创立定单的逻辑/**
 * 随机创立定单
 */$order = [
    'order_number' => mt_rand(100,10000).date("YmdHis"),
    'user_id' => mt_rand(1, 100),
    'order_amount' => mt_rand(100, 1000),];
    /**@var $manager Illuminate\Database\Capsule\Manager **/
    $conn = $manager;$insertResult = $conn::table("order")
    ->insert($order);print_r($insertResult);

提早解决逻辑

while(true) {
    // 未领取定单列表
    $orderList = $conn::table("order")
        ->where("created_time",  '<=', date("Y-m-d H:i:s", strtotime("-15 minutes")))
        ->where('sended_need_pay_notify', '=', 2)
        ->where('status', '=', 1)
        ->select(['user_id', 'id'])
        ->orderBy("id", 'asc')
        ->get();
    $orderList = json_decode(json_encode($orderList), true);
    foreach ($orderList as $orderInfo) {
        sendEmail($orderInfo['user_id']);
        $conn::table('order')
            ->where('id', '=', $orderInfo['id'])
            ->update(['sended_need_pay_notify' => 1]);
        logs("update-success-orderId-". $orderInfo['id']."-userId-".$orderInfo['user_id']);
    }

    sleep(10);}

执行解决剧本

gaoz@nobodyMBP delay_mq_demo % php first_while_handler.php
send email to 73 success ...
2020-06-24 11:37:36:update-success-orderId-3-userId-73

这类形式吧完成简略,然而没有优雅,同时少量量定单孕育发生也会遇到成绩。

第二种

  • 比方应用阿里云的MQ效劳,今朝rocketMq与rabbitMq版本支持提早音讯,然而rabbit的延时音讯免费过高了
  • 这里先应用rocketMq的提早音讯去完成
  • 需求守旧阿里云的效劳
// 创立定单的逻辑try
        {

            /**
             * 随机创立定单
             */
            $order = [
                'order_number' => mt_rand(100,10000).date("YmdHis"),
                'user_id' => mt_rand(1, 100),
                'order_amount' => mt_rand(100, 1000),
            ];

            /**@var $manager Illuminate\Database\Capsule\Manager **/
            $conn = $manager;

            $insertId = $conn::table("order")
                ->insertGetId($order);

            $body = json_encode(['order_id' => $insertId, 'created_time' => date("Y-m-d H:i:s")]);
            $publishMessage = new TopicMessage(
                $body            );
            // 设置音讯KEY
            $publishMessage->setMessageKey("MessageKey");

            // 按时音讯, 按时工夫为3分钟后
            $publishMessage->setStartDeliverTime(time() * 1000 + 3 * 60 * 1000);

            $result = $this->producer->publishMessage($publishMessage);

            print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result
            -
            >getMessageBodyMD5() . "\n";
        } catch (\Exception $e) {
            print_r($e->getMessage() . "\n");
        }

生产逻辑 一样是正在生产者中解决

foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();

                $messageBody = $message->getMessageBody();

                $orderInfo = json_decode($messageBody, true);
                if (!empty($orderInfo['order_id'])) {
                    $orderId = $orderInfo['order_id'];

                    /**@var $manager Illuminate\Database\Capsule\Manager * */
                    $conn = $manager;
                    $orderInfo = $conn::table("order")
                        ->select(['id', 'user_id'])
                        ->where('id', '=', $orderId)
                        ->where('status', '=', 1)
                        ->first();
                    if (!empty($orderInfo)) {
                        $orderInfo = json_decode(json_encode($orderInfo), true);
                        sendEmail($orderInfo['user_id']);
                        $conn::table('order')
                            ->where('id', '=', $orderInfo['id'])
                            ->update(['sended_need_pay_notify' => 1]);
                        logs("update-success-orderId-" . $orderInfo['id'] . 
                        "-userId-" . $orderInfo['user_id']);
                    }
                }
            }

启动消费一条音讯

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_producer.php 
Send mq message success. msgId is:76CF2135696C3D4EAC698A9FA1E1879D, bodyMD5 
is:63448B50AA7B8AF47B07AA7CE807E3D3
gaoz@nobodyMBP delay_mq_demo %

启动生产者缓缓期待

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_consumer.php 
No message, contine long polling!RequestId:5EF752583441411C74869BA9
No message, contine long polling!RequestId:5EF7525B3441411C74869FE2
No message, contine long polling!RequestId:5EF7525E3441411C7486A42C
No message, contine long polling!RequestId:5EF752613441411C7486A7D9
consume finish, messages:send email to 95 success ...2020-06-27 12:08:05:update-success-orderId-8-userId-95
 Array(
    [0] => 76CF2135696C3D4EAC698A9FA1E1879D-MCAxNTkzMjY2NzkxNDM5IDMwMDAwMCAzIDAgYmpzaGFyZTUtMDggNSAw)
    ack

这类形式有现有的效劳能够应用,缩小开发工夫

第三种 应用rabbitMq去完成

  • 查阅文档不找到rabbitMq支持提早行列步队的原生性能,然而能够经过音讯的ttl+死信行列步队完成
  • 私信行列步队就是用来寄存不被生产或许生产失败等音讯的行列步队
  • 当设置音讯的无效期内不被生产音讯就会被转发到死信行列步队
  • 经过设置音讯的无效期完成延时性能
// 消费者$exchange = 'order15min_notify_exchange';
$queue = 'order15minx_notify_queue';$dlxExchange = "dlx_order15min_exchange";
$dlxQueue = "dlx_order15min_queue";
$connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
$channel = $connection->channel();$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);// 设置行列步队的过时工夫// 失常行列步队$table = new \PhpAmqpLib\Wire\AMQPTable();// 音讯无效期$table->set('x-message-ttl', 3*60*1000);$table->set("x-dead-letter-exchange", $dlxExchange);$channel->queue_declare($queue, false, true, false, false, false, $table);$channel->queue_bind($queue, $exchange);// 死信行列步队$channel->queue_declare($dlxQueue, false, true, false, false, false);$channel->queue_bind($dlxQueue, $dlxExchange);/**
 * 随机创立定单
 */$order = [
    'order_number' => mt_rand(100,10000).date("YmdHis"),
    'user_id' => mt_rand(1, 100),
    'order_amount' => mt_rand(100, 1000),];/**@var $manager Illuminate\Database\Capsule\Manager **/$conn = $manager;$insertId = $conn::table("order")
    ->insertGetId($order);$messageBody = json_encode(['order_id' => $insertId, 'created_time' => date("Y-m-d H:i:s")]);
    $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
    $channel->basic_publish($message, $exchange);

生产者

$dlxExchange = "dlx_order15min_exchange";$dlxQueue = "dlx_order15min_queue";
$connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
$channel = $connection->channel();
$channel->queue_declare($dlxQueue, false, true, false, false);$channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_bind($dlxQueue, $dlxExchange);/**
 * @param \PhpAmqpLib\Message\AMQPMessage $message
 */function process_message($message){
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    $orderInfo = json_decode($message->body, true);
    if (!empty($orderInfo['order_id'])) {
        $orderId = $orderInfo['order_id'];

        /**@var $conn Illuminate\Database\Capsule\Manager * */
        $conn = getdb();
        $orderInfo = $conn::table("order")
            ->select(['id', 'user_id'])
            ->where('id', '=', $orderId)
            ->where('status', '=', 1)
            ->first();
        if (!empty($orderInfo)) {
            $orderInfo = json_decode(json_encode($orderInfo), true);
            sendEmail($orderInfo['user_id']);
            $conn::table('order')
                ->where('id', '=', $orderInfo['id'])
                ->update(['sended_need_pay_notify' => 1]);
            logs("update-success-orderId-" . $orderInfo['id'] . "-userId-" . $orderInfo['user_id']);
        }

    }
    $message->delivery_info['channel']->basic_ack(
        $message->delivery_info['delivery_tag']);}$channel->basic_consume($dlxQueue, $consumerTag, false, false, false, false, 'process_message');

启动生产者

gaoz@nobodyMBP delay_mq_demo % php rabbit_mq_handler_consumer.php
--------
{"order_id":7,"created_time":"2020-06-27 11:50:08"}
--------
send email to 2 success ...
2020-06-27 11:56:55:update-success-orderId-7-userId-2

辨别启动生产者、消费者就能够了,这外面音讯的流转能够看到

音讯进步前辈入到失常行列步队,过时落后入了死信行列步队而被生产

第四种

  • 应用laravel自带的Queue去完成
  • 这里不整顿具体代码,前面更新进去
  • 能够查看民间文档 行列步队《Laravel 5.7 中文文档》

代码示例:github.com/nobody05/delay_mq_demo

以上就是PHP 简略完成延时操作的具体内容,更多请存眷资源魔其它相干文章!

标签: php php开发教程 php开发资料 php开发自学 延时操作

抱歉,评论功能暂时关闭!