Skip to content

Commit

Permalink
update and add PoolsManager
Browse files Browse the repository at this point in the history
  • Loading branch information
bingcool committed May 21, 2018
1 parent 97837eb commit 19c0320
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 21 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# swoolefy
swoolefy是一个基于swoole扩展实现的轻量级高性能的常驻内存型的API和Web应用服务框架,高度封装了http,websocket,udp服务器,以及基于tcp实现可扩展的rpc服务,同时支持composer包方式安装部署项目。基于实用,swoolefy抽象Event事件处理类,实现与底层的回调的解耦,支持同步|异步调用,内置view、Log、session、mysql、redis、memcached、mongodb等常用组件等。
swoolefy是一个基于swoole扩展实现的轻量级高性能的常驻内存型的API和Web应用服务框架,高度封装了http,websocket,udp服务器,以及基于tcp实现可扩展的rpc服务,同时支持composer包方式安装部署项目。基于实用,swoolefy抽象Event事件处理类,实现与底层的回调的解耦,支持同步|异步调用,内置view、log、session、mysql、redis、memcached、mongodb等常用组件等。

swoolefy是一个适合学习swoole的框架,特别适合中小团队,用过Tp的phper基本都会用,有过Yii2的经验者,那就更容易入手了。同时swoolefy底层屏蔽了swoole与传统php-fpm的一些差异,让大部分的phper使用起来可以像使用php-fpm那样,轻松入手。

Expand All @@ -8,8 +8,8 @@ swoolefy是一个适合学习swoole的框架,特别适合中小团队,用过
2、支持composer的PSR-4规范和实现自定义注册命名空间
3、支持多协议,目前支持http,websocket,tcp,udp,以及基于tcp实现的rpc,开放式的系统接口,可自定义协议数据格式
4、抽象Event的事件处理与底层的事件监听解耦,屏蔽不同协议之间的应用差异,大部分代码实现共用
5、实现超全局变量,IOC,静态延迟绑定,组件服务常驻内存化,trait的多路复用,钩子事件,单例,工厂模式等
6、简单易用的异步务管理TaskManager, 定时器管理TickManager, 内存表管理TableManager, 进程管理ProcessManager,超全局管理
5、实现超全局变量,IOC,静态延迟绑定,组件服务常驻内存化,trait的多路复用,钩子事件,单例,工厂模式,注册数模式等
6、简单易用的异步务管理TaskManager, 定时器管理TickManager, 内存表管理TableManager, 进程管理ProcessManager,进程池管理PoolsManger,超全局管理
7、灵活多层的配置,配置参数即可实现底层已封装的复杂功能
8、应用对象的深度复制,实现对象的常驻内存,每个请求只需要从内存中复制应用对象,不需要再重新创建,减少IO消耗
9、封装View,Log,Mysql,Redis,Mongodb,Swiftmail,Session等常用组件,其他组件根据业务按照约定即可封装成组件
Expand Down
19 changes: 14 additions & 5 deletions score/Core/Pools/AbstractProcessPools.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
use Swoolefy\Core\Table\TableManager;

