diff --git a/README.md b/README.md index 48926001..a6f42d0f 100644 --- a/README.md +++ b/README.md @@ -108,24 +108,49 @@ metaURI: mongodb://localhost:28012 The verifier will now check to completion to make sure that there are no inconsistencies. The command you need to send the verifier here is `writesOff`. The command doesn’t block. This means that you will have to poll the verifier, or watch its logs, to see the status of the verification (see `progress`). ``` - curl -H "Content-Type: application/json" -X POST -d '{}' http://127.0.0.1:27020/api/v1/writesOff + curl -H "Content-Type: application/json" -d '{}' http://127.0.0.1:27020/api/v1/writesOff ``` -3. You can poll the status of the verification by hitting the `progress`endpoint. In particular, the `phase`should reveal whether the verifier is done verifying; once the `phase`is `idle`the verification has completed. When the `phase`has reached `idle`, the `error`field should be `null`and the `failedTasks`field should be `0`, if the verification was successful. A non-`null``error`field indicates that the verifier itself ran into an error. `failedTasks`being non-`0`indicates that there was an inconsistency. The logs printed by the verifier itself should have more information regarding what the inconsistencies are. - - ``` - curl -H "Content-Type: application/json" -X GET http://127.0.0.1:27020/api/v1/progress - - ``` - - - - - This is a sample output when inconsistencies are present: +3. You can poll the status of the verification by hitting the `progress` endpoint. In particular, the `phase` should reveal whether the verifier is done verifying. Once the `phase` is `idle`, the verification has completed. At that point the `error` field should be `null`, and the `failedTasks` field should be `0`, if the verification was successful. A non-`null` `error` field indicates that the verifier itself ran into an error. `failedTasks` being non-`0` indicates that there was an inconsistency. See below for how to investigate mismatches. +``` +curl http://127.0.0.1:27020/api/v1/progress +``` - `{"progress":{"phase":"idle","error":null,"verificationStatus":{"totalTasks":1,"addedTasks":0,"processingTasks":0,"failedTasks":1,"completedTasks":0,"metadataMismatchTasks":0,"recheckTasks":0}}}` +### `/progress` API Response Contents + +In the below a “timestamp” is an object with `T` and `I` unsigned integers. +These represent a logical time in MongoDB’s replication protocol. + +- `progress` + - `phase` (string): either `idle`, `check`, or `recheck` + - `generation` (unsigned integer) + - `generationStats` + - `timeElapsed` (string, [Go Duration format](https://pkg.go.dev/time#ParseDuration)) + - `activeWorkers` (unsigned integer) + - `docsCompared` (unsigned integer) + - `totalDocs` (unsigned integer) + - `srcBytesCompared` (unsigned integer) + - `totalSrcBytes` (unsigned integer, only present in `check` phase) + - `priorMismatches` (unsigned integer, optional, mismatches seen in prior generation) + - `mismatchesFound` (unsigned integer) + - `rechecksEnqueued` (unsigned integer) + - `srcChangeStats` + - `eventsPerSecond` (nonnegative float, optional) + - `currentTimes` (optional) + - `lastHandledTime` (timestamp) + - `lastClusterTime` (timestamp) + - `bufferSaturation` (nonnegative float) + - `dstChangeStats` (same fields as `srcChangeStats`) + - `error` (string, optional) + - `verificationStatus` (tasks for the current generation) + - `totalTasks` (unsigned integer) + - `addedTasks` (unsigned integer, unstarted tasks) + - `processingTasks` (unsigned integer, in-progress tasks) + - `failedTasks` (unsigned integer, tasks that found a document mismatch) + - `completedTasks` (unsigned integer, tasks that found no problems) + - `metadataMismatchTasks` (unsigned integer, tasks that found a collection metadata mismatch) # CLI Options diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 056fdbc5..030fb5e2 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -31,13 +31,24 @@ const ( changeReaderCollectionName = "changeReader" ) +type readerCurrentTimes struct { + LastHandledTime bson.Timestamp `json:"lastHandledTime"` + LastClusterTime bson.Timestamp `json:"lastClusterTime"` +} + +func (rp readerCurrentTimes) Lag() time.Duration { + return time.Second * time.Duration( + int(rp.LastClusterTime.T)-int(rp.LastHandledTime.T), + ) +} + type changeReader interface { getWhichCluster() whichCluster getReadChannel() <-chan changeEventBatch getStartTimestamp() bson.Timestamp getLastSeenClusterTime() option.Option[bson.Timestamp] getEventsPerSecond() option.Option[float64] - getLag() option.Option[time.Duration] + getCurrentTimes() option.Option[readerCurrentTimes] getBufferSaturation() float64 setWritesOff(bson.Timestamp) start(context.Context, *errgroup.Group) error @@ -64,9 +75,10 @@ type ChangeReaderCommon struct { lastChangeEventTime *msync.TypedAtomic[option.Option[bson.Timestamp]] + currentTimes *msync.TypedAtomic[option.Option[readerCurrentTimes]] + startAtTs *bson.Timestamp - lag *msync.TypedAtomic[option.Option[time.Duration]] batchSizeHistory *history.History[int] onDDLEvent ddlEventHandling @@ -77,7 +89,7 @@ func newChangeReaderCommon(clusterName whichCluster) ChangeReaderCommon { readerType: clusterName, changeEventBatchChan: make(chan changeEventBatch, batchChanBufferSize), writesOffTs: util.NewEventual[bson.Timestamp](), - lag: msync.NewTypedAtomic(option.None[time.Duration]()), + currentTimes: msync.NewTypedAtomic(option.None[readerCurrentTimes]()), lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()), batchSizeHistory: history.New[int](time.Minute), onDDLEvent: lo.Ternary( @@ -123,11 +135,23 @@ func (rc *ChangeReaderCommon) getBufferSaturation() float64 { return util.DivideToF64(len(rc.changeEventBatchChan), cap(rc.changeEventBatchChan)) } +func (rc *ChangeReaderCommon) getCurrentTimes() option.Option[readerCurrentTimes] { + return rc.currentTimes.Load() +} + +/* // getLag returns the observed change stream lag (i.e., the delta between // cluster time and the most-recently-seen change event). func (rc *ChangeReaderCommon) getLag() option.Option[time.Duration] { - return rc.lag.Load() + if prog, has := rc.progress.Load().Get(); has { + return option.Some( + time.Duration(int(prog.lastClusterTime.T)-int(prog.lastResumeTime.T)) * time.Second, + ) + } + + return option.None[time.Duration]() } +*/ // getEventsPerSecond returns the number of change events per second we’ve been // seeing “recently”. (See implementation for the actual period over which we @@ -221,8 +245,18 @@ func (rc *ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Optio func (rc *ChangeReaderCommon) updateLag(sess *mongo.Session, token bson.Raw) { tokenTs, err := rc.resumeTokenTSExtractor(token) if err == nil { - lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T) - rc.lag.Store(option.Some(time.Second * time.Duration(lagSecs))) + cTime, err := util.GetClusterTimeFromSession(sess) + if err != nil { + rc.logger.Warn(). + Err(err). + Str("reader", string(rc.getWhichCluster())). + Msg("Failed to extract cluster time from session.") + } else { + rc.currentTimes.Store(option.Some(readerCurrentTimes{ + LastHandledTime: tokenTs, + LastClusterTime: cTime, + })) + } } else { rc.logger.Warn(). Err(err). diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 62a39bca..69807dae 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -593,7 +593,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() { verifierRunner.AwaitGenerationEnd(), ) - return verifier.srcChangeReader.getLag().IsSome() + return verifier.srcChangeReader.getCurrentTimes().IsSome() }, time.Minute, 100*time.Millisecond, @@ -602,7 +602,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() { // NB: The lag will include whatever time elapsed above before // verifier read the event, so it can be several seconds. suite.Assert().Less( - verifier.srcChangeReader.getLag().MustGet(), + verifier.srcChangeReader.getCurrentTimes().MustGet().Lag(), 10*time.Minute, "verifier lag is as expected", ) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index be003ba6..49b0e947 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -216,7 +216,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh // Now that we’ve initialized verifier.generation we can // start the change readers. verifier.initializeChangeReaders() - verifier.mux.Unlock() + //verifier.mux.Unlock() err = retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { @@ -246,17 +246,12 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh verifier.logger.Debug().Msg("Starting Check") - verifier.phase = Check - defer func() { - verifier.phase = Idle - }() - if err := verifier.startChangeHandling(ctx); err != nil { return err } // Log the verification status when initially booting up so it's easy to see the current state - verificationStatus, err := verifier.GetVerificationStatus(ctx) + verificationStatus, err := verifier.getVerificationStatusForGeneration(ctx, verifier.generation) if err != nil { return errors.Wrapf( err, @@ -274,7 +269,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } // Now enter the multi-generational steady check state for { - verifier.mux.Lock() + //verifier.mux.Lock() err = retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { return verifier.persistGenerationWhileLocked(ctx) @@ -286,12 +281,13 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh verifier.mux.Unlock() return errors.Wrapf(err, "failed to persist generation (%d)", verifier.generation) } - verifier.mux.Unlock() verifier.generationStartTime = time.Now() verifier.srcEventRecorder.Reset() verifier.dstEventRecorder.Reset() + verifier.mux.Unlock() + err := verifier.CheckWorker(ctx) if err != nil { return err @@ -362,7 +358,6 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh // on enqueued rechecks. Meanwhile, generaiton 3’s recheck tasks will // derive from rechecks enqueued during generation 2. verifier.generation++ - verifier.phase = Recheck verifier.mux.Unlock() // Generation of recheck tasks can partial-fail. The following will @@ -384,6 +379,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh Err(err). Msg("Failed to clear out old recheck docs. (This is probably unimportant.)") } + + verifier.mux.Lock() } } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index e23445cb..0e741395 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -89,7 +89,6 @@ type Verifier struct { lastGeneration bool running bool generation int - phase string port int metaURI string metaClient *mongo.Client @@ -181,7 +180,6 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier { logger: logger, writer: logWriter, - phase: Idle, numWorkers: NumWorkers, readPreference: readpref.Primary(), partitionSizeInBytes: 400 * 1024 * 1024, @@ -1255,9 +1253,17 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection( } func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*VerificationStatus, error) { - taskCollection := verifier.verificationTaskCollection() generation, _ := verifier.getGeneration() + return verifier.getVerificationStatusForGeneration(ctx, generation) +} + +func (verifier *Verifier) getVerificationStatusForGeneration( + ctx context.Context, + generation int, +) (*VerificationStatus, error) { + taskCollection := verifier.verificationTaskCollection() + var results []bson.Raw err := retry.New().WithCallback( @@ -1396,18 +1402,6 @@ func (verifier *Verifier) StartServer() error { return server.Run(context.Background()) } -func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { - status, err := verifier.GetVerificationStatus(ctx) - if err != nil { - return Progress{Error: err}, err - } - return Progress{ - Phase: verifier.phase, - Generation: verifier.generation, - Status: status, - }, nil -} - // Returned boolean indicates that namespaces are cached, and // whatever needs them can proceed. func (verifier *Verifier) ensureNamespaces(ctx context.Context) bool { diff --git a/internal/verifier/mismatches.go b/internal/verifier/mismatches.go index 6cafb852..7395bef1 100644 --- a/internal/verifier/mismatches.go +++ b/internal/verifier/mismatches.go @@ -85,7 +85,7 @@ func countMismatchesForTasks( ctx context.Context, db *mongo.Database, taskIDs []bson.ObjectID, - filter bson.D, + filter any, ) (int64, int64, error) { cursor, err := db.Collection(mismatchesCollectionName).Aggregate( ctx, @@ -116,8 +116,12 @@ func countMismatchesForTasks( return 0, 0, errors.Wrap(err, "reading mismatch counts") } - if len(got) != 1 { - return 0, 0, fmt.Errorf("unexpected mismatch count result: %+v", got) + switch len(got) { + case 0: + return 0, 0, nil + case 1: + default: + return 0, 0, fmt.Errorf("unexpected mismatch count (%d) result: %+v", len(got), got) } totalRV, err := got[0].LookupErr("total") @@ -135,6 +139,70 @@ func countMismatchesForTasks( return matched, totalRV.AsInt64() - matched, nil } +func countRechecksForGeneration( + ctx context.Context, + metaDB *mongo.Database, + generation int, +) (int64, int64, error) { + cursor, err := metaDB.Collection(verificationTasksCollection).Aggregate( + ctx, + mongo.Pipeline{ + {{"$match", bson.D{ + {"generation", generation}, + }}}, + {{"$lookup", bson.D{ + {"from", mismatchesCollectionName}, + {"localField", "_id"}, + {"foreignField", "task"}, + {"as", "mismatches"}, + }}}, + {{"$addFields", bson.D{ + {"mismatches", bson.D{{"$size", "$mismatches"}}}, + }}}, + {{"$group", bson.D{ + {"_id", nil}, + {"changes", bson.D{ + {"$sum", bson.D{ + {"$subtract", bson.A{ + bson.D{{"$size", "$_ids"}}, + "$mismatches", + }}, + }}, + }}, + {"mismatches", bson.D{ + {"$sum", "$mismatches"}, + }}, + }}}, + }, + ) + if err != nil { + return 0, 0, errors.Wrap(err, "sending query to count last generation’s found mismatches") + } + + defer cursor.Close(ctx) + + if !cursor.Next(ctx) { + if cursor.Err() != nil { + return 0, 0, errors.Wrap(err, "reading count of last generation’s found mismatches") + } + + // This happens if there were no tasks in the queried generation. + return 0, 0, nil + } + + result := struct { + Mismatches int64 + Changes int64 + }{} + + err = cursor.Decode(&result) + if err != nil { + return 0, 0, errors.Wrapf(err, "reading mismatches from result (%v)", cursor.Current) + } + + return result.Mismatches, result.Changes, nil +} + func getMismatchesForTasks( ctx context.Context, db *mongo.Database, diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go new file mode 100644 index 00000000..cd6cc39e --- /dev/null +++ b/internal/verifier/progress.go @@ -0,0 +1,188 @@ +package verifier + +import ( + "context" + "time" + + "github.com/10gen/migration-verifier/contextplus" + "github.com/10gen/migration-verifier/internal/types" + "github.com/10gen/migration-verifier/mslices" + "github.com/10gen/migration-verifier/option" + "github.com/pkg/errors" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/v2/bson" +) + +func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { + verifier.mux.RLock() + defer verifier.mux.RUnlock() + + var vStatus *VerificationStatus + + generation := verifier.generation + genStats := ProgressGenerationStats{} + + if !verifier.generationStartTime.IsZero() { + progressTime := time.Now() + genElapsed := progressTime.Sub(verifier.generationStartTime) + + genStats.TimeElapsed = option.Some(genElapsed.Round(time.Millisecond).String()) + } + + eg, egCtx := contextplus.ErrGroup(ctx) + eg.Go( + func() error { + var err error + vStatus, err = verifier.getVerificationStatusForGeneration(egCtx, generation) + + return errors.Wrapf(err, "fetching generation %d’s tasks’ status", generation) + }, + ) + + if generation > 0 { + eg.Go( + func() error { + mismatches, changes, err := countRechecksForGeneration( + egCtx, + verifier.metaClient.Database(verifier.metaDBName), + generation-1, + ) + + if err != nil { + return errors.Wrapf(err, "counting mismatches seen during generation %d", generation-1) + } + + genStats.PriorRechecks = option.Some(ProgressRechecks{ + Changes: changes, + Mismatches: mismatches, + }) + + return nil + }, + ) + } + + eg.Go( + func() error { + var err error + nsStats, err := verifier.GetPersistedNamespaceStatisticsForGeneration(ctx, generation) + + if err != nil { + return errors.Wrapf(err, "fetching generation %d’s persisted namespace stats", generation) + } + + var totalDocs, comparedDocs types.DocumentCount + var totalBytes, comparedBytes types.ByteCount + var totalNss, completedNss types.NamespaceCount + + for _, result := range nsStats { + totalDocs += result.TotalDocs + comparedDocs += result.DocsCompared + totalBytes += result.TotalBytes + comparedBytes += result.BytesCompared + + totalNss++ + if result.PartitionsDone > 0 { + partitionsPending := result.PartitionsAdded + result.PartitionsProcessing + if partitionsPending == 0 { + completedNss++ + } + } + } + + var activeWorkers int + perNamespaceWorkerStats := verifier.getPerNamespaceWorkerStats() + for _, nsWorkerStats := range perNamespaceWorkerStats { + for _, workerStats := range nsWorkerStats { + activeWorkers++ + comparedDocs += workerStats.SrcDocCount + comparedBytes += workerStats.SrcByteCount + } + } + + genStats.DocsCompared = comparedDocs + genStats.TotalDocs = totalDocs + + genStats.SrcBytesCompared = comparedBytes + genStats.TotalSrcBytes = totalBytes + + genStats.ActiveWorkers = activeWorkers + + return nil + }, + ) + eg.Go( + func() error { + failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks( + ctx, + verifier.logger, + verifier.verificationTaskCollection(), + verificationTaskVerifyDocuments, + generation, + ) + if err != nil { + return errors.Wrapf(err, "fetching generation %d’s failed & incomplete tasks", generation) + } + + taskIDsToQuery := lo.Map( + lo.Flatten(mslices.Of(failedTasks, incompleteTasks)), + func(ft VerificationTask, _ int) bson.ObjectID { + return ft.PrimaryKey + }, + ) + + mismatchCount, _, err := countMismatchesForTasks( + egCtx, + verifier.verificationDatabase(), + taskIDsToQuery, + true, + ) + if err != nil { + return errors.Wrapf(err, "counting mismatches seen during generation %d", generation) + } + + genStats.MismatchesFound = mismatchCount + + return nil + }, + ) + eg.Go( + func() error { + recheckColl := verifier.getRecheckQueueCollection(1 + generation) + count, err := recheckColl.EstimatedDocumentCount(ctx) + if err != nil { + return errors.Wrapf(err, "counting rechecks enqueued during generation %d", generation) + } + + genStats.RechecksEnqueued = count + + return nil + }, + ) + + if err := eg.Wait(); err != nil { + return Progress{Error: err}, err + } + + return Progress{ + Phase: lo.Ternary( + verifier.running, + lo.Ternary(generation > 0, Recheck, Check), + Idle, + ), + Generation: verifier.generation, + GenerationStats: genStats, + SrcChangeStats: ProgressChangeStats{ + EventsPerSecond: verifier.srcChangeReader.getEventsPerSecond(), + CurrentTimes: verifier.srcChangeReader.getCurrentTimes(), + BufferSaturation: verifier.srcChangeReader.getBufferSaturation(), + }, + DstChangeStats: ProgressChangeStats{ + EventsPerSecond: verifier.dstChangeReader.getEventsPerSecond(), + CurrentTimes: verifier.dstChangeReader.getCurrentTimes(), + BufferSaturation: verifier.dstChangeReader.getBufferSaturation(), + }, + Status: vStatus, + }, nil + +} diff --git a/internal/verifier/statistics.go b/internal/verifier/statistics.go index 73aa0725..29ad3038 100644 --- a/internal/verifier/statistics.go +++ b/internal/verifier/statistics.go @@ -179,6 +179,13 @@ var jsonTemplate *template.Template func (verifier *Verifier) GetPersistedNamespaceStatistics(ctx context.Context) ([]NamespaceStats, error) { generation, _ := verifier.getGeneration() + return verifier.GetPersistedNamespaceStatisticsForGeneration(ctx, generation) +} + +func (verifier *Verifier) GetPersistedNamespaceStatisticsForGeneration( + ctx context.Context, + generation int, +) ([]NamespaceStats, error) { templateOnce.Do(func() { tmpl, err := template.New("").Parse(perNsStatsPipelineTemplate) if err != nil { diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index d204ed88..01080bb5 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -587,10 +587,10 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { if eventsPerSec, has := cluster.csReader.getEventsPerSecond().Get(); has { var lagNote string - lag, hasLag := cluster.csReader.getLag().Get() + prog, hasProg := cluster.csReader.getCurrentTimes().Get() - if hasLag { - lagNote = fmt.Sprintf("lag: %s; ", reportutils.DurationToHMS(lag)) + if hasProg { + lagNote = fmt.Sprintf("lag: %s; ", reportutils.DurationToHMS(prog.Lag())) } saturation := cluster.csReader.getBufferSaturation() @@ -604,7 +604,7 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { reportutils.FmtReal(100*saturation), ) - if hasLag && lag > lagWarnThreshold { + if hasProg && prog.Lag() > lagWarnThreshold { fmt.Fprint( builder, "⚠️ Lag is excessive. Verification may fail. See documentation.\n", diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go index 57c52624..524f3006 100644 --- a/internal/verifier/webserver.go +++ b/internal/verifier/webserver.go @@ -11,7 +11,9 @@ import ( "github.com/10gen/migration-verifier/contextplus" "github.com/10gen/migration-verifier/internal/logger" + "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/verifier/webserver" + "github.com/10gen/migration-verifier/option" "github.com/gin-contrib/pprof" "github.com/gin-gonic/gin" "github.com/google/uuid" @@ -240,12 +242,44 @@ func (server *WebServer) writesOffEndpoint(c *gin.Context) { successResponse(c) } +type ProgressRechecks struct { + Mismatches int64 `json:"mismatches"` + Changes int64 `json:"changes"` +} + +type ProgressGenerationStats struct { + TimeElapsed option.Option[string] `json:"timeElapsed"` + ActiveWorkers int `json:"activeWorkers"` + + DocsCompared types.DocumentCount `json:"docsCompared"` + TotalDocs types.DocumentCount `json:"totalDocs"` + + SrcBytesCompared types.ByteCount `json:"srcBytesCompared"` + TotalSrcBytes types.ByteCount `json:"totalSrcBytes,omitempty"` + + PriorRechecks option.Option[ProgressRechecks] `json:"priorRechecks"` + MismatchesFound int64 `json:"mismatchesFound"` + RechecksEnqueued int64 `json:"rechecksEnqueued"` +} + +type ProgressChangeStats struct { + EventsPerSecond option.Option[float64] `json:"eventsPerSecond"` + CurrentTimes option.Option[readerCurrentTimes] `json:"currentTimes"` + BufferSaturation float64 `json:"bufferSaturation"` +} + // Progress represents the structure of the JSON response from the Progress end point. type Progress struct { - Phase string `json:"phase"` - Generation int `json:"generation"` - Error error `json:"error"` - Status *VerificationStatus `json:"verificationStatus"` + Phase string `json:"phase"` + + Generation int `json:"generation"` + GenerationStats ProgressGenerationStats `json:"generationStats"` + + SrcChangeStats ProgressChangeStats `json:"srcChangeStats"` + DstChangeStats ProgressChangeStats `json:"dstChangeStats"` + + Error error `json:"error,omitempty"` + Status *VerificationStatus `json:"verificationStatus"` } // progressEndpoint implements the gin handle for the progress endpoint. diff --git a/mbson/raw_value.go b/mbson/raw_value.go index b0b1c96c..35dd4fa9 100644 --- a/mbson/raw_value.go +++ b/mbson/raw_value.go @@ -9,7 +9,7 @@ import ( ) type bsonCastRecipient interface { - bson.Raw | bson.Timestamp | bson.ObjectID | string | int32 + bson.Raw | bson.Timestamp | bson.ObjectID | string | int32 | int64 } type bsonSourceTypes interface { @@ -52,11 +52,24 @@ func CastRawValue[T bsonCastRecipient](in bson.RawValue) (T, error) { if val, ok := in.Int32OK(); ok { return any(val).(T), nil } + case int64: + if val, ok := in.Int64OK(); ok { + return any(val).(T), nil + } default: panic(fmt.Sprintf("Unrecognized Go type: %T (maybe augment bsonType?)", in)) } - return *new(T), cannotCastErr{in.Type, any(in)} + return *new(T), cannotCastErr{in.Type, *new(T)} +} + +func ToInt64(in bson.RawValue) (int64, error) { + i64, ok := in.AsInt64OK() + if !ok { + return 0, cannotCastErr{in.Type, i64} + } + + return i64, nil } // Lookup fetches a value from a BSON document, casts it to the appropriate