Skip to content

Commit e16b79e

Browse files
authored
OkHTTP3 - fix response charset and adding support for gzip compression (#405)
* fix response charset and adding support for gzip compression * adding tests * adding support for other clients * fixing apacheasync http client test * disabling flaky tests
1 parent 5813589 commit e16b79e

49 files changed

Lines changed: 1515 additions & 172 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright The Hypertrace Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opentelemetry.instrumentation.hypertrace.apachehttpasyncclient;
18+
19+
import io.opentelemetry.proto.trace.v1.Span;
20+
import java.io.BufferedReader;
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.io.InputStreamReader;
24+
import java.nio.charset.StandardCharsets;
25+
import java.util.List;
26+
import java.util.concurrent.ExecutionException;
27+
import java.util.concurrent.Future;
28+
import java.util.concurrent.TimeoutException;
29+
import java.util.zip.GZIPInputStream;
30+
import org.apache.http.HttpResponse;
31+
import org.apache.http.client.methods.HttpGet;
32+
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
33+
import org.apache.http.impl.nio.client.HttpAsyncClients;
34+
import org.hypertrace.agent.testing.AbstractInstrumenterTest;
35+
import org.hypertrace.agent.testing.TestHttpServer;
36+
import org.junit.jupiter.api.AfterAll;
37+
import org.junit.jupiter.api.Assertions;
38+
import org.junit.jupiter.api.BeforeAll;
39+
import org.junit.jupiter.api.Test;
40+
41+
class ApacheAsyncClientGzipHandlingTest extends AbstractInstrumenterTest {
42+
43+
private static final TestHttpServer testHttpServer = new TestHttpServer();
44+
45+
private static final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();
46+
47+
@BeforeAll
48+
public static void startServer() throws Exception {
49+
testHttpServer.start();
50+
client.start();
51+
}
52+
53+
@AfterAll
54+
public static void closeServer() throws Exception {
55+
testHttpServer.close();
56+
}
57+
58+
@Test
59+
public void getGzipResponse()
60+
throws ExecutionException, InterruptedException, TimeoutException, IOException {
61+
HttpGet getRequest =
62+
new HttpGet(String.format("http://localhost:%s/gzip", testHttpServer.port()));
63+
getRequest.addHeader("foo", "bar");
64+
Future<HttpResponse> futureResponse =
65+
client.execute(
66+
getRequest, new ApacheAsyncClientInstrumentationModuleTest.NoopFutureCallback());
67+
68+
HttpResponse response = futureResponse.get();
69+
Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
70+
try (InputStream gzipStream = new GZIPInputStream(response.getEntity().getContent())) {
71+
String responseBody = readInputStream(gzipStream);
72+
Assertions.assertEquals(TestHttpServer.GzipHandler.RESPONSE_BODY, responseBody);
73+
}
74+
75+
TEST_WRITER.waitForTraces(1);
76+
// exclude server spans
77+
List<List<Span>> traces =
78+
TEST_WRITER.waitForSpans(
79+
2,
80+
span ->
81+
span.getKind().equals(Span.SpanKind.SPAN_KIND_SERVER)
82+
|| span.getAttributesList().stream()
83+
.noneMatch(
84+
keyValue ->
85+
keyValue.getKey().equals("http.response.header.content-encoding")
86+
&& keyValue.getValue().getStringValue().contains("gzip")));
87+
Assertions.assertEquals(1, traces.size());
88+
Assertions.assertEquals(2, traces.get(0).size());
89+
Span clientSpan = traces.get(0).get(1);
90+
Span responseBodySpan = traces.get(0).get(0);
91+
if (traces.get(0).get(0).getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)) {
92+
clientSpan = traces.get(0).get(0);
93+
responseBodySpan = traces.get(0).get(1);
94+
}
95+
96+
Assertions.assertEquals(
97+
"test-value",
98+
TEST_WRITER
99+
.getAttributesMap(clientSpan)
100+
.get("http.response.header.test-response-header")
101+
.getStringValue());
102+
Assertions.assertEquals(
103+
"bar",
104+
TEST_WRITER.getAttributesMap(clientSpan).get("http.request.header.foo").getStringValue());
105+
Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body"));
106+
107+
Assertions.assertEquals(
108+
TestHttpServer.GzipHandler.RESPONSE_BODY,
109+
TEST_WRITER.getAttributesMap(responseBodySpan).get("http.response.body").getStringValue());
110+
}
111+
112+
private String readInputStream(InputStream inputStream) throws IOException {
113+
StringBuilder textBuilder = new StringBuilder();
114+
115+
try (BufferedReader reader =
116+
new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
117+
int c;
118+
while ((c = reader.read()) != -1) {
119+
textBuilder.append((char) c);
120+
}
121+
}
122+
return textBuilder.toString();
123+
}
124+
}

