From a4d863a6b18221a43569212348ab54b8a44c4ec7 Mon Sep 17 00:00:00 2001 From: Jiahui Li Date: Sat, 31 Jan 2026 21:42:14 -0600 Subject: [PATCH 1/2] Fix typos --- .../src/couch_replicator_doc_processor.erl | 6 +++--- .../src/couch_replicator_doc_processor_worker.erl | 2 +- .../src/couch_replicator_docs.erl | 2 +- .../src/couch_replicator_filters.erl | 2 +- .../src/couch_replicator_parse.erl | 4 ++-- .../src/couch_replicator_rate_limiter.erl | 2 +- .../src/couch_replicator_rate_limiter_tables.erl | 2 +- .../src/couch_replicator_scheduler.erl | 15 +++++++-------- .../src/couch_replicator_scheduler_job.erl | 4 ++-- .../src/couch_replicator_share.erl | 6 +++--- .../src/couch_replicator_worker.erl | 4 ++-- src/couch_replicator/src/json_stream_parse.erl | 4 ++-- .../couch_replicator_scheduler_docs_tests.erl | 3 +-- 13 files changed, 27 insertions(+), 29 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl index f3a0bfcfd7c..646d5f173f2 100644 --- a/src/couch_replicator/src/couch_replicator_doc_processor.erl +++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl @@ -167,7 +167,7 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc, Owner = true) -> % Parsing replication doc (but not calculating the id) could throw an % exception which would indicate this document is malformed. This exception % should propagate to db_change function and will be recorded as permanent - % failure in the document. User will have to update the documet to fix the + % failure in the document. User will have to update the document to fix the % problem. 
Rep0 = couch_replicator_parse:parse_rep_doc_without_id(JsonRepDoc), Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()}, @@ -675,8 +675,8 @@ start_scanner(#st{mdb_changes_pid = undefined} = St) -> start_delay_msec() -> DefaultSec = ?DEFAULT_START_DELAY_MSEC div 1000, - % We're using a compatiblity config setting (cluster_start_period) to avoid - % introducting a new config value. + % We're using a compatibility config setting (cluster_start_period) to avoid + % introducing a new config value. MSec = 1000 * config:get_integer("replicator", "cluster_start_period", DefaultSec), max(MSec, ?MIN_START_DELAY_MSEC). diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl index 5a62b3e06dd..8bb22ac45ef 100644 --- a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl +++ b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl @@ -230,7 +230,7 @@ t_ignore_if_doc_deleted(_) -> ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())), ?assertNot(added_job()). -% Should not add job if by the time worker got to fetchign the filter +% Should not add job if by the time worker got to fetch the filter % and building a replication id, another worker was spawned. t_ignore_if_worker_ref_does_not_match(_) -> Id = {?DB, ?DOC1}, diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl index d28ae908cde..8f134aaab8d 100644 --- a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_docs.erl @@ -137,7 +137,7 @@ delete_old_rep_ddoc(RepDb, DDocId) -> ok. % Update a #rep{} record with a replication_id. Calculating the id might involve -% fetching a filter from the source db, and so it could fail intermetently. +% fetching a filter from the source db, and so it could fail intermittently. 
% In case of a failure to fetch the filter this function will throw a % `{filter_fetch_error, Reason} exception. update_rep_id(Rep) -> diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl index aab8e80b3ed..c6f51d72479 100644 --- a/src/couch_replicator/src/couch_replicator_filters.erl +++ b/src/couch_replicator/src/couch_replicator_filters.erl @@ -60,7 +60,7 @@ parse(Options) -> % Fetches body of filter function from source database. Guaranteed to either % return {ok, Body} or an {error, Reason}. Also assume this function might -% block due to network / socket issues for an undeterminted amount of time. +% block due to network / socket issues for an undetermined amount of time. -spec fetch(binary(), binary(), binary()) -> {ok, {[_]}} | {error, binary()}. fetch(DDocName, FilterName, Source) -> diff --git a/src/couch_replicator/src/couch_replicator_parse.erl b/src/couch_replicator/src/couch_replicator_parse.erl index 5f8437992ba..f2d40acbeab 100644 --- a/src/couch_replicator/src/couch_replicator_parse.erl +++ b/src/couch_replicator/src/couch_replicator_parse.erl @@ -63,7 +63,7 @@ default_options() -> % database. If failure or parsing of filter docs fails, parse_doc throws a % {filter_fetch_error, Error} exception. This exception should be considered % transient in respect to the contents of the document itself, since it depends -% on netowrk availability of the source db and other factors. +% on network availability of the source db and other factors. -spec parse_rep_doc({[_]}) -> #rep{}. parse_rep_doc(RepDoc) -> {ok, Rep} = @@ -167,7 +167,7 @@ parse_proxy_settings(Props) when is_list(Props) -> end. % Update a #rep{} record with a replication_id. Calculating the id might involve -% fetching a filter from the source db, and so it could fail intermetently. +% fetching a filter from the source db, and so it could fail intermittently. 
% In case of a failure to fetch the filter this function will throw a % `{filter_fetch_error, Reason} exception. update_rep_id(Rep) -> diff --git a/src/couch_replicator/src/couch_replicator_rate_limiter.erl b/src/couch_replicator/src/couch_replicator_rate_limiter.erl index ccffd901144..d76052ec7a6 100644 --- a/src/couch_replicator/src/couch_replicator_rate_limiter.erl +++ b/src/couch_replicator/src/couch_replicator_rate_limiter.erl @@ -22,7 +22,7 @@ % % The algorithm referenced above estimates a rate, whereas the implemented % algorithm uses an interval (in milliseconds). It preserves the original -% semantics, that is the failure part is multplicative and the success part is +% semantics, that is the failure part is multiplicative and the success part is % additive. The relationship between rate and interval is: rate = 1000 / % interval. % diff --git a/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl b/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl index 2e255688812..1b79892906e 100644 --- a/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl +++ b/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl @@ -20,7 +20,7 @@ % in effect (also configurable). % % This module is also in charge of calculating ownership of replications based -% on where their _repicator db documents shards live. +% on where their _replicator db documents shards live. -module(couch_replicator_rate_limiter_tables). diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index 14cca4a21a7..51d020d441d 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -73,7 +73,7 @@ % Worker children get a default 5 second shutdown timeout, so pick a value just % a bit less than that: 4.5 seconds. 
In couch_replicator_sup our scheduler -% worker doesn't specify the timeout, so it up picks ups the OTP default of 5 +% worker doesn't specify the timeout, so it picks up the OTP default of 5 % seconds https://www.erlang.org/doc/system/sup_princ.html#child-specification % -define(TERMINATE_SHUTDOWN_TIME, 4500). @@ -173,9 +173,9 @@ job_proxy_url(_Endpoint) -> null. % Health threshold is the minimum amount of time an unhealthy job should run -% crashing before it is considered to be healthy again. HealtThreashold should +% crashing before it is considered to be healthy again. Health threshold should % not be 0 as jobs could start and immediately crash, and it shouldn't be -% infinity, since then consecutive crashes would accumulate forever even if +% infinity, since then consecutive crashes would accumulate forever even if % job is back to normal. -spec health_threshold() -> non_neg_integer(). health_threshold() -> @@ -522,7 +522,7 @@ pending_fold(Job, {Set, Now, Count, HealthThreshold}) -> % Replace Job in the accumulator if it has a higher priority (lower priority % value) than the lowest priority there. Job priority is indexed by -% {FairSharePiority, LastStarted} tuples. If the FairSharePriority is the same +% {FairSharePriority, LastStarted} tuples. If the FairSharePriority is the same % then last started timestamp is used to pick. The goal is to keep up to Count % oldest jobs during the iteration. For example, if there are jobs with these % priorities accumulated so far [5, 7, 11], and the priority of current job is @@ -594,14 +594,13 @@ not_recently_crashed(#job{history = History}, Now, HealthThreshold) -> % and running successfully without crashing for a period of time. That period % of time is the HealthThreshold. % - -spec consecutive_crashes(history(), non_neg_integer()) -> non_neg_integer(). consecutive_crashes(History, HealthThreshold) when is_list(History) -> consecutive_crashes(History, HealthThreshold, 0). 
-spec consecutive_crashes(history(), non_neg_integer(), non_neg_integer()) -> non_neg_integer(). -consecutive_crashes([], _HealthThreashold, Count) -> +consecutive_crashes([], _HealthThreshold, Count) -> Count; consecutive_crashes( [{{crashed, _}, CrashT}, {_, PrevT} = PrevEvent | Rest], @@ -795,7 +794,7 @@ rotate_jobs(State, ChurnSoFar) -> if SlotsAvailable >= 0 -> % If there is are enough SlotsAvailable reduce StopCount to avoid - % unnesessarily stopping jobs. `stop_jobs/3` ignores 0 or negative + % unnecessarily stopping jobs. `stop_jobs/3` ignores 0 or negative % values so we don't worry about that here. StopCount = lists:min([Pending - SlotsAvailable, Running, Churn]), stop_jobs(StopCount, true, State), @@ -930,7 +929,7 @@ optimize_int_option({Key, Val}, #rep{options = Options} = Rep) -> % Updater is a separate process. It receives `update_stats` messages and % updates scheduler stats from the scheduler jobs table. Updates are % performed no more frequently than once per ?STATS_UPDATE_WAIT milliseconds. - +% update_running_jobs_stats(StatsPid) when is_pid(StatsPid) -> StatsPid ! update_stats, ok. diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index f82e3626930..c921543c311 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -110,7 +110,7 @@ stop(Pid) when is_pid(Pid) -> % In the rare case the job is already stopping as we try to stop it, it % won't return ok but exit the calling process, usually the scheduler, so % we guard against that. 
See: - % www.erlang.org/doc/apps/stdlib/gen_server.html#stop/3 + % https://www.erlang.org/doc/apps/stdlib/gen_server.html#stop/3 catch gen_server:stop(Pid, shutdown, ?STOP_TIMEOUT_MSEC), exit(Pid, kill), receive @@ -189,7 +189,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx = UserCtx} = Rep) - % Restarting a temporary supervised child implies that the original arguments % (#rep{} record) specified in the MFA component of the supervisor % child spec will always be used whenever the child is restarted. - % This implies the same replication performance tunning parameters will + % This implies the same replication performance tuning parameters will % always be used. The solution is to delete the child spec (see % cancel_replication/1) and then start the replication again, but this is % unfortunately not immune to race conditions. diff --git a/src/couch_replicator/src/couch_replicator_share.erl b/src/couch_replicator/src/couch_replicator_share.erl index fb7b2fcfc56..bf9f2367b5a 100644 --- a/src/couch_replicator/src/couch_replicator_share.erl +++ b/src/couch_replicator/src/couch_replicator_share.erl @@ -39,7 +39,7 @@ % 3) Jobs which run longer accumulate more charges and get assigned a % higher priority value and get to wait longer to run. % -% In order to prevent job starvation, all job priorities are periodicaly +% In order to prevent job starvation, all job priorities are periodically % decayed (decreased). This effectively moves all the jobs towards the front of % the run queue. So, in effect, there are two competing processes: one % uniformly moves all jobs to the front, and the other throws them back in @@ -86,7 +86,7 @@ % priority 0, and would render this algorithm useless. The default value of % 0.98 is picked such that if a job ran for one scheduler cycle, then didn't % get to run for 7 hours, it would still have priority > 0. 
7 hours was picked -% as it was close enought to 8 hours which is the default maximum error backoff +% as it was close enough to 8 hours which is the default maximum error backoff % interval. % % Example calculation: @@ -215,7 +215,7 @@ decay_priorities() -> % is missing we assume it is 0 clear_zero(?PRIORITIES). -% This is the main part of the alrgorithm. In [1] it is described in the +% This is the main part of the algorithm. In [1] it is described in the % "Priority Adjustment" section. % update_priority(#job{} = Job) -> diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index 3a6edc0b85e..528ff0f3f2c 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -317,7 +317,7 @@ queue_fetch_loop(#fetch_st{} = St) -> ok = gen_server:call(Parent, {batch_doc, Doc}, infinity) end, lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))), - % Invidually upload docs with attachments. + % Individually upload docs with attachments. maps:map(FetchFun, maps:without(maps:keys(Docs), IdRevs1)), {ok, Stats} = gen_server:call(Parent, flush, infinity), ok = report_seq_done(Cp, ReportSeq, Stats), @@ -384,7 +384,7 @@ attempt_revs_diff(#fetch_stats{} = St, NowSec) -> % Update fail ratio. Use the basic exponential moving average formula to smooth % over minor bumps in case we encounter a few % attachments and then get back -% to replicationg documents without attachments. +% to replicating documents without attachments. 
% update_fetch_stats(#fetch_stats{} = St, Successes, Attempts, Decay, NowSec) -> #fetch_stats{ratio = Avg} = St, diff --git a/src/couch_replicator/src/json_stream_parse.erl b/src/couch_replicator/src/json_stream_parse.erl index 8b0c34fa222..09eff8196e9 100644 --- a/src/couch_replicator/src/json_stream_parse.erl +++ b/src/couch_replicator/src/json_stream_parse.erl @@ -53,7 +53,7 @@ events(Data, EventFun) when is_binary(Data) -> events(DataFun, EventFun) -> parse_one(DataFun, EventFun, <<>>). -% converts the JSON directly to the erlang represention of Json +% converts the JSON directly to the erlang representation of Json to_ejson(DF) -> {_DF2, EF, _Rest} = events(DF, fun(Ev) -> collect_events(Ev, []) end), [[EJson]] = make_ejson(EF(get_results), [[]]), @@ -63,7 +63,7 @@ to_ejson(DF) -> % % Return this function from inside an event function right after getting an % object_start event. It then collects the remaining events for that object -% and converts it to the erlang represention of Json. +% and converts it to the erlang representation of Json. % % It then calls your ReturnControl function with the erlang object. Your % return control function then should yield another event function. diff --git a/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl index 7cf79da771d..b3bd7b543e2 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl @@ -48,7 +48,6 @@ setup_prefixed_replicator_db_with_update_docs_true() -> {Ctx, {RepDb, Source, Target}}. teardown({Ctx, {RepDb, Source, Target}}) -> - meck:unload(), ok = fabric:delete_db(RepDb, [?ADMIN_CTX]), config:delete("replicator", "update_docs", _Persist = false), couch_replicator_test_helper:test_teardown({Ctx, {Source, Target}}). 
@@ -76,7 +75,7 @@ scheduler_docs_test_prefixed_db_test_() -> replicator_bdu_test_main_db_test_() -> { setup, - fun setup_prefixed_replicator_db/0, + fun setup_main_replicator_db/0, fun teardown/1, with([ ?TDEF(t_local_docs_can_be_written), From 5ad169b639d729126258afb78251c3784e66e536 Mon Sep 17 00:00:00 2001 From: Jiahui Li Date: Thu, 5 Feb 2026 21:52:48 -0600 Subject: [PATCH 2/2] Improve replication `since_seq` parameter --- .../src/couch_replicator_scheduler_job.erl | 7 +- .../couch_replicator_scheduler_job_tests.erl | 153 ++++++++++++++++++ 2 files changed, 159 insertions(+), 1 deletion(-) create mode 100644 src/couch_replicator/test/eunit/couch_replicator_scheduler_job_tests.erl diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index c921543c311..18523137f85 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -685,7 +685,12 @@ init_state(Rep) -> Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats), StartSeq1 = get_value(since_seq, Options, StartSeq0), - StartSeq = {0, StartSeq1}, + StartSeq2 = + case StartSeq0 > StartSeq1 of + true -> StartSeq0; + false -> StartSeq1 + end, + StartSeq = {0, StartSeq2}, SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ), diff --git a/src/couch_replicator/test/eunit/couch_replicator_scheduler_job_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_scheduler_job_tests.erl new file mode 100644 index 00000000000..6b79bf94473 --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_scheduler_job_tests.erl @@ -0,0 +1,153 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_scheduler_job_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-define(CHANGES_READER, couch_replicator_changes_reader). +-define(DOC1, #{<<"_id">> => <<"doc1">>}). +-define(DOC2, #{<<"_id">> => <<"doc2">>}). +-define(DOC3, #{<<"_id">> => <<"doc3">>}). +-define(DOC4, #{<<"_id">> => <<"doc4">>}). +-define(DOCS, #{<<"docs">> => [?DOC1, ?DOC2, ?DOC3]}). +-define(JSON, {"Content-Type", "application/json"}). + +setup_replicator_db(Prefix) -> + RepDb = + case Prefix of + <<>> -> <<"_replicator">>; + <<_/binary>> -> <> + end, + Opts = [{q, 1}, {n, 1}, ?ADMIN_CTX], + case fabric:create_db(RepDb, Opts) of + ok -> ok; + {error, file_exists} -> ok + end, + RepDb. + +setup_main_replicator_db() -> + {Ctx, {Source, Target}} = couch_replicator_test_helper:test_setup(), + RepDb = setup_replicator_db(<<>>), + meck:new(?CHANGES_READER, [passthrough]), + {Ctx, {RepDb, Source, Target}}. + +setup_prefixed_replicator_db() -> + {Ctx, {Source, Target}} = couch_replicator_test_helper:test_setup(), + RepDb = setup_replicator_db(?tempdb()), + meck:new(?CHANGES_READER, [passthrough]), + {Ctx, {RepDb, Source, Target}}. + +teardown({Ctx, {RepDb, Source, Target}}) -> + ok = fabric:delete_db(RepDb, [?ADMIN_CTX]), + config:delete("replicator", "update_docs", _Persist = false), + couch_replicator_test_helper:test_teardown({Ctx, {Source, Target}}). 
 + +scheduler_job_replicate_test_() -> + { + foreach, + fun setup_main_replicator_db/0, + fun teardown/1, + [ + ?TDEF_FE(t_replicate_without_since_seq), + ?TDEF_FE(t_replicate_with_since_seq) + ] + }. + +t_replicate_without_since_seq({_Ctx, {_RepDb, Source, Target}}) -> + ok = create_docs(Source), + replicate(Source, Target), + ?assertEqual(1, num_calls(read_changes, ['_', 0, '_', '_', '_'])), + + meck:reset(?CHANGES_READER), + ok = create_doc(Source), + replicate(Source, Target), + Changes = changes(Source), + Seq = sequence(?DOC3, Changes), + ?assertEqual(1, num_calls(read_changes, ['_', Seq, '_', '_', '_'])). + +t_replicate_with_since_seq({_Ctx, {_RepDb, Source, Target}}) -> + ok = create_docs(Source), + Changes = changes(Source), + SinceSeq = sequence(?DOC2, Changes), + replicate(Source, Target, SinceSeq), + ?assertEqual(1, num_calls(read_changes, ['_', SinceSeq, '_', '_', '_'])), + + meck:reset(?CHANGES_READER), + ok = create_doc(Source), + replicate(Source, Target), + Seq = sequence(?DOC3, Changes), + ?assertEqual(1, num_calls(read_changes, ['_', Seq, '_', '_', '_'])). + +%%%%%%%%%%%%%%%%%%%% Utility Functions %%%%%%%%%%%%%%%%%%%% +url(UrlPath) -> + binary_to_list(couch_replicator_test_helper:cluster_db_url(UrlPath)). + +create_docs(DbName) -> + case req(post, url(DbName) ++ "/_bulk_docs", ?DOCS) of + {201, _} -> ok; + Error -> error({failed_to_create_docs, DbName, Error}) + end. + +create_doc(DbName) -> + case req(post, url(DbName), ?DOC4) of + {201, _} -> ok; + Error -> error({failed_to_create_doc, DbName, Error}) + end. + +changes(DbName) -> + {200, Res} = req(get, url(DbName) ++ "/_changes"), + ?assert(maps:is_key(<<"last_seq">>, Res)), + ?assert(maps:is_key(<<"pending">>, Res)), + ?assert(maps:is_key(<<"results">>, Res)), + Res. 
+ +sequence(Doc, Changes) -> + #{<<"_id">> := DocId} = Doc, + #{<<"results">> := Results} = Changes, + case lists:search(fun(M) -> maps:get(<<"id">>, M) == DocId end, Results) of + {value, #{<<"seq">> := Seq}} -> Seq; + false -> not_found + end. + +replicate(RepObject) -> + couch_replicator_test_helper:replicate(RepObject). + +replicate(Source, Target) -> + replicate(#{ + <<"source">> => ?l2b(url(Source)), + <<"target">> => ?l2b(url(Target)) + }). + +replicate(Source, Target, SinceSeq) -> + ?debugVal(SinceSeq), + replicate(#{ + <<"source">> => ?l2b(url(Source)), + <<"target">> => ?l2b(url(Target)), + <<"since_seq">> => SinceSeq + }). + +req(Method, Url) -> + Headers = [?JSON], + {ok, Code, _, Res} = test_request:request(Method, Url, Headers), + {Code, jiffy:decode(Res, [return_maps])}. + +req(Method, Url, #{} = Body) -> + req(Method, Url, jiffy:encode(Body)); +req(Method, Url, Body) -> + Headers = [?JSON], + {ok, Code, _, Res} = test_request:request(Method, Url, Headers, Body), + {Code, jiffy:decode(Res, [return_maps])}. + +num_calls(Fun, Args) -> + meck:num_calls(?CHANGES_READER, Fun, Args).