[go: up one dir, main page]

Skip to content

Commit

Permalink
fix(pubsub): fix bug with AckWithResult with exactly once disabled (#…
Browse files Browse the repository at this point in the history
…7319)

* fix(pubsub): fix bug with AckWithResult with exactly once disabled

* switch wait argument to time.Duration
  • Loading branch information
hongalex committed Jan 28, 2023
1 parent e364f7a commit c88fbdf
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 16 deletions.
13 changes: 8 additions & 5 deletions pubsub/integration_test.go
Expand Up @@ -333,7 +333,13 @@ func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronou
timeout := 3 * time.Minute
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) {
gotMsgs, err := pullN(timeoutCtx, sub, len(want), 2*time.Second, func(ctx context.Context, m *Message) {
if exactlyOnceDelivery {
if _, err := m.AckWithResult().Get(ctx); err != nil {
t.Fatalf("failed to ack message with exactly once delivery: %v", err)
}
return
}
m.Ack()
})
if err != nil {
Expand Down Expand Up @@ -2003,16 +2009,13 @@ func TestIntegration_TopicRetention(t *testing.T) {
}
}

func TestExactlyOnceDelivery_PublishReceive(t *testing.T) {
func TestIntegration_ExactlyOnceDelivery_PublishReceive(t *testing.T) {
ctx := context.Background()
client := integrationTestClient(ctx, t)

for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited
testPublishAndReceive(t, client, maxMsgs, false, true, 10, 0)
}

// Tests for large messages (larger than the 4MB gRPC limit).
testPublishAndReceive(t, client, 0, false, true, 1, 5*1024*1024)
}

func TestIntegration_TopicUpdateSchema(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions pubsub/message.go
Expand Up @@ -157,20 +157,20 @@ func (ah *psAckHandler) OnNack() {
}

func (ah *psAckHandler) OnAckWithResult() *AckResult {
// call done with true to indicate ack.
ah.done(true)
if !ah.exactlyOnceDelivery {
return newSuccessAckResult()
}
// call done with true to indicate ack.
ah.done(true)
return ah.ackResult
}

func (ah *psAckHandler) OnNackWithResult() *AckResult {
// call done with false to indicate nack.
ah.done(false)
if !ah.exactlyOnceDelivery {
return newSuccessAckResult()
}
// call done with false to indicate nack.
ah.done(false)
return ah.ackResult
}

Expand Down
12 changes: 8 additions & 4 deletions pubsub/streaming_pull_test.go
Expand Up @@ -66,7 +66,7 @@ func TestStreamingPullMultipleFetches(t *testing.T) {

func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer, msgs []*pb.ReceivedMessage) {
sub := client.Subscription("S")
gotMsgs, err := pullN(context.Background(), sub, len(msgs), func(_ context.Context, m *Message) {
gotMsgs, err := pullN(context.Background(), sub, len(msgs), 0, func(_ context.Context, m *Message) {
id, err := strconv.Atoi(msgAckID(m))
if err != nil {
t.Fatalf("pullN err: %v", err)
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestStreamingPullRetry(t *testing.T) {

sub := client.Subscription("S")
sub.ReceiveSettings.NumGoroutines = 1
gotMsgs, err := pullN(context.Background(), sub, len(testMessages), func(_ context.Context, m *Message) {
gotMsgs, err := pullN(context.Background(), sub, len(testMessages), 0, func(_ context.Context, m *Message) {
id, err := strconv.Atoi(msgAckID(m))
if err != nil {
t.Fatalf("pullN err: %v", err)
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestStreamingPullConcurrent(t *testing.T) {
sub := client.Subscription("S")
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
gotMsgs, err := pullN(ctx, sub, nMessages, func(ctx context.Context, m *Message) {
gotMsgs, err := pullN(ctx, sub, nMessages, 0, func(ctx context.Context, m *Message) {
m.Ack()
})
if c := status.Convert(err); err != nil && c.Code() != codes.Canceled {
Expand Down Expand Up @@ -513,7 +513,8 @@ func newMock(t *testing.T) (*Client, *mockServer) {
}

// pullN calls sub.Receive until at least n messages are received.
func pullN(ctx context.Context, sub *Subscription, n int, f func(context.Context, *Message)) ([]*Message, error) {
// Wait a provided duration before cancelling.
func pullN(ctx context.Context, sub *Subscription, n int, wait time.Duration, f func(context.Context, *Message)) ([]*Message, error) {
var (
mu sync.Mutex
msgs []*Message
Expand All @@ -526,6 +527,9 @@ func pullN(ctx context.Context, sub *Subscription, n int, f func(context.Context
mu.Unlock()
f(ctx, m)
if nSeen >= n {
// Wait a specified amount of time so that for exactly once delivery,
// Acks aren't cancelled immediately.
time.Sleep(wait)
cancel()
}
})
Expand Down
4 changes: 2 additions & 2 deletions pubsub/subscription.go
Expand Up @@ -397,8 +397,8 @@ type SubscriptionConfig struct {
// by Pub/Sub and have distinct MessageID values.
//
// Lastly, to guarantee messages have been acked or nacked properly, you must
// call Message.AckWithResponse() or Message.NackWithResponse(). These return an
// AckResponse which will be ready if the message has been acked (or failed to be acked).
// call Message.AckWithResult() or Message.NackWithResult(). These return an
// AckResult which will be ready if the message has been acked (or failed to be acked).
EnableExactlyOnceDelivery bool

// State indicates whether or not the subscription can receive messages.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/subscription_test.go
Expand Up @@ -296,7 +296,7 @@ func testReceive(t *testing.T, synchronous, exactlyOnceDelivery bool) {
srv.Publish(topic.name, []byte{byte(i)}, nil)
}
sub.ReceiveSettings.Synchronous = synchronous
msgs, err := pullN(ctx, sub, 256, func(_ context.Context, m *Message) {
msgs, err := pullN(ctx, sub, 256, 0, func(_ context.Context, m *Message) {
if exactlyOnceDelivery {
ar := m.AckWithResult()
// Don't use the above ctx here since that will get cancelled.
Expand Down

0 comments on commit c88fbdf

Please sign in to comment.