PocketMine-MP 5.39.3 git-9a46a8bd745880ddf8eebaf28cda326bb97d2efa
Loading...
Searching...
No Matches
SendReliabilityLayer.php
1<?php
2
3/*
4 * This file is part of RakLib.
5 * Copyright (C) 2014-2022 PocketMine Team <https://github.com/pmmp/RakLib>
6 *
7 * RakLib is not affiliated with Jenkins Software LLC nor RakNet.
8 *
9 * RakLib is free software: you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation, either version 3 of the License, or
12 * (at your option) any later version.
13 */
14
15declare(strict_types=1);
16
17namespace raklib\generic;
18
25use function array_fill;
26use function array_push;
27use function assert;
28use function count;
29use function microtime;
30use function str_split;
31use function strlen;
32
/**
 * Bytes of each datagram that cannot carry encapsulated packets:
 * IP header (20 bytes) + UDP header (8 bytes) + RakNet weird (8 bytes) = 36,
 * plus the datagram's own header.
 */
private const DATAGRAM_MTU_OVERHEAD = 36 + Datagram::HEADER_SIZE;

/** Payload capacity of a datagram sized at Session::MIN_MTU_SIZE. */
private const MIN_POSSIBLE_PACKET_SIZE_LIMIT = Session::MIN_MTU_SIZE - self::DATAGRAM_MTU_OVERHEAD;

/** Seconds a cached datagram may stay unacknowledged before update() re-queues its packets. */
private const UNACKED_RETRANSMIT_DELAY = 2.0;
41
/** @var EncapsulatedPacket[] packets waiting to be bundled into the next datagram */
private array $sendQueue = [];

/** Counter used to derive the split ID of fragmented packets. */
private int $splitID = 0;

/** Sequence number given to the next outgoing datagram. */
private int $sendSeqNumber = 0;

/** Message index given to the next reliable packet. */
private int $messageIndex = 0;

/** Lowest message index that has not been acknowledged yet (inclusive window start). */
private int $reliableWindowStart;
/** First message index that is outside the window (exclusive window end). */
private int $reliableWindowEnd;
/** @var bool[] message index => whether an ACK has been received for it */
private array $reliableWindow = [];

/** @var int[] order channel => next ordered index */
private array $sendOrderedIndex;
/** @var int[] order channel => next sequenced index */
private array $sendSequencedIndex;

/** @var EncapsulatedPacket[] message index => reliable packet held back until the window has room for it */
private array $reliableBacklog = [];

/** @var EncapsulatedPacket[] packets that will be queued again on the next update() */
private array $resendQueue = [];

/** @var ReliableCacheEntry[] datagram sequence number => reliable packets sent in that datagram */
private array $reliableCache = [];

/** @var int[][] identifierACK => (message index => message index) still awaiting acknowledgement */
private array $needACK = [];

/** Maximum number of encapsulated-packet bytes that fit into a single datagram. */
private int $maxDatagramPayloadSize;
78
/**
 * @param int      $mtuSize              MTU used to derive the per-datagram payload limit
 * @param \Closure $sendDatagramCallback invoked with each Datagram that is ready to be sent
 * @param \Closure $onACK                invoked with a packet's identifierACK once every message
 *                                       index registered for that identifier has been acknowledged
 * @param int      $reliableWindowSize   number of message indexes the reliable window spans
 */
public function __construct(
	private int $mtuSize,
	private \Closure $sendDatagramCallback,
	private \Closure $onACK,
	private int $reliableWindowSize = 512,
){
	$this->maxDatagramPayloadSize = $mtuSize - self::DATAGRAM_MTU_OVERHEAD;

	//the window initially covers message indexes [0, reliableWindowSize)
	$this->reliableWindowStart = 0;
	$this->reliableWindowEnd = $reliableWindowSize;

	//every order channel starts counting from zero
	$channelCount = PacketReliability::MAX_ORDER_CHANNELS;
	$this->sendOrderedIndex = array_fill(0, $channelCount, 0);
	$this->sendSequencedIndex = array_fill(0, $channelCount, 0);
}
98
/**
 * Wraps the given packets in a datagram with the next sequence number, hands it to the send
 * callback, and remembers its reliable packets so they can be resent if needed.
 *
 * @param EncapsulatedPacket[] $packets
 */
private function sendDatagram(array $packets) : void{
	$datagram = new Datagram();
	$datagram->seqNumber = $this->sendSeqNumber++;
	$datagram->packets = $packets;
	($this->sendDatagramCallback)($datagram);

	//only reliable packets are cached for redelivery
	$reliablePackets = [];
	foreach($datagram->packets as $packet){
		if(!PacketReliability::isReliable($packet->reliability)){
			continue;
		}
		$reliablePackets[] = $packet;
	}

	if($reliablePackets === []){
		return;
	}
	$this->reliableCache[$datagram->seqNumber] = new ReliableCacheEntry($reliablePackets);
}
118
/**
 * Sends everything currently queued as a single datagram. Does nothing when the queue is empty.
 */
