-
Notifications
You must be signed in to change notification settings - Fork 2
/
Coroutine.php
105 lines (90 loc) · 2.92 KB
/
Coroutine.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
<?php
/**
 * Static dispatcher for generator-based coroutines.
 *
 * A coroutine is a \Generator. When the key of its current yield is one of the
 * registered IO type names, the yielded value is handed to \Pool::get() to
 * obtain a client; if none is available the coroutine is parked on Queue under
 * that key. Tasks started through task() are tracked in a registry, and when
 * the registry becomes empty exit() is invoked.
 */
class Coroutine
{
    /**
     * Generator keys that are treated as IO requests.
     *
     * Initialised here (with the same list init() assigns) so that wrap() and
     * resume() are usable even when init() has not been called yet.
     *
     * @var string[]
     */
    protected static $_ioQueue = array("Mysql", "Http", "Tcp", "WebSocket");

    /**
     * Run mode: "server" means swoole_server mode, "cli" means plain CLI mode.
     * Stays null until init() is called.
     *
     * @var string|null
     */
    protected static $_mode;

    /** @var int Last task id handed out by newTaskId(). */
    protected static $_maxTaskId = 0;

    /** @var array<int, \Generator> Registered task wrappers, keyed by task id. */
    protected static $_coroutines = array();

    /**
     * Set the run mode and (re)set the list of IO type names.
     *
     * @param string $mode "cli" or "server"
     */
    public static function init($mode = "cli")
    {
        self::$_mode = $mode;
        self::$_ioQueue = array("Mysql", "Http", "Tcp", "WebSocket");
    }

    /**
     * Register $coroutine as a tracked task and dispatch its first yield.
     *
     * @param \Generator $coroutine
     */
    public static function task($coroutine)
    {
        self::wrap(self::_wrap($coroutine));
    }

    /**
     * Dispatch the current yield of $coroutine.
     *
     * - IO key: request a client from \Pool for (key, value). If one is
     *   returned, attach the coroutine to it and send the client into the
     *   coroutine; otherwise push the coroutine onto Queue under that key.
     *   Any \Exception raised on this path is thrown back into the coroutine.
     * - Any other key: if the yielded value is a \Coroutine\Base, attach the
     *   coroutine to it.
     *
     * Values that are not generators are passed through untouched.
     *
     * @param mixed $coroutine
     * @return mixed the argument that was passed in
     */
    public static function wrap($coroutine)
    {
        if (!($coroutine instanceof \Generator)) {
            return $coroutine;
        }

        $coKey = $coroutine->key();
        $coValue = $coroutine->current();

        if (!self::_isIoKey($coKey)) {
            if ($coValue instanceof \Coroutine\Base) {
                $coValue->setCoroutine($coroutine);
            }
            return $coroutine;
        }

        try {
            $client = \Pool::get($coKey, $coValue);
            if ($client) {
                $client->setCoroutine($coroutine);
                $coroutine->send($client);
            } else {
                Queue::push($coKey, $coroutine);
            }
        } catch (\Exception $e) {
            $coroutine->throw($e);
        }

        return $coroutine;
    }

    /**
     * Tell whether a generator key names one of the registered IO types.
     *
     * Only strings can match, and the comparison is strict: with a loose
     * in_array() a key such as `true` would compare equal to every type name.
     *
     * @param mixed $key
     * @return bool
     */
    protected static function _isIoKey($key)
    {
        return is_string($key) && in_array($key, self::$_ioQueue, true);
    }

    /**
     * Wrap $coroutine in an outer generator that unregisters the task once the
     * inner one is finished, and register that outer generator under a new id.
     *
     * @param \Generator $coroutine
     * @return \Generator the registered outer generator
     */
    protected static function _wrap($coroutine)
    {
        $taskId = self::newTaskId();
        $task = (function ($taskId, $coroutine) {
            try {
                yield from $coroutine;
            } catch (\Throwable $e) {
                // A failed task must leave the registry too; otherwise it is
                // never removed and exit() can never be reached.
                \Coroutine::unregister($taskId);
                throw $e;
            }
            \Coroutine::unregister($taskId);
        })($taskId, $coroutine);
        self::register($taskId, $task);
        return $task;
    }

    /**
     * @return int a new task id (previous maximum + 1)
     */
    public static function newTaskId()
    {
        return ++self::$_maxTaskId;
    }

    /**
     * Store $coroutine in the registry under $taskId.
     *
     * @param int $taskId
     * @param \Generator $coroutine
     */
    public static function register($taskId, $coroutine)
    {
        self::$_coroutines[$taskId] = $coroutine;
    }

    /**
     * Remove $taskId from the registry; call exit() once the registry is empty.
     *
     * @param int $taskId
     */
    public static function unregister($taskId)
    {
        unset(self::$_coroutines[$taskId]);
        if (!self::$_coroutines) {
            self::exit();
        }
    }

    /**
     * Continue a coroutine.
     *
     * If its current key is an IO type and Queue already holds entries for
     * that key, the coroutine is pushed onto that queue instead of being
     * dispatched; in every other case it is dispatched immediately via wrap().
     *
     * @param \Generator $coroutine
     */
    public static function resume($coroutine)
    {
        $coKey = $coroutine->key();
        if (self::_isIoKey($coKey) && !Queue::isEmpty($coKey)) {
            // IO request while others are already queued: queue this one too.
            Queue::push($coKey, $coroutine);
            return;
        }
        // Nothing queued for this key: run the coroutine right away.
        self::wrap($coroutine);
    }

    /**
     * Pop one coroutine from the named queue and dispatch it, if there is one.
     *
     * @param string $queueName
     */
    public static function next($queueName)
    {
        $queued = Queue::pop($queueName);
        if ($queued) {
            self::wrap($queued);
        }
    }

    /**
     * Call swoole_event_wait().
     */
    public static function wait()
    {
        swoole_event_wait();
    }

    /**
     * Call swoole_event_exit(), but only in "cli" mode; in swoole_server mode
     * event_exit is not performed.
     */
    public static function exit()
    {
        if (self::$_mode == "cli") {
            swoole_event_exit();
        }
    }
}
?>