instrumentation/apache-httpclient-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/apachehttpclient/v4_0/ApacheHttpClientUtils.java

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020
import io.opentelemetry.api.trace.Span;
2121
import io.opentelemetry.javaagent.instrumentation.hypertrace.apachehttpclient.v4_0.ApacheHttpClientObjectRegistry.SpanAndAttributeKey;
2222
import java.io.IOException;
23-
import java.io.UnsupportedEncodingException;
23+
import java.io.InputStream;
24+
import java.io.InputStreamReader;
25+
import java.io.OutputStreamWriter;
2426
import java.nio.charset.Charset;
2527
import java.util.function.Function;
28+
import java.util.zip.GZIPInputStream;
2629
import org.apache.http.Header;
2730
import org.apache.http.HeaderIterator;
2831
import org.apache.http.HttpEntity;
@@ -100,26 +103,32 @@ public static void traceEntity(
100103
if (contentType == null || !ContentTypeUtils.shouldCapture(contentType.getValue())) {
101104
return;
102105
}
103-
104106
String charsetStr = ContentTypeUtils.parseCharset(contentType.getValue());
105107
Charset charset = ContentTypeCharsetUtils.toCharset(charsetStr);
106-
108+
// Get the content encoding header and check if it's gzip
109+
Header contentEncoding = entity.getContentEncoding();
110+
boolean isGzipEncoded =
111+
contentEncoding != null
112+
&& contentEncoding.getValue() != null
113+
&& contentEncoding.getValue().toLowerCase().contains("gzip");
107114
if (entity.isRepeatable()) {
108115
try {
109-
BoundedByteArrayOutputStream byteArrayOutputStream =
110-
BoundedBuffersFactory.createStream(charset);
111-
entity.writeTo(byteArrayOutputStream);
112-
113-
try {
114-
String body = byteArrayOutputStream.toStringWithSuppliedCharset();
115-
span.setAttribute(bodyAttributeKey, body);
116-
} catch (UnsupportedEncodingException e) {
117-
log.error("Could not parse charset from encoding {}", charsetStr, e);
116+
InputStream contentStream = entity.getContent();
117+
if (isGzipEncoded) {
118+
try {
119+
contentStream = new GZIPInputStream(contentStream);
120+
} catch (IOException e) {
121+
log.error("Failed to create GZIPInputStream", e);
122+
return;
123+
}
118124
}
125+
126+
String body = readInputStream(contentStream, charset);
127+
span.setAttribute(bodyAttributeKey, body);
128+
119129
} catch (IOException e) {
120-
log.error("Could not read request input stream from repeatable request entity/body", e);
130+
throw new RuntimeException(e);
121131
}
122-
123132
return;
124133
}
125134

@@ -133,4 +142,18 @@ public static void traceEntity(
133142
ApacheHttpClientObjectRegistry.entityToSpan.put(
134143
entity, new SpanAndAttributeKey(span, bodyAttributeKey));
135144
}
145+
146+
public static String readInputStream(InputStream inputStream, Charset charset)
147+
throws IOException {
148+
BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset);
149+
try (InputStreamReader reader = new InputStreamReader(inputStream, charset);
150+
OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) {
151+
int c;
152+
while ((c = reader.read()) != -1) {
153+
writer.write(c);
154+
}
155+
writer.flush();
156+
}
157+
return outputStream.toStringWithSuppliedCharset();
158+
}
136159
}