public function sendQueue() : void{
	if($this->sendQueue === []){
		return;
	}

	$this->sendDatagram($this->sendQueue);
	$this->sendQueue = [];
}
125
/**
 * Places a fully prepared packet into the send queue, or into the reliable backlog when its
 * message index lies beyond the end of the reliable window.
 *
 * The queue is flushed first if adding the packet would exceed the datagram payload limit.
 *
 * @param EncapsulatedPacket $pk        packet with its indexes already assigned
 * @param bool               $immediate flush the queue right after adding the packet
 *
 * @throws \InvalidArgumentException if a reliable packet has no message index, or one below the window start
 */
private function addToQueue(EncapsulatedPacket $pk, bool $immediate) : void{
	$reliable = PacketReliability::isReliable($pk->reliability);
	if($reliable && ($pk->messageIndex === null || $pk->messageIndex < $this->reliableWindowStart)){
		throw new \InvalidArgumentException("Cannot send a reliable packet with message index less than the window start ($pk->messageIndex < $this->reliableWindowStart)");
	}

	//Register the ACK receipt BEFORE the backlog check. Otherwise a split packet whose trailing
	//fragments are backlogged could have all of its in-window fragments acknowledged while the
	//backlogged ones are still unregistered, firing the onACK callback early (and again later).
	//Re-registering on a resend or on release from the backlog writes the same key, so it is harmless.
	if($pk->identifierACK !== null && $pk->messageIndex !== null){
		$this->needACK[$pk->identifierACK][$pk->messageIndex] = $pk->messageIndex;
	}

	if($reliable){
		if($pk->messageIndex >= $this->reliableWindowEnd){
			//If we send this now, the client's reliable window may overflow, causing the packet to need redelivery
			$this->reliableBacklog[$pk->messageIndex] = $pk;
			return;
		}

		//mark the message index as used but not yet acknowledged
		$this->reliableWindow[$pk->messageIndex] = false;
	}

	$length = 0;
	foreach($this->sendQueue as $queued){
		$length += $queued->getTotalLength();
	}

	if($length + $pk->getTotalLength() > $this->maxDatagramPayloadSize){
		//the packet doesn't fit alongside what is already queued
		$this->sendQueue();
	}

	if($pk->identifierACK !== null){
		//the queued copy keeps the ACK identifier; the caller's instance has it cleared
		$this->sendQueue[] = clone $pk;
		$pk->identifierACK = null;
	}else{
		$this->sendQueue[] = $pk;
	}

	if($immediate){
		// Forces pending sends to go out now, rather than waiting to the next update interval
		$this->sendQueue();
	}
}
165
/**
 * Assigns order/sequence/message indexes to a packet and queues it for sending, splitting it into
 * multiple fragments when its buffer doesn't fit into a single datagram.
 *
 * @param EncapsulatedPacket $packet    packet to send; its index fields are filled in here
 * @param bool               $immediate send right away instead of waiting for the next update();
 *                                      fragments of a split packet are always sent immediately
 *
 * @throws \InvalidArgumentException if the reliability is ordered or sequenced but no order channel is set
 */
public function addEncapsulatedToQueue(EncapsulatedPacket $packet, bool $immediate = false) : void{
	$ordered = PacketReliability::isOrdered($packet->reliability);
	$sequenced = !$ordered && PacketReliability::isSequenced($packet->reliability);

	//validate before touching any state, so a rejected packet leaves the layer unchanged
	if(($ordered || $sequenced) && $packet->orderChannel === null){
		throw new \InvalidArgumentException("Order channel must be set for reliability $packet->reliability");
	}

	if($packet->identifierACK !== null){
		$this->needACK[$packet->identifierACK] = [];
	}

	if($ordered){
		$packet->orderIndex = $this->sendOrderedIndex[$packet->orderChannel]++;
	}elseif($sequenced){
		$packet->orderIndex = $this->sendOrderedIndex[$packet->orderChannel]; //sequenced packets don't increment the ordered channel index
		$packet->sequenceIndex = $this->sendSequencedIndex[$packet->orderChannel]++;
	}

	$maxBufferSize = $this->maxDatagramPayloadSize - $packet->getHeaderLength();

	if(strlen($packet->buffer) > $maxBufferSize){
		//each fragment additionally carries split info, which reduces the usable space
		$buffers = str_split($packet->buffer, $maxBufferSize - EncapsulatedPacket::SPLIT_INFO_LENGTH);
		$bufferCount = count($buffers);

		$splitID = ++$this->splitID % 65536;
		foreach($buffers as $count => $buffer){
			$pk = new EncapsulatedPacket();
			$pk->splitInfo = new SplitPacketInfo($splitID, $count, $bufferCount);
			$pk->reliability = $packet->reliability;
			$pk->buffer = $buffer;
			$pk->identifierACK = $packet->identifierACK;

			if(PacketReliability::isReliable($pk->reliability)){
				//every fragment consumes its own message index
				$pk->messageIndex = $this->messageIndex++;
			}

			//all fragments share the order/sequence position of the original packet
			$pk->sequenceIndex = $packet->sequenceIndex;
			$pk->orderChannel = $packet->orderChannel;
			$pk->orderIndex = $packet->orderIndex;

			$this->addToQueue($pk, true);
		}
	}else{
		if(PacketReliability::isReliable($packet->reliability)){
			$packet->messageIndex = $this->messageIndex++;
		}
		$this->addToQueue($packet, $immediate);
	}
}
215
/**
 * Slides the reliable window forward past every leading message index that has been acknowledged.
 */
