Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,17 @@ ifdef BOARDWALKD_SLACK_WEBHOOK_URL
--workspace-status-json \
--host-header-pattern="(localhost|127\.0\.0\.1)" \
--port=8888 \
--url='http://localhost:8888'
--url='http://localhost:8888' \
--slack-error-advice-config test/server-client/slack-error-advice.toml
--workspace-status-json
else
poetry run boardwalkd serve \
--develop \
--workspace-status-json \
--host-header-pattern="(localhost|127\.0\.0\.1)" \
--port=8888 \
--url='http://localhost:8888'
--url='http://localhost:8888' \
--workspace-status-json
endif

dist: clean
Expand Down Expand Up @@ -94,8 +97,8 @@ render-d2:
test: test-pytest test-ruff test-pyright test-semgrep test-ansible-lint

.PHONY: test-ansible-lint
test-ansible-lint: develop
-cd test && poetry run ansible-lint --config-file ansible-lint.yaml
test-ansible-lint:
uvx --directory test ansible-lint --config-file ansible-lint.yaml z.dummy_playbook_for_ansible_lint.yml server-client/playbooks/

# Run pytest verbosely if we're running manually, but normally if we're in a CI environment.
.PHONY: test-pytest
Expand Down
4,084 changes: 2,290 additions & 1,794 deletions poetry.lock

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
name = "boardwalk"
description = "Boardwalk is a linear Ansible workflow engine"
readme = "README.md"
version = "0.8.29"
version = "0.9.0"
requires-python = ">=3.11,<4"
authors = [
{name="Mat Hornbeek", email="84995001+m4wh6k@users.noreply.github.com"},
Expand Down Expand Up @@ -64,7 +64,7 @@ anyio = "^4.7.0"
pyright = "==1.1.350"
pytest = "^8.3.4"
ruff = "^0.11.13"
semgrep = ">=1.92.0"
semgrep = "^1.164.0"

[tool.poetry.group.docs]
optional = true
Expand Down Expand Up @@ -105,6 +105,9 @@ exclude = [
"build/",
"dist/",
"typings/",
"**/node_modules",
"**/__pycache__",
"**/.*",
]
pythonPlatform = "All"
reportImportCycles = false
Expand Down
104 changes: 77 additions & 27 deletions src/boardwalk/ansible.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

import json
import sys
from collections.abc import Mapping
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import ansible_runner
from loguru import logger
Expand All @@ -17,7 +18,7 @@
from boardwalk.app_exceptions import BoardwalkException

if TYPE_CHECKING:
from typing import Any, TypedDict
from typing import TypedDict

from ansible_runner import Runner

Expand Down Expand Up @@ -76,34 +77,83 @@ def ansible_runner_cancel_callback(ws: Workspace):
return True


# Runner event types that ansible_runner_errors_to_output() treats as a failure
# worth reporting; every other event type is skipped.
FAILED_RUNNER_EVENTS = {
    "runner_item_on_failed",
    "runner_on_async_failed",
    "runner_on_failed",
    "runner_on_unreachable",
}


def _event_ignored(event_data: Mapping[str, Any]) -> bool:
return bool(event_data.get("ignore_errors"))


def _event_result_hidden(event_data: Mapping[str, Any]) -> bool:
res = event_data.get("res", {})
if not isinstance(res, dict):
return False
return bool(res.get("_ansible_no_log"))


def _append_if_present(parts: list[str], value: Any) -> None:
if value is None:
return
value_str = str(value).strip()
if value_str:
parts.append(value_str)


def ansible_runner_errors_to_output(runner: Runner, include_msg: bool = True) -> str:
    """Collects useful error messages from a Runner into a de-duplicated multiline string.

    Only events whose type is in ``FAILED_RUNNER_EVENTS`` and that are not
    flagged ``ignore_errors`` contribute a line. Each line is the ``": "``-joined
    host, task and task action, optionally followed by result details.

    Args:
        runner: Runner whose ``events`` are scanned.
        include_msg: When true, append the result's ``msg``, the labelled
            ``assertion``/``evaluated_to``/``status``/``url`` fields and any
            JSON ``errors`` list; if the result has no non-blank ``msg`` the
            event's ``stdout`` is appended as well. Details are never added
            for results flagged ``_ansible_no_log``.

    Returns:
        One line per distinct failure, in first-seen order, joined with
        newlines. Empty string when no reportable failure was found.
    """
    output: list[str] = []
    seen: set[str] = set()

    for event in runner.events:
        if event.get("event") not in FAILED_RUNNER_EVENTS:
            continue

        # Fall back to an empty mapping when the key is missing or explicitly None.
        event_data = event.get("event_data") or {}
        if _event_ignored(event_data):
            continue

        parts: list[str] = []
        for key in ("host", "task", "task_action"):
            _append_if_present(parts, event_data.get(key))

        if include_msg and not _event_result_hidden(event_data):
            has_result_message = False
            res = event_data.get("res", {})
            if isinstance(res, dict):
                result_message = res.get("msg")
                _append_if_present(parts, result_message)
                has_result_message = result_message is not None and str(result_message).strip() != ""
                # Labelled fields are reported whenever the key exists, even
                # for falsy values (e.g. "evaluated_to: False").
                for key in ("assertion", "evaluated_to", "status", "url"):
                    if key in res:
                        _append_if_present(parts, f"{key}: {res.get(key)}")
                result_json = res.get("json")
                if isinstance(result_json, dict):
                    errors = result_json.get("errors")
                    if isinstance(errors, list):
                        _append_if_present(parts, "errors: " + "; ".join(str(error) for error in errors))

            # stdout is only a fallback for results without a usable msg.
            if not has_result_message:
                _append_if_present(parts, event.get("stdout"))

        line = ": ".join(parts)
        # Identical lines are reported once.
        if line and line not in seen:
            seen.add(line)
            output.append(line)

    return "\n".join(output)


Expand Down
125 changes: 120 additions & 5 deletions src/boardwalk/cli_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@
help="An Ansible pattern to limit hosts by. Defaults to no limit",
default="",
)
@click.option(
"--open-browser-for-api-login/--no-open-browser-for-api-login",
help="Attempt to open a web browser, if required, to log into the boardwalkd API",
default=True,
show_default=True,
)
@click.option(
"--server-connect/--no-server-connect",
"-sc/-nsc",
Expand Down Expand Up @@ -108,6 +114,7 @@ def run(
server_connect: bool,
sort_hosts: str,
stomp_locks: bool,
open_browser_for_api_login: bool,
):
"""
Runs workflow jobs defined in the Boardwalkfile.py
Expand Down Expand Up @@ -274,7 +281,7 @@ def run_workflow(
),
)

handle_workflow_catch(workspace=workspace, hostname=host.name)
handle_workflow_catch(workspace=workspace, host=host)

# Connect to the remote host
# Wrap everything in try/except so we can handle failures
Expand Down Expand Up @@ -340,7 +347,7 @@ def run_failure_mode_handler(
"""
if boardwalkd_client:
if isinstance(exception, AnsibleRunnerBaseException):
runner_errors = ansible_runner_errors_to_output(runner=exception.runner, include_msg=False)
runner_errors = ansible_runner_errors_to_output(runner=exception.runner, include_msg=True)
msg = f"{exception.runner_msg}: {runner_errors}"
else:
try:
Expand Down Expand Up @@ -433,7 +440,9 @@ def check_host_preconditions_locally(
if workspace.cfg.workflow.cfg.always_retry_failed_hosts:
# If the workflow was started but never finished, ignore preconditions
try:
boardwalk_state = RemoteStateModel.parse_obj(host.ansible_facts["ansible_local"]["boardwalk_state"])
boardwalk_state = RemoteStateModel.model_validate(
host.ansible_facts["ansible_local"]["boardwalk_state"]
)
if (
boardwalk_state.workspaces[workspace.name].workflow.started
and not boardwalk_state.workspaces[workspace.name].workflow.succeeded
Expand Down Expand Up @@ -462,8 +471,9 @@ def check_host_preconditions_locally(
return hosts_meeting_preconditions


def handle_workflow_catch(workspace: Workspace, hostname: str):
def handle_workflow_catch(workspace: Workspace, host: Host):
"""Handles local and remote workspace catches. Blocks under caught conditions"""
hostname = host.name
if workspace.caught():
logger.info(
f"{hostname}: The {workspace.name} workspace is locally caught. Waiting for release before continuing..."
Expand Down Expand Up @@ -512,9 +522,91 @@ def check_boardwalkd_catch(client: WorkspaceClient) -> bool:
)
)
while check_boardwalkd_catch(boardwalkd_client):
maybe_clear_remote_state_fact(
host=host,
client=boardwalkd_client,
become_password=become_password,
check=_check_mode,
)
maybe_clear_remote_mutex(
host=host,
client=boardwalkd_client,
become_password=become_password,
check=_check_mode,
)
time.sleep(5) # nosemgrep: python.lang.best-practice.sleep.arbitrary-sleep


def maybe_clear_remote_state_fact(
    host: Host,
    client: WorkspaceClient,
    become_password: str | None,
    check: bool,
) -> bool:
    """Clears a host's remote state fact when boardwalkd has a pending request.

    Args:
        host: Host whose remote state fact is cleared.
        client: boardwalkd client used to read the semaphores, queue events
            and delete the pending request.
        become_password: Passed through to ``host.clear_remote_state_fact``.
        check: Passed through to ``host.clear_remote_state_fact``.

    Returns:
        ``True`` if a request was pending and the fact was cleared; ``False``
        if no request was pending or clearing raised
        ``AnsibleRunnerBaseException``.
    """
    if not client.get_semaphores().clear_remote_state_requested:
        return False

    logger.trace(f"Processing boardwalkd requested removal of remote state for {host.name}")
    try:
        host.clear_remote_state_fact(become_password=become_password, check=check)
    except AnsibleRunnerBaseException as e:
        client.queue_event(
            WorkspaceEvent(
                severity="error",
                message=f"{host.name}: Could not remove remote Boardwalk state fact: {e}",
            )
        )
        logger.error(f"{host.name}: Unable to remove remote Boardwalk state fact @ {host.remote_state_path}")
        logger.error(f"Runner output: {ansible_runner_errors_to_output(runner=e.runner)}")
        return False
    else:
        client.queue_event(
            WorkspaceEvent(
                severity="success",
                message=f"{host.name}: Removed remote Boardwalk state fact",
            )
        )
        logger.success(f"{host.name}: Removed remote Boardwalk state fact @ {host.remote_state_path}")
        return True
    finally:
        # The pending request is deleted whether or not clearing succeeded.
        client.delete_clear_remote_state_request()


def maybe_clear_remote_mutex(
    host: Host,
    client: WorkspaceClient,
    become_password: str | None,
    check: bool,
) -> bool:
    """Clears a host's remote mutex when boardwalkd has a pending request.

    Args:
        host: Host whose remote mutex is cleared.
        client: boardwalkd client used to read the semaphores, queue events
            and delete the pending request.
        become_password: Passed through to ``host.clear_remote_mutex``.
        check: Passed through to ``host.clear_remote_mutex``.

    Returns:
        ``True`` if a request was pending and the mutex was cleared; ``False``
        if no request was pending or clearing raised
        ``AnsibleRunnerBaseException``.
    """
    if not client.get_semaphores().clear_remote_mutex_requested:
        return False

    logger.trace(f"Processing boardwalkd requested removal of remote mutex for {host.name}")
    try:
        host.clear_remote_mutex(become_password=become_password, check=check)
    except AnsibleRunnerBaseException as e:
        client.queue_event(
            WorkspaceEvent(
                severity="error",
                message=f"{host.name}: Could not remove remote Boardwalk mutex: {e}",
            )
        )
        logger.error(f"{host.name}: Unable to remove remote Boardwalk mutex @ {host.remote_mutex_path}")
        logger.error(f"Runner output: {ansible_runner_errors_to_output(runner=e.runner)}")
        return False
    else:
        client.queue_event(
            WorkspaceEvent(
                severity="success",
                message=f"{host.name}: Removed remote Boardwalk mutex",
            )
        )
        logger.success(f"{host.name}: Removed remote Boardwalk mutex @ {host.remote_mutex_path}")
        return True
    finally:
        # The pending request is deleted whether or not clearing succeeded.
        client.delete_clear_remote_mutex_request()


def lock_remote_host(host: Host):
if boardwalkd_client:
boardwalkd_client.queue_event(
Expand All @@ -536,6 +628,22 @@ def bootstrap_with_server(workspace: Workspace, ctx: click.Context):
if not boardwalkd_client:
raise BoardwalkException("bootstrap_with_server called but no boardwalkd_client exists")
boardwalkd_url = boardwalkd_client.url.geturl()
worker_limit = ctx.params.get("limit")
auth_login_context: dict[str, str | None] = {
"workspace": workspace.name,
"worker_command": "check" if _check_mode else "run",
"worker_hostname": socket.gethostname(),
"worker_limit": str(worker_limit) if worker_limit else None,
"worker_username": getpass.getuser(),
"deployment_url": os.environ.get("BUILD_URL") or os.environ.get("RUN_DISPLAY_URL"),
"deployment_tag": os.environ.get("BUILD_TAG"),
"deployment_name": os.environ.get("JOB_NAME"),
"deployment_number": os.environ.get("BUILD_NUMBER"),
"deployment_user": os.environ.get("BUILD_USER"),
"deployment_user_id": os.environ.get("BUILD_USER_ID"),
"deployment_user_email": os.environ.get("BUILD_USER_EMAIL"),
}
boardwalkd_client.set_auth_login_context(**{key: value for key, value in auth_login_context.items() if value})
# Check if the Workspace is locked. We don't want to conflict with another worker
try:
if boardwalkd_client.has_mutex():
Expand All @@ -553,6 +661,13 @@ def bootstrap_with_server(workspace: Workspace, ctx: click.Context):
try:
boardwalkd_client.post_details(
WorkspaceDetails(
deployment_url=auth_login_context.get("deployment_url") or "",
deployment_name=auth_login_context.get("deployment_name") or "",
deployment_number=auth_login_context.get("deployment_number") or "",
deployment_tag=auth_login_context.get("deployment_tag") or "",
deployment_user=auth_login_context.get("deployment_user") or "",
deployment_user_id=auth_login_context.get("deployment_user_id") or "",
deployment_user_email=auth_login_context.get("deployment_user_email") or "",
host_pattern=workspace.cfg.host_pattern,
workflow=workspace.cfg.workflow.__class__.__qualname__,
worker_command="check" if _check_mode else "run",
Expand Down Expand Up @@ -622,7 +737,7 @@ def directly_confirm_host_preconditions(host: Host, inventory_vars: InventoryHos
if workspace.cfg.workflow.cfg.always_retry_failed_hosts:
# If the workflow was started but never finished, ignore preconditions
try:
boardwalk_state = RemoteStateModel.parse_obj(host.ansible_facts["ansible_local"]["boardwalk_state"])
boardwalk_state = RemoteStateModel.model_validate(host.ansible_facts["ansible_local"]["boardwalk_state"])
if (
boardwalk_state.workspaces[workspace.name].workflow.started
and not boardwalk_state.workspaces[workspace.name].workflow.succeeded
Expand Down
Loading
Loading