Skip to content

Commit

Permalink
Use deadline from callContext, not callOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
djyau committed Oct 16, 2024
1 parent ff7c311 commit a336fa4
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import io.grpc.ClientStreamTracer;
import io.grpc.Deadline;
import io.grpc.Metadata;
import org.threeten.bp.Duration;

/**
* Records the time a request is enqueued in a grpc channel queue. This a bridge between gRPC stream
Expand All @@ -27,27 +27,25 @@
class BigtableGrpcStreamTracer extends ClientStreamTracer {

private final BigtableTracer tracer;
private final Deadline deadline;
private final Duration deadline;

public BigtableGrpcStreamTracer(BigtableTracer tracer, Deadline deadline) {
public BigtableGrpcStreamTracer(BigtableTracer tracer, Duration deadline) {
this.tracer = tracer;
this.deadline = deadline;
}

@Override
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
tracer.grpcMessageSent();
if (deadline != null) {
tracer.setRemainingDeadline(deadline.timeRemaining(TimeUnit.MILLISECONDS));
}
tracer.setRemainingDeadline(deadline.toMillis());
}

static class Factory extends ClientStreamTracer.Factory {

private final BigtableTracer tracer;
private final Deadline deadline;
private final Duration deadline;

Factory(BigtableTracer tracer, Deadline deadline) {
Factory(BigtableTracer tracer, Duration deadline) {
this.tracer = tracer;
this.deadline = deadline;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void grpcMessageSent() {
* Set the remaining customer specified deadline so it can be exported in a metric. This will
* be called in BuiltinMetricsTracer.
*/
public void setRemainingDeadline(long deadlineRemaining) {
public void setRemainingDeadline(long deadline) {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ class BuiltinMetricsTracer extends BigtableTracer {
private Long serverLatencies = null;
private final AtomicLong grpcMessageSentDelay = new AtomicLong(0);

private long deadlineRemaining;

// OpenCensus (and server) histogram buckets use [start, end), however OpenTelemetry uses (start,
// end]. To work around this, we measure all the latencies in nanoseconds and convert them
// to milliseconds and use DoubleHistogram. This should minimize the chance of a data
Expand All @@ -98,6 +96,7 @@ class BuiltinMetricsTracer extends BigtableTracer {
private final DoubleHistogram firstResponseLatenciesHistogram;
private final DoubleHistogram clientBlockingLatenciesHistogram;
private final DoubleHistogram applicationBlockingLatenciesHistogram;

private final DoubleHistogram remainingDeadlineHistogram;
private final LongCounter connectivityErrorCounter;
private final LongCounter retryCounter;
Expand Down Expand Up @@ -274,9 +273,9 @@ public void grpcMessageSent() {
}

@Override
public void setRemainingDeadline(long deadlineRemaining) {
// update remaining deadline variable
this.deadlineRemaining = deadlineRemaining;
public void setRemainingDeadline(long deadline) {
long timeElapsed = attemptTimer.elapsed(TimeUnit.MILLISECONDS);
long deadlineRemaining = deadline - timeElapsed;
}

@Override
Expand Down Expand Up @@ -321,8 +320,6 @@ private void recordOperationCompletion(@Nullable Throwable status) {
long applicationLatencyNano = operationLatencyNano - totalServerLatencyNano.get();
applicationBlockingLatenciesHistogram.record(convertToMs(applicationLatencyNano), attributes);

remainingDeadlineHistogram.record(deadlineRemaining, attributes);

if (operationType == OperationType.ServerStreaming
&& spanName.getMethodName().equals("ReadRows")) {
firstResponseLatenciesHistogram.record(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,9 @@ public void grpcChannelQueuedLatencies(long queuedTimeMs) {
}

@Override
<<<<<<< HEAD
public void grpcMessageSent() {
public void setRemainingDeadline(long deadline) {
for (BigtableTracer tracer : bigtableTracers) {
tracer.grpcMessageSent();
=======
public void setRemainingDeadline(long deadlineRemaining) {
for (BigtableTracer tracer : bigtableTracers) {
tracer.setRemainingDeadline(deadlineRemaining);
>>>>>>> 0ad9a399 (Add override to CompositeTracer)
tracer.setRemainingDeadline(deadline);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.CallOptions;
import io.grpc.Deadline;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.opencensus.tags.TagValue;
import org.threeten.bp.Duration;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
Expand Down Expand Up @@ -223,7 +224,7 @@ static GrpcCallContext injectBigtableStreamTracer(
if (context instanceof GrpcCallContext) {
GrpcCallContext callContext = (GrpcCallContext) context;
CallOptions callOptions = callContext.getCallOptions();
Deadline deadline = callOptions.getDeadline();
Duration deadline = callContext.getTimeout();
return responseMetadata.addHandlers(
callContext.withCallOptions(
callOptions.withStreamTracerFactory(new BigtableGrpcStreamTracer.Factory(tracer, deadline))));
Expand Down

0 comments on commit a336fa4

Please sign in to comment.