Skip to content
Open
17 changes: 17 additions & 0 deletions core/src/main/java/tech/ydb/core/metrics/Meter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package tech.ydb.core.metrics;

import tech.ydb.core.Status;

public interface Meter {
void recordOperationDuration(String operationName, long durationNanos);

void recordOperationFailed(String operationName, Status status);

void registerSessionPool(String poolName, SessionPoolObserver observer);

void recordSessionCreateTime(String poolName, long durationNanos);

void incrementSessionPendingRequests(String poolName);

void incrementSessionTimeouts(String poolName);
}
38 changes: 38 additions & 0 deletions core/src/main/java/tech/ydb/core/metrics/NoopMeter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package tech.ydb.core.metrics;

import tech.ydb.core.Status;

public final class NoopMeter implements Meter {
public static final NoopMeter INSTANCE = new NoopMeter();

private NoopMeter() {
}

public static NoopMeter getInstance() {
return INSTANCE;
}

@Override
public void recordOperationDuration(String operationName, long durationNanos) {
}

@Override
public void recordOperationFailed(String operationName, Status status) {
}

@Override
public void registerSessionPool(String poolName, SessionPoolObserver observer) {
}

@Override
public void recordSessionCreateTime(String poolName, long durationNanos) {
}

@Override
public void incrementSessionPendingRequests(String poolName) {
}

@Override
public void incrementSessionTimeouts(String poolName) {
}
}
150 changes: 150 additions & 0 deletions core/src/main/java/tech/ydb/core/metrics/OpenTelemetryMeter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package tech.ydb.core.metrics;

import java.util.Objects;

import io.grpc.ExperimentalApi;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;

import tech.ydb.core.Status;

@ExperimentalApi("YDB Meter is experimental and API may change without notice")
public final class OpenTelemetryMeter implements Meter {
private static final String DEFAULT_SCOPE = "tech.ydb.sdk";

private static final AttributeKey<String> DATABASE = AttributeKey.stringKey("database");
private static final AttributeKey<String> ENDPOINT = AttributeKey.stringKey("endpoint");
private static final AttributeKey<String> OPERATION_NAME = AttributeKey.stringKey("operation.name");
private static final AttributeKey<String> STATUS_CODE = AttributeKey.stringKey("status_code");
private static final AttributeKey<String> POOL_NAME = AttributeKey.stringKey("ydb.query.session.pool.name");
private static final AttributeKey<String> SESSION_STATE = AttributeKey.stringKey("ydb.query.session.state");

private final io.opentelemetry.api.metrics.Meter meter;
private final Attributes operationBaseAttributes;

private final DoubleHistogram operationDuration;
private final LongCounter operationFailed;
private final DoubleHistogram sessionCreateTime;
private final LongCounter sessionPendingRequests;
private final LongCounter sessionTimeouts;

private OpenTelemetryMeter(io.opentelemetry.api.metrics.Meter meter, String database, String endpoint) {
this.meter = Objects.requireNonNull(meter, "meter is null");

this.operationBaseAttributes = Attributes.of(
DATABASE, database,
ENDPOINT, endpoint
);

this.operationDuration = meter.histogramBuilder("ydb.client.operation.duration")
.setUnit("s")
.setDescription("Duration of a single client operation attempt (ExecuteQuery, Commit, Rollback).")
.build();

this.operationFailed = meter.counterBuilder("ydb.client.operation.failed")
.setUnit("{operation}")
.setDescription("Number of failed client operation attempts.")
.build();

this.sessionCreateTime = meter.histogramBuilder("ydb.query.session.create_time")
.setUnit("s")
.setDescription("Time spent creating a new session.")
.build();

this.sessionPendingRequests = meter.counterBuilder("ydb.query.session.pending_requests")
.setUnit("{request}")
.setDescription("Number of session-acquire requests that had to wait for a free session.")
.build();

this.sessionTimeouts = meter.counterBuilder("ydb.query.session.timeouts")
.setUnit("{timeout}")
.setDescription("Number of session-acquire timeouts.")
.build();
}

public static OpenTelemetryMeter fromOpenTelemetry(OpenTelemetry openTelemetry,
String database, String endpoint) {
Objects.requireNonNull(openTelemetry, "openTelemetry is null");
return new OpenTelemetryMeter(openTelemetry.getMeter(DEFAULT_SCOPE), database, endpoint);
}

public static OpenTelemetryMeter createGlobal(String database, String endpoint) {
return fromOpenTelemetry(GlobalOpenTelemetry.get(), database, endpoint);
}

@Override
public void recordOperationDuration(String operationName, long durationNanos) {
operationDuration.record(toSeconds(durationNanos), withOperation(operationName));
}

@Override
public void recordOperationFailed(String operationName, Status status) {
if (status == null || status.isSuccess()) {
return;
}
Attributes attrs = operationBaseAttributes.toBuilder()
.put(OPERATION_NAME, operationName)
.put(STATUS_CODE, status.getCode().toString())
.build();
operationFailed.add(1L, attrs);
}

@Override
public void registerSessionPool(String poolName, SessionPoolObserver observer) {
Attributes idle = Attributes.of(POOL_NAME, poolName, SESSION_STATE, "idle");
Attributes used = Attributes.of(POOL_NAME, poolName, SESSION_STATE, "used");
Attributes pool = Attributes.of(POOL_NAME, poolName);

meter.gaugeBuilder("ydb.query.session.count")
.ofLongs()
.setUnit("{session}")
.setDescription("Current number of sessions in the pool by state.")
.buildWithCallback(measurement -> {
measurement.record(observer.getIdleCount(), idle);
measurement.record(observer.getUsedCount(), used);
});

meter.gaugeBuilder("ydb.query.session.min")
.ofLongs()
.setUnit("{session}")
.setDescription("Configured minimum size of the session pool.")
.buildWithCallback(measurement -> measurement.record(observer.getMinSize(), pool));

meter.gaugeBuilder("ydb.query.session.max")
.ofLongs()
.setUnit("{session}")
.setDescription("Configured maximum size of the session pool.")
.buildWithCallback(measurement -> measurement.record(observer.getMaxSize(), pool));
}

@Override
public void recordSessionCreateTime(String poolName, long durationNanos) {
sessionCreateTime.record(toSeconds(durationNanos), poolAttributes(poolName));
}

@Override
public void incrementSessionPendingRequests(String poolName) {
sessionPendingRequests.add(1L, poolAttributes(poolName));
}

@Override
public void incrementSessionTimeouts(String poolName) {
sessionTimeouts.add(1L, poolAttributes(poolName));
}

private Attributes withOperation(String operationName) {
return operationBaseAttributes.toBuilder().put(OPERATION_NAME, operationName).build();
}

private static Attributes poolAttributes(String poolName) {
return Attributes.of(POOL_NAME, poolName);
}

private static double toSeconds(long durationNanos) {
return durationNanos / 1_000_000_000.0;
}
}
11 changes: 11 additions & 0 deletions core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package tech.ydb.core.metrics;

