Skip to content

Commit 7fc3eb3

Browse files
committed
Skip expired backfills instead of returning error
If a single backfill has expired it would cause the whole FetchMatches call to error out. Now the backfill will be skipped instead similar to the NotFound case. The status code returned for attempting to acknowledge or update an expired backfill is changed from Unavailable since the operation is not retryable.
1 parent c58996d commit 7fc3eb3

5 files changed

Lines changed: 91 additions & 32 deletions

File tree

internal/app/backend/backend_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func synchronizeRecv(ctx context.Context, syncStream synchronizerStream, m *sync
183183
err = createOrUpdateBackfill(ctx, backfill, ticketIds, store)
184184
if err != nil {
185185
e, ok := status.FromError(err)
186-
if err == errBackfillGenerationMismatch || (ok && e.Code() == codes.NotFound) {
186+
if err == errBackfillGenerationMismatch || (ok && (e.Code() == codes.NotFound || e.Code() == codes.FailedPrecondition)) {
187187
err = doReleaseTickets(ctx, ticketIds, store)
188188
if err != nil {
189189
logger.WithError(err).Errorf("failed to remove match tickets from pending release: %v", ticketIds)

internal/app/frontend/frontend_service_test.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestCreateBackfill(t *testing.T) {
9595
defer closer()
9696
ctx := utilTesting.NewContext(t)
9797
fs := frontendService{cfg: cfg, store: store}
98-
var testCases = []struct {
98+
testCases := []struct {
9999
description string
100100
request *pb.CreateBackfillRequest
101101
result *pb.Backfill
@@ -133,7 +133,10 @@ func TestCreateBackfill(t *testing.T) {
133133
SearchFields: &pb.SearchFields{
134134
StringArgs: map[string]string{
135135
"search": "me",
136-
}}}},
136+
},
137+
},
138+
},
139+
},
137140
expectedCode: codes.OK,
138141
expectedMessage: "",
139142
},
@@ -191,7 +194,7 @@ func TestUpdateBackfill(t *testing.T) {
191194
require.NoError(t, err)
192195
require.NotNil(t, res)
193196

194-
var testCases = []struct {
197+
testCases := []struct {
195198
description string
196199
request *pb.UpdateBackfillRequest
197200
result *pb.Backfill
@@ -224,7 +227,10 @@ func TestUpdateBackfill(t *testing.T) {
224227
SearchFields: &pb.SearchFields{
225228
StringArgs: map[string]string{
226229
"search": "me",
227-
}}}},
230+
},
231+
},
232+
},
233+
},
228234
expectedCode: codes.OK,
229235
expectedMessage: "",
230236
},
@@ -421,9 +427,8 @@ func TestAcknowledgeBackfill(t *testing.T) {
421427
resp, err = fs.AcknowledgeBackfill(ctx, &pb.AcknowledgeBackfillRequest{BackfillId: fakeBackfill.Id, Assignment: &pb.Assignment{Connection: "10.0.0.1"}})
422428
require.Nil(t, resp)
423429
require.Error(t, err)
424-
require.Equal(t, codes.Unavailable.String(), status.Convert(err).Code().String())
430+
require.Equal(t, codes.FailedPrecondition.String(), status.Convert(err).Code().String())
425431
require.Contains(t, status.Convert(err).Message(), "can not acknowledge an expired backfill, id: 1")
426-
427432
}
428433

