Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/couch_replicator/src/couch_replicator_doc_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc, Owner = true) ->
% Parsing replication doc (but not calculating the id) could throw an
% exception which would indicate this document is malformed. This exception
% should propagate to db_change function and will be recorded as permanent
% failure in the document. User will have to update the documet to fix the
% failure in the document. User will have to update the document to fix the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please make the many (and welcome!) documentation fixes in a separate PR to the one making the since_seq enhancement.

% problem.
Rep0 = couch_replicator_parse:parse_rep_doc_without_id(JsonRepDoc),
Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
Expand Down Expand Up @@ -675,8 +675,8 @@ start_scanner(#st{mdb_changes_pid = undefined} = St) ->

start_delay_msec() ->
DefaultSec = ?DEFAULT_START_DELAY_MSEC div 1000,
% We're using a compatiblity config setting (cluster_start_period) to avoid
% introducting a new config value.
% We're using a compatibility config setting (cluster_start_period) to avoid
% introducing a new config value.
MSec = 1000 * config:get_integer("replicator", "cluster_start_period", DefaultSec),
max(MSec, ?MIN_START_DELAY_MSEC).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ t_ignore_if_doc_deleted(_) ->
?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
?assertNot(added_job()).

% Should not add job if by the time worker got to fetchign the filter
% Should not add job if by the time the worker got to fetching the filter
% and building a replication id, another worker was spawned.
t_ignore_if_worker_ref_does_not_match(_) ->
Id = {?DB, ?DOC1},
Expand Down
2 changes: 1 addition & 1 deletion src/couch_replicator/src/couch_replicator_docs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ delete_old_rep_ddoc(RepDb, DDocId) ->
ok.

% Update a #rep{} record with a replication_id. Calculating the id might involve
% fetching a filter from the source db, and so it could fail intermetently.
% fetching a filter from the source db, and so it could fail intermittently.
% In case of a failure to fetch the filter this function will throw a
% `{filter_fetch_error, Reason}` exception.
update_rep_id(Rep) ->
Expand Down
2 changes: 1 addition & 1 deletion src/couch_replicator/src/couch_replicator_filters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ parse(Options) ->

% Fetches body of filter function from source database. Guaranteed to either
% return {ok, Body} or an {error, Reason}. Also assume this function might
% block due to network / socket issues for an undeterminted amount of time.
% block due to network / socket issues for an undetermined amount of time.
-spec fetch(binary(), binary(), binary()) ->
{ok, {[_]}} | {error, binary()}.
fetch(DDocName, FilterName, Source) ->
Expand Down
4 changes: 2 additions & 2 deletions src/couch_replicator/src/couch_replicator_parse.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ default_options() ->
% database. If failure or parsing of filter docs fails, parse_doc throws a
% {filter_fetch_error, Error} exception. This exception should be considered
% transient in respect to the contents of the document itself, since it depends
% on netowrk availability of the source db and other factors.
% on network availability of the source db and other factors.
-spec parse_rep_doc({[_]}) -> #rep{}.
parse_rep_doc(RepDoc) ->
{ok, Rep} =
Expand Down Expand Up @@ -167,7 +167,7 @@ parse_proxy_settings(Props) when is_list(Props) ->
end.

% Update a #rep{} record with a replication_id. Calculating the id might involve
% fetching a filter from the source db, and so it could fail intermetently.
% fetching a filter from the source db, and so it could fail intermittently.
% In case of a failure to fetch the filter this function will throw a
% `{filter_fetch_error, Reason}` exception.
update_rep_id(Rep) ->
Expand Down
2 changes: 1 addition & 1 deletion src/couch_replicator/src/couch_replicator_rate_limiter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
%
% The algorithm referenced above estimates a rate, whereas the implemented
% algorithm uses an interval (in milliseconds). It preserves the original
% semantics, that is the failure part is multplicative and the success part is
% semantics, that is the failure part is multiplicative and the success part is
% additive. The relationship between rate and interval is: rate = 1000 /
% interval.
%
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
% in effect (also configurable).
%
% This module is also in charge of calculating ownership of replications based
% on where their _repicator db documents shards live.
% on where their _replicator db documents shards live.

-module(couch_replicator_rate_limiter_tables).

Expand Down
15 changes: 7 additions & 8 deletions src/couch_replicator/src/couch_replicator_scheduler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@

% Worker children get a default 5 second shutdown timeout, so pick a value just
% a bit less than that: 4.5 seconds. In couch_replicator_sup our scheduler
% worker doesn't specify the timeout, so it up picks ups the OTP default of 5
% worker doesn't specify the timeout, so it picks up the OTP default of 5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/ups/up ?

% seconds https://www.erlang.org/doc/system/sup_princ.html#child-specification
%
-define(TERMINATE_SHUTDOWN_TIME, 4500).
Expand Down Expand Up @@ -173,9 +173,9 @@ job_proxy_url(_Endpoint) ->
null.

% Health threshold is the minimum amount of time an unhealthy job should run
% crashing before it is considered to be healthy again. HealtThreashold should
% crashing before it is considered to be healthy again. Health threshold should
% not be 0 as jobs could start and immediately crash, and it shouldn't be
% infinity, since then consecutive crashes would accumulate forever even if
% infinity, since then consecutive crashes would accumulate forever even if
% job is back to normal.
-spec health_threshold() -> non_neg_integer().
health_threshold() ->
Expand Down Expand Up @@ -522,7 +522,7 @@ pending_fold(Job, {Set, Now, Count, HealthThreshold}) ->

% Replace Job in the accumulator if it has a higher priority (lower priority
% value) than the lowest priority there. Job priority is indexed by
% {FairSharePiority, LastStarted} tuples. If the FairSharePriority is the same
% {FairSharePriority, LastStarted} tuples. If the FairSharePriority is the same
% then last started timestamp is used to pick. The goal is to keep up to Count
% oldest jobs during the iteration. For example, if there are jobs with these
% priorities accumulated so far [5, 7, 11], and the priority of current job is
Expand Down Expand Up @@ -594,14 +594,13 @@ not_recently_crashed(#job{history = History}, Now, HealthThreshold) ->
% and running successfully without crashing for a period of time. That period
% of time is the HealthThreshold.
%

-spec consecutive_crashes(history(), non_neg_integer()) -> non_neg_integer().
consecutive_crashes(History, HealthThreshold) when is_list(History) ->
consecutive_crashes(History, HealthThreshold, 0).

-spec consecutive_crashes(history(), non_neg_integer(), non_neg_integer()) ->
non_neg_integer().
consecutive_crashes([], _HealthThreashold, Count) ->
consecutive_crashes([], _HealthThreshold, Count) ->
Count;
consecutive_crashes(
[{{crashed, _}, CrashT}, {_, PrevT} = PrevEvent | Rest],
Expand Down Expand Up @@ -795,7 +794,7 @@ rotate_jobs(State, ChurnSoFar) ->
if
SlotsAvailable >= 0 ->
% If there are enough SlotsAvailable, reduce StopCount to avoid
% unnesessarily stopping jobs. `stop_jobs/3` ignores 0 or negative
% unnecessarily stopping jobs. `stop_jobs/3` ignores 0 or negative
% values so we don't worry about that here.
StopCount = lists:min([Pending - SlotsAvailable, Running, Churn]),
stop_jobs(StopCount, true, State),
Expand Down Expand Up @@ -930,7 +929,7 @@ optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
% Updater is a separate process. It receives `update_stats` messages and
% updates scheduler stats from the scheduler jobs table. Updates are
% performed no more frequently than once per ?STATS_UPDATE_WAIT milliseconds.

%
update_running_jobs_stats(StatsPid) when is_pid(StatsPid) ->
StatsPid ! update_stats,
ok.
Expand Down
11 changes: 8 additions & 3 deletions src/couch_replicator/src/couch_replicator_scheduler_job.erl
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ stop(Pid) when is_pid(Pid) ->
% In the rare case the job is already stopping as we try to stop it, it
% won't return ok but exit the calling process, usually the scheduler, so
% we guard against that. See:
% www.erlang.org/doc/apps/stdlib/gen_server.html#stop/3
% https://www.erlang.org/doc/apps/stdlib/gen_server.html#stop/3
catch gen_server:stop(Pid, shutdown, ?STOP_TIMEOUT_MSEC),
exit(Pid, kill),
receive
Expand Down Expand Up @@ -189,7 +189,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx = UserCtx} = Rep) -
% Restarting a temporary supervised child implies that the original arguments
% (#rep{} record) specified in the MFA component of the supervisor
% child spec will always be used whenever the child is restarted.
% This implies the same replication performance tunning parameters will
% This implies the same replication performance tuning parameters will
% always be used. The solution is to delete the child spec (see
% cancel_replication/1) and then start the replication again, but this is
% unfortunately not immune to race conditions.
Expand Down Expand Up @@ -685,7 +685,12 @@ init_state(Rep) ->
Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats),

StartSeq1 = get_value(since_seq, Options, StartSeq0),
StartSeq = {0, StartSeq1},
StartSeq2 =
Copy link
Contributor

@nickva nickva Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't in general compare sequences. But we can compare with 0 if we're starting from scratch. We should use == 0.

Based on the discussion in #5867 we'd want something like this:

If there is a checkpoint (StartSeq0 =/= 0) use the checkpoint and ignore since_seq. Otherwise (no checkpoint) and user specified a since_seq, use the since_seq

In addition make sure the since_seq is folded into the replication ID (append to the list if it's there but if it isn't make sure to generate the same replication ID as before). This will rewind changes for existing replications with since_seq back to 0. So it's something to warn the users about in the release notes.

case StartSeq0 > StartSeq1 of
true -> StartSeq0;
false -> StartSeq1
end,
StartSeq = {0, StartSeq2},

SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),

