参考:PHP高级编程之消息队列
消息队列就是在消息的传输过程中,可以保存消息的容器。
常见用途:
存储转发:异步处理耗时的任务分布式事务:多个消费者消费同一个消息队列应对高并发:通过消息队列保存任务,慢慢处理发布订阅:实现解耦PHP 可以基于 Redis 的 List 数据类型实现简单的消息队列,可以参考 php-resque。当然也可以使用更强大的 RabbitMQ。
PHP 业务代码:
<?php class MyDaemon { public $procNum = 8; // 进程总数 // 启动进程 public function run() { for ($i = 0; $i < $this->procNum; $i++) { $nPID = \pcntl_fork();//创建子进程 if ($nPID == 0) { //子进程 \Org\Util\MsgQ::init(); $this->work(); exit(0); } } // 等待子进程执行完毕,避免僵尸进程 $n = 0; while ($n < $this->procNum) { $nStatus = -1; $nPID = \pcntl_wait($nStatus); if ($nPID > 0) { ++$n; } } } //业务代码 public function work() { $MsgData = ""; while (true) { usleep(10000); // 10 ms 执行一次 $ret = MsgQ::BlockSubsribe("MyMsgName", $MsgData); // 业务代码 } }消息队列(基于Redis)库代码:
<?php namespace Org\Util; class MsgQ { public static $errCode = 0; public static $errMsg = ""; public static $redis; private static $preFix = "MsgQ."; private static $timeOut = 10; private static $redisHost = ''; private static $redisPort = ''; private static $redisAuth = ''; function __construct() { self::$redis = new \Redis(); $ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut); $ret = self::$redis->auth($redisAuth); } function __destruct() { if(self::$redis) { self::$redis->close(); } } public static function init($timeOut = 0){ if (!self::$redis) { self::$redis = new \Redis(); if(!empty($timeOut)){ self::$timeOut = $timeOut; ini_set('default_socket_timeout', 259200); $ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut); $ret = self::$redis->auth($redisAuth); } else{ self::$timeOut = 0; ini_set('default_socket_timeout', 259200); $ret = self::$redis->connect($redisHost,$redisPort,259200); $ret = self::$redis->auth($redisAuth); } } } public static function Publish($pubKey,$data){ if(!self::PingAndConnect()){ return false; } $ret = self::$redis->rPush(self::$preFix.$pubKey,$data); if ($ret === false){ return false; } return true; } public static function GetListLen($pubKey,&$len){ if(!self::PingAndConnect()){ return false; } $len = 0; $ret = self::$redis->lLen(self::$preFix.$pubKey); if ($ret === false){ return false; } $len = $ret; return true; } public static function Subsribe($pubKey,&$data){ if(!self::PingAndConnect()){ return false; } $ret = self::$redis->lPop(self::$preFix.$pubKey); if ($ret === false){ return false; } $data = $ret; return true; } public static function BlockSubsribe($pubKey,&$data){ if(!self::PingAndConnect()){ return false; } try{ $ret = self::$redis->blPop(array(self::$preFix.$pubKey),0); } catch(Exception $e){ if(!self::PingAndConnect(true)){ return false; } return false; } if ($ret === false){ return false; } if ($ret === array()){ return false; } $data = $ret[1]; return true; } private static function PingAndConnect($isException = false){ if (!self::$redis) { self::$redis = new \Redis(); if (self::$timeOut == 0){ ini_set('default_socket_timeout', 259200); $ret = self::$redis->connect($redisHost,$redisPort,259200); } else{ $ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut); } if ($ret === false){ return false; } $ret = self::$redis->auth($redisAuth); if ($ret === false){ return false; } } else{ if (self::$timeOut == 0 && !$isException){ return true; } $ret = self::$redis->ping(); if ($ret === false){ if (self::$timeOut == 0){ ini_set('default_socket_timeout', 259200); $ret = self::$redis->connect($redisHost,$redisPort,259200); } else{ $ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut); } if ($ret === false){ return false; } $ret = self::$redis->auth($redisAuth); if ($ret === false){ return false; } } } return true; } }重启守护进程的 shell 脚本 restartprocess.sh:
#!/bin/sh if [ ! -n "$1" ]; then echo "input proc name" exit else procname=$1 fi pids=`(ps -ef | grep "$procname" | grep -v "grep" | grep -v $0) | awk '{print $2}'` for pid in ${pids[*]} do kill -9 $pid done cd /path/to/your/project/ setsid $procname &启动守护进程的命令:
restartprocess.sh "php index.php /path/to/your/MyDaemon/func/run"可以设置一分钟或一秒钟执行一次 PHP 脚本。因为每次处理消息的时间不固定,可能导致消息积压或服务器负载过大。
用于处理偶然需求,简单。
转载于:https://www.cnblogs.com/kika/p/10851535.html
相关资源:数据结构—成绩单生成器