PHP使用Beanstalkd实例详解-php教程

资源魔 28 0
无关Beanstalkd的根本概念,编译以及yum的装置办法曾经正在上篇文章《Beanstalkd音讯/义务行列步队的详解》中引见了,明天操练下PHP应用Beanstalkd的进程,我抉择的是应用Pheanstalk类来衔接Beanstalkd

1.应用Composer装置Pheanstalk

composer require pda/pheanstalk

2.完成代码

php查看beanstalkd状态剧本Status.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/21
 * Time: 10:32
 */
require "../vendor/autoload.php";
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('192.168.75.135',11300);
print_r($pheanstalk->stats());

消费者代码Producter.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/20
 * Time: 16:30
 */
require "../vendor/autoload.php";
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('192.168.75.135',11300);
for ($i=0;$i<50;$i++){
    $data = array(
        'key' => 'testkey'.$i,
        'value' => 'testvalue',
        'time' => time(),
    );
    $ret = $pheanstalk->putInTube('test-tube', json_encode($data), Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR);
    var_dump($ret);
}

生产者代码Consumer.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/20
 * Time: 16:31
 */
set_time_limit(0);
ini_set('default_socket_timeout', 900);
require "../vendor/autoload.php";
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('192.168.75.135',11300);
while (true){
    $job = $pheanstalk
        ->watch('test-tube')
        ->ignore('default')
        ->reserve();
    if ($job){
        sleep(2);
        echo $job->getData();
        echo "\n";
        $pheanstalk->delete($job);
    }
}

关上饬令行/终端窗口,执行消费者,会向tube写入50条义务

PS E:\repository\work\beanstalk> php .\Producter.php
int(101)
int(102)
int(103)
int(104)
int(105)
int(106)
int(107)
int(108)
int(109)
int(110)
int(111)
int(112)
int(113)
int(114)
......

因而可知,$pheanstalk->putInTube胜利后前往的是job的id

查看状态

PS E:\repository\work\beanstalk> php Status.php
Pheanstalk\Response\ArrayResponse Object
(
    [_name:Pheanstalk\Response\ArrayResponse:private] => OK
    [storage:ArrayObject:private] => Array
        (
            [current-jobs-urgent] => 0
            [current-jobs-ready] => 50
            [current-jobs-reserved] => 0
            [current-jobs-delayed] => 0
            [current-jobs-buried] => 0
            ......

后果中显示处于ready待读取状态的job是50个

关上两个或以上饬令行/终端窗口,执行生产者,模仿多生产者竞争

生产者1

PS E:\repository\work\beanstalk> php .\Consumer.php
{"key":"testkey0","value":"testvalue","time":1548039103}
{"key":"testkey1","value":"testvalue","time":1548039103}
{"key":"testkey2","value":"testvalue","time":1548039103}
{"key":"testkey4","value":"testvalue","time":1548039103}
{"key":"testkey6","value":"testvalue","time":1548039103}
{"key":"testkey8","value":"testvalue","time":1548039103}
{"key":"testkey10","value":"testvalue","time":1548039103}
{"key":"testkey12","value":"testvalue","time":1548039103}
{"key":"testkey14","value":"testvalue","time":1548039103}
{"key":"testkey16","value":"testvalue","time":1548039103}
{"key":"testkey18","value":"testvalue","time":1548039103}
{"key":"testkey20","value":"testvalue","time":1548039103}
{"key":"testkey22","value":"testvalue","time":1548039103}
{"key":"testkey24","value":"testvalue","time":1548039103}
{"key":"testkey26","value":"testvalue","time":1548039103}
{"key":"testkey28","value":"testvalue","time":1548039103}
{"key":"testkey30","value":"testvalue","time":1548039103}
{"key":"testkey32","value":"testvalue","time":1548039103}
{"key":"testkey34","value":"testvalue","time":1548039103}
{"key":"testkey36","value":"testvalue","time":1548039103}
{"key":"testkey38","value":"testvalue","time":1548039103}
{"key":"testkey40","value":"testvalue","time":1548039103}
{"key":"testkey42","value":"testvalue","time":1548039103}
{"key":"testkey44","value":"testvalue","time":1548039103}
{"key":"testkey46","value":"testvalue","time":1548039103}
{"key":"testkey48","value":"testvalue","time":1548039103}

生产者2

PS E:\repository\work\beanstalk> php .\Consumer.php
{"key":"testkey3","value":"testvalue","time":1548039103}
{"key":"testkey5","value":"testvalue","time":1548039103}
{"key":"testkey7","value":"testvalue","time":1548039103}
{"key":"testkey9","value":"testvalue","time":1548039103}
{"key":"testkey11","value":"testvalue","time":1548039103}
{"key":"testkey13","value":"testvalue","time":1548039103}
{"key":"testkey15","value":"testvalue","time":1548039103}
{"key":"testkey17","value":"testvalue","time":1548039103}
{"key":"testkey19","value":"testvalue","time":1548039103}
{"key":"testkey21","value":"testvalue","time":1548039103}
{"key":"testkey23","value":"testvalue","time":1548039103}
{"key":"testkey25","value":"testvalue","time":1548039103}
{"key":"testkey27","value":"testvalue","time":1548039103}
{"key":"testkey29","value":"testvalue","time":1548039103}
{"key":"testkey31","value":"testvalue","time":1548039103}
{"key":"testkey33","value":"testvalue","time":1548039103}
{"key":"testkey35","value":"testvalue","time":1548039103}
{"key":"testkey37","value":"testvalue","time":1548039103}
{"key":"testkey39","value":"testvalue","time":1548039103}
{"key":"testkey41","value":"testvalue","time":1548039103}
{"key":"testkey43","value":"testvalue","time":1548039103}
{"key":"testkey45","value":"testvalue","time":1548039103}
{"key":"testkey47","value":"testvalue","time":1548039103}
{"key":"testkey49","value":"testvalue","time":1548039103}

两个生产者竞争着实现了全副义务,因为我的beanstalkd启动时开启了binlog耐久,以是beanstalkd重启前任务也没有会失落

3.需求留意的事项

1.创立job时,设置的超不时间Pheanstalk::DEFAULT_TTR肯定要比生产者解决一个job的工夫要长,不然job正在超时之后会被tube更改成ready状态,被其余生产者猎取,而此时以后生产者还正在解决该job,这就呈现了一个job被多个生产者反复执行的可骇景象

2.Pheanstalk的保护者发作了变动,正在新版的Pheanstalk中是没有支持长衔接的,当客户端socket衔接效劳器工夫超越php.ini中设置的default_socket_timeout时,假如未能从效劳端tube取得job,衔接将会被断开,以是生产者过程需求保护,以便正在加入后能够从新开启过程,保举应用supervisord保护生产者过程。

判别socket超时的代码

public function getLine($length = null)
    {
        $timeout = ini_get('default_socket_timeout');
        $timer   = microtime(true);
        do {
            $data = isset($length) ?
                $this->_wrapper()->fgets($this->_socket, $length) :
                $this->_wrapper()->fgets($this->_socket);
            if ($this->_wrapper()->feof($this->_socket)) {
                throw new Exception\SocketException('Socket closed by server!');
            }
            if (($data === false) && microtime(true) - $timer > $timeout) {
                $this->disconnect();
                throw new Exception\SocketException('Socket timed out!');
            }
        } while ($data === false);
        return rtrim($data);
    }

以上就是PHP应用Beanstalkd实例详解的具体内容,更多请存眷资源魔其它相干文章!

标签: php php开发教程 php开发资料 php开发自学 Beanstalkd

抱歉,评论功能暂时关闭!