feature: Add Timestamp Transformation for Syncing #122

@jjeff07

Jinja2 has no built-in way to get the current timestamp. Users who want to stamp every object with the time a sync took place currently have to write a custom adapter. Such a timestamp is useful for knowing the last time a device or other object was discovered.

Example Config:

---
name: from-ipfabric
source:
  name: ipfabricsync

destination:
  name: infrahub

order: [
  "VendorManufacturer",
  "VendorFamily",
]

It would probably be best to implement two separate timestamp functions:

  • Timestamp of when the entire diff was started.
    • The same timestamp would be applied to both VendorManufacturer and VendorFamily.
  • Timestamp of when an individual schema mapping diff was started.
    • All objects in VendorManufacturer would share one timestamp and all objects in VendorFamily would share a different one.
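
To make the two scopes concrete, here is a minimal standalone sketch. `SyncTimestamps` and `utc_now_iso` are hypothetical names used only for illustration and are not part of infrahub-sync. The sketch assumes the global value is captured once when the sync starts, and each schema-level value is captured the first time that mapping is processed.

```python
from datetime import datetime, timezone


def utc_now_iso() -> str:
    """Return the current UTC time as an ISO 8601 string, truncated to seconds."""
    return datetime.now(tz=timezone.utc).isoformat(timespec="seconds")


class SyncTimestamps:
    """Hypothetical helper: one global timestamp plus one per schema mapping."""

    def __init__(self) -> None:
        # Captured once, when the whole diff starts.
        self.global_ts = utc_now_iso()
        self._schema_ts: dict[str, str] = {}

    def for_schema(self, model_name: str) -> str:
        # Captured the first time a schema mapping is seen, then reused
        # for every object in that mapping.
        if model_name not in self._schema_ts:
            self._schema_ts[model_name] = utc_now_iso()
        return self._schema_ts[model_name]


timestamps = SyncTimestamps()
manufacturer_ts = timestamps.for_schema("VendorManufacturer")
family_ts = timestamps.for_schema("VendorFamily")
```

Holding the global value on a single object that is created once per sync is one way to guarantee that every model sees the same value, regardless of how many times the loader runs.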

adapters\infrahub.py:

    # Module-level import needed: from datetime import UTC, datetime  (Python 3.11+)
    def model_loader(self, model_name: str, model: InfrahubModel) -> None:
        # ADD a global-level timestamp (ISO 8601 string):
        global_ts = datetime.now(tz=UTC).isoformat(timespec="seconds")

        element = next((el for el in self.config.schema_mapping if el.name == model_name), None)
        if element:
            # Retrieve all nodes corresponding to model_name (list of InfrahubNodeSync)
            nodes = self.client.all(kind=model_name, include=model._attributes, populate_store=True)

            # Transform the list of InfrahubNodeSync into a list of (node, dict) tuples
            node_dict_pairs = [(node, self.infrahub_node_to_diffsync(node=node)) for node in nodes]
            total = len(node_dict_pairs)

            # Extract the list of dicts for filtering and transforming
            list_obj = [pair[1] for pair in node_dict_pairs]

            if self.config.source.name.title() == self.type.title():
                # Filter records
                filtered_objs = model.filter_records(records=list_obj, schema_mapping=element)
                logger.info("%s: Loading %d/%d %s", self.type, len(filtered_objs), total, model_name)
                # Transform records
                # ADD global_ts as an arg and update the transform_records function:
                transformed_objs = model.transform_records(records=filtered_objs, schema_mapping=element, global_ts=global_ts)

__init__.py

    @classmethod
    def transform_records(cls, records: list[dict], schema_mapping: SchemaMappingModel, global_ts: str | None = None) -> list[dict]:
        # ADD a schema-level timestamp (ISO 8601 string, like global_ts):
        schema_ts = datetime.now(tz=UTC).isoformat(timespec="seconds")

        transforms = schema_mapping.transforms or []
        if not transforms:
            return records
        transformed_records = []
        for record in records:
            # ADD global and schema timestamps as args and update the apply_transforms function:
            transformed_record = cls.apply_transforms(item=record, transforms=transforms, global_ts=global_ts, schema_ts=schema_ts)
            transformed_records.append(transformed_record)
        return transformed_records

    @classmethod
    def apply_transform(cls, item: dict[str, Any], transform_expr: str, field: str) -> None:
        # TODO: update this function
        ...
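
One possible shape for the remaining TODO, shown as a standalone function rather than the real classmethod: pass both timestamps into the Jinja2 render context so a transform expression can reference them. This is only a sketch under assumptions. The extra `global_ts` and `schema_ts` parameters, the template variable names, and the `last_synced` field are all hypothetical, and the project's actual `apply_transform` may render expressions differently.

```python
from __future__ import annotations

from typing import Any

from jinja2 import Environment

_env = Environment()


def apply_transform(
    item: dict[str, Any],
    transform_expr: str,
    field: str,
    global_ts: str | None = None,  # hypothetical extra parameter
    schema_ts: str | None = None,  # hypothetical extra parameter
) -> None:
    """Render transform_expr against the record and store the result in item[field]."""
    # The record's own fields plus the two timestamps become template variables.
    context = {**item, "global_ts": global_ts, "schema_ts": schema_ts}
    item[field] = _env.from_string(transform_expr).render(context)


record = {"name": "Cisco"}
apply_transform(
    record,
    transform_expr="{{ schema_ts }}",
    field="last_synced",
    schema_ts="2024-01-01T00:00:00+00:00",
)
```

With this approach a schema mapping could set a field from either `{{ global_ts }}` or `{{ schema_ts }}` without any adapter-specific code.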
