Skip to content
This repository has been archived by the owner on Jul 1, 2022. It is now read-only.

Avoid direct access to apache thrift from jaeger-core via transitive … #374

Merged
merged 5 commits into from
Apr 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions jaeger-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ dependencies {
compile group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.9.0'

// Testing frameworks
testCompile project(path: ':jaeger-thrift', configuration: 'tests')

// Jersey dependencies for unit tests
testCompile group: 'org.glassfish.jersey.test-framework.providers', name: 'jersey-test-framework-provider-grizzly2', version: jerseyVersion

Expand Down
27 changes: 14 additions & 13 deletions jaeger-core/src/main/java/com/uber/jaeger/senders/HttpSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.uber.jaeger.senders;

import com.uber.jaeger.exceptions.SenderException;
import com.uber.jaeger.thriftjava.Batch;
import com.uber.jaeger.thriftjava.Process;
import com.uber.jaeger.thriftjava.Span;
Expand All @@ -29,16 +30,12 @@
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

@ToString(exclude = {"httpClient", "serializer", "requestBuilder"})
@ToString(exclude = {"httpClient", "requestBuilder"})
public class HttpSender extends ThriftSender {
private static final String HTTP_COLLECTOR_JAEGER_THRIFT_FORMAT_PARAM = "format=jaeger.thrift";
private static final int ONE_MB_IN_BYTES = 1048576;
private static final MediaType MEDIA_TYPE_THRIFT = MediaType.parse("application/x-thrift");
private final TSerializer serializer;
private final OkHttpClient httpClient;
private final Request.Builder requestBuilder;

Expand All @@ -57,7 +54,7 @@ public HttpSender(String endpoint) {
* @param endpoint Jaeger REST endpoint consuming jaeger.thrift, e.g
* http://localhost:14268/api/traces
* @param maxPacketSize max bytes to serialize as payload, if 0 it will use
* {@value com.uber.jaeger.reporters.protocols.ThriftUdpTransport#MAX_PACKET_SIZE}
* {@value com.uber.jaeger.thrift.reporters.protocols.ThriftUdpTransport#MAX_PACKET_SIZE}
*
* @deprecated use {@link HttpSender.Builder} with fluent API
*/
Expand All @@ -78,7 +75,7 @@ public HttpSender(String endpoint, OkHttpClient client) {
* @param endpoint Jaeger REST endpoint consuming jaeger.thrift, e.g
* http://localhost:14268/api/traces
* @param maxPacketSize max bytes to serialize as payload, if 0 it will use
* {@value com.uber.jaeger.reporters.protocols.ThriftUdpTransport#MAX_PACKET_SIZE}
* {@value com.uber.jaeger.thrift.reporters.protocols.ThriftUdpTransport#MAX_PACKET_SIZE}
* @param client a client used to make http requests
* @deprecated use {@link HttpSender.Builder} with fluent API
*/
Expand All @@ -90,29 +87,33 @@ public HttpSender(String endpoint, int maxPacketSize, OkHttpClient client) {
}

private HttpSender(Builder builder) {
super(new TBinaryProtocol.Factory(), builder.maxPacketSize);
super(ProtocolType.Binary, builder.maxPacketSize);
HttpUrl collectorUrl = HttpUrl
.parse(String.format("%s?%s", builder.endpoint, HTTP_COLLECTOR_JAEGER_THRIFT_FORMAT_PARAM));
if (collectorUrl == null) {
throw new IllegalArgumentException("Could not parse url.");
}
this.httpClient = builder.clientBuilder.build();
this.requestBuilder = new Request.Builder().url(collectorUrl);
this.serializer = new TSerializer(protocolFactory);
}

@Override
public void send(Process process, List<Span> spans) throws TException {
public void send(Process process, List<Span> spans) throws SenderException {
Batch batch = new Batch(process, spans);
byte[] bytes = serializer.serialize(batch);
byte[] bytes = null;
try {
bytes = serialize(batch);
} catch (Exception e) {
throw new SenderException(String.format("Failed to serialize %d spans", spans.size()), e, spans.size());
}

RequestBody body = RequestBody.create(MEDIA_TYPE_THRIFT, bytes);
Request request = requestBuilder.post(body).build();
Response response;
try {
response = httpClient.newCall(request).execute();
} catch (IOException e) {
throw new TException(String.format("Could not send %d spans", spans.size()), e);
throw new SenderException(String.format("Could not send %d spans", spans.size()), e, spans.size());
}

if (!response.isSuccessful()) {
Expand All @@ -125,7 +126,7 @@ public void send(Process process, List<Span> spans) throws TException {

String exceptionMessage = String.format("Could not send %d spans, response %d: %s",
spans.size(), response.code(), responseBody);
throw new TException(exceptionMessage);
throw new SenderException(exceptionMessage, null, spans.size());
}
}

Expand Down
72 changes: 33 additions & 39 deletions jaeger-core/src/main/java/com/uber/jaeger/senders/ThriftSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,67 +17,52 @@
import com.uber.jaeger.Span;
import com.uber.jaeger.exceptions.SenderException;
import com.uber.jaeger.reporters.protocols.JaegerThriftSpanConverter;
import com.uber.jaeger.reporters.protocols.ThriftUdpTransport;
import com.uber.jaeger.thrift.reporters.protocols.ThriftUdpTransport;
import com.uber.jaeger.thrift.senders.ThriftSenderBase;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I fully understand this change. Don't these imports still retain the dependency on jaeger-thrift?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes there is still a dependency from core to jaeger-thrift. This PR was about removing direct use of apache thrift from core, as a dependency leak.

More work is required to make a true transport abstraction. Is this something also for 1.0?

import com.uber.jaeger.thriftjava.Process;
import java.util.ArrayList;
import java.util.List;

import lombok.ToString;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.AutoExpandingBufferWriteTransport;

@ToString(exclude = {"spanBuffer", "memoryTransport"})
public abstract class ThriftSender implements Sender {
static final int EMIT_BATCH_OVERHEAD = 33;
@ToString(exclude = {"spanBuffer"})
public abstract class ThriftSender extends ThriftSenderBase implements Sender {

private Process process;
private int processBytesSize;
private List<com.uber.jaeger.thriftjava.Span> spanBuffer;
private int byteBufferSize;
private AutoExpandingBufferWriteTransport memoryTransport;

protected final TProtocolFactory protocolFactory;
private final int maxSpanBytes;

/**
* @param protocolFactory protocol factory
* @param protocolType protocol type (compact or binary)
* @param maxPacketSize if 0 it will use default value {@value ThriftUdpTransport#MAX_PACKET_SIZE}
*/
public ThriftSender(TProtocolFactory protocolFactory, int maxPacketSize) {
this.protocolFactory = protocolFactory;
public ThriftSender(ProtocolType protocolType, int maxPacketSize) {
super(protocolType, maxPacketSize);

if (maxPacketSize == 0) {
maxPacketSize = ThriftUdpTransport.MAX_PACKET_SIZE;
}

memoryTransport = new AutoExpandingBufferWriteTransport(maxPacketSize, 2);
maxSpanBytes = maxPacketSize - EMIT_BATCH_OVERHEAD;
spanBuffer = new ArrayList<com.uber.jaeger.thriftjava.Span>();
}

public abstract void send(Process process, List<com.uber.jaeger.thriftjava.Span> spans) throws TException;

@Override
public int append(Span span) throws SenderException {
if (process == null) {
process = new Process(span.getTracer().getServiceName());
process.setTags(JaegerThriftSpanConverter.buildTags(span.getTracer().tags()));
processBytesSize = getSizeOfSerializedThrift(process);
processBytesSize = calculateProcessSize(process);
byteBufferSize += processBytesSize;
}

com.uber.jaeger.thriftjava.Span thriftSpan = JaegerThriftSpanConverter.convertSpan(span);
int spanSize = getSizeOfSerializedThrift(thriftSpan);
if (spanSize > maxSpanBytes) {
int spanSize = calculateSpanSize(thriftSpan);
if (spanSize > getMaxSpanBytes()) {
throw new SenderException(String.format("ThriftSender received a span that was too large, size = %d, max = %d",
spanSize, maxSpanBytes), null, 1);
spanSize, getMaxSpanBytes()), null, 1);
}

byteBufferSize += spanSize;
if (byteBufferSize <= maxSpanBytes) {
if (byteBufferSize <= getMaxSpanBytes()) {
spanBuffer.add(thriftSpan);
if (byteBufferSize < maxSpanBytes) {
if (byteBufferSize < getMaxSpanBytes()) {
return 0;
}
return flush();
Expand All @@ -96,6 +81,24 @@ public int append(Span span) throws SenderException {
return n;
}

protected int calculateProcessSize(Process proc) throws SenderException {
try {
return getSize(proc);
} catch (Exception e) {
throw new SenderException("ThriftSender failed writing Process to memory buffer.", e, 1);
}
}

protected int calculateSpanSize(com.uber.jaeger.thriftjava.Span span) throws SenderException {
try {
return getSize(span);
} catch (Exception e) {
throw new SenderException("ThriftSender failed writing Span to memory buffer.", e, 1);
}
}

public abstract void send(Process process, List<com.uber.jaeger.thriftjava.Span> spans) throws SenderException;

@Override
public int flush() throws SenderException {
if (spanBuffer.isEmpty()) {
Expand All @@ -105,7 +108,7 @@ public int flush() throws SenderException {
int n = spanBuffer.size();
try {
send(process, spanBuffer);
} catch (TException e) {
} catch (SenderException e) {
throw new SenderException("Failed to flush spans.", e, n);
} finally {
spanBuffer.clear();
Expand All @@ -119,13 +122,4 @@ public int close() throws SenderException {
return flush();
}

int getSizeOfSerializedThrift(TBase thriftBase) throws SenderException {
memoryTransport.reset();
try {
thriftBase.write(protocolFactory.getProtocol(memoryTransport));
} catch (TException e) {
throw new SenderException("ThriftSender failed writing to memory buffer.", e, 1);
}
return memoryTransport.getPos();
}
}
14 changes: 8 additions & 6 deletions jaeger-core/src/main/java/com/uber/jaeger/senders/UdpSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@

import com.uber.jaeger.agent.thrift.Agent;
import com.uber.jaeger.exceptions.SenderException;
import com.uber.jaeger.reporters.protocols.ThriftUdpTransport;
import com.uber.jaeger.thrift.reporters.protocols.ThriftUdpTransport;
import com.uber.jaeger.thriftjava.Batch;
import com.uber.jaeger.thriftjava.Process;
import java.util.List;
import lombok.ToString;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;

@ToString(exclude = {"agentClient"})
public class UdpSender extends ThriftSender {
Expand All @@ -46,7 +44,7 @@ public UdpSender() {
* @param maxPacketSize if 0 it will use {@value ThriftUdpTransport#MAX_PACKET_SIZE}
*/
public UdpSender(String host, int port, int maxPacketSize) {
super(new TCompactProtocol.Factory(), maxPacketSize);
super(ProtocolType.Compact, maxPacketSize);

if (host == null || host.length() == 0) {
host = DEFAULT_AGENT_UDP_HOST;
Expand All @@ -61,8 +59,12 @@ public UdpSender(String host, int port, int maxPacketSize) {
}

@Override
public void send(Process process, List<com.uber.jaeger.thriftjava.Span> spans) throws TException {
agentClient.emitBatch(new Batch(process, spans));
public void send(Process process, List<com.uber.jaeger.thriftjava.Span> spans) throws SenderException {
try {
agentClient.emitBatch(new Batch(process, spans));
} catch (Exception e) {
throw new SenderException(String.format("Could not send %d spans", spans.size()), e, spans.size());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.junit.Assert.fail;

import com.uber.jaeger.Configuration;
import com.uber.jaeger.exceptions.SenderException;
import com.uber.jaeger.thriftjava.Process;
import com.uber.jaeger.thriftjava.Span;

Expand All @@ -33,7 +34,6 @@
import javax.ws.rs.core.Response;

import okhttp3.OkHttpClient;
import org.apache.thrift.TException;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.Before;
Expand Down Expand Up @@ -71,7 +71,7 @@ public void sendHappy() throws Exception {
.send(new Process("name"), generateSpans());
}

@Test(expected = TException.class)
@Test(expected = Exception.class)
public void sendServerError() throws Exception {
HttpSender sender = new HttpSender(target("/api/tracesErr").getUri().toString());
sender.send(new Process("robotrock"), generateSpans());
Expand All @@ -82,12 +82,18 @@ public void misconfiguredUrl() throws Exception {
new HttpSender("misconfiguredUrl");
}

@Test(expected = TException.class)
@Test(expected = Exception.class)
public void serverDoesntExist() throws Exception {
HttpSender sender = new HttpSender("http://some-server/api/traces");
sender.send(new Process("robotrock"), generateSpans());
}

@Test(expected = SenderException.class)
public void senderFail() throws Exception {
HttpSender sender = new HttpSender("http://some-server/api/traces");
sender.send(null, generateSpans());
}

@Test
public void sendWithoutAuthData() throws Exception {
System.setProperty(Configuration.JAEGER_ENDPOINT, target("/api/traces").getUri().toString());
Expand Down Expand Up @@ -125,7 +131,7 @@ public void sanityTestForTokenAuthTest() throws Exception {
try {
sender.send(new Process("robotrock"), generateSpans());
fail("expecting exception");
} catch (TException te) {
} catch (Exception te) {
assertTrue(te.getMessage().contains("response 401"));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2017-2018, The Jaeger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package com.uber.jaeger.senders;

import com.uber.jaeger.exceptions.SenderException;
import com.uber.jaeger.thrift.senders.ThriftSenderBase.ProtocolType;
import com.uber.jaeger.thriftjava.Process;
import com.uber.jaeger.thriftjava.Span;

import java.util.List;

import org.junit.Test;

/**
* This class tests the abstract ThriftSender.
*/
public class ThriftSenderTest {

@Test(expected = SenderException.class)
public void calculateProcessSizeNull() throws Exception {
ThriftSender sender = new ThriftSender(ProtocolType.Compact, 0) {
@Override
public void send(Process process, List<Span> spans) throws SenderException {
}
};

sender.calculateProcessSize(null);
}

@Test(expected = SenderException.class)
public void calculateSpanSizeNull() throws Exception {
ThriftSender sender = new ThriftSender(ProtocolType.Compact, 0) {
@Override
public void send(Process process, List<Span> spans) throws SenderException {
}
};

sender.calculateSpanSize(null);
}

}
Loading