abstract class AbstractProcessPools {
private $swooleProcess;
private $processName;
private $async = null;
private $args = [];
private $process_num = 1;
protected $swooleProcess;
protected $processName;
protected $async = null;
protected $args = [];
protected $process_num = 1;

/**
* __construct
Expand Down Expand Up @@ -102,6 +102,15 @@ public function getArgs() {
public function getProcessName() {
return $this->processName;
}

/**
* onFinish 进程任务完成,默认返回进程名称
* @return string
*/
public function finish() {
$this->swooleProcess->write($this->processName);
}

/**
* run 进程创建后的run方法
* @param Process $process
Expand Down
116 changes: 104 additions & 12 deletions score/Core/Pools/PoolsManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use Swoole\Process;
use Swoole\Table;
use Swoolefy\Core\Timer\TickManager;
use Swoolefy\Core\Table\TableManager;

class PoolsManager {
Expand Down Expand Up @@ -44,26 +45,33 @@ class PoolsManager {

private static $processNameList = [];

public static $process_used = [];
private static $process_used = [];

private static $process_name_list = [];

private static $channels = [];

private static $timer_ids = [];

/**
* __construct
* @param $total_process 进程池总数
*/
public function __construct(int $total_process = 100) {
public function __construct(int $total_process = 256) {
self::$table_process['table_process_pools_map']['size'] = self::$table_process['table_process_pools_number']['size']= $total_process;
TableManager::getInstance()->createTable(self::$table_process);
}

/**
* addProcess 添加一个进程
* addProcess 添加创建进程
* @param string $processName
* @param string $processClass
* @param int $processNumber
* @param boolean $async
* @param boolean $polling 是否是轮训向空闲进程写数据
* @param array $args
*/
public static function addProcessPools(string $processName, string $processClass, int $processNumber = 1, $async = true, array $args = []) {
public static function addProcessPools(string $processName, string $processClass, int $processNumber = 1, $polling = false, int $timer_int= 50, $async = true, array $args = []) {
if(!TableManager::isExistTable('table_process_pools_map')) {
TableManager::getInstance()->createTable(self::$table_process);
}
Expand All @@ -90,6 +98,7 @@ public static function addProcessPools(string $processName, string $processClass
try{
$process = new $processClass($process_name, $async, $args, $i);
self::$processList[$key] = $process;
self::$process_name_list[$processName][$key] = $process;
self::$processNameList[$processName][] = $process_name;
}catch (\Exception $e){
throw new \Exception($e->getMessage(), 1);
Expand All @@ -100,7 +109,11 @@ public static function addProcessPools(string $processName, string $processClass
}
}

// self::registerProcessFinish(self::$processList);
if($polling) {
self::registerProcessFinish(self::$process_name_list[$processName], $processName);
self::$channels[$processName] = new \Swoole\Channel(2 * 1024 * 2014);
self::loopWrite($processName, $timer_int);
}

Process::signal(SIGCHLD, function($signo) use($processName, $processClass, $async, $args) {
while($ret = Process::wait(false)) {
Expand All @@ -110,11 +123,13 @@ public static function addProcessPools(string $processName, string $processClass
$process_name = $processName.$process_num;
$key = md5($process_name);
unset(self::$processList[$key]);
unset(self::$process_name_list[$processName][$key]);
TableManager::getInstance()->getTable('table_process_pools_number')->del($pid);
try{
$process = new $processClass($process_name, $async, $args, $process_num);
self::$processList[$key] = $process;
// self::registerProcessFinish([$process]);
self::$process_name_list[$processName][$key] = $process;
$polling && self::registerProcessFinish([$process], $processName);
}catch (\Exception $e){
throw new \Exception($e->getMessage(), 1);
}
Expand All @@ -127,16 +142,80 @@ public static function addProcessPools(string $processName, string $processClass
* registerProcessFinish
* @return
*/
public static function registerProcessFinish(array $processList = []) {
foreach ($processList as $process_class) {
public static function registerProcessFinish(array $processList = [], string $processName) {
foreach($processList as $process_class) {
$process = $process_class->getProcess();
swoole_event_add($process->pipe, function ($pipe) use($process) {
$pid = $process->read();
self::$process_used[$pid] = 0;
$processname = $process_class->getProcessName();
// 默认所有进程空闲
self::$process_used[$processName][$processname] = 0;
swoole_event_add($process->pipe, function ($pipe) use($process, $processName) {
$process_name = $process->read(64 * 1024);
if(in_array($process_name, self::$processNameList[$processName])) {
self::$process_used[$processName][$process_name] = 0;
}
});
}
}

/**
* loopWrite 定时循环向子进程写数据
* @return mixed
*/
public static function loopWrite(string $processName, $timer_int) {
$timer_id = swoole_timer_tick($timer_int, function($timer_id) use($processName) {
if(count(self::$process_used[$processName])) {
$channel= self::$channels[$processName];
$data = $channel->pop();
if($data) {
// 获取其中一个空闲进程
$process_name = array_rand(self::$process_used[$processName]);
self::writeByProcessName($process_name, $data);
unset(self::$process_used[$processName][$process_name]);
return $process_name;
}
}
});
self::$timer_ids[$processName] = $timer_id;
return $timer_id;
}

/**
* getTimerId 获取当前的定时器id
* @param string $processName
* @return mixed
*/
public static function getTimerId(string $processName) {
if(isset(self::$timer_ids[$processName]) && self::$timer_ids[$processName]!== null) {
return self::$timer_ids[$processName];
}
return null;
}

/**
* clearTimer 清除进程内的定时器
* @param string $processName
* @return boolean
*/
public static function clearTimer(string $processName) {
$timer_id = self::getTimerId($processName);
if($timer_id) {
return swoole_timer_clear($timer_id);
}
return false;
}

/**
* getChannel
* @param string $processName
* @return object
*/
public static function getChannel(string $processName) {
if(is_object(self::$channels[$processName])) {
return self::$channels[$processName];
}
return null;
}

/**
* getProcessByName 通过名称获取一个进程
* @param string $processName
Expand Down Expand Up @@ -212,7 +291,7 @@ public static function writeByProcessName(string $name, string $data) {
}

/**
* writeByRandom
* writeByRandom 任意方式向进程写数据
* @param string $name
* @param string $data
* @return
Expand All @@ -223,8 +302,21 @@ public static function writeByRandom(string $name, string $data) {
$process_name = self::$processNameList[$name][$key];
}
self::writeByProcessName($process_name, $data);
return $process_name;
}

/**
* writeByPolling 轮训方式向空闲进程写数据
* @param string $name
* @param string $data
* @return
*/
public static function writeByPolling(string $name, string $data) {
$channel = self::$channels[$name];
return $channel->push($data);
}


/**
* readByProcessName 读取某个进程数据
* @param string $name
Expand Down
2 changes: 1 addition & 1 deletion score/MPHP.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
defined('SWOOLEFY_COM_FUNC') or define('SWOOLEFY_COM_FUNC', 'func');

// 定义版本
defined('SWOOLEFY_VERSION') or define('SWOOLEFY_VERSION', '1.0.4');
defined('SWOOLEFY_VERSION') or define('SWOOLEFY_VERSION', '1.0.5');

class MPHP {

Expand Down

0 comments on commit 19c0320

Please sign in to comment.