PocketMine-MP 5.39.3 git-21ae710729750cd637333d673bbbbbc598fc659e
Loading...
Searching...
No Matches
ReceiveReliabilityLayer.php
1<?php
2
3/*
4 * This file is part of RakLib.
5 * Copyright (C) 2014-2022 PocketMine Team <https://github.com/pmmp/RakLib>
6 *
7 * RakLib is not affiliated with Jenkins Software LLC nor RakNet.
8 *
9 * RakLib is free software: you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation, either version 3 of the License, or
12 * (at your option) any later version.
13 */
14
15declare(strict_types=1);
16
17namespace raklib\generic;
18
25use function array_fill;
26use function assert;
27use function count;
28
30
31 public static int $WINDOW_SIZE = 2048;
32
33 private int $windowStart;
34 private int $windowEnd;
35 private int $highestSeqNumber = -1;
36
38 private array $ACKQueue = [];
40 private array $NACKQueue = [];
41
42 private int $reliableWindowStart;
43 private int $reliableWindowEnd;
45 private array $reliableWindow = [];
46
48 private array $receiveOrderedIndex;
50 private array $receiveSequencedHighestIndex;
52 private array $receiveOrderedPackets;
53
55 private array $splitPackets = [];
56
62 public function __construct(
63 private \Logger $logger,
64 private \Closure $onRecv,
65 private \Closure $sendPacket,
66 private int $maxSplitPacketPartCount = PHP_INT_MAX,
67 private int $maxConcurrentSplitPackets = PHP_INT_MAX
68 ){
69 $this->windowStart = 0;
70 $this->windowEnd = self::$WINDOW_SIZE;
71
72 $this->reliableWindowStart = 0;
73 $this->reliableWindowEnd = self::$WINDOW_SIZE;
74
75 $this->receiveOrderedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0);
76 $this->receiveSequencedHighestIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0);
77
78 $this->receiveOrderedPackets = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, []);
79 }
80
81 private function handleEncapsulatedPacketRoute(EncapsulatedPacket $pk) : void{
82 ($this->onRecv)($pk);
83 }
84
96 private function handleSplit(EncapsulatedPacket $packet) : ?EncapsulatedPacket{
97 if($packet->splitInfo === null){
98 return $packet;
99 }
100 $totalParts = $packet->splitInfo->getTotalPartCount();
101 $partIndex = $packet->splitInfo->getPartIndex();
102 if($totalParts >= $this->maxSplitPacketPartCount || $totalParts < 0){
103 throw new PacketHandlingException("Invalid split packet part count ($totalParts)", DisconnectReason::SPLIT_PACKET_TOO_LARGE);
104 }
105 if($partIndex >= $totalParts || $partIndex < 0){
106 throw new PacketHandlingException("Invalid split packet part index (part index $partIndex, part count $totalParts)", DisconnectReason::SPLIT_PACKET_INVALID_PART_INDEX);
107 }
108
109 $splitId = $packet->splitInfo->getId();
110 if(!isset($this->splitPackets[$splitId])){
111 if(count($this->splitPackets) >= $this->maxConcurrentSplitPackets){
112 throw new PacketHandlingException("Exceeded concurrent split packet reassembly limit of $this->maxConcurrentSplitPackets", DisconnectReason::SPLIT_PACKET_TOO_MANY_CONCURRENT);
113 }
114 $this->splitPackets[$splitId] = array_fill(0, $totalParts, null);
115 }elseif(count($this->splitPackets[$splitId]) !== $totalParts){
116 throw new PacketHandlingException("Wrong split count $totalParts for split packet $splitId, expected " . count($this->splitPackets[$splitId]), DisconnectReason::SPLIT_PACKET_INCONSISTENT_HEADER);
117 }
118
119 $this->splitPackets[$splitId][$partIndex] = $packet;
120
121 $parts = [];
122 foreach($this->splitPackets[$splitId] as $splitIndex => $part){
123 if($part === null){
124 return null;
125 }
126 $parts[$splitIndex] = $part;
127 }
128
129 //got all parts, reassemble the packet
130 $pk = new EncapsulatedPacket();
131 $pk->buffer = "";
132
133 $pk->reliability = $packet->reliability;
134 $pk->messageIndex = $packet->messageIndex;
135 $pk->sequenceIndex = $packet->sequenceIndex;
136 $pk->orderIndex = $packet->orderIndex;
137 $pk->orderChannel = $packet->orderChannel;
138
139 for($i = 0; $i < $totalParts; ++$i){
140 $pk->buffer .= $parts[$i]->buffer;
141 }
142
143 unset($this->splitPackets[$splitId]);
144
145 return $pk;
146 }
147
151 private function handleEncapsulatedPacket(EncapsulatedPacket $packet) : void{
152 if($packet->messageIndex !== null){
153 //check for duplicates or out of range
154 if($packet->messageIndex < $this->reliableWindowStart or $packet->messageIndex > $this->reliableWindowEnd or isset($this->reliableWindow[$packet->messageIndex])){
155 return;
156 }
157
158 $this->reliableWindow[$packet->messageIndex] = true;
159
160 if($packet->messageIndex === $this->reliableWindowStart){
161 for(; isset($this->reliableWindow[$this->reliableWindowStart]); ++$this->reliableWindowStart){
162 unset($this->reliableWindow[$this->reliableWindowStart]);
163 ++$this->reliableWindowEnd;
164 }
165 }
166 }
167
168 if(($packet = $this->handleSplit($packet)) === null){
169 return;
170 }
171
172 if(PacketReliability::isSequencedOrOrdered($packet->reliability) and ($packet->orderChannel < 0 or $packet->orderChannel >= PacketReliability::MAX_ORDER_CHANNELS)){
173 //TODO: this should result in peer banning
174 $this->logger->debug("Invalid packet, bad order channel ($packet->orderChannel)");
175 return;
176 }
177
178 if(PacketReliability::isSequenced($packet->reliability)){
179 assert($packet->orderChannel !== null && $packet->sequenceIndex !== null, 'These should have been set during decode');
180 if($packet->sequenceIndex < $this->receiveSequencedHighestIndex[$packet->orderChannel] or $packet->orderIndex < $this->receiveOrderedIndex[$packet->orderChannel]){
181 //too old sequenced packet, discard it
182 return;
183 }
184
185 $this->receiveSequencedHighestIndex[$packet->orderChannel] = $packet->sequenceIndex + 1;
186 $this->handleEncapsulatedPacketRoute($packet);
187 }elseif(PacketReliability::isOrdered($packet->reliability)){
188 $orderChannel = $packet->orderChannel;
189 assert($orderChannel !== null, 'This should have been set during decode');
190 if($packet->orderIndex === $this->receiveOrderedIndex[$orderChannel]){
191 //this is the packet we expected to get next
192 //Any ordered packet resets the sequence index to zero, so that sequenced packets older than this ordered
193 //one get discarded. Sequenced packets also include (but don't increment) the order index, so a sequenced
194 //packet with an order index less than this will get discarded
195 $this->receiveSequencedHighestIndex[$orderChannel] = 0;
196 $this->receiveOrderedIndex[$orderChannel] = $packet->orderIndex + 1;
197
198 $this->handleEncapsulatedPacketRoute($packet);
199 $i = $this->receiveOrderedIndex[$orderChannel];
200 for(; isset($this->receiveOrderedPackets[$orderChannel][$i]); ++$i){
201 $this->handleEncapsulatedPacketRoute($this->receiveOrderedPackets[$orderChannel][$i]);
202 unset($this->receiveOrderedPackets[$orderChannel][$i]);
203 }
204
205 $this->receiveOrderedIndex[$orderChannel] = $i;
206 }elseif($packet->orderIndex > $this->receiveOrderedIndex[$orderChannel]){
207 if(count($this->receiveOrderedPackets[$orderChannel]) >= self::$WINDOW_SIZE){
208 //queue overflow for this channel - we should probably disconnect the peer at this point
209 return;
210 }
211 $this->receiveOrderedPackets[$orderChannel][$packet->orderIndex] = $packet;
212 }else{
213 //duplicate/already received packet
214 }
215 }else{
216 //not ordered or sequenced
217 $this->handleEncapsulatedPacketRoute($packet);
218 }
219 }
220
224 public function onDatagram(Datagram $packet) : void{
225 if($packet->seqNumber < $this->windowStart or $packet->seqNumber > $this->windowEnd or isset($this->ACKQueue[$packet->seqNumber])){
226 $this->logger->debug("Received duplicate or out-of-window packet (sequence number $packet->seqNumber, window " . $this->windowStart . "-" . $this->windowEnd . ")");
227 return;
228 }
229
230 unset($this->NACKQueue[$packet->seqNumber]);
231 $this->ACKQueue[$packet->seqNumber] = $packet->seqNumber;
232 if($this->highestSeqNumber < $packet->seqNumber){
233 $this->highestSeqNumber = $packet->seqNumber;
234 }
235
236 if($packet->seqNumber === $this->windowStart){
237 //got a contiguous packet, shift the receive window
238 //this packet might complete a sequence of out-of-order packets, so we incrementally check the indexes
239 //to see how far to shift the window, and stop as soon as we either find a gap or have an empty window
240 for(; isset($this->ACKQueue[$this->windowStart]); ++$this->windowStart){
241 ++$this->windowEnd;
242 }
243 }elseif($packet->seqNumber > $this->windowStart){
244 //we got a gap - a later packet arrived before earlier ones did
245 //we add the earlier ones to the NACK queue
246 //if the missing packets arrive before the end of tick, they'll be removed from the NACK queue
247 for($i = $this->windowStart; $i < $packet->seqNumber; ++$i){
248 if(!isset($this->ACKQueue[$i])){
249 $this->NACKQueue[$i] = $i;
250 }
251 }
252 }else{
253 assert(false, "received packet before window start");
254 }
255
256 foreach($packet->packets as $pk){
257 $this->handleEncapsulatedPacket($pk);
258 }
259 }
260
261 public function update() : void{
262 $diff = $this->highestSeqNumber - $this->windowStart + 1;
263 assert($diff >= 0);
264 if($diff > 0){
265 //Move the receive window to account for packets we either received or are about to NACK
266 //we ignore any sequence numbers that we sent NACKs for, because we expect the client to resend them
267 //when it gets a NACK for it
268
269 $this->windowStart += $diff;
270 $this->windowEnd += $diff;
271 }
272
273 if(count($this->ACKQueue) > 0){
274 $pk = new ACK();
275 $pk->packets = $this->ACKQueue;
276 ($this->sendPacket)($pk);
277 $this->ACKQueue = [];
278 }
279
280 if(count($this->NACKQueue) > 0){
281 $pk = new NACK();
282 $pk->packets = $this->NACKQueue;
283 ($this->sendPacket)($pk);
284 $this->NACKQueue = [];
285 }
286 }
287
288 public function needsUpdate() : bool{
289 return count($this->ACKQueue) !== 0 or count($this->NACKQueue) !== 0;
290 }
291}
__construct(private \Logger $logger, private \Closure $onRecv, private \Closure $sendPacket, private int $maxSplitPacketPartCount=PHP_INT_MAX, private int $maxConcurrentSplitPackets=PHP_INT_MAX)