From c7abbbd4be7c48cab40efa61c407b860664f6803 Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Sat, 26 Apr 2025 11:40:00 -0700 Subject: [PATCH] Allow 1.5s to drain cached packets --- mqtt/broker.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/mqtt/broker.go b/mqtt/broker.go index 6cb3af2..7e3a5ea 100644 --- a/mqtt/broker.go +++ b/mqtt/broker.go @@ -2,9 +2,11 @@ package mqtt import ( "sync" + "time" + + meshtreampb "meshstream/generated/meshstream" "github.com/dpup/prefab/logging" - meshtreampb "meshstream/generated/meshstream" ) // CircularBuffer implements a fixed-size circular buffer for caching packets @@ -50,7 +52,7 @@ func (cb *CircularBuffer) GetAll() []*meshtreampb.Packet { } result := make([]*meshtreampb.Packet, cb.count) - + // If buffer isn't full yet, just copy from start to next if cb.count < cb.size { copy(result, cb.buffer[:cb.count]) @@ -63,7 +65,7 @@ func (cb *CircularBuffer) GetAll() []*meshtreampb.Packet { if firstPartLength > 0 { copy(result, cb.buffer[cb.next:]) } - + // Then copy from start to next if cb.next > 0 { copy(result[firstPartLength:], cb.buffer[:cb.next]) @@ -115,13 +117,20 @@ func (b *Broker) Subscribe(bufferSize int) <-chan *meshtreampb.Packet { cachedPackets := b.cache.GetAll() if len(cachedPackets) > 0 { go func() { + defer func() { + if r := recover(); r != nil { + // This can happen if the channel was closed while we were sending + b.logger.Warn("Recovered from panic when sending cached packets, channel likely closed") + } + }() + for _, packet := range cachedPackets { select { case subscriberChan <- packet: // Successfully sent packet - default: - // Buffer is full, log warning and stop sending cached packets - b.logger.Warn("New subscriber buffer full, stopping cache replay") + case <-time.After(1500 * time.Millisecond): + // Give up after waiting some time to avoid blocking indefinitely + b.logger.Warn("Timeout when sending cached packet to new subscriber") return } } @@ -226,4 +235,4 @@ func (b *Broker) broadcast(packet *meshtreampb.Packet) { } }(ch) } -} \ No newline at end of file +}