429434
func TestDoDeleteTicket(t *testing.T) {

internal/statestore/backfill.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,10 @@ import (
3131
"open-match.dev/open-match/pkg/pb"
3232
)
3333

34-
var (
35-
logger = logrus.WithFields(logrus.Fields{
36-
"app": "openmatch",
37-
"component": "statestore.redis",
38-
})
39-
)
34+
var logger = logrus.WithFields(logrus.Fields{
35+
"app": "openmatch",
36+
"component": "statestore.redis",
37+
})
4038

4139
const (
4240
backfillLastAckTime = "backfill_last_ack_time"
@@ -199,7 +197,7 @@ func (rb *redisBackend) UpdateBackfill(ctx context.Context, backfill *pb.Backfil
199197
}
200198

201199
if expired {
202-
return status.Errorf(codes.Unavailable, "can not update an expired backfill, id: %s", backfill.Id)
200+
return status.Errorf(codes.FailedPrecondition, "can not update an expired backfill, id: %s", backfill.Id)
203201
}
204202

205203
bf := ipb.BackfillInternal{
@@ -338,7 +336,7 @@ func (rb *redisBackend) UpdateAcknowledgmentTimestamp(ctx context.Context, id st
338336
}
339337

340338
if expired {
341-
return status.Errorf(codes.Unavailable, "can not acknowledge an expired backfill, id: %s", id)
339+
return status.Errorf(codes.FailedPrecondition, "can not acknowledge an expired backfill, id: %s", id)
342340
}
343341

344342
return doUpdateAcknowledgmentTimestamp(redisConn, id)
@@ -380,7 +378,6 @@ func (rb *redisBackend) GetExpiredBackfillIDs(ctx context.Context) ([]string, er
380378

381379
// deleteExpiredBackfillID deletes expired BackfillID from a sorted set
382380
func (rb *redisBackend) deleteExpiredBackfillID(conn redis.Conn, backfillID string) error {
383-
384381
_, err := conn.Do("ZREM", backfillLastAckTime, backfillID)
385382
if err != nil {
386383
return status.Errorf(codes.Internal, "failed to delete expired backfill ID %s from Sorted Set %s",
@@ -459,7 +456,6 @@ func (rb *redisBackend) GetIndexedBackfills(ctx context.Context) (map[string]int
459456
}
460457

461458
return r, nil
462-
463459
}
464460

465461
func getBackfillReleaseTimeout(cfg config.View) time.Duration {

internal/statestore/backfill_test.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func TestCreateBackfill(t *testing.T) {
6868
Generation: 1,
6969
}
7070

71-
var testCases = []struct {
71+
testCases := []struct {
7272
description string
7373
backfill *pb.Backfill
7474
ticketIDs []string
@@ -231,7 +231,7 @@ func TestUpdateBackfillExpiredBackfillErrExpected(t *testing.T) {
231231

232232
err = service.UpdateBackfill(ctx, &bf, nil)
233233
require.Error(t, err)
234-
require.Equal(t, codes.Unavailable.String(), status.Convert(err).Code().String())
234+
require.Equal(t, codes.FailedPrecondition.String(), status.Convert(err).Code().String())
235235
require.Contains(t, status.Convert(err).Message(), fmt.Sprintf("can not update an expired backfill, id: %s", bfID))
236236
}
237237

@@ -274,7 +274,7 @@ func TestGetBackfill(t *testing.T) {
274274
_, err = c.Do("SET", "wrong-type-key", "wrong-type-value")
275275
require.NoError(t, err)
276276

277-
var testCases = []struct {
277+
testCases := []struct {
278278
description string
279279
backfillID string
280280
expectedCode codes.Code
@@ -348,7 +348,7 @@ func TestDeleteBackfill(t *testing.T) {
348348
defer service.Close()
349349
ctx := utilTesting.NewContext(t)
350350

351-
//Last Acknowledge timestamp is updated on Frontend CreateBackfill
351+
// Last Acknowledge timestamp is updated on Frontend CreateBackfill
352352
bfID := "mockBackfillID"
353353
err := service.CreateBackfill(ctx, &pb.Backfill{
354354
Id: bfID,
@@ -363,7 +363,7 @@ func TestDeleteBackfill(t *testing.T) {
363363
require.NoError(t, err)
364364
require.True(t, ts > 0, "timestamp is not valid")
365365

366-
var testCases = []struct {
366+
testCases := []struct {
367367
description string
368368
backfillID string
369369
expectedCode codes.Code
@@ -419,7 +419,6 @@ func TestDeleteBackfill(t *testing.T) {
419419
require.Error(t, err)
420420
require.Equal(t, codes.Unavailable.String(), status.Convert(err).Code().String())
421421
require.Contains(t, status.Convert(err).Message(), "DeleteBackfill, id: 12345, failed to connect to redis:")
422-
423422
}
424423

425424
// TestUpdateAcknowledgmentTimestampLifecycle test statestore functions - UpdateAcknowledgmentTimestamp, GetExpiredBackfillIDs
@@ -464,12 +463,12 @@ func TestUpdateAcknowledgmentTimestampLifecycle(t *testing.T) {
464463

465464
err = service.UpdateAcknowledgmentTimestamp(ctx, bf1)
466465
require.Error(t, err)
467-
require.Equal(t, codes.Unavailable.String(), status.Convert(err).Code().String())
466+
require.Equal(t, codes.FailedPrecondition.String(), status.Convert(err).Code().String())
468467
require.Contains(t, status.Convert(err).Message(), fmt.Sprintf("can not acknowledge an expired backfill, id: %s", bf1))
469468

470469
err = service.UpdateAcknowledgmentTimestamp(ctx, bf2)
471470
require.Error(t, err)
472-
require.Equal(t, codes.Unavailable.String(), status.Convert(err).Code().String())
471+
require.Equal(t, codes.FailedPrecondition.String(), status.Convert(err).Code().String())
473472
require.Contains(t, status.Convert(err).Message(), fmt.Sprintf("can not acknowledge an expired backfill, id: %s", bf2))
474473

475474
err = service.DeleteBackfill(ctx, bfIDs[0])
@@ -531,7 +530,7 @@ func TestUpdateAcknowledgmentTimestamptExpiredBackfillErrExpected(t *testing.T)
531530

532531
err = service.UpdateAcknowledgmentTimestamp(ctx, bfID)
533532
require.Error(t, err)
534-
require.Equal(t, codes.Unavailable.String(), status.Convert(err).Code().String())
533+
require.Equal(t, codes.FailedPrecondition.String(), status.Convert(err).Code().String())
535534
require.Contains(t, status.Convert(err).Message(), fmt.Sprintf("can not acknowledge an expired backfill, id: %s", bfID))
536535
}
537536

testing/e2e/backfill_test.go

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"io"
2121
"regexp"
2222
"testing"
23+
"time"
2324

2425
"github.com/stretchr/testify/require"
2526
"google.golang.org/grpc/codes"
@@ -186,11 +187,12 @@ func TestAcknowledgeBackfillDeletedTicket(t *testing.T) {
186187
om := newOM(t)
187188
ctx := context.Background()
188189

189-
bf := &pb.Backfill{SearchFields: &pb.SearchFields{
190-
StringArgs: map[string]string{
191-
"search": "me",
190+
bf := &pb.Backfill{
191+
SearchFields: &pb.SearchFields{
192+
StringArgs: map[string]string{
193+
"search": "me",
194+
},
192195
},
193-
},
194196
}
195197
createdBf, err := om.Frontend().CreateBackfill(ctx, &pb.CreateBackfillRequest{Backfill: bf})
196198
require.NoError(t, err)
@@ -412,7 +414,7 @@ func TestProposedBackfillUpdate(t *testing.T) {
412414
"field1": "value1",
413415
},
414416
}
415-
//using DefaultEvaluationCriteria just for testing purposes only
417+
// using DefaultEvaluationCriteria just for testing purposes only
416418
b.Extensions = map[string]*anypb.Any{
417419
"evaluation_input": mustAny(&pb.DefaultEvaluationCriteria{
418420
Score: 10,
@@ -571,7 +573,8 @@ func TestBackfillSkipNotfoundError(t *testing.T) {
571573
StringArgs: map[string]string{
572574
"search": "me",
573575
},
574-
}}})
576+
},
577+
}})
575578
require.NoError(t, err)
576579
require.NotNil(t, b1)
577580

@@ -613,6 +616,62 @@ func TestBackfillSkipNotfoundError(t *testing.T) {
613616
require.Equal(t, fmt.Sprintf("rpc error: code = NotFound desc = Backfill id: %s not found", b1.Id), err.Error())
614617
}
615618

619+
func TestBackfillSkipExpiredError(t *testing.T) {
620+
ctx := context.Background()
621+
om := newOM(t)
622+
623+
t1, err := om.Frontend().CreateTicket(ctx, &pb.CreateTicketRequest{Ticket: &pb.Ticket{}})
624+
require.NoError(t, err)
625+
require.NotNil(t, t1)
626+
627+
b1, err := om.Frontend().CreateBackfill(ctx, &pb.CreateBackfillRequest{Backfill: &pb.Backfill{
628+
SearchFields: &pb.SearchFields{
629+
StringArgs: map[string]string{
630+
"search": "me",
631+
},
632+
},
633+
}})
634+
require.NoError(t, err)
635+
require.NotNil(t, b1)
636+
637+
m := &pb.Match{
638+
MatchId: "1",
639+
Tickets: []*pb.Ticket{t1},
640+
Backfill: b1,
641+
}
642+
643+
om.SetMMF(func(ctx context.Context, profile *pb.MatchProfile, out chan<- *pb.Match) error {
644+
out <- m
645+
return nil
646+
})
647+
648+
om.SetEvaluator(func(ctx context.Context, in <-chan *pb.Match, out chan<- string) error {
649+
p, ok := <-in
650+
require.True(t, ok)
651+
652+
out <- p.MatchId
653+
return nil
654+
})
655+
656+
// Depends on pendingReleaseTimeout
657+
time.Sleep(1 * time.Second)
658+
659+
// Make sure backfill has expired
660+
_, err = om.Frontend().AcknowledgeBackfill(ctx, &pb.AcknowledgeBackfillRequest{BackfillId: b1.Id, Assignment: &pb.Assignment{}})
661+
require.Error(t, err)
662+
require.Equal(t, fmt.Sprintf("rpc error: code = FailedPrecondition desc = can not acknowledge an expired backfill, id: %s", b1.Id), err.Error())
663+
664+
stream, err := om.Backend().FetchMatches(ctx, &pb.FetchMatchesRequest{
665+
Config: om.MMFConfigGRPC(),
666+
Profile: &pb.MatchProfile{},
667+
})
668+
require.NoError(t, err)
669+
resp, err := stream.Recv()
670+
require.Nil(t, resp)
671+
require.Error(t, err)
672+
require.Equal(t, io.EOF.Error(), err.Error())
673+
}
674+
616675
func mustAny(m proto.Message) *anypb.Any {
617676
result, err := anypb.New(m)
618677
if err != nil {

0 commit comments

Comments
 (0)