Skip to content

Commit e69425f

Browse files
committed
feat(go-bindgen): rework async functionality
1 parent f4f273d commit e69425f

20 files changed

Lines changed: 1096 additions & 223 deletions

File tree

crates/wit-bindgen-go/src/interface.rs

Lines changed: 169 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -2792,87 +2792,115 @@ func ServeInterface(s {wrpc}.Server, h Handler) (stop func() error, err error) {
27922792
let wrpc = self.deps.wrpc();
27932793

27942794
self.print_docs_and_params(func);
2795-
if let FunctionKind::Constructor(id) = &func.kind {
2796-
self.push_str(" (r0__ ");
2797-
self.print_own(*id);
2795+
2796+
self.src.push_str(" (");
2797+
for (i, ty) in func.results.iter_types().enumerate() {
2798+
uwrite!(self.src, "r{i}__ ");
2799+
self.print_opt_ty(ty, true);
27982800
self.src.push_str(", ");
2799-
} else {
2800-
self.src.push_str(" (");
2801-
for (i, ty) in func.results.iter_types().enumerate() {
2802-
uwrite!(self.src, "r{i}__ ");
2803-
self.print_opt_ty(ty, true);
2804-
self.src.push_str(", ");
2805-
}
28062801
}
2807-
self.push_str("close__ func() error, err__ error) ");
2808-
uwrite!(
2809-
self.src,
2810-
r#"{{
2811-
if err__ = wrpc__.Invoke(ctx__, "{instance}", ""#
2812-
);
2813-
self.src.push_str(rpc_func_name(func));
2814-
uwriteln!(
2815-
self.src,
2816-
r#"", func(w__ {wrpc}.IndexWriteCloser, r__ {wrpc}.IndexReadCloser) error {{
2817-
close__ = r__.Close"#
2818-
);
2802+
2803+
let async_params = func.params.iter().any(|(_, ty)| {
2804+
let (paths, fut) = async_paths_ty(self.resolve, ty);
2805+
fut || !paths.is_empty()
2806+
});
2807+
if async_params {
2808+
self.push_str("writeErrs__ <-chan error, ");
2809+
}
2810+
self.push_str("err__ error) {");
28192811
if !func.params.is_empty() {
28202812
let bytes = self.deps.bytes();
2821-
uwriteln!(
2813+
uwrite!(
28222814
self.src,
2823-
r"var buf__ {bytes}.Buffer
2824-
writes__ := make(map[uint32]func({wrpc}.IndexWriter) error, {})",
2825-
func.params.len(),
2815+
r"
2816+
var buf__ {bytes}.Buffer",
28262817
);
2818+
if async_params {
2819+
uwrite!(
2820+
self.src,
2821+
r"
2822+
var writeCount__ uint32"
2823+
);
2824+
}
28272825
for (i, (name, ty)) in func.params.iter().enumerate() {
2828-
uwrite!(self.src, "write{i}__, err__ :=");
2826+
uwrite!(
2827+
self.src,
2828+
r"
2829+
write{i}__, err__ :="
2830+
);
28292831
self.print_write_ty(ty, &to_go_ident(name), "&buf__");
2830-
self.src.push_str("\nif err__ != nil {\n");
2831-
uwriteln!(
2832+
uwrite!(
28322833
self.src,
2833-
r#"return {fmt}.Errorf("failed to write `{name}` parameter: %w", err__)"#,
2834+
r#"
2835+
if err__ != nil {{
2836+
err__ = {fmt}.Errorf("failed to write `{name}` parameter: %w", err__)
2837+
return
2838+
}}"#,
28342839
);
2835-
self.src.push_str("}\n");
2836-
uwriteln!(
2840+
if async_params {
2841+
uwrite!(
2842+
self.src,
2843+
r"
2844+
if write{i}__ != nil {{
2845+
writeCount__++
2846+
}}"
2847+
);
2848+
}
2849+
}
2850+
if async_params {
2851+
uwrite!(
28372852
self.src,
2838-
r#"if write{i}__ != nil {{
2839-
writes__[{i}] = write{i}__
2840-
}}"#,
2853+
r"
2854+
writes__ := make(map[uint32]func({wrpc}.IndexWriter) error, uint(writeCount__))",
2855+
);
2856+
}
2857+
for (i, (name, _)) in func.params.iter().enumerate() {
2858+
uwrite!(
2859+
self.src,
2860+
r"
2861+
if write{i}__ != nil {{"
2862+
);
2863+
if async_params {
2864+
uwrite!(
2865+
self.src,
2866+
r"
2867+
writes__[{i}] = write{i}__",
2868+
);
2869+
} else {
2870+
uwrite!(
2871+
self.src,
2872+
r#"
2873+
err__ = {errors}.New("unexpected deferred write for synchronous `{name}` parameter")
2874+
return"#,
2875+
errors = self.deps.errors(),
2876+
);
2877+
}
2878+
uwrite!(
2879+
self.src,
2880+
r#"
2881+
}}"#,
28412882
);
28422883
}
2843-
self.push_str("_, err__ = w__.Write(buf__.Bytes())\n");
2844-
self.push_str("if err__ != nil {\n");
2845-
uwriteln!(
2846-
self.src,
2847-
r#"return {fmt}.Errorf("failed to write parameters: %w", err__)"#,
2848-
);
2849-
self.src.push_str("}\n");
2850-
} else {
2851-
self.push_str("_, err__ = w__.Write(nil)\n");
2852-
self.push_str("if err__ != nil {\n");
2853-
uwriteln!(
2854-
self.src,
2855-
r#"return {fmt}.Errorf("failed to write empty parameters: %w", err__)"#,
2856-
);
2857-
self.src.push_str("}\n");
28582884
}
2859-
for (i, ty) in func.results.iter_types().enumerate() {
2860-
uwrite!(self.src, "r{i}__, err__ = ");
2861-
self.print_read_ty(ty, "r__", &format!("[]uint32{{ {i} }}"));
2862-
self.push_str("\n");
2863-
uwriteln!(
2864-
self.src,
2865-
r#"if err__ != nil {{ return {fmt}.Errorf("failed to read result {i}: %w", err__) }}"#,
2866-
);
2885+
uwrite!(
2886+
self.src,
2887+
r#"
2888+
var w__ {wrpc}.IndexWriteCloser
2889+
var r__ {wrpc}.IndexReadCloser
2890+
w__, r__, err__ = wrpc__.Invoke(ctx__, "{instance}", ""#
2891+
);
2892+
self.src.push_str(rpc_func_name(func));
2893+
self.src.push_str("\", ");
2894+
if !func.params.is_empty() {
2895+
self.src.push_str("buf__.Bytes()");
2896+
} else {
2897+
self.src.push_str("nil");
28672898
}
2868-
self.src.push_str("return nil\n");
2869-
self.src.push_str("},");
2899+
self.src.push_str(",\n");
28702900
for (i, ty) in func.results.iter_types().enumerate() {
28712901
let (nested, fut) = async_paths_ty(self.resolve, ty);
28722902
for path in nested {
2873-
self.push_str(wrpc);
2874-
self.push_str(".NewSubscribePath().Index(");
2875-
uwrite!(self.src, "{i})");
2903+
uwrite!(self.src, "{wrpc}.NewSubscribePath().Index({i})");
28762904
for p in path {
28772905
if let Some(p) = p {
28782906
uwrite!(self.src, ".Index({p})");
@@ -2886,15 +2914,87 @@ func ServeInterface(s {wrpc}.Server, h Handler) (stop func() error, err error) {
28862914
uwrite!(self.src, "{wrpc}.NewSubscribePath().Index({i}), ");
28872915
}
28882916
}
2889-
self.src.push_str("); err__ != nil {\n");
2890-
uwriteln!(
2917+
let slog = self.deps.slog();
2918+
uwrite!(
28912919
self.src,
2892-
r#"err__ = {fmt}.Errorf("failed to invoke `{}`: %w", err__)
2893-
return
2894-
}}
2920+
r#"
2921+
)
2922+
if err__ != nil {{
2923+
err__ = {fmt}.Errorf("failed to invoke `{name}`: %w", err__)
28952924
return
2925+
}}
2926+
defer func() {{
2927+
if err := r__.Close(); err != nil {{
2928+
{slog}.ErrorContext(ctx__, "failed to close reader", "instance", "{instance}", "name", "{name}", "err", err)
2929+
}}
2930+
}}()"#,
2931+
name = func.name,
2932+
);
2933+
if async_params {
2934+
let sync = self.deps.sync();
2935+
uwrite!(
2936+
self.src,
2937+
r#"
2938+
if writeCount__ > 0 {{
2939+
writeErrCh__ := make(chan error, uint(writeCount__))
2940+
writeErrs__ = writeErrCh__
2941+
var wg__ {sync}.WaitGroup
2942+
for index, write := range writes__ {{
2943+
wg__.Add(1)
2944+
w, err := w__.Index(index)
2945+
if err != nil {{
2946+
if cErr := w__.Close(); cErr != nil {{
2947+
{slog}.DebugContext(ctx__, "failed to close outgoing stream", "instance", "{instance}", "name", "{}", "err", cErr)
2948+
}}
2949+
err__ = {fmt}.Errorf("failed to index writer at index `%v`: %w", index, err)
2950+
return
2951+
}}
2952+
write := write
2953+
go func() {{
2954+
defer wg__.Done()
2955+
if err := write(w); err != nil {{
2956+
writeErrCh__ <- err
2957+
}}
2958+
}}()
2959+
}}
2960+
go func() {{
2961+
wg__.Wait()
2962+
close(writeErrCh__)
2963+
}}()
2964+
}}"#,
2965+
func.name,
2966+
);
2967+
}
2968+
uwrite!(
2969+
self.src,
2970+
r#"
2971+
if cErr__ := w__.Close(); cErr__ != nil {{
2972+
{slog}.DebugContext(ctx__, "failed to close outgoing stream", "instance", "{instance}", "name", "{}", "err", cErr__)
28962973
}}"#,
2897-
func.name
2974+
func.name,
2975+
);
2976+
2977+
for (i, ty) in func.results.iter_types().enumerate() {
2978+
uwrite!(
2979+
self.src,
2980+
"
2981+
r{i}__, err__ = "
2982+
);
2983+
self.print_read_ty(ty, "r__", &format!("[]uint32{{ {i} }}"));
2984+
uwrite!(
2985+
self.src,
2986+
r#"
2987+
if err__ != nil {{
2988+
err__ = {fmt}.Errorf("failed to read result {i}: %w", err__)
2989+
return
2990+
}}"#,
2991+
);
2992+
}
2993+
uwriteln!(
2994+
self.src,
2995+
r#"
2996+
return
2997+
}}"#,
28982998
);
28992999
}
29003000
}

examples/go/hello-client/bindings/wrpc_examples/hello/handler/bindings.wrpc.go

Lines changed: 48 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,55 +11,60 @@ import (
1111
utf8 "unicode/utf8"
1212
)
1313

14-
func Hello(ctx__ context.Context, wrpc__ wrpc.Invoker) (r0__ string, close__ func() error, err__ error) {
15-
if err__ = wrpc__.Invoke(ctx__, "wrpc-examples:hello/handler", "hello", func(w__ wrpc.IndexWriteCloser, r__ wrpc.IndexReadCloser) error {
16-
close__ = r__.Close
17-
_, err__ = w__.Write(nil)
18-
if err__ != nil {
19-
return fmt.Errorf("failed to write empty parameters: %w", err__)
14+
func Hello(ctx__ context.Context, wrpc__ wrpc.Invoker) (r0__ string, err__ error) {
15+
var w__ wrpc.IndexWriteCloser
16+
var r__ wrpc.IndexReadCloser
17+
w__, r__, err__ = wrpc__.Invoke(ctx__, "wrpc-examples:hello/handler", "hello", nil)
18+
if err__ != nil {
19+
err__ = fmt.Errorf("failed to invoke `hello`: %w", err__)
20+
return
21+
}
22+
defer func() {
23+
if err := r__.Close(); err != nil {
24+
slog.ErrorContext(ctx__, "failed to close reader", "instance", "wrpc-examples:hello/handler", "name", "hello", "err", err)
2025
}
21-
r0__, err__ = func(r interface {
22-
io.ByteReader
23-
io.Reader
24-
}) (string, error) {
25-
var x uint32
26-
var s uint8
27-
for i := 0; i < 5; i++ {
28-
slog.Debug("reading string length byte", "i", i)
29-
b, err := r.ReadByte()
30-
if err != nil {
31-
if i > 0 && err == io.EOF {
32-
err = io.ErrUnexpectedEOF
33-
}
34-
return "", fmt.Errorf("failed to read string length byte: %w", err)
26+
}()
27+
if cErr__ := w__.Close(); cErr__ != nil {
28+
slog.DebugContext(ctx__, "failed to close outgoing stream", "instance", "wrpc-examples:hello/handler", "name", "hello", "err", cErr__)
29+
}
30+
r0__, err__ = func(r interface {
31+
io.ByteReader
32+
io.Reader
33+
}) (string, error) {
34+
var x uint32
35+
var s uint8
36+
for i := 0; i < 5; i++ {
37+
slog.Debug("reading string length byte", "i", i)
38+
b, err := r.ReadByte()
39+
if err != nil {
40+
if i > 0 && err == io.EOF {
41+
err = io.ErrUnexpectedEOF
3542
}
36-
if s == 28 && b > 0x0f {
37-
return "", errors.New("string length overflows a 32-bit integer")
43+
return "", fmt.Errorf("failed to read string length byte: %w", err)
44+
}
45+
if s == 28 && b > 0x0f {
46+
return "", errors.New("string length overflows a 32-bit integer")
47+
}
48+
if b < 0x80 {
49+
x = x | uint32(b)<<s
50+
buf := make([]byte, x)
51+
slog.Debug("reading string bytes", "len", x)
52+
_, err = r.Read(buf)
53+
if err != nil {
54+
return "", fmt.Errorf("failed to read string bytes: %w", err)
3855
}
39-
if b < 0x80 {
40-
x = x | uint32(b)<<s
41-
buf := make([]byte, x)
42-
slog.Debug("reading string bytes", "len", x)
43-
_, err = r.Read(buf)
44-
if err != nil {
45-
return "", fmt.Errorf("failed to read string bytes: %w", err)
46-
}
47-
if !utf8.Valid(buf) {
48-
return string(buf), errors.New("string is not valid UTF-8")
49-
}
50-
return string(buf), nil
56+
if !utf8.Valid(buf) {
57+
return string(buf), errors.New("string is not valid UTF-8")
5158
}
52-
x |= uint32(b&0x7f) << s
53-
s += 7
59+
return string(buf), nil
5460
}
55-
return "", errors.New("string length overflows a 32-bit integer")
56-
}(r__)
57-
if err__ != nil {
58-
return fmt.Errorf("failed to read result 0: %w", err__)
61+
x |= uint32(b&0x7f) << s
62+
s += 7
5963
}
60-
return nil
61-
}); err__ != nil {
62-
err__ = fmt.Errorf("failed to invoke `hello`: %w", err__)
64+
return "", errors.New("string length overflows a 32-bit integer")
65+
}(r__)
66+
if err__ != nil {
67+
err__ = fmt.Errorf("failed to read result 0: %w", err__)
6368
return
6469
}
6570
return

examples/go/hello-client/cmd/hello-client-nats/main.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,11 @@ func run() (err error) {
3030

3131
for _, prefix := range os.Args[1:] {
3232
wrpc := wrpcnats.NewClient(nc, prefix)
33-
greeting, cleanup, err := handler.Hello(context.Background(), wrpc)
33+
greeting, err := handler.Hello(context.Background(), wrpc)
3434
if err != nil {
3535
return fmt.Errorf("failed to call `wrpc-examples:hello/handler.hello`: %w", err)
3636
}
3737
fmt.Printf("%s: %s\n", prefix, greeting)
38-
if err := cleanup(); err != nil {
39-
return fmt.Errorf("failed to shutdown `wrpc-examples:hello/handler.hello` invocation: %w", err)
40-
}
4138
}
4239
return nil
4340
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
// Generated by `wit-bindgen-wrpc-go` 0.6.0. DO NOT EDIT!
2+
// client package contains wRPC bindings for `client` world
3+
package client

0 commit comments

Comments
 (0)