基于Gearman的PHP封装类

openkk 12年前

在多个jobs server时,PHP的gearman扩展在jobs server端口不通的情况下会自动检测出来,从而自动切换到另一个;但是在IP不通的情况下,就会出错了。

该封装解决了几个问题:

1、Jobs Server的IP如果突然不可达(例如机器关机),worker会自动重新添加jobs server(默认会报错然后worker终止);

2、添加jobs server时候只添加有效的server(如果把IP不通的server添加进去,后面会报错);

PS:最下面那个函数就是我用来检测有效jobs server的,粗暴的方法:P

    /**        * worker 类        */        class GW{            private $enabled = False;            private $worker = null;            private $config = array();               private $task = array(); //注册的任务信息                    /**            * 创建worker对象,添加job服务器            * @param  array    $config     job server config            * @return void            */            public function __construct($config)            {                $this->config = $config;                $this->initWorker();            }                    /**            * 注册任务及处理函数            * @param   string  $task_name  要注册的"任务"            * @param   string  $fun_name   对应的处理函数的函数名            * @return  boolean            */            public function regTask($task_name, $fun_name){                if (!$this->enabled){                    $ret = false;                } else {                    //Register and add callback function                    $ret = $this->worker->addFunction($task_name, $fun_name);                    if ($ret){                        $this->task[$task_name] = $fun_name;                        }                }                return $ret;            }                    /**            * 运行worker            * @return  void/boolean            */            public function run(){                if (!$this->enabled){                    return False;                }                while(@$this->worker->work() || $this->worker->returnCode()!=GEARMAN_SUCCESS)                {                               // 运行中出错                    //echo "error: " . $this->worker->error() . "\n";                    //echo "return_code: " . $this->worker->returnCode() . "\n";                            // 自动重启worker,并重新注册之前的task                    if($this->worker->returnCode()!=GEARMAN_SUCCESS)                    {                        $this->enabled = False;                        $this->initWorker();                        $ret = $this->reloadTask();                        if ($ret){                            // 重启成功                            $this->run();                        } else {                            // 重启失败                        }                    }                       }            }                    /**            * 添加有效的jobs server到worker中            */            private function initWorker()            {                       $this->worker = new GearmanWorker();                //add job server                foreach ($this->config as $value) {                              $host = trim(strval($value['host']));                    $port = array_key_exists('port', $value) ? intval($value['port']) : 4730;                    if(!check_conn($host, $port)){                        continue;                    } else {                        $this->worker->addServer($host,$port);                        $this->enabled = True;                    }                }                    }                    /**            * 重新注册之前的任务            */            private function reloadTask(){                $ret = False;                foreach ($this->task as $task_name => $fun_name){                    $ret = $this->regTask($task_name, $fun_name);                    }                return $ret;            }                    /**            * 是否已添加有效jobs server            */            public function isEnabled(){                return $this->enabled;                }        }                           /**        * client 类        */        class GC{            private $enabled = false;            private $client = null;                    /**            * 创建client对象,添加job服务器            * @param  array    $config     job server config            * @param  int      $timeout    超时时间(毫秒)            * @return void            */            public function __construct($config, $timeout=5000)            {                $this->client = new GearmanClient();                        //add job server                foreach ($config as $value) {                     $host = trim(strval($value['host']));                    $port = array_key_exists('port', $value) ? intval($value['port']) : 4730;                    if(!check_conn($host, $port)){                        continue;                    } else {                        $this->client->addServer($host,$port);                        $this->enabled = True;                    }                }                $this->enabled && $this->client->setTimeout($timeout);            }            /**            * 发送消息,并等待响应            * @param   string      任务名            * @param   string      该任务的数据            * @return  mixed       job server返回的结果            */            public function send($task_name, $task_data)            {                if (!$this->enabled){                    $ret = false;                   } else {                    $ret = $this->client->do($task_name, strval($task_data));                }                return $ret;            }                    /**            * 是否已添加有效jobs server            */            public function isEnabled(){                return $this->enabled;                }        }                        /**        * 网络检测        * @param   string  机器IP        * @param   string  机器port        * @return  bool                   */        function check_conn($ip, $port = 4730)        {            // socket链接测试,200ms超时            @$fp = fsockopen($ip, $port, $errno, $errstr, 0.2);             if ($fp){                       $fp && fclose($fp);                return true;               } else {                return false;               }        }