diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java index 825ff4c5eac2c..ff22ed8d9e0f7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java @@ -40,10 +40,14 @@ import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.DatagramPacket; +import java.net.DatagramSocket; import java.net.Socket; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.zip.CRC32; @@ -61,6 +65,9 @@ public class IoTDBAirGapReceiver extends WrappedRunnable { private final IoTDBDataNodeReceiverAgent agent; private boolean isELanguagePayload; + private OutputStream currentOutputStream; + private DatagramSocket currentDatagramSocket; + private SocketAddress currentRemoteSocketAddress; public IoTDBAirGapReceiver(final Socket socket, final long receiverId) { this.socket = socket; @@ -101,33 +108,11 @@ private void receive() throws IOException { try { final byte[] data = readData(inputStream); + currentOutputStream = socket.getOutputStream(); - // If check sum failed, it indicates that the length we read may not be correct. - // Namely, there may be remaining bytes in the socket stream, which will fail any subsequent - // attempts to read from that. - // We directly close the socket here. - if (!checkSum(data)) { - LOGGER.warn( - DataNodePipeMessages.PIPE_AIR_GAP_RECEIVER_CLOSED_BECAUSE_OF, receiverId, socket); - try { - fail(); - } finally { - socket.close(); - } - return; + if (!receive(data, null)) { + socket.close(); } - - // Removed the used checksum - final ByteBuffer byteBuffer = ByteBuffer.wrap(data, LONG_LEN, data.length - LONG_LEN); - - // Pseudo request, to reuse logic in IoTDBThriftReceiverAgent - final AirGapPseudoTPipeTransferRequest req = - (AirGapPseudoTPipeTransferRequest) - new AirGapPseudoTPipeTransferRequest() - .setVersion(ReadWriteIOUtils.readByte(byteBuffer)) - .setType(ReadWriteIOUtils.readShort(byteBuffer)) - .setBody(byteBuffer.slice()); - handleReq(req, System.currentTimeMillis()); } catch (final PipeConnectionException e) { LOGGER.info( DataNodePipeMessages.PIPE_AIR_GAP_RECEIVER_SOCKET_CLOSED_WHEN, @@ -145,9 +130,82 @@ private void receive() throws IOException { } } + boolean receive(final byte[] data, final String receiverKey) throws IOException { + try { + // If check sum failed, it indicates that the length we read may not be correct. + // Namely, there may be remaining bytes in the socket stream, which will fail any subsequent + // attempts to read from that. + if (!checkSum(data)) { + LOGGER.warn( + DataNodePipeMessages.PIPE_AIR_GAP_RECEIVER_CLOSED_BECAUSE_OF, receiverId, socket); + fail(); + return false; + } + + handleReq(toTPipeTransferReq(data), System.currentTimeMillis(), receiverKey); + return true; + } catch (final IOException e) { + throw e; + } catch (final Exception e) { + if (currentDatagramSocket != null) { + fail(); + } + throw e; + } finally { + currentOutputStream = null; + currentDatagramSocket = null; + currentRemoteSocketAddress = null; + } + } + + void receiveUdp( + final DatagramSocket datagramSocket, + final DatagramPacket packet, + final String receiverKey, + final byte[] buffer) + throws IOException { + isELanguagePayload = false; + currentDatagramSocket = datagramSocket; + currentRemoteSocketAddress = packet.getSocketAddress(); + boolean requestHandlingStarted = false; + try { + final byte[] data = readData(new ByteArrayInputStream(buffer, 0, packet.getLength())); + requestHandlingStarted = true; + receive(data, receiverKey); + } catch (final Exception e) { + if (!requestHandlingStarted) { + fail(); + } + throw e; + } finally { + currentOutputStream = null; + currentDatagramSocket = null; + currentRemoteSocketAddress = null; + } + } + + private AirGapPseudoTPipeTransferRequest toTPipeTransferReq(final byte[] data) { + // Removed the used checksum + final ByteBuffer byteBuffer = ByteBuffer.wrap(data, LONG_LEN, data.length - LONG_LEN); + + // Pseudo request, to reuse logic in IoTDBThriftReceiverAgent + return (AirGapPseudoTPipeTransferRequest) + new AirGapPseudoTPipeTransferRequest() + .setVersion(ReadWriteIOUtils.readByte(byteBuffer)) + .setType(ReadWriteIOUtils.readShort(byteBuffer)) + .setBody(byteBuffer.slice()); + } + private void handleReq(final AirGapPseudoTPipeTransferRequest req, final long startTime) throws IOException { - final TPipeTransferResp resp = agent.receive(req); + handleReq(req, startTime, null); + } + + private void handleReq( + final AirGapPseudoTPipeTransferRequest req, final long startTime, final String receiverKey) + throws IOException { + final TPipeTransferResp resp = + receiverKey == null ? agent.receive(req) : agent.receive(receiverKey, req); final TSStatus status = resp.getStatus(); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ -170,7 +228,7 @@ private void handleReq(final AirGapPseudoTPipeTransferRequest req, final long st LOGGER.info(DataNodePipeMessages.TEMPORARY_UNAVAILABLE_EXCEPTION_ENCOUNTERED_AT_AIR_GAP); if (System.currentTimeMillis() - startTime < PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) { - handleReq(req, startTime); + handleReq(req, startTime, receiverKey); } else { LOGGER.warn( DataNodePipeMessages.PIPE_AIR_GAP_RECEIVER_TEMPORARY_UNAVAILABLE_RETRY, receiverId); @@ -187,14 +245,23 @@ private void handleReq(final AirGapPseudoTPipeTransferRequest req, final long st } private void ok() throws IOException { - final OutputStream outputStream = socket.getOutputStream(); - outputStream.write(AirGapOneByteResponse.OK); - outputStream.flush(); + respond(AirGapOneByteResponse.OK); } private void fail() throws IOException { - final OutputStream outputStream = socket.getOutputStream(); - outputStream.write(AirGapOneByteResponse.FAIL); + respond(AirGapOneByteResponse.FAIL); + } + + private void respond(final byte[] response) throws IOException { + if (currentDatagramSocket != null && currentRemoteSocketAddress != null) { + currentDatagramSocket.send( + new DatagramPacket(response, response.length, currentRemoteSocketAddress)); + return; + } + + final OutputStream outputStream = + currentOutputStream != null ? currentOutputStream : socket.getOutputStream(); + outputStream.write(response); outputStream.flush(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java index c14ce2054b7e9..0e8c642ce4716 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java @@ -26,14 +26,22 @@ import org.apache.iotdb.commons.service.IService; import org.apache.iotdb.commons.service.ServiceType; import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; +import org.apache.iotdb.db.protocol.session.ClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -41,15 +49,21 @@ public class IoTDBAirGapReceiverAgent implements IService { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBAirGapReceiverAgent.class); + private static final int UDP_PACKET_MAX_SIZE_IN_BYTES = 65_507; private final ExecutorService listenExecutor = IoTDBThreadPoolFactory.newSingleThreadExecutor( ThreadName.PIPE_RECEIVER_AIR_GAP_AGENT.getName()); + private final ExecutorService udpListenExecutor = + IoTDBThreadPoolFactory.newSingleThreadExecutor( + ThreadName.PIPE_RECEIVER_AIR_GAP_AGENT.getName() + "-UDP"); private final AtomicBoolean allowSubmitListen = new AtomicBoolean(false); private ServerSocket serverSocket; + private DatagramSocket datagramSocket; private final AtomicLong receiverId = new AtomicLong(0); + private final Map udpClientSessions = new ConcurrentHashMap<>(); public void listen() { try { @@ -61,7 +75,9 @@ public void listen() { ThreadName.PIPE_AIR_GAP_RECEIVER.getName() + "-" + airGapReceiverId); airGapReceiverThread.start(); } catch (final IOException e) { - LOGGER.warn(DataNodePipeMessages.UNHANDLED_EXCEPTION_DURING_PIPE_AIR_GAP_RECEIVER, e); + if (allowSubmitListen.get()) { + LOGGER.warn(DataNodePipeMessages.UNHANDLED_EXCEPTION_DURING_PIPE_AIR_GAP_RECEIVER, e); + } } if (allowSubmitListen.get()) { @@ -69,32 +85,97 @@ public void listen() { } } + public void listenUdp() { + while (allowSubmitListen.get() && !datagramSocket.isClosed()) { + final byte[] buffer = new byte[UDP_PACKET_MAX_SIZE_IN_BYTES]; + final DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + try { + datagramSocket.receive(packet); + + final long airGapReceiverId = receiverId.incrementAndGet(); + final IoTDBAirGapReceiver receiver = + new IoTDBAirGapReceiver(new Socket(), airGapReceiverId); + final String receiverKey = packet.getSocketAddress().toString(); + final boolean registeredSession = registerUdpSessionIfNecessary(packet); + try { + receiver.receiveUdp(datagramSocket, packet, receiverKey, buffer); + } finally { + if (registeredSession) { + SessionManager.getInstance().removeCurrSession(); + } + } + } catch (final IOException e) { + if (allowSubmitListen.get()) { + LOGGER.warn(DataNodePipeMessages.UNHANDLED_EXCEPTION_DURING_PIPE_AIR_GAP_RECEIVER, e); + } + } catch (final Exception e) { + LOGGER.warn(DataNodePipeMessages.UNHANDLED_EXCEPTION_DURING_PIPE_AIR_GAP_RECEIVER, e); + } + } + } + + private boolean registerUdpSessionIfNecessary(final DatagramPacket packet) { + final String receiverKey = packet.getSocketAddress().toString(); + final ClientSession session = + udpClientSessions.computeIfAbsent( + receiverKey, + key -> + new ClientSession(new DatagramClientSocket(packet.getAddress(), packet.getPort()))); + return SessionManager.getInstance().registerSession(session); + } + @Override public void start() throws StartupException { try { serverSocket = new ServerSocket(PipeConfig.getInstance().getPipeAirGapReceiverPort()); + datagramSocket = new DatagramSocket(PipeConfig.getInstance().getPipeAirGapReceiverPort()); } catch (final IOException e) { + if (Objects.nonNull(serverSocket)) { + try { + serverSocket.close(); + } catch (final IOException closeException) { + e.addSuppressed(closeException); + } + } throw new StartupException(e); } allowSubmitListen.set(true); listenExecutor.submit(this::listen); + udpListenExecutor.submit(this::listenUdp); LOGGER.info(DataNodePipeMessages.IOTDBAIRGAPRECEIVERAGENT_STARTED, serverSocket); } @Override public void stop() { + allowSubmitListen.set(false); + try { if (Objects.nonNull(serverSocket)) { serverSocket.close(); } + if (Objects.nonNull(datagramSocket)) { + datagramSocket.close(); + } } catch (final IOException e) { LOGGER.warn(DataNodePipeMessages.FAILED_TO_CLOSE_IOTDBAIRGAPRECEIVERAGENT_S_SERVER_SOCKET, e); } - allowSubmitListen.set(false); + udpClientSessions.forEach( + (key, session) -> { + final boolean registeredSession = SessionManager.getInstance().registerSession(session); + try { + PipeDataNodeAgent.receiver().thrift().handleClientExit(key); + } finally { + if (registeredSession) { + SessionManager.getInstance().removeCurrSession(); + } + } + }); + udpClientSessions.clear(); listenExecutor.shutdown(); + udpListenExecutor.shutdown(); LOGGER.info(DataNodePipeMessages.IOTDBAIRGAPRECEIVERAGENT_STOPPED, serverSocket); } @@ -103,4 +184,25 @@ public void stop() { public ServiceType getID() { return ServiceType.AIR_GAP_SERVICE; } + + private static class DatagramClientSocket extends Socket { + + private final InetAddress address; + private final int port; + + private DatagramClientSocket(final InetAddress address, final int port) { + this.address = address; + this.port = port; + } + + @Override + public InetAddress getInetAddress() { + return address; + } + + @Override + public int getPort() { + return port; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java index 8a995f7656d93..d761bb711fc90 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java @@ -24,9 +24,13 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.db.pipe.processor.twostage.exchange.receiver.TwoStageAggregateReceiver; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + public class IoTDBDataNodeReceiverAgent extends IoTDBReceiverAgent { private final ThreadLocal receiverThreadLocal = new ThreadLocal<>(); + private final Map receiverMap = new ConcurrentHashMap<>(); @Override protected void initConstructors() { @@ -38,16 +42,24 @@ protected void initConstructors() { @Override protected IoTDBReceiver getReceiverWithSpecifiedClient(final String ignore) { - return receiverThreadLocal.get(); + return ignore == null ? receiverThreadLocal.get() : receiverMap.get(ignore); } @Override protected void setReceiverWithSpecifiedClient(final String ignore, final IoTDBReceiver receiver) { - receiverThreadLocal.set(receiver); + if (ignore == null) { + receiverThreadLocal.set(receiver); + } else { + receiverMap.put(ignore, receiver); + } } @Override protected void removeReceiverWithSpecifiedClient(final String ignore) { - receiverThreadLocal.remove(); + if (ignore == null) { + receiverThreadLocal.remove(); + } else { + receiverMap.remove(ignore); + } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java index e23db1f1ca8cd..9bf59f0a50895 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java @@ -33,11 +33,13 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.utils.ReadWriteIOUtils; import org.junit.Assert; import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -45,6 +47,8 @@ import java.lang.reflect.Method; import java.net.Socket; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.zip.CRC32; public class IoTDBAirGapReceiverTest { @@ -121,6 +125,19 @@ public void testTemporaryUnavailableRetryTimeoutReturnsFail() throws Exception { } } + @Test + public void testReceiveAirGapPayloadWithSpecifiedReceiverKey() throws Exception { + final RecordingSocket socket = new RecordingSocket(); + final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(socket, 4L); + final StubIoTDBDataNodeReceiverAgent stubAgent = new StubIoTDBDataNodeReceiverAgent(); + stubAgent.setStubReceiver( + "udp-client", new StubReceiver(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))); + setField(receiver, "agent", stubAgent); + + Assert.assertTrue(receiver.receive(toAirGapData(toRawRequestBytes()), "udp-client")); + Assert.assertArrayEquals(AirGapOneByteResponse.OK, socket.getWrittenBytes()); + } + private static void setField(final Object target, final String fieldName, final Object value) throws Exception { final Field field = IoTDBAirGapReceiver.class.getDeclaredField(fieldName); @@ -147,6 +164,10 @@ private static class StubIoTDBDataNodeReceiverAgent extends IoTDBDataNodeReceive void setStubReceiver(final IoTDBReceiver receiver) { setReceiverWithSpecifiedClient(null, receiver); } + + void setStubReceiver(final String key, final IoTDBReceiver receiver) { + setReceiverWithSpecifiedClient(key, receiver); + } } private static class StubReceiver implements IoTDBReceiver { @@ -172,4 +193,20 @@ public IoTDBSinkRequestVersion getVersion() { return IoTDBSinkRequestVersion.VERSION_1; } } + + private static byte[] toRawRequestBytes() throws IOException { + try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(IoTDBSinkRequestVersion.VERSION_1.getVersion(), outputStream); + ReadWriteIOUtils.write((short) 0, outputStream); + return byteArrayOutputStream.toByteArray(); + } + } + + private static byte[] toAirGapData(final byte[] rawRequestBytes) { + final CRC32 crc32 = new CRC32(); + crc32.update(rawRequestBytes, 0, rawRequestBytes.length); + return BytesUtils.concatByteArrayList( + Arrays.asList(BytesUtils.longToBytes(crc32.getValue()), rawRequestBytes)); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java index d49ef82ce0550..ed2568db08de6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment; +import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapOneByteResponse; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -39,12 +40,19 @@ import org.junit.Assert; import org.junit.Test; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.Socket; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.zip.CRC32; public class IoTDBDataRegionAirGapSinkTest { @@ -104,7 +112,79 @@ public void testTransferTsFileBatchOverAirGap() throws Exception { } } + @Test + public void testUdpAirGapSlicing() throws Exception { + try (final DatagramSocket receiverSocket = new DatagramSocket(0); + final UdpTestingIoTDBDataRegionAirGapSink sink = + new UdpTestingIoTDBDataRegionAirGapSink()) { + final Map attributes = buildParameterAttributes(false); + attributes.put(PipeSinkConstant.CONNECTOR_AIR_GAP_TRANSPORT_KEY, "udp"); + attributes.put(PipeSinkConstant.CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_KEY, "1024"); + + final PipeParameters parameters = new PipeParameters(attributes); + sink.validate(new PipeParameterValidator(parameters)); + sink.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + final List receivedRequests = new ArrayList<>(); + final AtomicBoolean senderFinished = new AtomicBoolean(false); + final AtomicReference receiverException = new AtomicReference<>(); + final Thread receiverThread = + new Thread( + () -> { + try { + receiverSocket.setSoTimeout(100); + while (!senderFinished.get()) { + final byte[] buffer = new byte[2048]; + final DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + try { + receiverSocket.receive(packet); + receivedRequests.add(decodeAirGapDatagram(buffer, packet.getLength())); + receiverSocket.send( + new DatagramPacket( + AirGapOneByteResponse.OK, + AirGapOneByteResponse.OK.length, + packet.getAddress(), + packet.getPort())); + } catch (final SocketTimeoutException ignored) { + // Poll until the sender has finished all datagrams. + } + } + } catch (final Throwable ignored) { + receiverException.set(ignored); + } + }); + receiverThread.start(); + + final byte[] request = new byte[2500]; + request[0] = 1; + final byte[] type = org.apache.tsfile.utils.BytesUtils.shortToBytes((short) 11); + request[1] = type[0]; + request[2] = type[1]; + + try { + Assert.assertTrue(sink.sendUdp(receiverSocket.getLocalPort(), request)); + } finally { + senderFinished.set(true); + } + + receiverThread.join(5000); + Assert.assertFalse(receiverThread.isAlive()); + Assert.assertNull(receiverException.get()); + Assert.assertTrue(receivedRequests.size() > 1); + for (final TPipeTransferReq receivedRequest : receivedRequests) { + Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(), receivedRequest.type); + Assert.assertTrue(receivedRequest.body.remaining() < 1024); + } + } + } + private PipeParameters buildParameters(final boolean useTsFileBatch) { + return new PipeParameters(buildParameterAttributes(useTsFileBatch)); + } + + private Map buildParameterAttributes(final boolean useTsFileBatch) { final Map attributes = new HashMap<>(); attributes.put( PipeSinkConstant.CONNECTOR_KEY, @@ -115,7 +195,7 @@ private PipeParameters buildParameters(final boolean useTsFileBatch) { if (useTsFileBatch) { attributes.put(PipeSinkConstant.CONNECTOR_FORMAT_KEY, "tsfile"); } - return new PipeParameters(attributes); + return attributes; } private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( @@ -149,6 +229,22 @@ private static TPipeTransferReq toTPipeTransferReq(final byte[] requestBytes) { return req; } + private static TPipeTransferReq decodeAirGapDatagram(final byte[] datagram, final int length) { + final ByteBuffer buffer = ByteBuffer.wrap(datagram, 0, length); + final int payloadLength = ReadWriteIOUtils.readInt(buffer); + Assert.assertEquals(payloadLength, ReadWriteIOUtils.readInt(buffer)); + Assert.assertEquals(length - 2 * Integer.BYTES, payloadLength); + + final long expectedChecksum = ReadWriteIOUtils.readLong(buffer); + final byte[] payload = new byte[payloadLength - Long.BYTES]; + buffer.get(payload); + + final CRC32 crc32 = new CRC32(); + crc32.update(payload, 0, payload.length); + Assert.assertEquals(expectedChecksum, crc32.getValue()); + return toTPipeTransferReq(payload); + } + private static class RecordingIoTDBDataRegionAirGapSink extends IoTDBDataRegionAirGapSink { private final List sentRequests = new ArrayList<>(); @@ -180,4 +276,15 @@ public synchronized void setSoTimeout(final int timeout) { } } } + + private static class UdpTestingIoTDBDataRegionAirGapSink extends IoTDBDataRegionAirGapSink { + + private boolean sendUdp(final int port, final byte[] request) throws Exception { + final AirGapSocket socket = new AirGapSocket("127.0.0.1", port); + try (final Socket ignored = socket) { + socket.connectUdp(1000); + return sendBytes(socket, request); + } + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java index 058b17e2f4f4c..98fedbbeb3535 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java @@ -152,6 +152,26 @@ public class PipeSinkConstant { "sink.air-gap.handshake-timeout-ms"; public static final int CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE = 5000; + public static final String CONNECTOR_AIR_GAP_TRANSPORT_KEY = "connector.air-gap.transport"; + public static final String SINK_AIR_GAP_TRANSPORT_KEY = "sink.air-gap.transport"; + public static final String CONNECTOR_AIR_GAP_TRANSPORT_TCP_VALUE = "tcp"; + public static final String CONNECTOR_AIR_GAP_TRANSPORT_UDP_VALUE = "udp"; + public static final String CONNECTOR_AIR_GAP_TRANSPORT_DEFAULT_VALUE = + CONNECTOR_AIR_GAP_TRANSPORT_TCP_VALUE; + public static final Set CONNECTOR_AIR_GAP_TRANSPORT_SET = + Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + CONNECTOR_AIR_GAP_TRANSPORT_TCP_VALUE, CONNECTOR_AIR_GAP_TRANSPORT_UDP_VALUE))); + + public static final String CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_KEY = + "connector.air-gap.udp.packet-size-bytes"; + public static final String SINK_AIR_GAP_UDP_PACKET_SIZE_KEY = + "sink.air-gap.udp.packet-size-bytes"; + public static final int CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_DEFAULT_VALUE = 60 * 1024; + public static final int CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MIN_VALUE = 1024; + public static final int CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MAX_VALUE = 65_507; + public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY = "connector.version"; public static final String SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY = "sink.version"; public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java index 6d74102ff1940..6099109b78179 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java @@ -25,25 +25,35 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant; import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapOneByteResponse; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedOutputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -55,20 +65,36 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_TRANSPORT_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_TRANSPORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_TRANSPORT_SET; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_TRANSPORT_UDP_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MAX_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MIN_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_AIR_GAP_TRANSPORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_AIR_GAP_UDP_PACKET_SIZE_KEY; import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN; @TreeModel @TableModel public abstract class IoTDBAirGapSink extends IoTDBSink { + private static final int UDP_ENVELOPE_SIZE = 2 * Integer.BYTES + Long.BYTES; + private static final int UDP_SLICE_REQUEST_SERIALIZATION_RESERVED_SIZE = 64; + protected static class AirGapSocket extends Socket { private final TEndPoint endPoint; + private DatagramSocket datagramSocket; + private InetAddress datagramAddress; + private boolean isUdp; public AirGapSocket(final String ip, final int port) { this.endPoint = new TEndPoint(ip, port); @@ -78,9 +104,60 @@ public TEndPoint getEndPoint() { return endPoint; } + public void connectUdp(final int timeoutMs) throws IOException { + datagramAddress = InetAddress.getByName(endPoint.getIp()); + datagramSocket = new DatagramSocket(); + datagramSocket.connect(datagramAddress, endPoint.getPort()); + datagramSocket.setSoTimeout(timeoutMs); + isUdp = true; + } + + public InetAddress getDatagramAddress() { + return datagramAddress; + } + + public DatagramSocket getDatagramSocket() { + return datagramSocket; + } + + public boolean isUdp() { + return isUdp; + } + + @Override + public boolean isConnected() { + return isUdp + ? datagramSocket != null && datagramSocket.isConnected() && !datagramSocket.isClosed() + : super.isConnected(); + } + + @Override + public synchronized void setSoTimeout(final int timeout) throws SocketException { + if (isUdp && datagramSocket != null) { + datagramSocket.setSoTimeout(timeout); + } else { + super.setSoTimeout(timeout); + } + } + + @Override + public synchronized void close() throws IOException { + if (datagramSocket != null) { + datagramSocket.close(); + } + super.close(); + } + @Override public String toString() { - return "AirGapSocket{" + "endPoint=" + endPoint + "} (" + super.toString() + ")"; + return "AirGapSocket{" + + "endPoint=" + + endPoint + + ", transport=" + + (isUdp ? "udp" : "tcp") + + "} (" + + super.toString() + + ")"; } } @@ -98,12 +175,49 @@ public String toString() { private int handshakeTimeoutMs; private boolean eLanguageEnable; + private boolean useUdpTransport; + private int udpPacketSizeInBytes; // The air gap connector does not use clientManager thus we put handshake type here protected boolean supportModsIfIsDataNodeReceiver = true; private final Map failLogTimes = new HashMap<>(); + @Override + public void validate(final PipeParameterValidator validator) throws Exception { + super.validate(validator); + + final PipeParameters parameters = validator.getParameters(); + final String airGapTransport = + parameters + .getStringOrDefault( + Arrays.asList(CONNECTOR_AIR_GAP_TRANSPORT_KEY, SINK_AIR_GAP_TRANSPORT_KEY), + CONNECTOR_AIR_GAP_TRANSPORT_DEFAULT_VALUE) + .trim() + .toLowerCase(); + validator.validate( + arg -> CONNECTOR_AIR_GAP_TRANSPORT_SET.contains(airGapTransport), + String.format( + "Air gap transport should be one of %s, but got %s.", + CONNECTOR_AIR_GAP_TRANSPORT_SET, airGapTransport), + airGapTransport); + + final int packetSize = + parameters.getIntOrDefault( + Arrays.asList(CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_KEY, SINK_AIR_GAP_UDP_PACKET_SIZE_KEY), + CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_DEFAULT_VALUE); + validator.validate( + arg -> + packetSize >= CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MIN_VALUE + && packetSize <= CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MAX_VALUE, + String.format( + "UDP packet size should be in the range [%d, %d], but got %d.", + CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MIN_VALUE, + CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_MAX_VALUE, + packetSize), + packetSize); + } + @Override public void customize( final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) @@ -145,6 +259,23 @@ public void customize( CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY, SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY), CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE); LOGGER.info(PipeMessages.AIR_GAP_CUSTOMIZED_E_LANGUAGE, eLanguageEnable); + + useUdpTransport = + CONNECTOR_AIR_GAP_TRANSPORT_UDP_VALUE.equals( + parameters + .getStringOrDefault( + Arrays.asList(CONNECTOR_AIR_GAP_TRANSPORT_KEY, SINK_AIR_GAP_TRANSPORT_KEY), + CONNECTOR_AIR_GAP_TRANSPORT_DEFAULT_VALUE) + .trim() + .toLowerCase()); + udpPacketSizeInBytes = + parameters.getIntOrDefault( + Arrays.asList(CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_KEY, SINK_AIR_GAP_UDP_PACKET_SIZE_KEY), + CONNECTOR_AIR_GAP_UDP_PACKET_SIZE_DEFAULT_VALUE); + LOGGER.info( + "Air gap transport is {}, udp packet size is {} bytes.", + useUdpTransport ? "udp" : "tcp", + udpPacketSizeInBytes); } @Override @@ -179,8 +310,12 @@ public void handshake() throws Exception { final AirGapSocket socket = new AirGapSocket(ip, port); try { - socket.connect(new InetSocketAddress(ip, port), handshakeTimeoutMs); - socket.setKeepAlive(true); + if (useUdpTransport) { + socket.connectUdp(handshakeTimeoutMs); + } else { + socket.connect(new InetSocketAddress(ip, port), handshakeTimeoutMs); + socket.setKeepAlive(true); + } sockets.set(i, socket); LOGGER.info(PipeMessages.CONNECTED_TO_TARGET_SERVER, ip, port); failLogTimes.remove(nodeUrls.get(i)); @@ -318,6 +453,10 @@ protected boolean sendBytes(final AirGapSocket socket, byte[] bytes) throws IOEx String.format("Socket %s is closed, will try to handshake", socket)); } + if (socket.isUdp()) { + return sendBytesByUdp(socket, bytes); + } + final BufferedOutputStream outputStream = new BufferedOutputStream(socket.getOutputStream()); bytes = enrichWithLengthAndChecksum(bytes); outputStream.write(eLanguageEnable ? enrichWithELanguage(bytes) : bytes); @@ -328,6 +467,101 @@ protected boolean sendBytes(final AirGapSocket socket, byte[] bytes) throws IOEx return size > 0 && Arrays.equals(AirGapOneByteResponse.OK, response); } + private boolean sendBytesByUdp(final AirGapSocket socket, final byte[] bytes) throws IOException { + for (final byte[] requestBytes : sliceIfNeededForUdp(bytes)) { + if (!sendOneDatagram(socket, requestBytes)) { + return false; + } + } + return true; + } + + private boolean sendOneDatagram(final AirGapSocket socket, final byte[] requestBytes) + throws IOException { + final byte[] datagramBytes = + eLanguageEnable + ? enrichWithELanguage(enrichWithLengthAndChecksum(requestBytes)) + : enrichWithLengthAndChecksum(requestBytes); + if (datagramBytes.length > udpPacketSizeInBytes) { + throw new IOException( + String.format( + "Air gap UDP datagram size %d exceeds configured packet size %d.", + datagramBytes.length, udpPacketSizeInBytes)); + } + + final DatagramSocket datagramSocket = socket.getDatagramSocket(); + datagramSocket.send( + new DatagramPacket( + datagramBytes, + datagramBytes.length, + socket.getDatagramAddress(), + socket.getEndPoint().getPort())); + + final byte[] response = new byte[1]; + final DatagramPacket responsePacket = new DatagramPacket(response, response.length); + datagramSocket.receive(responsePacket); + return responsePacket.getLength() > 0 && Arrays.equals(AirGapOneByteResponse.OK, response); + } + + private List sliceIfNeededForUdp(final byte[] requestBytes) throws IOException { + final int rawPayloadSizeLimit = + udpPacketSizeInBytes + - UDP_ENVELOPE_SIZE + - (eLanguageEnable + ? AirGapELanguageConstant.E_LANGUAGE_PREFIX.length + + AirGapELanguageConstant.E_LANGUAGE_SUFFIX.length + : 0); + if (requestBytes.length <= rawPayloadSizeLimit) { + return Arrays.asList(requestBytes); + } + + final int sliceBodySizeLimit = + rawPayloadSizeLimit - UDP_SLICE_REQUEST_SERIALIZATION_RESERVED_SIZE; + if (sliceBodySizeLimit <= 0) { + throw new IOException( + String.format( + "Air gap UDP packet size %d is too small to transfer sliced requests.", + udpPacketSizeInBytes)); + } + + final TPipeTransferReq request = toTPipeTransferReq(requestBytes); + final int sliceOrderId = PipeTransferSliceReqBuilder.nextSliceOrderId(); + final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(request, sliceBodySizeLimit); + + final List slicedRequestBytes = new ArrayList<>(sliceCount); + for (int i = 0; i < sliceCount; i++) { + slicedRequestBytes.add( + toTPipeTransferBytes( + PipeTransferSliceReqBuilder.buildSliceReq( + request, sliceOrderId, i, sliceCount, sliceBodySizeLimit))); + } + return slicedRequestBytes; + } + + private TPipeTransferReq toTPipeTransferReq(final byte[] requestBytes) { + final ByteBuffer byteBuffer = ByteBuffer.wrap(requestBytes); + final TPipeTransferReq request = new TPipeTransferReq(); + request.version = ReadWriteIOUtils.readByte(byteBuffer); + request.type = ReadWriteIOUtils.readShort(byteBuffer); + request.body = byteBuffer.slice(); + return request; + } + + private byte[] toTPipeTransferBytes(final TPipeTransferReq request) throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(request.version, outputStream); + ReadWriteIOUtils.write(request.type, outputStream); + + final ByteBuffer bodyBuffer = request.body.duplicate(); + final byte[] body = new byte[bodyBuffer.remaining()]; + bodyBuffer.get(body); + outputStream.write(body); + + return Arrays.copyOf(byteArrayOutputStream.getBuf(), byteArrayOutputStream.size()); + } + } + protected boolean send(final AirGapSocket socket, final byte[] bytes) throws IOException { return send(null, 0, socket, bytes); }