def _print_log_lines(log_lines):
    """Print non-empty log lines to stdout, skipping blank entries."""
    for line in log_lines:
        if line.strip():
            print(line)


@benchmark.command("logs")
@click.argument(
    "target",
    type=str,
    required=True,
)
@click.option(
    "--request-id",
    type=str,
    default=None,
    help="Request/invocation ID to query (optional, for specific invocation).",
)
@click.option(
    "--minutes",
    type=int,
    default=30,
    help="Number of minutes to look back for logs (default: 30, only for function name mode).",
)
@common_params
def query_logs(target, request_id, minutes, **kwargs):
    """
    Query and display logs for benchmark invocations.

    TARGET can be either:
    1. Path to experiments.json file - queries logs for all invocations in that file
    2. Function name - queries logs for that function (requires --request-id)

    Examples:
    ./sebs.py benchmark logs experiments.json --config config.json
    ./sebs.py benchmark logs my-function --request-id abc123 --config config.json
    ./sebs.py benchmark logs my-function --request-id abc123 --minutes 60 --config config.json
    """
    (
        config,
        output_dir,
        logging_filename,
        sebs_client,
        deployment_client,
    ) = parse_common_params(**kwargs)

    if os.path.isfile(target):
        # Case 1: experiments.json file - query logs for every recorded invocation.
        sebs_client.logging.info(f"Loading invocations from {target}")
        with open(target, "r") as in_f:
            exp_config = json.load(in_f)
        experiments = sebs.experiments.ExperimentResult.deserialize(
            exp_config,
            sebs_client.cache_client,
            sebs_client.generate_logging_handlers(logging_filename),
        )

        # Time bounds come from the experiment record itself.
        exp_start_time, exp_end_time = experiments.times()
        start_time = int(exp_start_time)
        end_time = int(exp_end_time)

        for func_name in experiments.functions():
            invocations = experiments.invocations(func_name)
            sebs_client.logging.info(
                f"\n{'=' * 80}\nFunction: {func_name} ({len(invocations)} invocations)\n{'=' * 80}"
            )

            for req_id, _execution_result in invocations.items():
                sebs_client.logging.info(f"\n{'-' * 80}\nRequest ID: {req_id}\n{'-' * 80}")
                try:
                    logs = deployment_client.get_invocation_logs(
                        func_name, req_id, start_time, end_time
                    )
                    if logs:
                        _print_log_lines(logs)
                    else:
                        sebs_client.logging.warning(f"No logs found for request {req_id}")
                except Exception as e:
                    sebs_client.logging.error(f"Error retrieving logs for {req_id}: {e}")

    else:
        # Case 2: a single function name - a specific request ID is required.
        function_name = target
        if not request_id:
            sebs_client.logging.error(
                "When querying by function name, please provide --request-id. "
                "Alternatively, use an experiments.json file to query all invocations."
            )
            return

        import time

        # Look back `minutes` minutes from now.
        end_time = int(time.time())
        start_time = end_time - (minutes * 60)

        sebs_client.logging.info(
            f"Querying logs for function '{function_name}', request ID '{request_id}' "
            f"(last {minutes} minutes)"
        )

        try:
            logs = deployment_client.get_invocation_logs(
                function_name, request_id, start_time, end_time
            )
            if logs:
                sebs_client.logging.info(f"\n{'-' * 80}\nLogs:\n{'-' * 80}")
                _print_log_lines(logs)
            else:
                sebs_client.logging.warning("No logs found")
        except Exception as e:
            sebs_client.logging.error(f"Error retrieving logs: {e}")
def get_invocation_logs(
    self, function_name: str, request_id: str, start_time: int, end_time: int
) -> List[str]:
    """
    Retrieve full logs (stdout and stderr) for a specific invocation.

    Runs a CloudWatch Logs Insights query filtered on the Lambda request ID,
    polling until the query finishes, and retries a few times because log
    delivery to CloudWatch is asynchronous and may lag the invocation.

    Args:
        function_name: Name of the Lambda function
        request_id: AWS request ID for the invocation
        start_time: Start time as Unix timestamp
        end_time: End time as Unix timestamp

    Returns:
        List of log messages for the invocation, each prefixed with its
        timestamp when one is available.
    """
    # Lazily create the CloudWatch Logs client on first use.
    if not self.logs_client:
        self.logs_client = boto3.client(
            service_name="logs",
            aws_access_key_id=self.config.credentials.access_key,
            aws_secret_access_key=self.config.credentials.secret_key,
            region_name=self.config.region,
        )

    # Query CloudWatch Logs for the specific request ID
    query_string = (
        f'filter @requestId = "{request_id}" | '
        f"fields @timestamp, @message | sort @timestamp asc"
    )

    response = None
    retries = 0
    max_retries = 3

    while retries < max_retries:
        query = self.logs_client.start_query(
            logGroupName="/aws/lambda/{}".format(function_name),
            queryString=query_string,
            startTime=math.floor(start_time),
            endTime=math.ceil(end_time + 60),  # Add buffer for log delivery
        )
        query_id = query["queryId"]

        # Poll until the query leaves its pending states. GetQueryResults
        # reports "Scheduled" before the query starts and "Running" while it
        # executes; checking "Running" alone could read incomplete results
        # for a query that has not started yet.
        while response is None or response["status"] in ("Scheduled", "Running"):
            time.sleep(1)
            response = self.logs_client.get_query_results(queryId=query_id)

        if len(response["results"]) > 0:
            break

        # Logs might not be delivered yet - retry after a pause.
        retries += 1
        if retries < max_retries:
            self.logging.info(
                f"AWS logs not yet available for request {request_id}, "
                f"retrying in 10s... ({retries}/{max_retries})"
            )
            time.sleep(10)
            response = None

    # Flatten query results into "[timestamp] message" strings.
    log_messages = []
    if response and "results" in response:
        for log_entry in response["results"]:
            message = None
            timestamp = None
            for field in log_entry:
                if field["field"] == "@message":
                    message = field["value"]
                elif field["field"] == "@timestamp":
                    timestamp = field["value"]

            if message:
                log_messages.append(f"[{timestamp}] {message}" if timestamp else message)

    return log_messages
