2
2
namespace cathy \AsyncTask \command ;
3
3
use cathy \AsyncTask \main \AsyncTaskSynchronizer ;
4
4
use Exception ;
5
+ use RecursiveDirectoryIterator ;
6
+ use RecursiveIteratorIterator ;
5
7
use think \console \Command ;
6
8
use think \console \Input ;
7
9
use think \console \input \Argument ;
8
10
use think \console \Output ;
11
+ use think \facade \App ;
9
12
use think \facade \Config ;
13
+ use Workerman \Lib \Timer ;
10
14
use Workerman \Worker ;
11
15
use function app ;
12
16
13
17
class AsyncTaskService extends Command
14
18
{
19
+ protected $ lastMtime ;
20
+
15
21
public function configure ()
16
22
{
17
23
$ this ->setName ('worker:task ' )
@@ -53,24 +59,39 @@ public function execute(Input $input, Output $output)
53
59
} else {
54
60
$ port = !empty ($ option ['port ' ]) ? $ option ['port ' ] : $ configs ['port ' ];
55
61
}
56
- if (empty ($ option ['process_number ' ])){
57
- $ option ['process_number ' ] = $ configs ['process_number ' ];
62
+
63
+ if ($ input ->hasOption ('daemon ' )){
64
+ $ option ['daemon ' ] = true ;
65
+ } else {
66
+ $ option ['daemon ' ] = !empty ($ option ['daemon ' ]) ? $ option ['daemon ' ] : $ configs ['daemon ' ];
67
+ }
68
+
69
+ if (empty ($ option ['count ' ])){
70
+ $ option ['count ' ] = $ configs ['count ' ];
58
71
}
72
+
59
73
if (empty ($ option ['name ' ])){
60
74
$ option ['name ' ] = $ configs ['name ' ];
61
75
}
76
+
62
77
if (empty ($ option ['reuse_port ' ])){
63
- if (isset ($ option ['reuse_port ' ])){
64
- $ option ['reuse_port ' ] = false ;
65
- }else {
66
- $ option ['reuse_port ' ] = $ configs ['reuse_port ' ];
67
- }
78
+ $ option ['reuse_port ' ] = $ configs ['reuse_port ' ];
68
79
}
80
+
69
81
if (empty ($ option ['synchronizer_ttl ' ])){
70
82
$ option ['synchronizer_ttl ' ] = $ configs ['synchronizer_ttl ' ];
71
83
}
72
- if ($ input ->hasOption ('daemon ' )){
73
- $ option ['daemon ' ] = true ;
84
+
85
+ if (empty ($ option ['file_monitor ' ])){
86
+ $ option ['file_monitor ' ] = $ configs ['file_monitor ' ];
87
+ }
88
+
89
+ if (empty ($ option ['file_monitor_interval ' ])){
90
+ $ option ['file_monitor_interval ' ] = $ configs ['file_monitor_interval ' ];
91
+ }
92
+
93
+ if (empty ($ option ['file_monitor_path ' ])){
94
+ $ option ['file_monitor_path ' ] = $ configs ['file_monitor_path ' ];
74
95
}
75
96
76
97
$ this ->start ($ host , (int ) $ port , $ option );
@@ -81,9 +102,32 @@ public function start(string $host, int $port, array $option = []){
81
102
Worker::$ daemonize = true ;
82
103
}
83
104
$ task_worker = new Worker ('tcp:// ' .$ host .': ' .$ port );
84
- $ task_worker ->count = $ option ['process_number ' ]; // 进程数
105
+ $ task_worker ->count = $ option ['count ' ]; // 进程数
85
106
$ task_worker ->name = $ option ['name ' ]; // 名称
86
107
$ task_worker ->reusePort = $ option ['reuse_port ' ]; // 根据配置开启端口复用,让每一个任务进程平衡异步任务,仅php7支持
108
+ // 设置文件监控
109
+ if (DIRECTORY_SEPARATOR !== '\\' && App::isDebug () && $ option ['file_monitor ' ] && 0 == $ task_worker ->id ) {
110
+ $ timer = $ option ['file_monitor_interval ' ] ?: 2 ;
111
+ $ paths = !empty ($ option ['file_monitor_path ' ]) ? $ option ['file_monitor_path ' ] : [App::getAppPath (), App::getConfigPath ()];
112
+ Timer::add ($ timer , function () use ($ paths ) {
113
+ foreach ($ paths as $ path ) {
114
+ $ dir = new RecursiveDirectoryIterator ($ path );
115
+ $ iterator = new RecursiveIteratorIterator ($ dir );
116
+ foreach ($ iterator as $ file ) {
117
+ if (pathinfo ($ file , PATHINFO_EXTENSION ) != 'php ' ) {
118
+ continue ;
119
+ }
120
+
121
+ if ($ this ->lastMtime < $ file ->getMTime ()) {
122
+ echo '[update] ' . $ file . "\n" ;
123
+ posix_kill (posix_getppid (), SIGUSR1 );
124
+ $ this ->lastMtime = $ file ->getMTime ();
125
+ return ;
126
+ }
127
+ }
128
+ }
129
+ });
130
+ }
87
131
$ task_worker ->onMessage = function ($ connection , $ data ) use ($ option ) {
88
132
$ data = json_decode ($ data ,true );
89
133
$ taskClass = $ data ['taskClass ' ];
0 commit comments