[go: up one dir, main page]

Skip to content

Commit

Permalink
feat: Open telemetry implementation (#2770)
Browse files Browse the repository at this point in the history
This PR adds support for [OpenTelemetry](https://opentelemetry.io/) Instrumentation for Traces and Metrics. 

Add dependency for [OpenTelemetrySDK](https://opentelemetry.io/docs/instrumentation/java/manual/#initialize-the-sdk) and required [exporters](https://opentelemetry.io/docs/instrumentation/java/exporters/).

Create OpenTelemetry object with required MeterProvider and TracerProvider exporter . Inject OpenTelemetry object via SpannerOptions or register as Global

`
OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
              .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
              .setTracerProvider(tracerProvider)
              .setMeterProvider(sdkMeterProvider)
              .build;

SpannerOptions options = SpannerOptions.newBuilder().setOpenTelemetry(openTelemetry).build();
`

 By default, OpenTelemetry traces are not enabled. To enable OpenTelemetry traces , call `SpannerOptions.enableOpenTelemetryTraces()` in startup of your application. Enabling OpenTelemetry traces will disable OpenCensus traces. Both OpenCensus and OpenTelemetry traces can not be enabled at the same time.
  • Loading branch information
surbhigarg92 committed Feb 8, 2024
1 parent e2b7ae6 commit 244d6a8
Show file tree
Hide file tree
Showing 53 changed files with 2,719 additions and 489 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -22,6 +22,7 @@ nosetests.xml
.settings
.DS_Store
.classpath
.tool-versions

# Built documentation
docs/
Expand Down
7 changes: 6 additions & 1 deletion google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -426,7 +426,6 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void rollbackToSavepoint(java.lang.String)</method>
</difference>

<!-- Delay start transaction -->
<difference>
<differenceType>7012</differenceType>
Expand Down Expand Up @@ -540,6 +539,12 @@
<className>com/google/cloud/spanner/Dialect</className>
<method>java.lang.String getDefaultSchema()</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/spanner/PartitionedDmlTransaction</className>
<method>void setSpan(io.opencensus.trace.Span)</method>
<to>void setSpan(com.google.cloud.spanner.ISpan)</to>
</difference>

<!-- Added DirectedReadOptions -->
<difference>
Expand Down
32 changes: 29 additions & 3 deletions google-cloud-spanner/pom.xml
Expand Up @@ -247,6 +247,14 @@
<artifactId>opencensus-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
Expand Down Expand Up @@ -393,7 +401,6 @@
<version>2.2</version>
<scope>test</scope>
</dependency>

<!-- Benchmarking dependencies -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
Expand All @@ -407,9 +414,28 @@
<version>1.37</version>
<scope>test</scope>
</dependency>

<!-- OpenTelemetry test dependencies -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-metrics</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
<profile>
<id>java9</id>
Expand Down
Expand Up @@ -39,6 +39,7 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
Expand All @@ -53,8 +54,6 @@
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracing;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
Expand All @@ -70,7 +69,8 @@ abstract class AbstractReadContext
abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadContext> {
private SessionImpl session;
private SpannerRpc rpc;
private Span span = Tracing.getTracer().getCurrentSpan();
private ISpan span;
private TraceWrapper tracer;
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
private DirectedReadOptions defaultDirectedReadOption;
Expand All @@ -94,11 +94,16 @@ B setRpc(SpannerRpc rpc) {
return self();
}

B setSpan(Span span) {
B setSpan(ISpan span) {
this.span = span;
return self();
}

B setTracer(TraceWrapper tracer) {
this.tracer = tracer;
return self();
}

B setDefaultPrefetchChunks(int defaultPrefetchChunks) {
this.defaultPrefetchChunks = defaultPrefetchChunks;
return self();
Expand Down Expand Up @@ -389,9 +394,12 @@ void initTransaction() {
}
transactionId = transaction.getId();
span.addAnnotation(
"Transaction Creation Done", TraceUtil.getTransactionAnnotations(transaction));
"Transaction Creation Done",
ImmutableMap.of(
"Id", transaction.getId().toStringUtf8(), "Timestamp", timestamp.toString()));

} catch (SpannerException e) {
span.addAnnotation("Transaction Creation Failed", TraceUtil.getExceptionAnnotations(e));
span.addAnnotation("Transaction Creation Failed", e);
throw e;
}
}
Expand All @@ -402,7 +410,8 @@ void initTransaction() {
final SessionImpl session;
final SpannerRpc rpc;
final ExecutorProvider executorProvider;
Span span;
ISpan span;
TraceWrapper tracer;
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

Expand Down Expand Up @@ -435,10 +444,11 @@ void initTransaction() {
this.span = builder.span;
this.executorProvider = builder.executorProvider;
this.clock = builder.clock;
this.tracer = builder.tracer;
}

@Override
public void setSpan(Span span) {
public void setSpan(ISpan span) {
this.span = span;
}

Expand Down Expand Up @@ -692,6 +702,7 @@ ResultSet executeQueryInternalWithOptions(
MAX_BUFFERED_CHUNKS,
SpannerImpl.QUERY,
span,
tracer,
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
@Override
Expand Down Expand Up @@ -752,7 +763,7 @@ public final void invalidate() {

@Override
public void close() {
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
synchronized (lock) {
isClosed = true;
}
Expand Down Expand Up @@ -837,6 +848,7 @@ ResultSet readInternalWithOptions(
MAX_BUFFERED_CHUNKS,
SpannerImpl.READ,
span,
tracer,
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
@Override
Expand Down
Expand Up @@ -37,7 +37,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.CharSource;
import com.google.common.util.concurrent.Uninterruptibles;
Expand All @@ -53,11 +52,6 @@
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TypeCode;
import io.grpc.Context;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
Expand Down Expand Up @@ -87,7 +81,6 @@

/** Implementation of {@link ResultSet}. */
abstract class AbstractResultSet<R> extends AbstractStructReader implements ResultSet {
private static final Tracer tracer = Tracing.getTracer();
private static final com.google.protobuf.Value NULL_VALUE =
com.google.protobuf.Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build();

Expand Down Expand Up @@ -1206,7 +1199,8 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
private final BackOff backOff;
private final LinkedList<PartialResultSet> buffer = new LinkedList<>();
private final int maxBufferSize;
private final Span span;
private final ISpan span;
private final TraceWrapper tracer;
private CloseableIterator<PartialResultSet> stream;
private ByteString resumeToken;
private boolean finished;
Expand All @@ -1220,12 +1214,14 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
protected ResumableStreamIterator(
int maxBufferSize,
String streamName,
Span parent,
ISpan parent,
TraceWrapper tracer,
RetrySettings streamingRetrySettings,
Set<Code> retryableCodes) {
checkArgument(maxBufferSize >= 0);
this.maxBufferSize = maxBufferSize;
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent).startSpan();
this.tracer = tracer;
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent);
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
this.backOff = newBackOff();
Expand Down Expand Up @@ -1281,11 +1277,7 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
}

private void backoffSleep(Context context, long backoffMillis) throws SpannerException {
tracer
.getCurrentSpan()
.addAnnotation(
"Backing off",
ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis)));
tracer.getCurrentSpan().addAnnotation("Backing off", "Delay", backoffMillis);
final CountDownLatch latch = new CountDownLatch(1);
final Context.CancellationListener listener =
ignored -> {
Expand Down Expand Up @@ -1325,7 +1317,7 @@ public void execute(Runnable command) {
public void close(@Nullable String message) {
if (stream != null) {
stream.close(message);
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
stream = null;
}
}
Expand All @@ -1343,11 +1335,9 @@ protected PartialResultSet computeNext() {
if (stream == null) {
span.addAnnotation(
"Starting/Resuming stream",
ImmutableMap.of(
"ResumeToken",
AttributeValue.stringAttributeValue(
resumeToken == null ? "null" : resumeToken.toStringUtf8())));
try (Scope s = tracer.withSpan(span)) {
"ResumeToken",
resumeToken == null ? "null" : resumeToken.toStringUtf8());
try (IScope scope = tracer.withSpan(span)) {
// When start a new stream set the Span as current to make the gRPC Span a child of
// this Span.
stream = checkNotNull(startStream(resumeToken));
Expand Down Expand Up @@ -1387,17 +1377,15 @@ protected PartialResultSet computeNext() {
}
} catch (SpannerException spannerException) {
if (safeToRetry && isRetryable(spannerException)) {
span.addAnnotation(
"Stream broken. Safe to retry",
TraceUtil.getExceptionAnnotations(spannerException));
span.addAnnotation("Stream broken. Safe to retry", spannerException);
logger.log(Level.FINE, "Retryable exception, will sleep and retry", spannerException);
// Truncate any items in the buffer before the last retry token.
while (!buffer.isEmpty() && buffer.getLast().getResumeToken().isEmpty()) {
buffer.removeLast();
}
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
stream = null;
try (Scope s = tracer.withSpan(span)) {
try (IScope s = tracer.withSpan(span)) {
long delay = spannerException.getRetryDelayInMillis();
if (delay != -1) {
backoffSleep(context, delay);
Expand All @@ -1408,12 +1396,12 @@ protected PartialResultSet computeNext() {

continue;
}
span.addAnnotation("Stream broken. Not safe to retry");
TraceUtil.setWithFailure(span, spannerException);
span.addAnnotation("Stream broken. Not safe to retry", spannerException);
span.setStatus(spannerException);
throw spannerException;
} catch (RuntimeException e) {
span.addAnnotation("Stream broken. Not safe to retry");
TraceUtil.setWithFailure(span, e);
span.addAnnotation("Stream broken. Not safe to retry", e);
span.setStatus(e);
throw e;
}
}
Expand Down
Expand Up @@ -28,31 +28,27 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;

/** Implementation of {@link AsyncTransactionManager}. */
final class AsyncTransactionManagerImpl
implements CommittableAsyncTransactionManager, SessionTransaction {
private static final Tracer tracer = Tracing.getTracer();

private final SessionImpl session;
private Span span;
private ISpan span;
private final Options options;

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture<CommitResponse> commitResponse = SettableApiFuture.create();

AsyncTransactionManagerImpl(SessionImpl session, Span span, TransactionOption... options) {
AsyncTransactionManagerImpl(SessionImpl session, ISpan span, TransactionOption... options) {
this.session = session;
this.span = span;
this.options = Options.fromTransactionOptions(options);
}

@Override
public void setSpan(Span span) {
public void setSpan(ISpan span) {
this.span = span;
}

Expand Down

0 comments on commit 244d6a8

Please sign in to comment.