PHP并发 任务队列(2)

简介:

 例如:邮件队列、任务队列、消息队列、Feed队列 ,手机短信,纪录日志,转换视频格式,数据挖掘采集 ,为了增强用户体验可以用队列或异步处理

 

PHP短耗时异步处理

Java代码   收藏代码
  1. <?php  
  2. echo '执行完毕提示';  
  3. fastcgi_finish_request(); /* 响应完成, 关闭连接 */  
  4.   
  5. echo '不会输出';  
  6. sleep(3); //sleep模拟一些耗时的操作  
  7. /* 记录日志 */  
  8. file_put_contents('log.txt''生存还是毁灭,这是个问题.');  
  9. ?>  

这个部署有一个好处,就是可以不至于让一个页面的响应时间太久,直接返回给用户一个执行完毕的提示,但PHP后台实际还在继续执行,这样做可能会有用户体验上的舒服感。


PHP 长耗时用队列

http://blog.s135.com/httpsqs/

HTTPSQS(HTTP Simple Queue Service)是一款基于 HTTP GET/POST 协议的轻量级开源简单消息队列服务,使用 Tokyo Cabinet 的 B+Tree Key/Value 数据库来做数据的持久化存储。

非常简单,基于 HTTP GET/POST 协议。PHP、Java、Perl、Shell、Python、Ruby等支持HTTP协议的编程语言均可调用

https://Git.oschina.net/664712890/PHP-Redis-Queue/tree/master

内存队列

Java代码   收藏代码
  1. <?php    
  2. /**   
  3. * 使用共享内存的PHP循环内存队列实现   
  4. * 支持多进程, 支持各种数据类型的存储   
  5. * 注: 完成入队或出队操作,尽快使用unset(), 以释放临界区   
  6. *   
  7. * @author wangbinandi@gmail.com   
  8. * @created 2009-12-23   
  9. */    
  10. class SHMQueue    
  11. {    
  12.     private $maxQSize = 0// 队列最大长度    
  13.         
  14.     private $front = 0// 队头指针    
  15.     private $rear = 0;  // 队尾指针    
  16.         
  17.     private $blockSize = 256;  // 块的大小(byte)    
  18.     private $memSize = 25600;  // 最大共享内存(byte)    
  19.     private $shmId = 0;    
  20.         
  21.     private $filePtr = './shmq.ptr';    
  22.         
  23.     private $semId = 0;    
  24.     public function __construct()    
  25.     {            
  26.         $shmkey = ftok(__FILE__, 't');    
  27.             
  28.         $this->shmId = shmop_open($shmkey, "c"0644, $this->memSize );    
  29.         $this->maxQSize = $this->memSize / $this->blockSize;    
  30.             
  31.          // 申請一个信号量    
  32.         $this->semId = sem_get($shmkey, 1);    
  33.         sem_acquire($this->semId); // 申请进入临界区            
  34.             
  35.         $this->init();    
  36.     }    
  37.         
  38.     private function init()    
  39.     {    
  40.         if ( file_exists($this->filePtr) ){    
  41.             $contents = file_get_contents($this->filePtr);    
  42.             $data = explode( '|', $contents );    
  43.             if ( isset($data[0]) && isset($data[1])){    
  44.                 $this->front = (int)$data[0];    
  45.                 $this->rear  = (int)$data[1];    
  46.             }    
  47.         }    
  48.     }    
  49.         
  50.     public function getLength()    
  51.     {    
  52.         return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;    
  53.     }    
  54.         
  55.     public function enQueue( $value )    
  56.     {    
  57.         if ( $this->ptrInc($this->rear) == $this->front ){ // 队满    
  58.             return false;    
  59.         }    
  60.             
  61.         $data = $this->encode($value);    
  62.         shmop_write($this->shmId, $data, $this->rear );    
  63.         $this->rear = $this->ptrInc($this->rear);    
  64.         return true;    
  65.     }    
  66.             
  67.     public function deQueue()    
  68.     {    
  69.         if ( $this->front == $this->rear ){ // 队空    
  70.             return false;    
  71.         }    
  72.         $value = shmop_read($this->shmId, $this->front, $this->blockSize-1);    
  73.         $this->front = $this->ptrInc($this->front);    
  74.         return $this->decode($value);    
  75.     }    
  76.         
  77.     private function ptrInc( $ptr )    
  78.     {    
  79.         return ($ptr + $this->blockSize) % ($this->memSize);    
  80.     }    
  81.         
  82.     private function encode( $value )    
  83.     {    
  84.         $data = serialize($value) . "__eof";    
  85.         if ( strlen($data) > $this->blockSize -1 ){    
  86.             throw new Exception(strlen($data)." is overload block size!");    
  87.         }    
  88.         return $data;    
  89.     }    
  90.         
  91.     private function decode( $value )    
  92.     {    
  93.         $data = explode("__eof", $value);    
  94.         return unserialize($data[0]);            
  95.     }    
  96.         
  97.     public function __destruct()    
  98.     {    
  99.         $data = $this->front . '|' . $this->rear;    
  100.         file_put_contents($this->filePtr, $data);    
  101.             
  102.         sem_release($this->semId); // 出临界区, 释放信号量    
  103.     }    
  104. }    
  105.   
  106. //使用的样例代码如下:  
  107. // 进队操作  
  108. $shmq = new SHMQueue();    
  109. $data = 'test data';    
  110. $shmq->enQueue($data);   
  111. $data = 'test';    
  112. $shmq->enQueue($data);    
  113. unset($shmq);    
  114. // 出队操作    
  115. $shmq = new SHMQueue();    
  116. $data = $shmq->deQueue();    
  117. unset($shmq);  
