Skip to content

Commit ea812c2

Browse files
[PRM-495] Moved location of the awsmesh.entrypoint module
1 parent 1448210 commit ea812c2

File tree

22 files changed

+727
-0
lines changed

22 files changed

+727
-0
lines changed

services/mesh-forwarder/__init__.py

Whitespace-only changes.

services/mesh-forwarder/awsmesh/__init__.py

Whitespace-only changes.
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import logging
2+
import sys
3+
from dataclasses import MISSING, Field, dataclass, fields
4+
from distutils.util import strtobool
5+
from typing import Optional
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
def _parse_field_from_env_var(name: str, value: str, field: Field):
11+
if field.type == Optional[bool]:
12+
try:
13+
return strtobool(value)
14+
except ValueError:
15+
logger.warning(
16+
f"Invalid value '{value}' for {name}, ignoring and using default: {field.default}"
17+
)
18+
return field.default
19+
return value
20+
21+
22+
def _read_env(field: Field, env_vars):
23+
env_var_name = field.name.upper()
24+
if env_var_name in env_vars:
25+
return _parse_field_from_env_var(env_var_name, env_vars[env_var_name], field)
26+
elif field.default != MISSING:
27+
return field.default
28+
else:
29+
logger.error(f"Expected environment variable {env_var_name} was not set, exiting...")
30+
sys.exit(1)
31+
32+
33+
@dataclass
34+
class ForwarderConfig:
35+
mesh_url: str
36+
mesh_mailbox_ssm_param_name: str
37+
mesh_password_ssm_param_name: str
38+
mesh_shared_key_ssm_param_name: str
39+
mesh_client_cert_ssm_param_name: str
40+
mesh_client_key_ssm_param_name: str
41+
mesh_ca_cert_ssm_param_name: str
42+
poll_frequency: str
43+
forwarder_home: str
44+
message_destination: str = "s3"
45+
s3_bucket_name: Optional[str] = None
46+
sns_topic_arn: Optional[str] = None
47+
endpoint_url: Optional[str] = None
48+
ssm_endpoint_url: Optional[str] = None
49+
disable_message_header_validation: Optional[bool] = False
50+
51+
@classmethod
52+
def from_environment_variables(cls, env_vars):
53+
return cls(**{field.name: _read_env(field, env_vars) for field in fields(cls)})
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import logging
2+
from os import environ
3+
from os.path import join
4+
from signal import SIGINT, SIGTERM, signal
5+
6+
import boto3
7+
import urllib3
8+
9+
from awsmesh.config import ForwarderConfig
10+
from awsmesh.forwarder_service import MeshConfig, build_forwarder_service
11+
from awsmesh.logging import JsonFormatter
12+
from awsmesh.message_destination_resolver import MessageDestinationConfig
13+
from awsmesh.secrets import SsmSecretManager
14+
15+
urllib3.disable_warnings(urllib3.exceptions.SubjectAltNameWarning)
16+
17+
18+
def build_mesh_config_from_ssm(ssm, config) -> MeshConfig:
19+
mesh_client_cert_path = join(config.forwarder_home, "client_cert.pem")
20+
mesh_client_key_path = join(config.forwarder_home, "client_key.pem")
21+
mesh_ca_cert_path = join(config.forwarder_home, "ca_cert.pem")
22+
23+
secret_manager = SsmSecretManager(ssm)
24+
25+
secret_manager.download_secret(config.mesh_client_cert_ssm_param_name, mesh_client_cert_path)
26+
secret_manager.download_secret(config.mesh_client_key_ssm_param_name, mesh_client_key_path)
27+
secret_manager.download_secret(config.mesh_ca_cert_ssm_param_name, mesh_ca_cert_path)
28+
29+
mesh_mailbox = secret_manager.get_secret(config.mesh_mailbox_ssm_param_name)
30+
mesh_password = secret_manager.get_secret(config.mesh_password_ssm_param_name)
31+
mesh_shared_key = secret_manager.get_secret(config.mesh_shared_key_ssm_param_name)
32+
33+
return MeshConfig(
34+
url=config.mesh_url,
35+
mailbox=mesh_mailbox,
36+
password=mesh_password,
37+
shared_key=bytes(mesh_shared_key, "utf-8"),
38+
client_cert_path=mesh_client_cert_path,
39+
client_key_path=mesh_client_key_path,
40+
ca_cert_path=mesh_ca_cert_path,
41+
)
42+
43+
44+
def build_message_destination_config(config) -> MessageDestinationConfig:
45+
return MessageDestinationConfig(
46+
message_destination=config.message_destination,
47+
s3_bucket_name=config.s3_bucket_name,
48+
endpoint_url=config.endpoint_url,
49+
sns_topic_arn=config.sns_topic_arn,
50+
)
51+
52+
53+
def build_forwarder_from_environment_variables(env_vars=environ):
54+
config = ForwarderConfig.from_environment_variables(env_vars)
55+
ssm = boto3.client("ssm", endpoint_url=config.ssm_endpoint_url)
56+
57+
return build_forwarder_service(
58+
mesh_config=build_mesh_config_from_ssm(ssm, config),
59+
message_destination_config=build_message_destination_config(config),
60+
poll_frequency_sec=int(config.poll_frequency),
61+
disable_message_header_validation=config.disable_message_header_validation,
62+
)
63+
64+
65+
def setup_logger():
66+
logger = logging.getLogger()
67+
logger.setLevel(logging.INFO)
68+
formatter = JsonFormatter()
69+
handler = logging.StreamHandler()
70+
handler.setFormatter(formatter)
71+
logger.addHandler(handler)
72+
73+
74+
def main():
75+
setup_logger()
76+
77+
forwarder_service = build_forwarder_from_environment_variables()
78+
79+
def handle_sigterm(signum, frame):
80+
forwarder_service.stop()
81+
82+
signal(SIGINT, handle_sigterm)
83+
signal(SIGTERM, handle_sigterm)
84+
85+
forwarder_service.start()
86+
87+
88+
if __name__ == "__main__":
89+
main()
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import logging
2+
3+
from awsmesh.mesh import InvalidMeshHeader, MeshClientNetworkError, MeshInbox, MissingMeshHeader
4+
from awsmesh.monitoring.probe import LoggingProbe
5+
from awsmesh.uploader import MessageUploader, UploaderError
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
class RetryableException(Exception):
11+
pass
12+
13+
14+
class MeshToAwsForwarder:
15+
def __init__(
16+
self,
17+
inbox: MeshInbox,
18+
uploader: MessageUploader,
19+
probe: LoggingProbe,
20+
disable_message_header_validation,
21+
):
22+
self._inbox = inbox
23+
self._uploader = uploader
24+
self._probe = probe
25+
self._disable_message_header_validation = disable_message_header_validation
26+
27+
def forward_messages(self):
28+
retryable_message_exceptions = []
29+
for message_id in self._poll_message_ids():
30+
try:
31+
self._process_message(message_id)
32+
except RetryableException as e:
33+
retryable_message_exceptions.append(e)
34+
35+
if len(retryable_message_exceptions) > 0:
36+
logger.info(
37+
f"Raising single retryable exception, actually caught {len(retryable_message_exceptions)} message exception(s)"
38+
)
39+
raise retryable_message_exceptions[0]
40+
41+
def is_mailbox_empty(self):
42+
count_message_event = self._probe.new_count_messages_event()
43+
try:
44+
message_count = self._inbox.count_messages()
45+
count_message_event.record_message_count(message_count)
46+
return message_count <= 0
47+
except MeshClientNetworkError as e:
48+
count_message_event.record_mesh_client_network_error(e)
49+
raise RetryableException()
50+
finally:
51+
count_message_event.finish()
52+
53+
def _poll_message_ids(self):
54+
poll_inbox_event = self._probe.new_poll_inbox_event()
55+
try:
56+
messages = self._inbox.list_message_ids()
57+
poll_inbox_event.record_message_batch_count(len(messages))
58+
return messages
59+
except MeshClientNetworkError as e:
60+
poll_inbox_event.record_mesh_client_network_error(e)
61+
raise RetryableException()
62+
finally:
63+
poll_inbox_event.finish()
64+
65+
# flake8: noqa: C901
66+
def _process_message(self, message_id):
67+
forward_message_event = self._probe.new_forward_message_event()
68+
try:
69+
message = self._inbox.retrieve_message(message_id)
70+
forward_message_event.record_message_metadata(message)
71+
if not self._disable_message_header_validation:
72+
message.validate()
73+
self._uploader.upload(message, forward_message_event)
74+
message.acknowledge()
75+
except MissingMeshHeader as e:
76+
forward_message_event.record_missing_mesh_header(e)
77+
except InvalidMeshHeader as e:
78+
forward_message_event.record_invalid_mesh_header(e)
79+
except UploaderError as e:
80+
forward_message_event.record_uploader_error(e)
81+
except MeshClientNetworkError as e:
82+
forward_message_event.record_mesh_client_network_error(e)
83+
raise RetryableException()
84+
finally:
85+
forward_message_event.finish()
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import logging
2+
from dataclasses import dataclass
3+
from threading import Event
4+
from typing import Optional
5+
6+
import mesh_client
7+
8+
from awsmesh.forwarder import MeshToAwsForwarder, RetryableException
9+
from awsmesh.mesh import MeshInbox
10+
from awsmesh.message_destination_resolver import MessageDestinationConfig, resolve_message_uploader
11+
from awsmesh.monitoring.probe import LoggingProbe
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
@dataclass
17+
class MeshConfig:
18+
url: str
19+
mailbox: str
20+
password: str
21+
shared_key: bytes
22+
client_cert_path: str
23+
client_key_path: str
24+
ca_cert_path: str
25+
26+
27+
class MeshToAwsForwarderService:
28+
def __init__(
29+
self,
30+
forwarder: MeshToAwsForwarder,
31+
poll_frequency_sec: int,
32+
exit_event: Optional[Event] = None,
33+
):
34+
self._forwarder = forwarder
35+
self._exit_event = exit_event or Event()
36+
self._poll_frequency_sec = poll_frequency_sec
37+
38+
def start(self):
39+
logger.info("Started forwarder service")
40+
while not self._exit_event.is_set():
41+
try:
42+
self._forwarder.forward_messages()
43+
44+
if self._forwarder.is_mailbox_empty():
45+
self._exit_event.wait(self._poll_frequency_sec)
46+
except RetryableException:
47+
self._exit_event.wait(self._poll_frequency_sec)
48+
logger.info("Exiting forwarder service")
49+
50+
def stop(self):
51+
logger.info("Received request to stop")
52+
self._exit_event.set()
53+
54+
55+
def build_forwarder_service(
56+
mesh_config: MeshConfig,
57+
message_destination_config: MessageDestinationConfig,
58+
poll_frequency_sec: int,
59+
disable_message_header_validation: bool,
60+
) -> MeshToAwsForwarderService:
61+
uploader = resolve_message_uploader(message_destination_config)
62+
63+
mesh = mesh_client.MeshClient(
64+
mesh_config.url,
65+
mesh_config.mailbox,
66+
mesh_config.password,
67+
shared_key=mesh_config.shared_key,
68+
cert=(mesh_config.client_cert_path, mesh_config.client_key_path),
69+
verify=mesh_config.ca_cert_path,
70+
)
71+
inbox = MeshInbox(mesh)
72+
forwarder = MeshToAwsForwarder(
73+
inbox, uploader, LoggingProbe(), disable_message_header_validation
74+
)
75+
return MeshToAwsForwarderService(forwarder, poll_frequency_sec)
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import json
2+
from datetime import datetime
3+
from logging import LogRecord, makeLogRecord
4+
5+
DEFAULT_LOG_RECORD_ATTRS = vars(makeLogRecord({})).keys()
6+
7+
8+
def _convert_timestamp_to_iso(timestamp: float) -> str:
9+
return datetime.utcfromtimestamp(timestamp).isoformat()
10+
11+
12+
class JsonFormatter:
13+
def format(self, record: LogRecord) -> str:
14+
base = {
15+
"level": record.levelname,
16+
"module": record.module,
17+
"message": record.msg,
18+
"time": _convert_timestamp_to_iso(record.created),
19+
}
20+
extra = {
21+
attr: getattr(record, attr)
22+
for attr in vars(record)
23+
if attr not in DEFAULT_LOG_RECORD_ATTRS
24+
}
25+
26+
return json.dumps({**base, **extra})

0 commit comments

Comments
 (0)