def get_invocation_logs(
    self, function_name: str, invocation_id: str, start_time: int, end_time: int
) -> List[str]:
    """
    Retrieve full logs (stdout and stderr) for a specific invocation.

    Queries Application Insights (traces and requests tables) through the
    Azure CLI for entries tagged with the given invocation ID, retrying a
    few times because telemetry ingestion is asynchronous.

    Args:
        function_name: Name of the Azure function
        invocation_id: Azure invocation ID
        start_time: Start time as Unix timestamp
        end_time: End time as Unix timestamp

    Returns:
        List of log messages for the invocation
    """
    self.cli_instance.install_insights()

    resource_group = self.config.resources.resource_group(self.cli_instance)

    # Resolve the Application Insights application ID for this function app.
    app_id_query = self.cli_instance.execute(
        ("az monitor app-insights component show " "--app {} --resource-group {}").format(
            function_name, resource_group
        )
    ).decode("utf-8")
    application_id = json.loads(app_id_query)["appId"]

    # Azure CLI requires date in the following format
    # Format: date (yyyy-mm-dd) time (hh:mm:ss.xxxxx) timezone (+/-hh:mm)
    start_time_str = datetime.datetime.fromtimestamp(start_time).strftime(
        "%Y-%m-%d %H:%M:%S.%f"
    )
    end_time_str = datetime.datetime.fromtimestamp(end_time + 60).strftime("%Y-%m-%d %H:%M:%S")
    from tzlocal import get_localzone

    timezone_str = datetime.datetime.now(get_localzone()).strftime("%z")

    # Traces and requests associated with the invocation, oldest first.
    query = (
        f"union traces, requests | "
        f"where customDimensions['InvocationId'] == '{invocation_id}' | "
        f"project timestamp, message, severityLevel, itemType | "
        f"order by timestamp asc"
    )

    log_messages = []
    retries = 0
    max_retries = 3

    # Map App Insights severity levels to readable labels.
    severity_map = {
        0: "VERBOSE",
        1: "INFO",
        2: "WARNING",
        3: "ERROR",
        4: "CRITICAL",
    }

    while retries < max_retries:
        ret = self.cli_instance.execute(
            (
                'az monitor app-insights query --app {} --analytics-query "{}" '
                "--start-time {} {} --end-time {} {}"
            ).format(
                application_id,
                query,
                start_time_str,
                timezone_str,
                end_time_str,
                timezone_str,
            )
        ).decode("utf-8")

        result = json.loads(ret)
        tables = result["tables"] if "tables" in result else []

        if tables and len(tables[0]["rows"]) > 0:
            for row in tables[0]["rows"]:
                timestamp, message = row[0], row[1]
                severity = row[2] if len(row) > 2 else None
                if message:
                    prefix = f"[{timestamp}]"
                    if severity is not None:
                        prefix += f" [{severity_map.get(severity, str(severity))}]"
                    log_messages.append(f"{prefix} {message}")
            break

        # No telemetry yet - ingestion can lag the invocation; retry later.
        retries += 1
        if retries < max_retries:
            self.logging.info(
                f"Azure logs not yet available for invocation {invocation_id}, "
                f"retrying in 10s... ({retries}/{max_retries})"
            )
            time.sleep(10)

    return log_messages
# --- sebs/faas/system.py: abstract interface ---
@abstractmethod
def get_invocation_logs(
    self, function_name: str, request_id: str, start_time: int, end_time: int
) -> List[str]:
    """
    Retrieve full logs (stdout and stderr) for a specific invocation.

    Args:
        function_name: Name of the function
        request_id: Platform-specific request/invocation ID
        start_time: Start time as Unix timestamp
        end_time: End time as Unix timestamp

    Returns:
        List of log messages for the invocation
    """
    pass