memcache队列 可以使用比较专业的工具,例如:Apache ActiveMQ、 memcacheq,下面是两个基本简单的实现方式:
Java代码   收藏代码
  1. <?php  
  2. /** 
  3.  * PHP memcache 队列类 
  4.  * @author LKK/lianq.net 
  5.  * @version 0.3 
  6.  * @修改说明: 
  7.  * 1.放弃了之前的AB面轮值思路,使用类似数组的构造,重写了此类. 
  8.  * 2.队列默认先进先出,但增加了反向读取功能. 
  9.  * @example: 
  10.     $obj = new memcacheQueue('duilie'); 
  11.     for ($i = 1; $i <= 10; $i++) { 
  12.         $obj->add($i . 'asdf'); 
  13.     } 
  14.     $count = $obj->getQueueLength(); 
  15.     $list = $obj->read(10); //读数据10条,没有出栈 
  16.     $poplist = $obj->get(8); //出栈 
  17.  */  
  18. class memcacheQueue  
  19. {  
  20.     public static $client; //memcache客户端连接  
  21.     public $access; //队列是否可更新  
  22.     private $expire; //过期时间,秒,1~2592000,即30天内  
  23.     private $sleepTime; //等待解锁时间,微秒  
  24.     private $queueName; //队列名称,唯一值  
  25.     private $retryNum; //重试次数,= 10 * 理论并发数  
  26.     public $currentHead; //当前队首值  
  27.     public $currentTail; //当前队尾值  
  28.   
  29.     const MAXNUM = 20000//最大队列数,建议上限10K  
  30.     const HEAD_KEY = '_lkkQueueHead_'//队列首key  
  31.     const TAIL_KEY = '_lkkQueueTail_'//队列尾key  
  32.     const VALU_KEY = '_lkkQueueValu_'//队列值key  
  33.     const LOCK_KEY = '_lkkQueueLock_'//队列锁key  
  34.   
  35.     /** 
  36.      * 构造函数 
  37.      * @param string $queueName 队列名称 
  38.      * @param int $expire 过期时间 
  39.      * @param array $config memcache配置 
  40.      * 
  41.      * @return <type> 
  42.      */  
  43.     public function __construct($queueName = '', $expire = 0, $config = '')  
  44.     {  
  45.         if (empty($config)) {  
  46.             self::$client = memcache_pconnect('127.0.0.1'11211);  
  47.         } elseif (is_array($config)) { //array('host'=>'127.0.0.1','port'=>'11211')  
  48.             self::$client = memcache_pconnect($config['host'], $config['port']);  
  49.         } elseif (is_string($config)) { //"127.0.0.1:11211"  
  50.             $tmp = explode(':', $config);  
  51.             $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';  
  52.             $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';  
  53.             self::$client = memcache_pconnect($conf['host'], $conf['port']);  
  54.         }  
  55.         if (!self::$client) return false;  
  56.   
  57.         ignore_user_abort(true); //当客户断开连接,允许继续执行  
  58.         set_time_limit(0); //取消脚本执行延时上限  
  59.   
  60.         $this->access = false;  
  61.         $this->sleepTime = 1000;  
  62.         $expire = empty($expire) ? 3600 : intval($expire) + 1;  
  63.         $this->expire = $expire;  
  64.         $this->queueName = $queueName;  
  65.         $this->retryNum = 1000;  
  66.   
  67.         $this->head_key = $this->queueName . self::HEAD_KEY;  
  68.         $this->tail_key = $this->queueName . self::TAIL_KEY;  
  69.         $this->lock_key = $this->queueName . self::LOCK_KEY;  
  70.   
  71.         $this->_initSetHeadNTail();  
  72.     }  
  73.   
  74.     /** 
  75.      * 初始化设置队列首尾值 
  76.      */  
  77.     private function _initSetHeadNTail()  
  78.     {  
  79.         //当前队列首的数值  
  80.         $this->currentHead = memcache_get(self::$client, $this->head_key);  
  81.         if ($this->currentHead === false) $this->currentHead = 0;  
  82.   
  83.         //当前队列尾的数值  
  84.         $this->currentTail = memcache_get(self::$client, $this->tail_key);  
  85.         if ($this->currentTail === false) $this->currentTail = 0;  
  86.     }  
  87.   
  88.     /** 
  89.      * 当取出元素时,改变队列首的数值 
  90.      * @param int $step 步长值 
  91.      */  
  92.     private function _changeHead($step = 1)  
  93.     {  
  94.         $this->currentHead += $step;  
  95.         memcache_set(self::$client, $this->head_key, $this->currentHead, false, $this->expire);  
  96.     }  
  97.   
  98.     /** 
  99.      * 当添加元素时,改变队列尾的数值 
  100.      * @param int $step 步长值 
  101.      * @param bool $reverse 是否反向 
  102.      * @return null 
  103.      */  
  104.     private function _changeTail($step = 1, $reverse = false)  
  105.     {  
  106.         if (!$reverse) {  
  107.             $this->currentTail += $step;  
  108.         } else {  
  109.             $this->currentTail -= $step;  
  110.         }  
  111.   
  112.         memcache_set(self::$client, $this->tail_key, $this->currentTail, false, $this->expire);  
  113.     }  
  114.   
  115.     /** 
  116.      * 队列是否为空 
  117.      * @return bool 
  118.      */  
  119.     private function _isEmpty()  
  120.     {  
  121.         return (bool)($this->currentHead === $this->currentTail);  
  122.     }  
  123.   
  124.     /** 
  125.      * 队列是否已满 
  126.      * @return bool 
  127.      */  
  128.     private function _isFull()  
  129.     {  
  130.         $len = $this->currentTail - $this->currentHead;  
  131.         return (bool)($len === self::MAXNUM);  
  132.     }  
  133.   
  134.     /** 
  135.      * 队列加锁 
  136.      */  
  137.     private function _getLock()  
  138.     {  
  139.         if ($this->access === false) {  
  140.             while (!memcache_add(self::$client, $this->lock_key, 1false, $this->expire)) {  
  141.                 usleep($this->sleepTime);  
  142.                 @$i++;  
  143.                 if ($i > $this->retryNum) { //尝试等待N次  
  144.                     return false;  
  145.                     break;  
  146.                 }  
  147.             }  
  148.   
  149.             $this->_initSetHeadNTail();  
  150.             return $this->access = true;  
  151.         }  
  152.   
  153.         return $this->access;  
  154.     }  
  155.   
  156.     /** 
  157.      * 队列解锁 
  158.      */  
  159.     private function _unLock()  
  160.     {  
  161.         memcache_delete(self::$client, $this->lock_key, 0);  
  162.         $this->access = false;  
  163.     }  
  164.   
  165.     /** 
  166.      * 获取当前队列的长度 
  167.      * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度<=该长度 
  168.      * @return int 
  169.      */  
  170.     public function getQueueLength()  
  171.     {  
  172.         $this->_initSetHeadNTail();  
  173.         return intval($this->currentTail - $this->currentHead);  
  174.     }  
  175.   
  176.     /** 
  177.      * 添加队列数据 
  178.      * @param void $data 要添加的数据 
  179.      * @return bool 
  180.      */  
  181.     public function add($data)  
  182.     {  
  183.         if (!$this->_getLock()) return false;  
  184.   
  185.         if ($this->_isFull()) {  
  186.             $this->_unLock();  
  187.             return false;  
  188.         }  
  189.   
  190.         $value_key = $this->queueName . self::VALU_KEY . strval($this->currentTail + 1);  
  191.         $result = memcache_set(self::$client, $value_key, $data, MEMCACHE_COMPRESSED, $this->expire);  
  192.         if ($result) {  
  193.             $this->_changeTail();  
  194.         }  
  195.   
  196.         $this->_unLock();  
  197.         return $result;  
  198.     }  
  199.   
  200.     /** 
  201.      * 读取队列数据 
  202.      * @param int $length 要读取的长度(反向读取使用负数) 
  203.      * @return array 
  204.      */  
  205.     public function read($length = 0)  
  206.     {  
  207.         if (!is_numeric($length)) return false;  
  208.         $this->_initSetHeadNTail();  
  209.   
  210.         if ($this->_isEmpty()) {  
  211.             return false;  
  212.         }  
  213.   
  214.         if (empty($length)) $length = self::MAXNUM; //默认所有  
  215.         $keyArr = array();  
  216.         if ($length > 0) { //正向读取(从队列首向队列尾)  
  217.             $tmpMin = $this->currentHead;  
  218.             $tmpMax = $tmpMin + $length;  
  219.             for ($i = $tmpMin; $i <= $tmpMax; $i++) {  
  220.                 $keyArr[] = $this->queueName . self::VALU_KEY . $i;  
  221.             }  
  222.         } else { //反向读取(从队列尾向队列首)  
  223.             $tmpMax = $this->currentTail;  
  224.             $tmpMin = $tmpMax + $length;  
  225.             for ($i = $tmpMax; $i > $tmpMin; $i--) {  
  226.                 $keyArr[] = $this->queueName . self::VALU_KEY . $i;  
  227.             }  
  228.         }  
  229.   
  230.         $result = @memcache_get(self::$client, $keyArr);  
  231.   
  232.         return $result;  
  233.     }  
  234.   
  235.     /** 
  236.      * 取出队列数据 
  237.      * @param int $length 要取出的长度(反向读取使用负数) 
  238.      * @return array 
  239.      */  
  240.     public function get($length = 0)  
  241.     {  
  242.         if (!is_numeric($length)) return false;  
  243.         if (!$this->_getLock()) return false;  
  244.   
  245.         if ($this->_isEmpty()) {  
  246.             $this->_unLock();  
  247.             return false;  
  248.         }  
  249.   
  250.         if (empty($length)) $length = self::MAXNUM; //默认所有  
  251.         $length = intval($length);  
  252.         $keyArr = array();  
  253.         if ($length > 0) { //正向读取(从队列首向队列尾)  
  254.             $tmpMin = $this->currentHead;  
  255.             $tmpMax = $tmpMin + $length;  
  256.             for ($i = $tmpMin; $i <= $tmpMax; $i++) {  
  257.                 $keyArr[] = $this->queueName . self::VALU_KEY . $i;  
  258.             }  
  259.             $this->_changeHead($length);  
  260.         } else { //反向读取(从队列尾向队列首)  
  261.             $tmpMax = $this->currentTail;  
  262.             $tmpMin = $tmpMax + $length;  
  263.             for ($i = $tmpMax; $i > $tmpMin; $i--) {  
  264.                 $keyArr[] = $this->queueName . self::VALU_KEY . $i;  
  265.             }  
  266.             $this->_changeTail(abs($length), true);  
  267.         }  
  268.         $result = @memcache_get(self::$client, $keyArr);  
  269.   
  270.         foreach ($keyArr as $v) { //取出之后删除  
  271.             @memcache_delete(self::$client, $v, 0);  
  272.         }  
  273.   
  274.         $this->_unLock();  
  275.   
  276.         return $result;  
  277.     }  
  278.   
  279.     /** 
  280.      * 清空队列 
  281.      */  
  282.     public function clear()  
  283.     {  
  284.         if (!$this->_getLock()) return false;  
  285.   
  286.         if ($this->_isEmpty()) {  
  287.             $this->_unLock();  
  288.             return false;  
  289.         }  
  290.   
  291.         $tmpMin = $this->currentHead--;  
  292.         $tmpMax = $this->currentTail++;  
  293.   
  294.         for ($i = $tmpMin; $i <= $tmpMax; $i++) {  
  295.             $tmpKey = $this->queueName . self::VALU_KEY . $i;  
  296.             @memcache_delete(self::$client, $tmpKey, 0);  
  297.         }  
  298.   
  299.         $this->currentTail = $this->currentHead = 0;  
  300.         memcache_set(self::$client, $this->head_key, $this->currentHead, false, $this->expire);  
  301.         memcache_set(self::$client, $this->tail_key, $this->currentTail, false, $this->expire);  
  302.   
  303.         $this->_unLock();  
  304.     }  
  305.   
  306.     /* 
  307.      * 清除所有memcache缓存数据 
  308.      */  
  309.     public function memFlush()  
  310.     {  
  311.         memcache_flush(self::$client);  
  312.     }  
  313.   
  314. }  

 

 
