Allow 1.5s to drain cached packets

This commit is contained in:
Daniel Pupius
2025-04-26 11:40:00 -07:00
parent 6c891a1b88
commit c7abbbd4be

View File

@@ -2,9 +2,11 @@ package mqtt
import (
"sync"
"time"
meshtreampb "meshstream/generated/meshstream"
"github.com/dpup/prefab/logging"
meshtreampb "meshstream/generated/meshstream"
)
// CircularBuffer implements a fixed-size circular buffer for caching packets
@@ -50,7 +52,7 @@ func (cb *CircularBuffer) GetAll() []*meshtreampb.Packet {
}
result := make([]*meshtreampb.Packet, cb.count)
// If buffer isn't full yet, just copy from start to next
if cb.count < cb.size {
copy(result, cb.buffer[:cb.count])
@@ -63,7 +65,7 @@ func (cb *CircularBuffer) GetAll() []*meshtreampb.Packet {
if firstPartLength > 0 {
copy(result, cb.buffer[cb.next:])
}
// Then copy from start to next
if cb.next > 0 {
copy(result[firstPartLength:], cb.buffer[:cb.next])
@@ -115,13 +117,20 @@ func (b *Broker) Subscribe(bufferSize int) <-chan *meshtreampb.Packet {
cachedPackets := b.cache.GetAll()
if len(cachedPackets) > 0 {
go func() {
defer func() {
if r := recover(); r != nil {
// This can happen if the channel was closed while we were sending
b.logger.Warn("Recovered from panic when sending cached packets, channel likely closed")
}
}()
for _, packet := range cachedPackets {
select {
case subscriberChan <- packet:
// Successfully sent packet
default:
// Buffer is full, log warning and stop sending cached packets
b.logger.Warn("New subscriber buffer full, stopping cache replay")
case <-time.After(1500 * time.Millisecond):
// Give up after waiting some time to avoid blocking indefinitely
b.logger.Warn("Timeout when sending cached packet to new subscriber")
return
}
}
@@ -226,4 +235,4 @@ func (b *Broker) broadcast(packet *meshtreampb.Packet) {
}
}(ch)
}
}
}