@@ -64,11 +64,10 @@ bool BackupCoordinationStageSync::State::operator !=(const State & other) const
6464
6565void BackupCoordinationStageSync::State::merge (const State & other)
6666{
67- if (other.host_with_error && !host_with_error )
67+ if (other.host_with_error )
6868 {
6969 const String & host = *other.host_with_error ;
70- host_with_error = host;
71- hosts.at (host).exception = other.hosts .at (host).exception ;
70+ addErrorInfo (other.hosts .at (host).exception , host);
7271 }
7372
7473 for (const auto & [host, other_host_info] : other.hosts )
@@ -81,6 +80,16 @@ void BackupCoordinationStageSync::State::merge(const State & other)
8180}
8281
8382
83+ void BackupCoordinationStageSync::State::addErrorInfo (std::exception_ptr exception, const String & host)
84+ {
85+ if (!host_with_error && exception)
86+ {
87+ host_with_error = host;
88+ hosts.at (host).exception = exception;
89+ }
90+ }
91+
92+
8493BackupCoordinationStageSync::BackupCoordinationStageSync (
8594 bool is_restore_,
8695 const String & zookeeper_path_,
@@ -137,25 +146,13 @@ BackupCoordinationStageSync::BackupCoordinationStageSync(
137146
/// Destructor. If finished() reports false it calls finish(/* throw_if_error = */ false), and in every case
/// it calls stopWatchingThread() as the last step.
/// NOTE(review): this span is a diff hunk - lines marked `-` are the removed implementation (driven by the
/// `tried_to_finish` / `tried_to_set_error` flags), lines marked `+` are its replacement.
138147BackupCoordinationStageSync::~BackupCoordinationStageSync ()
139148{
140- // / Normally either finish() or setError() must be called.
141- if (!tried_to_finish)
142- {
143- if (state.host_with_error )
144- {
145- // / setError() was called and succeeded.
146- finish (/* throw_if_error = */ false );
147- }
148- else if (!tried_to_set_error)
149- {
150- // / Neither finish() nor setError() were called, it's a bug.
151- chassert (false , " ~BackupCoordinationStageSync() is called without finish() or setError()" );
152- LOG_ERROR (log, " ~BackupCoordinationStageSync() is called without finish() or setError()" );
153- }
154- }
149+ /// On the success path finish() has normally been called already and the watching thread is already stopped.
150+ /// After an error that may not be the case,
151+ /// so the destructor itself attempts to create the finish node (with throw_if_error = false) and then makes sure the watching thread is stopped.
152+
153+ if (!finished ())
154+ finish (/* throw_if_error = */ false );
155155
156- // / Normally the watching thread should be stopped already because the finish() function stops it.
157- // / However if an error happened then the watching thread can be still running,
158- // / so here in the destructor we have to ensure that it's stopped.
159156 stopWatchingThread ();
160157}
161158
@@ -283,7 +280,11 @@ void BackupCoordinationStageSync::createStartAndAliveNodesAndCheckConcurrency(Co
283280 {
284281 auto [exception, host] = parseErrorNode (serialized_error);
285282 if (exception)
283+ {
284+ std::lock_guard lock{mutex};
285+ state.addErrorInfo (exception, host);
286286 std::rethrow_exception (exception);
287+ }
287288 }
288289
289290 if (check_concurrency)
@@ -580,12 +581,8 @@ void BackupCoordinationStageSync::readCurrentState(Coordination::ZooKeeperWithFa
580581 {
581582 String serialized_error = zookeeper->get (error_node_path);
582583 auto [exception, host] = parseErrorNode (serialized_error);
583- auto * host_info = get_host_info (host);
584- if (exception && host_info)
585- {
586- host_info->exception = exception;
587- new_state.host_with_error = host;
588- }
584+ if (exception)
585+ new_state.addErrorInfo (exception, host);
589586 }
590587 }
591588 else if (zk_node.starts_with (" started|" ))
@@ -804,20 +801,26 @@ void BackupCoordinationStageSync::setStage(const String & stage, const String &
804801 {
805802 LOG_INFO (log, " Skipped creating the 'finish' node because the initiator uses outdated version {}" , getInitiatorVersion ());
806803 std::lock_guard lock{mutex};
807- tried_to_finish = true ;
808804 state.hosts .at (current_host).finished = true ;
809805 }
810806}
811807
812808
813809void BackupCoordinationStageSync::createStageNode (const String & stage, const String & stage_result, Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper)
814810{
811+ if (isErrorSet ())
812+ rethrowSetError ();
813+
815814 String serialized_error;
816815 if (zookeeper->tryGet (error_node_path, serialized_error))
817816 {
818817 auto [exception, host] = parseErrorNode (serialized_error);
819818 if (exception)
819+ {
820+ std::lock_guard lock{mutex};
821+ state.addErrorInfo (exception, host);
820822 std::rethrow_exception (exception);
823+ }
821824 }
822825
823826 auto code = zookeeper->tryCreate (getStageNodePath (stage), stage_result, zkutil::CreateMode::Persistent);
@@ -931,7 +934,7 @@ bool BackupCoordinationStageSync::finishImpl(bool throw_if_error, WithRetries::K
931934 return true ;
932935 }
933936
934- if (tried_to_finish)
937+ if (tried_to_finish[throw_if_error] )
935938 {
936939 // / We don't repeat creating the finish node, no matter if it was successful or not.
937940 LOG_INFO (log, " Skipped creating the finish node for {} because earlier we failed to do that" , current_host_desc);
@@ -958,6 +961,11 @@ bool BackupCoordinationStageSync::finishImpl(bool throw_if_error, WithRetries::K
958961 }
959962 }
960963
964+ SCOPE_EXIT ({
965+ std::lock_guard lock{mutex};
966+ tried_to_finish[throw_if_error] = true ;
967+ });
968+
961969 stopWatchingThread ();
962970
963971 try
@@ -968,28 +976,18 @@ bool BackupCoordinationStageSync::finishImpl(bool throw_if_error, WithRetries::K
968976 with_retries.renewZooKeeper (zookeeper);
969977 createFinishNodeAndRemoveAliveNode (zookeeper, throw_if_error);
970978 });
979+ return true ;
971980 }
972981 catch (...)
973982 {
974983 LOG_TRACE (log, " Caught exception while creating the 'finish' node for {}: {}" ,
975984 current_host_desc,
976985 getCurrentExceptionMessage (/* with_stacktrace= */ false , /* check_embedded_stacktrace= */ true ));
977986
978- std::lock_guard lock{mutex};
979- tried_to_finish = true ;
980-
981987 if (throw_if_error)
982988 throw ;
983989 return false ;
984990 }
985-
986- {
987- std::lock_guard lock{mutex};
988- tried_to_finish = true ;
989- state.hosts .at (current_host).finished = true ;
990- }
991-
992- return true ;
993991}
994992
995993
@@ -1000,6 +998,24 @@ void BackupCoordinationStageSync::createFinishNodeAndRemoveAliveNode(Coordinatio
1000998
1001999 for (size_t attempt_no = 1 ; attempt_no <= max_attempts_after_bad_version; ++attempt_no)
10021000 {
1001+ if (throw_if_error)
1002+ {
1003+ if (isErrorSet ())
1004+ rethrowSetError ();
1005+
1006+ String serialized_error;
1007+ if (zookeeper->tryGet (error_node_path, serialized_error))
1008+ {
1009+ auto [exception, host] = parseErrorNode (serialized_error);
1010+ if (exception)
1011+ {
1012+ std::lock_guard lock{mutex};
1013+ state.addErrorInfo (exception, host);
1014+ std::rethrow_exception (exception);
1015+ }
1016+ }
1017+ }
1018+
10031019 // / The 'num_hosts' node may not exist if createStartAndAliveNodes() failed in the constructor.
10041020 if (!num_hosts)
10051021 {
@@ -1012,16 +1028,12 @@ void BackupCoordinationStageSync::createFinishNodeAndRemoveAliveNode(Coordinatio
10121028 }
10131029 }
10141030
1015- String serialized_error;
1016- if (throw_if_error && zookeeper->tryGet (error_node_path, serialized_error))
1017- {
1018- auto [exception, host] = parseErrorNode (serialized_error);
1019- if (exception)
1020- std::rethrow_exception (exception);
1021- }
1022-
10231031 if (zookeeper->exists (finish_node_path))
1032+ {
1033+ std::lock_guard lock{mutex};
1034+ state.hosts .at (current_host).finished = true ;
10241035 return ;
1036+ }
10251037
10261038 bool start_node_exists = zookeeper->exists (start_node_path);
10271039
@@ -1057,6 +1069,8 @@ void BackupCoordinationStageSync::createFinishNodeAndRemoveAliveNode(Coordinatio
10571069 hosts_left_desc = (*num_hosts == 0 ) ? " , no hosts left" : fmt::format (" , {} hosts left" , *num_hosts);
10581070 }
10591071 LOG_INFO (log, " Created the 'finish' node in ZooKeeper for {}{}" , current_host_desc, hosts_left_desc);
1072+ std::lock_guard lock{mutex};
1073+ state.hosts .at (current_host).finished = true ;
10601074 return ;
10611075 }
10621076
@@ -1315,49 +1329,50 @@ bool BackupCoordinationStageSync::setError(std::exception_ptr exception, bool th
13151329
13161330bool BackupCoordinationStageSync::setError (const Exception & exception, bool throw_if_error)
13171331{
1318- try
1319- {
1320- // / Most likely this exception has been already logged so here we're logging it without stacktrace.
1321- String exception_message = getExceptionMessage (exception, /* with_stacktrace= */ false , /* check_embedded_stacktrace= */ true );
1322- LOG_INFO (log, " Sending exception from {} to other hosts: {}" , current_host_desc, exception_message);
1332+ // / Most likely this exception has been already logged so here we're logging it without stacktrace.
1333+ String exception_message = getExceptionMessage (exception, /* with_stacktrace= */ false , /* check_embedded_stacktrace= */ true );
1334+ LOG_INFO (log, " Sending exception from {} to other hosts: {}" , current_host_desc, exception_message);
13231335
1336+ {
1337+ std::lock_guard lock{mutex};
1338+ if (state.host_with_error )
13241339 {
1325- std::lock_guard lock{mutex};
1326- if (state.host_with_error )
1327- {
1328- LOG_INFO (log, " The error node already exists" );
1329- return true ;
1330- }
1340+ // / We create the error node always before assigning `state.host_with_error`,
1341+ // / thus if `state.host_with_error` is set then we can be sure that the error node exists.
1342+ LOG_INFO (log, " The error node already exists" );
1343+ return true ;
1344+ }
13311345
1332- if (tried_to_set_error)
1333- {
1334- LOG_INFO (log, " Skipped creating the error node because earlier we failed to do that" );
1335- return false ;
1336- }
1346+ if (tried_to_set_error)
1347+ {
1348+ LOG_INFO (log, " Skipped creating the error node because earlier we failed to do that" );
1349+ return false ;
13371350 }
1351+ }
1352+
1353+ SCOPE_EXIT ({
1354+ std::lock_guard lock{mutex};
1355+ tried_to_set_error = true ;
1356+ });
1357+
1358+ try
1359+ {
1360+
13381361
13391362 auto holder = with_retries.createRetriesControlHolder (" BackupCoordinationStageSync::setError" , WithRetries::kErrorHandling );
13401363 holder.retries_ctl .retryLoop ([&, &zookeeper = holder.faulty_zookeeper ]()
13411364 {
13421365 with_retries.renewZooKeeper (zookeeper);
13431366 createErrorNode (exception, zookeeper);
13441367 });
1345-
1346- {
1347- std::lock_guard lock{mutex};
1348- tried_to_set_error = true ;
1349- return true ;
1350- }
1368+ return true ;
13511369 }
13521370 catch (...)
13531371 {
1354- LOG_TRACE (log, " Caught exception while removing nodes from ZooKeeper for this {}: {}" ,
1372+ LOG_TRACE (log, " Caught exception while creating the error node for this {}: {}" ,
13551373 is_restore ? " restore" : " backup" ,
13561374 getCurrentExceptionMessage (/* with_stacktrace= */ false , /* check_embedded_stacktrace= */ true ));
13571375
1358- std::lock_guard lock{mutex};
1359- tried_to_set_error = true ;
1360-
13611376 if (throw_if_error)
13621377 throw ;
13631378 return false ;
@@ -1367,6 +1382,14 @@ bool BackupCoordinationStageSync::setError(const Exception & exception, bool thr
13671382
13681383void BackupCoordinationStageSync::createErrorNode (const Exception & exception, Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper)
13691384{
1385+ if (isErrorSet ())
1386+ {
1387+ // / We create the error node always before assigning `state.host_with_error`,
1388+ // / thus if `state.host_with_error` is set then we can be sure that the error node exists.
1389+ LOG_INFO (log, " The error node already exists" );
1390+ return ;
1391+ }
1392+
13701393 String serialized_error;
13711394 {
13721395 WriteBufferFromOwnString buf;
@@ -1375,16 +1398,15 @@ void BackupCoordinationStageSync::createErrorNode(const Exception & exception, C
13751398 serialized_error = buf.str ();
13761399 }
13771400
1401+ zookeeper->createIfNotExists (operation_zookeeper_path, " " );
1402+ zookeeper->createIfNotExists (zookeeper_path, " " );
1403+
13781404 auto code = zookeeper->tryCreate (error_node_path, serialized_error, zkutil::CreateMode::Persistent);
13791405
13801406 if (code == Coordination::Error::ZOK)
13811407 {
13821408 std::lock_guard lock{mutex};
1383- if (!state.host_with_error )
1384- {
1385- state.host_with_error = current_host;
1386- state.hosts .at (current_host).exception = parseErrorNode (serialized_error).first ;
1387- }
1409+ state.addErrorInfo (parseErrorNode (serialized_error).first , current_host);
13881410 LOG_TRACE (log, " Sent exception from {} to other hosts" , current_host_desc);
13891411 return ;
13901412 }
@@ -1396,11 +1418,7 @@ void BackupCoordinationStageSync::createErrorNode(const Exception & exception, C
13961418 if (another_exception)
13971419 {
13981420 std::lock_guard lock{mutex};
1399- if (!state.host_with_error )
1400- {
1401- state.host_with_error = host;
1402- state.hosts .at (host).exception = another_exception;
1403- }
1421+ state.addErrorInfo (another_exception, host);
14041422 LOG_INFO (log, " Another error is already assigned for this {}" , operation_name);
14051423 return ;
14061424 }
@@ -1428,4 +1446,11 @@ bool BackupCoordinationStageSync::isErrorSet() const
14281446 return state.host_with_error .has_value ();
14291447}
14301448
1449+ void BackupCoordinationStageSync::rethrowSetError () const
1450+ {
1451+ std::lock_guard lock{mutex};
1452+ chassert (state.host_with_error );
1453+ std::rethrow_exception (state.hosts .at (*state.host_with_error ).exception );
1454+ }
1455+
14311456}
0 commit comments