-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathworker.class.php
More file actions
173 lines (162 loc) · 3.33 KB
/
Copy pathworker.class.php
File metadata and controls
173 lines (162 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
<?php
namespace wx;
/**
 * Minimal multi-process worker pool built on the pcntl and posix extensions.
 *
 * The process that constructs the object is the master. run() forks
 * $process_num children, each of which executes $worker_start_fun and then
 * exits, while the master blocks in monitorWorkers() until every child is gone.
 *
 * Signals are delivered through explicit pcntl_signal_dispatch() calls rather
 * than declare(ticks = 1), which was rejected for performance reasons.
 */
class Worker
{
    /** @var int Number of child processes that run() forks. */
    public $process_num = 0;

    /** @var int Index passed to forkOneWorker(); assigned only inside the forked child. */
    private $process_id = 0;

    /** @var int OS pid of the current worker; assigned only inside the forked child. */
    private $process_pid = 0;

    /** @var callable|null Callback invoked without arguments inside every child. */
    public $worker_start_fun = null;

    /** @var int Pid of the master process (the one that ran the constructor). */
    protected static $master_pid = 0;

    /** @var array Live child pids tracked by the master, keyed and valued by pid. */
    protected static $master_sids = array();

    /**
     * Record the current process as the master and install the signal handlers.
     *
     * @access public
     */
    public function __construct()
    {
        self::$master_pid = posix_getpid();
        $this->installSignal();
    }

    /**
     * Route SIGINT, SIGUSR1 and SIGUSR2 to handlerSignal().
     *
     * The handlers are installed with restart_syscalls = false so that a
     * signal interrupts the master's blocking pcntl_wait() instead of
     * transparently restarting it; otherwise the PHP-level handler would not
     * run until some child happened to exit. Handlers are inherited by the
     * children created with pcntl_fork().
     *
     * @access protected
     * @return void
     */
    protected function installSignal()
    {
        $handler = array($this, 'handlerSignal');
        foreach (array(SIGINT, SIGUSR1, SIGUSR2) as $signo) {
            pcntl_signal($signo, $handler, false);
        }
    }

    /**
     * Signal callback. Must stay public because pcntl invokes it from outside
     * the class scope.
     *
     * @access public
     * @param int $signo Signal number that was delivered
     * @return void
     */
    public function handlerSignal($signo)
    {
        switch ($signo) {
            case SIGINT:
                $this->stopAll();
                break;
            case SIGUSR1:
                echo 'Caught SIGUSR1...' . PHP_EOL;
                break;
            case SIGUSR2:
                echo 'Caught SIGUSR2...' . PHP_EOL;
                break;
        }
    }

    /**
     * Make the process ignore SIGINT, SIGUSR1 and SIGUSR2.
     *
     * @access protected
     * @return void
     */
    protected function uninstallSignal()
    {
        foreach (array(SIGINT, SIGUSR1, SIGUSR2) as $signo) {
            pcntl_signal($signo, SIG_IGN);
        }
    }

    /**
     * Run the shutdown sequence.
     *
     * In the master this forwards SIGINT to every tracked child; the master
     * itself keeps running so monitorWorkers() can reap them. In a child it
     * terminates the process immediately with status 0.
     *
     * @access protected
     * @return void
     */
    protected function stopAll()
    {
        // Child process: nothing to clean up, just leave.
        if (self::$master_pid !== posix_getpid()) {
            exit(0);
        }

        // Master process: ask every child to stop.
        foreach (self::$master_sids as $child_pid) {
            posix_kill($child_pid, SIGINT);
        }
    }

    /**
     * Fork a single worker process.
     *
     * The master records the new pid and returns. The child stores its index
     * and pid, clears the inherited child table, runs $worker_start_fun (if
     * set) and exits so it never falls back into the master's code path.
     *
     * If the fork fails, the already running workers are sent SIGINT and the
     * master terminates with exit status 1.
     *
     * @access protected
     * @param int $process_id Index assigned to the new worker
     * @return void
     */
    protected function forkOneWorker($process_id)
    {
        $pid = pcntl_fork();

        // Master process: remember the child.
        if ($pid > 0) {
            self::$master_sids[$pid] = $pid;
            return;
        }

        // Child process.
        if (0 === $pid) {
            $this->process_id = $process_id;
            $this->process_pid = posix_getpid();
            // The child supervises nobody; drop the table copied from the master.
            self::$master_sids = array();
            if ($this->worker_start_fun) {
                call_user_func($this->worker_start_fun);
            }
            // Exit here so the child never continues the master's fork loop.
            exit(0);
        }

        // Fork failed (-1). Do not leave earlier workers orphaned, and report
        // the failure through a non-zero exit status (exit('msg') would exit 0).
        $this->stopAll();
        echo 'fork one worker failed';
        exit(1);
    }

    /**
     * Block until every tracked child has terminated.
     *
     * @access protected
     * @return bool true once the last tracked child has been reaped, false if
     *              pcntl_wait() fails for a reason other than being interrupted
     *              by a signal (for example when there are no children at all)
     */
    protected function monitorWorkers()
    {
        while (true) {
            $status = 0;

            // Run handlers for signals that arrived since the last iteration.
            pcntl_signal_dispatch();
            $pid = pcntl_wait($status, WUNTRACED);
            // Read errno right away, before any other pcntl call can change it.
            $wait_errno = (-1 === $pid) ? pcntl_get_last_error() : 0;
            // Run handlers for signals that arrived while waiting.
            pcntl_signal_dispatch();

            if ($pid > 0) {
                // WUNTRACED also reports children that were merely stopped;
                // they are still alive and must stay in the table.
                if (pcntl_wifstopped($status)) {
                    continue;
                }
                // Abnormal exit.
                if (0 !== $status) {
                    echo 'worker ' . $pid . ' exit with status ' . $status . PHP_EOL;
                }
                // Forget the child; stop once none are left.
                unset(self::$master_sids[$pid]);
                if (!self::$master_sids) {
                    return true;
                }
                continue;
            }

            // The wait was only interrupted by a signal (already dispatched
            // above); keep supervising the remaining children.
            if (PCNTL_EINTR === $wait_errno) {
                continue;
            }

            return false;
        }
    }

    /**
     * Start the pool: fork $process_num workers, then supervise them until
     * monitorWorkers() returns.
     *
     * @access public
     * @return void
     */
    public function run()
    {
        for ($i = 0; $i < $this->process_num; ++$i) {
            $this->forkOneWorker($i);
        }
        $this->monitorWorkers();
    }
}