1919
2020import com .datastax .dse .driver .api .core .DseProtocolVersion ;
2121import com .datastax .dse .driver .api .core .cql .continuous .ContinuousAsyncResultSet ;
22+ import com .datastax .dse .driver .api .core .graph .AsyncGraphResultSet ;
2223import com .datastax .dse .driver .internal .core .DseProtocolFeature ;
2324import com .datastax .dse .driver .internal .core .cql .DseConversions ;
2425import com .datastax .dse .protocol .internal .request .Revise ;
2526import com .datastax .dse .protocol .internal .response .result .DseRowsMetadata ;
2627import com .datastax .oss .driver .api .core .AllNodesFailedException ;
28+ import com .datastax .oss .driver .api .core .AsyncPagingIterable ;
2729import com .datastax .oss .driver .api .core .CqlIdentifier ;
2830import com .datastax .oss .driver .api .core .DriverTimeoutException ;
2931import com .datastax .oss .driver .api .core .NodeUnavailableException ;
@@ -627,7 +629,7 @@ public void operationComplete(@NonNull Future<java.lang.Void> future) {
627629 Throwable error = future .cause ();
628630 if (error instanceof EncoderException
629631 && error .getCause () instanceof FrameTooLongException ) {
630- trackNodeError (node , error .getCause ());
632+ trackNodeError (node , error .getCause (), null );
631633 lock .lock ();
632634 try {
633635 abort (error .getCause (), false );
@@ -644,7 +646,7 @@ public void operationComplete(@NonNull Future<java.lang.Void> future) {
644646 .getMetricUpdater ()
645647 .incrementCounter (DefaultNodeMetric .UNSENT_REQUESTS , executionProfile .getName ());
646648 recordError (node , error );
647- trackNodeError (node , error .getCause ());
649+ trackNodeError (node , error .getCause (), null );
648650 sendRequest (statement , null , executionIndex , retryCount , scheduleSpeculativeExecution );
649651 }
650652 } else {
@@ -739,7 +741,8 @@ private void onPageTimeout(int expectedPage) {
739741 * Invoked when a continuous paging response is received, either a successful or failed one.
740742 *
741743 * <p>Delegates further processing to appropriate methods: {@link #processResultResponse(Result,
742- * Frame)} if the response was successful, or {@link #processErrorResponse(Error)} if it wasn't.
744+ * Frame)} if the response was successful, or {@link #processErrorResponse(Error, Frame)} if it
745+ * wasn't.
743746 *
744747 * @param response the received {@link Frame}.
745748 */
@@ -760,15 +763,15 @@ public void onResponse(@NonNull Frame response) {
760763 processResultResponse ((Result ) responseMessage , response );
761764 } else if (responseMessage instanceof Error ) {
762765 LOG .trace ("[{}] Got error response" , logPrefix );
763- processErrorResponse ((Error ) responseMessage );
766+ processErrorResponse ((Error ) responseMessage , response );
764767 } else {
765768 IllegalStateException error =
766769 new IllegalStateException ("Unexpected response " + responseMessage );
767- trackNodeError (node , error );
770+ trackNodeError (node , error , response );
768771 abort (error , false );
769772 }
770773 } catch (Throwable t ) {
771- trackNodeError (node , t );
774+ trackNodeError (node , t , response );
772775 abort (t , false );
773776 }
774777 } finally {
@@ -902,7 +905,7 @@ private void processResultResponse(@NonNull Result result, @Nullable Frame frame
902905 * @param errorMessage the error message received.
903906 */
904907 @ SuppressWarnings ("GuardedBy" ) // this method is only called with the lock held
905- private void processErrorResponse (@ NonNull Error errorMessage ) {
908+ private void processErrorResponse (@ NonNull Error errorMessage , @ NonNull Frame frame ) {
906909 assert lock .isHeldByCurrentThread ();
907910 if (errorMessage instanceof Unprepared ) {
908911 processUnprepared ((Unprepared ) errorMessage );
@@ -911,7 +914,7 @@ private void processErrorResponse(@NonNull Error errorMessage) {
911914 if (error instanceof BootstrappingException ) {
912915 LOG .trace ("[{}] {} is bootstrapping, trying next node" , logPrefix , node );
913916 recordError (node , error );
914- trackNodeError (node , error );
917+ trackNodeError (node , error , frame );
915918 sendRequest (statement , null , executionIndex , retryCount , false );
916919 } else if (error instanceof QueryValidationException
917920 || error instanceof FunctionFailureException
@@ -923,7 +926,7 @@ private void processErrorResponse(@NonNull Error errorMessage) {
923926 NodeMetricUpdater metricUpdater = ((DefaultNode ) node ).getMetricUpdater ();
924927 metricUpdater .incrementCounter (
925928 DefaultNodeMetric .OTHER_ERRORS , executionProfile .getName ());
926- trackNodeError (node , error );
929+ trackNodeError (node , error , frame );
927930 abort (error , true );
928931 } else {
929932 try {
@@ -1062,7 +1065,7 @@ private void processUnprepared(@NonNull Unprepared errorMessage) {
10621065 + "This usually happens when you run a 'USE...' query after "
10631066 + "the statement was prepared." ,
10641067 Bytes .toHexString (idToReprepare ), Bytes .toHexString (repreparedId )));
1065- trackNodeError (node , illegalStateException );
1068+ trackNodeError (node , illegalStateException , null );
10661069 fatalError = illegalStateException ;
10671070 } else {
10681071 LOG .trace (
@@ -1081,18 +1084,18 @@ private void processUnprepared(@NonNull Unprepared errorMessage) {
10811084 || prepareError instanceof FunctionFailureException
10821085 || prepareError instanceof ProtocolError ) {
10831086 LOG .trace ("[{}] Unrecoverable error on re-prepare, rethrowing" , logPrefix );
1084- trackNodeError (node , prepareError );
1087+ trackNodeError (node , prepareError , null );
10851088 fatalError = prepareError ;
10861089 }
10871090 }
10881091 } else if (exception instanceof RequestThrottlingException ) {
1089- trackNodeError (node , exception );
1092+ trackNodeError (node , exception , null );
10901093 fatalError = exception ;
10911094 }
10921095 if (fatalError == null ) {
10931096 LOG .trace ("[{}] Re-prepare failed, trying next node" , logPrefix );
10941097 recordError (node , exception );
1095- trackNodeError (node , exception );
1098+ trackNodeError (node , exception , null );
10961099 sendRequest (statement , null , executionIndex , retryCount , false );
10971100 }
10981101 }
@@ -1120,18 +1123,18 @@ private void processRetryVerdict(@NonNull RetryVerdict verdict, @NonNull Throwab
11201123 switch (verdict .getRetryDecision ()) {
11211124 case RETRY_SAME :
11221125 recordError (node , error );
1123- trackNodeError (node , error );
1126+ trackNodeError (node , error , null );
11241127 sendRequest (
11251128 verdict .getRetryRequest (statement ), node , executionIndex , retryCount + 1 , false );
11261129 break ;
11271130 case RETRY_NEXT :
11281131 recordError (node , error );
1129- trackNodeError (node , error );
1132+ trackNodeError (node , error , null );
11301133 sendRequest (
11311134 verdict .getRetryRequest (statement ), null , executionIndex , retryCount + 1 , false );
11321135 break ;
11331136 case RETHROW :
1134- trackNodeError (node , error );
1137+ trackNodeError (node , error , null );
11351138 abort (error , true );
11361139 break ;
11371140 case IGNORE :
@@ -1444,12 +1447,20 @@ private void reenableAutoReadIfNeeded() {
14441447
14451448 // ERROR HANDLING
14461449
1447- private void trackNodeError (@ NonNull Node node , @ NonNull Throwable error ) {
1450+ private void trackNodeError (
1451+ @ NonNull Node node , @ NonNull Throwable error , @ Nullable Frame frame ) {
14481452 if (nodeErrorReported .compareAndSet (false , true )) {
14491453 long latencyNanos = System .nanoTime () - this .messageStartTimeNanos ;
14501454 context
14511455 .getRequestTracker ()
1452- .onNodeError (this .statement , error , latencyNanos , executionProfile , node , logPrefix );
1456+ .onNodeError (
1457+ this .statement ,
1458+ error ,
1459+ latencyNanos ,
1460+ executionProfile ,
1461+ node ,
1462+ createExecutionInfo (frame ),
1463+ logPrefix );
14531464 }
14541465 }
14551466
@@ -1563,21 +1574,32 @@ private void completeResultSetFuture(
15631574 if (resultSetClass .isInstance (pageOrError )) {
15641575 if (future .complete (resultSetClass .cast (pageOrError ))) {
15651576 throttler .signalSuccess (ContinuousRequestHandlerBase .this );
1577+
1578+ ExecutionInfo executionInfo = null ;
1579+ if (pageOrError instanceof AsyncPagingIterable ) {
1580+ executionInfo = ((AsyncPagingIterable ) pageOrError ).getExecutionInfo ();
1581+ } else if (pageOrError instanceof AsyncGraphResultSet ) {
1582+ executionInfo = ((AsyncGraphResultSet ) pageOrError ).getRequestExecutionInfo ();
1583+ }
1584+
15661585 if (nodeSuccessReported .compareAndSet (false , true )) {
15671586 context
15681587 .getRequestTracker ()
1569- .onNodeSuccess (statement , nodeLatencyNanos , executionProfile , node , logPrefix );
1588+ .onNodeSuccess (
1589+ statement , nodeLatencyNanos , executionProfile , node , executionInfo , logPrefix );
15701590 }
15711591 context
15721592 .getRequestTracker ()
1573- .onSuccess (statement , totalLatencyNanos , executionProfile , node , logPrefix );
1593+ .onSuccess (
1594+ statement , totalLatencyNanos , executionProfile , node , executionInfo , logPrefix );
15741595 }
15751596 } else {
15761597 Throwable error = (Throwable ) pageOrError ;
15771598 if (future .completeExceptionally (error )) {
15781599 context
15791600 .getRequestTracker ()
1580- .onError (statement , error , totalLatencyNanos , executionProfile , node , logPrefix );
1601+ .onError (
1602+ statement , error , totalLatencyNanos , executionProfile , node , null , logPrefix );
15811603 if (error instanceof DriverTimeoutException ) {
15821604 throttler .signalTimeout (ContinuousRequestHandlerBase .this );
15831605 session
@@ -1608,6 +1630,22 @@ private ExecutionInfo createExecutionInfo(@NonNull Result result, @Nullable Fram
16081630 executionProfile );
16091631 }
16101632
1633+ @ NonNull
1634+ private ExecutionInfo createExecutionInfo (@ Nullable Frame response ) {
1635+ return new DefaultExecutionInfo (
1636+ statement ,
1637+ node ,
1638+ startedSpeculativeExecutionsCount .get (),
1639+ executionIndex ,
1640+ errors ,
1641+ null ,
1642+ response ,
1643+ true ,
1644+ session ,
1645+ context ,
1646+ executionProfile );
1647+ }
1648+
16111649 private void logTimeoutSchedulingError (IllegalStateException timeoutError ) {
16121650 // If we're racing with session shutdown, the timer might be stopped already. We don't want
16131651 // to schedule more executions anyway, so swallow the error.
0 commit comments