Skip to content

Commit 7efde66

Browse files
committed
fix(go): implement framed async
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent 286afe2 commit 7efde66

2 files changed

Lines changed: 13 additions & 7 deletions

File tree

go/frame_reader.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func newFrameStreamReader(r *FrameStreamReader) *FrameStreamReader {
5151
}
5252

5353
func NewFrameStreamReader(ctx context.Context, r ByteReadCloser, paths ...SubscribePath) *FrameStreamReader {
54-
ch := make(chan []byte, 1)
54+
ch := make(chan []byte, 64)
5555

5656
var nestMu sync.Mutex
5757
nest := make(map[string]chan []byte, len(paths))
@@ -62,7 +62,7 @@ func NewFrameStreamReader(ctx context.Context, r ByteReadCloser, paths ...Subscr
6262
slog.ErrorContext(ctx, "invalid subscription path", "err", err)
6363
continue
6464
}
65-
nest[p] = make(chan []byte)
65+
nest[p] = make(chan []byte, 64)
6666
}
6767
var readers atomic.Int64
6868
readers.Add(1)
@@ -86,10 +86,10 @@ func NewFrameStreamReader(ctx context.Context, r ByteReadCloser, paths ...Subscr
8686
frame, err := ReadFrame(r)
8787
if err != nil {
8888
if err == io.EOF {
89-
break
89+
break outer
9090
}
9191
slog.ErrorContext(ctx, "failed to read frame", "err", err)
92-
break
92+
break outer
9393
}
9494

9595
slog.DebugContext(ctx, "read frame",
@@ -101,7 +101,7 @@ func NewFrameStreamReader(ctx context.Context, r ByteReadCloser, paths ...Subscr
101101
if !ok {
102102
slog.ErrorContext(ctx, "received a frame for unknown path", "path", frame.Path)
103103
nestMu.Unlock()
104-
break
104+
break outer
105105
}
106106
if len(frame.Data) == 0 {
107107
slog.DebugContext(ctx, "received shutdown frame, closing channel", "path", frame.Path)
@@ -116,11 +116,18 @@ func NewFrameStreamReader(ctx context.Context, r ByteReadCloser, paths ...Subscr
116116
slog.DebugContext(ctx, "context done, stop reading frames",
117117
"err", ctx.Err(),
118118
)
119+
nestMu.Unlock()
119120
break outer
120121
case ch <- frame.Data:
121122
slog.DebugContext(ctx, "sent received frame data",
122123
"frame", frame,
123124
)
125+
default:
126+
slog.ErrorContext(ctx, "reader too slow, dropping frame",
127+
"frame", frame,
128+
)
129+
nestMu.Unlock()
130+
break outer
124131
}
125132
}
126133
nestMu.Unlock()

go/frame_server.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@ func (s *FramedServer) Serve(instance string, name string, f HandleFunc, paths .
5959
delete(hs, name)
6060
}
6161

62-
// TODO: Fix deadlock here
63-
//wg.Wait()
62+
wg.Wait()
6463

6564
return nil
6665
}, nil

0 commit comments

Comments
 (0)