Skip to content

Commit fa3a703

Browse files
refactor: make Worker an embeddable struct (#1884)
* make WorkerExtension embeddable Signed-off-by: Robert Landers <landers.robert@gmail.com> * change names Signed-off-by: Robert Landers <landers.robert@gmail.com> --------- Signed-off-by: Robert Landers <landers.robert@gmail.com>
1 parent ab1ec71 commit fa3a703

2 files changed

Lines changed: 57 additions & 57 deletions

File tree

threadFramework.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"log/slog"
66
"net/http"
77
"sync"
8+
"sync/atomic"
89
)
910

1011
// EXPERIMENTAL: WorkerExtension allows you to register an external worker where instead of calling frankenphp handlers on
@@ -99,3 +100,46 @@ func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *
99100
}
100101
}
101102
}
103+
104+
type Worker struct {
105+
ExtensionName string
106+
WorkerFileName string
107+
WorkerEnv PreparedEnv
108+
MinThreads int
109+
RequestChan chan *WorkerRequest[any, any]
110+
ActivatedCount atomic.Int32
111+
DrainCount atomic.Int32
112+
}
113+
114+
func (w *Worker) Name() string {
115+
return w.ExtensionName
116+
}
117+
118+
func (w *Worker) FileName() string {
119+
return w.WorkerFileName
120+
}
121+
122+
func (w *Worker) Env() PreparedEnv {
123+
return w.WorkerEnv
124+
}
125+
126+
func (w *Worker) GetMinThreads() int {
127+
return w.MinThreads
128+
}
129+
130+
func (w *Worker) ThreadActivatedNotification(threadId int) {
131+
w.ActivatedCount.Add(1)
132+
}
133+
134+
func (w *Worker) ThreadDrainNotification(threadId int) {
135+
w.DrainCount.Add(1)
136+
}
137+
138+
func (w *Worker) ThreadDeactivatedNotification(threadId int) {
139+
w.DrainCount.Add(-1)
140+
w.ActivatedCount.Add(-1)
141+
}
142+
143+
func (w *Worker) ProvideRequest() *WorkerRequest[any, any] {
144+
return <-w.RequestChan
145+
}

threadFramework_test.go

Lines changed: 13 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package frankenphp
33
import (
44
"io"
55
"net/http/httptest"
6-
"sync"
6+
"sync/atomic"
77
"testing"
88
"time"
99

@@ -13,73 +13,29 @@ import (
1313

1414
// mockWorkerExtension implements the WorkerExtension interface
1515
type mockWorkerExtension struct {
16-
name string
17-
fileName string
18-
env PreparedEnv
19-
minThreads int
20-
requestChan chan *WorkerRequest[any, any]
21-
activatedCount int
22-
drainCount int
23-
deactivatedCount int
24-
mu sync.Mutex
16+
Worker
2517
}
2618

2719
func newMockWorkerExtension(name, fileName string, minThreads int) *mockWorkerExtension {
2820
return &mockWorkerExtension{
29-
name: name,
30-
fileName: fileName,
31-
env: make(PreparedEnv),
32-
minThreads: minThreads,
33-
requestChan: make(chan *WorkerRequest[any, any], 10), // Buffer to avoid blocking
21+
Worker: Worker{
22+
ExtensionName: name,
23+
WorkerFileName: fileName,
24+
WorkerEnv: nil,
25+
MinThreads: minThreads,
26+
RequestChan: make(chan *WorkerRequest[any, any], minThreads),
27+
ActivatedCount: atomic.Int32{},
28+
DrainCount: atomic.Int32{},
29+
},
3430
}
3531
}
3632

37-
func (m *mockWorkerExtension) Name() string {
38-
return m.name
39-
}
40-
41-
func (m *mockWorkerExtension) FileName() string {
42-
return m.fileName
43-
}
44-
45-
func (m *mockWorkerExtension) Env() PreparedEnv {
46-
return m.env
47-
}
48-
49-
func (m *mockWorkerExtension) GetMinThreads() int {
50-
return m.minThreads
51-
}
52-
53-
func (m *mockWorkerExtension) ThreadActivatedNotification(threadId int) {
54-
m.mu.Lock()
55-
defer m.mu.Unlock()
56-
m.activatedCount++
57-
}
58-
59-
func (m *mockWorkerExtension) ThreadDrainNotification(threadId int) {
60-
m.mu.Lock()
61-
defer m.mu.Unlock()
62-
m.drainCount++
63-
}
64-
65-
func (m *mockWorkerExtension) ThreadDeactivatedNotification(threadId int) {
66-
m.mu.Lock()
67-
defer m.mu.Unlock()
68-
m.deactivatedCount++
69-
}
70-
71-
func (m *mockWorkerExtension) ProvideRequest() *WorkerRequest[any, any] {
72-
return <-m.requestChan
73-
}
74-
7533
func (m *mockWorkerExtension) InjectRequest(r *WorkerRequest[any, any]) {
76-
m.requestChan <- r
34+
m.RequestChan <- r
7735
}
7836

7937
func (m *mockWorkerExtension) GetActivatedCount() int {
80-
m.mu.Lock()
81-
defer m.mu.Unlock()
82-
return m.activatedCount
38+
return int(m.ActivatedCount.Load())
8339
}
8440

8541
func TestWorkerExtension(t *testing.T) {

0 commit comments

Comments
 (0)