public interface SessionPoolObserver {
int getMinSize();

int getMaxSize();

int getIdleCount();

int getUsedCount();
}
5 changes: 5 additions & 0 deletions query/src/main/java/tech/ydb/query/QueryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import tech.ydb.core.Result;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.metrics.Meter;
import tech.ydb.core.tracing.NoopTracer;
import tech.ydb.core.tracing.Tracer;
import tech.ydb.query.impl.QueryClientImpl;
Expand Down Expand Up @@ -55,6 +56,10 @@ interface Builder {

Builder sessionMaxIdleTime(Duration duration);

Builder sessionPoolName(String poolName);

Builder withMeter(Meter meter);

QueryClient build();
}
}
26 changes: 24 additions & 2 deletions query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import tech.ydb.core.Result;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.metrics.Meter;
import tech.ydb.core.metrics.NoopMeter;
import tech.ydb.core.tracing.Tracer;
import tech.ydb.query.QueryClient;
import tech.ydb.query.QuerySession;
Expand All @@ -24,13 +26,16 @@ public class QueryClientImpl implements QueryClient {
private final Tracer tracer;

public QueryClientImpl(Builder builder) {
String poolName = builder.sessionPoolName != null
? builder.sessionPoolName : builder.transport.getDatabase();
this.pool = new SessionPool(
Clock.systemUTC(),
new QueryServiceRpc(builder.transport),
new QueryServiceRpc(builder.transport, builder.meter),
builder.transport.getScheduler(),
builder.sessionPoolMinSize,
builder.sessionPoolMaxSize,
builder.sessionPoolIdleDuration
builder.sessionPoolIdleDuration,
poolName
);
this.scheduler = builder.transport.getScheduler();
this.tracer = builder.transport.getTracer();
Expand Down Expand Up @@ -77,6 +82,8 @@ public static class Builder implements QueryClient.Builder {
private int sessionPoolMinSize = 0;
private int sessionPoolMaxSize = 50;
private Duration sessionPoolIdleDuration = Duration.ofMinutes(5);
private String sessionPoolName = null;
private Meter meter = NoopMeter.INSTANCE;

Builder(GrpcTransport transport) {
Preconditions.checkArgument(transport != null, "transport is null");
Expand Down Expand Up @@ -126,6 +133,21 @@ public Builder sessionMaxIdleTime(Duration duration) {
return this;
}

@Override
public Builder sessionPoolName(String poolName) {
Preconditions.checkArgument(poolName != null && !poolName.isEmpty(),
"sessionPoolName must be a non-empty string");
this.sessionPoolName = poolName;
return this;
}

@Override
public Builder withMeter(Meter meter) {
Preconditions.checkArgument(meter != null, "meter is null");
this.meter = meter;
return this;
}

@Override
public QueryClientImpl build() {
return new QueryClientImpl(this);
Expand Down
12 changes: 12 additions & 0 deletions query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.metrics.Meter;
import tech.ydb.core.metrics.NoopMeter;
import tech.ydb.core.operation.StatusExtractor;
import tech.ydb.core.tracing.Span;
import tech.ydb.core.tracing.SpanKind;
Expand Down Expand Up @@ -51,16 +53,26 @@ class QueryServiceRpc {

private final GrpcTransport transport;
private final Tracer trace;
private final Meter meter;

QueryServiceRpc(GrpcTransport transport) {
this(transport, NoopMeter.INSTANCE);
}

QueryServiceRpc(GrpcTransport transport, Meter meter) {
this.transport = transport;
this.trace = transport.getTracer();
this.meter = meter;
}

Span startSpan(String spanName) {
return trace.startSpan(spanName, SpanKind.CLIENT);
}

Meter getMeter() {
return meter;
}

public CompletableFuture<Result<YdbQuery.CreateSessionResponse>> createSession(
YdbQuery.CreateSessionRequest request, GrpcRequestSettings settings) {
return transport
Expand Down
Loading