instrumentation/apache-httpclient-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/apachehttpclient/v4_0/HttpEntityInstrumentation.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,18 @@ public static void exit(@Advice.This HttpEntity thizz, @Advice.Return InputStrea
9292
}
9393
Charset charset = ContentTypeCharsetUtils.toCharset(charsetStr);
9494

95+
String contentEncoding = null;
96+
Header contentEncodingHeader = thizz.getContentEncoding();
97+
if (contentEncodingHeader != null) {
98+
contentEncoding = contentEncodingHeader.getValue();
99+
}
95100
SpanAndBuffer spanAndBuffer =
96101
new SpanAndBuffer(
97102
clientSpan.span,
98103
BoundedBuffersFactory.createStream((int) contentSize, charset),
99104
clientSpan.attributeKey,
100-
charset);
105+
charset,
106+
contentEncoding);
101107
VirtualField.find(InputStream.class, SpanAndBuffer.class).set(inputStream, spanAndBuffer);
102108
}
103109
}

instrumentation/java-streams/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/java/inputstream/InputStreamInstrumentationModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ public static void exit(@Advice.This InputStream thizz, @Advice.Return int avail
268268
spanAndBuffer.span,
269269
spanAndBuffer.attributeKey,
270270
spanAndBuffer.byteArrayBuffer,
271-
spanAndBuffer.charset);
271+
spanAndBuffer.charset,
272+
spanAndBuffer.contentEncoding);
272273
contextStore.set(thizz, null);
273274
}
274275
}

instrumentation/java-streams/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/java/inputstream/InputStreamUtils.java

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,22 @@
2323
import io.opentelemetry.api.trace.Tracer;
2424
import io.opentelemetry.context.Context;
2525
import io.opentelemetry.instrumentation.api.util.VirtualField;
26+
import java.io.ByteArrayInputStream;
2627
import java.io.ByteArrayOutputStream;
2728
import java.io.IOException;
2829
import java.io.InputStream;
30+
import java.io.InputStreamReader;
31+
import java.io.OutputStreamWriter;
2932
import java.io.UnsupportedEncodingException;
3033
import java.lang.reflect.InvocationTargetException;
3134
import java.lang.reflect.Method;
3235
import java.nio.charset.Charset;
36+
import java.util.zip.GZIPInputStream;
3337
import org.hypertrace.agent.core.instrumentation.HypertraceCallDepthThreadLocalMap;
3438
import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes;
3539
import org.hypertrace.agent.core.instrumentation.SpanAndBuffer;
40+
import org.hypertrace.agent.core.instrumentation.buffer.BoundedBuffersFactory;
41+
import org.hypertrace.agent.core.instrumentation.buffer.BoundedByteArrayOutputStream;
3642
import org.slf4j.Logger;
3743
import org.slf4j.LoggerFactory;
3844