# --- sebs/gcp/gcp.py: GCP implementation ---
def get_invocation_logs(
    self, function_name: str, execution_id: str, start_time: int, end_time: int
) -> List[str]:
    """
    Retrieve full logs (stdout and stderr) for a specific invocation.

    Queries Cloud Logging for entries labelled with the execution ID,
    retrying a few times because log delivery is asynchronous.

    Args:
        function_name: Name of the GCP Cloud Function
        execution_id: GCP execution ID
        start_time: Start time as Unix timestamp
        end_time: End time as Unix timestamp

    Returns:
        List of log messages for the invocation
    """
    import google.cloud.logging as gcp_logging
    from google.api_core import exceptions

    logging_client = gcp_logging.Client()
    logger = logging_client.logger("cloudfunctions.googleapis.com%2Fcloud-functions")

    # Convert the window bounds to UTC strings in GCP's required format;
    # pad the end by a minute to allow for delayed log delivery.
    bounds = []
    for ts in (start_time, end_time + 60):
        bounds.append(
            datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        )

    # Filter on the function and the specific execution ID.
    filter_str = (
        f'resource.labels.function_name = "{function_name}" '
        f'labels.execution_id = "{execution_id}" '
        f'timestamp >= "{bounds[0]}" '
        f'timestamp <= "{bounds[1]}"'
    )

    log_messages = []
    retries = 0
    max_retries = 3

    while retries < max_retries:
        try:
            entries = logger.list_entries(
                filter_=filter_str, page_size=1000, order_by="timestamp asc"
            )

            found_logs = False
            for entry in entries:
                found_logs = True
                timestamp = entry.timestamp.isoformat() if entry.timestamp else "unknown"
                severity = entry.severity if hasattr(entry, "severity") else "DEFAULT"

                # Payload may be plain text, structured (dict), or another type.
                if hasattr(entry, "payload"):
                    payload = entry.payload
                    if isinstance(payload, str):
                        message = payload
                    elif isinstance(payload, dict):
                        message = payload.get("message", str(payload))
                    else:
                        message = str(payload)
                else:
                    message = str(entry)

                log_messages.append(f"[{timestamp}] [{severity}] {message}")

            if found_logs or retries >= max_retries - 1:
                break

        except exceptions.GoogleAPIError as e:
            self.logging.warning(f"Error querying GCP logs: {e}")

        retries += 1
        if retries < max_retries:
            self.logging.info(
                f"GCP logs not yet available for execution {execution_id}, "
                f"retrying in 10s... ({retries}/{max_retries})"
            )
            time.sleep(10)

    return log_messages


# --- sebs/local/local.py: Local (Docker) implementation ---
def get_invocation_logs(
    self, function_name: str, request_id: str, start_time: int, end_time: int
) -> List[str]:
    """
    Retrieve full logs (stdout and stderr) for a specific invocation.

    For local Docker-based execution, this retrieves logs from the Docker
    container whose name matches the function; request_id is matched
    against container IDs/names when several candidates exist.

    Args:
        function_name: Name of the local function
        request_id: Request ID (container ID or identifier)
        start_time: Start time as Unix timestamp
        end_time: End time as Unix timestamp

    Returns:
        List of log messages for the invocation
    """
    log_messages = []

    try:
        # Find candidate containers by function name (including stopped ones).
        candidates = self.docker_client.containers.list(
            all=True, filters={"name": function_name}
        )

        if candidates:
            # Prefer the container matching the request ID; otherwise
            # fall back to the most recent one.
            container = candidates[0]
            for c in candidates:
                if request_id in c.id or request_id in c.name:
                    container = c
                    break

            raw = container.logs(stdout=True, stderr=True, timestamps=True).decode("utf-8")
            log_messages = raw.split("\n")
        else:
            self.logging.warning(f"No Docker container found for function {function_name}")
            log_messages.append(
                f"Note: Local execution logs are ephemeral. "
                f"Container for {function_name} may have been removed."
            )

    except Exception as e:
        self.logging.error(f"Error retrieving local logs: {e}")
        log_messages.append(f"Error: {str(e)}")

    return log_messages
def get_invocation_logs(
    self, function_name: str, request_id: str, start_time: int, end_time: int
) -> List[str]:
    """
    Retrieve full logs (stdout and stderr) for a specific invocation.

    Fetches logs for the given activation via the wsk CLI
    (`wsk activation logs`); OpenWhisk looks up the activation record
    directly, so the time bounds are accepted for interface compatibility
    but not used here.

    Args:
        function_name: Name of the OpenWhisk action
        request_id: OpenWhisk activation ID
        start_time: Start time as Unix timestamp
        end_time: End time as Unix timestamp

    Returns:
        List of log messages for the invocation
    """
    log_messages = []

    try:
        completed = subprocess.run(
            [*self.get_wsk_cmd(), "activation", "logs", request_id],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
        )
        log_messages = completed.stdout.decode("utf-8").split("\n")

    except subprocess.CalledProcessError as e:
        self.logging.error(f"Error retrieving OpenWhisk logs: {e}")
        log_messages.append(f"Error: {str(e)}")
        # Surface the CLI's own diagnostics when available.
        if e.stderr:
            log_messages.append(e.stderr.decode("utf-8"))

    return log_messages