相关文章
|
存储 缓存 分布式计算
如何使用PHP处理大规模的并发请求?底层原理是什么?
如何使用PHP处理大规模的并发请求?底层原理是什么?
|
数据采集 网络协议 PHP
如何使用PHP的swoole扩展提高服务器并发能力
PHP的swoole扩展是一个高性能的网络通信框架,它可以让PHP开发者轻松地创建TCP/HTTP服务,来响应客户端的请求。但是,有些请求可能涉及到一些复杂和耗时的业务逻辑,如果在工作进程中直接处理,可能会影响服务器的并发能力。
148 0
如何使用PHP的swoole扩展提高服务器并发能力
|
Unix API PHP
PHP如何实现多进程并发?底层原理是什么?
PHP如何实现多进程并发?底层原理是什么?
152 0
|
PHP UED
PHP的并发能力是什么意思?底层原理是什么?
PHP的并发能力是什么意思?底层原理是什么?
173 0
|
消息中间件 NoSQL 关系型数据库
PHP 使用数据库的并发问题
在秒杀,抢购等并发场景下,可能会出现超卖的现象; 如:我们一共只有100个商品,在最后一刻,我们已经消耗了99个商品,仅剩最后一个。这个时候,系统发来多个并发请求,这批请求读取到的商品余量都是1个,然后都通过了这一个余量判断,最终导致超发。
109 0
|
Ubuntu 应用服务中间件 测试技术
php + nginx 网站并发压力测试及优化
测试工具: Apache 压力测试工具ab ab是针对apache的性能测试工具,可以只安装ab工具。 ubuntu安装ab
php + nginx 网站并发压力测试及优化
|
SQL 关系型数据库 MySQL
php mysql 异步, php mysql 异步并发查询
php mysql 异步, php mysql 异步并发查询
172 0