diff --git a/core/src/main/java/tech/ydb/core/metrics/Meter.java b/core/src/main/java/tech/ydb/core/metrics/Meter.java new file mode 100644 index 000000000..a8856351c --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/Meter.java @@ -0,0 +1,17 @@ +package tech.ydb.core.metrics; + +import tech.ydb.core.Status; + +public interface Meter { + void recordOperationDuration(String operationName, long durationNanos); + + void recordOperationFailed(String operationName, Status status); + + void registerSessionPool(String poolName, SessionPoolObserver observer); + + void recordSessionCreateTime(String poolName, long durationNanos); + + void incrementSessionPendingRequests(String poolName); + + void incrementSessionTimeouts(String poolName); +} diff --git a/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java b/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java new file mode 100644 index 000000000..6ca3d8c4a --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java @@ -0,0 +1,38 @@ +package tech.ydb.core.metrics; + +import tech.ydb.core.Status; + +public final class NoopMeter implements Meter { + public static final NoopMeter INSTANCE = new NoopMeter(); + + private NoopMeter() { + } + + public static NoopMeter getInstance() { + return INSTANCE; + } + + @Override + public void recordOperationDuration(String operationName, long durationNanos) { + } + + @Override + public void recordOperationFailed(String operationName, Status status) { + } + + @Override + public void registerSessionPool(String poolName, SessionPoolObserver observer) { + } + + @Override + public void recordSessionCreateTime(String poolName, long durationNanos) { + } + + @Override + public void incrementSessionPendingRequests(String poolName) { + } + + @Override + public void incrementSessionTimeouts(String poolName) { + } +} diff --git a/core/src/main/java/tech/ydb/core/metrics/OpenTelemetryMeter.java b/core/src/main/java/tech/ydb/core/metrics/OpenTelemetryMeter.java new file mode 100644 index 000000000..67bb7017c --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/OpenTelemetryMeter.java @@ -0,0 +1,150 @@ +package tech.ydb.core.metrics; + +import java.util.Objects; + +import io.grpc.ExperimentalApi; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; + +import tech.ydb.core.Status; + +@ExperimentalApi("YDB Meter is experimental and API may change without notice") +public final class OpenTelemetryMeter implements Meter { + private static final String DEFAULT_SCOPE = "tech.ydb.sdk"; + + private static final AttributeKey DATABASE = AttributeKey.stringKey("database"); + private static final AttributeKey ENDPOINT = AttributeKey.stringKey("endpoint"); + private static final AttributeKey OPERATION_NAME = AttributeKey.stringKey("operation.name"); + private static final AttributeKey STATUS_CODE = AttributeKey.stringKey("status_code"); + private static final AttributeKey POOL_NAME = AttributeKey.stringKey("ydb.query.session.pool.name"); + private static final AttributeKey SESSION_STATE = AttributeKey.stringKey("ydb.query.session.state"); + + private final io.opentelemetry.api.metrics.Meter meter; + private final Attributes operationBaseAttributes; + + private final DoubleHistogram operationDuration; + private final LongCounter operationFailed; + private final DoubleHistogram sessionCreateTime; + private final LongCounter sessionPendingRequests; + private final LongCounter sessionTimeouts; + + private OpenTelemetryMeter(io.opentelemetry.api.metrics.Meter meter, String database, String endpoint) { + this.meter = Objects.requireNonNull(meter, "meter is null"); + + this.operationBaseAttributes = Attributes.of( + DATABASE, database, + ENDPOINT, endpoint + ); + + this.operationDuration = meter.histogramBuilder("ydb.client.operation.duration") + .setUnit("s") + .setDescription("Duration of a single client operation attempt (ExecuteQuery, Commit, Rollback).") + .build(); + + this.operationFailed = meter.counterBuilder("ydb.client.operation.failed") + .setUnit("{operation}") + .setDescription("Number of failed client operation attempts.") + .build(); + + this.sessionCreateTime = meter.histogramBuilder("ydb.query.session.create_time") + .setUnit("s") + .setDescription("Time spent creating a new session.") + .build(); + + this.sessionPendingRequests = meter.counterBuilder("ydb.query.session.pending_requests") + .setUnit("{request}") + .setDescription("Number of session-acquire requests that had to wait for a free session.") + .build(); + + this.sessionTimeouts = meter.counterBuilder("ydb.query.session.timeouts") + .setUnit("{timeout}") + .setDescription("Number of session-acquire timeouts.") + .build(); + } + + public static OpenTelemetryMeter fromOpenTelemetry(OpenTelemetry openTelemetry, + String database, String endpoint) { + Objects.requireNonNull(openTelemetry, "openTelemetry is null"); + return new OpenTelemetryMeter(openTelemetry.getMeter(DEFAULT_SCOPE), database, endpoint); + } + + public static OpenTelemetryMeter createGlobal(String database, String endpoint) { + return fromOpenTelemetry(GlobalOpenTelemetry.get(), database, endpoint); + } + + @Override + public void recordOperationDuration(String operationName, long durationNanos) { + operationDuration.record(toSeconds(durationNanos), withOperation(operationName)); + } + + @Override + public void recordOperationFailed(String operationName, Status status) { + if (status == null || status.isSuccess()) { + return; + } + Attributes attrs = operationBaseAttributes.toBuilder() + .put(OPERATION_NAME, operationName) + .put(STATUS_CODE, status.getCode().toString()) + .build(); + operationFailed.add(1L, attrs); + } + + @Override + public void registerSessionPool(String poolName, SessionPoolObserver observer) { + Attributes idle = Attributes.of(POOL_NAME, poolName, SESSION_STATE, "idle"); + Attributes used = Attributes.of(POOL_NAME, poolName, SESSION_STATE, "used"); + Attributes pool = Attributes.of(POOL_NAME, poolName); + + meter.gaugeBuilder("ydb.query.session.count") + .ofLongs() + .setUnit("{session}") + .setDescription("Current number of sessions in the pool by state.") + .buildWithCallback(measurement -> { + measurement.record(observer.getIdleCount(), idle); + measurement.record(observer.getUsedCount(), used); + }); + + meter.gaugeBuilder("ydb.query.session.min") + .ofLongs() + .setUnit("{session}") + .setDescription("Configured minimum size of the session pool.") + .buildWithCallback(measurement -> measurement.record(observer.getMinSize(), pool)); + + meter.gaugeBuilder("ydb.query.session.max") + .ofLongs() + .setUnit("{session}") + .setDescription("Configured maximum size of the session pool.") + .buildWithCallback(measurement -> measurement.record(observer.getMaxSize(), pool)); + } + + @Override + public void recordSessionCreateTime(String poolName, long durationNanos) { + sessionCreateTime.record(toSeconds(durationNanos), poolAttributes(poolName)); + } + + @Override + public void incrementSessionPendingRequests(String poolName) { + sessionPendingRequests.add(1L, poolAttributes(poolName)); + } + + @Override + public void incrementSessionTimeouts(String poolName) { + sessionTimeouts.add(1L, poolAttributes(poolName)); + } + + private Attributes withOperation(String operationName) { + return operationBaseAttributes.toBuilder().put(OPERATION_NAME, operationName).build(); + } + + private static Attributes poolAttributes(String poolName) { + return Attributes.of(POOL_NAME, poolName); + } + + private static double toSeconds(long durationNanos) { + return durationNanos / 1_000_000_000.0; + } +} diff --git a/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java b/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java new file mode 100644 index 000000000..9ae5f0d14 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java @@ -0,0 +1,11 @@ +package tech.ydb.core.metrics; + +public interface SessionPoolObserver { + int getMinSize(); + + int getMaxSize(); + + int getIdleCount(); + + int getUsedCount(); +} diff --git a/query/src/main/java/tech/ydb/query/QueryClient.java b/query/src/main/java/tech/ydb/query/QueryClient.java index 343ccb01f..8cdeec529 100644 --- a/query/src/main/java/tech/ydb/query/QueryClient.java +++ b/query/src/main/java/tech/ydb/query/QueryClient.java @@ -8,6 +8,7 @@ import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.tracing.NoopTracer; import tech.ydb.core.tracing.Tracer; import tech.ydb.query.impl.QueryClientImpl; @@ -55,6 +56,10 @@ interface Builder { Builder sessionMaxIdleTime(Duration duration); + Builder sessionPoolName(String poolName); + + Builder withMeter(Meter meter); + QueryClient build(); } } diff --git a/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java b/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java index 4716abc85..b380d8bb1 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java @@ -9,6 +9,8 @@ import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.metrics.Meter; +import tech.ydb.core.metrics.NoopMeter; import tech.ydb.core.tracing.Tracer; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; @@ -24,13 +26,16 @@ public class QueryClientImpl implements QueryClient { private final Tracer tracer; public QueryClientImpl(Builder builder) { + String poolName = builder.sessionPoolName != null + ? builder.sessionPoolName : builder.transport.getDatabase(); this.pool = new SessionPool( Clock.systemUTC(), - new QueryServiceRpc(builder.transport), + new QueryServiceRpc(builder.transport, builder.meter), builder.transport.getScheduler(), builder.sessionPoolMinSize, builder.sessionPoolMaxSize, - builder.sessionPoolIdleDuration + builder.sessionPoolIdleDuration, + poolName ); this.scheduler = builder.transport.getScheduler(); this.tracer = builder.transport.getTracer(); @@ -77,6 +82,8 @@ public static class Builder implements QueryClient.Builder { private int sessionPoolMinSize = 0; private int sessionPoolMaxSize = 50; private Duration sessionPoolIdleDuration = Duration.ofMinutes(5); + private String sessionPoolName = null; + private Meter meter = NoopMeter.INSTANCE; Builder(GrpcTransport transport) { Preconditions.checkArgument(transport != null, "transport is null"); @@ -126,6 +133,21 @@ public Builder sessionMaxIdleTime(Duration duration) { return this; } + @Override + public Builder sessionPoolName(String poolName) { + Preconditions.checkArgument(poolName != null && !poolName.isEmpty(), + "sessionPoolName must be a non-empty string"); + this.sessionPoolName = poolName; + return this; + } + + @Override + public Builder withMeter(Meter meter) { + Preconditions.checkArgument(meter != null, "meter is null"); + this.meter = meter; + return this; + } + @Override public QueryClientImpl build() { return new QueryClientImpl(this); diff --git a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java index 66c2e0a6c..c063e637e 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java @@ -6,6 +6,8 @@ import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.metrics.Meter; +import tech.ydb.core.metrics.NoopMeter; import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; @@ -51,16 +53,26 @@ class QueryServiceRpc { private final GrpcTransport transport; private final Tracer trace; + private final Meter meter; QueryServiceRpc(GrpcTransport transport) { + this(transport, NoopMeter.INSTANCE); + } + + QueryServiceRpc(GrpcTransport transport, Meter meter) { this.transport = transport; this.trace = transport.getTracer(); + this.meter = meter; } Span startSpan(String spanName) { return trace.startSpan(spanName, SpanKind.CLIENT); } + Meter getMeter() { + return meter; + } + public CompletableFuture> createSession( YdbQuery.CreateSessionRequest request, GrpcRequestSettings settings) { return transport diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index f4c5a4065..0fb792bf3 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -331,7 +331,9 @@ GrpcReadStream createGrpcStream( public QueryStream createQuery(String query, TxMode tx, Params prms, ExecuteQuerySettings settings) { YdbQuery.TransactionControl tc = TxControl.txModeCtrl(tx, true); Span span = startSpan("ydb.ExecuteQuery"); - return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { + long startNanos = System.nanoTime(); + return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span, + startNanos, "ExecuteQuery") { @Override void handleTxMeta(String txID) { if (txID != null && !txID.isEmpty()) { @@ -378,10 +380,15 @@ static CompletableFuture> createSession( abstract class StreamImpl implements QueryStream { private final GrpcReadStream grpcStream; private final Span span; + private final long startNanos; + private final String operationName; - StreamImpl(GrpcReadStream grpcStream, Span operationSpan) { + StreamImpl(GrpcReadStream grpcStream, Span operationSpan, + long startNanos, String operationName) { this.grpcStream = grpcStream; this.span = operationSpan; + this.startNanos = startNanos; + this.operationName = operationName; } abstract void handleTxMeta(String txId); @@ -434,11 +441,13 @@ public CompletableFuture> execute(PartsHandler handler) { }).whenComplete(this::handleCompletion).thenApply(streamStatus -> { updateSessionState(streamStatus); Status status = operationStatus.orElse(streamStatus); - if (status.isSuccess()) { - return Result.success(new QueryInfo(stats.get()), streamStatus); - } else { - return Result.fail(status); + long elapsed = System.nanoTime() - startNanos; + rpc.getMeter().recordOperationDuration(operationName, elapsed); + if (!status.isSuccess()) { + rpc.getMeter().recordOperationFailed(operationName, status); + return Result.fail(status); } + return Result.success(new QueryInfo(stats.get()), streamStatus); }) ); } @@ -478,7 +487,9 @@ public QueryStream createQuery(String query, boolean commitAtEnd, Params prms, E : TxControl.txModeCtrl(txMode, commitAtEnd); Span span = startSpan("ydb.ExecuteQuery"); - return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { + long startNanos = System.nanoTime(); + return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span, + startNanos, "ExecuteQuery") { @Override void handleTxMeta(String txID) { String newId = txID == null || txID.isEmpty() ? null : txID; @@ -518,11 +529,13 @@ public void cancel() { @Override public CompletableFuture> commit(CommitTransactionSettings settings) { Span span = startSpan("ydb.Commit"); + long startNanos = System.nanoTime(); CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); String transactionId = txId.get(); if (transactionId == null) { Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); Result res = Result.success(new QueryInfo(null), Status.of(StatusCode.SUCCESS, issue)); + rpc.getMeter().recordOperationDuration("Commit", System.nanoTime() - startNanos); return Span.endOnResult(span, CompletableFuture.completedFuture(res)); } @@ -537,11 +550,16 @@ public CompletableFuture> commit(CommitTransactionSettings set currentStatusFuture.complete(status); updateSessionState(status); if (!txId.compareAndSet(transactionId, null)) { - logger.warn("{} lost commit response for transaction {}", SessionImpl.this, transactionId); + logger.warn("{} lost commit response for transaction {}", + SessionImpl.this, transactionId); } // TODO: CommitTransactionResponse must contain exec_stats return res.map(resp -> new QueryInfo(null)); - }).whenComplete(((status, th) -> { + }).whenComplete(((res, th) -> { + rpc.getMeter().recordOperationDuration("Commit", System.nanoTime() - startNanos); + if (res != null && !res.getStatus().isSuccess()) { + rpc.getMeter().recordOperationFailed("Commit", res.getStatus()); + } if (th != null) { currentStatusFuture.completeExceptionally( new RuntimeException("Transaction commit failed with exception", th)); @@ -552,12 +570,14 @@ public CompletableFuture> commit(CommitTransactionSettings set @Override public CompletableFuture rollback(RollbackTransactionSettings settings) { Span span = startSpan("ydb.Rollback"); + long startNanos = System.nanoTime(); CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); String transactionId = txId.get(); if (transactionId == null) { Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); Status status = Status.of(StatusCode.SUCCESS, issue); + rpc.getMeter().recordOperationDuration("Rollback", System.nanoTime() - startNanos); return Span.endOnStatus(span, CompletableFuture.completedFuture(status)); } @@ -578,6 +598,10 @@ public CompletableFuture rollback(RollbackTransactionSettings settings) currentStatusFuture.complete(Status .of(StatusCode.ABORTED) .withIssues(Issue.of("Transaction was rolled back", Issue.Severity.ERROR))); + rpc.getMeter().recordOperationDuration("Rollback", System.nanoTime() - startNanos); + if (status != null && !status.isSuccess()) { + rpc.getMeter().recordOperationFailed("Rollback", status); + } }); } } diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index ccbbf4889..e4bbbcc7f 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -22,6 +22,8 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.core.metrics.Meter; +import tech.ydb.core.metrics.SessionPoolObserver; import tech.ydb.core.tracing.Span; import tech.ydb.core.utils.FutureTools; import tech.ydb.proto.query.YdbQuery; @@ -54,6 +56,8 @@ class SessionPool implements AutoCloseable { .build(); private final int minSize; + private final String poolName; + private final Meter meter; private final Clock clock; private final ScheduledExecutorService scheduler; private final WaitingQueue queue; @@ -61,8 +65,10 @@ class SessionPool implements AutoCloseable { private final StatsImpl stats = new StatsImpl(); SessionPool(Clock clock, QueryServiceRpc rpc, ScheduledExecutorService scheduler, int minSize, int maxSize, - Duration idleDuration) { + Duration idleDuration, String poolName) { this.minSize = minSize; + this.poolName = poolName; + this.meter = rpc.getMeter(); this.clock = clock; this.scheduler = scheduler; @@ -74,10 +80,33 @@ class SessionPool implements AutoCloseable { cleaner.periodMillis / 2, cleaner.periodMillis, TimeUnit.MILLISECONDS); - logger.info("init QuerySession pool, min size = {}, max size = {}, keep alive period = {}", + logger.info("init QuerySession pool '{}', min size = {}, max size = {}, keep alive period = {}", + poolName, minSize, maxSize, cleaner.periodMillis); + + meter.registerSessionPool(poolName, new SessionPoolObserver() { + @Override + public int getMinSize() { + return minSize; + } + + @Override + public int getMaxSize() { + return queue.getTotalLimit(); + } + + @Override + public int getIdleCount() { + return queue.getIdleCount(); + } + + @Override + public int getUsedCount() { + return queue.getUsedCount(); + } + }); } public void updateMaxSize(int maxSize) { @@ -102,8 +131,9 @@ public CompletableFuture> acquire(Duration timeout) { // If next session is not ready - add timeout canceler if (!pollNext(future)) { + meter.incrementSessionPendingRequests(poolName); future.whenComplete(new Canceller(scheduler.schedule( - new Timeout(future), + new Timeout(future, meter, poolName), timeout.toMillis(), TimeUnit.MILLISECONDS) )); @@ -277,8 +307,17 @@ public CompletableFuture create() { Context previous = ctx.attach(); try { Span createSpan = rpc.startSpan("ydb.CreateSession"); + long startNanos = System.nanoTime(); stats.requested.increment(); return Span.endOnResult(createSpan, SessionImpl.createSession(rpc, CREATE_SETTINGS, true, createSpan)) + .whenComplete((r, th) -> { + long elapsed = System.nanoTime() - startNanos; + meter.recordSessionCreateTime(poolName, elapsed); + meter.recordOperationDuration("CreateSession", elapsed); + if (r != null && !r.isSuccess()) { + meter.recordOperationFailed("CreateSession", r.getStatus()); + } + }) .thenCompose(r -> { if (!r.isSuccess()) { stats.failed.increment(); @@ -444,15 +483,19 @@ static final class Timeout implements Runnable { ); private final CompletableFuture> f; + private final Meter meter; + private final String poolName; - Timeout(CompletableFuture> f) { + Timeout(CompletableFuture> f, Meter meter, String poolName) { this.f = f; + this.meter = meter; + this.poolName = poolName; } @Override public void run() { - if (f != null && !f.isDone()) { - f.complete(Result.fail(EXPIRE)); + if (f != null && !f.isDone() && f.complete(Result.fail(EXPIRE))) { + meter.incrementSessionTimeouts(poolName); } } } diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index 8fde13bc8..4b932c6e5 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -132,9 +132,10 @@ public CompletableFuture> executeDataQueryInternal( final List issues = new ArrayList<>(); final List results = new ArrayList<>(); Span span = querySession.startSpan("ydb.ExecuteQuery"); + long startNanos = System.nanoTime(); QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, span), - span) { + span, startNanos, "ExecuteQuery") { @Override void handleTxMeta(String txID) { txRef.set(txID); @@ -325,6 +326,12 @@ public Builder sessionMaxIdleTime(Duration duration) { return this; } + @Override + public Builder withMeter(tech.ydb.core.metrics.Meter meter) { + query.withMeter(meter); + return this; + } + @Override public TableClientImpl build() { return new TableClientImpl(this); diff --git a/query/src/test/java/tech/ydb/query/opentelemetry/OpenTelemetryQueryMetricsIntegrationTest.java b/query/src/test/java/tech/ydb/query/opentelemetry/OpenTelemetryQueryMetricsIntegrationTest.java new file mode 100644 index 000000000..ba2bd5199 --- /dev/null +++ b/query/src/test/java/tech/ydb/query/opentelemetry/OpenTelemetryQueryMetricsIntegrationTest.java @@ -0,0 +1,257 @@ +package tech.ydb.query.opentelemetry; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import tech.ydb.auth.TokenAuthProvider; +import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.Result; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.metrics.OpenTelemetryMeter; +import tech.ydb.query.QueryClient; +import tech.ydb.query.QuerySession; +import tech.ydb.query.QueryTransaction; +import tech.ydb.test.junit4.YdbHelperRule; + +public class OpenTelemetryQueryMetricsIntegrationTest { + @ClassRule + public static final YdbHelperRule YDB = new YdbHelperRule(); + + private static final AttributeKey DATABASE = AttributeKey.stringKey("database"); + private static final AttributeKey ENDPOINT = AttributeKey.stringKey("endpoint"); + private static final AttributeKey OPERATION_NAME = AttributeKey.stringKey("operation.name"); + private static final AttributeKey STATUS_CODE = AttributeKey.stringKey("status_code"); + private static final AttributeKey POOL_NAME = AttributeKey.stringKey("ydb.query.session.pool.name"); + private static final AttributeKey SESSION_STATE = AttributeKey.stringKey("ydb.query.session.state"); + + private static InMemoryMetricReader metricReader; + private static SdkMeterProvider meterProvider; + private static OpenTelemetryMeter ydbMeter; + private static GrpcTransport transport; + + private QueryClient queryClient; + + @BeforeClass + public static void initTransport() { + metricReader = InMemoryMetricReader.create(); + meterProvider = SdkMeterProvider.builder() + .registerMetricReader(metricReader) + .build(); + + OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() + .setMeterProvider(meterProvider) + .build(); + + ydbMeter = OpenTelemetryMeter.fromOpenTelemetry(openTelemetry, YDB.database(), YDB.endpoint()); + + transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database()) + .withAuthProvider(new TokenAuthProvider(YDB.authToken())) + .build(); + } + + @AfterClass + public static void closeTransport() throws IOException { + transport.close(); + meterProvider.close(); + metricReader.close(); + } + + @Before + public void initClient() { + queryClient = QueryClient.newClient(transport).withMeter(ydbMeter).build(); + } + + @After + public void closeClient() { + queryClient.close(); + } + + @Test + public void executeQueryRecordsOperationDuration() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); + } + + MetricData metric = findMetric("ydb.client.operation.duration"); + Assert.assertNotNull("ydb.client.operation.duration metric not found", metric); + Assert.assertEquals("s", metric.getUnit()); + + HistogramPointData point = findHistogramPoint(metric, "ExecuteQuery"); + Assert.assertNotNull("No histogram point for ExecuteQuery", point); + Assert.assertTrue("Duration must be > 0", point.getSum() > 0); + Assert.assertEquals(YDB.database(), point.getAttributes().get(DATABASE)); + Assert.assertEquals(YDB.endpoint(), point.getAttributes().get(ENDPOINT)); + } + + @Test + public void commitAndRollbackRecordOperationDuration() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + QueryTransaction txCommit = session.beginTransaction(TxMode.SERIALIZABLE_RW) + .join().getValue(); + txCommit.createQuery("SELECT 1").execute().join().getStatus().expectSuccess(); + txCommit.commit().join().getStatus().expectSuccess(); + + QueryTransaction txRollback = session.beginTransaction(TxMode.SERIALIZABLE_RW) + .join().getValue(); + txRollback.createQuery("SELECT 1").execute().join().getStatus().expectSuccess(); + txRollback.rollback().join().expectSuccess(); + } + + MetricData metric = findMetric("ydb.client.operation.duration"); + Assert.assertNotNull(metric); + + Assert.assertNotNull("No histogram point for Commit", + findHistogramPoint(metric, "Commit")); + Assert.assertNotNull("No histogram point for Rollback", + findHistogramPoint(metric, "Rollback")); + } + + @Test + public void failedOperationRecordsFailedCounter() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + session.createQuery("SELECT * FROM __nonexistent_table__", TxMode.NONE) + .execute().join(); + } + + MetricData metric = findMetric("ydb.client.operation.failed"); + Assert.assertNotNull("ydb.client.operation.failed metric not found", metric); + Assert.assertEquals("{operation}", metric.getUnit()); + + Collection points = metric.getLongSumData().getPoints(); + Assert.assertFalse("Failed counter must have at least one point", points.isEmpty()); + long total = points.stream().mapToLong(LongPointData::getValue).sum(); + Assert.assertTrue("Failed counter must be > 0", total > 0); + Assert.assertTrue("Failed counter must carry status_code attribute", + points.stream().anyMatch(p -> p.getAttributes().get(STATUS_CODE) != null)); + Assert.assertTrue("Failed counter must carry database attribute", + points.stream().anyMatch(p -> YDB.database().equals(p.getAttributes().get(DATABASE)))); + } + + @Test + public void sessionPoolMetricsAreReported() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); + } + + MetricData count = findMetric("ydb.query.session.count"); + Assert.assertNotNull("ydb.query.session.count metric not found", count); + Assert.assertEquals("{session}", count.getUnit()); + Assert.assertTrue("session.count must have idle/used buckets", + count.getLongGaugeData().getPoints().stream() + .map(p -> p.getAttributes().get(SESSION_STATE)) + .anyMatch("idle"::equals)); + + MetricData min = findMetric("ydb.query.session.min"); + Assert.assertNotNull("ydb.query.session.min metric not found", min); + Assert.assertEquals("{session}", min.getUnit()); + Assert.assertTrue("min must have a pool.name attribute", + min.getLongGaugeData().getPoints().stream() + .anyMatch(p -> p.getAttributes().get(POOL_NAME) != null)); + + MetricData max = findMetric("ydb.query.session.max"); + Assert.assertNotNull("ydb.query.session.max metric not found", max); + Assert.assertEquals("{session}", max.getUnit()); + + MetricData createTime = findMetric("ydb.query.session.create_time"); + Assert.assertNotNull("ydb.query.session.create_time metric not found", createTime); + Assert.assertEquals("s", createTime.getUnit()); + Assert.assertFalse("session.create_time must have at least one point", + createTime.getHistogramData().getPoints().isEmpty()); + } + + @Test + public void sessionPendingAndTimeoutsMetricsAreCounters() { + try (QueryClient tinyClient = QueryClient.newClient(transport) + .withMeter(ydbMeter) + .sessionPoolMaxSize(1) + .sessionPoolName("tiny") + .build()) { + try (QuerySession s1 = tinyClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + Result result = tinyClient.createSession(Duration.ofMillis(500)).join(); + Assert.assertFalse( + "waiter must time out (sessionPoolMaxSize=1 with one session held), but got " + result, + result.isSuccess()); + Assert.assertEquals( + "waiter must complete with CLIENT_DEADLINE_EXPIRED", + StatusCode.CLIENT_DEADLINE_EXPIRED, result.getStatus().getCode()); + } + } + + // Metric reader observes counter increments synchronously, but give the runtime + // a brief moment in case other threads still hold references in flight. + Collection snapshot = metricReader.collectAllMetrics(); + + MetricData pending = findMetric(snapshot, "ydb.query.session.pending_requests"); + Assert.assertNotNull("ydb.query.session.pending_requests metric not found", pending); + Assert.assertEquals("{request}", pending.getUnit()); + long pendingTotal = pending.getLongSumData().getPoints().stream() + .mapToLong(LongPointData::getValue).sum(); + Assert.assertTrue("pending_requests must be > 0", pendingTotal > 0); + + MetricData timeouts = findMetric(snapshot, "ydb.query.session.timeouts"); + Assert.assertNotNull("ydb.query.session.timeouts metric not found", timeouts); + Assert.assertEquals("{timeout}", timeouts.getUnit()); + long timeoutTotal = timeouts.getLongSumData().getPoints().stream() + .mapToLong(LongPointData::getValue).sum(); + Assert.assertTrue("timeouts must be > 0", timeoutTotal > 0); + } + + private MetricData findMetric(String name) { + return findMetric(metricReader.collectAllMetrics(), name); + } + + private MetricData findMetric(Collection metrics, String name) { + for (MetricData m : metrics) { + if (name.equals(m.getName())) { + return m; + } + } + return null; + } + + private HistogramPointData findHistogramPoint(MetricData metric, String operationName) { + for (HistogramPointData point : metric.getHistogramData().getPoints()) { + String op = point.getAttributes().get(OPERATION_NAME); + if (operationName.equals(op)) { + return point; + } + } + return null; + } + + private static String extractHost(String endpoint) { + String stripped = endpoint.replaceFirst("grpcs?://", ""); + int colon = stripped.lastIndexOf(':'); + return colon >= 0 ? stripped.substring(0, colon) : stripped; + } + + private static int extractPort(String endpoint) { + String stripped = endpoint.replaceFirst("grpcs?://", ""); + int colon = stripped.lastIndexOf(':'); + if (colon >= 0) { + try { + return Integer.parseInt(stripped.substring(colon + 1)); + } catch (NumberFormatException ignored) { + } + } + return 2135; + } +} diff --git a/table/src/main/java/tech/ydb/table/TableClient.java b/table/src/main/java/tech/ydb/table/TableClient.java index 5dcf73966..6b29d3735 100644 --- a/table/src/main/java/tech/ydb/table/TableClient.java +++ b/table/src/main/java/tech/ydb/table/TableClient.java @@ -47,6 +47,10 @@ interface Builder { Builder sessionMaxIdleTime(Duration duration); + default Builder withMeter(tech.ydb.core.metrics.Meter meter) { + return this; + } + TableClient build(); } }