48 private const WORKER_START_OPTIONS = NativeThread::INHERIT_INI | NativeThread::INHERIT_COMMENTS;
54 private array $workers = [];
60 private array $workerStartHooks = [];
62 public function __construct(
64 private int $workerMemoryLimit,
81 if($newSize > $this->size){
82 $this->size = $newSize;
95 Utils::validateCallableSignature(function(int $worker) : void{}, $hook);
96 $this->workerStartHooks[spl_object_id($hook)] = $hook;
97 foreach($this->workers as $i => $worker){
108 unset($this->workerStartHooks[spl_object_id($hook)]);
117 return array_keys($this->workers);
125 if(!isset($this->workers[$workerId])){
126 $sleeperEntry = $this->eventLoop->addNotifier(
function() use ($workerId) :
void{
127 $this->collectTasksFromWorker($workerId);
129 $this->workers[$workerId] =
new AsyncPoolWorkerEntry(
new AsyncWorker($this->logger, $workerId, $this->workerMemoryLimit, $sleeperEntry), $sleeperEntry->getNotifierId());
130 $this->workers[$workerId]->worker->setClassLoaders([$this->classLoader]);
131 $this->workers[$workerId]->worker->start(self::WORKER_START_OPTIONS);
133 foreach($this->workerStartHooks as $hook){
137 $this->checkCrashedWorker($workerId,
null);
140 return $this->workers[$workerId];
147 if($worker < 0 || $worker >= $this->size){
148 throw new \InvalidArgumentException(
"Invalid worker $worker");
150 if($task->isSubmitted()){
151 throw new \InvalidArgumentException(
"Cannot submit the same AsyncTask instance more than once");
154 $task->setSubmitted();
156 $this->getWorker($worker)->submit($task);
168 $minUsage = PHP_INT_MAX;
169 foreach($this->workers as $i => $entry){
170 if(($usage = $entry->tasks->count()) < $minUsage){
178 if($worker ===
null || ($minUsage > 0 && count($this->workers) < $this->size)){
180 for($i = 0; $i < $this->size; ++$i){
181 if(!isset($this->workers[$i])){
188 assert($worker !==
null);
197 if($task->isSubmitted()){
198 throw new \InvalidArgumentException(
"Cannot submit the same AsyncTask instance more than once");
201 $worker = $this->selectWorker();
202 $this->submitTaskToWorker($task, $worker);
206 private function checkCrashedWorker(
int $workerId, ?AsyncTask $crashedTask) : void{
207 $entry = $this->workers[$workerId];
208 if($entry->worker->isTerminated()){
209 if($crashedTask ===
null){
210 foreach($entry->tasks as $task){
211 if($task->isTerminated()){
212 $crashedTask = $task;
214 }elseif(!$task->isFinished()){
219 $info = $entry->worker->getCrashInfo();
221 if($crashedTask !==
null){
222 $message =
"Worker $workerId crashed while running task " . get_class($crashedTask) .
"#" . spl_object_id($crashedTask);
224 $message =
"Worker $workerId crashed while doing unknown work";
226 throw new ThreadCrashException($message, $info);
228 throw new \RuntimeException(
"Worker $workerId crashed for unknown reason");
240 foreach($this->workers as $workerId => $entry){
241 $this->collectTasksFromWorker($workerId);
245 foreach($this->workers as $entry){
246 if(!$entry->tasks->isEmpty()){
253 public function collectTasksFromWorker(
int $worker) : bool{
254 if(!isset($this->workers[$worker])){
255 throw new \InvalidArgumentException(
"No such worker $worker");
257 $queue = $this->workers[$worker]->tasks;
259 while(!$queue->isEmpty()){
261 $task = $queue->bottom();
262 if($task->isFinished()){
265 if($task->isTerminated()){
266 $this->checkCrashedWorker($worker, $task);
267 throw new AssumptionFailedError(
"checkCrashedWorker() should have thrown an exception, making this unreachable");
278 $this->checkTaskProgressUpdates($task);
279 Timings::getAsyncTaskCompletionTimings($task)->time(
function() use ($task) :
void{
280 $task->onCompletion();
284 $this->checkTaskProgressUpdates($task);
289 $this->workers[$worker]->worker->collect();
300 return array_map(function(
AsyncPoolWorkerEntry $entry) : int{ return $entry->tasks->count(); }, $this->workers);
303 public function shutdownUnusedWorkers() : int{
306 foreach($this->workers as $i => $entry){
307 if($entry->lastUsed + 300 < $time && $entry->tasks->isEmpty()){
308 $entry->worker->quit();
309 $this->eventLoop->removeNotifier($entry->sleeperNotifierId);
310 unset($this->workers[$i]);
322 while($this->collectTasks()){
326 foreach($this->workers as $worker){
327 $worker->worker->quit();
328 $this->eventLoop->removeNotifier($worker->sleeperNotifierId);
333 private function checkTaskProgressUpdates(AsyncTask $task) : void{
334 Timings::getAsyncTaskProgressUpdateTimings($task)->time(function() use ($task) : void{
335 $task->checkProgressUpdates();