Skip to content

Commit f006221

Browse files
committed
chore
1 parent 470c647 commit f006221

2 files changed

Lines changed: 200 additions & 82 deletions

File tree

src/main/java/de/gesellix/docker/client/filesocket/NamedPipeSocket.java

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
package de.gesellix.docker.client.filesocket;
22

33
import static com.sun.jna.platform.win32.WinBase.INVALID_HANDLE_VALUE;
4-
import static com.sun.jna.platform.win32.WinNT.FILE_FLAG_OVERLAPPED;
5-
import static com.sun.jna.platform.win32.WinNT.GENERIC_READ;
6-
import static com.sun.jna.platform.win32.WinNT.GENERIC_WRITE;
7-
import static com.sun.jna.platform.win32.WinNT.OPEN_EXISTING;
84

95
import java.io.IOException;
106
import java.io.InputStream;
@@ -17,7 +13,6 @@
1713
import org.slf4j.Logger;
1814
import org.slf4j.LoggerFactory;
1915

20-
import com.sun.jna.platform.win32.Kernel32;
2116
import com.sun.jna.platform.win32.WinNT;
2217

2318
import okio.BufferedSink;
@@ -36,7 +31,7 @@ public class NamedPipeSocket extends FileSocket {
3631
private BufferedSource source;
3732
private BufferedSink sink;
3833

39-
private final Timeout ioTimeout = new Timeout().timeout(2000, TimeUnit.MILLISECONDS);
34+
private final Timeout ioTimeout = new Timeout().timeout(1000, TimeUnit.MILLISECONDS);
4035

4136
@Override
4237
public void connect(SocketAddress endpoint, int timeout) throws IOException {
@@ -50,33 +45,11 @@ public void connect(SocketAddress endpoint, int timeout) throws IOException {
5045
connect(socketPath);
5146
}
5247

53-
void connect(String socketPath) throws IOException {
48+
void connect(String socketPath) {
5449
socketPath = socketPath.replace("/", "\\");
5550
log.debug("connect via '{}'...", socketPath);
5651

57-
boolean ok = Kernel32.INSTANCE.WaitNamedPipe(socketPath, 200);
58-
if (!ok) {
59-
int err = Kernel32.INSTANCE.GetLastError();
60-
log.error("Failed to wait for Named Pipe '" + socketPath + "', WinError=" + err);
61-
throw new IOException("Failed to wait for Named Pipe '" + socketPath + "', WinError=" + err);
62-
}
63-
64-
handle = Kernel32.INSTANCE.CreateFile(
65-
socketPath,
66-
GENERIC_READ | GENERIC_WRITE,
67-
0,
68-
null,
69-
OPEN_EXISTING,
70-
//0,
71-
FILE_FLAG_OVERLAPPED,
72-
null
73-
);
74-
75-
if (INVALID_HANDLE_VALUE.equals(handle)) {
76-
int err = Kernel32.INSTANCE.GetLastError();
77-
log.error("Failed to open Named Pipe '" + socketPath + "', WinError=" + err);
78-
throw new IOException("Failed to open Named Pipe '" + socketPath + "', WinError=" + err);
79-
}
52+
handle = NamedPipeUtils.connect(socketPath, 10_000, 500, 50);
8053

8154
connected = true;
8255
source = Okio.buffer(new NamedPipeSource(handle, ioTimeout));

src/main/java/de/gesellix/docker/client/filesocket/NamedPipeUtils.java

Lines changed: 197 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,79 +19,224 @@ public final class NamedPipeUtils {
1919
private NamedPipeUtils() {
2020
}
2121

22-
public static boolean readToBuffer(HANDLE handle, byte[] buf, IntByReference bytesRead) {
23-
return Kernel32.INSTANCE.ReadFile(handle, buf, buf.length, bytesRead, null);
24-
}
22+
public static WinNT.HANDLE connect(
23+
String path,
24+
int totalTimeoutMs, // z.B. 15000
25+
int busyWaitCapMs, // z.B. 2000 (max für einzelne WaitNamedPipe)
26+
int notFoundBaseSleepMs // z.B. 50
27+
) {
28+
final long deadline = System.nanoTime() + (long) totalTimeoutMs * 1_000_000L;
29+
int notFoundBackoff = notFoundBaseSleepMs;
30+
WinNT.HANDLE h;
31+
32+
while (true) {
33+
h = Kernel32.INSTANCE.CreateFile(
34+
path,
35+
WinNT.GENERIC_READ | WinNT.GENERIC_WRITE,
36+
0,
37+
null,
38+
WinNT.OPEN_EXISTING,
39+
WinNT.FILE_FLAG_OVERLAPPED,
40+
null
41+
);
42+
43+
if (!WinBase.INVALID_HANDLE_VALUE.equals(h)) {
44+
// Verbunden – Handle ist sofort nutzbar (für Overlapped-I/O)
45+
log.debug("connected");
46+
return h;
47+
}
2548

26-
public static boolean writeFromBuffer(HANDLE handle, byte[] buf, int len, IntByReference written) {
27-
return Kernel32.INSTANCE.WriteFile(handle, buf, len, written, null);
49+
int err = Kernel32.INSTANCE.GetLastError();
50+
long remainingMs = Math.max(0, (deadline - System.nanoTime()) / 1_000_000L);
51+
if (remainingMs == 0) {
52+
log.debug("Connect timed out; last error=" + err);
53+
throw new RuntimeException("Connect timed out; last error=" + err);
54+
}
55+
56+
if (err == WinError.ERROR_PIPE_BUSY) {
57+
log.debug("Pipe busy; last error=" + err);
58+
// Warte, aber maximal busyWaitCapMs und nie länger als verbleibende Zeit
59+
int to = (int) Math.min(remainingMs, busyWaitCapMs);
60+
boolean ok = Kernel32.INSTANCE.WaitNamedPipe(path, to);
61+
if (!ok) {
62+
int e2 = Kernel32.INSTANCE.GetLastError(); // z.B. ERROR_SEM_TIMEOUT
63+
if (e2 == WinError.ERROR_SEM_TIMEOUT && (deadline - System.nanoTime()) > 0) {
64+
log.debug("Retry wait; last error=" + err);
65+
continue; // Restzeit vorhanden → nochmal versuchen
66+
}
67+
throw new RuntimeException("WaitNamedPipe failed: " + e2);
68+
}
69+
// danach erneut versuchen
70+
continue;
71+
}
72+
73+
if (err == WinError.ERROR_FILE_NOT_FOUND) {
74+
log.debug("File not found; last error=" + err);
75+
// Server hat Pipe noch nicht erstellt → kurzer, exponentieller Backoff
76+
int sleepMs = Math.min(notFoundBackoff, (int) remainingMs);
77+
try {
78+
Thread.sleep(sleepMs);
79+
} catch (InterruptedException ie) {
80+
Thread.currentThread().interrupt();
81+
throw new RuntimeException("Interrupted while waiting for pipe", ie);
82+
}
83+
notFoundBackoff = Math.min(notFoundBackoff * 2, 250); // Deckel bei 250ms
84+
continue;
85+
}
86+
87+
// Andere Fehler → sofort abbrechen
88+
throw new RuntimeException("CreateFile failed: " + err);
89+
}
2890
}
2991

3092
public static boolean readOverlapped(WinNT.HANDLE handle, byte[] buf, IntByReference bytesRead, int timeoutMillis) {
93+
WinNT.HANDLE evt = Kernel32.INSTANCE.CreateEvent(null, false, false, null);
94+
if (evt == null || WinBase.INVALID_HANDLE_VALUE.equals(evt)) {
95+
throw new RuntimeException("CreateEvent failed: " + Kernel32.INSTANCE.GetLastError());
96+
}
97+
3198
WinBase.OVERLAPPED overlapped = new WinBase.OVERLAPPED();
32-
overlapped.hEvent = Kernel32.INSTANCE.CreateEvent(null, true, false, null);
99+
overlapped.hEvent = evt;
100+
// overlapped.hEvent = evt;
101+
overlapped.write(); // wichtig: Struktur in nativen Speicher schreiben
33102

34-
try {
35-
Thread.sleep(10);
36-
} catch (InterruptedException ignored) {
37-
// ignored
38-
}
103+
// try {
104+
// Thread.sleep(500);
105+
// } catch (InterruptedException ie) {
106+
// Thread.currentThread().interrupt();
107+
// throw new RuntimeException("Interrupted while waiting for pipe", ie);
108+
// }
39109

40-
boolean ok = Kernel32.INSTANCE.ReadFile(handle, buf, buf.length, null, overlapped);
41-
if (!ok) {
42-
int err = Kernel32.INSTANCE.GetLastError();
43-
if (err != WinError.ERROR_IO_PENDING) {
44-
Kernel32.INSTANCE.CloseHandle(overlapped.hEvent);
45-
return false;
110+
try {
111+
boolean ok = Kernel32.INSTANCE.ReadFile(handle, buf, buf.length, null, overlapped);
112+
if (!ok) {
113+
int err = Kernel32.INSTANCE.GetLastError();
114+
if (err != WinError.ERROR_IO_PENDING) {
115+
log.debug("IO pending; last error=" + err);
116+
return false;
117+
}
118+
119+
int wait = Kernel32.INSTANCE.WaitForSingleObject(evt, timeoutMillis);
120+
if (wait != WinBase.WAIT_OBJECT_0) {
121+
log.debug("File not found; last error=" + err);
122+
ExtendedKernel32.INSTANCE.CancelIoEx(handle, overlapped.getPointer());
123+
return false;
124+
}
46125
}
47126

48-
int wait = Kernel32.INSTANCE.WaitForSingleObject(overlapped.hEvent, timeoutMillis);
49-
if (wait != WinBase.WAIT_OBJECT_0) {
50-
ExtendedKernel32.INSTANCE.CancelIoEx(handle, overlapped.getPointer());
51-
Kernel32.INSTANCE.CloseHandle(overlapped.hEvent);
127+
128+
// Ergebnis (auch bei synchroner Completion) abholen – ohne zusätzlich zu warten
129+
if (!ExtendedKernel32.INSTANCE.GetOverlappedResult(handle, overlapped, bytesRead, false)) {
130+
int err = Kernel32.INSTANCE.GetLastError();
131+
log.debug("Failed to get overlapped result; last error=" + err);
52132
return false;
53133
}
54-
}
55134

56-
ExtendedKernel32.INSTANCE.GetOverlappedResult(handle, overlapped, bytesRead, false);
57-
Kernel32.INSTANCE.CloseHandle(overlapped.hEvent);
58-
log.debug("Bytes read: {}", bytesRead.getValue());
59-
return bytesRead.getValue() >= 0;
135+
int n = bytesRead.getValue();
136+
log.debug("Bytes read: {}", bytesRead.getValue());
137+
// n == 0: EOF
138+
return n >= 0;
139+
} finally {
140+
closeHandle(evt);
141+
}
60142
}
61143

62-
public static boolean writeOverlapped(WinNT.HANDLE handle, byte[] buf, int len, IntByReference bytesWritten, int timeoutMillis) {
63-
WinBase.OVERLAPPED overlapped = new WinBase.OVERLAPPED();
64-
overlapped.hEvent = Kernel32.INSTANCE.CreateEvent(null, true, false, null);
65-
66-
try {
67-
Thread.sleep(10);
68-
} catch (InterruptedException ignored) {
69-
// ignored
144+
/**
145+
* Overlapped-Write: schreibt buffer[0..len) vollständig oder wirft bei Timeout/Fehler.
146+
* Beachtet Teil-Schreibungen und verwendet eine Gesamt-Deadline (timeoutMs).
147+
*/
148+
public static boolean writeOverlapped(
149+
WinNT.HANDLE handle, byte[] buf, int len, IntByReference bytesWritten, int timeoutMs) {
150+
if (len < 0 || len > buf.length) {
151+
throw new IllegalArgumentException("len out of range");
152+
}
153+
if (len == 0) {
154+
return true;
70155
}
71156

72-
boolean ok = Kernel32.INSTANCE.WriteFile(handle, buf, len, null, overlapped);
73-
if (!ok) {
74-
int err = Kernel32.INSTANCE.GetLastError();
75-
if (err != WinError.ERROR_IO_PENDING) {
76-
Kernel32.INSTANCE.CloseHandle(overlapped.hEvent);
77-
return false;
157+
// try {
158+
// Thread.sleep(500);
159+
// } catch (InterruptedException ie) {
160+
// Thread.currentThread().interrupt();
161+
// throw new RuntimeException("Interrupted while waiting for pipe", ie);
162+
// }
163+
164+
final long deadline = System.nanoTime() + (long) timeoutMs * 1_000_000L;
165+
166+
int totalWritten = 0;
167+
168+
while (totalWritten < len) {
169+
long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
170+
if (remainingMs <= 0) {
171+
throw new RuntimeException("writeOverlapped: overall timeout; written=" + totalWritten + "/" + len);
78172
}
79173

80-
int wait = Kernel32.INSTANCE.WaitForSingleObject(overlapped.hEvent, timeoutMillis);
81-
if (wait != WinBase.WAIT_OBJECT_0) {
82-
ExtendedKernel32.INSTANCE.CancelIoEx(handle, overlapped.getPointer());
83-
Kernel32.INSTANCE.CloseHandle(overlapped.hEvent);
84-
return false;
174+
// Frisches auto-reset Event pro Operation
175+
WinNT.HANDLE evt = Kernel32.INSTANCE.CreateEvent(null, false, false, null);
176+
if (evt == null || WinBase.INVALID_HANDLE_VALUE.equals(evt)) {
177+
throw new RuntimeException("CreateEvent failed: " + Kernel32.INSTANCE.GetLastError());
85178
}
86-
}
87179

88-
ExtendedKernel32.INSTANCE.GetOverlappedResult(handle, overlapped, bytesWritten, false);
89-
Kernel32.INSTANCE.CloseHandle(overlapped.hEvent);
90-
log.debug("Bytes written: {}", bytesWritten.getValue());
91-
if (bytesWritten.getValue() != len) {
92-
log.warn("Incomplete write: {}/{}", bytesWritten.getValue(), len);
180+
WinBase.OVERLAPPED ol = new WinBase.OVERLAPPED();
181+
ol.hEvent = evt;
182+
ol.write();
183+
184+
try {
185+
boolean ok = Kernel32.INSTANCE.WriteFile(handle, buf, len, null, ol);
186+
if (!ok) {
187+
int err = Kernel32.INSTANCE.GetLastError();
188+
189+
// Sofortige Fehler
190+
if (err != WinError.ERROR_IO_PENDING) {
191+
if (err == WinError.ERROR_NO_DATA || err == WinError.ERROR_BROKEN_PIPE) {
192+
throw new RuntimeException("writeOverlapped: pipe closed by peer");
193+
}
194+
throw new RuntimeException("WriteFile failed: " + err);
195+
}
196+
197+
// Asynchron → warten bis fertig oder Timeout
198+
int wait = Kernel32.INSTANCE.WaitForSingleObject(evt, (int) Math.min(Integer.MAX_VALUE, remainingMs));
199+
if (wait != WinBase.WAIT_OBJECT_0) {
200+
// Timeout/Fehler → I/O abbrechen
201+
ExtendedKernel32.INSTANCE.CancelIoEx(handle, ol.getPointer());
202+
// Optional: warten, bis Cancel durch ist (nicht zwingend)
203+
throw new RuntimeException("writeOverlapped: wait failed/timeout (Wait=" + wait + ")");
204+
}
205+
}
206+
207+
// Ergebnis abholen (blockiert nicht, weil Event signalisiert)
208+
if (!ExtendedKernel32.INSTANCE.GetOverlappedResult(handle, ol, bytesWritten, false)) {
209+
int e2 = Kernel32.INSTANCE.GetLastError();
210+
if (e2 == WinError.ERROR_OPERATION_ABORTED) {
211+
throw new RuntimeException("writeOverlapped: operation aborted");
212+
}
213+
throw new RuntimeException("GetOverlappedResult(write) failed: " + e2);
214+
}
215+
216+
int n = bytesWritten.getValue();
217+
if (n < 0) {
218+
throw new RuntimeException("writeOverlapped: negative bytesWritten? " + n);
219+
}
220+
if (n == 0) {
221+
// Möglich, wenn Gegenstelle zu ist → als Fehler behandeln
222+
throw new RuntimeException("writeOverlapped: wrote 0 bytes, likely peer closed");
223+
}
224+
225+
totalWritten += n;
226+
} finally {
227+
closeHandle(evt);
228+
}
93229
}
94-
return bytesWritten.getValue() >= 0;
230+
// return totalWritten >= 0;
231+
return true;
232+
}
233+
234+
public static boolean readToBuffer(HANDLE handle, byte[] buf, IntByReference bytesRead) {
235+
return Kernel32.INSTANCE.ReadFile(handle, buf, buf.length, bytesRead, null);
236+
}
237+
238+
public static boolean writeFromBuffer(HANDLE handle, byte[] buf, int len, IntByReference written) {
239+
return Kernel32.INSTANCE.WriteFile(handle, buf, len, written, null);
95240
}
96241

97242
public static void closeHandle(HANDLE handle) {

0 commit comments

Comments
 (0)