Skip to content

Commit c369df3

Browse files
committed
Generate OTEL queue count metrics
See [1] for context. Here we implement `HookQueueStateCount` in the existing OTEL middleware so that it can emit metrics around queue counts. This is a pretty handy feature that we have a definite request for, and one that I'd guess most River users would like to have access to. [1] riverqueue/river#1203
1 parent f64003a commit c369df3

File tree

3 files changed

+118
-7
lines changed

3 files changed

+118
-7
lines changed

otelriver/go.mod

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,12 @@ require (
4040
gopkg.in/yaml.v3 v3.0.1 // indirect
4141
)
4242

43-
// replace github.com/riverqueue/river => ../../river
43+
replace github.com/riverqueue/river => ../../river
4444

45-
// replace github.com/riverqueue/river/riverdriver => ../../river/riverdriver
45+
replace github.com/riverqueue/river/riverdriver => ../../river/riverdriver
4646

47-
// replace github.com/riverqueue/river/riverdriver => ../../river/riverdriver
47+
replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../river/riverdriver/riverpgxv5
4848

49-
// replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../river/riverdriver/riverpgxv5
49+
replace github.com/riverqueue/river/rivershared => ../../river/rivershared
5050

51-
// replace github.com/riverqueue/river/rivershared => ../../river/rivershared
52-
53-
// replace github.com/riverqueue/river/rivertype => ../../river/rivertype
51+
replace github.com/riverqueue/river/rivertype => ../../river/rivertype

otelriver/middleware.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type MiddlewareConfig struct {
6060
// Middleware is a River middleware that emits OpenTelemetry metrics when jobs
6161
// are inserted or worked.
6262
type Middleware struct {
63+
river.HookDefaults
6364
river.MiddlewareDefaults
6465

6566
config *MiddlewareConfig
@@ -78,6 +79,8 @@ type middlewareMetrics struct {
7879
messagingClientOperationDuration metric.Float64Histogram
7980
messagingClientSentMessages metric.Int64Counter
8081
messagingProcessDuration metric.Float64Histogram
82+
queueStateCount metric.Int64Gauge
83+
queueTotalCount metric.Int64Gauge
8184
workCount metric.Int64Counter
8285
workDuration metric.Float64Gauge
8386
workDurationHistogram metric.Float64Histogram
@@ -116,6 +119,8 @@ func NewMiddleware(config *MiddlewareConfig) *Middleware {
116119
insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")),
117120
insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)),
118121
insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)),
122+
queueStateCount: mustInt64Gauge(meter, prefix+"queue_state_count", metric.WithDescription("Number of jobs in a queue by state"), metric.WithUnit("{job}")),
123+
queueTotalCount: mustInt64Gauge(meter, prefix+"queue_total_count", metric.WithDescription("Total number of jobs in a queue across all states"), metric.WithUnit("{job}")),
119124
workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")),
120125
workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)),
121126
workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)),
@@ -182,6 +187,25 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job
182187
return insertRes, err
183188
}
184189

