48 private const RAKLIB_TPS = 100;
49 private const RAKLIB_TIME_PER_TICK = 1 / self::RAKLIB_TPS;
51 protected int $receiveBytes = 0;
52 protected int $sendBytes = 0;
55 protected array $sessionsByAddress = [];
57 protected array $sessions = [];
61 protected string $name =
"";
63 protected int $packetLimit = 200;
65 protected bool $shutdown =
false;
67 protected int $ticks = 0;
70 protected array $block = [];
72 protected array $ipSec = [];
75 protected array $rawPacketFilters = [];
77 public bool $portChecking =
false;
79 protected int $nextSessionId = 0;
86 protected int $serverId,
89 protected int $maxMtuSize,
94 private int $recvMaxSplitParts = ServerSession::DEFAULT_MAX_SPLIT_PART_COUNT,
95 private int $recvMaxConcurrentSplits = ServerSession::DEFAULT_MAX_CONCURRENT_SPLIT_COUNT
97 if($maxMtuSize < Session::MIN_MTU_SIZE){
98 throw new \InvalidArgumentException(
"MTU size must be at least " . Session::MIN_MTU_SIZE .
", got $maxMtuSize");
100 $this->socket->setBlocking(
false);
105 public function getPort() : int{
106 return $this->socket->getBindAddress()->getPort();
109 public function getMaxMtuSize() : int{
110 return $this->maxMtuSize;
113 public function getLogger() : \
Logger{
114 return $this->logger;
117 public function tickProcessor() : void{
118 $start = microtime(true);
125 $stream = !$this->shutdown;
126 for($i = 0; $i < 100 && $stream && !$this->shutdown; ++$i){
127 $stream = $this->eventSource->process($this);
131 for($i = 0; $i < 100 && $socket; ++$i){
132 $socket = $this->receivePacket();
134 }
while($stream || $socket);
138 $time = microtime(
true) - $start;
139 if($time < self::RAKLIB_TIME_PER_TICK){
140 @time_sleep_until(microtime(
true) + self::RAKLIB_TIME_PER_TICK - $time);
148 $this->shutdown = true;
150 while($this->eventSource->process($this)){
156 foreach($this->sessions as $session){
157 $session->initiateDisconnect(DisconnectReason::SERVER_SHUTDOWN);
160 while(count($this->sessions) > 0){
161 $this->tickProcessor();
164 $this->socket->close();
165 $this->logger->debug(
"Graceful shutdown complete");
168 private function tick() : void{
169 $time = microtime(true);
170 foreach($this->sessions as $session){
171 $session->update($time);
172 if($session->isFullyDisconnected()){
173 $this->removeSessionInternal($session);
179 if(!$this->shutdown and ($this->ticks % self::RAKLIB_TPS) === 0){
180 if($this->sendBytes > 0 or $this->receiveBytes > 0){
181 $this->eventListener->onBandwidthStatsUpdate($this->sendBytes, $this->receiveBytes);
182 $this->sendBytes = 0;
183 $this->receiveBytes = 0;
186 if(count($this->block) > 0){
189 foreach($this->block as $address => $timeout){
190 if($timeout <= $now){
191 unset($this->block[$address]);
203 private function receivePacket() : bool{
205 $buffer = $this->socket->readPacket($addressIp, $addressPort);
206 }
catch(SocketException $e){
207 $error = $e->getCode();
208 if($error === SOCKET_ECONNRESET){
212 $this->logger->debug($e->getMessage());
215 if($buffer ===
null){
218 assert($addressIp !==
null,
"Can't be null if we got a buffer");
219 assert($addressPort !==
null,
"Can't be null if we got a buffer");
221 $len = strlen($buffer);
223 $this->receiveBytes += $len;
224 if(isset($this->block[$addressIp])){
228 if(isset($this->ipSec[$addressIp])){
229 if(++$this->ipSec[$addressIp] >= $this->packetLimit){
230 $this->blockAddress($addressIp);
234 $this->ipSec[$addressIp] = 1;
241 $address =
new InternetAddress($addressIp, $addressPort, $this->socket->getBindAddress()->getVersion());
243 $session = $this->getSessionByAddress($address);
244 if($session !==
null){
245 $header = ord($buffer[0]);
246 if(($header & Datagram::BITFLAG_VALID) !== 0){
247 if(($header & Datagram::BITFLAG_ACK) !== 0){
249 }elseif(($header & Datagram::BITFLAG_NAK) !== 0){
250 $packet =
new NACK();
252 $packet =
new Datagram();
254 $packet->decode(
new PacketSerializer($buffer));
256 $session->handlePacket($packet);
257 }
catch(PacketHandlingException $e){
258 $session->getLogger()->error(
"Error receiving packet: " . $e->getMessage());
259 $session->forciblyDisconnect($e->getDisconnectReason());
262 }elseif($session->isConnected()){
265 $this->logger->debug(
"Ignored unconnected packet from $address due to session already opened (0x" . bin2hex($buffer[0]) .
")");
270 if(!$this->shutdown){
271 if(!($handled = $this->unconnectedMessageHandler->handleRaw($buffer, $address))){
272 foreach($this->rawPacketFilters as $pattern){
273 if(preg_match($pattern, $buffer) > 0){
275 $this->eventListener->onRawPacketReceive($address->getIp(), $address->getPort(), $buffer);
282 $this->logger->debug(
"Ignored packet from $address due to no session opened (0x" . bin2hex($buffer[0]) .
")");
285 }
catch(BinaryDataException $e){
286 $logFn =
function() use ($address, $e, $buffer) :
void{
287 $this->logger->debug(
"Packet from $address (" . strlen($buffer) .
" bytes): 0x" . bin2hex($buffer));
288 $this->logger->debug(get_class($e) .
": " . $e->getMessage() .
" in " . $e->getFile() .
" on line " . $e->getLine());
289 foreach($this->traceCleaner->getTrace(0, $e->getTrace()) as $line){
290 $this->logger->debug($line);
292 $this->logger->error(
"Bad packet from $address: " . $e->getMessage());
295 $this->logger->buffer($logFn);
299 $this->blockAddress($address->getIp(), 5);
305 public function sendPacket(Packet $packet, InternetAddress $address) : void{
306 $out = new PacketSerializer();
307 $packet->encode($out);
309 $this->sendBytes += $this->socket->writePacket($out->getBuffer(), $address->getIp(), $address->getPort());
310 }
catch(SocketException $e){
311 $this->logger->debug($e->getMessage());
315 public function getEventListener() : ServerEventListener{
316 return $this->eventListener;
319 public function sendEncapsulated(
int $sessionId, EncapsulatedPacket $packet,
bool $immediate =
false) : void{
320 $session = $this->sessions[$sessionId] ?? null;
321 if($session !==
null and $session->isConnected()){
322 $session->addEncapsulatedToQueue($packet, $immediate);
326 public function sendRaw(
string $address,
int $port,
string $payload) : void{
328 $this->socket->writePacket($payload, $address, $port);
329 }
catch(SocketException $e){
330 $this->logger->debug($e->getMessage());
334 public function closeSession(
int $sessionId) : void{
335 if(isset($this->sessions[$sessionId])){
336 $this->sessions[$sessionId]->initiateDisconnect(DisconnectReason::SERVER_DISCONNECT);
340 public function setName(
string $name) : void{
344 public function setPortCheck(
bool $value) : void{
345 $this->portChecking = $value;
348 public function setPacketsPerTickLimit(
int $limit) : void{
349 $this->packetLimit = $limit;
352 public function blockAddress(
string $address,
int $timeout = 300) : void{
353 $final = time() + $timeout;
354 if(!isset($this->block[$address]) or $timeout === -1){
356 $final = PHP_INT_MAX;
358 $this->logger->notice(
"Blocked $address for $timeout seconds");
360 $this->block[$address] = $final;
361 }elseif($this->block[$address] < $final){
362 $this->block[$address] = $final;
366 public function unblockAddress(
string $address) : void{
367 unset($this->block[$address]);
368 $this->logger->debug(
"Unblocked $address");
371 public function addRawPacketFilter(
string $regex) : void{
372 $this->rawPacketFilters[] = $regex;
375 public function getSessionByAddress(InternetAddress $address) : ?ServerSession{
376 return $this->sessionsByAddress[$address->toString()] ?? null;
379 public function sessionExists(InternetAddress $address) : bool{
380 return isset($this->sessionsByAddress[$address->toString()]);
383 public function createSession(InternetAddress $address,
int $clientId,
int $mtuSize) : ServerSession{
384 $existingSession = $this->sessionsByAddress[$address->toString()] ?? null;
385 if($existingSession !==
null){
386 $existingSession->forciblyDisconnect(DisconnectReason::CLIENT_RECONNECT);
387 $this->removeSessionInternal($existingSession);
390 $this->checkSessions();
392 while(isset($this->sessions[$this->nextSessionId])){
393 $this->nextSessionId++;
394 $this->nextSessionId &= 0x7fffffff;
397 $session =
new ServerSession($this, $this->logger, clone $address, $clientId, $mtuSize, $this->nextSessionId, $this->recvMaxSplitParts, $this->recvMaxConcurrentSplits);
398 $this->sessionsByAddress[$address->toString()] = $session;
399 $this->sessions[$this->nextSessionId] = $session;
400 $this->logger->debug(
"Created session for $address with MTU size $mtuSize");
405 private function removeSessionInternal(ServerSession $session) : void{
406 unset($this->sessionsByAddress[$session->getAddress()->toString()], $this->sessions[$session->getInternalId()]);
409 public function openSession(ServerSession $session) : void{
410 $address = $session->getAddress();
411 $this->eventListener->onClientConnect($session->getInternalId(), $address->getIp(), $address->getPort(), $session->getID());
414 private function checkSessions() : void{
415 if(count($this->sessions) > 4096){
416 foreach($this->sessions as $sessionId => $session){
417 if($session->isTemporary()){
418 $this->removeSessionInternal($session);
419 if(count($this->sessions) <= 4096){
427 public function getName() : string{
431 public function getID() : int{
432 return $this->serverId;