[go: up one dir, main page]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rework low level message stream retries, add debugging #1713

Merged
merged 6 commits into from
Apr 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: clarify 'debug' channel a bit, add explicit debug message class
  • Loading branch information
feywind committed Apr 15, 2023
commit 5407338a73dfe67e47cfdcb945eb02a82a52371e
33 changes: 33 additions & 0 deletions src/debug.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* Represents a debug message the user might want to print out for logging
* while debugging or whatnot. These will always come by way of the 'error'
* channel on streams or other event emitters. It's completely fine to
* ignore them, as some will just be verbose logging info, but they may
* help figure out what's going wrong. Support may also ask you to catch
* these channels, which you can do like so:
*
* ```
* subscription.on('debug', msg => console.log(msg.message));
* ```
*
* These values are _not_ guaranteed to remain stable, even within a major
* version, so don't depend on them for your program logic. Debug outputs
* may be added or removed at any time, without warning.
*/
export class DebugMessage {
constructor(public message: string, public error?: Error) {}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ export {
TopicMetadata,
} from './topic';
export {Duration, TotalOfUnit, DurationLike} from './temporal';
export {DebugMessage} from './debug';

if (process.env.DEBUG_GRPC) {
console.info('gRPC logging set to verbose');
Expand Down
19 changes: 11 additions & 8 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
} from './subscriber';
import {Duration} from './temporal';
import {addToBucket} from './util';
import {DebugMessage} from './debug';

/**
* @private
Expand Down Expand Up @@ -65,15 +66,16 @@ export interface BatchOptions {
* @param {string} message The error message.
* @param {GoogleError} err The grpc error.
*/
export class BatchError extends Error {
export class BatchError extends DebugMessage {
ackIds: string[];
code: grpc.status;
details: string;
constructor(err: GoogleError, ackIds: string[], rpc: string) {
super(
`Failed to "${rpc}" for ${ackIds.length} message(s). Reason: ${
process.env.DEBUG_GRPC ? err.stack : err.message
}`
}`,
err
);

this.ackIds = ackIds;
Expand Down Expand Up @@ -278,7 +280,9 @@ export abstract class MessageQueue {
// These queues are used for ack and modAck messages, which should
// never surface an error to the user level. However, we'll emit
// them onto this debug channel in case debug info is needed.
this._subscriber.emit('debug', e);
const err = e as Error;
const debugMsg = new DebugMessage(err.message, err);
this._subscriber.emit('debug', debugMsg);
}

this.numInFlightRequests -= batchSize;
Expand Down Expand Up @@ -404,10 +408,8 @@ export abstract class MessageQueue {
const others = toError.get(AckResponses.Other);
if (others?.length) {
const otherIds = others.map(e => e.ackId);
this._subscriber.emit(
'debug',
new BatchError(rpcError, otherIds, operation)
);
const debugMsg = new BatchError(rpcError, otherIds, operation);
this._subscriber.emit('debug', debugMsg);
}

// Take care of following up on all the Promises.
Expand Down Expand Up @@ -492,7 +494,8 @@ export class AckQueue extends MessageQueue {
return results.toRetry;
} catch (e) {
// This should only ever happen if there's a code failure.
this._subscriber.emit('debug', e);
const err = e as Error;
this._subscriber.emit('debug', new DebugMessage(err.message, err));
const exc = new AckError(AckResponses.Other, 'Code error');
batch.forEach(m => {
m.responsePromise?.reject(exc);
Expand Down
5 changes: 3 additions & 2 deletions src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {google} from '../protos/protos';
import {defaultOptions} from './default-options';
import {Duration} from './temporal';
import {ExponentialRetry} from './exponential-retry';
import {DebugMessage} from './debug';

/*!
* Frequency to ping streams.
Expand Down Expand Up @@ -359,7 +360,7 @@ export class MessageStream extends PassThrough {
if (PullRetry.retry(status)) {
this.emit(
'debug',
new Error(
new DebugMessage(
`Subscriber stream ${index} has ended with status ${status.code}; will be retried.`
)
);
Expand All @@ -372,7 +373,7 @@ export class MessageStream extends PassThrough {
} else if (this._activeStreams() === 0) {
this.emit(
'debug',
new Error(
new DebugMessage(
`Subscriber stream ${index} has ended with status ${status.code}; will not be retried.`
)
);
Expand Down
2 changes: 1 addition & 1 deletion src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ export class Subscriber extends EventEmitter {

this._stream
.on('error', err => this.emit('error', err))
.on('debug', err => this.emit('debug', err))
.on('debug', msg => this.emit('debug', msg))
.on('data', (data: PullResponse) => this._onData(data))
.once('close', () => this.close());

Expand Down
10 changes: 5 additions & 5 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export type DetachSubscriptionResponse = EmptyResponse;
listener: (error: StatusError) => void
): this;
on(event: 'close', listener: () => void): this;
on(event: 'debug', listener: (error: StatusError) => void); this;
on(event: 'debug', listener: (msg: DebugMessage) => void); this;

// Only used internally.
on(event: 'newListener', listener: Function): this;
Expand Down Expand Up @@ -158,7 +158,7 @@ export type DetachSubscriptionResponse = EmptyResponse;
* on(event: 'error', listener: (error: Error) => void): this;
*
* Upon receipt of a (non-fatal) debug warning:
* on(event: 'debug', listener: (error: Error) => void): this;
* on(event: 'debug', listener: (msg: DebugMessage) => void): this;
*
* Upon the closing of the subscriber:
* on(event: 'close', listener: Function): this;
Expand Down Expand Up @@ -226,8 +226,8 @@ export type DetachSubscriptionResponse = EmptyResponse;
* // Register an error handler.
* subscription.on('error', (err) => {});
*
* // Register a debug handler, to catch non-fatal errors.
* subscription.on('debug', (err) => { console.error(err); });
* // Register a debug handler, to catch non-fatal errors and other messages.
* subscription.on('debug', msg => { console.log(msg.message); });
*
* // Register a close handler in case the subscriber closes unexpectedly
* subscription.on('close', () => {});
Expand Down Expand Up @@ -327,7 +327,7 @@ export class Subscription extends EventEmitter {
this._subscriber = new Subscriber(this, options);
this._subscriber
.on('error', err => this.emit('error', err))
.on('debug', err => this.emit('debug', err))
.on('debug', msg => this.emit('debug', msg))
.on('message', message => this.emit('message', message))
.on('close', () => this.emit('close'));

Expand Down
25 changes: 15 additions & 10 deletions test/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import defer = require('p-defer');
import * as messageTypes from '../src/message-queues';
import {BatchError} from '../src/message-queues';
import {AckError, Message, Subscriber} from '../src/subscriber';
import {DebugMessage} from '../src/debug';

class FakeClient {
async acknowledge(
Expand Down Expand Up @@ -261,8 +262,8 @@ describe('MessageQueues', () => {

sandbox.stub(messageQueue.batches, 'push').throws(fakeError);

subscriber.on('debug', err => {
assert.strictEqual(err, fakeError);
subscriber.on('debug', msg => {
assert.strictEqual(msg.message, fakeError.message);
done();
});

Expand Down Expand Up @@ -445,11 +446,13 @@ describe('MessageQueues', () => {

sandbox.stub(fakeSubscriber.client, 'acknowledge').rejects(fakeError);

subscriber.on('debug', (err: BatchError) => {
subscriber.on('debug', (msg: DebugMessage) => {
try {
assert.strictEqual(err.message, expectedMessage);
assert.deepStrictEqual(err.ackIds, ackIds);
assert.strictEqual(err.code, fakeError.code);
assert.strictEqual(msg.message, expectedMessage);
const batchError = msg.error! as unknown as BatchError;
assert.strictEqual(batchError.message, expectedMessage);
assert.deepStrictEqual(batchError.ackIds, ackIds);
assert.strictEqual(batchError.code, fakeError.code);
done();
} catch (e) {
// I'm unsure why Mocha's regular handler doesn't work here,
Expand Down Expand Up @@ -735,11 +738,13 @@ describe('MessageQueues', () => {
.stub(fakeSubscriber.client, 'modifyAckDeadline')
.rejects(fakeError);

subscriber.on('debug', (err: BatchError) => {
subscriber.on('debug', (msg: DebugMessage) => {
try {
assert.strictEqual(err.message, expectedMessage);
assert.deepStrictEqual(err.ackIds, ackIds);
assert.strictEqual(err.code, fakeError.code);
assert.strictEqual(msg.message, expectedMessage);
const batchError = msg.error! as unknown as BatchError;
assert.strictEqual(batchError.message, expectedMessage);
assert.deepStrictEqual(batchError.ackIds, ackIds);
assert.strictEqual(batchError.code, fakeError.code);
done();
} catch (e) {
// I'm unsure why Mocha's regular handler doesn't work here,
Expand Down
8 changes: 5 additions & 3 deletions test/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {Snapshot} from '../src/snapshot';
import {Message, SubscriberOptions} from '../src/subscriber';
import * as subby from '../src/subscription';
import * as util from '../src/util';
import {DebugMessage} from '../src/debug';

let promisified = false;
const fakeUtil = Object.assign({}, util, {
Expand Down Expand Up @@ -511,6 +512,7 @@ describe('Subscription', () => {

describe('debug', () => {
const error = new Error('err') as ServiceError;
const msg = new DebugMessage(error.message, error);

beforeEach(() => {
subscription.request = (config, callback) => {
Expand All @@ -519,12 +521,12 @@ describe('Subscription', () => {
});

it('should return the debug events to the callback', done => {
subscription.on?.('debug', err => {
assert.strictEqual(err, error);
subscription.on?.('debug', (msg: DebugMessage) => {
assert.strictEqual(msg.error, error);
done();
});
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(subscription as any)._subscriber.emit('debug', error);
(subscription as any)._subscriber.emit('debug', msg);
});
});

Expand Down