private function updateReliableWindow() : void{
	//an index that was never used is not set, and a used but unacknowledged one holds false;
	//either one stops the window from advancing
	while(($this->reliableWindow[$this->reliableWindowStart] ?? false) === true){
		unset($this->reliableWindow[$this->reliableWindowStart]);
		++$this->reliableWindowStart;
		++$this->reliableWindowEnd;
	}
}
226
/**
 * Processes an acknowledgement: marks the reliable packets of every acknowledged datagram as
 * delivered, advances the reliable window, and invokes the onACK callback for each ACK identifier
 * whose registered message indexes have all been acknowledged.
 */
public function onACK(ACK $packet) : void{
	foreach($packet->packets as $seq){
		if(!isset($this->reliableCache[$seq])){
			//unknown datagram, or one that is no longer cached
			continue;
		}

		foreach($this->reliableCache[$seq]->getPackets() as $pk){
			assert($pk->messageIndex !== null && $pk->messageIndex >= $this->reliableWindowStart && $pk->messageIndex < $this->reliableWindowEnd);
			$this->reliableWindow[$pk->messageIndex] = true;
			$this->updateReliableWindow();

			$ackId = $pk->identifierACK;
			if($ackId === null){
				continue;
			}

			unset($this->needACK[$ackId][$pk->messageIndex]);
			if(count($this->needACK[$ackId]) === 0){
				//nothing left outstanding for this identifier
				unset($this->needACK[$ackId]);
				($this->onACK)($ackId);
			}
		}

		unset($this->reliableCache[$seq]);
	}
}
247
/**
 * Processes a negative acknowledgement: the reliable packets of every listed datagram that is
 * still cached are moved to the resend queue.
 */
public function onNACK(NACK $packet) : void{
	foreach($packet->packets as $seq){
		if(!isset($this->reliableCache[$seq])){
			continue;
		}

		array_push($this->resendQueue, ...$this->reliableCache[$seq]->getPackets());
		unset($this->reliableCache[$seq]);
	}
}
258
/**
 * Returns whether anything is queued, backlogged, awaiting resend, or still cached awaiting
 * acknowledgement.
 */
public function needsUpdate() : bool{
	return $this->sendQueue !== []
		|| $this->reliableBacklog !== []
		|| $this->resendQueue !== []
		|| $this->reliableCache !== [];
}
267
/**
 * Periodic tick: schedules retransmission of datagrams that stayed unacknowledged for too long,
 * re-queues pending resends, releases backlogged packets that now fit into the reliable window,
 * and finally flushes the send queue.
 */
public function update() : void{
	$cutoff = microtime(true) - self::UNACKED_RETRANSMIT_DELAY;
	foreach($this->reliableCache as $seq => $entry){
		if($entry->getTimestamp() >= $cutoff){
			//stop at the first entry that is still recent enough
			break;
		}

		//behave as if a NACK was received
		array_push($this->resendQueue, ...$entry->getPackets());
		unset($this->reliableCache[$seq]);
	}

	foreach($this->resendQueue as $resend){
		//resends should always be within the reliable window
		$this->addToQueue($resend, false);
	}
	$this->resendQueue = [];

	foreach($this->reliableBacklog as $key => $backlogged){
		assert($backlogged->messageIndex !== null && $backlogged->messageIndex >= $this->reliableWindowStart);
		if($backlogged->messageIndex >= $this->reliableWindowEnd){
			//we can't send this packet yet, the client's reliable window will drop it
			break;
		}

		$this->addToQueue($backlogged, false);
		unset($this->reliableBacklog[$key]);
	}

	$this->sendQueue();
}
303}
__construct(private int $mtuSize, private \Closure $sendDatagramCallback, private \Closure $onACK, private int $reliableWindowSize=512,)