190+
func (m *Middleware) QueueStateCount(ctx context.Context, params *rivertype.HookQueueStateCountParams) {
191+
for queue, result := range params.ByQueue {
192+
for state, count := range result.ByState {
193+
m.metrics.queueStateCount.Record(ctx, int64(count),
194+
metric.WithAttributes(
195+
attribute.String("queue", queue),
196+
attribute.String("state", string(state)),
197+
),
198+
)
199+
}
200+
201+
m.metrics.queueTotalCount.Record(ctx, int64(result.Total),
202+
metric.WithAttributes(
203+
attribute.String("queue", queue),
204+
),
205+
)
206+
}
207+
}
208+
185209
func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error {
186210
spanName := prefix + "work"
187211
if m.config.EnableWorkSpanJobKindSuffix {
@@ -288,6 +312,14 @@ func mustFloat64Histogram(meter metric.Meter, name string, options ...metric.Flo
288312
return metric
289313
}
290314

315+
func mustInt64Gauge(meter metric.Meter, name string, options ...metric.Int64GaugeOption) metric.Int64Gauge {
316+
metric, err := meter.Int64Gauge(name, options...)
317+
if err != nil {
318+
panic(err)
319+
}
320+
return metric
321+
}
322+
291323
func mustInt64Counter(meter metric.Meter, name string, options ...metric.Int64CounterOption) metric.Int64Counter {
292324
metric, err := meter.Int64Counter(name, options...)
293325
if err != nil {

otelriver/middleware_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
// Verify interface compliance.
2323
var (
24+
_ rivertype.HookQueueStateCount = &Middleware{} // the middleware is both hook and middleware (the hook so it can emit queue counts); kind of weird, but it works fine
2425
_ rivertype.JobInsertMiddleware = &Middleware{}
2526
_ rivertype.WorkerMiddleware = &Middleware{}
2627
)
@@ -258,6 +259,86 @@ func TestMiddleware(t *testing.T) {
258259
}
259260
})
260261

262+
t.Run("QueueStateCount", func(t *testing.T) {
263+
t.Parallel()
264+
265+
middleware, bundle := setup(t)
266+
267+
params := &rivertype.HookQueueStateCountParams{
268+
ByQueue: map[string]rivertype.HookQueueStateCountResult{
269+
"default": {
270+
ByState: map[rivertype.JobState]int{
271+
rivertype.JobStateAvailable: 5,
272+
rivertype.JobStateRunning: 3,
273+
rivertype.JobStateCompleted: 10,
274+
},
275+
Total: 18,
276+
},
277+
"critical": {
278+
ByState: map[rivertype.JobState]int{
279+
rivertype.JobStateAvailable: 2,
280+
rivertype.JobStateScheduled: 1,
281+
},
282+
Total: 3,
283+
},
284+
},
285+
}
286+
287+
// Preflight: verify Total is the sum of all ByState counts so
288+
// test data stays consistent if someone edits it later.
289+
for queue, result := range params.ByQueue {
290+
var sum int
291+
for _, count := range result.ByState {
292+
sum += count
293+
}
294+
require.Equal(t, sum, result.Total, "Total for queue %q doesn't match sum of ByState", queue)
295+
}
296+
297+
middleware.QueueStateCount(ctx, params)
298+
299+
var metrics metricdata.ResourceMetrics
300+
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
301+
302+
_, metricData := requireMetric[metricdata.Gauge[int64]](t, metrics, "river.queue_state_count")
303+
304+
// Build a map of (queue, state) -> value for easy assertion.
305+
type queueState struct{ queue, state string }
306+
counts := make(map[queueState]int64)
307+
for _, dataPoint := range metricData.DataPoints {
308+
var key queueState
309+
for _, attr := range dataPoint.Attributes.ToSlice() {
310+
switch string(attr.Key) {
311+
case "queue":
312+
key.queue = attr.Value.AsString()
313+
case "state":
314+
key.state = attr.Value.AsString()
315+
}
316+
}
317+
counts[key] = dataPoint.Value
318+
}
319+
320+
require.Equal(t, int64(5), counts[queueState{"default", "available"}])
321+
require.Equal(t, int64(3), counts[queueState{"default", "running"}])
322+
require.Equal(t, int64(10), counts[queueState{"default", "completed"}])
323+
require.Equal(t, int64(2), counts[queueState{"critical", "available"}])
324+
require.Equal(t, int64(1), counts[queueState{"critical", "scheduled"}])
325+
326+
// Verify total counts per queue.
327+
_, totalData := requireMetric[metricdata.Gauge[int64]](t, metrics, "river.queue_total_count")
328+
329+
totals := make(map[string]int64)
330+
for _, dataPoint := range totalData.DataPoints {
331+
for _, attr := range dataPoint.Attributes.ToSlice() {
332+
if string(attr.Key) == "queue" {
333+
totals[attr.Value.AsString()] = dataPoint.Value
334+
}
335+
}
336+
}
337+
338+
require.Equal(t, int64(18), totals["default"])
339+
require.Equal(t, int64(3), totals["critical"])
340+
})
341+
261342
t.Run("WorkSuccess", func(t *testing.T) {
262343
t.Parallel()
263344

0 commit comments

Comments
 (0)