Expand Down
6 changes: 3 additions & 3 deletions src/couch_replicator/src/couch_replicator_share.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
% 3) Jobs which run longer accumulate more charges and get assigned a
% higher priority value and get to wait longer to run.
%
% In order to prevent job starvation, all job priorities are periodicaly
% In order to prevent job starvation, all job priorities are periodically
% decayed (decreased). This effectively moves all the jobs towards the front of
% the run queue. So, in effect, there are two competing processes: one
% uniformly moves all jobs to the front, and the other throws them back in
Expand Down Expand Up @@ -86,7 +86,7 @@
% priority 0, and would render this algorithm useless. The default value of
% 0.98 is picked such that if a job ran for one scheduler cycle, then didn't
% get to run for 7 hours, it would still have priority > 0. 7 hours was picked
% as it was close enought to 8 hours which is the default maximum error backoff
% as it was close enough to 8 hours which is the default maximum error backoff
% interval.
%
% Example calculation:
Expand Down Expand Up @@ -215,7 +215,7 @@ decay_priorities() ->
% is missing we assume it is 0
clear_zero(?PRIORITIES).

% This is the main part of the alrgorithm. In [1] it is described in the
% This is the main part of the algorithm. In [1] it is described in the
% "Priority Adjustment" section.
%
update_priority(#job{} = Job) ->
Expand Down
4 changes: 2 additions & 2 deletions src/couch_replicator/src/couch_replicator_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ queue_fetch_loop(#fetch_st{} = St) ->
ok = gen_server:call(Parent, {batch_doc, Doc}, infinity)
end,
lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))),
% Invidually upload docs with attachments.
% Individually upload docs with attachments.
maps:map(FetchFun, maps:without(maps:keys(Docs), IdRevs1)),
{ok, Stats} = gen_server:call(Parent, flush, infinity),
ok = report_seq_done(Cp, ReportSeq, Stats),
Expand Down Expand Up @@ -384,7 +384,7 @@ attempt_revs_diff(#fetch_stats{} = St, NowSec) ->

% Update fail ratio. Use the basic exponential moving average formula to smooth
% over minor bumps in case we encounter a few % attachments and then get back
% to replicationg documents without attachments.
% to replicating documents without attachments.
%
update_fetch_stats(#fetch_stats{} = St, Successes, Attempts, Decay, NowSec) ->
#fetch_stats{ratio = Avg} = St,
Expand Down
4 changes: 2 additions & 2 deletions src/couch_replicator/src/json_stream_parse.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ events(Data, EventFun) when is_binary(Data) ->
events(DataFun, EventFun) ->
parse_one(DataFun, EventFun, <<>>).

% converts the JSON directly to the erlang represention of Json
% converts the JSON directly to the erlang representation of Json
to_ejson(DF) ->
{_DF2, EF, _Rest} = events(DF, fun(Ev) -> collect_events(Ev, []) end),
[[EJson]] = make_ejson(EF(get_results), [[]]),
Expand All @@ -63,7 +63,7 @@ to_ejson(DF) ->
%
% Return this function from inside an event function right after getting an
% object_start event. It then collects the remaining events for that object
% and converts it to the erlang represention of Json.
% and converts it to the erlang representation of Json.
%
% It then calls your ReturnControl function with the erlang object. Your
% return control function then should yield another event function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ setup_prefixed_replicator_db_with_update_docs_true() ->
{Ctx, {RepDb, Source, Target}}.

teardown({Ctx, {RepDb, Source, Target}}) ->
meck:unload(),
ok = fabric:delete_db(RepDb, [?ADMIN_CTX]),
config:delete("replicator", "update_docs", _Persist = false),
couch_replicator_test_helper:test_teardown({Ctx, {Source, Target}}).
Expand Down Expand Up @@ -76,7 +75,7 @@ scheduler_docs_test_prefixed_db_test_() ->
replicator_bdu_test_main_db_test_() ->
{
setup,
fun setup_prefixed_replicator_db/0,
fun setup_main_replicator_db/0,
fun teardown/1,
with([
?TDEF(t_local_docs_can_be_written),
Expand Down
Loading