PocketMine-MP 5.27.2 git-d86943fa8c6384be3e2c1901ebf94f584b27e784
Loading...
Searching...
No Matches
vendor/pocketmine/raklib/src/server/Server.php
1<?php
2
3/*
4 * This file is part of RakLib.
5 * Copyright (C) 2014-2022 PocketMine Team <https://github.com/pmmp/RakLib>
6 *
7 * RakLib is not affiliated with Jenkins Software LLC nor RakNet.
8 *
9 * RakLib is free software: you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation, either version 3 of the License, or
12 * (at your option) any later version.
13 */
14
15declare(strict_types=1);
16
17namespace raklib\server;
18
32use function asort;
33use function assert;
34use function bin2hex;
35use function count;
36use function get_class;
37use function microtime;
38use function ord;
39use function preg_match;
40use function strlen;
41use function time;
42use function time_sleep_until;
43use const PHP_INT_MAX;
44use const SOCKET_ECONNRESET;
45
46class Server implements ServerInterface{
47
/** Number of ticks the processor aims to run per second. */
private const RAKLIB_TPS = 100;
/** Target duration of a single tick, in seconds. */
private const RAKLIB_TIME_PER_TICK = 1 / self::RAKLIB_TPS;

/** Bytes read from the socket since the counters were last reported and reset in tick(). */
protected int $receiveBytes = 0;
/** Bytes written by sendPacket() since the counters were last reported and reset in tick(). */
protected int $sendBytes = 0;

/**
 * Sessions indexed by the string form of their remote address (InternetAddress::toString()).
 *
 * @var ServerSession[]
 * @phpstan-var array<string, ServerSession>
 */
protected array $sessionsByAddress = [];
/**
 * Sessions indexed by their internal session ID.
 *
 * @var ServerSession[]
 * @phpstan-var array<int, ServerSession>
 */
protected array $sessions = [];

protected UnconnectedMessageHandler $unconnectedMessageHandler;

/** Server name, as set by setName(). */
protected string $name = "";

/** Packets accepted from one IP within a single tick before that IP gets blocked. */
protected int $packetLimit = 200;

/** Set by waitShutdown(); once true, event-source messages and unconnected packets are no longer processed. */
protected bool $shutdown = false;

/** Number of completed tick() calls. */
protected int $ticks = 0;

/**
 * Blocked IPs mapped to the unix timestamp at which the block expires (PHP_INT_MAX for a permanent block).
 *
 * @var int[]
 * @phpstan-var array<string, int>
 */
protected array $block = [];
/**
 * Per-IP count of packets received during the current tick; cleared at every tick().
 *
 * @var int[]
 * @phpstan-var array<string, int>
 */
protected array $ipSec = [];

/**
 * Regular expressions; unhandled unconnected packets matching one of them are forwarded to the event listener.
 *
 * @var string[]
 * @phpstan-var list<string>
 */
protected array $rawPacketFilters = [];

/** Toggled through setPortCheck(); not read anywhere in this class. */
public bool $portChecking = false;

/** Candidate ID for the next session; createSession() advances it past IDs that are still in use. */
protected int $nextSessionId = 0;
80
/**
 * Validates the MTU limit, switches the socket to non-blocking mode and creates the handler used for
 * packets that arrive outside of an established session.
 *
 * @param int $recvMaxSplitParts       passed on to every ServerSession built by createSession()
 * @param int $recvMaxConcurrentSplits passed on to every ServerSession built by createSession()
 *
 * @throws \InvalidArgumentException if $maxMtuSize is smaller than Session::MIN_MTU_SIZE
 */
public function __construct(
    protected int $serverId,
    protected \Logger $logger,
    protected ServerSocket $socket,
    protected int $maxMtuSize,
    ProtocolAcceptor $protocolAcceptor,
    private ServerEventSource $eventSource,
    private ServerEventListener $eventListener,
    private ExceptionTraceCleaner $traceCleaner,
    private int $recvMaxSplitParts = ServerSession::DEFAULT_MAX_SPLIT_PART_COUNT,
    private int $recvMaxConcurrentSplits = ServerSession::DEFAULT_MAX_CONCURRENT_SPLIT_COUNT
){
    $mtuIsTooSmall = $maxMtuSize < Session::MIN_MTU_SIZE;
    if($mtuIsTooSmall){
        throw new \InvalidArgumentException("MTU size must be at least " . Session::MIN_MTU_SIZE . ", got $maxMtuSize");
    }

    //tickProcessor() polls the socket, so reads must never block
    $this->socket->setBlocking(false);

    $this->unconnectedMessageHandler = new UnconnectedMessageHandler($this, $protocolAcceptor);
}
104
/**
 * Returns the port of the address the server socket is bound to.
 */
public function getPort() : int{
    $bindAddress = $this->socket->getBindAddress();

    return $bindAddress->getPort();
}
108
/**
 * Returns the maximum MTU size this server was constructed with.
 */
public function getMaxMtuSize() : int{
    return $this->maxMtuSize;
}
112
/**
 * Returns the logger this server was constructed with.
 */
public function getLogger() : \Logger{
    return $this->logger;
}
116
/**
 * Runs one processing cycle: drains the event source and the socket, ticks the server, then sleeps for
 * whatever is left of the tick's time budget.
 */
public function tickProcessor() : void{
    $tickStart = microtime(true);

    /*
     * Event-source messages and socket packets are consumed in alternating batches of at most 100, so that
     * heavy traffic in one direction cannot starve the other.
     */
    do{
        //after a shutdown event, further messages from the event source are of no interest
        $moreEvents = !$this->shutdown;
        for($handled = 0; $handled < 100 && $moreEvents && !$this->shutdown; ++$handled){
            $moreEvents = $this->eventSource->process($this);
        }

        $morePackets = true;
        for($handled = 0; $handled < 100 && $morePackets; ++$handled){
            $morePackets = $this->receivePacket();
        }
    }while($moreEvents || $morePackets);

    $this->tick();

    $elapsed = microtime(true) - $tickStart;
    if($elapsed >= self::RAKLIB_TIME_PER_TICK){
        return; //tick overran its budget, no time left to sleep
    }
    @time_sleep_until(microtime(true) + self::RAKLIB_TIME_PER_TICK - $elapsed);
}
143
/**
 * Shuts the server down gracefully: flags shutdown, drains the event source, asks every session to
 * disconnect, keeps ticking until no session is left, then closes the socket.
 */
public function waitShutdown() : void{
    $this->shutdown = true;

    //Late messages must be processed before server-initiated disconnects begin, so that a custom disconnect
    //mechanism used by the server implementation (e.g. a server transfer) is not broken by a race condition.
    do{
        $hadMessage = $this->eventSource->process($this);
    }while($hadMessage);

    foreach($this->sessions as $activeSession){
        $activeSession->initiateDisconnect(DisconnectReason::SERVER_SHUTDOWN);
    }

    //tick() drops sessions once they report being fully disconnected
    while($this->sessions !== []){
        $this->tickProcessor();
    }

    $this->socket->close();
    $this->logger->debug("Graceful shutdown complete");
}
167
/**
 * Updates every session, drops the fully disconnected ones, clears the per-tick packet counters and,
 * once every RAKLIB_TPS ticks (unless shutting down), reports bandwidth stats and expires address blocks.
 */
private function tick() : void{
    $updateTime = microtime(true);
    foreach($this->sessions as $current){
        $current->update($updateTime);
        if($current->isFullyDisconnected()){
            $this->removeSessionInternal($current);
        }
    }

    //packet counters only apply within a single tick
    $this->ipSec = [];

    $periodicWorkDue = !$this->shutdown && ($this->ticks % self::RAKLIB_TPS) === 0;
    if($periodicWorkDue){
        if($this->sendBytes > 0 || $this->receiveBytes > 0){
            $this->eventListener->onBandwidthStatsUpdate($this->sendBytes, $this->receiveBytes);
            $this->sendBytes = 0;
            $this->receiveBytes = 0;
        }

        if(count($this->block) > 0){
            //sorted by expiry, so the scan can stop at the first block that is still active
            asort($this->block);
            $currentTime = time();
            foreach($this->block as $blockedIp => $expiresAt){
                if($expiresAt > $currentTime){
                    break;
                }
                unset($this->block[$blockedIp]);
            }
        }
    }

    ++$this->ticks;
}
201
/**
 * Reads at most one packet from the socket and routes it.
 *
 * Blocked and rate-limited senders are dropped. Packets from an address with a session go to that session;
 * everything else goes to the unconnected message handler and the raw packet filters. A packet that raises
 * a BinaryDataException is logged and its sender blocked for 5 seconds.
 *
 * @return bool false when there was nothing to read or the socket failed with an error other than
 *              ECONNRESET; true otherwise
 */
private function receivePacket() : bool{
    try{
        $buffer = $this->socket->readPacket($addressIp, $addressPort);
    }catch(SocketException $e){
        if($e->getCode() === SOCKET_ECONNRESET){ //client disconnected improperly, maybe crash or lost connection
            return true;
        }

        $this->logger->debug($e->getMessage());
        return false;
    }
    if($buffer === null){
        return false; //no data
    }
    assert($addressIp !== null, "Can't be null if we got a buffer");
    assert($addressPort !== null, "Can't be null if we got a buffer");

    $len = strlen($buffer);

    //bytes are counted even for packets that end up being dropped below
    $this->receiveBytes += $len;
    if(isset($this->block[$addressIp])){
        return true;
    }
    if($this->countPacketAndCheckLimit($addressIp)){
        return true;
    }
    if($len < 1){
        return true;
    }

    $address = new InternetAddress($addressIp, $addressPort, $this->socket->getBindAddress()->getVersion());
    try{
        $session = $this->getSessionByAddress($address);
        if($session !== null && $this->dispatchToSession($session, $address, $buffer)){
            return true;
        }

        if(!$this->shutdown){
            $this->dispatchUnconnected($address, $buffer);
        }
    }catch(BinaryDataException $e){
        $this->logBadPacket($address, $buffer, $e);
        $this->blockAddress($address->getIp(), 5);
    }

    return true;
}

/**
 * Counts one more packet from $ip for the current tick and blocks the IP once the per-tick limit is reached.
 *
 * @return bool true if the limit was reached and the packet must be dropped
 */
private function countPacketAndCheckLimit(string $ip) : bool{
    if(!isset($this->ipSec[$ip])){
        $this->ipSec[$ip] = 1;
        return false;
    }
    if(++$this->ipSec[$ip] >= $this->packetLimit){
        $this->blockAddress($ip);
        return true;
    }

    return false;
}

/**
 * Offers a packet to the session that owns its source address.
 *
 * @return bool true if the packet was consumed (or deliberately ignored), false if it should still be
 *              treated as an unconnected packet
 *
 * @throws BinaryDataException if decoding or handling the datagram raises one
 */
private function dispatchToSession(ServerSession $session, InternetAddress $address, string $buffer) : bool{
    $header = ord($buffer[0]);
    if(($header & Datagram::BITFLAG_VALID) === 0){
        if(!$session->isConnected()){
            //allows unconnected packets if the session is stuck in DISCONNECTING state, useful if the client
            //didn't disconnect properly for some reason (e.g. crash)
            return false;
        }
        $this->logger->debug("Ignored unconnected packet from $address due to session already opened (0x" . bin2hex($buffer[0]) . ")");
        return true;
    }

    if(($header & Datagram::BITFLAG_ACK) !== 0){
        $packet = new ACK();
    }elseif(($header & Datagram::BITFLAG_NAK) !== 0){
        $packet = new NACK();
    }else{
        $packet = new Datagram();
    }
    $packet->decode(new PacketSerializer($buffer));
    try{
        $session->handlePacket($packet);
    }catch(PacketHandlingException $e){
        $session->getLogger()->error("Error receiving packet: " . $e->getMessage());
        $session->forciblyDisconnect($e->getDisconnectReason());
    }

    return true;
}

/**
 * Routes a packet that no session consumed: first to the unconnected message handler, then to the first
 * matching raw packet filter; logs the packet as ignored if neither accepts it.
 */
private function dispatchUnconnected(InternetAddress $address, string $buffer) : void{
    if($this->unconnectedMessageHandler->handleRaw($buffer, $address)){
        return;
    }

    foreach($this->rawPacketFilters as $pattern){
        if(preg_match($pattern, $buffer) > 0){
            $this->eventListener->onRawPacketReceive($address->getIp(), $address->getPort(), $buffer);
            return;
        }
    }

    $this->logger->debug("Ignored packet from $address due to no session opened (0x" . bin2hex($buffer[0]) . ")");
}

/**
 * Logs a packet that failed with a BinaryDataException, together with its hex dump and cleaned stack trace.
 * The messages are handed to the logger as one unit when it is a \BufferedLogger.
 */
private function logBadPacket(InternetAddress $address, string $buffer, BinaryDataException $e) : void{
    $logFn = function() use ($address, $e, $buffer) : void{
        $this->logger->debug("Packet from $address (" . strlen($buffer) . " bytes): 0x" . bin2hex($buffer));
        $this->logger->debug(get_class($e) . ": " . $e->getMessage() . " in " . $e->getFile() . " on line " . $e->getLine());
        foreach($this->traceCleaner->getTrace(0, $e->getTrace()) as $line){
            $this->logger->debug($line);
        }
        $this->logger->error("Bad packet from $address: " . $e->getMessage());
    };
    if($this->logger instanceof \BufferedLogger){
        $this->logger->buffer($logFn);
    }else{
        $logFn();
    }
}
304
/**
 * Encodes a packet and writes it to the given address, adding the number of bytes written to the
 * outbound byte counter. Socket errors are logged at debug level rather than thrown.
 */
public function sendPacket(Packet $packet, InternetAddress $address) : void{
    $serializer = new PacketSerializer(); //TODO: reusable streams to reduce allocations
    $packet->encode($serializer);

    try{
        $bytesWritten = $this->socket->writePacket($serializer->getBuffer(), $address->getIp(), $address->getPort());
        $this->sendBytes += $bytesWritten;
    }catch(SocketException $socketError){
        $this->logger->debug($socketError->getMessage());
    }
}
314
/**
 * Returns the listener that receives this server's events.
 */
public function getEventListener() : ServerEventListener{
    return $this->eventListener;
}
318
/**
 * Queues an encapsulated packet on the session with the given ID. Does nothing if no such session exists
 * or the session is not connected.
 */
public function sendEncapsulated(int $sessionId, EncapsulatedPacket $packet, bool $immediate = false) : void{
    $target = $this->sessions[$sessionId] ?? null;
    if($target === null || !$target->isConnected()){
        return;
    }

    $target->addEncapsulatedToQueue($packet, $immediate);
}
325
/**
 * Writes a raw payload to the given address and port. Socket errors are logged at debug level rather
 * than thrown.
 *
 * NOTE(review): unlike sendPacket(), the bytes written here are not added to $sendBytes - confirm whether
 * that is intentional.
 */
public function sendRaw(string $address, int $port, string $payload) : void{
    try{
        $this->socket->writePacket($payload, $address, $port);
    }catch(SocketException $socketError){
        $failure = $socketError->getMessage();
        $this->logger->debug($failure);
    }
}
333
/**
 * Starts a server-initiated disconnect of the session with the given ID, if such a session exists.
 */
public function closeSession(int $sessionId) : void{
    ($this->sessions[$sessionId] ?? null)?->initiateDisconnect(DisconnectReason::SERVER_DISCONNECT);
}
339
/**
 * Sets the name later returned by getName().
 */
public function setName(string $name) : void{
    $this->name = $name;
}
343
/**
 * Sets the public $portChecking flag. The flag itself is not read anywhere in this class.
 */
public function setPortCheck(bool $value) : void{
    $this->portChecking = $value;
}
347
/**
 * Sets how many packets a single IP may send within one tick before it gets blocked.
 */
public function setPacketsPerTickLimit(int $limit) : void{
    $this->packetLimit = $limit;
}
351
/**
 * Blocks packets from an IP address.
 *
 * An existing block is only ever extended, never shortened. A notice is logged only when a new,
 * non-permanent block is created.
 *
 * @param int $timeout block duration in seconds, or -1 to block permanently
 */
public function blockAddress(string $address, int $timeout = 300) : void{
    $expiry = time() + $timeout;

    if($timeout === -1){
        //permanent block: always overrides whatever was there before
        $this->block[$address] = PHP_INT_MAX;
        return;
    }

    if(!isset($this->block[$address])){
        $this->logger->notice("Blocked $address for $timeout seconds");
        $this->block[$address] = $expiry;
        return;
    }

    if($this->block[$address] < $expiry){
        $this->block[$address] = $expiry;
    }
}
365
/**
 * Removes any block on the given IP address. The debug message is logged whether or not the address
 * was actually blocked.
 */
public function unblockAddress(string $address) : void{
    if(isset($this->block[$address])){
        unset($this->block[$address]);
    }

    $this->logger->debug("Unblocked $address");
}
370
/**
 * Registers a regular expression; unconnected packets that the unconnected message handler does not
 * handle and that match it are forwarded to the event listener as raw packets.
 */
public function addRawPacketFilter(string $regex) : void{
    $this->rawPacketFilters[] = $regex;
}
374
/**
 * Looks up the session registered for the given address.
 *
 * @return ServerSession|null the session, or null if none is registered for that address
 */
public function getSessionByAddress(InternetAddress $address) : ?ServerSession{
    $key = $address->toString();

    return $this->sessionsByAddress[$key] ?? null;
}
378
/**
 * Tells whether a session is registered for the given address.
 */
public function sessionExists(InternetAddress $address) : bool{
    $key = $address->toString();

    return ($this->sessionsByAddress[$key] ?? null) !== null;
}
382
/**
 * Creates and registers a session for the given address.
 *
 * A session already registered for that address is forcibly disconnected and removed first. The new
 * session receives the first free ID at or after $nextSessionId.
 */
public function createSession(InternetAddress $address, int $clientId, int $mtuSize) : ServerSession{
    $addressKey = $address->toString();

    $previous = $this->sessionsByAddress[$addressKey] ?? null;
    if($previous !== null){
        $previous->forciblyDisconnect(DisconnectReason::CLIENT_RECONNECT);
        $this->removeSessionInternal($previous);
    }

    $this->checkSessions();

    while(isset($this->sessions[$this->nextSessionId])){
        //we don't expect more than 2 billion simultaneous connections, and this fits in 4 bytes
        $this->nextSessionId = ($this->nextSessionId + 1) & 0x7fffffff;
    }
    $sessionId = $this->nextSessionId;

    $created = new ServerSession(
        $this,
        $this->logger,
        clone $address,
        $clientId,
        $mtuSize,
        $sessionId,
        $this->recvMaxSplitParts,
        $this->recvMaxConcurrentSplits
    );
    $this->sessionsByAddress[$addressKey] = $created;
    $this->sessions[$sessionId] = $created;
    $this->logger->debug("Created session for $address with MTU size $mtuSize");

    return $created;
}
404
/**
 * Drops a session from both lookup tables (by address and by internal ID). The session itself is not
 * notified.
 */
private function removeSessionInternal(ServerSession $session) : void{
    $addressKey = $session->getAddress()->toString();
    unset($this->sessionsByAddress[$addressKey]);

    $internalId = $session->getInternalId();
    unset($this->sessions[$internalId]);
}
408
/**
 * Notifies the event listener that a client connected, passing the session's internal ID, remote IP,
 * remote port and the session's own ID.
 */
public function openSession(ServerSession $session) : void{
    $remote = $session->getAddress();

    $this->eventListener->onClientConnect(
        $session->getInternalId(),
        $remote->getIp(),
        $remote->getPort(),
        $session->getID()
    );
}
413
/**
 * Keeps the session table from growing without bound: once more than 4096 sessions are registered,
 * temporary sessions are removed (in iteration order) until the count is back at the limit or no
 * temporary session is left.
 */
private function checkSessions() : void{
    $maxSessions = 4096;
    if(count($this->sessions) <= $maxSessions){
        return;
    }

    //foreach iterates over a snapshot of the array, so removing entries inside the loop is safe
    foreach($this->sessions as $session){
        if(!$session->isTemporary()){
            continue;
        }

        $this->removeSessionInternal($session);
        if(count($this->sessions) <= $maxSessions){
            break;
        }
    }
}
426
/**
 * Returns the name last set through setName() (an empty string by default).
 */
public function getName() : string{
    return $this->name;
}
430
/**
 * Returns the server ID this server was constructed with.
 */
public function getID() : int{
    return $this->serverId;
}
434}
__construct(protected int $serverId, protected \Logger $logger, protected ServerSocket $socket, protected int $maxMtuSize, ProtocolAcceptor $protocolAcceptor, private ServerEventSource $eventSource, private ServerEventListener $eventListener, private ExceptionTraceCleaner $traceCleaner, private int $recvMaxSplitParts=ServerSession::DEFAULT_MAX_SPLIT_PART_COUNT, private int $recvMaxConcurrentSplits=ServerSession::DEFAULT_MAX_CONCURRENT_SPLIT_COUNT)