// Bytes subtracted from the MTU to obtain the usable datagram payload size (see the constructor).
// NOTE(review): the literal 36 is presumably network-level header overhead — confirm.
34 private const DATAGRAM_MTU_OVERHEAD = 36 + Datagram::HEADER_SIZE;
// Payload size that results from the smallest session MTU. Not referenced elsewhere in this excerpt.
35 private const MIN_POSSIBLE_PACKET_SIZE_LIMIT = Session::MIN_MTU_SIZE - self::DATAGRAM_MTU_OVERHEAD;
// Age, in seconds (it is subtracted from microtime(true) in update()), after which a cached datagram's packets are queued for resend.
40 private const UNACKED_RETRANSMIT_DELAY = 2.0;
// Encapsulated packets waiting to go out in the next datagram.
43 private array $sendQueue = [];
// Counter incremented once per split packet; the ID written into the parts is this value modulo 65536.
45 private int $splitID = 0;
// Sequence number the next datagram will receive.
47 private int $sendSeqNumber = 0;
// Message index the next reliable packet will receive.
49 private int $messageIndex = 0;
// Bounds of the reliable window: message indexes below the start are rejected by addToQueue(),
// indexes at or above the end are held in the backlog. Both bounds advance together.
51 private int $reliableWindowStart;
52 private int $reliableWindowEnd;
// message index => acknowledged flag (false when queued, true once onACK() has seen it).
57 private array $reliableWindow = [];
// order channel => next ordered index.
60 private array $sendOrderedIndex;
// order channel => next sequence index.
62 private array $sendSequencedIndex;
// message index => reliable packet whose index is at or beyond the window end.
65 private array $reliableBacklog = [];
// Packets waiting to be passed to addToQueue() again (filled by onNACK() and update()).
68 private array $resendQueue = [];
// datagram sequence number => ReliableCacheEntry holding that datagram's resendable packets.
71 private array $reliableCache = [];
// identifierACK => (message index => message index) of packets still outstanding for that identifier.
74 private array $needACK = [];
// MTU minus DATAGRAM_MTU_OVERHEAD; upper bound on the summed packet lengths in one datagram.
77 private int $maxDatagramPayloadSize;
// NOTE(review): the lines that open this parameter list are not part of this excerpt;
// $this->mtuSize, read below, is presumably declared there — confirm.
// Called with every Datagram built by sendDatagram().
86 private \Closure $sendDatagramCallback,
// Called with an identifierACK once no message index remains outstanding for it (see onACK()).
87 private \Closure $onACK,
// Initial distance between reliableWindowStart and reliableWindowEnd.
88 private int $reliableWindowSize = 512,
// Every order channel starts at ordered index 0 and sequence index 0.
90 $this->sendOrderedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0);
91 $this->sendSequencedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0);
// Usable bytes per datagram: the MTU minus the fixed overhead.
93 $this->maxDatagramPayloadSize = $this->mtuSize - self::DATAGRAM_MTU_OVERHEAD;
// The window initially covers message indexes from 0 up to, but not including, reliableWindowSize.
95 $this->reliableWindowStart = 0;
96 $this->reliableWindowEnd = $this->reliableWindowSize;
// Numbers and sends one datagram carrying $packets, then caches packets for a possible resend.
// NOTE(review): the lines that create $datagram and fill $resendable are not part of this excerpt;
// $resendable presumably collects the packets that pass the isReliable() check below — confirm.
102 private function sendDatagram(array $packets) : void{
// Post-increment: the datagram gets the current sequence number and the counter moves on.
104 $datagram->seqNumber = $this->sendSeqNumber++;
105 $datagram->packets = $packets;
// Hand the datagram to the callback supplied at construction.
106 ($this->sendDatagramCallback)($datagram);
109 foreach($datagram->packets as $pk){
110 if(PacketReliability::isReliable($pk->reliability)){
// Only datagrams with something to resend are cached, keyed by their sequence number
// so that onACK() and onNACK() can look them up.
114 if(count($resendable) !== 0){
115 $this->reliableCache[$datagram->seqNumber] =
new ReliableCacheEntry($resendable);
/**
 * Sends everything currently held in the send queue as a single datagram and then
 * empties the queue. Does nothing when the queue is already empty.
 */
public function sendQueue() : void{
	if(count($this->sendQueue) === 0){
		return;
	}

	$this->sendDatagram($this->sendQueue);
	$this->sendQueue = [];
}
// Places one encapsulated packet in the send queue, applying the reliable-window checks first.
// NOTE(review): several lines of this method are not part of this excerpt, including the
// initialisation of $length and whatever consumes $immediate.
126 private function addToQueue(EncapsulatedPacket $pk,
bool $immediate) : void{
127 if(PacketReliability::isReliable($pk->reliability)){
// A reliable packet must carry a message index that is not behind the window start.
128 if($pk->messageIndex ===
null || $pk->messageIndex < $this->reliableWindowStart){
129 throw new \InvalidArgumentException(
"Cannot send a reliable packet with message index less than the window start ($pk->messageIndex < $this->reliableWindowStart)");
// At or beyond the window end: park the packet in the backlog, keyed by its message index.
// NOTE(review): presumably the method stops here for such packets — the following line is not shown; confirm.
131 if($pk->messageIndex >= $this->reliableWindowEnd){
133 $this->reliableBacklog[$pk->messageIndex] = $pk;
// Mark the message index as in use but not yet acknowledged; onACK() flips it to true.
137 $this->reliableWindow[$pk->messageIndex] =
false;
// Record the message index as outstanding for this identifierACK.
140 if($pk->identifierACK !==
null and $pk->messageIndex !==
null){
141 $this->needACK[$pk->identifierACK][$pk->messageIndex] = $pk->messageIndex;
// Add up the total length of the packets already queued.
145 foreach($this->sendQueue as $queued){
146 $length += $queued->getTotalLength();
// The queued packets plus this one would exceed the datagram payload limit.
// NOTE(review): the guarded statement is not shown; presumably the queue is sent first — confirm.
149 if($length + $pk->getTotalLength() > $this->maxDatagramPayloadSize){
// A packet with an identifierACK is queued as a copy that keeps the identifier,
// and the identifier is then cleared on the object that was passed in.
153 if($pk->identifierACK !==
null){
154 $this->sendQueue[] = clone $pk;
155 $pk->identifierACK = null;
// NOTE(review): presumably the alternative branch for packets without an identifierACK —
// the branch keyword is not shown; confirm.
157 $this->sendQueue[] = $pk;
/**
 * Assigns the ordering, sequencing and message indexes an encapsulated packet needs and
 * queues it. A packet whose buffer does not fit in one datagram is split into several
 * packets that share one split ID.
 *
 * @param EncapsulatedPacket $packet    packet to send; its orderIndex, sequenceIndex and messageIndex
 *                                      are assigned here according to its reliability
 * @param bool               $immediate passed on to addToQueue() for an unsplit packet; the parts of
 *                                      a split packet are always queued with immediate = true
 *
 * @throws \InvalidArgumentException if the reliability is ordered or sequenced and no order channel is set
 */
public function addEncapsulatedToQueue(EncapsulatedPacket $packet, bool $immediate = false) : void{
	if($packet->identifierACK !== null){
		//Start with an empty set of outstanding message indexes; addToQueue() adds to it
		$this->needACK[$packet->identifierACK] = [];
	}

	if(PacketReliability::isOrdered($packet->reliability)){
		if($packet->orderChannel === null){
			//Same wording as the sequenced branch below (the word "for" was missing here)
			throw new \InvalidArgumentException("Order channel must be set for reliability $packet->reliability");
		}
		$packet->orderIndex = $this->sendOrderedIndex[$packet->orderChannel]++;
	}elseif(PacketReliability::isSequenced($packet->reliability)){
		if($packet->orderChannel === null){
			throw new \InvalidArgumentException("Order channel must be set for reliability $packet->reliability");
		}
		//Sequenced packets take the channel's current ordered index without advancing it,
		//and advance the channel's sequence index instead
		$packet->orderIndex = $this->sendOrderedIndex[$packet->orderChannel];
		$packet->sequenceIndex = $this->sendSequencedIndex[$packet->orderChannel]++;
	}

	$maxBufferSize = $this->maxDatagramPayloadSize - $packet->getHeaderLength();

	if(strlen($packet->buffer) > $maxBufferSize){
		//Each part also carries split info, so it gets correspondingly less room for payload
		$buffers = str_split($packet->buffer, $maxBufferSize - EncapsulatedPacket::SPLIT_INFO_LENGTH);
		$bufferCount = count($buffers);

		//The counter is incremented first; the ID written into the parts is its value modulo 65536
		$splitID = ++$this->splitID % 65536;
		foreach($buffers as $count => $buffer){
			$pk = new EncapsulatedPacket();
			$pk->splitInfo = new SplitPacketInfo($splitID, $count, $bufferCount);
			$pk->reliability = $packet->reliability;
			$pk->buffer = $buffer;
			$pk->identifierACK = $packet->identifierACK;

			if(PacketReliability::isReliable($pk->reliability)){
				//Every part of a reliable split packet gets its own message index
				$pk->messageIndex = $this->messageIndex++;
			}

			//All parts share the ordering information assigned to the original packet
			$pk->sequenceIndex = $packet->sequenceIndex;
			$pk->orderChannel = $packet->orderChannel;
			$pk->orderIndex = $packet->orderIndex;

			$this->addToQueue($pk, true);
		}
	}else{
		if(PacketReliability::isReliable($packet->reliability)){
			$packet->messageIndex = $this->messageIndex++;
		}
		$this->addToQueue($packet, $immediate);
	}
}
// Moves the reliable window forward past the message index at its start once that index has been acknowledged.
// NOTE(review): the keyword that opens the condition below is not part of this excerpt, so whether
// this repeats for consecutive acknowledged indexes or runs once cannot be told from here — confirm.
216 private function updateReliableWindow() : void{
// The index at the window start has an entry in the window ...
218 isset($this->reliableWindow[$this->reliableWindowStart]) &&
// ... and that entry has been set to true (done by onACK()).
219 $this->reliableWindow[$this->reliableWindowStart] === true
// Drop the entry and move both window bounds forward by one.
221 unset($this->reliableWindow[$this->reliableWindowStart]);
222 $this->reliableWindowStart++;
223 $this->reliableWindowEnd++;
/**
 * Processes an ACK. For every listed datagram sequence number that is still in the
 * reliable cache, each cached packet's message index is marked as acknowledged and the
 * reliable window is updated. When a packet carries an identifierACK and no message
 * index remains outstanding for it, the onACK callback is invoked with that identifier.
 * The cache entry is removed afterwards.
 */
public function onACK(ACK $packet) : void{
	foreach($packet->packets as $ackedSeq){
		if(!isset($this->reliableCache[$ackedSeq])){
			//Nothing is cached under this sequence number
			continue;
		}

		foreach($this->reliableCache[$ackedSeq]->getPackets() as $pk){
			assert($pk->messageIndex !== null && $pk->messageIndex >= $this->reliableWindowStart && $pk->messageIndex < $this->reliableWindowEnd);
			$this->reliableWindow[$pk->messageIndex] = true;
			$this->updateReliableWindow();

			if($pk->identifierACK === null){
				//No identifier to report for this packet
				continue;
			}

			unset($this->needACK[$pk->identifierACK][$pk->messageIndex]);
			if(count($this->needACK[$pk->identifierACK]) === 0){
				//That was the last outstanding message index for this identifier
				unset($this->needACK[$pk->identifierACK]);
				($this->onACK)($pk->identifierACK);
			}
		}

		unset($this->reliableCache[$ackedSeq]);
	}
}
/**
 * Processes a NACK. Every packet cached under a listed datagram sequence number is
 * appended to the resend queue and the cache entry is removed. Sequence numbers with
 * no cache entry are ignored.
 */
public function onNACK(NACK $packet) : void{
	foreach($packet->packets as $missingSeq){
		$entry = $this->reliableCache[$missingSeq] ?? null;
		if($entry === null){
			continue;
		}

		foreach($entry->getPackets() as $lost){
			$this->resendQueue[] = $lost;
		}
		unset($this->reliableCache[$missingSeq]);
	}
}
// Checks whether any of the send queue, reliable backlog, resend queue or reliable cache holds an entry.
// NOTE(review): the statement that consumes this expression is not part of this excerpt;
// presumably it is returned as-is — confirm.
259 public function needsUpdate() : bool{
261 count($this->sendQueue) !== 0 or
262 count($this->reliableBacklog) !== 0 or
263 count($this->resendQueue) !== 0 or
264 count($this->reliableCache) !== 0
268 public function update() : void{
269 $retransmitOlderThan = microtime(true) - self::UNACKED_RETRANSMIT_DELAY;
270 foreach($this->reliableCache as $seq => $pk){
271 if($pk->getTimestamp() < $retransmitOlderThan){
273 array_push($this->resendQueue, ...$pk->getPackets());
274 unset($this->reliableCache[$seq]);
280 if(count($this->resendQueue) > 0){
281 foreach($this->resendQueue as $pk){
283 $this->addToQueue($pk,
false);
285 $this->resendQueue = [];
288 if(count($this->reliableBacklog) > 0){
289 foreach($this->reliableBacklog as $k => $pk){
290 assert($pk->messageIndex !==
null && $pk->messageIndex >= $this->reliableWindowStart);
291 if($pk->messageIndex >= $this->reliableWindowEnd){
296 $this->addToQueue($pk,
false);
297 unset($this->reliableBacklog[$k]);