PocketMine-MP 5.23.3 git-976fc63567edab7a6fb6aeae739f43cf9fe57de4
Loading...
Searching...
No Matches
AsyncPool.php
1<?php
2
3/*
4 *
5 * ____ _ _ __ __ _ __ __ ____
6 * | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
7 * | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
8 * | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
9 * |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
10 *
11 * This program is free software: you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation, either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * @author PocketMine Team
17 * @link http://www.pocketmine.net/
18 *
19 *
20 */
21
22declare(strict_types=1);
23
25
26use pmmp\thread\Thread as NativeThread;
34use function array_keys;
35use function array_map;
36use function assert;
37use function count;
38use function get_class;
39use function spl_object_id;
40use function time;
41use const PHP_INT_MAX;
42
48 private const WORKER_START_OPTIONS = NativeThread::INHERIT_INI | NativeThread::INHERIT_COMMENTS;
49
54 private array $workers = [];
55
60 private array $workerStartHooks = [];
61
62 public function __construct(
63 protected int $size,
64 private int $workerMemoryLimit,
65 private ThreadSafeClassLoader $classLoader,
66 private ThreadSafeLogger $logger,
67 private SleeperHandler $eventLoop
68 ){}
69
73 public function getSize() : int{
74 return $this->size;
75 }
76
80 public function increaseSize(int $newSize) : void{
81 if($newSize > $this->size){
82 $this->size = $newSize;
83 }
84 }
85
94 public function addWorkerStartHook(\Closure $hook) : void{
95 Utils::validateCallableSignature(function(int $worker) : void{}, $hook);
96 $this->workerStartHooks[spl_object_id($hook)] = $hook;
97 foreach($this->workers as $i => $worker){
98 $hook($i);
99 }
100 }
101
107 public function removeWorkerStartHook(\Closure $hook) : void{
108 unset($this->workerStartHooks[spl_object_id($hook)]);
109 }
110
116 public function getRunningWorkers() : array{
117 return array_keys($this->workers);
118 }
119
124 private function getWorker(int $workerId) : AsyncPoolWorkerEntry{
125 if(!isset($this->workers[$workerId])){
126 $sleeperEntry = $this->eventLoop->addNotifier(function() use ($workerId) : void{
127 $this->collectTasksFromWorker($workerId);
128 });
129 $this->workers[$workerId] = new AsyncPoolWorkerEntry(new AsyncWorker($this->logger, $workerId, $this->workerMemoryLimit, $sleeperEntry), $sleeperEntry->getNotifierId());
130 $this->workers[$workerId]->worker->setClassLoaders([$this->classLoader]);
131 $this->workers[$workerId]->worker->start(self::WORKER_START_OPTIONS);
132
133 foreach($this->workerStartHooks as $hook){
134 $hook($workerId);
135 }
136 }else{
137 $this->checkCrashedWorker($workerId, null);
138 }
139
140 return $this->workers[$workerId];
141 }
142
146 public function submitTaskToWorker(AsyncTask $task, int $worker) : void{
147 if($worker < 0 || $worker >= $this->size){
148 throw new \InvalidArgumentException("Invalid worker $worker");
149 }
150 if($task->isSubmitted()){
151 throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
152 }
153
154 $task->setSubmitted();
155
156 $this->getWorker($worker)->submit($task);
157 }
158
166 public function selectWorker() : int{
167 $worker = null;
168 $minUsage = PHP_INT_MAX;
169 foreach($this->workers as $i => $entry){
170 if(($usage = $entry->tasks->count()) < $minUsage){
171 $worker = $i;
172 $minUsage = $usage;
173 if($usage === 0){
174 break;
175 }
176 }
177 }
178 if($worker === null || ($minUsage > 0 && count($this->workers) < $this->size)){
179 //select a worker to start on the fly
180 for($i = 0; $i < $this->size; ++$i){
181 if(!isset($this->workers[$i])){
182 $worker = $i;
183 break;
184 }
185 }
186 }
187
188 assert($worker !== null);
189 return $worker;
190 }
191
196 public function submitTask(AsyncTask $task) : int{
197 if($task->isSubmitted()){
198 throw new \InvalidArgumentException("Cannot submit the same AsyncTask instance more than once");
199 }
200
201 $worker = $this->selectWorker();
202 $this->submitTaskToWorker($task, $worker);
203 return $worker;
204 }
205
206 private function checkCrashedWorker(int $workerId, ?AsyncTask $crashedTask) : void{
207 $entry = $this->workers[$workerId];
208 if($entry->worker->isTerminated()){
209 if($crashedTask === null){
210 foreach($entry->tasks as $task){
211 if($task->isTerminated()){
212 $crashedTask = $task;
213 break;
214 }elseif(!$task->isFinished()){
215 break;
216 }
217 }
218 }
219 $info = $entry->worker->getCrashInfo();
220 if($info !== null){
221 if($crashedTask !== null){
222 $message = "Worker $workerId crashed while running task " . get_class($crashedTask) . "#" . spl_object_id($crashedTask);
223 }else{
224 $message = "Worker $workerId crashed while doing unknown work";
225 }
226 throw new ThreadCrashException($message, $info);
227 }else{
228 throw new \RuntimeException("Worker $workerId crashed for unknown reason");
229 }
230 }
231 }
232
239 public function collectTasks() : bool{
240 foreach($this->workers as $workerId => $entry){
241 $this->collectTasksFromWorker($workerId);
242 }
243
244 //we check this in a second loop, because task collection could have caused new tasks to be added to the queues
245 foreach($this->workers as $entry){
246 if(!$entry->tasks->isEmpty()){
247 return true;
248 }
249 }
250 return false;
251 }
252
253 public function collectTasksFromWorker(int $worker) : bool{
254 if(!isset($this->workers[$worker])){
255 throw new \InvalidArgumentException("No such worker $worker");
256 }
257 $queue = $this->workers[$worker]->tasks;
258 $more = false;
259 while(!$queue->isEmpty()){
261 $task = $queue->bottom();
262 if($task->isFinished()){ //make sure the task actually executed before trying to collect
263 $queue->dequeue();
264
265 if($task->isTerminated()){
266 $this->checkCrashedWorker($worker, $task);
267 throw new AssumptionFailedError("checkCrashedWorker() should have thrown an exception, making this unreachable");
268 }else{
269 Timings::getAsyncTaskCompletionTimings($task)->time(function() use ($task) : void{
270 $task->onCompletion();
271 });
272 }
273 }else{
274 $more = true;
275 break; //current task is still running, skip to next worker
276 }
277 }
278 $this->workers[$worker]->worker->collect();
279 return $more;
280 }
281
288 public function getTaskQueueSizes() : array{
289 return array_map(function(AsyncPoolWorkerEntry $entry) : int{ return $entry->tasks->count(); }, $this->workers);
290 }
291
292 public function shutdownUnusedWorkers() : int{
293 $ret = 0;
294 $time = time();
295 foreach($this->workers as $i => $entry){
296 if($entry->lastUsed + 300 < $time && $entry->tasks->isEmpty()){
297 $entry->worker->quit();
298 $this->eventLoop->removeNotifier($entry->sleeperNotifierId);
299 unset($this->workers[$i]);
300 $ret++;
301 }
302 }
303
304 return $ret;
305 }
306
310 public function shutdown() : void{
311 while($this->collectTasks()){
312 //NOOP
313 }
314
315 foreach($this->workers as $worker){
316 $worker->worker->quit();
317 $this->eventLoop->removeNotifier($worker->sleeperNotifierId);
318 }
319 $this->workers = [];
320 }
321}
addWorkerStartHook(\Closure $hook)
Definition AsyncPool.php:94
removeWorkerStartHook(\Closure $hook)
submitTaskToWorker(AsyncTask $task, int $worker)