Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ section "com" {

/* broken_link_c/issue/14137269 */
ScoreReq.CompReq ProxyEventCopySemantics {
description = '''The {{ProxyEvent}} class shall neither be copy-constructable nor copy-assignable.'''
description = '''The {{ProxyEvent}} class shall neither be copy-constructable nor copy-assignable.'''
safety = ScoreReq.Asil.B
derived_from = [Communication.EventType@1, Communication.DataLoss@1]
version = 1
Expand Down Expand Up @@ -1590,7 +1590,7 @@ section "com" {

The signature of an SubscriptionStateChangeHandler is:

{{{void(SubscriptionState)}}}
{{{bool(SubscriptionState)}}}

The type chosen for {{SubscriptionStateChangeHandler}} by the {{mw::com}} provider shall allow construction from any callable, with the given signature.'''
safety = ScoreReq.Asil.B
Expand Down Expand Up @@ -2307,8 +2307,7 @@ section "com" {
/* broken_link_c/issue/21466369 */
ScoreReq.CompReq BehaviourOfSetSubscriptionStateChangeHandler {
description = '''The {{SubscriptionStateChangeHandler}} (See SubscriptionStateChangeHandler)
{{SetSubscriptionStateChangeHandler}}
that is passed to shall be called every time the {{SubscriptionState}} (See SubscriptionState) changes according to the requirements:
that is passed to {{SetSubscriptionStateChangeHandler}} shall be called every time the {{SubscriptionState}} (See SubscriptionState) changes according to the requirements:

* Calls to SubscriptionStateChangeHandler with kSubscriptionPending

Expand Down Expand Up @@ -2356,9 +2355,17 @@ section "com" {
version = 1
}

ScoreReq.CompReq SubscriptionStateChangeHandlerReturnValue {
description = '''In case the user provided handler returns true, the {{SubscriptionStateChangeHandler}} will be kept registered and will be called on the next {{SubscriptionState}} change again.
In case the user provided handler returns false, the {{SubscriptionStateChangeHandler}} will be unregistered and won't be called on the next {{SubscriptionState}} change again.'''
safety = ScoreReq.Asil.B
derived_from = [Communication.SupportForTimeBasedArchitecture@2]
version = 1
}

/* broken_link_c/issue/21466236 */
ScoreReq.CompReq BehaviourOfUnsetSubscriptionStateChangeHandler {
description = '''{{UnsetSubscriptionStateChangeHandler}} {{UnsetSubscriptionStateChangeHandler}}{{SubscriptionStateChangeHandler}}will unregister the {{SubscriptionStateChangeHandler}} (See SubscriptionStateChangeHandler) that was registered by the class instance with a call to {{SetSubscriptionStateChangeHandler}} After calling the will no longer be triggered when the {{SubscriptionState}} (See progress-cursor SubscriptionState changes.
description = '''{{UnsetSubscriptionStateChangeHandler}} will unregister the {{SubscriptionStateChangeHandler}} (See SubscriptionStateChangeHandler) that was registered by the class instance with a call to {{SetSubscriptionStateChangeHandler}} After calling the will no longer be triggered when the {{SubscriptionState}} (See progress-cursor SubscriptionState changes.

{{UnsetSubscriptionStateChangeHandler}} shall fulfill the following requirement: Explicit Lifetime ending by API calls.'''
safety = ScoreReq.Asil.B
Expand Down
30 changes: 30 additions & 0 deletions score/mw/com/design/events_fields/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,36 @@ The structural model of the state machine design is as follows:

<img alt="PROXY_EVENT_STATE_MACHINE_MODEL" src="https://www.plantuml.com/plantuml/proxy?src=https://raw.githubusercontent.com/eclipse-score/communication/refs/heads/main/score/mw/com/design/events_fields/proxy_event_state_machine_model.puml">

#### User access to the subscription state

The user can access the subscription state of a `ProxyEvent` or `ProxyField` instance via method `GetSubscriptionState()`.
This method dispatches to the binding (`ProxyEventBindingBase::GetSubscriptionState()`), which in the case of the `LoLa`
binding dispatches to `lola::SubscriptionStateMachine` (method `GetCurrentState()`) shown above.

Additionally a user can register a callback, which gets called as soon as the subscription state changes. This is done
via method `ProxyEvent::SetSubscriptionStateChangeHandler`. This method dispatches to the binding
(`ProxyEventBindingBase::SetSubscriptionStateChangeHandler()`), which in the case of the `LoLa` binding dispatches to
`lola::SubscriptionStateMachine` (method `SetSubscriptionStateChangeHandler()`) shown above. The
`lola::SubscriptionStateMachine` stores the user provided callback and calls it as soon as the subscription state changes.

The call to the user-provided handler happens under the same lock, which protects the state change and the state change
notification. Thus, there are two important implications:
1. The user provided handler gets called synchronously within the state change call. This means, that the state change
call only returns after the user provided handler has returned.
2. Since the user provided handler gets called under the lock, it must not do any call, which could lead to a deadlock. E.g.
it must not call `ProxyEvent::Unsubscribe` or `ProxyEvent::Subscribe` again, as these calls also acquire the same
lock.

The implications/recommendations are given in the public API description of the `SubscriptionStateChangeHandler`.
We explicitly allow, that the user unsets/unregisters the `SubscriptionStateChangeHandler` from within the handler itself.
To avoid any potential deadlock, we do **not** allow to unset the `SubscriptionStateChangeHandler` via the
`UnsetSubscriptionStateChangeHandler()`! Instead the `SubscriptionStateChangeHandler` signature is designed to control
the unsetting via its return value. I.e. if the handler returns `false`, it will be automatically unset after it has been
called. This avoids introduction of recursive mutexes/locking on the existing mutex of the state machine, which would
add a lot of complexity and potential for deadlocks. The solution via the return code is very lightweight: After the state
machine called the handler, and before it releases the lock, it checks the return value. If it is `false`, it just resets
&ndash; still under state-machine mutex lock &ndash; the internal `std::optional`, which holds the handler.

### Event Update Notification

Event Notification is a good showcase for the "smart" behavior of `lola::MessagePassingFacade` as already mentioned (see
Expand Down
18 changes: 17 additions & 1 deletion score/mw/com/impl/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ cc_library(
tags = ["FFI"],
visibility = [
"//score/mw/com:__pkg__",
"//score/mw/com/impl/mocking:__pkg__",
"//score/mw/com/impl:__subpackages__",
],
)

Expand Down Expand Up @@ -584,6 +584,7 @@ cc_library(
":sample_reference_tracker",
":scoped_event_receive_handler",
":subscription_state",
":subscription_state_change_handler",
"//score/mw/com/impl/configuration",
"//score/mw/com/impl/plumbing:sample_ptr",
"//score/mw/com/impl/tracing:i_tracing_runtime",
Expand Down Expand Up @@ -833,6 +834,21 @@ cc_library(
deps = ["@score_baselibs//score/language/futurecpp"],
)

cc_library(
name = "subscription_state_change_handler",
srcs = ["subscription_state_change_handler.cpp"],
hdrs = ["subscription_state_change_handler.h"],
features = COMPILER_WARNING_FEATURES,
tags = ["FFI"],
visibility = [
"//score/mw/com:__subpackages__",
],
deps = [
":subscription_state",
"@score_baselibs//score/language/futurecpp",
],
)

cc_library(
name = "i_service_discovery",
srcs = ["i_service_discovery.cpp"],
Expand Down
2 changes: 2 additions & 0 deletions score/mw/com/impl/bindings/lola/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ cc_library(
":transaction_log_set",
"//score/mw/com/impl:runtime",
"//score/mw/com/impl:scoped_event_receive_handler",
"//score/mw/com/impl:subscription_state",
"//score/mw/com/impl:subscription_state_change_handler",
"//score/mw/com/impl/bindings/lola/messaging:i_message_passing_service",
],
)
Expand Down
10 changes: 10 additions & 0 deletions score/mw/com/impl/bindings/lola/generic_proxy_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ Result<void> GenericProxyEvent::UnsetReceiveHandler() noexcept
return proxy_event_common_.UnsetReceiveHandler();
}

Result<void> GenericProxyEvent::SetSubscriptionStateChangeHandler(SubscriptionStateChangeHandler handler) noexcept
{
return proxy_event_common_.SetSubscriptionStateChangeHandler(std::move(handler));
}

Result<void> GenericProxyEvent::UnsetSubscriptionStateChangeHandler() noexcept
{
return proxy_event_common_.UnsetSubscriptionStateChangeHandler();
}

pid_t GenericProxyEvent::GetEventSourcePid() const noexcept
{
return proxy_event_common_.GetEventSourcePid();
Expand Down
3 changes: 3 additions & 0 deletions score/mw/com/impl/bindings/lola/generic_proxy_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class GenericProxyEvent final : public GenericProxyEventBinding

Result<void> SetReceiveHandler(std::weak_ptr<ScopedEventReceiveHandler> handler) noexcept override;
Result<void> UnsetReceiveHandler() noexcept override;
Result<void> SetSubscriptionStateChangeHandler(SubscriptionStateChangeHandler handler) noexcept override;
Result<void> UnsetSubscriptionStateChangeHandler() noexcept override;

pid_t GetEventSourcePid() const noexcept;
ElementFqId GetElementFQId() const noexcept;
std::optional<std::uint16_t> GetMaxSampleCount() const noexcept override;
Expand Down
8 changes: 8 additions & 0 deletions score/mw/com/impl/bindings/lola/proxy_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ class ProxyEvent final : public ProxyEventBinding<SampleType>
{
return proxy_event_common_.UnsetReceiveHandler();
}
Result<void> SetSubscriptionStateChangeHandler(SubscriptionStateChangeHandler handler) noexcept override
{
return proxy_event_common_.SetSubscriptionStateChangeHandler(std::move(handler));
}
Result<void> UnsetSubscriptionStateChangeHandler() noexcept override
{
return proxy_event_common_.UnsetSubscriptionStateChangeHandler();
}
std::optional<std::uint16_t> GetMaxSampleCount() const noexcept override
{
return proxy_event_common_.GetMaxSampleCount();
Expand Down
27 changes: 13 additions & 14 deletions score/mw/com/impl/bindings/lola/proxy_event_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,7 @@ void ProxyEventCommon::Unsubscribe()
SubscriptionState ProxyEventCommon::GetSubscriptionState() const noexcept
{
const auto current_state = subscription_event_state_machine_.GetCurrentState();
if (current_state == SubscriptionStateMachineState::NOT_SUBSCRIBED_STATE)
{
return SubscriptionState::kNotSubscribed;
}
else if (current_state == SubscriptionStateMachineState::SUBSCRIPTION_PENDING_STATE)
{
return SubscriptionState::kSubscriptionPending;
}
else
{
SCORE_LANGUAGE_FUTURECPP_ASSERT_PRD_MESSAGE(current_state == SubscriptionStateMachineState::SUBSCRIBED_STATE,
"Invalid subscription state machine state.");
return SubscriptionState::kSubscribed;
}
return SubscriptionStateMachineStateToSubscriptionState(current_state);
}

Result<std::size_t> ProxyEventCommon::GetNumNewSamplesAvailable() const noexcept
Expand Down Expand Up @@ -114,6 +101,18 @@ Result<void> ProxyEventCommon::UnsetReceiveHandler()
return {};
}

Result<void> ProxyEventCommon::SetSubscriptionStateChangeHandler(SubscriptionStateChangeHandler handler) noexcept
{
subscription_event_state_machine_.SetSubscriptionStateChangeHandler(std::move(handler));
return {};
}

Result<void> ProxyEventCommon::UnsetSubscriptionStateChangeHandler() noexcept
{
subscription_event_state_machine_.UnsetSubscriptionStateChangeHandler();
return {};
}

pid_t ProxyEventCommon::GetEventSourcePid() const noexcept
{
return parent_.GetSourcePid();
Expand Down
3 changes: 3 additions & 0 deletions score/mw/com/impl/bindings/lola/proxy_event_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ class ProxyEventCommon final
Result<void> SetReceiveHandler(std::weak_ptr<ScopedEventReceiveHandler> handler);
Result<void> UnsetReceiveHandler();

Result<void> SetSubscriptionStateChangeHandler(SubscriptionStateChangeHandler handler) noexcept;
Result<void> UnsetSubscriptionStateChangeHandler() noexcept;

pid_t GetEventSourcePid() const noexcept;
ElementFqId GetElementFQId() const noexcept
{
Expand Down
15 changes: 15 additions & 0 deletions score/mw/com/impl/bindings/lola/subscription_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ void EventReceiveHandlerManager::Unregister() noexcept
}
}

SubscriptionState SubscriptionStateMachineStateToSubscriptionState(SubscriptionStateMachineState state) noexcept
{
switch (state)
{
case SubscriptionStateMachineState::NOT_SUBSCRIBED_STATE:
return SubscriptionState::kNotSubscribed;
case SubscriptionStateMachineState::SUBSCRIBED_STATE:
return SubscriptionState::kSubscribed;
case SubscriptionStateMachineState::SUBSCRIPTION_PENDING_STATE:
return SubscriptionState::kSubscriptionPending;
default:
SCORE_LANGUAGE_FUTURECPP_ASSERT_PRD_MESSAGE(false, "Invalid subscription state");
}
}

std::string CreateLoggingString(std::string&& string,
const ElementFqId& element_fq_id,
const SubscriptionStateMachineState current_state)
Expand Down
3 changes: 3 additions & 0 deletions score/mw/com/impl/bindings/lola/subscription_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "score/mw/com/impl/bindings/lola/slot_collector.h"
#include "score/mw/com/impl/bindings/lola/subscription_state_machine_states.h"
#include "score/mw/com/impl/scoped_event_receive_handler.h"
#include "score/mw/com/impl/subscription_state.h"

#include <score/callback.hpp>
#include <score/optional.hpp>
Expand Down Expand Up @@ -87,6 +88,8 @@ std::string CreateLoggingString(std::string&& string,
const ElementFqId& element_fq_id,
const SubscriptionStateMachineState current_state);

SubscriptionState SubscriptionStateMachineStateToSubscriptionState(SubscriptionStateMachineState state) noexcept;

} // namespace score::mw::com::impl::lola

#endif // SCORE_MW_COM_IMPL_BINDINGS_LOLA_SUBSCRIPTION_HELPERS_H
23 changes: 23 additions & 0 deletions score/mw/com/impl/bindings/lola/subscription_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ void SubscriptionStateMachine::UnsetReceiveHandler() noexcept
GetCurrentEventState().UnsetReceiveHandler();
}

void SubscriptionStateMachine::SetSubscriptionStateChangeHandler(SubscriptionStateChangeHandler handler) noexcept
{
std::lock_guard<std::mutex> lock{state_mutex_};
subscription_state_change_handler_ = std::move(handler);
}

void SubscriptionStateMachine::UnsetSubscriptionStateChangeHandler() noexcept
{
std::lock_guard<std::mutex> lock{state_mutex_};
subscription_state_change_handler_.reset();
}

std::optional<std::uint16_t> SubscriptionStateMachine::GetMaxSampleCount() const noexcept
{
std::lock_guard<std::mutex> lock{state_mutex_};
Expand All @@ -136,6 +148,17 @@ void SubscriptionStateMachine::TransitionToState(const SubscriptionStateMachineS
GetCurrentEventState().OnExit();
current_state_idx_ = newState;
GetCurrentEventState().OnEntry();
if (subscription_state_change_handler_.has_value())
{
// We call the user-provided handler under state_mutex_ lock, which has always been acquired within this method.
// This is documented in the AoUs of SubscriptionStateChangeHandler!
const auto keep_handler =
subscription_state_change_handler_.value()(SubscriptionStateMachineStateToSubscriptionState(newState));
if (!keep_handler)
{
subscription_state_change_handler_.reset();
}
}
}

} // namespace score::mw::com::impl::lola
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "score/mw/com/impl/bindings/lola/transaction_log_registration_guard.h"
#include "score/mw/com/impl/bindings/lola/transaction_log_set.h"
#include "score/mw/com/impl/scoped_event_receive_handler.h"
#include "score/mw/com/impl/subscription_state_change_handler.h"

#include "score/result/result.h"

Expand Down Expand Up @@ -99,10 +100,12 @@ class SubscriptionStateMachine : public std::enable_shared_from_this<Subscriptio
void StopOfferEvent() noexcept;
void ReOfferEvent(const pid_t new_event_source_pid) noexcept;

// State Machine Methods. These are not modelled by the state machine UML and do not cause transitions between
// State Machine Methods. These are not modeled by the state machine UML and do not cause transitions between
// states.
void SetReceiveHandler(std::weak_ptr<ScopedEventReceiveHandler> handler) noexcept;
void UnsetReceiveHandler() noexcept;
void SetSubscriptionStateChangeHandler(SubscriptionStateChangeHandler handler) noexcept;
void UnsetSubscriptionStateChangeHandler() noexcept;

std::optional<std::uint16_t> GetMaxSampleCount() const noexcept;

Expand Down Expand Up @@ -139,6 +142,7 @@ class SubscriptionStateMachine : public std::enable_shared_from_this<Subscriptio
// Data used by states
SubscriptionData subscription_data_;
std::optional<std::weak_ptr<ScopedEventReceiveHandler>> event_receiver_handler_;
std::optional<SubscriptionStateChangeHandler> subscription_state_change_handler_;
EventReceiveHandlerManager event_receive_handler_manager_;
ConsumerEventDataControlLocalView<>& event_data_control_local_;
EventSubscriptionControl<>& subscription_control_;
Expand Down
Loading
Loading