
feat: flow enabled c8y mapper#3934

Merged
didier-wenzek merged 24 commits into thin-edge:main from didier-wenzek:feat/flow-enabled-c8y-mapper
Feb 15, 2026

Conversation

Contributor

@didier-wenzek commented Jan 21, 2026

Proposed changes

The c8y mapper can be extended using flows.

  • The c8y mapper runs the flows defined in /etc/tedge/mappers/c8y/flows
  • The c8y mapper flow context holds entity definitions indexed by topic ids as well as external ids.
  • The builtin flow for measurements can be customized by users
  • The builtin flow for events can be customized by users
  • The builtin flow for alarms can be customized by users
  • The builtin flow for health status can be customized by users
  • Fix the broken unit tests
  • Cache MEA messages till the source entity is registered
  • Publish large events over HTTP
  • Honor the c8y.topics settings to process only configured MEA messages
  • Remove dead code
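
As an illustration of the first bullet, a flow definition dropped under /etc/tedge/mappers/c8y/flows could look like the following. This is a hypothetical sketch only: the file name, topic filter, and script name are assumptions, and the actual schema is the one defined by the tedge flows documentation.

```toml
# /etc/tedge/mappers/c8y/flows/custom-measurements.toml (hypothetical example)
# Subscribe to thin-edge measurement topics and transform them with a user script.
input.mqtt.topics = ["te/+/+/+/+/m/+"]

steps = [
    { script = "custom-measurements.js" }   # user-provided transformation step
]
```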

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue


Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s. You can activate automatic signing by running just prepare-dev once)
  • I ran just format as mentioned in CODING_GUIDELINES
  • I used just check as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

@github-actions
Contributor

github-actions Bot commented Jan 21, 2026

Robot Results

✅ Passed: 831   ❌ Failed: 0   ⏭️ Skipped: 3   Total: 831   Pass %: 100   ⏱️ Duration: 2h44m34.215053s

@didier-wenzek changed the title from `feat! flow enabled c8y mapper` to `feat: flow enabled c8y mapper` Jan 21, 2026
Comment thread tests/RobotFramework/tests/cumulocity/flows/custom-measurements.js Outdated
Comment thread crates/extensions/c8y_mapper_ext/src/actor.rs Outdated
Contributor

@albinsuresh left a comment


LGTM. Huge achievement, considering the complexity of this mapper with a lot of legacy code that isn't the most intuitive. There are a few points that could be improved, but as future iterations.

Comment on lines +76 to +78
let pending_messages = self.cache.take();
for message in pending_messages.into_iter() {
if message.topic.starts_with(entity_topic) {
Contributor


With this logic, if there are cached messages for some entities (e.g. child1 and child2), and a message is received for another entity (e.g. child0) that is already registered, we'll unnecessarily drain the whole cache first, only to put it all back. And we repeat that for every single message.

We could make this logic more efficient by changing the cache from a simple RingBuffer<Message> to a HashMap<EntityTopicId, RingBuffer<Message>>, so that this step only consumes the messages for the current entity and leaves the ones from other entities untouched. But this is just a matter of efficiency, not correctness, so it can be improved in a later PR.
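
The suggested restructuring can be sketched with std types as follows. `PendingCache`, the string-keyed entity ids, and the `Message` shape are illustrative assumptions, not the actual mapper types:

```rust
use std::collections::{HashMap, VecDeque};

// Stand-in for the mapper's MQTT message type (illustrative only).
#[derive(Debug, Clone, PartialEq)]
struct Message {
    topic: String,
    payload: String,
}

// Pending-message cache keyed by entity topic id: draining one entity's
// messages on its registration leaves the other entities' messages untouched.
#[derive(Default)]
struct PendingCache {
    per_entity: HashMap<String, VecDeque<Message>>,
}

impl PendingCache {
    // Cache a message received before its source entity is registered.
    fn push(&mut self, entity: &str, message: Message) {
        self.per_entity
            .entry(entity.to_string())
            .or_default()
            .push_back(message);
    }

    // On registration of `entity`, take only that entity's cached messages.
    fn drain_entity(&mut self, entity: &str) -> Vec<Message> {
        self.per_entity
            .remove(entity)
            .map(Vec::from)
            .unwrap_or_default()
    }
}

fn main() {
    let mut cache = PendingCache::default();
    cache.push("device/child1//", Message { topic: "te/device/child1///m/".into(), payload: "{}".into() });
    cache.push("device/child2//", Message { topic: "te/device/child2///m/".into(), payload: "{}".into() });

    // Registering child1 consumes only child1's messages; child2 stays cached.
    let drained = cache.drain_entity("device/child1//");
    assert_eq!(drained.len(), 1);
    assert_eq!(cache.per_entity.len(), 1);
    println!("drained {} message(s) for child1", drained.len());
}
```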

Contributor Author


With this logic, if there are cached messages for some entities (e.g. child1 and child2), and a message is received for another entity (e.g. child0) that is already registered, we'll unnecessarily drain the whole cache first, only to put it all back. And we repeat that for every single message.

The normal case is an empty cache, and when it is not empty, it won't be for long: the main purpose of this cache is to solve races between registration and MEA messages.

I see one annoying case though. A measurement sent by mistake on behalf of a non-existing entity, on a device configured not to use auto-registration. No registration message will ever be sent, and the measurement sent by mistake will stay in the cache forever. Worse, this measurement will be considered again and again on every cache drain.

In other words, I see this more as a cache-pollution problem than a performance problem.

We could make this logic more efficient by changing the cache from a simple RingBuffer<Message> to a HashMap<EntityTopicId, RingBuffer<Message>>, so that this step only consumes the messages for the current entity and leaves the ones from other entities untouched. But this is just a matter of efficiency, not correctness, so it can be improved in a later PR.

Indeed, this would solve the issue of the unknown-source measurement polluting the drain process for all messages and all entities.

Contributor Author


We could make this logic more efficient, by changing the cache from a simple RingBuffer<Message> to HasMap<EntityTopicId, RingBuffer<Message>> so that this step only consumes the messages for the current entity and leave the ones from other entities untouched.

Done af732fe

Comment on lines +29 to +34
let message_topic = te.topic_for(
mapper,
&Channel::Status {
component: "entities".to_string(),
},
);
Contributor


Decision deferred for later.

context: &FlowContextHandle,
) -> Result<Vec<Message>, FlowError> {
match self.mqtt_schema.entity_channel_of(&message.topic) {
Ok((_, Channel::Status { component })) if component == "entities" => {
Contributor


Do we also run into issues here if we have multiple c8y mappers running simultaneously (e.g. with multiple profiles)? Should we not be checking the service topic ID is for this mapper, and ignoring birth messages for other mappers?

Contributor Author

@didier-wenzek commented Feb 9, 2026


Agree. This is related to #3934 (comment)

As the birth messages are related to a c8y mapper instance, we have to use a c8y-related topic for those. Precisely, this needs to be a sub-topic of c8y.bridge.topic_prefix for the appropriate profile.

Contributor Author

@didier-wenzek commented Feb 9, 2026


Actually, the current code is working because the flows using this cache subscribe to a c8y mapper instance topic (aka te/device/main/service/tedge-mapper-c8y/status/entities).

Contributor


  • This can be made more obvious with a check on the entity sending the birth message (instead of ignoring it with _).

I think this is wise. It feels like a bit of a hack to rely on "we don't subscribe to other conflicting topics" to ensure we only respond to this mapper's messages.

Contributor Author


  • This can be made more obvious with a check on the entity sending the birth message (instead of ignoring it with _).

I think this is wise. It feels like a bit of a hack to rely on "we don't subscribe to other conflicting topics" to ensure we only respond to this mapper's messages.

Done: 61d7259
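
The fix above can be illustrated with a self-contained sketch that matches the source entity of a status message against this mapper's own service topic id, rather than discarding it with `_`. This assumes a simplified te/<entity>/status/<component> topic layout; the function name and string-based parsing are illustrative, not the actual code from commit 61d7259:

```rust
// Accept a birth message only when it comes from this mapper's own
// service entity, so other mapper profiles' messages are ignored.
fn is_own_entities_birth(topic: &str, mapper_service: &str) -> bool {
    match topic.strip_prefix("te/").and_then(|t| t.split_once("/status/")) {
        Some((source, component)) => component == "entities" && source == mapper_service,
        None => false,
    }
}

fn main() {
    let me = "device/main/service/tedge-mapper-c8y";
    // This mapper's own birth message is accepted.
    assert!(is_own_entities_birth("te/device/main/service/tedge-mapper-c8y/status/entities", me));
    // Another mapper instance's birth message is ignored.
    assert!(!is_own_entities_birth("te/device/main/service/tedge-mapper-c8y2/status/entities", me));
    println!("ok");
}
```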

Comment thread tests/RobotFramework/tests/cumulocity/flows/c8y_flows.robot Outdated
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
This is a first step toward a mapper context populated by the mapper
using a specific implementation (say to read the entity cache),
while the context for a script or a flow is a simple key value store.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
so the flow context can be populated by the mapper,
notably with entity cache entries.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
The goal is to group the methods acting on the content of the entity cache,
setting aside all the business logic related to entity metadata.
The next step will be to connect the EntityCache to the FlowContext,
populating the latter with entity metadata.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Starting only with `Mqtt { qos: QoS, retain: bool }`,
the plan being to add `Process { command: String }`
and, possibly, `Http { url: String }`

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
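
The commit above describes a per-message output target. A minimal runnable sketch of that enum could look like the following, with `u8` standing in for the QoS type and all names taken from the commit message rather than the actual thin-edge code:

```rust
// Illustrative output-target enum; not the actual thin-edge type.
#[derive(Debug)]
enum OutputTarget {
    Mqtt { qos: u8, retain: bool }, // starting point
    Process { command: String },    // planned next
    Http { url: String },           // possible later addition
}

// Render a human-readable description of where a message would be sent.
fn describe(target: &OutputTarget) -> String {
    match target {
        OutputTarget::Mqtt { qos, retain } => format!("mqtt qos={qos} retain={retain}"),
        OutputTarget::Process { command } => format!("pipe to process: {command}"),
        OutputTarget::Http { url } => format!("post to {url}"),
    }
}

fn main() {
    let targets = vec![
        OutputTarget::Mqtt { qos: 1, retain: false },
        OutputTarget::Process { command: "jq .".to_string() },
        OutputTarget::Http { url: "https://example.com/events".to_string() },
    ];
    for t in &targets {
        println!("{}", describe(t));
    }
    assert_eq!(describe(&targets[0]), "mqtt qos=1 retain=false");
}
```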
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
preventing tedge flows from starting.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Contributor

@reubenmiller left a comment


Approved. Everything works nicely and this allows users to also edit existing tedge-mapper-c8y logic (which was not possible before)


Labels

theme:c8y (Cumulocity related topics), theme:flows
