Skip to content

Commit ac13841

Browse files
committed
refactor(go-wrpcnats): rework client constructor
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent e69425f commit ac13841

10 files changed

Lines changed: 50 additions & 32 deletions

File tree

examples/go/complex-server/cmd/complex-server-nats/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ func run() (err error) {
8787
}
8888
}()
8989

90-
wrpc := wrpcnats.NewClient(nc, "go")
91-
stop, err := server.Serve(wrpc, &ResourcesHandler{})
90+
client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix("go"))
91+
stop, err := server.Serve(client, &ResourcesHandler{})
9292
if err != nil {
9393
return fmt.Errorf("failed to serve `server` world: %w", err)
9494
}

examples/go/hello-client/cmd/hello-client-nats/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ func run() (err error) {
2929
}()
3030

3131
for _, prefix := range os.Args[1:] {
32-
wrpc := wrpcnats.NewClient(nc, prefix)
33-
greeting, err := handler.Hello(context.Background(), wrpc)
32+
client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix(prefix))
33+
greeting, err := handler.Hello(context.Background(), client)
3434
if err != nil {
3535
return fmt.Errorf("failed to call `wrpc-examples:hello/handler.hello`: %w", err)
3636
}

examples/go/hello-server/cmd/hello-server-nats/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ func run() (err error) {
3737
}
3838
}()
3939

40-
wrpc := wrpcnats.NewClient(nc, "go")
41-
stop, err := server.Serve(wrpc, Handler{})
40+
client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix("go"))
41+
stop, err := server.Serve(client, Handler{})
4242
if err != nil {
4343
return fmt.Errorf("failed to serve `server` world: %w", err)
4444
}

examples/go/streams-client/cmd/streams-client-nats/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ func run() (err error) {
7575
}()
7676

7777
for _, prefix := range os.Args[1:] {
78-
cl := wrpcnats.NewClient(nc, prefix)
79-
numbers, bytes, errCh, err := handler.Echo(context.Background(), cl, &handler.Req{
78+
client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix(prefix))
79+
numbers, bytes, errCh, err := handler.Echo(context.Background(), client, &handler.Req{
8080
Numbers: &ThrottleStream[uint64]{
8181
tick: time.Tick(time.Second),
8282
values: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},

examples/go/streams-server/cmd/streams-server-nats/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ func run() (err error) {
3939
}
4040
}()
4141

42-
wrpc := wrpcnats.NewClient(nc, "go")
43-
stop, err := server.Serve(wrpc, Handler{})
42+
client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix("go"))
43+
stop, err := server.Serve(client, Handler{})
4444
if err != nil {
4545
return fmt.Errorf("failed to serve `server` world: %w", err)
4646
}

go/nats/client.go

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,38 @@ import (
1212
"github.com/nats-io/nats.go"
1313
)
1414

15+
// Client is a thin wrapper around *nats.Conn, which is able to serve and invoke wRPC functions
16+
type Client struct {
17+
conn *nats.Conn
18+
prefix string
19+
group string
20+
}
21+
22+
// ClientOpt is option client configuration option passed to NewClient
23+
type ClientOpt func(*Client)
24+
25+
// WithPrefix sets a prefix for this Client
26+
func WithPrefix(prefix string) ClientOpt {
27+
return func(c *Client) {
28+
c.prefix = prefix
29+
}
30+
}
31+
32+
// WithGroup sets a queue group for this Client
33+
func WithGroup(group string) ClientOpt {
34+
return func(c *Client) {
35+
c.group = group
36+
}
37+
}
38+
39+
func NewClient(conn *nats.Conn, opts ...ClientOpt) *Client {
40+
c := &Client{conn: conn}
41+
for _, opt := range opts {
42+
opt(c)
43+
}
44+
return c
45+
}
46+
1547
type headerKey struct{}
1648

1749
func HeaderFromContext(ctx context.Context) (nats.Header, bool) {
@@ -78,20 +110,6 @@ func subscribe(conn *nats.Conn, prefix string, f func(context.Context, []byte),
78110
})
79111
}
80112

81-
type Client struct {
82-
conn *nats.Conn
83-
prefix string
84-
queueGroup string
85-
}
86-
87-
func NewClient(conn *nats.Conn, prefix string) *Client {
88-
return &Client{conn: conn, prefix: prefix, queueGroup: ""}
89-
}
90-
91-
func NewClientWithQueueGroup(conn *nats.Conn, prefix string, queueGroup string) *Client {
92-
return &Client{conn, prefix, queueGroup}
93-
}
94-
95113
type paramWriter struct {
96114
nc *nats.Conn
97115
init func() (*initState, error)
@@ -447,13 +465,13 @@ func (c *Client) handleMessage(instance string, name string, f func(context.Cont
447465
}
448466

449467
func (c *Client) Serve(instance string, name string, f func(context.Context, wrpc.IndexWriteCloser, wrpc.IndexReadCloser), paths ...wrpc.SubscribePath) (stop func() error, err error) {
450-
slog.Debug("serving", "instance", instance, "name", name, "group", c.queueGroup)
468+
slog.Debug("serving", "instance", instance, "name", name, "group", c.group)
451469

452470
subject := invocationSubject(c.prefix, instance, name)
453471
handle := c.handleMessage(instance, name, f, paths...)
454472
var sub *nats.Subscription
455-
if c.queueGroup != "" {
456-
sub, err = c.conn.QueueSubscribe(subject, c.queueGroup, handle)
473+
if c.group != "" {
474+
sub, err = c.conn.QueueSubscribe(subject, c.group, handle)
457475
} else {
458476
sub, err = c.conn.Subscribe(subject, handle)
459477
}

tests/go/async_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestAsync(t *testing.T) {
3232
return
3333
}
3434
}()
35-
client := wrpcnats.NewClient(nc, "go")
35+
client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix("go"))
3636

3737
stop, err := async_server.Serve(client, integration.AsyncHandler{})
3838
if err != nil {

tests/go/cmd/sync-server-nats/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ func run(url string) error {
3030
}
3131
}()
3232

33-
wrpc := wrpcnats.NewClient(nc, "go")
33+
client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix("go"))
3434
var h integration.SyncHandler
35-
stop, err := sync_server.Serve(wrpc, h, h)
35+
stop, err := sync_server.Serve(client, h, h)
3636
if err != nil {
3737
return fmt.Errorf("failed to serve world: %w", err)
3838
}

tests/go/resources_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestResources(t *testing.T) {
3232
return
3333
}
3434
}()
35-
client := wrpcnats.NewClient(nc, "go")
35+
client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix("go"))
3636

3737
stop, err := resources_server.Serve(client, &integration.ResourcesHandler{}, integration.ResourcesStrangeHandler{})
3838
if err != nil {

tests/go/sync_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestSync(t *testing.T) {
3333
return
3434
}
3535
}()
36-
client := wrpcnats.NewClient(nc, "go")
36+
client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix("go"))
3737

3838
var h integration.SyncHandler
3939
stop, err := sync_server.Serve(client, h, h)

0 commit comments

Comments
 (0)