diff --git a/charmcraft.yaml b/charmcraft.yaml
index ba4784f..1fe7d53 100644
--- a/charmcraft.yaml
+++ b/charmcraft.yaml
@@ -26,6 +26,14 @@ requires:
interface: certificate_transfer
limit: 1
optional: true
+ s3-backend:
+ interface: s3
+ limit: 1
+ optional: true
+ loki-push-api:
+ interface: loki_push_api
+ limit: 1
+ optional: true
peers:
dbcluster:
@@ -55,4 +63,8 @@ charm-libs:
- lib: tempo_coordinator_k8s.tracing
version: "0"
- lib: certificate_transfer_interface.certificate_transfer
- version: "1"
\ No newline at end of file
+ version: "1"
+ - lib: data_platform_libs.s3
+ version: "0"
+ - lib: loki_k8s.loki_push_api
+ version: "1"
diff --git a/lib/charms/data_platform_libs/v0/s3.py b/lib/charms/data_platform_libs/v0/s3.py
new file mode 100644
index 0000000..dbf4d5b
--- /dev/null
+++ b/lib/charms/data_platform_libs/v0/s3.py
@@ -0,0 +1,792 @@
+# Copyright 2023 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+r"""A library for communicating with the S3 credentials providers and consumers.
+
+This library provides the relevant interface code implementing the communication
+specification for fetching, retrieving, triggering, and responding to events related to
+the S3 provider charm and its consumers.
+
+### Provider charm
+
+The provider is implemented in the `s3-provider` charm, which is meant to be deployed
+alongside one or more consumer charms. The provider charm serves the S3 credentials and
+metadata needed to communicate and work with an S3-compatible backend.
+
+Example:
+```python
+
+from charms.data_platform_libs.v0.s3 import CredentialRequestedEvent, S3Provider
+
+
+class ExampleProviderCharm(CharmBase):
+ def __init__(self, *args) -> None:
+ super().__init__(*args)
+ self.s3_provider = S3Provider(self, "s3-credentials")
+
+ self.framework.observe(self.s3_provider.on.credentials_requested,
+ self._on_credential_requested)
+
+ def _on_credential_requested(self, event: CredentialRequestedEvent):
+ if not self.unit.is_leader():
+ return
+
+ # get relation id
+ relation_id = event.relation.id
+
+ # get bucket name
+ bucket = event.bucket
+
+ # S3 configuration parameters
+ desired_configuration = {"access-key": "your-access-key", "secret-key":
+ "your-secret-key", "bucket": "your-bucket"}
+
+ # update the configuration
+ self.s3_provider.update_connection_info(relation_id, desired_configuration)
+
+ # or it is possible to set each field independently
+
+ self.s3_provider.set_secret_key(relation_id, "your-secret-key")
+
+
+if __name__ == "__main__":
+ main(ExampleProviderCharm)
+```
+
+### Requirer charm
+
+The requirer charm is the charm requiring the S3 credentials.
+An example of a requirer charm is the following:
+
+Example:
+```python
+
+from charms.data_platform_libs.v0.s3 import (
+ CredentialsChangedEvent,
+ CredentialsGoneEvent,
+ S3Requirer
+)
+
+class ExampleRequirerCharm(CharmBase):
+
+ def __init__(self, *args):
+ super().__init__(*args)
+
+ bucket_name = "test-bucket"
+ # if bucket name is not provided the bucket name will be generated
+ # e.g., ('relation-{relation.id}')
+
+ self.s3_client = S3Requirer(self, "s3-credentials", bucket_name)
+
+ self.framework.observe(self.s3_client.on.credentials_changed, self._on_credential_changed)
+ self.framework.observe(self.s3_client.on.credentials_gone, self._on_credential_gone)
+
+ def _on_credential_changed(self, event: CredentialsChangedEvent):
+
+ # access single parameter credential
+ secret_key = event.secret_key
+ access_key = event.access_key
+
+ # or as alternative all credentials can be collected as a dictionary
+ credentials = self.s3_client.get_s3_credentials()
+
+ def _on_credential_gone(self, event: CredentialsGoneEvent):
+ # credentials are removed
+ pass
+
+ if __name__ == "__main__":
+ main(ExampleRequirerCharm)
+```
+
+"""
+
+import json
+import logging
+from collections import namedtuple
+from typing import Dict, List, Optional, Union
+
+import ops.charm
+import ops.framework
+import ops.model
+from ops.charm import (
+ CharmBase,
+ CharmEvents,
+ RelationBrokenEvent,
+ RelationChangedEvent,
+ RelationEvent,
+ RelationJoinedEvent,
+)
+from ops.framework import EventSource, Object, ObjectEvents
+from ops.model import Application, Relation, RelationDataContent, Unit
+
+# The unique Charmhub library identifier, never change it
+LIBID = "fca396f6254246c9bfa565b1f85ab528"
+
+# Increment this major API version when introducing breaking changes
+LIBAPI = 0
+
+# Increment this PATCH version before using `charmcraft publish-lib` or reset
+# to 0 if you are raising the major API version
+LIBPATCH = 6
+
+logger = logging.getLogger(__name__)
+
+Diff = namedtuple("Diff", "added changed deleted")
+Diff.__doc__ = """
+A tuple for storing the diff between two data mappings.
+
+added - keys that were added
+changed - keys that still exist but have new values
+deleted - keys that were deleted"""
+
+
+def diff(event: RelationChangedEvent, bucket: Union[Unit, Application]) -> Diff:
+ """Retrieves the diff of the data in the relation changed databag.
+
+ Args:
+ event: relation changed event.
+ bucket: bucket of the databag (app or unit)
+
+ Returns:
+ a Diff instance containing the added, deleted and changed
+ keys from the event relation databag.
+ """
+ # Retrieve the old data from the data key in the application relation databag.
+ old_data = json.loads(event.relation.data[bucket].get("data", "{}"))
+ # Retrieve the new data from the event relation databag.
+ new_data = (
+ {key: value for key, value in event.relation.data[event.app].items() if key != "data"}
+ if event.app
+ else {}
+ )
+
+ # These are the keys that were added to the databag and triggered this event.
+ added = new_data.keys() - old_data.keys()
+ # These are the keys that were removed from the databag and triggered this event.
+ deleted = old_data.keys() - new_data.keys()
+ # These are the keys that already existed in the databag,
+ # but had their values changed.
+ changed = {key for key in old_data.keys() & new_data.keys() if old_data[key] != new_data[key]}
+
+ # TODO: evaluate the possibility of losing the diff if some error
+ # happens in the charm before the diff is completely checked (DPE-412).
+ # Convert the new_data to a serializable format and save it for a next diff check.
+ event.relation.data[bucket].update({"data": json.dumps(new_data)})
+
+ # Return the diff with all possible changes.
+ return Diff(added, changed, deleted)
+
+
+class BucketEvent(RelationEvent):
+ """Base class for bucket events."""
+
+ @property
+ def bucket(self) -> Optional[str]:
+ """Returns the bucket was requested."""
+ if not self.relation.app:
+ return None
+
+ return self.relation.data[self.relation.app].get("bucket", "")
+
+
+class CredentialRequestedEvent(BucketEvent):
+ """Event emitted when a set of credential is requested for use on this relation."""
+
+
+class S3CredentialEvents(CharmEvents):
+ """Event descriptor for events raised by S3Provider."""
+
+ credentials_requested = EventSource(CredentialRequestedEvent)
+
+
+class S3Provider(Object):
+ """A provider handler for communicating S3 credentials to consumers."""
+
+ on = S3CredentialEvents() # pyright: ignore [reportAssignmentType]
+
+ def __init__(
+ self,
+ charm: CharmBase,
+ relation_name: str,
+ ):
+ super().__init__(charm, relation_name)
+ self.charm = charm
+ self.local_app = self.charm.model.app
+ self.local_unit = self.charm.unit
+ self.relation_name = relation_name
+
+ # monitor relation changed event for changes in the credentials
+ self.framework.observe(charm.on[relation_name].relation_changed, self._on_relation_changed)
+
+ def _on_relation_changed(self, event: RelationChangedEvent) -> None:
+ """React to the relation changed event by consuming data."""
+ if not self.charm.unit.is_leader():
+ return
+ diff = self._diff(event)
+ # emit on credential requested if bucket is provided by the requirer application
+ if "bucket" in diff.added:
+ getattr(self.on, "credentials_requested").emit(
+ event.relation, app=event.app, unit=event.unit
+ )
+
+ def _load_relation_data(self, raw_relation_data: dict) -> dict:
+ """Loads relation data from the relation data bag.
+
+ Args:
+ raw_relation_data: Relation data from the databag
+ Returns:
+ dict: Relation data in dict format.
+ """
+ connection_data = {}
+ for key in raw_relation_data:
+ try:
+ connection_data[key] = json.loads(raw_relation_data[key])
+ except (json.decoder.JSONDecodeError, TypeError):
+ connection_data[key] = raw_relation_data[key]
+ return connection_data
+
+ # def _diff(self, event: RelationChangedEvent) -> Diff:
+ # """Retrieves the diff of the data in the relation changed databag.
+
+ # Args:
+ # event: relation changed event.
+
+ # Returns:
+ # a Diff instance containing the added, deleted and changed
+ # keys from the event relation databag.
+ # """
+ # # Retrieve the old data from the data key in the application relation databag.
+ # old_data = json.loads(event.relation.data[self.local_app].get("data", "{}"))
+ # # Retrieve the new data from the event relation databag.
+ # new_data = {
+ # key: value for key, value in event.relation.data[event.app].items() if key != "data"
+ # }
+
+ # # These are the keys that were added to the databag and triggered this event.
+ # added = new_data.keys() - old_data.keys()
+ # # These are the keys that were removed from the databag and triggered this event.
+ # deleted = old_data.keys() - new_data.keys()
+ # # These are the keys that already existed in the databag,
+ # # but had their values changed.
+ # changed = {
+ # key for key in old_data.keys() & new_data.keys() if old_data[key] != new_data[key]
+ # }
+
+ # # TODO: evaluate the possibility of losing the diff if some error
+ # # happens in the charm before the diff is completely checked (DPE-412).
+ # # Convert the new_data to a serializable format and save it for a next diff check.
+ # event.relation.data[self.local_app].update({"data": json.dumps(new_data)})
+
+ # # Return the diff with all possible changes.
+ # return Diff(added, changed, deleted)
+
+ def _diff(self, event: RelationChangedEvent) -> Diff:
+ """Retrieves the diff of the data in the relation changed databag.
+
+ Args:
+ event: relation changed event.
+
+ Returns:
+ a Diff instance containing the added, deleted and changed
+ keys from the event relation databag.
+ """
+ return diff(event, self.local_app)
+
+ def fetch_relation_data(self) -> dict:
+ """Retrieves data from relation.
+
+ This function can be used to retrieve data from a relation
+ in the charm code when outside an event callback.
+
+ Returns:
+ a dict of the values stored in the relation data bag
+ for all relation instances (indexed by the relation id).
+ """
+ data = {}
+ for relation in self.relations:
+ data[relation.id] = (
+ {key: value for key, value in relation.data[relation.app].items() if key != "data"}
+ if relation.app
+ else {}
+ )
+ return data
+
+ def update_connection_info(self, relation_id: int, connection_data: dict) -> None:
+ """Updates the credential data as set of key-value pairs in the relation.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ connection_data: dict containing the key-value pairs
+ that should be updated.
+ """
+ # check and write changes only if you are the leader
+ if not self.local_unit.is_leader():
+ return
+
+ relation = self.charm.model.get_relation(self.relation_name, relation_id)
+
+ if not relation:
+ return
+
+ # configuration options that are list
+ s3_list_options = ["attributes", "tls-ca-chain"]
+
+ # update the databag, if connection data did not change with respect to before
+ # the relation changed event is not triggered
+ updated_connection_data = {}
+ for configuration_option, configuration_value in connection_data.items():
+ if configuration_option in s3_list_options:
+ updated_connection_data[configuration_option] = json.dumps(configuration_value)
+ else:
+ updated_connection_data[configuration_option] = configuration_value
+
+ relation.data[self.local_app].update(updated_connection_data)
+ logger.debug("Updated S3 connection info.")
+
+ @property
+ def relations(self) -> List[Relation]:
+ """The list of Relation instances associated with this relation_name."""
+ return list(self.charm.model.relations[self.relation_name])
+
+ def set_bucket(self, relation_id: int, bucket: str) -> None:
+ """Sets bucket name in application databag.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ bucket: the bucket name.
+ """
+ self.update_connection_info(relation_id, {"bucket": bucket})
+
+ def set_access_key(self, relation_id: int, access_key: str) -> None:
+ """Sets access-key value in application databag.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ access_key: the access-key value.
+ """
+ self.update_connection_info(relation_id, {"access-key": access_key})
+
+ def set_secret_key(self, relation_id: int, secret_key: str) -> None:
+ """Sets the secret key value in application databag.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ secret_key: the value of the secret key.
+ """
+ self.update_connection_info(relation_id, {"secret-key": secret_key})
+
+ def set_path(self, relation_id: int, path: str) -> None:
+ """Sets the path value in application databag.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ path: the path value.
+ """
+ self.update_connection_info(relation_id, {"path": path})
+
+ def set_endpoint(self, relation_id: int, endpoint: str) -> None:
+ """Sets the endpoint address in application databag.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ endpoint: the endpoint address.
+ """
+ self.update_connection_info(relation_id, {"endpoint": endpoint})
+
+ def set_region(self, relation_id: int, region: str) -> None:
+ """Sets the region location in application databag.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ region: the region address.
+ """
+ self.update_connection_info(relation_id, {"region": region})
+
+ def set_s3_uri_style(self, relation_id: int, s3_uri_style: str) -> None:
+ """Sets the S3 URI style in application databag.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ s3_uri_style: the s3 URI style.
+ """
+ self.update_connection_info(relation_id, {"s3-uri-style": s3_uri_style})
+
+ def set_storage_class(self, relation_id: int, storage_class: str) -> None:
+ """Sets the storage class in application databag.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ storage_class: the storage class.
+ """
+ self.update_connection_info(relation_id, {"storage-class": storage_class})
+
+ def set_tls_ca_chain(self, relation_id: int, tls_ca_chain: List[str]) -> None:
+ """Sets the tls_ca_chain value in application databag.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ tls_ca_chain: the TLS Chain value.
+ """
+ self.update_connection_info(relation_id, {"tls-ca-chain": tls_ca_chain})
+
+ def set_s3_api_version(self, relation_id: int, s3_api_version: str) -> None:
+ """Sets the S3 API version in application databag.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ s3_api_version: the S3 version value.
+ """
+ self.update_connection_info(relation_id, {"s3-api-version": s3_api_version})
+
+ def set_delete_older_than_days(self, relation_id: int, days: int) -> None:
+ """Sets the retention days for full backups in application databag.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ days: the value.
+ """
+ self.update_connection_info(relation_id, {"delete-older-than-days": str(days)})
+
+ def set_attributes(self, relation_id: int, attributes: List[str]) -> None:
+ """Sets the connection attributes in application databag.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ attributes: the attributes value.
+ """
+ self.update_connection_info(relation_id, {"attributes": attributes})
+
+
+class S3Event(RelationEvent):
+ """Base class for S3 storage events."""
+
+ @property
+ def bucket(self) -> Optional[str]:
+ """Returns the bucket name."""
+ if not self.relation.app:
+ return None
+
+ return self.relation.data[self.relation.app].get("bucket")
+
+ @property
+ def access_key(self) -> Optional[str]:
+ """Returns the access key."""
+ if not self.relation.app:
+ return None
+
+ return self.relation.data[self.relation.app].get("access-key")
+
+ @property
+ def secret_key(self) -> Optional[str]:
+ """Returns the secret key."""
+ if not self.relation.app:
+ return None
+
+ return self.relation.data[self.relation.app].get("secret-key")
+
+ @property
+ def path(self) -> Optional[str]:
+ """Returns the path where data can be stored."""
+ if not self.relation.app:
+ return None
+
+ return self.relation.data[self.relation.app].get("path")
+
+ @property
+ def endpoint(self) -> Optional[str]:
+ """Returns the endpoint address."""
+ if not self.relation.app:
+ return None
+
+ return self.relation.data[self.relation.app].get("endpoint")
+
+ @property
+ def region(self) -> Optional[str]:
+ """Returns the region."""
+ if not self.relation.app:
+ return None
+
+ return self.relation.data[self.relation.app].get("region")
+
+ @property
+ def s3_uri_style(self) -> Optional[str]:
+ """Returns the s3 uri style."""
+ if not self.relation.app:
+ return None
+
+ return self.relation.data[self.relation.app].get("s3-uri-style")
+
+ @property
+ def storage_class(self) -> Optional[str]:
+ """Returns the storage class name."""
+ if not self.relation.app:
+ return None
+
+ return self.relation.data[self.relation.app].get("storage-class")
+
+ @property
+ def tls_ca_chain(self) -> Optional[List[str]]:
+ """Returns the TLS CA chain."""
+ if not self.relation.app:
+ return None
+
+ tls_ca_chain = self.relation.data[self.relation.app].get("tls-ca-chain")
+ if tls_ca_chain is not None:
+ return json.loads(tls_ca_chain)
+ return None
+
+ @property
+ def s3_api_version(self) -> Optional[str]:
+ """Returns the S3 API version."""
+ if not self.relation.app:
+ return None
+
+ return self.relation.data[self.relation.app].get("s3-api-version")
+
+ @property
+ def delete_older_than_days(self) -> Optional[int]:
+ """Returns the retention days for full backups."""
+ if not self.relation.app:
+ return None
+
+ days = self.relation.data[self.relation.app].get("delete-older-than-days")
+ if days is None:
+ return None
+ return int(days)
+
+ @property
+ def attributes(self) -> Optional[List[str]]:
+ """Returns the attributes."""
+ if not self.relation.app:
+ return None
+
+ attributes = self.relation.data[self.relation.app].get("attributes")
+ if attributes is not None:
+ return json.loads(attributes)
+ return None
+
+
+class CredentialsChangedEvent(S3Event):
+ """Event emitted when S3 credential are changed on this relation."""
+
+
+class CredentialsGoneEvent(RelationEvent):
+ """Event emitted when S3 credential are removed from this relation."""
+
+
+class S3CredentialRequiresEvents(ObjectEvents):
+ """Event descriptor for events raised by the S3Provider."""
+
+ credentials_changed = EventSource(CredentialsChangedEvent)
+ credentials_gone = EventSource(CredentialsGoneEvent)
+
+
+S3_REQUIRED_OPTIONS = ["access-key", "secret-key"]
+
+
+class S3Requirer(Object):
+ """Requires-side of the s3 relation."""
+
+ on = S3CredentialRequiresEvents() # pyright: ignore[reportAssignmentType]
+
+ def __init__(
+ self, charm: ops.charm.CharmBase, relation_name: str, bucket_name: Optional[str] = None
+ ):
+ """Manager of the s3 client relations."""
+ super().__init__(charm, relation_name)
+
+ self.relation_name = relation_name
+ self.charm = charm
+ self.local_app = self.charm.model.app
+ self.local_unit = self.charm.unit
+ self.bucket = bucket_name
+
+ self.framework.observe(
+ self.charm.on[self.relation_name].relation_changed, self._on_relation_changed
+ )
+
+ self.framework.observe(
+ self.charm.on[self.relation_name].relation_joined, self._on_relation_joined
+ )
+
+ self.framework.observe(
+ self.charm.on[self.relation_name].relation_broken,
+ self._on_relation_broken,
+ )
+
+ def _generate_bucket_name(self, event: RelationJoinedEvent):
+ """Returns the bucket name generated from relation id."""
+ return f"relation-{event.relation.id}"
+
+ def _on_relation_joined(self, event: RelationJoinedEvent) -> None:
+ """Event emitted when the application joins the s3 relation."""
+ if self.bucket is None:
+ self.bucket = self._generate_bucket_name(event)
+ self.update_connection_info(event.relation.id, {"bucket": self.bucket})
+
+ def fetch_relation_data(self) -> dict:
+ """Retrieves data from relation.
+
+ This function can be used to retrieve data from a relation
+ in the charm code when outside an event callback.
+
+ Returns:
+ a dict of the values stored in the relation data bag
+ for all relation instances (indexed by the relation id).
+ """
+ data = {}
+
+ for relation in self.relations:
+ data[relation.id] = self._load_relation_data(relation.data[self.charm.app])
+ return data
+
+ def update_connection_info(self, relation_id: int, connection_data: dict) -> None:
+ """Updates the credential data as set of key-value pairs in the relation.
+
+ This function writes in the application data bag, therefore,
+ only the leader unit can call it.
+
+ Args:
+ relation_id: the identifier for a particular relation.
+ connection_data: dict containing the key-value pairs
+ that should be updated.
+ """
+ # check and write changes only if you are the leader
+ if not self.local_unit.is_leader():
+ return
+
+ relation = self.charm.model.get_relation(self.relation_name, relation_id)
+
+ if not relation:
+ return
+
+ # update the databag, if connection data did not change with respect to before
+ # the relation changed event is not triggered
+ # configuration options that are list
+ s3_list_options = ["attributes", "tls-ca-chain"]
+ updated_connection_data = {}
+ for configuration_option, configuration_value in connection_data.items():
+ if configuration_option in s3_list_options:
+ updated_connection_data[configuration_option] = json.dumps(configuration_value)
+ else:
+ updated_connection_data[configuration_option] = configuration_value
+
+ relation.data[self.local_app].update(updated_connection_data)
+ logger.debug("Updated S3 credentials.")
+
+ def _load_relation_data(self, raw_relation_data: RelationDataContent) -> Dict[str, str]:
+ """Loads relation data from the relation data bag.
+
+ Args:
+ raw_relation_data: Relation data from the databag
+ Returns:
+ dict: Relation data in dict format.
+ """
+ connection_data = {}
+ for key in raw_relation_data:
+ try:
+ connection_data[key] = json.loads(raw_relation_data[key])
+ except (json.decoder.JSONDecodeError, TypeError):
+ connection_data[key] = raw_relation_data[key]
+ return connection_data
+
+ def _diff(self, event: RelationChangedEvent) -> Diff:
+ """Retrieves the diff of the data in the relation changed databag.
+
+ Args:
+ event: relation changed event.
+
+ Returns:
+ a Diff instance containing the added, deleted and changed
+ keys from the event relation databag.
+ """
+ return diff(event, self.local_unit)
+
+ def _on_relation_changed(self, event: RelationChangedEvent) -> None:
+ """Notify the charm about the presence of S3 credentials."""
+ # check if the mandatory options are in the relation data
+ contains_required_options = True
+ # get current credentials data
+ credentials = self.get_s3_connection_info()
+ # records missing options
+ missing_options = []
+ for configuration_option in S3_REQUIRED_OPTIONS:
+ if configuration_option not in credentials:
+ contains_required_options = False
+ missing_options.append(configuration_option)
+ # emit credential change event only if all mandatory fields are present
+ if contains_required_options:
+ getattr(self.on, "credentials_changed").emit(
+ event.relation, app=event.app, unit=event.unit
+ )
+ else:
+ logger.warning(
+ f"Some mandatory fields: {missing_options} are not present, do not emit credential change event!"
+ )
+
+ def get_s3_connection_info(self) -> Dict[str, str]:
+ """Return the s3 credentials as a dictionary."""
+ for relation in self.relations:
+ if relation and relation.app:
+ return self._load_relation_data(relation.data[relation.app])
+
+ return {}
+
+ def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
+ """Notify the charm about a broken S3 credential store relation."""
+ getattr(self.on, "credentials_gone").emit(event.relation, app=event.app, unit=event.unit)
+
+ @property
+ def relations(self) -> List[Relation]:
+ """The list of Relation instances associated with this relation_name."""
+ return list(self.charm.model.relations[self.relation_name])
diff --git a/lib/charms/loki_k8s/v1/loki_push_api.py b/lib/charms/loki_k8s/v1/loki_push_api.py
new file mode 100644
index 0000000..f8218d8
--- /dev/null
+++ b/lib/charms/loki_k8s/v1/loki_push_api.py
@@ -0,0 +1,2534 @@
+#!/usr/bin/env python3
+# Copyright 2023 Canonical Ltd.
+# See LICENSE file for licensing details.
+#
+# Learn more at: https://juju.is/docs/sdk
+
+r"""## Overview.
+
+This document explains how to use the principal objects this library provides:
+
+- `LokiPushApiProvider`: This object is meant to be used by any Charmed Operator that needs to
+implement the provider side of the `loki_push_api` relation interface. For instance, a Loki charm.
+The provider side of the relation represents the server side, to which logs are being pushed.
+
+- `LokiPushApiConsumer`: This object is meant to be used by any Charmed Operator that needs to
+send log to Loki by implementing the consumer side of the `loki_push_api` relation interface.
+For instance, a Promtail or Grafana agent charm which needs to send logs to Loki.
+
+- `LogProxyConsumer`: DEPRECATED.
+This object can be used by any Charmed Operator which needs to send telemetry, such as logs, to
+Loki through a Log Proxy by implementing the consumer side of the `loki_push_api` relation
+interface.
+In order to be able to control the labels on the logs pushed this object adds a Pebble layer
+that runs Promtail in the workload container, injecting Juju topology labels into the
+logs on the fly.
+This object is deprecated. Consider migrating to LogForwarder with the release of Juju 3.6 LTS.
+
+- `LogForwarder`: This object can be used by any Charmed Operator which needs to send the workload
+standard output (stdout) through Pebble's log forwarding mechanism, to Loki endpoints through the
+`loki_push_api` relation interface.
+In order to be able to control the labels on the logs pushed this object updates the pebble layer's
+"log-targets" section with Juju topology.
+
+Filtering logs in Loki is largely performed on the basis of labels. In the Juju ecosystem, Juju
+topology labels are used to uniquely identify the workload which generates telemetry like logs.
+
+
+## LokiPushApiProvider Library Usage
+
+This object may be used by any Charmed Operator which implements the `loki_push_api` interface.
+For instance, Loki or Grafana Agent.
+
+For this purpose a charm needs to instantiate the `LokiPushApiProvider` object with one mandatory
+argument (`charm`) and several optional arguments.
+
+- `charm`: A reference to the parent (Loki) charm.
+
+- `relation_name`: The name of the relation that the charm uses to interact
+ with its clients, which implement `LokiPushApiConsumer`, `LogForwarder`, or `LogProxyConsumer`
+ (note that LogProxyConsumer is deprecated).
+
+ If provided, this relation name must match a provided relation in metadata.yaml with the
+ `loki_push_api` interface.
+
+ The default relation name is "logging" for `LokiPushApiConsumer` and `LogForwarder`, and
+ "log-proxy" for `LogProxyConsumer` (note that LogProxyConsumer is deprecated).
+
+ For example, a provider's `metadata.yaml` file may look as follows:
+
+ ```yaml
+ provides:
+ logging:
+ interface: loki_push_api
+ ```
+
+ Subsequently, a Loki charm may instantiate the `LokiPushApiProvider` in its constructor as
+ follows:
+
+ from charms.loki_k8s.v1.loki_push_api import LokiPushApiProvider
+ from loki_server import LokiServer
+ ...
+
+ class LokiOperatorCharm(CharmBase):
+ ...
+
+ def __init__(self, *args):
+ super().__init__(*args)
+ ...
+ external_url = urlparse(self._external_url)
+ self.loki_provider = LokiPushApiProvider(
+ self,
+ port=external_url.port or 80,
+ scheme=external_url.scheme,
+ path=f"{external_url.path}/loki/api/v1/push",
+ )
+ ...
+
+ - `port`: Loki Push Api endpoint port. Default value: `3100`.
+ - `scheme`: Loki Push Api endpoint scheme (`HTTP` or `HTTPS`). Default value: `HTTP`
+ - `address`: DEPRECATED. This argument is ignored; use `update_endpoint()` instead.
+ - `path`: Loki Push Api endpoint path. Default value: `loki/api/v1/push`
+
+
+The `LokiPushApiProvider` object has several responsibilities:
+
+1. Set the URL of the Loki Push API in the relation application data bag; the URL
+ must be unique to all instances (e.g. using a load balancer).
+ The default URL is the FQDN, but this can be overridden by calling `update_endpoint()`.
+
+2. Set the Promtail binary URL (`promtail_binary_zip_url`) so clients that use
+ `LogProxyConsumer` object could download and configure it.
+
+3. Process the metadata of the consumer application, provided via the
+ "metadata" field of the consumer data bag, which are used to annotate the
+ alert rules (see next point). An example for "metadata" is the following:
+
+ {'model': 'loki',
+ 'model_uuid': '0b7d1071-ded2-4bf5-80a3-10a81aeb1386',
+ 'application': 'promtail-k8s'
+ }
+
+4. Process alert rules set into the relation by the `LokiPushApiConsumer`
+ objects, e.g.:
+
+ '{
+ "groups": [{
+ "name": "loki_0b7d1071-ded2-4bf5-80a3-10a81aeb1386_promtail-k8s_alerts",
+ "rules": [{
+ "alert": "HighPercentageError",
+ "expr": "sum(rate({app=\\"foo\\", env=\\"production\\"} |= \\"error\\" [5m]))
+ by (job) \\n /\\nsum(rate({app=\\"foo\\", env=\\"production\\"}[5m]))
+ by (job)\\n > 0.05
+ \\n", "for": "10m",
+ "labels": {
+ "severity": "page",
+ "juju_model": "loki",
+ "juju_model_uuid": "0b7d1071-ded2-4bf5-80a3-10a81aeb1386",
+ "juju_application": "promtail-k8s"
+ },
+ "annotations": {
+ "summary": "High request latency"
+ }
+ }]
+ }]
+ }'
+
+
+Once these alert rules are sent over relation data, the `LokiPushApiProvider` object
+stores these files in the directory `/loki/rules` inside the Loki charm container. After
+storing alert rules files, the object will check alert rules by querying Loki API
+endpoint: [`loki/api/v1/rules`](https://grafana.com/docs/loki/latest/api/#list-rule-groups).
+If there are changes in the alert rules a `loki_push_api_alert_rules_changed` event will
+be emitted with details about the `RelationEvent` which triggered it.
+
+This event should be observed in the charm that uses `LokiPushApiProvider`:
+
+```python
+ def __init__(self, *args):
+ super().__init__(*args)
+ ...
+ self.loki_provider = LokiPushApiProvider(self)
+ self.framework.observe(
+ self.loki_provider.on.loki_push_api_alert_rules_changed,
+ self._loki_push_api_alert_rules_changed,
+ )
+```
+
+
+## LokiPushApiConsumer Library Usage
+
+This Loki charm interacts with its clients using the Loki charm library. Charms
+seeking to send logs to Loki must do so using the `LokiPushApiConsumer` object from
+this charm library.
+
+> **NOTE**: `LokiPushApiConsumer` also depends on an additional charm library.
+>
+> Make sure you `charmcraft fetch-lib charms.observability_libs.v0.juju_topology`
+> when using this library.
+
+For the simplest use cases, using the `LokiPushApiConsumer` object only requires
+instantiating it, typically in the constructor of your charm (the one which
+sends logs).
+
+```python
+from charms.loki_k8s.v1.loki_push_api import LokiPushApiConsumer
+
+class LokiClientCharm(CharmBase):
+
+ def __init__(self, *args):
+ super().__init__(*args)
+ ...
+ self._loki_consumer = LokiPushApiConsumer(self)
+```
+
+The `LokiPushApiConsumer` constructor requires two things:
+
+- A reference to the parent (LokiClientCharm) charm.
+
+- Optionally, the name of the relation that the Loki charm uses to interact
+ with its clients. If provided, this relation name must match a required
+ relation in metadata.yaml with the `loki_push_api` interface.
+
+ If not provided, the relation name defaults to `logging`.
+
+Any time the relation between a Loki provider charm and a Loki consumer charm is
+established, a `LokiPushApiEndpointJoined` event is fired. On the consumer side,
+it is possible to observe this event with:
+
+```python
+
+self.framework.observe(
+ self._loki_consumer.on.loki_push_api_endpoint_joined,
+ self._on_loki_push_api_endpoint_joined,
+)
+```
+
+Any time there are departures in relations between the consumer charm and Loki
+the consumer charm is informed, through a `LokiPushApiEndpointDeparted` event, for instance:
+
+```python
+self.framework.observe(
+ self._loki_consumer.on.loki_push_api_endpoint_departed,
+ self._on_loki_push_api_endpoint_departed,
+)
+```
+
+The consumer charm can then choose to update its configuration in both situations.
+
+Note that LokiPushApiConsumer does not add any labels automatically on its own. In
+order to better integrate with the Canonical Observability Stack, you may want to configure your
+software to add Juju topology labels. The
+[observability-libs](https://charmhub.io/observability-libs) library can be used to get topology
+labels in charm code. See :func:`LogProxyConsumer._scrape_configs` for an example of how
+to do this with promtail.
+
+## LogProxyConsumer Library Usage
+
+> Note: This object is deprecated. Consider migrating to LogForwarder with the release of Juju 3.6
+> LTS.
+
+Let's say that we have a workload charm that produces logs, and we need to send those logs to a
+workload implementing the `loki_push_api` interface, such as `Loki` or `Grafana Agent`.
+
+Adopting this object in a Charmed Operator consists of two steps:
+
+1. Use the `LogProxyConsumer` class by instantiating it in the `__init__` method of the charmed
+ operator. There are two ways to get logs into promtail. You can give it a list of files to
+ read, or you can write to it using the syslog protocol.
+
+ For example:
+
+ ```python
+ from charms.loki_k8s.v1.loki_push_api import LogProxyConsumer
+
+ ...
+
+ def __init__(self, *args):
+ ...
+ self._log_proxy = LogProxyConsumer(
+ self,
+ logs_scheme={
+ "workload-a": {
+ "log-files": ["/tmp/workload-a-1.log", "/tmp/workload-a-2.log"],
+ "syslog-port": 1514,
+ },
+ "workload-b": {"log-files": ["/tmp/workload-b.log"], "syslog-port": 1515},
+ },
+ relation_name="log-proxy",
+ )
+ self.framework.observe(
+ self._log_proxy.on.promtail_digest_error,
+ self._promtail_error,
+ )
+
+ def _promtail_error(self, event):
+ logger.error(event.message)
+ self.unit.status = BlockedStatus(event.message)
+ ```
+
+ Any time the relation between a provider charm and a LogProxy consumer charm is
+ established, a `LogProxyEndpointJoined` event is fired. On the consumer side, it is
+ possible to observe this event with:
+
+ ```python
+
+ self.framework.observe(
+ self._log_proxy.on.log_proxy_endpoint_joined,
+ self._on_log_proxy_endpoint_joined,
+ )
+ ```
+
+ Any time there are departures in relations between the consumer charm and the provider
+ the consumer charm is informed, through a `LogProxyEndpointDeparted` event, for instance:
+
+ ```python
+ self.framework.observe(
+ self._log_proxy.on.log_proxy_endpoint_departed,
+ self._on_log_proxy_endpoint_departed,
+ )
+ ```
+
+ The consumer charm can then choose to update its configuration in both situations.
+
+ Note that:
+
+ - You can configure your syslog software using `localhost` as the address and the method
+ `LogProxyConsumer.syslog_port("container_name")` to get the port, or, alternatively, if you are using rsyslog
+ you may use the method `LogProxyConsumer.rsyslog_config("container_name")`.
+
+2. Modify the `metadata.yaml` file to add:
+
+ - The `log-proxy` relation in the `requires` section:
+ ```yaml
+ requires:
+ log-proxy:
+ interface: loki_push_api
+ optional: true
+ ```
+
+Once the library is implemented in a Charmed Operator and a relation is established with
+the charm that implements the `loki_push_api` interface, the library will inject a
+Pebble layer that runs Promtail in the workload container to send logs.
+
+By default, the promtail binary injected into the container will be downloaded from the internet.
+If, for any reason, the container has limited network access, you may allow charm administrators
+to provide their own promtail binary at runtime by adding the following snippet to your charm
+metadata:
+
+```yaml
+resources:
+ promtail-bin:
+ type: file
+ description: Promtail binary for logging
+ filename: promtail-linux
+```
+
+Which would then allow operators to deploy the charm this way:
+
+```
+juju deploy \
+ ./your_charm.charm \
+ --resource promtail-bin=/tmp/promtail-linux-amd64
+```
+
+If a different resource name is used, it can be specified with the `promtail_resource_name`
+argument to the `LogProxyConsumer` constructor.
+
+The object can emit a `PromtailDigestError` event:
+
+- Promtail binary cannot be downloaded.
+- The sha256 sum mismatch for promtail binary.
+
+The object can raise a `ContainerNotFoundError` event:
+
+- No `container_name` parameter has been specified and the Pod has more than 1 container.
+
+The `PromtailDigestError` events can be observed with:
+
+```python
+ self.framework.observe(
+ self._loki_consumer.on.promtail_digest_error,
+ self._promtail_error,
+ )
+
+ def _promtail_error(self, event):
+ logger.error(event.message)
+ self.unit.status = BlockedStatus(event.message)
+```
+
+## LogForwarder class Usage
+
+Let's say that we have a charm's workload that writes logs to the standard output (stdout),
+and we need to send those logs to a workload implementing the `loki_push_api` interface,
+such as `Loki` or `Grafana Agent`. To know how to reach a Loki instance, a charm would
+typically use the `loki_push_api` interface.
+
+Use the `LogForwarder` class by instantiating it in the `__init__` method of the charm:
+
+```python
+from charms.loki_k8s.v1.loki_push_api import LogForwarder
+
+...
+
+ def __init__(self, *args):
+ ...
+ self._log_forwarder = LogForwarder(
+ self,
+ relation_name="logging" # optional, defaults to `logging`
+ )
+```
+
+The `LogForwarder` by default will observe relation events on the `logging` endpoint and
+enable/disable log forwarding automatically.
+Next, modify the `metadata.yaml` file to add:
+
+The `logging` relation in the `requires` section:
+```yaml
+requires:
+ logging:
+ interface: loki_push_api
+ optional: true
+```
+
+Once the `LogForwarder` class is implemented in your charm and the relation (implementing the
+`loki_push_api` interface) is active and healthy, the library will inject a Pebble layer in
+each workload container the charm has access to, to configure Pebble's log forwarding
+feature and start sending logs to Loki.
+
+## Alerting Rules
+
+This charm library also supports gathering alerting rules from all related Loki client
+charms and enabling corresponding alerts within the Loki charm. Alert rules are
+automatically gathered by `LokiPushApiConsumer` object from a directory conventionally
+named `loki_alert_rules`.
+
+This directory must reside at the top level in the `src` folder of the
+consumer charm. Each file in this directory is assumed to be a single alert rule
+in YAML format. The file name must have one of the following extensions: `.yaml`, `.yml`, `.rule`, or `.rules`.
+The format of this alert rule conforms to the
+[Loki docs](https://grafana.com/docs/loki/latest/rules/#alerting-rules).
+
+An example of the contents of one such file is shown below.
+
+```yaml
+alert: HighPercentageError
+expr: |
+ sum(rate({%%juju_topology%%} |= "error" [5m])) by (job)
+ /
+ sum(rate({%%juju_topology%%}[5m])) by (job)
+ > 0.05
+for: 10m
+labels:
+ severity: page
+annotations:
+ summary: High request latency
+
+```
+
+It is **critical** to use the `%%juju_topology%%` filter in the expression for the alert
+rule shown above. This filter is a stub that is automatically replaced by the
+`LokiPushApiConsumer` following Loki Client's Juju topology (application, model and its
+UUID). Such a topology filter is essential to ensure that alert rules submitted by one
+client charm generate alerts only for that same charm.
+
+The Loki charm may be related to multiple Loki client charms. Without this filter,
+rules submitted by one client charm will also result in corresponding alerts for other
+client charms. Hence, every alert rule expression must include such a topology filter stub.
+
+Gathering alert rules and generating rule files within the Loki charm is easily done using
+the `alerts()` method of `LokiPushApiProvider`. Alerts generated by Loki will automatically
+include Juju topology labels in the alerts. These labels indicate the source of the alert.
+
+The following labels are automatically added to every alert
+
+- `juju_model`
+- `juju_model_uuid`
+- `juju_application`
+
+
+If an alert rules file does not contain the keys `alert` or `expr`, or if there is no alert
+rules file in `alert_rules_path`, a `loki_push_api_alert_rules_error` event is emitted.
+
+To handle these situations the event must be observed in the `LokiClientCharm` charm.py file:
+
+```python
+class LokiClientCharm(CharmBase):
+
+ def __init__(self, *args):
+ super().__init__(*args)
+ ...
+ self._loki_consumer = LokiPushApiConsumer(self)
+
+ self.framework.observe(
+ self._loki_consumer.on.loki_push_api_alert_rules_error,
+ self._alert_rules_error
+ )
+
+ def _alert_rules_error(self, event):
+ self.unit.status = BlockedStatus(event.message)
+```
+
+## Relation Data
+
+The Loki charm uses both application and unit relation data to obtain information regarding
+Loki Push API and alert rules.
+
+Units of consumer charm send their alert rules over app relation data using the `alert_rules`
+key.
+
+## Charm logging
+The `charms.loki_k8s.v0.charm_logging` library can be used in conjunction with this one to configure python's
+logging module to forward all logs to Loki via the loki-push-api interface.
+
+```python
+from lib.charms.loki_k8s.v0.charm_logging import log_charm
+from lib.charms.loki_k8s.v1.loki_push_api import charm_logging_config, LokiPushApiConsumer
+
+@log_charm(logging_endpoint="my_endpoints", server_cert="cert_path")
+class MyCharm(...):
+ _cert_path = "/path/to/cert/on/charm/container.crt"
+ def __init__(self, ...):
+ self.logging = LokiPushApiConsumer(...)
+ self.my_endpoints, self.cert_path = charm_logging_config(
+ self.logging, self._cert_path)
+```
+
+Do this, and all charm logs will be forwarded to Loki as soon as a relation is formed.
+"""
+
+import copy
+import json
+import logging
+import os
+import platform
+import re
+import socket
+import warnings
+from copy import deepcopy
+from gzip import GzipFile
+from hashlib import sha256
+from io import BytesIO
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple, Union, cast
+from urllib import request
+from urllib.error import URLError
+
+import yaml
+from cosl import CosTool, JujuTopology
+from cosl.rules import AlertRules
+from cosl.types import OfficialRuleFileFormat
+from ops.charm import (
+ CharmBase,
+ HookEvent,
+ PebbleReadyEvent,
+ RelationBrokenEvent,
+ RelationCreatedEvent,
+ RelationDepartedEvent,
+ RelationEvent,
+ RelationJoinedEvent,
+ RelationRole,
+ WorkloadEvent,
+)
+from ops.framework import BoundEvent, EventBase, EventSource, Object, ObjectEvents
+from ops.jujuversion import JujuVersion
+from ops.model import Container, ModelError, Relation
+from ops.pebble import APIError, ChangeError, Layer, PathError, ProtocolError
+
+# The unique Charmhub library identifier, never change it
+LIBID = "bf76f23cdd03464b877c52bd1d2f563e"
+
+# Increment this major API version when introducing breaking changes
+LIBAPI = 1
+
+# Increment this PATCH version before using `charmcraft publish-lib` or reset
+# to 0 if you are raising the major API version
+LIBPATCH = 24
+
+# Third-party Python packages this library imports (see the `cosl` imports above).
+PYDEPS = ["cosl"]
+
+logger = logging.getLogger(__name__)
+
+# Interface name every relation handled by this library must declare, and the
+# default endpoint / alert rule locations used when the charm does not override them.
+RELATION_INTERFACE_NAME = "loki_push_api"
+DEFAULT_RELATION_NAME = "logging"
+DEFAULT_ALERT_RULES_RELATIVE_PATH = "./src/loki_alert_rules"
+DEFAULT_LOG_PROXY_RELATION_NAME = "log-proxy"
+
+PROMTAIL_BASE_URL = "https://github.com/canonical/loki-k8s-operator/releases/download"
+# To update Promtail version you only need to change the PROMTAIL_VERSION and
+# update all sha256 sums in PROMTAIL_BINARIES. To support a new architecture
+# you only need to add a new key value pair for the architecture in PROMTAIL_BINARIES.
+PROMTAIL_VERSION = "v2.9.7"
+# Shared by the "arm64" and "aarch64" keys of PROMTAIL_BINARIES below.
+PROMTAIL_ARM_BINARY = {
+    "filename": "promtail-static-arm64",
+    "zipsha": "c083fdb45e5c794103f974eeb426489b4142438d9e10d0ae272b2aff886e249b",
+    "binsha": "4cd055c477a301c0bdfdbcea514e6e93f6df5d57425ce10ffc77f3e16fec1ddf",
+}
+
+# Per-architecture binary description: file name plus the sha256 sums of the
+# zipped ("zipsha") and unzipped ("binsha") artifact.
+PROMTAIL_BINARIES = {
+    "amd64": {
+        "filename": "promtail-static-amd64",
+        "zipsha": "6873cbdabf23062aeefed6de5f00ff382710332af3ab90a48c253ea17e08f465",
+        "binsha": "28da9b99f81296fe297831f3bc9d92aea43b4a92826b8ff04ba433b8cb92fb50",
+    },
+    "arm64": PROMTAIL_ARM_BINARY,
+    "aarch64": PROMTAIL_ARM_BINARY,
+}
+
+# Paths in `charm` container
+BINARY_DIR = "/tmp"
+
+# Paths in `workload` container
+WORKLOAD_BINARY_DIR = "/opt/promtail"
+WORKLOAD_CONFIG_DIR = "/etc/promtail"
+WORKLOAD_CONFIG_FILE_NAME = "promtail_config.yaml"
+WORKLOAD_CONFIG_PATH = "{}/{}".format(WORKLOAD_CONFIG_DIR, WORKLOAD_CONFIG_FILE_NAME)
+WORKLOAD_POSITIONS_PATH = "{}/positions.yaml".format(WORKLOAD_BINARY_DIR)
+WORKLOAD_SERVICE_NAME = "promtail"
+
+# These are the initial port values. As we can have more than one container,
+# we use odd and even numbers to avoid collisions.
+# Each new container adds 2 to the previous value.
+HTTP_LISTEN_PORT_START = 9080  # even start port
+GRPC_LISTEN_PORT_START = 9095  # odd start port
+
+
+class LokiPushApiError(Exception):
+    """Base class for errors raised by this module.
+
+    Catching this type also catches the relation validation errors that
+    derive from it (`RelationNotFoundError`, `RelationInterfaceMismatchError`
+    and `RelationRoleMismatchError`).
+    """
+
+
+class RelationNotFoundError(LokiPushApiError):
+    """Raised if there is no relation with the given name.
+
+    The offending name is kept in `relation_name` and the formatted error
+    text in `message`.
+    """
+
+    def __init__(self, relation_name: str):
+        description = "No relation named '{}' found".format(relation_name)
+        super().__init__(description)
+
+        self.relation_name = relation_name
+        self.message = description
+
+
+class RelationInterfaceMismatchError(LokiPushApiError):
+    """Raised if the relation with the given name has a different interface.
+
+    The relation name and both the expected and the actual interface are kept
+    as attributes, next to the formatted `message`.
+    """
+
+    def __init__(
+        self,
+        relation_name: str,
+        expected_relation_interface: str,
+        actual_relation_interface: str,
+    ):
+        template = "The '{}' relation has '{}' as interface rather than the expected '{}'"
+        description = template.format(
+            relation_name, actual_relation_interface, expected_relation_interface
+        )
+        super().__init__(description)
+
+        self.relation_name = relation_name
+        self.expected_relation_interface = expected_relation_interface
+        self.actual_relation_interface = actual_relation_interface
+        self.message = description
+
+
+class RelationRoleMismatchError(LokiPushApiError):
+    """Raised if the relation with the given name has a different direction.
+
+    Attributes:
+        relation_name: name of the offending relation.
+        expected_relation_role: the role the relation was expected to have.
+        actual_relation_role: the role the relation actually has.
+        expected_relation_interface: alias of `expected_relation_role`; the
+            expected role used to be stored only under this (misleading) name,
+            so it is kept for callers that already read it.
+        message: human readable description of the mismatch.
+    """
+
+    def __init__(
+        self,
+        relation_name: str,
+        expected_relation_role: RelationRole,
+        actual_relation_role: RelationRole,
+    ):
+        self.relation_name = relation_name
+        self.expected_relation_role = expected_relation_role
+        # Backward-compatible alias: this attribute never held an interface name.
+        self.expected_relation_interface = expected_relation_role
+        self.actual_relation_role = actual_relation_role
+        self.message = "The '{}' relation has role '{}' rather than the expected '{}'".format(
+            relation_name, repr(actual_relation_role), repr(expected_relation_role)
+        )
+        super().__init__(self.message)
+
+
+def _validate_relation_by_interface_and_direction(
+    charm: CharmBase,
+    relation_name: str,
+    expected_relation_interface: str,
+    expected_relation_role: RelationRole,
+):
+    """Check that a relation declared by `charm` looks the way the caller expects.
+
+    Three things are verified, in this order: (1) `relation_name` is declared
+    in the charm metadata, (2) it uses the interface `expected_relation_interface`
+    and (3) it is declared on the expected side, i.e. under `provides` or
+    `requires` as requested through `expected_relation_role`.
+
+    Args:
+        charm: a `CharmBase` object whose metadata is inspected.
+        relation_name: the name of the relation to be verified.
+        expected_relation_interface: the interface name the relation must declare.
+        expected_relation_role: whether the relation must be provided or
+            required by `charm`.
+
+    Raises:
+        RelationNotFoundError: if the charm metadata declares no relation
+            named `relation_name`.
+        RelationInterfaceMismatchError: if the relation declares an interface
+            other than `expected_relation_interface`.
+        RelationRoleMismatchError: if the relation is not declared with the
+            role given by `expected_relation_role`.
+        Exception: if `expected_relation_role` is neither `RelationRole.provides`
+            nor `RelationRole.requires`.
+    """
+    declared_relations = charm.meta.relations
+    if relation_name not in declared_relations:
+        raise RelationNotFoundError(relation_name)
+
+    actual_relation_interface = declared_relations[relation_name].interface_name
+    if actual_relation_interface != expected_relation_interface:
+        raise RelationInterfaceMismatchError(
+            relation_name,
+            expected_relation_interface,
+            actual_relation_interface,  # pyright: ignore
+        )
+
+    # Pick the metadata section the relation must live in, together with the
+    # role reported as "actual" when it is missing from that section.
+    if expected_relation_role == RelationRole.provides:
+        same_side_relations = charm.meta.provides
+        expected_role, opposite_role = RelationRole.provides, RelationRole.requires
+    elif expected_relation_role == RelationRole.requires:
+        same_side_relations = charm.meta.requires
+        expected_role, opposite_role = RelationRole.requires, RelationRole.provides
+    else:
+        raise Exception("Unexpected RelationDirection: {}".format(expected_relation_role))
+
+    if relation_name not in same_side_relations:
+        raise RelationRoleMismatchError(relation_name, expected_role, opposite_role)
+
+
+class InvalidAlertRulePathError(Exception):
+    """Raised if the alert rules folder cannot be found or is otherwise invalid.
+
+    The rejected path is kept in `alert_rules_absolute_path` and the reason
+    in `message`, which is also the text of the exception.
+    """
+
+    def __init__(
+        self,
+        alert_rules_absolute_path: Path,
+        message: str,
+    ):
+        super().__init__(message)
+
+        self.alert_rules_absolute_path = alert_rules_absolute_path
+        self.message = message
+
+
+def _resolve_dir_against_charm_path(charm: CharmBase, *path_elements: str) -> str:
+    """Resolve the provided path elements against the charm directory.
+
+    The base directory is `charm.charm_dir`; when that is not an existing
+    directory, the current working directory is used instead. The path
+    elements are joined onto the absolute base directory and the result is
+    returned as a string.
+
+    Args:
+        charm: the charm whose `charm_dir` is used as the base directory.
+        path_elements: path components appended to the base directory.
+
+    Returns:
+        the absolute path of the resolved directory.
+
+    Raises:
+        InvalidAlertRulePathError: if the resolved path does not exist or is
+            not a directory.
+    """
+    base_dir = Path(str(charm.charm_dir))
+    if not (base_dir.exists() and base_dir.is_dir()):
+        # Operator Framework does not currently expose a robust
+        # way to determine the top level charm source directory
+        # that is consistent across deployed charms and unit tests
+        # Hence for unit tests the current working directory is used
+        # TODO: updated this logic when the following ticket is resolved
+        # https://github.com/canonical/operator/issues/643
+        base_dir = Path(os.getcwd())
+
+    resolved = base_dir.absolute().joinpath(*path_elements)
+
+    if not resolved.exists():
+        raise InvalidAlertRulePathError(resolved, "directory does not exist")
+    if not resolved.is_dir():
+        raise InvalidAlertRulePathError(resolved, "is not a directory")
+
+    return str(resolved)
+
+
+class NoRelationWithInterfaceFoundError(Exception):
+    """No relations with the given interface are found in the charm meta.
+
+    Keeps the inspected `charm`, the `relation_interface` that was looked
+    for and the formatted `message`.
+    """
+
+    def __init__(self, charm: CharmBase, relation_interface: Optional[str] = None):
+        template = "No relations with interface '{}' found in the meta of the '{}' charm"
+
+        self.charm = charm
+        self.relation_interface = relation_interface
+        self.message = template.format(relation_interface, charm.meta.name)
+
+        super().__init__(self.message)
+
+
+class MultipleRelationsWithInterfaceFoundError(Exception):
+    """Multiple relations with the given interface are found in the charm meta.
+
+    Keeps the inspected `charm`, the `relation_interface`, the matching
+    `relations` and the formatted `message`.
+    """
+
+    def __init__(self, charm: CharmBase, relation_interface: str, relations: list):
+        template = "Multiple relations with interface '{}' found in the meta of the '{}' charm."
+
+        self.charm = charm
+        self.relation_interface = relation_interface
+        self.relations = relations
+        self.message = template.format(relation_interface, charm.meta.name)
+
+        super().__init__(self.message)
+
+
+class LokiPushApiEndpointDeparted(EventBase):
+    """Event emitted when Loki departed.
+
+    Exposed to charms as `on.loki_push_api_endpoint_departed` (see
+    `LokiPushApiEvents`); it carries no payload of its own.
+    """
+
+
+class LokiPushApiEndpointJoined(EventBase):
+    """Event emitted when Loki joined.
+
+    Exposed to charms as `on.loki_push_api_endpoint_joined` (see
+    `LokiPushApiEvents`); it carries no payload of its own.
+    """
+
+
+class LokiPushApiAlertRulesChanged(EventBase):
+    """Event emitted if there is a change in the alert rules."""
+
+    def __init__(self, handle, relation, relation_id, app=None, unit=None):
+        """Pretend we are almost like a RelationEvent.
+
+        Fields to serialize:
+            {
+                "relation_name": <name of the relation>,
+                "relation_id": <id of the relation>,
+                "app_name": <name of the app, when one is set>,
+                "unit_name": <name of the unit, when one is set>
+            }
+
+        In this way, we can transparently use `RelationEvent.snapshot()` to pass
+        it back if we need to log it.
+
+        Args:
+            handle: the framework handle identifying this event.
+            relation: the relation whose alert rules changed (may be falsy).
+            relation_id: the id of that relation.
+            app: optional application associated with the change.
+            unit: optional unit associated with the change.
+        """
+        super().__init__(handle)
+        self.relation = relation
+        self.relation_id = relation_id
+        self.app = app
+        self.unit = unit
+
+    def snapshot(self) -> Dict:
+        """Save event information.
+
+        Returns:
+            an empty dict when the event has no relation; otherwise the relation
+            name and id, plus the app and unit names when they are set.
+        """
+        if not self.relation:
+            return {}
+        snapshot = {"relation_name": self.relation.name, "relation_id": self.relation.id}
+        if self.app:
+            snapshot["app_name"] = self.app.name
+        if self.unit:
+            snapshot["unit_name"] = self.unit.name
+        return snapshot
+
+    def restore(self, snapshot: dict):
+        """Restore event information.
+
+        Args:
+            snapshot: a dict previously produced by `snapshot()`.
+        """
+        relation_name = snapshot.get("relation_name")
+        if relation_name is None:
+            # `snapshot()` returns {} for an event without a relation; indexing
+            # that dict would raise KeyError, so restore an "empty" event instead.
+            self.relation = None
+            self.relation_id = None
+            self.app = None
+            self.unit = None
+            return
+
+        # Repopulate relation_id as well, so a restored event exposes the same
+        # attributes as a freshly emitted one (presumably restore() can run on an
+        # instance that did not go through __init__ -- verify against ops).
+        self.relation_id = snapshot["relation_id"]
+        self.relation = self.framework.model.get_relation(relation_name, self.relation_id)
+
+        app_name = snapshot.get("app_name")
+        self.app = self.framework.model.get_app(app_name) if app_name else None
+
+        unit_name = snapshot.get("unit_name")
+        self.unit = self.framework.model.get_unit(unit_name) if unit_name else None
+
+
+class InvalidAlertRuleEvent(EventBase):
+    """Event emitted when alert rule files are not parsable.
+
+    Enables us to set a clear status on the provider.
+    """
+
+    def __init__(self, handle, errors: str = "", valid: bool = False):
+        super().__init__(handle)
+        self.valid = valid
+        self.errors = errors
+
+    def snapshot(self) -> Dict:
+        """Save the `valid` flag and the `errors` text."""
+        saved = {}
+        saved["valid"] = self.valid
+        saved["errors"] = self.errors
+        return saved
+
+    def restore(self, snapshot):
+        """Restore the `valid` flag and the `errors` text from `snapshot`."""
+        self.valid, self.errors = snapshot["valid"], snapshot["errors"]
+
+
+class LokiPushApiEvents(ObjectEvents):
+    """Event descriptor for events raised by `LokiPushApiProvider`.
+
+    Each attribute binds one of the event classes defined above to the name
+    under which charms observe it (e.g. `on.loki_push_api_alert_rules_changed`).
+    """
+
+    # Endpoint lifecycle notifications.
+    loki_push_api_endpoint_departed = EventSource(LokiPushApiEndpointDeparted)
+    loki_push_api_endpoint_joined = EventSource(LokiPushApiEndpointJoined)
+    # Alert rule notifications: rules changed / rule validity status changed.
+    loki_push_api_alert_rules_changed = EventSource(LokiPushApiAlertRulesChanged)
+    alert_rule_status_changed = EventSource(InvalidAlertRuleEvent)
+
+
+class LokiPushApiProvider(Object):
+ """A LokiPushApiProvider class."""
+
+ on = LokiPushApiEvents() # pyright: ignore
+
+    def __init__(
+        self,
+        charm,
+        relation_name: str = DEFAULT_RELATION_NAME,
+        *,
+        port: Union[str, int] = 3100,
+        scheme: str = "http",
+        address: str = "",
+        path: str = "loki/api/v1/push",
+    ):
+        """A Loki service provider.
+
+        Args:
+            charm: a `CharmBase` instance that manages this
+                instance of the Loki service.
+            relation_name: an optional string name of the relation between `charm`
+                and the Loki charmed service. The default is "logging".
+                It is strongly advised not to change the default, so that people
+                deploying your charm will have a consistent experience with all
+                other charms that use the `loki_push_api` interface.
+            port: an optional port of the Loki service, given as int or str and
+                stored as int (default is 3100).
+            scheme: an optional scheme of the Loki API URL (default is "http").
+            address: DEPRECATED. This argument is ignored and will be removed in v2.
+                It is kept for backward compatibility; passing a non-empty value
+                emits a `DeprecationWarning`.
+                Use `update_endpoint()` instead.
+            path: an optional path of the Loki API URL (default is "loki/api/v1/push")
+
+        Raises:
+            RelationNotFoundError: If there is no relation in the charm's metadata.yaml
+                with the same name as provided via `relation_name` argument.
+            RelationInterfaceMismatchError: The relation with the same name as provided
+                via `relation_name` argument does not have the `loki_push_api` relation
+                interface.
+            RelationRoleMismatchError: If the relation with the same name as provided
+                via `relation_name` argument does not have the `RelationRole.provides`
+                role.
+        """
+        # Fail early, before any observer is registered, when the metadata is wrong.
+        _validate_relation_by_interface_and_direction(
+            charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.provides
+        )
+
+        if address != "":
+            warnings.warn(
+                "The 'address' parameter is deprecated and will be removed in v2. "
+                "Use 'update_endpoint()' instead.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+
+        super().__init__(charm, relation_name)
+        self._charm = charm
+        self._relation_name = relation_name
+        self._tool = CosTool("logql")
+        self.port = int(port)
+        self.scheme = scheme
+        self.path = path
+        # No custom endpoint URL is set at construction time.
+        self._custom_url = None
+
+        # React to charm upgrades and to every lifecycle event of the relation.
+        events = self._charm.on[relation_name]
+        self.framework.observe(self._charm.on.upgrade_charm, self._on_lifecycle_event)
+        self.framework.observe(events.relation_joined, self._on_logging_relation_joined)
+        self.framework.observe(events.relation_changed, self._on_logging_relation_changed)
+        self.framework.observe(events.relation_departed, self._on_logging_relation_departed)
+        self.framework.observe(events.relation_broken, self._on_logging_relation_broken)
+
+    def _on_lifecycle_event(self, _):
+        """Refresh every relation on charm-level events such as `upgrade_charm`.
+
+        Each relation of this endpoint is re-processed (unit address and endpoint
+        are written again). If at least one relation carries alert rules, a single
+        `loki_push_api_alert_rules_changed` event is emitted.
+
+        Args:
+            _: the triggering event; unused.
+        """
+        relations = self._charm.model.relations[self._relation_name]
+        should_update = False
+        for relation in relations:
+            # Process first and OR afterwards: `should_update or process(...)` would
+            # short-circuit once True and leave the remaining relations unrefreshed.
+            has_alert_rules = self._process_logging_relation_changed(relation)
+            should_update = should_update or has_alert_rules
+        if should_update:
+            # We don't have a RelationEvent, so build it up by hand
+            first_rel = relations[0]
+            self.on.loki_push_api_alert_rules_changed.emit(
+                relation=first_rel,
+                relation_id=first_rel.id,
+            )
+
+    def _on_logging_relation_joined(self, event: RelationJoinedEvent):
+        """Publish static application data when a relation is joined.
+
+        Only the leader writes: the promtail binary download information is
+        merged into this application's data bag of the joined relation.
+
+        Args:
+            event: the `RelationJoinedEvent` whose relation receives the data.
+        """
+        if not self._charm.unit.is_leader():
+            return
+
+        app_databag = event.relation.data[self._charm.app]
+        app_databag.update(self._promtail_binary_url)
+        logger.debug("Saved promtail binary url: %s", self._promtail_binary_url)
+
+    def _on_logging_relation_changed(self, event: HookEvent):
+        """React to a change in the relation with a consumer charm.
+
+        The relation is re-processed (unit address and endpoint are written);
+        `loki_push_api_alert_rules_changed` is emitted only when the relation
+        carries alert rules.
+
+        Args:
+            event: the relation event that triggered this handler.
+        """
+        relation = event.relation  # pyright: ignore
+        if not self._process_logging_relation_changed(relation):
+            return
+
+        self.on.loki_push_api_alert_rules_changed.emit(
+            relation=relation,
+            relation_id=relation.id,
+            app=self._charm.app,
+            unit=self._charm.unit,
+        )
+
+    def _on_logging_relation_broken(self, event: RelationBrokenEvent):
+        """Signal that alert rules changed because a relation was removed.
+
+        This handler only emits `loki_push_api_alert_rules_changed`; acting on it
+        (e.g. regenerating rule files) is left to the observers of that event.
+
+        Args:
+            event: the `RelationBrokenEvent` for the relation being removed.
+        """
+        broken_relation = event.relation
+        charm = self._charm
+        self.on.loki_push_api_alert_rules_changed.emit(
+            relation=broken_relation,
+            relation_id=broken_relation.id,
+            app=charm.app,
+            unit=charm.unit,
+        )
+
+    def _on_logging_relation_departed(self, event: RelationDepartedEvent):
+        """Signal that alert rules changed because a remote unit departed.
+
+        This handler only emits `loki_push_api_alert_rules_changed`; acting on it
+        (e.g. regenerating rule files) is left to the observers of that event.
+
+        Args:
+            event: the `RelationDepartedEvent` for the departing unit.
+        """
+        departed_relation = event.relation
+        charm = self._charm
+        self.on.loki_push_api_alert_rules_changed.emit(
+            relation=departed_relation,
+            relation_id=departed_relation.id,
+            app=charm.app,
+            unit=charm.unit,
+        )
+
+    def _should_update_alert_rules(self, relation) -> bool:
+        """Determine whether alert rules should be regenerated.
+
+        Args:
+            relation: the relation whose remote application data bag is inspected.
+
+        Returns:
+            True if the remote application data bag contains an "alert_rules"
+            entry; False otherwise, including when no data bag is available for
+            `relation.app` (for instance when the remote application is unknown).
+        """
+        app_data = relation.data.get(relation.app)
+        if app_data is None:
+            # Without a remote application data bag there is nothing to regenerate;
+            # chaining `.get()` on None would raise AttributeError.
+            return False
+        return app_data.get("alert_rules") is not None
+
+    def _process_logging_relation_changed(self, relation: Relation) -> bool:
+        """Refresh this unit's data in a relation and report on alert rules.
+
+        Writes the unit's FQDN as "public_address" into the unit data bag,
+        refreshes the "endpoint" entry through `update_endpoint()`, and then
+        checks whether the relation carries alert rules.
+
+        Args:
+            relation: the `Relation` instance to update.
+
+        Returns:
+            A boolean indicating whether an alert-rules event should be emitted,
+            so that callers can emit a single event on lifecycle events.
+        """
+        unit_databag = relation.data[self._charm.unit]
+        unit_databag["public_address"] = socket.getfqdn() or ""
+        self.update_endpoint(relation=relation)
+        return self._should_update_alert_rules(relation)
+
+    @property
+    def _promtail_binary_url(self) -> dict:
+        """Promtail download information, ready to be stored in relation data.
+
+        Returns:
+            A single-entry dict whose "promtail_binary_zip_url" value is the JSON
+            encoding of a per-architecture mapping. Each entry holds the fields of
+            the corresponding `PROMTAIL_BINARIES` entry plus a computed "url".
+        """
+        # Build fresh per-architecture dicts instead of writing "url" into the
+        # shared module-level PROMTAIL_BINARIES entries.
+        promtail_binaries = {
+            arch: {
+                **info,
+                "url": "{}/promtail-{}/{}.gz".format(
+                    PROMTAIL_BASE_URL, PROMTAIL_VERSION, info["filename"]
+                ),
+            }
+            for arch, info in PROMTAIL_BINARIES.items()
+        }
+
+        return {"promtail_binary_zip_url": json.dumps(promtail_binaries)}
+
+    def update_endpoint(self, url: str = "", relation: Optional[Relation] = None) -> None:
+        """Write the Loki push endpoint into this unit's relation data.
+
+        Use this when the endpoint must be refreshed outside of the relation
+        lifecycle, e.g. when the host address changes because the charm became
+        reachable through an Ingress after the relation was established.
+
+        The custom URL is sticky: once a non-empty `url` has been given it is
+        remembered and reused by later calls that pass no `url`; it cannot be
+        unset. Pass an internal URL explicitly if the ingress is no longer
+        available.
+
+        Args:
+            url: An optional url value to update relation data.
+            relation: An optional instance of `class:ops.model.Relation` to update.
+                When omitted, every relation of this endpoint is updated.
+        """
+        if relation:
+            targets = [relation]
+        else:
+            # if no relation is specified update all of them
+            targets = self._charm.model.relations.get(self._relation_name)
+            if not targets:
+                return
+
+        if url:
+            self._custom_url = url
+
+        serialized_endpoint = json.dumps(self._endpoint(self._custom_url or self._url))
+        for target in targets:
+            target.data[self._charm.unit].update({"endpoint": serialized_endpoint})
+
+        logger.debug("Saved endpoint in unit relation data")
+
+    @property
+    def _url(self) -> str:
+        """Local Loki URL built from scheme, FQDN and port.
+
+        The push API sub-path is not included.
+        """
+        fqdn = socket.getfqdn()
+        return "{}://{}:{}".format(self.scheme, fqdn, self.port)
+
+    def _endpoint(self, url) -> dict:
+        """Get Loki push API endpoint for a given url.
+
+        Args:
+            url: A loki unit URL; trailing slashes are ignored.
+
+        Returns:
+            A dict with a single "url" key holding `url` followed by the fixed
+            push path "/loki/api/v1/push".
+        """
+        # NOTE(review): the `path` constructor argument is not consulted here.
+        push_path = "/loki/api/v1/push"
+        base_url = url.rstrip("/")
+        return {"url": f"{base_url}{push_path}"}
+
+    @property
+    def alerts(self) -> dict:  # noqa: C901
+        """Fetch the validated alert rules of all relations.
+
+        For every relation that has units and a remote application, the
+        "alert_rules" JSON from the remote application data bag is loaded,
+        topology label matchers are injected into the expressions, and the rules
+        are validated. Rules that fail validation are left out of the result;
+        on the leader unit the validation errors are written to this
+        application's data bag under "event".
+
+        The key of each entry is an identifier derived from the topology labels
+        of the rules, from the relation "metadata", or, as a last resort, from
+        the first alert group name. Each value is a rules dict with a "groups"
+        key, suitable for `yaml.dump`.
+
+        Returns:
+            a dictionary mapping identifiers to alert rule dicts.
+        """
+        alerts = {}  # type: Dict[str, dict]  # mapping b/w juju identifiers and alert rule files
+        for relation in self._charm.model.relations[self._relation_name]:
+            if not relation.units or not relation.app:
+                continue
+
+            alert_rules = json.loads(relation.data[relation.app].get("alert_rules", "{}"))
+            if not alert_rules:
+                continue
+
+            alert_rules = self._inject_alert_expr_labels(alert_rules)
+
+            identifier, topology = self._get_identifier_by_alert_rules(alert_rules)
+            if not topology:
+                try:
+                    metadata = json.loads(relation.data[relation.app]["metadata"])
+                    identifier = JujuTopology.from_dict(metadata).identifier
+                    # Keep the label-matched rules in `alert_rules` rather than
+                    # storing them in `alerts` right away: they must pass validation
+                    # below before being exposed to the caller.
+                    alert_rules = self._tool.apply_label_matchers(alert_rules)  # type: ignore
+
+                except KeyError as e:
+                    logger.debug(
+                        "Relation %s has no 'metadata': %s",
+                        relation.id,
+                        e,
+                    )
+
+            if not identifier:
+                logger.error(
+                    "Alert rules were found but no usable group or identifier was present."
+                )
+                continue
+
+            _, errmsg = self._tool.validate_alert_rules(cast(OfficialRuleFileFormat, alert_rules))
+            if errmsg:
+                # Report the problem to the consumer through the application data bag
+                # (only the leader may write it) and skip the invalid rules.
+                if self._charm.unit.is_leader():
+                    relation.data[self._charm.app]["event"] = json.dumps({"errors": errmsg})
+                continue
+
+            alerts[identifier] = alert_rules
+
+        return alerts
+
+    def _get_identifier_by_alert_rules(
+        self, rules: dict
+    ) -> Tuple[Union[str, None], Union[JujuTopology, None]]:
+        """Determine an appropriate dict key for alert rules.
+
+        The key is used as the filename when writing alerts to disk, so the structure
+        and uniqueness is important.
+
+        The first rule of each group is inspected in turn; the first one whose
+        labels allow a `JujuTopology` to be built provides the identifier. If no
+        group qualifies, the name of the first group is used instead.
+
+        Args:
+            rules: a dict of alert rules
+        Returns:
+            A tuple containing an identifier, if found, and a JujuTopology, if it could
+            be constructed.
+        """
+        if "groups" not in rules:
+            logger.debug("No alert groups were found in relation data")
+            return None, None
+
+        # Construct an ID based on what's in the alert rules if they have labels
+        for group in rules["groups"]:
+            try:
+                labels = group["rules"][0]["labels"]
+                topology = JujuTopology(
+                    # Don't try to safely get required constructor fields. There's already
+                    # a handler for KeyErrors
+                    model_uuid=labels["juju_model_uuid"],
+                    model=labels["juju_model"],
+                    application=labels["juju_application"],
+                    unit=labels.get("juju_unit", ""),
+                    charm_name=labels.get("juju_charm", ""),
+                )
+                return topology.identifier, topology
+            except (KeyError, IndexError):
+                # KeyError: a required key is missing; IndexError: the group has an
+                # empty rule list. Either way this group cannot provide an identifier.
+                logger.debug("Alert rules were found but no usable labels were present")
+                continue
+
+        logger.warning(
+            "No labeled alert rules were found, and no 'scrape_metadata' "
+            "was available. Using the alert group name as filename."
+        )
+        # Fall back to the name of the first group, if there is one.
+        first_group = next(iter(rules["groups"]), None)
+        if first_group is not None:
+            try:
+                return first_group["name"], None
+            except KeyError:
+                logger.debug("No group name was found to use as identifier")
+
+        return None, None
+
+    def _inject_alert_expr_labels(self, rules: Dict[str, Any]) -> Dict[str, Any]:
+        """Iterate through alert rules and inject topology into expressions.
+
+        Rules are updated in place. A rule is rewritten only when its labels
+        allow a `JujuTopology` to be built; the "%%juju_topology%%" placeholder
+        is stripped from the expression before label matchers are injected.
+
+        Args:
+            rules: a dict of alert rules
+
+        Returns:
+            The same `rules` dict, with the "expr" of qualifying rules rewritten.
+        """
+        if "groups" not in rules:
+            return rules
+
+        for group in rules["groups"]:
+            # A group without a "rules" entry has nothing to rewrite.
+            for rule in group.get("rules") or []:
+                labels = rule.get("labels")
+                if not labels:
+                    continue
+
+                try:
+                    topology = JujuTopology(
+                        # Don't try to safely get required constructor fields. There's already
+                        # a handler for KeyErrors
+                        model_uuid=labels["juju_model_uuid"],
+                        model=labels["juju_model"],
+                        application=labels["juju_application"],
+                        unit=labels.get("juju_unit", ""),
+                        charm_name=labels.get("juju_charm", ""),
+                    )
+
+                    # Inject topology into the expression of this rule
+                    rule["expr"] = self._tool.inject_label_matchers(
+                        re.sub(r"%%juju_topology%%,?", "", rule["expr"]),
+                        topology.label_matcher_dict,
+                    )
+                except KeyError:
+                    # Some required JujuTopology key is missing. Just move on.
+                    continue
+
+        return rules
+
+
+class ConsumerBase(Object):
+ """Consumer's base class."""
+
+    def __init__(
+        self,
+        charm: CharmBase,
+        relation_name: str = DEFAULT_RELATION_NAME,
+        alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
+        recursive: bool = False,
+        skip_alert_topology_labeling: bool = False,
+        *,
+        forward_alert_rules: bool = True,
+        extra_alert_labels: Optional[Dict] = None,
+    ):
+        """Store the settings shared by all consumer classes.
+
+        Args:
+            charm: the `CharmBase` object that manages this consumer.
+            relation_name: name of the relation used by this consumer.
+            alert_rules_path: path of the alert rules directory; resolved against
+                the charm path when possible, otherwise kept as given.
+            recursive: whether to scan for rule files recursively.
+            skip_alert_topology_labeling: whether to skip the alert topology labeling.
+            forward_alert_rules: whether alert rules found under `alert_rules_path`
+                are forwarded over the relation.
+            extra_alert_labels: extra labels to inject into alert rules; `None`
+                (the default) means no extra labels.
+        """
+        super().__init__(charm, relation_name)
+        self._charm = charm
+        self._relation_name = relation_name
+        self._forward_alert_rules = forward_alert_rules
+        # A fresh dict per instance: a `{}` default in the signature would be one
+        # object shared by every instance created without this argument.
+        self._extra_alert_labels = {} if extra_alert_labels is None else extra_alert_labels
+        self.topology = JujuTopology.from_charm(charm)
+
+        try:
+            alert_rules_path = _resolve_dir_against_charm_path(charm, alert_rules_path)
+        except InvalidAlertRulePathError as e:
+            # Best effort: keep the unresolved path and only leave a debug trace.
+            logger.debug(
+                "Invalid Loki alert rules folder at %s: %s",
+                e.alert_rules_absolute_path,
+                e.message,
+            )
+        self._alert_rules_path = alert_rules_path
+        self._skip_alert_topology_labeling = skip_alert_topology_labeling
+
+        self._recursive = recursive
+
+    @staticmethod
+    def _inject_extra_labels_to_alert_rules(rules: Dict, extra_alert_labels: Dict) -> Dict:
+        """Return a deep copy of `rules` with `extra_alert_labels` added to every rule.
+
+        The input dict is not modified. A rule without "labels" gets a new labels
+        dict; existing labels with the same key are overwritten.
+        """
+        labelled = copy.deepcopy(rules)
+        every_rule = (
+            rule for group in labelled.get("groups", []) for rule in group.get("rules", [])
+        )
+        for rule in every_rule:
+            if "labels" not in rule:
+                rule["labels"] = {}
+            rule["labels"].update(extra_alert_labels)
+        return labelled
+
+    def _handle_alert_rules(self, relation):
+        """Publish topology metadata and alert rules in the application data bag.
+
+        Does nothing unless this unit is the leader. Rule files are loaded from
+        the configured path only when forwarding is enabled; extra labels, if
+        any, are injected before the rules are serialized.
+
+        Args:
+            relation: the relation whose application data bag is written.
+        """
+        if not self._charm.unit.is_leader():
+            return
+
+        if self._skip_alert_topology_labeling:
+            alert_rules = AlertRules(query_type="logql")
+        else:
+            alert_rules = AlertRules(query_type="logql", topology=self.topology)
+
+        if self._forward_alert_rules:
+            alert_rules.add_path(self._alert_rules_path, recursive=self._recursive)
+
+        rules_dict = alert_rules.as_dict()
+        if self._extra_alert_labels:
+            rules_dict = ConsumerBase._inject_extra_labels_to_alert_rules(
+                rules_dict, self._extra_alert_labels
+            )
+
+        app_databag = relation.data[self._charm.app]
+        app_databag["metadata"] = json.dumps(self.topology.as_dict())
+        # sort, to prevent unnecessary relation_changed events
+        app_databag["alert_rules"] = json.dumps(rules_dict, sort_keys=True)
+
+    @property
+    def loki_endpoints(self) -> List[dict]:
+        """Fetch Loki Push API endpoints sent from LokiPushApiProvider through relation data.
+
+        Units belonging to this charm's own application are skipped, as are units
+        without an "endpoint" entry and endpoints without a "url".
+
+        Returns:
+            A list of unique dictionaries with Loki Push API endpoints, for instance:
+            [
+                {"url": "http://loki1:3100/loki/api/v1/push"},
+                {"url": "http://loki2:3100/loki/api/v1/push"},
+            ]
+        """
+        # Deduplicate by URL, keeping the first endpoint seen for each one.
+        # With loki-k8s we have ingress-per-unit, so every unit advertises its
+        # own URL and all of them are kept. With loki-coordinator-k8s a scaled
+        # coordinator advertises one shared URL, which must appear only once:
+        # promtail refuses to start when its config holds duplicate clients.
+        endpoint_by_url = {}
+
+        for relation in self._charm.model.relations[self._relation_name]:
+            for unit in relation.units:
+                if unit.app == self._charm.app:
+                    continue
+
+                raw_endpoint = relation.data[unit].get("endpoint")
+                if not raw_endpoint:
+                    continue
+
+                endpoint = json.loads(raw_endpoint)
+                url = endpoint.get("url")
+                if url and url not in endpoint_by_url:
+                    endpoint_by_url[url] = endpoint
+
+        return list(endpoint_by_url.values())
+
+
+
+class LokiPushApiConsumer(ConsumerBase):
+ """Loki Consumer class."""
+
+ on = LokiPushApiEvents() # pyright: ignore
+
+    def __init__(
+        self,
+        charm: CharmBase,
+        relation_name: str = DEFAULT_RELATION_NAME,
+        alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
+        recursive: bool = True,
+        skip_alert_topology_labeling: bool = False,
+        *,
+        refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None,
+        forward_alert_rules: bool = True,
+        extra_alert_labels: Optional[Dict] = None,
+    ):
+        """Construct a Loki charm client.
+
+        The `LokiPushApiConsumer` object provides configurations to a Loki client charm, such as
+        the Loki API endpoint to push logs. It is intended for workloads that can speak
+        loki_push_api (https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki), such
+        as grafana-agent.
+        (If you need to forward workload stdout logs, then use LogForwarder; if you need to forward
+        log files, then use LogProxyConsumer.)
+
+        `LokiPushApiConsumer` can be instantiated as follows:
+
+            self._loki_consumer = LokiPushApiConsumer(self)
+
+        Args:
+            charm: a `CharmBase` object that manages this `LokiPushApiConsumer` object.
+                Typically, this is `self` in the instantiating class.
+            relation_name: the name of the relation, as declared in the charm
+                metadata, that uses the `loki_push_api` interface.
+            alert_rules_path: a string indicating a path where alert rules can be found
+            recursive: Whether to scan for rule files recursively.
+            skip_alert_topology_labeling: whether to skip the alert topology labeling.
+            refresh_event: an optional bound event or list of bound events; on each
+                of them the alert rules are published again and
+                `loki_push_api_endpoint_joined` is emitted.
+            forward_alert_rules: a boolean flag to toggle forwarding of charmed alert rules.
+            extra_alert_labels: Dict of extra labels to inject alert rules with;
+                `None` (the default) means no extra labels.
+
+        Raises:
+            RelationNotFoundError: If there is no relation in the charm's metadata.yaml
+                with the same name as provided via `relation_name` argument.
+            RelationInterfaceMismatchError: The relation with the same name as provided
+                via `relation_name` argument does not have the `loki_push_api` relation
+                interface.
+            RelationRoleMismatchError: If the relation with the same name as provided
+                via `relation_name` argument does not have the `RelationRole.requires`
+                role.
+
+        Emits:
+            loki_push_api_endpoint_joined: emitted on relation joined and changed
+                events, and on every lifecycle or refresh event.
+            loki_push_api_endpoint_departed: emitted when a unit departs the relation.
+            alert_rule_status_changed: emitted on the leader when the provider
+                publishes an alert rule validation status.
+        """
+        _validate_relation_by_interface_and_direction(
+            charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.requires
+        )
+        super().__init__(
+            charm,
+            relation_name,
+            alert_rules_path,
+            recursive,
+            skip_alert_topology_labeling,
+            forward_alert_rules=forward_alert_rules,
+            # Hand a fresh dict to the base class instead of sharing a mutable default.
+            extra_alert_labels={} if extra_alert_labels is None else extra_alert_labels,
+        )
+        events = self._charm.on[relation_name]
+        self.framework.observe(self._charm.on.upgrade_charm, self._on_lifecycle_event)
+        self.framework.observe(self._charm.on.config_changed, self._on_lifecycle_event)
+        self.framework.observe(events.relation_joined, self._on_logging_relation_joined)
+        self.framework.observe(events.relation_changed, self._on_logging_relation_changed)
+        self.framework.observe(events.relation_departed, self._on_logging_relation_departed)
+
+        if refresh_event:
+            # Accept a single bound event as well as a list of them.
+            if not isinstance(refresh_event, list):
+                refresh_event = [refresh_event]
+            for ev in refresh_event:
+                self.framework.observe(ev, self._on_lifecycle_event)
+
+    def _on_lifecycle_event(self, _: HookEvent):
+        """Re-publish alert rules on charm upgrades and other lifecycle events.
+
+        Afterwards `loki_push_api_endpoint_joined` is emitted so that the charm
+        can refresh its configuration.
+
+        Args:
+            _: the triggering event; unused.
+        """
+        self._reinitialize_alert_rules()
+        endpoint_joined = self.on.loki_push_api_endpoint_joined
+        endpoint_joined.emit()
+
+    def _on_logging_relation_joined(self, event: RelationJoinedEvent):
+        """Publish alert rules and announce the endpoint when a relation is joined.
+
+        Args:
+            event: the `RelationJoinedEvent` whose relation receives the alert rules.
+
+        Emits:
+            loki_push_api_endpoint_joined: once the alert rules have been handled.
+        """
+        # Alert rules are published here rather than on every relation_changed.
+        joined_relation = event.relation
+        self._handle_alert_rules(joined_relation)
+        endpoint_joined = self.on.loki_push_api_endpoint_joined
+        endpoint_joined.emit()
+
+    def _on_logging_relation_changed(self, event: RelationEvent):
+        """React to changes in the relation with Loki.
+
+        On the leader, the "event" entry of the remote application data bag is
+        decoded; when it is non-empty its validation status is forwarded through
+        `alert_rule_status_changed`.
+
+        Args:
+            event: the relation event carrying the changed relation.
+
+        Emits:
+            alert_rule_status_changed: on the leader, when the provider published a status.
+            loki_push_api_endpoint_joined: always, after the status has been handled.
+        """
+        if self._charm.unit.is_leader():
+            status = json.loads(event.relation.data[event.app].get("event", "{}"))
+
+            if status:
+                valid = bool(status.get("valid", True))
+                errors = status.get("errors", "")
+
+                payload = {"valid": valid}
+                # Errors are attached unless the rules are valid and error-free.
+                if not valid or errors:
+                    payload["errors"] = errors
+                self.on.alert_rule_status_changed.emit(**payload)
+
+        self.on.loki_push_api_endpoint_joined.emit()
+
+ def reload_alerts(self) -> None:
+ """Reloads alert rules and updates all relations.
+
+ Public entry point that delegates to `_reinitialize_alert_rules()`.
+ """
+ self._reinitialize_alert_rules()
+
+    def _reinitialize_alert_rules(self):
+        """Publish the alert rules again on every relation of this endpoint."""
+        current_relations = self._charm.model.relations[self._relation_name]
+        for current in current_relations:
+            self._handle_alert_rules(current)
+
+ def _process_logging_relation_changed(self, relation: Relation):
+ """Publish alert rules for `relation`, then emit `loki_push_api_endpoint_joined`.
+
+ Args:
+ relation: the relation passed on to `_handle_alert_rules()`.
+ """
+ self._handle_alert_rules(relation)
+ self.on.loki_push_api_endpoint_joined.emit()
+
+ def _on_logging_relation_departed(self, _: RelationEvent):
+ """Handle departures in related providers.
+
+ Anytime there are departures in relations between the consumer charm and Loki
+ the consumer charm is informed, through a `LokiPushApiEndpointDeparted` event.
+ The consumer charm can then choose to update its configuration.
+
+ Args:
+ _: the triggering event; unused.
+ """
+ # The event is emitted without arguments; no relation data is read here.
+ self.on.loki_push_api_endpoint_departed.emit()
+
+
+class ContainerNotFoundError(Exception):
+    """Raised if the specified container does not exist.
+
+    The fixed error text is also available as the `message` attribute.
+    """
+
+    def __init__(self):
+        self.message = "The specified container does not exist."
+        super().__init__(self.message)
+
+
+class PromtailDigestError(EventBase):
+    """Event emitted when there is an error with Promtail initialization.
+
+    The error text travels in the `message` attribute, which is included in
+    the event snapshot.
+    """
+
+    def __init__(self, handle, message):
+        super().__init__(handle)
+        self.message = message
+
+    def snapshot(self):
+        """Return the data to persist for this event: its message."""
+        saved = {"message": self.message}
+        return saved
+
+    def restore(self, snapshot):
+        """Restore the message from a snapshot produced by `snapshot()`."""
+        restored_message = snapshot["message"]
+        self.message = restored_message
+
+
+class LogProxyEndpointDeparted(EventBase):
+ """Event emitted when a Log Proxy has departed.
+
+ Carries no data beyond what `EventBase` provides.
+ """
+
+
+class LogProxyEndpointJoined(EventBase):
+ """Event emitted when a Log Proxy joins.
+
+ Carries no data beyond what `EventBase` provides.
+ """
+
+
+class LogProxyEvents(ObjectEvents):
+    """Event descriptor for events raised by `LogProxyConsumer`."""
+
+    promtail_digest_error = EventSource(PromtailDigestError)
+    log_proxy_endpoint_departed = EventSource(LogProxyEndpointDeparted)
+    log_proxy_endpoint_joined = EventSource(LogProxyEndpointJoined)
+    # `LogProxyConsumer._on_relation_changed` emits this event through `self.on`;
+    # without the declaration that emit raises AttributeError. The event type
+    # matches the one used by `LokiPushApiEvents.alert_rule_status_changed`.
+    alert_rule_status_changed = EventSource(InvalidAlertRuleEvent)
+
+
+class LogProxyConsumer(ConsumerBase):
+ """LogProxyConsumer class.
+
+ > Note: This object is deprecated. Consider migrating to LogForwarder with the release of Juju
+ > 3.6 LTS.
+
+ The `LogProxyConsumer` object provides a method for attaching `promtail` to
+ a workload in order to generate structured logging data from applications
+ which traditionally log to syslog or do not have native Loki integration.
+ The `LogProxyConsumer` can be instantiated as follows:
+
+ self._log_proxy = LogProxyConsumer(
+ self,
+ logs_scheme={
+ "workload-a": {
+ "log-files": ["/tmp/workload-a-1.log", "/tmp/workload-a-2.log"],
+ "syslog-port": 1514,
+ },
+ "workload-b": {"log-files": ["/tmp/workload-b.log"], "syslog-port": 1515},
+ },
+ relation_name="log-proxy",
+ )
+
+ Args:
+ charm: a `CharmBase` object that manages this `LogProxyConsumer` object.
+ Typically, this is `self` in the instantiating class.
+ logs_scheme: a dict which maps containers and a list of log files and syslog port.
+ relation_name: the string name of the relation interface to look up.
+ If `charm` has exactly one relation with this interface, the relation's
+ name is returned. If none or multiple relations with the provided interface
+ are found, this method will raise either a NoRelationWithInterfaceFoundError or
+ MultipleRelationsWithInterfaceFoundError exception, respectively.
+ containers_syslog_port: a dict which maps (and enable) containers and syslog port.
+ alert_rules_path: an optional path for the location of alert rules
+ files. Defaults to "./src/loki_alert_rules",
+ resolved from the directory hosting the charm entry file.
+ The alert rules are automatically updated on charm upgrade.
+ recursive: Whether to scan for rule files recursively.
+ promtail_resource_name: An optional promtail resource name from metadata
+ if it has been modified and attached
+ insecure_skip_verify: skip SSL verification.
+
+ Raises:
+ RelationNotFoundError: If there is no relation in the charm's metadata.yaml
+ with the same name as provided via `relation_name` argument.
+ RelationInterfaceMismatchError: The relation with the same name as provided
+ via `relation_name` argument does not have the `loki_push_api` relation
+ interface.
+ RelationRoleMismatchError: If the relation with the same name as provided
+ via `relation_name` argument does not have the `RelationRole.provides`
+ role.
+ """
+
+ on = LogProxyEvents() # pyright: ignore
+
+    def __init__(
+        self,
+        charm,
+        *,
+        logs_scheme=None,
+        relation_name: str = DEFAULT_LOG_PROXY_RELATION_NAME,
+        alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
+        recursive: bool = False,
+        promtail_resource_name: Optional[str] = None,
+        insecure_skip_verify: bool = False,
+    ):
+        """Set up the log proxy consumer and register its event handlers.
+
+        Args:
+            charm: the `CharmBase` object that manages this `LogProxyConsumer`.
+            logs_scheme: a dict mapping container names to their log settings;
+                `None` (the default) is treated as an empty dict.
+            relation_name: name of the relation used by this consumer.
+            alert_rules_path: path of the alert rules directory.
+            recursive: whether to scan for rule files recursively.
+            promtail_resource_name: name of the promtail resource; defaults to
+                "promtail-bin" when not given.
+            insecure_skip_verify: skip SSL verification.
+        """
+        super().__init__(charm, relation_name, alert_rules_path, recursive)
+        self._charm = charm
+        self._logs_scheme = logs_scheme or {}
+        self._relation_name = relation_name
+        self.topology = JujuTopology.from_charm(charm)
+        self._promtail_resource_name = promtail_resource_name or "promtail-bin"
+        self.insecure_skip_verify = insecure_skip_verify
+        # Pass the normalised scheme so that the default `logs_scheme=None` reaches
+        # the helper as an empty dict rather than as None.
+        self._promtails_ports = self._generate_promtails_ports(self._logs_scheme)
+
+        # architecture used for promtail binary
+        arch = platform.processor()
+        if arch in ["x86_64", "amd64"]:
+            self._arch = "amd64"
+        elif arch in ["aarch64", "arm64", "armv8b", "armv8l"]:
+            self._arch = "arm64"
+        else:
+            # Unknown values are kept as reported by the platform.
+            self._arch = arch
+
+        events = self._charm.on[relation_name]
+        self.framework.observe(events.relation_created, self._on_relation_created)
+        self.framework.observe(events.relation_changed, self._on_relation_changed)
+        self.framework.observe(events.relation_departed, self._on_relation_departed)
+        self._observe_pebble_ready()
+
+    def _observe_pebble_ready(self):
+        """Observe the `pebble_ready` event of every container in `self._containers`.
+
+        Dashes in container names are replaced with underscores to obtain the
+        attribute name of the event on `self._charm.on`.
+        """
+        for container_name in self._containers:
+            event_name = "{}_pebble_ready".format(container_name.replace("-", "_"))
+            pebble_ready = getattr(self._charm.on, event_name)
+            self.framework.observe(pebble_ready, self._on_pebble_ready)
+
+    def _on_pebble_ready(self, event: WorkloadEvent):
+        """Set up promtail in the ready workload, provided a relation exists.
+
+        Args:
+            event: the `pebble_ready` event; its `workload` is the target container.
+        """
+        active_relations = self.model.relations[self._relation_name]
+        if not active_relations:
+            return
+        self._setup_promtail(event.workload)
+
+    def _on_relation_created(self, _: RelationCreatedEvent) -> None:
+        """Set up promtail in every container that can currently be reached.
+
+        Args:
+            _: the `relation_created` event; unused.
+        """
+        for workload in self._containers.values():
+            if not workload.can_connect():
+                continue
+            self._setup_promtail(workload)
+
+ def _on_relation_changed(self, event: RelationEvent) -> None:
+ """Event handler for `relation_changed`.
+
+ Publishes the alert rules, forwards (on the leader) the alert rule status
+ published by the remote application, and then brings the promtail
+ configuration of every reachable container up to date.
+
+ Args:
+ event: The event object `RelationChangedEvent`.
+ """
+ self._handle_alert_rules(event.relation)
+
+ # Only the leader inspects the "event" entry of the remote application data bag.
+ if self._charm.unit.is_leader():
+ ev = json.loads(event.relation.data[event.app].get("event", "{}"))
+
+ if ev:
+ valid = bool(ev.get("valid", True))
+ errors = ev.get("errors", "")
+
+ # NOTE(review): `self.on` is a `LogProxyEvents`; confirm that it declares
+ # `alert_rule_status_changed`, otherwise these emits raise AttributeError.
+ if valid and not errors:
+ self.on.alert_rule_status_changed.emit(valid=valid)
+ else:
+ self.on.alert_rule_status_changed.emit(valid=valid, errors=errors)
+
+ for container in self._containers.values():
+ # Containers that cannot be reached yet are skipped.
+ if not container.can_connect():
+ continue
+ if self.model.relations[self._relation_name]:
+ # NOTE(review): the literal "promtail" is used here while the restart below
+ # uses WORKLOAD_SERVICE_NAME - presumably the same value; verify.
+ if "promtail" not in container.get_plan().services:
+ self._setup_promtail(container)
+ continue
+
+ # Push the configuration only when it differs from the current one.
+ new_config = self._promtail_config(container.name)
+ if new_config != self._current_config(container):
+ container.push(
+ WORKLOAD_CONFIG_PATH, yaml.safe_dump(new_config), make_dirs=True
+ )
+
+ # Loki may send endpoints late. Don't necessarily start, there may be
+ # no clients
+ if new_config["clients"]:
+ container.restart(WORKLOAD_SERVICE_NAME)
+ self.on.log_proxy_endpoint_joined.emit()
+ else:
+ self.on.promtail_digest_error.emit("No promtail client endpoints available!")
+
+ def _on_relation_departed(self, _: RelationEvent) -> None:
+ """Event handler for `relation_departed`.
+
+ For every reachable container, promtail is stopped when no relation is
+ left; otherwise its configuration is refreshed and the service is
+ restarted or stopped depending on whether any client remains.
+
+ Args:
+ event: The event object `RelationDepartedEvent`.
+ """
+ for container in self._containers.values():
+ # Containers that cannot be reached are skipped.
+ if not container.can_connect():
+ continue
+ # No relation left on this endpoint: promtail has nothing to forward to.
+ if not self._charm.model.relations[self._relation_name]:
+ container.stop(WORKLOAD_SERVICE_NAME)
+ continue
+
+ # Push the configuration only when it differs from the current one.
+ new_config = self._promtail_config(container.name)
+ if new_config != self._current_config(container):
+ container.push(WORKLOAD_CONFIG_PATH, yaml.safe_dump(new_config), make_dirs=True)
+
+ if new_config["clients"]:
+ container.restart(WORKLOAD_SERVICE_NAME)
+ else:
+ container.stop(WORKLOAD_SERVICE_NAME)
+ # NOTE(review): the nesting of this emit (per container vs. once) cannot be
+ # read from the flattened text - confirm against the upstream library.
+ self.on.log_proxy_endpoint_departed.emit()
+
+    def _add_pebble_layer(self, workload_binary_path: str, container: Container) -> None:
+        """Add the Pebble layer that manages the Promtail service in a workload container.
+
+        The service is declared with startup "disabled" and the layer is combined
+        with any existing layer of the same name.
+
+        Args:
+            workload_binary_path: string providing path to promtail binary in workload container.
+            container: container into which the layer is to be added.
+        """
+        promtail_service = {
+            "override": "replace",
+            "summary": WORKLOAD_SERVICE_NAME,
+            "command": f"{workload_binary_path} {self._cli_args}",
+            "startup": "disabled",
+        }
+        layer_spec = {
+            "summary": "promtail layer",
+            "description": "pebble config layer for promtail",
+            "services": {WORKLOAD_SERVICE_NAME: promtail_service},
+        }
+        container.add_layer(container.name, Layer(layer_spec), combine=True)
+
+    def _create_directories(self, container: Container) -> None:
+        """Create the Promtail binary and config directories, including parents."""
+        for directory in (WORKLOAD_BINARY_DIR, WORKLOAD_CONFIG_DIR):
+            container.make_dir(path=directory, make_parents=True)
+
+    def _obtain_promtail(self, promtail_info: dict, container: Container) -> None:
+        """Obtain promtail binary from an attached resource or download it.
+
+        An attached resource takes precedence. Otherwise the binary is either
+        downloaded or, when no download is needed, pushed from `BINARY_DIR`.
+
+        Args:
+            promtail_info: dictionary containing information about promtail binary
+               that must be used. The dictionary must have three keys
+               - "filename": filename of promtail binary
+               - "zipsha": sha256 sum of zip file of promtail binary
+               - "binsha": sha256 sum of unpacked promtail binary
+            container: container into which promtail is to be obtained.
+        """
+        filename = promtail_info["filename"]
+        workload_binary_path = os.path.join(WORKLOAD_BINARY_DIR, filename)
+
+        if self._promtail_attached_as_resource:
+            self._push_promtail_if_attached(container, workload_binary_path)
+            return
+
+        if self._promtail_must_be_downloaded(promtail_info):
+            self._download_and_push_promtail_to_workload(container, promtail_info)
+            return
+
+        local_binary_path = os.path.join(BINARY_DIR, promtail_info["filename"])
+        self._push_binary_to_workload(container, local_binary_path, workload_binary_path)
+
+    def _push_binary_to_workload(
+        self, container: Container, binary_path: str, workload_binary_path: str
+    ) -> None:
+        """Copy the promtail binary from the charm container into a workload container.
+
+        The file is pushed with mode 0o755 and missing parent directories are created.
+
+        Args:
+            container: container into which promtail is to be uploaded.
+            binary_path: path in charm container from which promtail binary is read.
+            workload_binary_path: path in workload container to which promtail binary is pushed.
+        """
+        with open(binary_path, "rb") as binary_file:
+            container.push(
+                workload_binary_path,
+                binary_file,
+                permissions=0o755,
+                make_dirs=True,
+            )
+            logger.debug("The promtail binary file has been pushed to the workload container.")
+
+    @property
+    def _promtail_attached_as_resource(self) -> bool:
+        """Tell whether the Promtail binary is attached to the charm as a resource.
+
+        Returns:
+            True when fetching the resource succeeds. False when the fetch raises
+            ModelError, or a NameError whose message contains "invalid resource name".
+            Any other NameError is re-raised.
+        """
+        try:
+            self._charm.model.resources.fetch(self._promtail_resource_name)
+        except ModelError:
+            return False
+        except NameError as e:
+            if "invalid resource name" not in str(e):
+                raise
+            return False
+        return True
+
+    def _push_promtail_if_attached(self, container: Container, workload_binary_path: str) -> bool:
+        """Fetch the attached Promtail resource and push it into the workload container.
+
+        Args:
+            container: container into which promtail is to be pushed.
+            workload_binary_path: string specifying expected path of promtail
+                in workload container
+
+        Returns:
+            Always True; a failing resource fetch or push propagates as an exception.
+        """
+        resource_path = self._charm.model.resources.fetch(self._promtail_resource_name)
+        # Log only once the fetch has succeeded, so the message cannot precede a failure.
+        logger.info("Promtail binary file has been obtained from an attached resource.")
+        self._push_binary_to_workload(container, resource_path, workload_binary_path)
+        return True
+
+    def _promtail_must_be_downloaded(self, promtail_info: dict) -> bool:
+        """Decide whether the promtail binary has to be downloaded.
+
+        A download is needed when the binary is missing from the charm container or
+        when its sha256 sum differs from the expected "binsha".
+
+        Args:
+            promtail_info: dictionary containing information about promtail binary
+                that must be used. The dictionary must have three keys
+                - "filename": filename of promtail binary
+                - "zipsha": sha256 sum of zip file of promtail binary
+                - "binsha": sha256 sum of unpacked promtail binary
+
+        Returns:
+            a boolean representing whether Promtail binary must be downloaded or not.
+        """
+        binary_path = os.path.join(BINARY_DIR, promtail_info["filename"])
+        present = self._is_promtail_binary_in_charm(binary_path)
+        # The checksum is only computed when the file exists (short-circuit `and`).
+        if present and self._sha256sums_matches(binary_path, promtail_info["binsha"]):
+            logger.debug("Promtail binary file is already in the the charm container.")
+            return False
+        return True
+
+    def _sha256sums_matches(self, file_path: str, sha256sum: str) -> bool:
+        """Compare a file's sha256 sum with an expected value.
+
+        Args:
+            file_path: A string representing the file's path.
+            sha256sum: The sha256sum against which we want to verify.
+
+        Returns:
+            True when the sums are equal; False when they differ or when the file
+            could not be opened (APIError or FileNotFoundError).
+        """
+        try:
+            with open(file_path, "rb") as f:
+                digest = sha256(f.read()).hexdigest()
+        except (APIError, FileNotFoundError):
+            logger.error(f"File: '{file_path}' could not be opened")
+            return False
+
+        if digest == sha256sum:
+            return True
+
+        logger.debug(f"File sha256sum mismatch, expected:'{sha256sum}' but got '{digest}'")
+        return False
+
+    def _is_promtail_binary_in_charm(self, binary_path: str) -> bool:
+        """Check if Promtail binary is already stored in charm container.
+
+        Args:
+            binary_path: string path of promtail binary to check
+
+        Returns:
+            True when ``binary_path`` exists and is a regular file, False otherwise.
+        """
+        # `is_file()` already yields a bool; no conditional expression is needed.
+        return Path(binary_path).is_file()
+
+    def _download_and_push_promtail_to_workload(
+        self, container: Container, promtail_info: dict
+    ) -> None:
+        """Download the gzip-compressed Promtail binary and push it to the workload.
+
+        The compressed download is stored next to the unpacked binary in the charm
+        container, then the unpacked binary is pushed into the workload container.
+
+        Args:
+            container: container into which promtail is to be uploaded.
+            promtail_info: dictionary containing information about promtail binary
+                that must be used. Besides the documented keys
+                - "filename": filename of promtail binary
+                - "zipsha": sha256 sum of zip file of promtail binary
+                - "binsha": sha256 sum of unpacked promtail binary
+                this method also reads "url", the location the archive is fetched from.
+        """
+        # NOTE(review): neither "zipsha" nor "binsha" is checked against the downloaded
+        # bytes in this method — confirm whether verification is expected to happen here.
+
+        # Check for Juju proxy variables and fall back to standard ones if not set
+        # If no Juju proxy variable was set, we set proxies to None to let the ProxyHandler get
+        # the proxy env variables from the environment
+        proxies = {
+            # The ProxyHandler uses only the protocol names as keys
+            # https://docs.python.org/3/library/urllib.request.html#urllib.request.ProxyHandler
+            "https": os.environ.get("JUJU_CHARM_HTTPS_PROXY", ""),
+            "http": os.environ.get("JUJU_CHARM_HTTP_PROXY", ""),
+            # The ProxyHandler uses `no` for the no_proxy key
+            # https://github.com/python/cpython/blob/3.12/Lib/urllib/request.py#L2553
+            "no": os.environ.get("JUJU_CHARM_NO_PROXY", ""),
+        }
+        proxies = {k: v for k, v in proxies.items() if v != ""} or None
+
+        proxy_handler = request.ProxyHandler(proxies)
+        opener = request.build_opener(proxy_handler)
+
+        # Keep the HTTP response open only for as long as it takes to read it.
+        with opener.open(promtail_info["url"]) as r:
+            file_bytes = r.read()
+
+        file_path = os.path.join(BINARY_DIR, promtail_info["filename"] + ".gz")
+        with open(file_path, "wb") as f:
+            f.write(file_bytes)
+            logger.info(
+                "Promtail binary zip file has been downloaded and stored in: %s",
+                file_path,
+            )
+
+        binary_path = os.path.join(BINARY_DIR, promtail_info["filename"])
+        # Use GzipFile as a context manager so the decompressor is closed deterministically.
+        with GzipFile(fileobj=BytesIO(file_bytes)) as decompressed_file:
+            with open(binary_path, "wb") as outfile:
+                outfile.write(decompressed_file.read())
+        logger.debug("Promtail binary file has been downloaded.")
+
+        workload_binary_path = os.path.join(WORKLOAD_BINARY_DIR, promtail_info["filename"])
+        self._push_binary_to_workload(container, binary_path, workload_binary_path)
+
+    @property
+    def _cli_args(self) -> str:
+        """Command-line arguments handed to promtail.
+
+        Returns:
+            A single ``-config.file=...`` argument pointing at WORKLOAD_CONFIG_PATH.
+        """
+        return f"-config.file={WORKLOAD_CONFIG_PATH}"
+
+    def _current_config(self, container) -> dict:
+        """Return the Promtail configuration currently stored in the container.
+
+        Args:
+            container: container from which WORKLOAD_CONFIG_PATH is pulled.
+
+        Returns:
+            A dict containing Promtail configuration. An empty dict is returned when
+            the container cannot be reached, when the file cannot be pulled, or when
+            the file is empty.
+        """
+        if not container.can_connect():
+            logger.debug("Could not connect to promtail container!")
+            return {}
+        try:
+            raw_current = container.pull(WORKLOAD_CONFIG_PATH).read()
+        except (ProtocolError, PathError) as e:
+            logger.warning(
+                "Could not check the current promtail configuration due to "
+                "a failure in retrieving the file: %s",
+                e,
+            )
+            return {}
+        # safe_load yields None for an empty document; callers rely on getting a dict
+        # back (e.g. they call `.get("clients")` on the result).
+        return yaml.safe_load(raw_current) or {}
+
+    def _promtail_config(self, container_name: str) -> dict:
+        """Assemble the full Promtail configuration for one container.
+
+        The result holds the clients list followed by the server, positions and
+        scrape_configs sections. When ``insecure_skip_verify`` is set, every client
+        entry gets a ``tls_config`` that disables certificate verification.
+
+        Reference: https://grafana.com/docs/loki/latest/send-data/promtail/configuration
+        """
+        clients = self._clients_list()
+        if self.insecure_skip_verify:
+            for client in clients:
+                client["tls_config"] = {"insecure_skip_verify": True}
+
+        return {
+            "clients": clients,
+            **self._server_config(container_name),
+            **self._positions,
+            **self._scrape_configs(container_name),
+        }
+
+    def _clients_list(self) -> list:
+        """Return the clients to put in the promtail config.
+
+        Returns:
+            The value of ``self.loki_endpoints``, unchanged.
+        """
+        endpoints = self.loki_endpoints
+        return endpoints
+
+    def _server_config(self, container_name: str) -> dict:
+        """Build the ``server`` section of the Promtail config file.
+
+        Args:
+            container_name: key used to look up this container's ports in
+                ``self._promtails_ports``.
+
+        Returns:
+            A dict representing the `server` section.
+        """
+        ports = self._promtails_ports[container_name]
+        port_keys = ("http_listen_port", "grpc_listen_port")
+        return {"server": {key: ports[key] for key in port_keys}}
+
+    @property
+    def _positions(self) -> dict:
+        """Build the ``positions`` section of the Promtail config file.
+
+        Returns:
+            A dict representing the `positions` section, pointing at WORKLOAD_POSITIONS_PATH.
+        """
+        positions_section = {"filename": WORKLOAD_POSITIONS_PATH}
+        return {"positions": positions_section}
+
+    def _scrape_configs(self, container_name: str) -> dict:
+        """Build the ``scrape_configs`` section of the Promtail config for one container.
+
+        A "system" job holding the static configs for the container's log files is
+        always present; a "syslog" job is appended only when the container has a
+        ``syslog-port`` configured in the logs scheme.
+
+        Returns:
+            A dict representing the `scrape_configs` section.
+        """
+        job_name = f"juju_{self.topology.identifier}"
+
+        # The new JujuTopology doesn't include unit, but LogProxyConsumer should have it
+        topology_labels = self.topology.as_dict(remapped_keys={"charm_name": "charm"})
+        common_labels = {f"juju_{key}": value for key, value in topology_labels.items()}
+        common_labels["container"] = container_name
+
+        # Files config: "__path__" starts empty and is filled in per log file by
+        # _generate_static_configs.
+        file_template = {
+            "targets": ["localhost"],
+            "labels": {**common_labels, "job": job_name, "__path__": ""},
+        }
+        scrape_configs = [
+            {
+                "job_name": "system",
+                "static_configs": self._generate_static_configs(file_template, container_name),
+            }
+        ]
+
+        # Syslog config
+        syslog_port = self._logs_scheme.get(container_name, {}).get("syslog-port")
+        if not syslog_port:
+            return {"scrape_configs": scrape_configs}
+
+        syslog_fields = ("severity", "facility", "hostname", "app_name", "proc_id", "msg_id")
+        relabel_configs = [
+            {"source_labels": [f"__syslog_message_{field}"], "target_label": field}
+            for field in syslog_fields
+        ]
+        relabel_configs.append({"action": "labelmap", "regex": "__syslog_message_sd_(.+)"})
+
+        scrape_configs.append(
+            {
+                "job_name": "syslog",
+                "syslog": {
+                    "listen_address": f"127.0.0.1:{syslog_port}",
+                    "label_structured_data": True,
+                    "labels": {**common_labels, "job": f"{job_name}_syslog"},
+                },
+                "relabel_configs": relabel_configs,
+            }
+        )
+        return {"scrape_configs": scrape_configs}
+
+    def _generate_static_configs(self, config: dict, container_name: str) -> list:
+        """Build the ``static_configs`` entries for a container's log files.
+
+        Args:
+            config: template entry; it is deep-copied for every log file and never mutated.
+            container_name: container whose "log-files" list is read from the logs scheme.
+
+        Returns:
+            - a list of dictionaries representing static_configs section
+        """
+
+        def entry_for(log_file):
+            # Each entry is an independent copy whose "__path__" label is the log file.
+            entry = deepcopy(config)
+            entry["labels"]["__path__"] = log_file
+            return entry
+
+        log_files = self._logs_scheme.get(container_name, {}).get("log-files", [])
+        return [entry_for(log_file) for log_file in log_files]
+
+    def _setup_promtail(self, container: Container) -> None:
+        """Install, configure and (re)start Promtail in a workload container.
+
+        The promtail binary information is read from the first relation's application
+        data; nothing is done when that data is empty. A ``promtail_digest_error`` event
+        is emitted when the restart fails or when the pushed config has no clients,
+        otherwise ``log_proxy_endpoint_joined`` is emitted.
+        """
+        # Use the first
+        relations = self._charm.model.relations[self._relation_name]
+        relation = relations[0]
+        if len(relations) > 1:
+            logger.debug(
+                "Multiple log_proxy relations. Getting Promtail from application {}".format(
+                    relation.app.name
+                )
+            )
+
+        raw_binaries = relation.data[relation.app].get("promtail_binary_zip_url", "{}")
+        promtail_binaries = json.loads(raw_binaries)
+        if not promtail_binaries:
+            return
+
+        self._create_directories(container)
+        self._ensure_promtail_binary(promtail_binaries, container)
+
+        rendered_config = yaml.safe_dump(self._promtail_config(container.name))
+        container.push(WORKLOAD_CONFIG_PATH, rendered_config, make_dirs=True)
+
+        workload_binary_path = os.path.join(
+            WORKLOAD_BINARY_DIR, promtail_binaries[self._arch]["filename"]
+        )
+        self._add_pebble_layer(workload_binary_path, container)
+
+        # Promtail is only started when the config just pushed lists at least one client.
+        if not self._current_config(container).get("clients"):
+            self.on.promtail_digest_error.emit("No promtail client endpoints available!")
+            return
+
+        try:
+            container.restart(WORKLOAD_SERVICE_NAME)
+        except ChangeError as e:
+            self.on.promtail_digest_error.emit(str(e))
+            return
+        self.on.log_proxy_endpoint_joined.emit()
+
+    def _ensure_promtail_binary(self, promtail_binaries: dict, container: Container):
+        """Make sure the promtail binary for this architecture is in the container.
+
+        Nothing is done when the binary is already installed. A URLError raised while
+        obtaining it is logged and reported through a ``promtail_digest_error`` event.
+
+        Args:
+            promtail_binaries: mapping from architecture to promtail binary information.
+            container: container that must end up holding the binary.
+        """
+        promtail_info = promtail_binaries[self._arch]
+        if self._is_promtail_installed(promtail_info, container):
+            return
+
+        try:
+            self._obtain_promtail(promtail_info, container)
+        except URLError as e:
+            msg = f"Promtail binary couldn't be downloaded - {e!s}"
+            logger.warning(msg)
+            self.on.promtail_digest_error.emit(msg)
+
+    def _is_promtail_installed(self, promtail_info: dict, container: Container) -> bool:
+        """Determine if promtail has already been installed to the container.
+
+        Args:
+            promtail_info: dictionary containing information about promtail binary
+                that must be used. The dictionary must at least contain a key
+                "filename" giving the name of promtail binary
+            container: container in which to check whether promtail is installed.
+
+        Returns:
+            True when listing the binary path succeeds, False when the listing raises
+            APIError or FileNotFoundError.
+        """
+        workload_binary_path = f"{WORKLOAD_BINARY_DIR}/{promtail_info['filename']}"
+        try:
+            container.list_files(workload_binary_path)
+        except (APIError, FileNotFoundError):
+            return False
+        else:
+            return True
+
+    def _generate_promtails_ports(self, logs_scheme) -> dict:
+        """Assign a distinct pair of promtail listen ports to every container.
+
+        The i-th container of ``logs_scheme`` gets the HTTP and gRPC start ports
+        each shifted by ``2 * i``.
+
+        Returns:
+            A dict mapping container name to its "http_listen_port" and "grpc_listen_port".
+        """
+        ports = {}
+        for index, container in enumerate(logs_scheme.keys()):
+            offset = 2 * index
+            ports[container] = {
+                "http_listen_port": HTTP_LISTEN_PORT_START + offset,
+                "grpc_listen_port": GRPC_LISTEN_PORT_START + offset,
+            }
+        return ports
+
+    def syslog_port(self, container_name: str) -> str:
+        """Gets the port on which promtail is listening for syslog in this container.
+
+        Returns:
+            A str representing the port; the string "None" when the container has no
+            "syslog-port" entry in the logs scheme.
+        """
+        container_scheme = self._logs_scheme.get(container_name, {})
+        port = container_scheme.get("syslog-port")
+        return str(port)
+
+    def rsyslog_config(self, container_name: str) -> str:
+        """Generates a config line for use with rsyslog.
+
+        The line forwards over TCP to 127.0.0.1 on the container's configured
+        "syslog-port" value.
+
+        Returns:
+            The rsyslog config line as a string
+        """
+        port = self._logs_scheme.get(container_name, {}).get("syslog-port")
+        return (
+            f'action(type="omfwd" protocol="tcp" target="127.0.0.1" port="{port}" '
+            'Template="RSYSLOG_SyslogProtocol23Format" TCP_Framing="octet-counted")'
+        )
+
+    @property
+    def _containers(self) -> Dict[str, Container]:
+        """Map every container named in the logs scheme to its Container object."""
+        containers = {}
+        for name in self._logs_scheme.keys():
+            containers[name] = self._charm.unit.get_container(name)
+        return containers
+
+
+class _PebbleLogClient:
+    """Static helpers that build and apply Pebble log-forwarding (``log-targets``) layers."""
+
+    @staticmethod
+    def check_juju_version() -> bool:
+        """Report whether the running Juju version supports Pebble log forwarding.
+
+        Returns:
+            True for Juju 3.4 or newer; False, after logging a warning, for anything older.
+        """
+        juju_version = JujuVersion.from_environ()
+        # Compare (major, minor) instead of `> JujuVersion("3.3")`: "3.3" parses with
+        # patch 0, so every 3.3.x patch release would otherwise count as supported even
+        # though the warning below states that 3.4 is the minimum.
+        if (juju_version.major, juju_version.minor) < (3, 4):
+            msg = f"Juju version {juju_version} does not support Pebble log forwarding. Juju >= 3.4 is needed."
+            logger.warning(msg)
+            return False
+        return True
+
+    @staticmethod
+    def _build_log_target(
+        unit_name: str, loki_endpoint: str, topology: JujuTopology, enable: bool
+    ) -> Dict:
+        """Build a log target for the log forwarding Pebble layer.
+
+        Log target's syntax for enabling/disabling forwarding is explained here:
+        https://github.com/canonical/pebble?tab=readme-ov-file#log-forwarding
+
+        Args:
+            unit_name: name of the remote unit; used as the log target's key.
+            loki_endpoint: value stored as the target's "location".
+            topology: source of the Juju labels attached to an enabled target.
+            enable: True to forward all services, False to forward none ("-all").
+
+        Returns:
+            A single-entry dict mapping ``unit_name`` to its target definition.
+        """
+        log_target = {
+            "override": "replace",
+            "services": ["all"] if enable else ["-all"],
+            "type": "loki",
+            "location": loki_endpoint,
+        }
+        if enable:
+            # Labels are only attached to targets that actually forward logs.
+            log_target["labels"] = {
+                "product": "Juju",
+                "charm": topology._charm_name,
+                "juju_model": topology._model,
+                "juju_model_uuid": topology._model_uuid,
+                "juju_application": topology._application,
+                "juju_unit": topology._unit,
+            }
+
+        return {unit_name: log_target}
+
+    @staticmethod
+    def _build_log_targets(
+        loki_endpoints: Optional[Dict[str, str]], topology: JujuTopology, enable: bool
+    ):
+        """Build all the targets for the log forwarding Pebble layer.
+
+        Returns:
+            A dict with one target per entry of ``loki_endpoints``; empty when
+            ``loki_endpoints`` is None or empty.
+        """
+        targets = {}
+        for unit_name, endpoint in (loki_endpoints or {}).items():
+            targets.update(
+                _PebbleLogClient._build_log_target(
+                    unit_name=unit_name,
+                    loki_endpoint=endpoint,
+                    topology=topology,
+                    enable=enable,
+                )
+            )
+        return targets
+
+    @staticmethod
+    def disable_inactive_endpoints(
+        container: Container, active_endpoints: Dict[str, str], topology: JujuTopology
+    ):
+        """Disable forwarding for inactive endpoints by checking against the Pebble plan.
+
+        Every log target in the container's plan that is still enabled and whose name
+        is not in ``active_endpoints`` is overridden with a disabled ("-all") target.
+        """
+        planned_targets = container.get_plan().to_dict().get("log-targets", None)
+        if not planned_targets:
+            return
+
+        for unit_name, target in planned_targets.items():
+            # If the layer is a disabled log forwarding endpoint, skip it
+            already_disabled = "-all" in target["services"]  # pyright: ignore
+            if already_disabled or unit_name in active_endpoints:
+                continue
+
+            layer = Layer(
+                {  # pyright: ignore
+                    "log-targets": _PebbleLogClient._build_log_targets(
+                        loki_endpoints={unit_name: "(removed)"},
+                        topology=topology,
+                        enable=False,
+                    )
+                }
+            )
+            container.add_layer(f"{container.name}-log-forwarding", layer=layer, combine=True)
+
+    @staticmethod
+    def enable_endpoints(
+        container: Container, active_endpoints: Dict[str, str], topology: JujuTopology
+    ):
+        """Enable forwarding for the specified Loki endpoints."""
+        enabled_targets = _PebbleLogClient._build_log_targets(
+            loki_endpoints=active_endpoints,
+            topology=topology,
+            enable=True,
+        )
+        layer = Layer({"log-targets": enabled_targets})  # pyright: ignore
+        container.add_layer(f"{container.name}-log-forwarding", layer, combine=True)
+
+
+class LogForwarder(ConsumerBase):
+    """Forward the standard outputs of all workloads operated by a charm to one or multiple Loki endpoints.
+
+    This class implements Pebble log forwarding. Juju >= 3.4 is needed.
+    """
+
+    def __init__(
+        self,
+        charm: CharmBase,
+        *,
+        relation_name: str = DEFAULT_RELATION_NAME,
+        alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
+        recursive: bool = True,
+        skip_alert_topology_labeling: bool = False,
+        refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None,
+        forward_alert_rules: bool = True,
+    ):
+        """Set up the forwarder and observe the events that trigger a refresh.
+
+        Args:
+            charm: the charm that owns the workload containers.
+            relation_name: name of the relation from which Loki endpoints are read.
+            alert_rules_path: passed through to ConsumerBase.
+            recursive: passed through to ConsumerBase.
+            skip_alert_topology_labeling: passed through to ConsumerBase.
+            refresh_event: one event, or a list of events, that also trigger an update
+                of the log forwarding configuration.
+            forward_alert_rules: passed through to ConsumerBase.
+        """
+        # Only logs a warning on unsupported Juju versions; construction continues.
+        _PebbleLogClient.check_juju_version()
+        super().__init__(
+            charm,
+            relation_name,
+            alert_rules_path,
+            recursive,
+            skip_alert_topology_labeling,
+            forward_alert_rules=forward_alert_rules,
+        )
+        self._charm = charm
+        self._relation_name = relation_name
+
+        on = self._charm.on[self._relation_name]
+        self.framework.observe(on.relation_joined, self._update_logging)
+        self.framework.observe(on.relation_changed, self._update_logging)
+        self.framework.observe(on.relation_departed, self._update_logging)
+        self.framework.observe(on.relation_broken, self._update_logging)
+
+        if refresh_event:
+            if not isinstance(refresh_event, list):
+                refresh_event = [refresh_event]
+            for ev in refresh_event:
+                self.framework.observe(ev, self._update_logging)
+
+        for container_name in self._charm.meta.containers.keys():
+            snake_case_container_name = container_name.replace("-", "_")
+            self.framework.observe(
+                getattr(self._charm.on, f"{snake_case_container_name}_pebble_ready"),
+                self._on_pebble_ready,
+            )
+
+    def _on_pebble_ready(self, event: PebbleReadyEvent):
+        """Configure log forwarding for the container that just became ready."""
+        if not (loki_endpoints := self._retrieve_endpoints_from_relation()):
+            logger.warning("No Loki endpoints available")
+            return
+
+        self._update_endpoints(event.workload, loki_endpoints)
+
+    def _update_logging(self, event: RelationEvent):
+        """Update the log forwarding to match the active Loki endpoints.
+
+        When no endpoint is left (for example after the last provider unit departed),
+        forwarding to every previously enabled target is disabled so the workload
+        does not keep pushing to stale endpoints.
+        """
+        loki_endpoints = self._retrieve_endpoints_from_relation()
+        if not loki_endpoints:
+            logger.warning("No Loki endpoints available")
+
+        for container in self._charm.unit.containers.values():
+            if not container.can_connect():
+                # `_update_endpoints` will be called on pebble-ready anyway.
+                continue
+            if loki_endpoints:
+                self._update_endpoints(container, loki_endpoints)
+            else:
+                # Nothing to enable: only switch off targets still active in the plan.
+                _PebbleLogClient.disable_inactive_endpoints(
+                    container=container,
+                    active_endpoints={},
+                    topology=self.topology,
+                )
+
+        if not loki_endpoints:
+            # As before, alert rules are only handled while endpoints are available.
+            return
+
+        self._handle_alert_rules(event.relation)
+
+    def _retrieve_endpoints_from_relation(self) -> dict:
+        """Collect the Loki endpoints of all relations into one dict keyed by unit name."""
+        loki_endpoints = {}
+
+        # Get the endpoints from relation data
+        for relation in self._charm.model.relations[self._relation_name]:
+            loki_endpoints.update(self._fetch_endpoints(relation))
+
+        return loki_endpoints
+
+    def _update_endpoints(self, container: Container, loki_endpoints: dict):
+        """Disable the targets that are no longer active, then enable the active ones."""
+        _PebbleLogClient.disable_inactive_endpoints(
+            container=container,
+            active_endpoints=loki_endpoints,
+            topology=self.topology,
+        )
+        _PebbleLogClient.enable_endpoints(
+            container=container, active_endpoints=loki_endpoints, topology=self.topology
+        )
+
+    def is_ready(self, relation: Optional[Relation] = None):
+        """Check if the relation is active and healthy.
+
+        Args:
+            relation: relation to check; when omitted, every relation with this
+                object's relation name must be ready (and at least one must exist).
+
+        Returns:
+            True when at least one endpoint URL can be extracted from the relation,
+            False otherwise (including missing or malformed endpoint data).
+        """
+        if not relation:
+            relations = self._charm.model.relations[self._relation_name]
+            if not relations:
+                return False
+            return all(self.is_ready(relation) for relation in relations)
+
+        try:
+            return bool(self._extract_urls(relation))
+        except (KeyError, json.JSONDecodeError):
+            return False
+
+    def _extract_urls(self, relation: Relation) -> Dict[str, str]:
+        """Default getter function to extract Loki endpoints from a relation.
+
+        Returns:
+            A dictionary of remote units and the respective Loki endpoint.
+            {
+                "loki/0": "http://loki:3100/loki/api/v1/push",
+                "another-loki/0": "http://another-loki:3100/loki/api/v1/push",
+            }
+
+        Raises:
+            KeyError: if a unit has no "endpoint" entry, or the entry has no "url".
+            json.JSONDecodeError: if a unit's "endpoint" entry is not valid JSON.
+        """
+        endpoints: Dict = {}
+
+        for unit in relation.units:
+            endpoint = relation.data[unit]["endpoint"]
+            deserialized_endpoint = json.loads(endpoint)
+            url = deserialized_endpoint["url"]
+            endpoints[unit.name] = url
+
+        return endpoints
+
+    def _fetch_endpoints(self, relation: Relation) -> Dict[str, str]:
+        """Fetch Loki Push API endpoints from relation data using the endpoints getter."""
+        endpoints: Dict = {}
+
+        if not self.is_ready(relation):
+            logger.warning(f"The relation '{relation.name}' is not ready yet.")
+            return endpoints
+
+        # if the code gets here, the function won't raise anymore because it's
+        # also called in is_ready()
+        endpoints = self._extract_urls(relation)
+
+        return endpoints
+
+
+def charm_logging_config(
+    endpoint_requirer: LokiPushApiConsumer, cert_path: Optional[Union[Path, str]]
+) -> Tuple[Optional[List[str]], Optional[str]]:
+    """Work out the charm_logging configuration from the available Loki endpoints.
+
+    Outcomes:
+      - no endpoint: charm logging is disabled, ``(None, None)``.
+      - all endpoints https, cert_path is None: LokiPushApiError.
+      - all endpoints https, cert_path not on disk yet: disabled, ``(None, None)``.
+      - all endpoints https, cert_path on disk: ``(endpoints, str(cert_path))``.
+      - all endpoints http: ``(endpoints, None)``.
+      - a mix of http and https: LokiPushApiError.
+
+    Args:
+        endpoint_requirer: an instance of LokiPushApiConsumer.
+        cert_path: a path where a cert is stored.
+
+    Returns:
+        A tuple with (optionally) the values of the endpoints and the certificate path.
+
+    Raises:
+        LokiPushApiError: if some endpoint are http and others https, or if all
+            endpoints are https and no cert_path was given.
+    """
+    urls = [ep["url"] for ep in endpoint_requirer.loki_endpoints]
+    if not urls:
+        return None, None
+
+    secure_flags = [url.startswith("https://") for url in urls]
+
+    if all(secure_flags):  # all endpoints are https
+        if cert_path is None:
+            raise LokiPushApiError("Cannot send logs to https endpoints without a certificate.")
+        if Path(cert_path).exists():
+            return urls, str(cert_path)
+        # if endpoints is https BUT we don't have a server_cert yet:
+        # disable charm logging until we do to prevent tls errors
+        return None, None
+
+    if not any(secure_flags):  # all endpoints are http
+        return urls, None
+
+    # if there's a disagreement, that's very weird:
+    raise LokiPushApiError("Some endpoints are http, some others are https. That's not good.")
diff --git a/src/charm.py b/src/charm.py
index 0eb869d..8a09d85 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -15,11 +15,13 @@
from charms.certificate_transfer_interface.v1.certificate_transfer import (
CertificateTransferRequires,
)
+from charms.data_platform_libs.v0.s3 import CredentialsChangedEvent, S3Requirer
+from charms.loki_k8s.v1.loki_push_api import LokiPushApiConsumer
from ops.charm import CharmBase, CollectStatusEvent
from ops.framework import StoredState
from ops.charm import InstallEvent, RelationJoinedEvent, RelationDepartedEvent
from ops.main import main
-from ops.model import ActiveStatus, BlockedStatus, Relation
+from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, Relation
from pathlib import Path
from typing import List
@@ -46,11 +48,14 @@ def __init__(self, *args):
self._certificate_transfer = CertificateTransferRequires(
self, relationship_name='charm-tracing-ca-cert'
)
+ self._s3 = S3Requirer(self, "s3-backend")
+ self._loki_consumer = LokiPushApiConsumer(self, "loki-push-api")
self._stored.set_default(
last_bind_addresses=[],
tracing_endpoints={},
ca_cert=None,
+ s3_credentials=dict(),
)
# TODO (manadart 2024-03-05): Get these at need.
@@ -65,31 +70,68 @@ def __init__(self, *args):
def _observe(self):
"""Set up all framework event observers."""
self.framework.observe(self.on.install, self._on_install)
+ # NOTE(review): no hunk of this patch defines _on_start — confirm the handler exists.
+ self.framework.observe(self.on.start, self._on_start)
self.framework.observe(self.on.collect_unit_status, self._on_collect_status)
self.framework.observe(self.on.config_changed, self._on_config_changed)
+
+ # Dashboard and website relation events are observed to set relation
+ # data for the dashboard and website charms, so that they can connect to
+ # the controller API and display the correct information about the
+ # controller.
self.framework.observe(
self.on.dashboard_relation_joined, self._on_dashboard_relation_joined)
self.framework.observe(
self.on.website_relation_joined, self._on_website_relation_joined)
+
+ # Metrics endpoint relation events are observed to manage users for the
+ # metrics endpoint, and to maintain the correct scrape configuration for
+ # the controller API in the Prometheus scrape config provided to related
+ # Prometheus charms.
self.framework.observe(
self.on.metrics_endpoint_relation_created, self._on_metrics_endpoint_relation_created)
self.framework.observe(
self.on.metrics_endpoint_relation_broken, self._on_metrics_endpoint_relation_broken)
+
+ # DB cluster relation events are observed to maintain the current set of
+ # bind addresses for the controller cluster in the charm's stored state,
+ # and to apply it to the charm configuration when it changes.
self.framework.observe(
self.on.dbcluster_relation_changed, self._on_dbcluster_relation_changed)
self.framework.observe(
self.on.dbcluster_relation_departed, self._on_dbcluster_relation_departed)
+
+ # Tracing relation events are observed to maintain the current tracing
+ # endpoint information in the charm's stored state, and to apply it to
+ # the charm configuration when it changes.
self.framework.observe(
self.tracing_requirer.on.endpoint_changed, self._on_tracing_relation_changed)
self.framework.observe(
self.tracing_requirer.on.endpoint_removed, self._on_tracing_relation_removed)
self.framework.observe(
self._certificate_transfer.on.certificate_set_updated,
- self._on_receive_ca_cert_updated,
- )
+ self._on_receive_ca_cert_updated)
self.framework.observe(
self._certificate_transfer.on.certificates_removed, self._on_receive_ca_cert_removed)
+ # S3 credential events are observed to maintain the current S3
+ # credentials in the charm's stored state, and to apply them via the
+ # control socket when they change.
+ self.framework.observe(
+ self._s3.on.credentials_changed, self._on_s3_credentials_changed)
+ self.framework.observe(
+ self._s3.on.credentials_gone,
+ self._on_s3_credentials_gone)
+
+ # Loki Push API endpoint events are observed so that the leader unit can
+ # set the Loki endpoint through the control socket when one becomes
+ # available, and remove it again when the endpoint departs.
+ self.framework.observe(
+ self._loki_consumer.on.loki_push_api_endpoint_joined,
+ self._on_loki_push_api_endpoint_joined)
+ self.framework.observe(
+ self._loki_consumer.on.loki_push_api_endpoint_departed,
+ self._on_loki_push_api_endpoint_departed)
+
def _on_install(self, event: InstallEvent):
"""Ensure that the controller configuration file exists."""
file_path = self._controller_config_path()
@@ -341,19 +383,79 @@ def _request_config_reload(self):
def _update_charm_tracing_config(self):
"""Update charm configuration with current tracing endpoint and CA cert information."""
- self._control_socket.set_charm_tracing_config(
- grpc_endpoint=(
- self._stored.tracing_endpoints["otlp_grpc"]
- if "otlp_grpc" in self._stored.tracing_endpoints
- else None
- ),
- http_endpoint=(
- self._stored.tracing_endpoints["otlp_http"]
- if "otlp_http" in self._stored.tracing_endpoints
- else None
- ),
- ca_cert=self._stored.ca_cert,
- )
+ # Only the leader unit pushes the tracing config; other units return immediately.
+ if not self.unit.is_leader():
+ return
+
+ try:
+ # An endpoint missing from the stored tracing endpoints is sent as None.
+ self._control_socket.set_charm_tracing_config(
+ grpc_endpoint=(
+ self._stored.tracing_endpoints["otlp_grpc"]
+ if "otlp_grpc" in self._stored.tracing_endpoints
+ else None
+ ),
+ http_endpoint=(
+ self._stored.tracing_endpoints["otlp_http"]
+ if "otlp_http" in self._stored.tracing_endpoints
+ else None
+ ),
+ ca_cert=self._stored.ca_cert,
+ )
+ # Any failure is logged and surfaced as BlockedStatus instead of propagating.
+ except Exception as exc:
+ logger.error("failed to set charm tracing config: %s", exc)
+ self.unit.status = BlockedStatus("failed to set charm tracing config")
+
+    def _on_s3_credentials_changed(self, event: CredentialsChangedEvent):
+        """Store new or updated S3 credentials and apply them via the control socket.
+
+        Only the leader unit acts. The unit status is set to maintenance once the
+        control socket call returns, or to blocked when the call fails.
+        """
+        if not self.unit.is_leader():
+            return
+
+        # NOTE(review): the secret key is kept in the charm's stored state — confirm
+        # this is acceptable for this deployment.
+        credentials = dict(
+            access_key=event.access_key,
+            secret_key=event.secret_key,
+            endpoint=event.endpoint,
+        )
+        self._stored.s3_credentials = credentials
+
+        try:
+            logger.info("applying new S3 credentials")
+            self._control_socket.add_s3_credentials(credentials)
+            self.unit.status = MaintenanceStatus("applying s3 credentials")
+        except Exception as exc:  # pragma: no cover - defensive
+            logger.error("failed to apply S3 credentials: %s", exc)
+            self.unit.status = BlockedStatus("failed to apply s3 credentials")
+
+    def _on_s3_credentials_gone(self, _event):
+        """Remove the S3 credentials via the control socket and clear the stored copy.
+
+        Only the leader unit acts. The stored credentials are cleared only after the
+        control socket call succeeded; on failure the unit is set to blocked.
+        """
+        if not self.unit.is_leader():
+            return
+
+        try:
+            self._control_socket.remove_s3_credentials()
+            self._stored.s3_credentials = dict()
+        except Exception as exc:  # pragma: no cover - defensive
+            logger.error("failed to remove S3 credentials: %s", exc)
+            self.unit.status = BlockedStatus("failed to remove s3 credentials")
+
+    def _on_loki_push_api_endpoint_joined(self, _event):
+        """Apply the first available Loki push API endpoint via the control socket.
+
+        Only the leader unit acts, and nothing happens while no endpoint is known.
+        Only the first endpoint's URL is sent; any further endpoints are ignored.
+        """
+        if not self.unit.is_leader():
+            return
+
+        endpoints = self._loki_consumer.loki_endpoints
+        if not endpoints:
+            return
+
+        first_url = endpoints[0]["url"]
+        endpoint = {"url": first_url}
+        try:
+            logger.info("applying Loki push API endpoint")
+            self._control_socket.set_loki_endpoint(endpoint)
+            self.unit.status = MaintenanceStatus("applying loki endpoint")
+        except Exception as exc:  # pragma: no cover - defensive
+            logger.error("failed to apply Loki endpoint: %s", exc)
+            self.unit.status = BlockedStatus("failed to apply loki endpoint")
+
+    def _on_loki_push_api_endpoint_departed(self, _event):
+        """Remove the Loki endpoint via the control socket (leader unit only).
+
+        A failing control socket call is logged and sets the unit to blocked.
+        """
+        if not self.unit.is_leader():
+            return
+
+        try:
+            self._control_socket.remove_loki_endpoint()
+        except Exception as exc:  # pragma: no cover - defensive
+            logger.error("failed to remove Loki endpoint: %s", exc)
+            self.unit.status = BlockedStatus("failed to remove loki endpoint")
def metrics_username(relation: Relation) -> str:
diff --git a/src/controlsocket.py b/src/controlsocket.py
index 423a5e0..7f4747c 100644
--- a/src/controlsocket.py
+++ b/src/controlsocket.py
@@ -51,3 +51,33 @@ def set_charm_tracing_config(
body=body,
)
logger.debug('result of set_charm_tracing_config request: %r', resp)
+
+    def add_s3_credentials(self, credentials: dict):
+        """POST the given credentials to the '/s3-credentials' path and log the result."""
+        response = self.json_request(method='POST', path='/s3-credentials', body=credentials)
+        logger.debug('result of add_s3_credentials request: %r', response)
+
+    def remove_s3_credentials(self):
+        """Send a DELETE request to the '/s3-credentials' path and log the result."""
+        response = self.json_request(method='DELETE', path='/s3-credentials')
+        logger.debug('result of remove_s3_credentials request: %r', response)
+
+    def set_loki_endpoint(self, endpoint: dict):
+        """POST the given endpoint to the '/loki-endpoint' path and log the result."""
+        response = self.json_request(method='POST', path='/loki-endpoint', body=endpoint)
+        logger.debug('result of set_loki_endpoint request: %r', response)
+
+    def remove_loki_endpoint(self):
+        """Send a DELETE request to the '/loki-endpoint' path and log the result."""
+        response = self.json_request(method='DELETE', path='/loki-endpoint')
+        logger.debug('result of remove_loki_endpoint request: %r', response)
diff --git a/tests/test_charm.py b/tests/test_charm.py
index a612083..02414b8 100644
--- a/tests/test_charm.py
+++ b/tests/test_charm.py
@@ -18,7 +18,7 @@
TransportProtocolType,
)
from charm import JujuControllerCharm, AgentConfException
-from ops.model import BlockedStatus, ActiveStatus
+from ops.model import BlockedStatus, ActiveStatus, MaintenanceStatus
from ops.testing import Harness
from unittest.mock import mock_open, patch
from unixsocket import ConnectionError as SocketConnectionError
@@ -77,6 +77,13 @@ def setUp(self):
self.addCleanup(self.harness.cleanup)
self.harness.begin()
+ def test_start_sets_active_status(self):
+     """Emitting ``start`` on a separately created harness leaves the unit active."""
+     fresh_harness = Harness(JujuControllerCharm)
+     self.addCleanup(fresh_harness.cleanup)
+     fresh_harness.begin()
+
+     charm = fresh_harness.charm
+     charm.on.start.emit()
+
+     self.assertIsInstance(charm.unit.status, ActiveStatus)
+
def test_dashboard_relation_joined(self):
harness = self.harness
@@ -149,6 +156,7 @@ def test_metrics_endpoint_relation(self, mock_remove_user, mock_add_user,
@patch("controlsocket.ControlSocketClient.set_charm_tracing_config")
def test_tracing_relation_updates_endpoints(self, mock_set_tracing_config, *_):
harness = self.harness
+ harness.set_leader(True)
relation_id = harness.add_relation("charm-tracing", "tempo-coordinator")
harness.add_relation_unit(relation_id, "tempo-coordinator/0")
@@ -186,23 +194,27 @@ def test_tracing_relation_change_ignores_not_ready(
"controlsocket.ControlSocketClient.set_charm_tracing_config",
side_effect=SocketConnectionError("could not connect to socket"),
)
- def test_tracing_relation_update_propagates_socket_error(self, *_):
+ def test_tracing_relation_update_sets_blocked_on_socket_error(self, *_):
harness = self.harness
+ harness.set_leader(True)
relation_id = harness.add_relation("charm-tracing", "tempo-coordinator")
harness.add_relation_unit(relation_id, "tempo-coordinator/0")
- with self.assertRaisesRegex(
- SocketConnectionError, "could not connect to socket"
- ):
- harness.update_relation_data(
- relation_id, "tempo-coordinator", tracing_provider_data()
- )
+ harness.update_relation_data(
+ relation_id, "tempo-coordinator", tracing_provider_data()
+ )
+
+ self.assertIsInstance(harness.charm.unit.status, BlockedStatus)
+ self.assertEqual(
+ harness.charm.unit.status.message, "failed to set charm tracing config"
+ )
@patch("builtins.open", new_callable=mock_open, read_data=agent_conf)
@patch("controlsocket.ControlSocketClient.set_charm_tracing_config")
def test_tracing_relation_removed_clears_endpoints(self, mock_set_tracing_config, *_):
harness = self.harness
+ harness.set_leader(True)
relation_id = harness.add_relation("charm-tracing", "tempo-coordinator")
harness.add_relation_unit(relation_id, "tempo-coordinator/0")
@@ -234,6 +246,7 @@ def test_tracing_relation_removed_clears_endpoints(self, mock_set_tracing_config
@patch("controlsocket.ControlSocketClient.set_charm_tracing_config")
def test_receive_ca_cert_updates_stored_ca_cert(self, mock_set_tracing_config, *_):
harness = self.harness
+ harness.set_leader(True)
relation_id = harness.add_relation("charm-tracing-ca-cert", "cert-provider")
harness.add_relation_unit(relation_id, "cert-provider/0")
@@ -270,6 +283,7 @@ def test_receive_ca_cert_update_ignores_empty_cert_list(
@patch("controlsocket.ControlSocketClient.set_charm_tracing_config")
def test_receive_ca_cert_removed_clears_stored_ca_cert(self, mock_set_tracing_config, *_):
harness = self.harness
+ harness.set_leader(True)
relation_id = harness.add_relation("charm-tracing-ca-cert", "cert-provider")
harness.add_relation_unit(relation_id, "cert-provider/0")
@@ -475,6 +489,369 @@ def test_dbcluster_relation_departed(
harness.evaluate_status()
self.assertIsInstance(harness.charm.unit.status, ActiveStatus)
+ @patch("controlsocket.ControlSocketClient.add_s3_credentials")
+ def test_s3_relation_credentials_changed(self, mock_add_s3_credentials):
+     """Leader stores the provider's credentials and pushes them to the socket."""
+     harness = self.harness
+     harness.set_leader(True)
+     rel_id = harness.add_relation("s3-backend", "s3-integrator")
+     harness.add_relation_unit(rel_id, "s3-integrator/0")
+
+     provider_databag = {
+         "access-key": "ak",
+         "secret-key": "sk",
+         "bucket": "test-bucket",
+         "endpoint": "https://s3.example",
+     }
+     harness.update_relation_data(rel_id, "s3-integrator", provider_databag)
+
+     # Expected payload: underscore-style keys and no bucket entry.
+     wanted = {
+         "access_key": "ak",
+         "secret_key": "sk",
+         "endpoint": "https://s3.example",
+     }
+     charm = harness.charm
+     self.assertEqual(charm._stored.s3_credentials, wanted)
+     mock_add_s3_credentials.assert_called_once_with(wanted)
+     self.assertIsInstance(charm.unit.status, MaintenanceStatus)
+
+ @patch(
+     "controlsocket.ControlSocketClient.add_s3_credentials",
+     side_effect=RuntimeError("boom"),
+ )
+ def test_s3_relation_credentials_changed_failure_sets_blocked(self, _mock_add):
+     """A raising control-socket call still records credentials but blocks the unit."""
+     harness = self.harness
+     harness.set_leader(True)
+     rel_id = harness.add_relation("s3-backend", "s3-integrator")
+     harness.add_relation_unit(rel_id, "s3-integrator/0")
+
+     provider_databag = {"access-key": "ak", "secret-key": "sk", "bucket": "test-bucket"}
+     harness.update_relation_data(rel_id, "s3-integrator", provider_databag)
+
+     charm = harness.charm
+     self.assertEqual(
+         charm._stored.s3_credentials,
+         {"access_key": "ak", "secret_key": "sk", "endpoint": None},
+     )
+     status = charm.unit.status
+     self.assertIsInstance(status, BlockedStatus)
+     self.assertIn("failed to apply s3 credentials", status.message)
+
+ @patch("controlsocket.ControlSocketClient.add_s3_credentials")
+ def test_s3_relation_credentials_changed_non_leader_no_set(self, mock_add_s3_credentials):
+     """A non-leader unit neither stores credentials nor calls the control socket."""
+     harness = self.harness
+     harness.set_leader(False)
+     rel_id = harness.add_relation("s3-backend", "s3-integrator")
+     harness.add_relation_unit(rel_id, "s3-integrator/0")
+
+     provider_databag = {"access-key": "ak", "secret-key": "sk", "bucket": "test-bucket"}
+     harness.update_relation_data(rel_id, "s3-integrator", provider_databag)
+
+     self.assertEqual(harness.charm._stored.s3_credentials, {})
+     mock_add_s3_credentials.assert_not_called()
+
+ @patch("controlsocket.ControlSocketClient.add_s3_credentials")
+ def test_s3_relation_credentials_updated(self, mock_add_s3_credentials):
+     """A second set of provider credentials replaces the first one."""
+     harness = self.harness
+     harness.set_leader(True)
+     rel_id = harness.add_relation("s3-backend", "s3-integrator")
+     harness.add_relation_unit(rel_id, "s3-integrator/0")
+     charm = harness.charm
+
+     # First round of credentials.
+     harness.update_relation_data(
+         rel_id,
+         "s3-integrator",
+         {"access-key": "ak", "secret-key": "sk", "bucket": "test-bucket"},
+     )
+     self.assertEqual(
+         charm._stored.s3_credentials,
+         {"access_key": "ak", "secret_key": "sk", "endpoint": None},
+     )
+
+     # Rotated credentials on the same relation.
+     harness.update_relation_data(
+         rel_id,
+         "s3-integrator",
+         {"access-key": "ak2", "secret-key": "sk2", "bucket": "test-bucket"},
+     )
+     rotated = {"access_key": "ak2", "secret_key": "sk2", "endpoint": None}
+     self.assertEqual(charm._stored.s3_credentials, rotated)
+     mock_add_s3_credentials.assert_called_with(
+         {"access_key": "ak2", "secret_key": "sk2", "endpoint": None}
+     )
+     self.assertIsInstance(charm.unit.status, MaintenanceStatus)
+
+ def test_s3_relation_sets_bucket_on_join(self):
+     """Joining the relation publishes a default bucket name and stores no credentials."""
+     harness = self.harness
+     harness.set_leader(True)
+     rel_id = harness.add_relation("s3-backend", "s3-integrator")
+     harness.add_relation_unit(rel_id, "s3-integrator/0")
+
+     charm = harness.charm
+     # With no bucket_name supplied, the S3Requirer fills in "relation-<id>" itself.
+     app_databag = harness.get_relation_data(rel_id, charm.app.name)
+     self.assertEqual(app_databag["bucket"], f"relation-{rel_id}")
+     self.assertEqual(charm._stored.s3_credentials, {})
+
+ @patch("controlsocket.ControlSocketClient.remove_s3_credentials")
+ @patch("controlsocket.ControlSocketClient.add_s3_credentials")
+ def test_s3_relation_credentials_gone(self, _mock_add, mock_remove_s3_credentials):
+     """Removing the relation clears stored credentials and notifies the control socket.
+
+     Patch decorators are applied bottom-up, so the ``add_s3_credentials`` mock is
+     the first mock argument and the ``remove_s3_credentials`` mock the second.
+     """
+     harness = self.harness
+     harness.set_leader(True)
+
+     relation_id = harness.add_relation("s3-backend", "s3-integrator")
+     harness.add_relation_unit(relation_id, "s3-integrator/0")
+
+     harness.update_relation_data(
+         relation_id,
+         "s3-integrator",
+         {"access-key": "ak", "secret-key": "sk"},
+     )
+     expected_credentials = {"access_key": "ak", "secret_key": "sk", "endpoint": None}
+     self.assertEqual(harness.charm._stored.s3_credentials, expected_credentials)
+
+     harness.remove_relation(relation_id)
+     self.assertEqual(harness.charm._stored.s3_credentials, {})
+     # Previously unchecked: the leader must also tell the controller to drop them.
+     mock_remove_s3_credentials.assert_called()
+
+ @patch("controlsocket.ControlSocketClient.remove_s3_credentials")
+ def test_s3_relation_credentials_gone_non_leader(self, mock_remove_s3_credentials):
+     """A non-leader unit never asks the control socket to remove credentials."""
+     harness = self.harness
+     harness.set_leader(False)
+     rel_id = harness.add_relation("s3-backend", "s3-integrator")
+     harness.add_relation_unit(rel_id, "s3-integrator/0")
+
+     provider_databag = {"access-key": "ak", "secret-key": "sk"}
+     harness.update_relation_data(rel_id, "s3-integrator", provider_databag)
+     harness.remove_relation(rel_id)
+
+     self.assertEqual(harness.charm._stored.s3_credentials, {})
+     mock_remove_s3_credentials.assert_not_called()
+
+ @patch(
+     "controlsocket.ControlSocketClient.remove_s3_credentials",
+     side_effect=RuntimeError("boom"),
+ )
+ def test_s3_relation_credentials_gone_failure_sets_blocked(self, _mock_remove):
+     """If removal raises, the stored credentials are kept and the unit is blocked."""
+     harness = self.harness
+     harness.set_leader(True)
+     rel_id = harness.add_relation("s3-backend", "s3-integrator")
+     harness.add_relation_unit(rel_id, "s3-integrator/0")
+
+     provider_databag = {"access-key": "ak", "secret-key": "sk"}
+     harness.update_relation_data(rel_id, "s3-integrator", provider_databag)
+     harness.remove_relation(rel_id)
+
+     charm = harness.charm
+     self.assertEqual(
+         charm._stored.s3_credentials,
+         {"access_key": "ak", "secret_key": "sk", "endpoint": None},
+     )
+     status = charm.unit.status
+     self.assertIsInstance(status, BlockedStatus)
+     self.assertIn("failed to remove s3 credentials", status.message)
+
+ @patch("controlsocket.ControlSocketClient.set_loki_endpoint")
+ def test_loki_push_api_endpoint_joined(self, mock_set_loki_endpoint):
+     """Leader forwards the published Loki URL and reports a maintenance status."""
+     harness = self.harness
+     harness.set_leader(True)
+     rel_id = harness.add_relation("loki-push-api", "loki")
+     harness.add_relation_unit(rel_id, "loki/0")
+
+     push_url = "http://loki:3100/loki/api/v1/push"
+     unit_databag = {"endpoint": json.dumps({"url": push_url})}
+     harness.update_relation_data(rel_id, "loki/0", unit_databag)
+
+     mock_set_loki_endpoint.assert_called_once_with({"url": push_url})
+     status = harness.charm.unit.status
+     self.assertIsInstance(status, MaintenanceStatus)
+     self.assertIn("applying loki endpoint", status.message)
+
+ @patch("controlsocket.ControlSocketClient.set_loki_endpoint")
+ def test_loki_push_api_endpoint_joined_non_leader_no_set(self, mock_set_loki_endpoint):
+     """A non-leader unit does not push the Loki endpoint to the control socket."""
+     harness = self.harness
+     harness.set_leader(False)
+     rel_id = harness.add_relation("loki-push-api", "loki")
+     harness.add_relation_unit(rel_id, "loki/0")
+
+     unit_databag = {"endpoint": json.dumps({"url": "http://loki:3100/loki/api/v1/push"})}
+     harness.update_relation_data(rel_id, "loki/0", unit_databag)
+
+     mock_set_loki_endpoint.assert_not_called()
+
+ @patch("controlsocket.ControlSocketClient.set_loki_endpoint")
+ def test_loki_push_api_endpoint_joined_no_endpoints(self, mock_set_loki_endpoint):
+     """With no endpoint published by the remote unit, nothing is sent to the socket."""
+     harness = self.harness
+     harness.set_leader(True)
+     rel_id = harness.add_relation("loki-push-api", "loki")
+     harness.add_relation_unit(rel_id, "loki/0")
+
+     # Deliberately push an empty update: the unit databag carries no endpoint key.
+     empty_databag = {}
+     harness.update_relation_data(rel_id, "loki/0", empty_databag)
+
+     mock_set_loki_endpoint.assert_not_called()
+
+ @patch("controlsocket.ControlSocketClient.set_loki_endpoint")
+ def test_loki_push_api_endpoint_joined_uses_first_endpoint(self, mock_set_loki_endpoint):
+     """With two Loki units publishing endpoints, a single known URL is sent."""
+     harness = self.harness
+     harness.set_leader(True)
+     rel_id = harness.add_relation("loki-push-api", "loki")
+
+     # Insertion order matters: both units join first, then publish in turn.
+     url_by_unit = {
+         "loki/0": "http://loki-0:3100/loki/api/v1/push",
+         "loki/1": "http://loki-1:3100/loki/api/v1/push",
+     }
+     for unit_name in url_by_unit:
+         harness.add_relation_unit(rel_id, unit_name)
+     for unit_name, push_url in url_by_unit.items():
+         harness.update_relation_data(
+             rel_id,
+             unit_name,
+             {"endpoint": json.dumps({"url": push_url})},
+         )
+
+     # Only the most recent call is inspected, and either unit's URL is accepted:
+     # this does not pin down which of the two the charm prefers.
+     sent_payload = mock_set_loki_endpoint.call_args[0][0]
+     self.assertIn(sent_payload["url"], list(url_by_unit.values()))
+
+ @patch(
+     "controlsocket.ControlSocketClient.set_loki_endpoint",
+     side_effect=RuntimeError("boom"),
+ )
+ def test_loki_push_api_endpoint_joined_failure_sets_blocked(self, _mock_set):
+     """A raising ``set_loki_endpoint`` call puts the unit into a blocked status."""
+     harness = self.harness
+     harness.set_leader(True)
+     rel_id = harness.add_relation("loki-push-api", "loki")
+     harness.add_relation_unit(rel_id, "loki/0")
+
+     unit_databag = {"endpoint": json.dumps({"url": "http://loki:3100/loki/api/v1/push"})}
+     harness.update_relation_data(rel_id, "loki/0", unit_databag)
+
+     status = harness.charm.unit.status
+     self.assertIsInstance(status, BlockedStatus)
+     self.assertIn("failed to apply loki endpoint", status.message)
+
+ @patch("controlsocket.ControlSocketClient.remove_loki_endpoint")
+ @patch("controlsocket.ControlSocketClient.set_loki_endpoint")
+ def test_loki_push_api_endpoint_departed(self, _mock_set, mock_remove_loki_endpoint):
+     """Removing the relation on the leader triggers exactly one endpoint removal."""
+     harness = self.harness
+     harness.set_leader(True)
+     rel_id = harness.add_relation("loki-push-api", "loki")
+     harness.add_relation_unit(rel_id, "loki/0")
+
+     unit_databag = {"endpoint": json.dumps({"url": "http://loki:3100/loki/api/v1/push"})}
+     harness.update_relation_data(rel_id, "loki/0", unit_databag)
+
+     harness.remove_relation(rel_id)
+
+     mock_remove_loki_endpoint.assert_called_once()
+
+ @patch("controlsocket.ControlSocketClient.remove_loki_endpoint")
+ def test_loki_push_api_endpoint_departed_non_leader(self, mock_remove_loki_endpoint):
+     """A non-leader unit does not ask the control socket to remove the endpoint."""
+     harness = self.harness
+     harness.set_leader(False)
+     rel_id = harness.add_relation("loki-push-api", "loki")
+     harness.add_relation_unit(rel_id, "loki/0")
+
+     harness.remove_relation(rel_id)
+
+     mock_remove_loki_endpoint.assert_not_called()
+
+ @patch(
+     "controlsocket.ControlSocketClient.remove_loki_endpoint",
+     side_effect=RuntimeError("boom"),
+ )
+ @patch("controlsocket.ControlSocketClient.set_loki_endpoint")
+ def test_loki_push_api_endpoint_departed_failure_sets_blocked(
+     self, _mock_set, _mock_remove
+ ):
+     """A raising ``remove_loki_endpoint`` call puts the unit into a blocked status."""
+     harness = self.harness
+     harness.set_leader(True)
+     rel_id = harness.add_relation("loki-push-api", "loki")
+     harness.add_relation_unit(rel_id, "loki/0")
+
+     unit_databag = {"endpoint": json.dumps({"url": "http://loki:3100/loki/api/v1/push"})}
+     harness.update_relation_data(rel_id, "loki/0", unit_databag)
+
+     harness.remove_relation(rel_id)
+
+     status = harness.charm.unit.status
+     self.assertIsInstance(status, BlockedStatus)
+     self.assertIn("failed to remove loki endpoint", status.message)
+
+ @patch("controlsocket.ControlSocketClient.set_loki_endpoint")
+ def test_loki_push_api_endpoint_updated(self, mock_set_loki_endpoint):
+     """Each change of the published URL results in one more control-socket call."""
+     harness = self.harness
+     harness.set_leader(True)
+     rel_id = harness.add_relation("loki-push-api", "loki")
+     harness.add_relation_unit(rel_id, "loki/0")
+
+     initial_url = "http://loki:3100/loki/api/v1/push"
+     harness.update_relation_data(
+         rel_id, "loki/0", {"endpoint": json.dumps({"url": initial_url})}
+     )
+     mock_set_loki_endpoint.assert_called_with({"url": initial_url})
+
+     # The remote unit now advertises a different URL.
+     replacement_url = "http://loki-new:3100/loki/api/v1/push"
+     harness.update_relation_data(
+         rel_id, "loki/0", {"endpoint": json.dumps({"url": replacement_url})}
+     )
+     mock_set_loki_endpoint.assert_called_with({"url": replacement_url})
+
+     self.assertEqual(mock_set_loki_endpoint.call_count, 2)
+
class mockNetwork:
def __init__(self, addresses):
diff --git a/tests/test_sockets.py b/tests/test_sockets.py
index bde89f0..1b13bcd 100644
--- a/tests/test_sockets.py
+++ b/tests/test_sockets.py
@@ -113,6 +113,86 @@ def test_set_charm_tracing_config_success(self):
ca_cert='-----BEGIN CERTIFICATE-----\nabc\n-----END CERTIFICATE-----',
)
+ def test_set_loki_endpoint_success(self):
+     """``set_loki_endpoint`` issues the expected POST and accepts a JSON reply."""
+     opener = MockOpener(self)
+     client = ControlSocketClient('fake_socket_path', opener=opener)
+
+     canned_reply = MockResponse(
+         headers=MockHeaders(content_type='application/json'),
+         body=r'{"message":"set loki endpoint"}'
+     )
+     opener.expect(
+         url='http://localhost/loki-endpoint',
+         method='POST',
+         body=r'{"url": "http://loki:3100/loki/api/v1/push"}',
+         response=canned_reply,
+     )
+
+     client.set_loki_endpoint({"url": "http://loki:3100/loki/api/v1/push"})
+
+ def test_set_loki_endpoint_fail(self):
+     """An HTTP 500 from the socket surfaces as ``APIError`` with the decoded body."""
+     opener = MockOpener(self)
+     client = ControlSocketClient('fake_socket_path', opener=opener)
+
+     server_error = urllib.error.HTTPError(
+         url='http://localhost/loki-endpoint',
+         code=500,
+         msg='',
+         hdrs=None,
+         fp=io.BytesIO(br'{"error":"internal error"}'),
+     )
+     opener.expect(
+         url='http://localhost/loki-endpoint',
+         method='POST',
+         body=r'{"url": "http://loki:3100/loki/api/v1/push"}',
+         error=server_error,
+     )
+
+     with self.assertRaises(APIError) as caught:
+         client.set_loki_endpoint({"url": "http://loki:3100/loki/api/v1/push"})
+
+     # Inspect the exception only after the context manager has captured it.
+     raised = caught.exception
+     self.assertEqual(raised.body, {'error': 'internal error'})
+     self.assertEqual(raised.code, 500)
+     self.assertEqual(raised.message, 'internal error')
+
+ def test_remove_loki_endpoint_success(self):
+     """``remove_loki_endpoint`` issues a body-less DELETE and accepts a JSON reply."""
+     opener = MockOpener(self)
+     client = ControlSocketClient('fake_socket_path', opener=opener)
+
+     canned_reply = MockResponse(
+         headers=MockHeaders(content_type='application/json'),
+         body=r'{"message":"removed loki endpoint"}'
+     )
+     opener.expect(
+         url='http://localhost/loki-endpoint',
+         method='DELETE',
+         body=None,
+         response=canned_reply,
+     )
+
+     client.remove_loki_endpoint()
+
+ def test_remove_loki_endpoint_fail(self):
+     """An HTTP 404 from the socket surfaces as ``APIError`` with the decoded body."""
+     opener = MockOpener(self)
+     client = ControlSocketClient('fake_socket_path', opener=opener)
+
+     not_found = urllib.error.HTTPError(
+         url='http://localhost/loki-endpoint',
+         code=404,
+         msg='',
+         hdrs=None,
+         fp=io.BytesIO(br'{"error":"loki endpoint not found"}'),
+     )
+     opener.expect(
+         url='http://localhost/loki-endpoint',
+         method='DELETE',
+         body=None,
+         error=not_found,
+     )
+
+     with self.assertRaises(APIError) as caught:
+         client.remove_loki_endpoint()
+
+     # Inspect the exception only after the context manager has captured it.
+     raised = caught.exception
+     self.assertEqual(raised.body, {'error': 'loki endpoint not found'})
+     self.assertEqual(raised.code, 404)
+     self.assertEqual(raised.message, 'loki endpoint not found')
+
def test_connection_error(self):
mock_opener = MockOpener(self)
control_socket = ControlSocketClient('fake_socket_path', opener=mock_opener)