Skip to content

Commit f49553e

Browse files
committed
JAVA-3131: Add #retrieve method to EndPoint for when caller does not need the endpoint to be proactively resolved
Refactor existing usages of EndPoint#resolve to use retrieve when resolved ip addresses are not needed.
1 parent 1626026 commit f49553e

File tree

25 files changed

+238
-57
lines changed

25 files changed

+238
-57
lines changed

core/src/main/java/com/datastax/dse/driver/api/core/auth/DseGssApiAuthProviderBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ protected GssApiAuthenticator(
319319
SUPPORTED_MECHANISMS,
320320
options.getAuthorizationId(),
321321
protocol,
322-
((InetSocketAddress) endPoint.resolve()).getAddress().getCanonicalHostName(),
322+
((InetSocketAddress) endPoint.retrieve()).getAddress().getCanonicalHostName(),
323323
options.getSaslProperties(),
324324
null);
325325
} catch (LoginException | SaslException e) {

core/src/main/java/com/datastax/dse/driver/internal/core/insights/InsightsClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ private Map<String, SessionStateForNode> getConnectedNodes() {
294294
return pools.entrySet().stream()
295295
.collect(
296296
Collectors.toMap(
297-
entry -> AddressFormatter.nullSafeToString(entry.getKey().getEndPoint().resolve()),
297+
entry -> AddressFormatter.nullSafeToString(entry.getKey().getEndPoint().retrieve()),
298298
this::constructSessionStateForNode));
299299
}
300300

@@ -315,7 +315,7 @@ private InsightsStartupData createStartupData() {
315315
.withContactPoints(
316316
getResolvedContactPoints(
317317
driverContext.getMetadataManager().getContactPoints().stream()
318-
.map(n -> n.getEndPoint().resolve())
318+
.map(n -> n.getEndPoint().retrieve())
319319
.filter(InetSocketAddress.class::isInstance)
320320
.map(InetSocketAddress.class::cast)
321321
.collect(Collectors.toSet())))
@@ -456,7 +456,7 @@ private PoolSizeByHostDistance getPoolSizeByHostDistance() {
456456
}
457457

458458
private String getControlConnectionSocketAddress() {
459-
SocketAddress controlConnectionAddress = controlConnection.channel().getEndPoint().resolve();
459+
SocketAddress controlConnectionAddress = controlConnection.channel().getEndPoint().retrieve();
460460
return AddressFormatter.nullSafeToString(controlConnectionAddress);
461461
}
462462

core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,17 @@ public interface EndPoint {
4040
@NonNull
4141
SocketAddress resolve();
4242

43+
/**
44+
* Returns a possibly unresolved instance to a socket address.
45+
*
46+
* <p>This should be called when the address does not need to be proactively resolved. For example
47+
* if the node hostname or port number is needed.
48+
*/
49+
@NonNull
50+
default SocketAddress retrieve() {
51+
return resolve();
52+
}
53+
4354
/**
4455
* Returns an alternate string representation for use in node-level metric names.
4556
*

core/src/main/java/com/datastax/oss/driver/api/core/ssl/ProgrammaticSslEngineFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public ProgrammaticSslEngineFactory(
8989
@Override
9090
public SSLEngine newSslEngine(@NonNull EndPoint remoteEndpoint) {
9191
SSLEngine engine;
92-
SocketAddress remoteAddress = remoteEndpoint.resolve();
92+
SocketAddress remoteAddress = remoteEndpoint.retrieve();
9393
if (remoteAddress instanceof InetSocketAddress) {
9494
InetSocketAddress socketAddress = (InetSocketAddress) remoteAddress;
9595
engine = sslContext.createSSLEngine(socketAddress.getHostName(), socketAddress.getPort());

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public DefaultEndPoint(InetSocketAddress address) {
3838
@NonNull
3939
@Override
4040
public InetSocketAddress resolve() {
41+
return retrieve();
42+
}
43+
44+
@Override
45+
public InetSocketAddress retrieve() {
4146
return address;
4247
}
4348

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ private Optional<NodeInfo> findInPeers(
472472
// We save it the first time we get a control connection channel.
473473
private void savePort(DriverChannel channel) {
474474
if (port < 0) {
475-
SocketAddress address = channel.getEndPoint().resolve();
475+
SocketAddress address = channel.getEndPoint().retrieve();
476476
if (address instanceof InetSocketAddress) {
477477
port = ((InetSocketAddress) address).getPort();
478478
}
@@ -518,7 +518,7 @@ protected InetSocketAddress getBroadcastRpcAddress(
518518
}
519519
InetSocketAddress broadcastRpcAddress =
520520
new InetSocketAddress(broadcastRpcInetAddress, broadcastRpcPort);
521-
if (row.contains("peer") && broadcastRpcAddress.equals(localEndPoint.resolve())) {
521+
if (row.contains("peer") && broadcastRpcAddress.equals(localEndPoint.retrieve())) {
522522
// JAVA-2303: if the peer is actually the control node, ignore that peer as it is likely
523523
// a misconfiguration problem.
524524
LOG.warn(

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.datastax.oss.driver.internal.core.metadata;
1919

2020
import com.datastax.oss.driver.api.core.metadata.EndPoint;
21+
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
2122
import com.datastax.oss.driver.shaded.guava.common.primitives.UnsignedBytes;
2223
import edu.umd.cs.findbugs.annotations.NonNull;
2324
import java.net.InetAddress;
@@ -26,10 +27,14 @@
2627
import java.util.Arrays;
2728
import java.util.Comparator;
2829
import java.util.Objects;
29-
import java.util.concurrent.atomic.AtomicLong;
30+
import java.util.concurrent.ThreadLocalRandom;
31+
import java.util.concurrent.atomic.AtomicInteger;
3032

3133
public class SniEndPoint implements EndPoint {
32-
private static final AtomicLong OFFSET = new AtomicLong();
34+
// initialize offset to random position to avoid all clients starting at the same index
35+
@VisibleForTesting
36+
static final AtomicInteger OFFSET =
37+
new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1024));
3338

3439
private final InetSocketAddress proxyAddress;
3540
private final String serverName;
@@ -55,7 +60,7 @@ public String getServerName() {
5560
@Override
5661
public InetSocketAddress resolve() {
5762
try {
58-
InetAddress[] aRecords = InetAddress.getAllByName(proxyAddress.getHostName());
63+
InetAddress[] aRecords = resolveARecords();
5964
if (aRecords.length == 0) {
6065
// Probably never happens, but the JDK docs don't explicitly say so
6166
throw new IllegalArgumentException(
@@ -64,14 +69,32 @@ public InetSocketAddress resolve() {
6469
// The order of the returned address is unspecified. Sort by IP to make sure we get a true
6570
// round-robin
6671
Arrays.sort(aRecords, IP_COMPARATOR);
67-
int index = (aRecords.length == 1) ? 0 : (int) OFFSET.getAndIncrement() % aRecords.length;
68-
return new InetSocketAddress(aRecords[index], proxyAddress.getPort());
72+
73+
// get next offset value, reset OFFSET if wrapped around to negative
74+
int nextOffset = OFFSET.getAndIncrement();
75+
if (nextOffset < 0) {
76+
// if negative set OFFSET to 1 and nextOffset to 0, else simulate getAndIncrement()
77+
nextOffset = OFFSET.updateAndGet(v -> v < 0 ? 1 : v + 1) - 1;
78+
}
79+
80+
return new InetSocketAddress(aRecords[nextOffset % aRecords.length], proxyAddress.getPort());
6981
} catch (UnknownHostException e) {
7082
throw new IllegalArgumentException(
7183
"Could not resolve proxy address " + proxyAddress.getHostName(), e);
7284
}
7385
}
7486

87+
@VisibleForTesting
88+
InetAddress[] resolveARecords() throws UnknownHostException {
89+
// moving static call to method to allow mocking in tests
90+
return InetAddress.getAllByName(proxyAddress.getHostName());
91+
}
92+
93+
@Override
94+
public InetSocketAddress retrieve() {
95+
return proxyAddress;
96+
}
97+
7598
@Override
7699
public boolean equals(Object other) {
77100
if (other == this) {

core/src/main/java/com/datastax/oss/driver/internal/core/ssl/DefaultSslEngineFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public DefaultSslEngineFactory(DriverContext driverContext) {
9090
@Override
9191
public SSLEngine newSslEngine(@NonNull EndPoint remoteEndpoint) {
9292
SSLEngine engine;
93-
SocketAddress remoteAddress = remoteEndpoint.resolve();
93+
SocketAddress remoteAddress = remoteEndpoint.retrieve();
9494
if (remoteAddress instanceof InetSocketAddress) {
9595
InetSocketAddress socketAddress = (InetSocketAddress) remoteAddress;
9696
engine = sslContext.createSSLEngine(socketAddress.getHostName(), socketAddress.getPort());

core/src/main/java/com/datastax/oss/driver/internal/core/ssl/SniSslEngineFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public SSLEngine newSslEngine(@NonNull EndPoint remoteEndpoint) {
5353
this.getClass().getSimpleName()));
5454
}
5555
SniEndPoint sniEndPoint = (SniEndPoint) remoteEndpoint;
56-
InetSocketAddress address = sniEndPoint.resolve();
56+
InetSocketAddress address = sniEndPoint.retrieve();
5757
String sniServerName = sniEndPoint.getServerName();
5858

5959
// When hostname verification is enabled (with setEndpointIdentificationAlgorithm), the SSL

core/src/test/java/com/datastax/dse/driver/internal/core/insights/InsightsClientTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ private DefaultDriverContext mockDefaultDriverContext() throws UnknownHostExcept
485485
when(context.getProtocolVersion()).thenReturn(DSE_V2);
486486
DefaultNode contactPoint = mock(DefaultNode.class);
487487
EndPoint contactEndPoint = mock(EndPoint.class);
488-
when(contactEndPoint.resolve()).thenReturn(new InetSocketAddress("127.0.0.1", 9999));
488+
when(contactEndPoint.retrieve()).thenReturn(new InetSocketAddress("127.0.0.1", 9999));
489489
when(contactPoint.getEndPoint()).thenReturn(contactEndPoint);
490490
when(manager.getContactPoints()).thenReturn(ImmutableSet.of(contactPoint));
491491

@@ -501,7 +501,7 @@ private DefaultDriverContext mockDefaultDriverContext() throws UnknownHostExcept
501501
ControlConnection controlConnection = mock(ControlConnection.class);
502502
DriverChannel channel = mock(DriverChannel.class);
503503
EndPoint controlConnectionEndpoint = mock(EndPoint.class);
504-
when(controlConnectionEndpoint.resolve()).thenReturn(new InetSocketAddress("127.0.0.1", 10));
504+
when(controlConnectionEndpoint.retrieve()).thenReturn(new InetSocketAddress("127.0.0.1", 10));
505505

506506
when(channel.getEndPoint()).thenReturn(controlConnectionEndpoint);
507507
when(channel.localAddress()).thenReturn(new InetSocketAddress("127.0.0.1", 10));
@@ -513,15 +513,15 @@ private DefaultDriverContext mockDefaultDriverContext() throws UnknownHostExcept
513513
private void mockConnectionPools(DefaultDriverContext driverContext) {
514514
Node node1 = mock(Node.class);
515515
EndPoint endPoint1 = mock(EndPoint.class);
516-
when(endPoint1.resolve()).thenReturn(new InetSocketAddress("127.0.0.1", 10));
516+
when(endPoint1.retrieve()).thenReturn(new InetSocketAddress("127.0.0.1", 10));
517517
when(node1.getEndPoint()).thenReturn(endPoint1);
518518
when(node1.getOpenConnections()).thenReturn(1);
519519
ChannelPool channelPool1 = mock(ChannelPool.class);
520520
when(channelPool1.getInFlight()).thenReturn(10);
521521

522522
Node node2 = mock(Node.class);
523523
EndPoint endPoint2 = mock(EndPoint.class);
524-
when(endPoint2.resolve()).thenReturn(new InetSocketAddress("127.0.0.1", 20));
524+
when(endPoint2.retrieve()).thenReturn(new InetSocketAddress("127.0.0.1", 20));
525525
when(node2.getEndPoint()).thenReturn(endPoint2);
526526
when(node2.getOpenConnections()).thenReturn(2);
527527
ChannelPool channelPool2 = mock(ChannelPool.class);

0 commit comments

Comments
 (0)