@@ -85,6 +91,13 @@ public static void addAttribute(Span span, AttributeKey<String> attributeKey, St
8591
spanBuilder.setAttribute(
8692
"http.response.header.content-type", (String) resContentType);
8793
}
94+
Object resContentEncoding =
95+
getAttribute.invoke(
96+
span, HypertraceSemanticAttributes.HTTP_RESPONSE_HEADER_CONTENT_ENCODING);
97+
if (resContentEncoding != null) {
98+
spanBuilder.setAttribute(
99+
"http.response.header.content-encoding", (String) resContentEncoding);
100+
}
88101
}
89102
} catch (IllegalAccessException | InvocationTargetException e) {
90103
// ignore and continue
@@ -100,12 +113,31 @@ public static void addAttribute(Span span, AttributeKey<String> attributeKey, St
100113
}
101114

102115
public static void addBody(
103-
Span span, AttributeKey<String> attributeKey, ByteArrayOutputStream buffer, Charset charset) {
116+
Span span,
117+
AttributeKey<String> attributeKey,
118+
ByteArrayOutputStream buffer,
119+
Charset charset,
120+
String contentEncoding) {
104121
try {
105-
String body = buffer.toString(charset.name());
106-
InputStreamUtils.addAttribute(span, attributeKey, body);
122+
byte[] data = buffer.toByteArray();
123+
124+
// if content-encoding is gzip,
125+
if (contentEncoding != null && contentEncoding.toLowerCase().contains("gzip")) {
126+
try (GZIPInputStream gzipInputStream =
127+
new GZIPInputStream(new ByteArrayInputStream(data))) {
128+
InputStreamReader reader = new InputStreamReader(gzipInputStream, charset);
129+
String body = readInputStream(reader, charset);
130+
InputStreamUtils.addAttribute(span, attributeKey, body);
131+
}
132+
} else {
133+
// No decompression needed, convert directly to string
134+
String body = new String(data, charset);
135+
InputStreamUtils.addAttribute(span, attributeKey, body);
136+
}
107137
} catch (UnsupportedEncodingException e) {
108-
log.error("Failed to parse encofing from charset {}", charset, e);
138+
log.error("Failed to parse encoding from charset {}", charset, e);
139+
} catch (IOException e) {
140+
log.error("Failed to read or decompress data", e);
109141
}
110142
}
111143

@@ -132,7 +164,8 @@ public static void read(
132164
spanAndBuffer.span,
133165
spanAndBuffer.attributeKey,
134166
spanAndBuffer.byteArrayBuffer,
135-
spanAndBuffer.charset);
167+
spanAndBuffer.charset,
168+
spanAndBuffer.contentEncoding);
136169
contextStore.set(inputStream, null);
137170
}
138171
}
@@ -146,7 +179,8 @@ public static void read(
146179
spanAndBuffer.span,
147180
spanAndBuffer.attributeKey,
148181
spanAndBuffer.byteArrayBuffer,
149-
spanAndBuffer.charset);
182+
spanAndBuffer.charset,
183+
spanAndBuffer.contentEncoding);
150184
VirtualField.find(InputStream.class, SpanAndBuffer.class).set(inputStream, null);
151185
}
152186
}
@@ -166,7 +200,8 @@ public static void read(
166200
spanAndBuffer.span,
167201
spanAndBuffer.attributeKey,
168202
spanAndBuffer.byteArrayBuffer,
169-
spanAndBuffer.charset);
203+
spanAndBuffer.charset,
204+
spanAndBuffer.contentEncoding);
170205
contextStore.set(inputStream, null);
171206
}
172207
}
@@ -194,10 +229,24 @@ public static void readNBytes(
194229
spanAndBuffer.span,
195230
spanAndBuffer.attributeKey,
196231
spanAndBuffer.byteArrayBuffer,
197-
spanAndBuffer.charset);
232+
spanAndBuffer.charset,
233+
spanAndBuffer.contentEncoding);
198234
contextStore.set(inputStream, null);
199235
} else {
200236
spanAndBuffer.byteArrayBuffer.write(b, off, read);
201237
}
202238
}
239+
240+
public static String readInputStream(InputStreamReader inputReader, Charset charset)
241+
throws IOException {
242+
BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset);
243+
try (OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) {
244+
int c;
245+
while ((c = inputReader.read()) != -1) {
246+
writer.write(c);
247+
}
248+
writer.flush();
249+
}
250+
return outputStream.toStringWithSuppliedCharset();
251+
}
203252
}

0 commit comments

Comments
 (0)