[go: up one dir, main page]

Skip to content

Commit

Permalink
feat: create the backbone of counting errors per connection each minu…
Browse files Browse the repository at this point in the history
…te. (#2094)

* Create the backbone of counting errors per connection each minute.

* Clean up creating new interceptors and StatsRecorderWrapper ctor.

* Rename setting background task and fix imports.

* Temporarily skip exporting per connection metrics to fix test failures.

* Temporarily share the tests for debugging purposes.

* Temporarily add the test for debugging.

* Remove the new ExecutorProvider and fix integration test failures.

* Update unit tests to reflect the new setup.

* Clean up and add tests.

* Clean up comments and add a TODO.

* Improve tests and comments.

* Address comments and refactor by defining new classes.

* Fix code formatting.

* Refactor classes and move to better packages.

* Clean up classes and address comments.

* Update the scheduler object.

* Apply cleanups.

* Fix unit tests and avoid hanging when getting error in close().

* Fix code formatting.

* Improve error handling in the close() method.

* Improve exception logging.

* Fix code formatting.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
rkaregar and gcf-owl-bot[bot] committed Feb 16, 2024
1 parent 2607fff commit 7d27816
Show file tree
Hide file tree
Showing 10 changed files with 543 additions and 17 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -42,3 +42,4 @@ api_key
artman-genfiles

.flattened-pom.xml
dependency-reduced-pom.xml
Expand Up @@ -51,6 +51,10 @@ public void export(Collection<Metric> metrics) {
if (!metric.getMetricDescriptor().getName().contains("bigtable")) {
continue;
}
// TODO: temporarily skip exporting per connection metrics.
if (metric.getMetricDescriptor().getName().contains("per_connection_error_count")) {
continue;
}

projectToTimeSeries =
metric.getTimeSeriesList().stream()
Expand Down
@@ -0,0 +1,57 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.stats;

import com.google.api.core.InternalApi;
import io.opencensus.stats.MeasureMap;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.TagContextBuilder;
import io.opencensus.tags.TagKey;
import io.opencensus.tags.TagValue;
import io.opencensus.tags.Tagger;
import io.opencensus.tags.Tags;
import java.util.Map;

/**
 * A wrapper to record built-in metrics for connection metrics not tied to operations/RPCs.
 *
 * <p>The tag context is built once, at construction time, from the supplied attributes layered on
 * top of the tagger's current tag context. Every recording uses its own short-lived {@link
 * MeasureMap}, so instances hold no mutable state.
 */
@InternalApi("For internal use only")
public class StatsRecorderWrapperForConnection {
  private final StatsRecorder statsRecorder;
  private final TagContext tagContext;

  /**
   * Creates a wrapper that tags every recorded measurement with {@code statsAttributes}.
   *
   * @param statsAttributes tag key/value pairs added (as local tags) to the current tag context
   * @param statsRecorder recorder used to create the measure maps that are recorded
   */
  public StatsRecorderWrapperForConnection(
      Map<String, String> statsAttributes, StatsRecorder statsRecorder) {
    this.statsRecorder = statsRecorder;

    Tagger tagger = Tags.getTagger();
    TagContextBuilder tagContextBuilder = tagger.toBuilder(tagger.getCurrentTagContext());
    for (Map.Entry<String, String> entry : statsAttributes.entrySet()) {
      tagContextBuilder.putLocal(TagKey.create(entry.getKey()), TagValue.create(entry.getValue()));
    }
    this.tagContext = tagContextBuilder.build();
  }

  /**
   * Records a single per-connection error count data point against this wrapper's tag context.
   *
   * @param errorCount the number of errors to record for one connection
   */
  public void putAndRecordPerConnectionErrorCount(long errorCount) {
    // A new MeasureMap per call replaces the previous "record, then swap the field" approach, so
    // no value can carry over between recordings and no field needs to be reassigned.
    MeasureMap measureMap = statsRecorder.newMeasureMap();
    measureMap.put(BuiltinMeasureConstants.PER_CONNECTION_ERROR_COUNT, errorCount);
    measureMap.record(tagContext);
  }
}
Expand Up @@ -40,6 +40,11 @@ public static StatsRecorderWrapper createRecorder(
operationType, spanName, statsAttributes, Stats.getStatsRecorder());
}

/**
 * Creates a {@link StatsRecorderWrapperForConnection} for recording connection-level metrics.
 *
 * <p>The wrapper is backed by the recorder returned from {@code Stats.getStatsRecorder()}.
 *
 * @param statsAttributes attributes passed through to the wrapper's constructor
 * @return a new wrapper instance
 */
public static StatsRecorderWrapperForConnection createRecorderForConnection(
Map<String, String> statsAttributes) {
return new StatsRecorderWrapperForConnection(statsAttributes, Stats.getStatsRecorder());
}

// This is used in integration tests to get the tag value strings from view manager because Stats
// is relocated to com.google.bigtable.veneer.repackaged.io.opencensus.
@InternalApi("Visible for testing")
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.ApiFunction;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
Expand Down Expand Up @@ -94,6 +95,7 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.ErrorCountPerConnectionMetricTracker;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable;
Expand All @@ -117,6 +119,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannelBuilder;
import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.TagKey;
Expand Down Expand Up @@ -149,7 +152,6 @@
public class EnhancedBigtableStub implements AutoCloseable {
private static final String CLIENT_NAME = "Bigtable";
private static final long FLOW_CONTROL_ADJUSTING_INTERVAL_MS = TimeUnit.SECONDS.toMillis(20);

private final EnhancedBigtableStubSettings settings;
private final ClientContext clientContext;

Expand All @@ -176,7 +178,6 @@ public class EnhancedBigtableStub implements AutoCloseable {

public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
throws IOException {

settings = settings.toBuilder().setTracerFactory(createBigtableTracerFactory(settings)).build();
ClientContext clientContext = createClientContext(settings);

Expand Down Expand Up @@ -204,10 +205,27 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set
? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder()
: null;

if (builder.getEnableRoutingCookie() && transportProvider != null) {
// TODO: this also need to be added to BigtableClientFactory
// patch cookies interceptor
transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor()));
ErrorCountPerConnectionMetricTracker errorCountPerConnectionMetricTracker;
if (transportProvider != null) {
errorCountPerConnectionMetricTracker =
new ErrorCountPerConnectionMetricTracker(createBuiltinAttributes(builder));
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldChannelConfigurator =
transportProvider.getChannelConfigurator();
transportProvider.setChannelConfigurator(
managedChannelBuilder -> {
if (settings.getEnableRoutingCookie()) {
managedChannelBuilder.intercept(new CookiesInterceptor());
}

managedChannelBuilder.intercept(errorCountPerConnectionMetricTracker.getInterceptor());

if (oldChannelConfigurator != null) {
managedChannelBuilder = oldChannelConfigurator.apply(managedChannelBuilder);
}
return managedChannelBuilder;
});
} else {
errorCountPerConnectionMetricTracker = null;
}

// Inject channel priming
Expand All @@ -233,7 +251,12 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set
builder.setTransportChannelProvider(transportProvider.build());
}

return ClientContext.create(builder.build());
ClientContext clientContext = ClientContext.create(builder.build());
if (errorCountPerConnectionMetricTracker != null) {
errorCountPerConnectionMetricTracker.startConnectionErrorCountTracker(
clientContext.getExecutor());
}
return clientContext;
}

public static ApiTracerFactory createBigtableTracerFactory(
Expand All @@ -254,13 +277,7 @@ public static ApiTracerFactory createBigtableTracerFactory(
.put(RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(instanceId))
.put(RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, TagValue.create(appProfileId))
.build();
ImmutableMap<String, String> builtinAttributes =
ImmutableMap.<String, String>builder()
.put("project_id", projectId)
.put("instance", instanceId)
.put("app_profile", appProfileId)
.put("client_name", "bigtable-java/" + Version.VERSION)
.build();
ImmutableMap<String, String> builtinAttributes = createBuiltinAttributes(settings.toBuilder());

return new CompositeTracerFactory(
ImmutableList.of(
Expand All @@ -283,6 +300,16 @@ public static ApiTracerFactory createBigtableTracerFactory(
settings.getTracerFactory()));
}

/**
 * Assembles the attribute map used for built-in metrics from the given settings builder.
 *
 * <p>The map contains, in order, {@code project_id}, {@code instance}, {@code app_profile} and
 * {@code client_name} (the literal prefix {@code bigtable-java/} followed by the library
 * version).
 *
 * @param builder settings builder supplying the project, instance and app profile ids
 * @return an immutable map holding the four attributes
 */
private static ImmutableMap<String, String> createBuiltinAttributes(
    EnhancedBigtableStubSettings.Builder builder) {
  String projectId = builder.getProjectId();
  String instanceId = builder.getInstanceId();
  String appProfileId = builder.getAppProfileId();
  String clientName = "bigtable-java/" + Version.VERSION;

  return ImmutableMap.of(
      "project_id", projectId,
      "instance", instanceId,
      "app_profile", appProfileId,
      "client_name", clientName);
}

private static void patchCredentials(EnhancedBigtableStubSettings.Builder settings)
throws IOException {
int i = settings.getEndpoint().lastIndexOf(":");
Expand Down
Expand Up @@ -652,7 +652,6 @@ private Builder() {
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
this.enableRoutingCookie = true;
this.enableRetryInfo = true;

// Defaults provider
BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder();

Expand Down Expand Up @@ -772,7 +771,6 @@ private Builder(EnhancedBigtableStubSettings settings) {
jwtAudienceMapping = settings.jwtAudienceMapping;
enableRoutingCookie = settings.enableRoutingCookie;
enableRetryInfo = settings.enableRetryInfo;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
readRowSettings = settings.readRowSettings.toBuilder();
Expand Down
@@ -0,0 +1,89 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * An interceptor which counts the number of failed responses for a channel.
 *
 * <p>Every call that passes through this interceptor has its final {@link Status} inspected in
 * {@code onClose}: an OK status increments the success counter, anything else increments the
 * error counter. Counters are read and cleared together via the {@code getAndReset*} methods.
 */
class ConnectionErrorCountInterceptor implements ClientInterceptor {
  // Class#getName() yields the plain fully-qualified class name. Class#toString() would prepend
  // "class ", producing a logger name that package-based logging configuration never matches.
  private static final Logger LOG =
      Logger.getLogger(ConnectionErrorCountInterceptor.class.getName());

  private final LongAdder numOfErrors = new LongAdder();
  private final LongAdder numOfSuccesses = new LongAdder();

  ConnectionErrorCountInterceptor() {}

  /**
   * Wraps the outgoing call so that its response listener updates the counters when the call
   * closes, then delegates to the original listener.
   */
  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
      MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
    return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
        channel.newCall(methodDescriptor, callOptions)) {
      @Override
      public void start(Listener<RespT> responseListener, Metadata headers) {
        super.start(
            new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
                responseListener) {
              @Override
              public void onClose(Status status, Metadata trailers) {
                // Connection accounting is non-critical, so we log the exception, but let normal
                // processing proceed.
                try {
                  handleOnCloseUnsafe(status);
                } catch (Throwable t) {
                  if (t instanceof InterruptedException) {
                    // Preserve the interrupt flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                  }
                  LOG.log(
                      Level.WARNING, "Unexpected error while updating connection error stats", t);
                }
                // Always forward the close event, whether or not accounting succeeded.
                super.onClose(status, trailers);
              }

              /** Bumps the success or error counter according to the final call status. */
              private void handleOnCloseUnsafe(Status status) {
                if (status.isOk()) {
                  numOfSuccesses.increment();
                } else {
                  numOfErrors.increment();
                }
              }
            },
            headers);
      }
    };
  }

  /** Returns the number of failed calls seen since the last invocation and resets it to zero. */
  long getAndResetNumOfErrors() {
    return numOfErrors.sumThenReset();
  }

  /** Returns the number of OK calls seen since the last invocation and resets it to zero. */
  long getAndResetNumOfSuccesses() {
    return numOfSuccesses.sumThenReset();
  }
}
@@ -0,0 +1,83 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.stats.StatsRecorderWrapperForConnection;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.grpc.ClientInterceptor;
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Background task that goes through all connections and updates the errors_per_connection metric.
 *
 * <p>One {@link ConnectionErrorCountInterceptor} is handed out per call to {@link
 * #getInterceptor()} and remembered in a weak set. Each run of this task drains the counters of
 * every remembered interceptor and records the error count for those that saw any traffic.
 */
@InternalApi("For internal use only")
public class ErrorCountPerConnectionMetricTracker implements Runnable {
  private static final Logger LOG =
      Logger.getLogger(ErrorCountPerConnectionMetricTracker.class.getName());
  private static final long PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS = 60;

  // Backed by a WeakHashMap, so the set itself does not keep interceptors reachable. All access
  // goes through interceptorsLock.
  private final Set<ConnectionErrorCountInterceptor> connectionErrorCountInterceptors;
  private final Object interceptorsLock = new Object();
  // This is not final so that it can be updated and mocked during testing.
  private StatsRecorderWrapperForConnection statsRecorderWrapperForConnection;

  /** Replaces the recorder used by {@link #run()}; intended for tests only. */
  @VisibleForTesting
  void setStatsRecorderWrapperForConnection(
      StatsRecorderWrapperForConnection statsRecorderWrapperForConnection) {
    this.statsRecorderWrapperForConnection = statsRecorderWrapperForConnection;
  }

  /**
   * Creates a tracker whose recorder is obtained from {@code
   * StatsWrapper.createRecorderForConnection}.
   *
   * @param builtinAttributes attributes passed to the recorder factory
   */
  public ErrorCountPerConnectionMetricTracker(ImmutableMap<String, String> builtinAttributes) {
    connectionErrorCountInterceptors =
        Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));

    this.statsRecorderWrapperForConnection =
        StatsWrapper.createRecorderForConnection(builtinAttributes);
  }

  /**
   * Schedules this tracker on {@code scheduler} at a fixed rate of once per minute, with no
   * initial delay.
   *
   * @param scheduler executor on which {@link #run()} is periodically invoked
   */
  public void startConnectionErrorCountTracker(ScheduledExecutorService scheduler) {
    scheduler.scheduleAtFixedRate(
        this, 0, PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS, TimeUnit.SECONDS);
  }

  /**
   * Creates a new counting interceptor and registers it with this tracker.
   *
   * @return a fresh interceptor; each call returns a distinct instance
   */
  public ClientInterceptor getInterceptor() {
    ConnectionErrorCountInterceptor connectionErrorCountInterceptor =
        new ConnectionErrorCountInterceptor();
    synchronized (interceptorsLock) {
      connectionErrorCountInterceptors.add(connectionErrorCountInterceptor);
    }
    return connectionErrorCountInterceptor;
  }

  /**
   * Drains the counters of every registered interceptor and records the error count for each
   * interceptor that saw at least one request since the previous run.
   *
   * <p>Recording failures are logged and swallowed: an exception escaping a task scheduled with
   * {@code scheduleAtFixedRate} would suppress all subsequent executions. The counters of an
   * interceptor are already reset when recording fails, so that interval's counts are dropped.
   */
  @Override
  public void run() {
    synchronized (interceptorsLock) {
      for (ConnectionErrorCountInterceptor interceptor : connectionErrorCountInterceptors) {
        try {
          long errors = interceptor.getAndResetNumOfErrors();
          long successes = interceptor.getAndResetNumOfSuccesses();
          // We avoid keeping track of inactive connections (i.e., without any failed or successful
          // requests).
          if (errors > 0 || successes > 0) {
            // TODO: add a metric to also keep track of the number of successful requests per each
            // connection.
            statsRecorderWrapperForConnection.putAndRecordPerConnectionErrorCount(errors);
          }
        } catch (RuntimeException e) {
          // Keep going so one failing recording neither skips the remaining connections nor
          // cancels the periodic task.
          LOG.log(Level.WARNING, "Unexpected error while recording per connection error count", e);
        }
      }
    }
  }
}

0 comments on commit 7d27816

Please sign in to comment.