Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f474076
feat(config): support for reading .j2 config files with jinja
aelanman Jun 9, 2022
dcd60f5
fix: update requirements
aelanman Sep 13, 2022
510940b
style: blacken and fix docstring for linter
aelanman Sep 13, 2022
8c1b978
Update test_state.py and bump up version of redis gh action
aelanman Jan 18, 2023
98d01d3
fix: fixing unit tests
Jan 24, 2023
6c4c449
lint: blacken
Jul 20, 2023
6f16cbe
fix: run apt-get update in actions
Jul 21, 2023
51ad05b
fix: raise version in actions because requires python>3.8
Jul 23, 2023
32ef123
fix: set_start_method causes RuntimeError if run twice
Jul 24, 2023
a9a4aae
fix: need a longer buffer time in test_wait
Jul 24, 2023
fc15d64
ci: avoid running black on files generated by gh actions
Feb 16, 2024
7c6c51d
fix: enforce subprocess method
Feb 16, 2024
0ef65fa
feat: add get-coco-config endpoint
Feb 16, 2024
047b014
fix: update dockerfile
Feb 16, 2024
06a91cc
feat option in coco client script to pull config from backend
Feb 16, 2024
41e2359
fix: support python 3.8 to 3.11
Feb 17, 2024
b8423b2
feat: save part of returned json to state
May 2, 2024
c37247c
fix: handle timeout errors in StateReplyCheck
May 2, 2024
54588c4
fix: Clear space in docker image
May 2, 2024
1f02a71
fix dockerfile
Jul 17, 2024
2f0a8b3
fix:syntax in gh actions
Aug 1, 2024
1c58ef1
fix(test): extra internal endpoint means this value must be raised by…
Aug 1, 2024
660e74f
restore commented line
Sep 17, 2025
d4358ef
fix: use ThreadingWSGIServer in metric.py instead of deprecated _Thre…
Feb 6, 2026
7f1648e
feat: option to report request latencies for external forwards
Feb 6, 2026
7322d90
test: check that latencies are returned properly for different report…
Feb 6, 2026
a0542c8
docs: include report_latency in docstring
Feb 6, 2026
751bc28
Revert "fix: use ThreadingWSGIServer in metric.py instead of deprecat…
Feb 6, 2026
50d0f86
fix: remove failing line from Dockerfile
Feb 6, 2026
0d28ea8
feat: also report queueing time
Feb 6, 2026
d5de0f1
ci(main): slight fixes to main workflows
ljgray Mar 13, 2026
6735359
feat(state): re-impement simple yaml/jinja loader and remove kotekan …
ljgray Mar 13, 2026
b3e2861
style: blackify
ljgray Mar 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ jobs:
- name: Set up Python 3.14
uses: actions/setup-python@v6
with:
python-version: 3.14
python-version: "3.14"
- name: Install black
run: pip install black

- name: Check code with black
run: black --check --exclude kotekan .
run: black --check --exclude kotekan coco

- name: Run black on scripts
run: black --check scripts/cocod scripts/coco
Expand All @@ -34,7 +34,7 @@ jobs:
- name: Set up Python 3.14
uses: actions/setup-python@v6
with:
python-version: 3.14
python-version: "3.14"
- name: Install pydocstyle
run: pip install pydocstyle
- name: Run pydocstyle on all .py files
Expand All @@ -48,7 +48,7 @@ jobs:
- name: Set up Python 3.14
uses: actions/setup-python@v6
with:
python-version: 3.14
python-version: "3.14"
- name: Install flake8
run: pip install flake8
- name: Run flake8
Expand All @@ -62,7 +62,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: "3.10"
python-version: "3.14"
- name: Install pylint
run: pip install pylint==2.15 pylint-ignore
- name: Install dependencies
Expand All @@ -72,16 +72,20 @@ jobs:

tests:
runs-on: ubuntu-latest
strategy:
matrix:
python_version: ['3.8', '3.9', '3.10', '3.11']
steps:
- uses: actions/checkout@v6

- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: "3.10"
python-version: ${{ matrix.python_version }}

- name: Install apt dependencies
run: |
sudo apt-get update
sudo apt-get install -y python3-sphinx libevent-dev libhdf5-dev g++
- name: Install pip dependencies
run: |
Expand All @@ -90,7 +94,7 @@ jobs:
pip install .

- name: Start Redis
uses: supercharge/redis-github-action@1.2.0
uses: supercharge/redis-github-action@1.4.0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
uses: supercharge/redis-github-action@1.4.0
uses: supercharge/redis-github-action@1.8.1

We might as well update this to the latest.

with:
redis-version: 5

Expand Down
27 changes: 11 additions & 16 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
# Use an official Python runtime as a base image
FROM python:3.7-slim
FROM python:3.10-slim

## The maintainer name and email
LABEL maintainer="CHIME/FRB Collaboration"

ADD . /coco

RUN apt-get update && \
apt-get install -y apt-utils && \
apt-get install -y software-properties-common && \
apt-get install -y git && \
apt-get install -y build-essential && \
apt-get install -y libmariadb-dev && \
apt-get install -y libevent-dev && \
apt-get install -y libhdf5-dev && \
pip install flask && \
pip install -r /coco/requirements.txt && \
pip install /coco && \
apt-get install -y apt-utils git build-essential curl \
libmariadb-dev libevent-dev && \
pip install --use-deprecated=legacy-resolver flask && \
pip install --use-deprecated=legacy-resolver -r /coco/requirements.txt && \
pip install --use-deprecated=legacy-resolver /coco

#-----------------------
# Minimize container size
#-----------------------
apt-get remove -y curl git && \
#-----------------------
# Minimize container size
#-----------------------
RUN apt-get remove -y git && \
apt-get autoremove -y && \
apt-get clean -y && \
rm -rf /tmp/build
rm -rf /tmp/build /coco
2 changes: 1 addition & 1 deletion coco/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class NotThisMethod(Exception):


def register_vcs_handler(vcs, method): # decorator
"""Decorator to mark a method as the handler for a particular VCS."""
"""Mark a method as the handler for a particular VCS (decorator)."""

def decorate(f):
"""Store f in HANDLERS[vcs][method]."""
Expand Down
2 changes: 0 additions & 2 deletions coco/blocklist.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class Blocklist:
"""

def __init__(self, hosts, path: os.PathLike):

# Initialise persistent storage
self._state = PersistentState(path)
if self._state.state is None:
Expand Down Expand Up @@ -177,7 +176,6 @@ def _check_hosts(self, hosts: List[Host]) -> Tuple[List[Host], List[bool]]:
"""

def _check_host(host):

if not isinstance(host, Host):
host = Host(host)

Expand Down
12 changes: 11 additions & 1 deletion coco/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ def _save_reply(self, reply):
for r in reply.values():
if isinstance(r, dict):
merged.update(r)
self.state.write(self.save_to_state, merged)
if isinstance(self.save_to_state, str):
self.state.write(self.save_to_state, merged)
else:
for key, targ in self.save_to_state.items():
self.state.write(key, merged[targ])


class ReplyCheck(Check):
Expand Down Expand Up @@ -413,6 +417,12 @@ async def run(self, result: Result):
failed_hosts.add(host)
result.report_failure(self._name, host, "missing", name)
continue
if result_ == "Timeout":
for name in self.state_paths.keys():
logger.debug(f"/{self._name}: Reply timed out for {host}.")
failed_hosts.add(host)
result.report_failure(self._name, host, "timeout", name)
continue
for name, value in result_.items():
if name not in self.state_paths:
logger.debug(
Expand Down
1 change: 0 additions & 1 deletion coco/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ def load_config(path=None):
any_exist = False

for cfile in config_files:

# Expand the configuration file path
absfile = Path(cfile).expanduser().resolve()

Expand Down
32 changes: 24 additions & 8 deletions coco/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@

import json
import redis
import aioredis

import sys

if sys.version_info.minor <= 10:
import aioredis
else:
from redis import asyncio as aioredis

from sanic import Sanic, response

Expand All @@ -29,17 +35,24 @@
Endpoint,
LocalEndpoint,
)
from .result import Result
from . import worker, __version__, wait
from .state import State
from .exceptions import ConfigError, InternalError
from .util import Host, str2total_seconds
from . import slack
from . import config

Sanic.START_METHOD_SET = True
Sanic.start_method = "fork"

logger = logging.getLogger(__name__)

# This should be a no-op on Linux but is required on MacOS for coco to run
set_start_method("fork")
try:
set_start_method("fork", force=True)
except RuntimeError:
pass


class Core:
Expand Down Expand Up @@ -166,7 +179,6 @@ def __del__(self):
logger.error(
f"Failed sending shutdown command to worker (have to kill it): {type(e)}: {e}"
)
self._kill_worker()
self._kill_worker()

def _kill_worker(self):
Expand Down Expand Up @@ -254,7 +266,6 @@ def _config_slack_loggers(self):
slack.set_token(self.config["slack_token"])

for rule in self.config["slack_rules"]:

logger_name = rule["logger"]
channel = rule["channel"]
level = rule.get("level", "INFO").upper()
Expand Down Expand Up @@ -292,7 +303,6 @@ def _register_config(self):
logger.warning("Config registration DISABLED. This is only OK for testing.")

def _load_config(self, config_path: os.PathLike):

self.config = config.load_config(config_path)

self.log_level = self.config["log_level"]
Expand Down Expand Up @@ -337,11 +347,9 @@ def _load_config(self, config_path: os.PathLike):
logger.error(f"Invalid slack rule {rdict}.")

def _load_endpoints(self):

self.endpoints = {}

for conf in self.config["endpoints"]:

name = conf["name"]

# Create the endpoint object
Expand Down Expand Up @@ -371,13 +379,21 @@ def _local_endpoints(self):
"reset-state": ("POST", self.state.reset_state),
"save-state": ("POST", self.state.save_state),
"load-state": ("POST", self.state.load_state),
"get-coco-config": ("GET", self._get_config),
"wait": ("POST", wait.process_post),
}

for name, (type_, callable_) in endpoints.items():
self.endpoints[name] = LocalEndpoint(name, type_, callable_)
self.forwarder.add_endpoint(name, self.endpoints[name])

async def _get_config(self, _):
return Result(
"coco-config",
result={Host("coco"): (self.config, 200)},
type_="FULL",
)

def _check_endpoint_links(self):
def check(e):
if e:
Expand Down Expand Up @@ -414,7 +430,7 @@ async def external_endpoint(self, request, endpoint):
Core endpoint. Passes all endpoint calls on to redis and blocks until completion.
"""
# create a unique name for this task: <process ID>-<POSIX timestamp>
now = time.time()
now = time.perf_counter()
name = f"{os.getpid()}-{now}"

async with self.redis_async.client() as ra_cli:
Expand Down
10 changes: 6 additions & 4 deletions coco/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(self, name, conf, forwarder, state):
self.type = conf.get("type", "GET")
self.group = conf.get("group")
self.callable = conf.get("callable", False)
self.report_latency = conf.get("report_latency", True)
self.call_on_start = conf.get("call_on_start", False)
self.forwarder = forwarder
self.state = state
Expand Down Expand Up @@ -109,7 +110,6 @@ def __init__(self, name, conf, forwarder, state):

# If save_state is set, the configured values have to match.
if self.values:

# Check if endpoint value types match the associated part of the saved state
for key in self.values.keys():
try:
Expand Down Expand Up @@ -183,7 +183,7 @@ def __init__(self, name, conf, forwarder, state):

def _load_internal_forward(self, dict_, list_):
"""
Load Forward's from the config dictionary, generate objects and place in list.
Load Forwards from the config dictionary, generate objects and place in list.

Parameters
----------
Expand Down Expand Up @@ -298,11 +298,11 @@ def _load_checks(self, check_dict: Dict) -> List[Check]:

save_to_state = check_dict.get("save_reply_to_state", None)
if save_to_state:
if not isinstance(save_to_state, str):
if not isinstance(save_to_state, (str, dict)):
raise ConfigError(
f"'save_reply_to_state' in check for '{name}' in '{self.name}"
f".conf' is of type '{type(save_to_state).__name__}' "
f"(expected str)."
f"(expected str or dict)."
)
logger.debug(
f"Endpoint {self.name} will save replies to state: {save_to_state}."
Expand Down Expand Up @@ -514,6 +514,8 @@ async def call(self, request, hosts=None, params=None):
self.write_timestamp()
self.logger.debug("Success!")

result.report_latency = self.report_latency

return result

def write_timestamp(self):
Expand Down
19 changes: 11 additions & 8 deletions coco/request_forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def _create_trace_callback(name):
"""

async def _callback(session, context, params): # pylint: disable=W0613

if not hasattr(context, "event_status"):
context.event_status = {}
context.start_time = time.time()
Expand All @@ -62,7 +61,6 @@ def _trace_config(trace_all=False):
If trace_all=False, only dump on exceptions, otherwise dump at the end of a request too.
"""
if not hasattr(_trace_config, "obj"):

_trace_config.obj = aiohttp.TraceConfig()

# This is a big hack, but it finds the events by looking for the underlying
Expand Down Expand Up @@ -353,7 +351,7 @@ async def _request(self, session, method, host, endpoint, request, params, timeo
"""
url = host.join_endpoint(endpoint)
hostname, port = host.hostname, host.port
start_time = time.time()
start_time = time.perf_counter()
status = "0"
try:
async with session.request(
Expand All @@ -364,20 +362,25 @@ async def _request(self, session, method, host, endpoint, request, params, timeo
timeout=aiohttp.ClientTimeout(timeout),
params=params,
) as response:
response_time = time.perf_counter() - start_time
try:
status = str(response.status)
return (
host,
(await response.json(content_type=None), response.status),
(
await response.json(content_type=None),
response.status,
response_time,
),
)
except json.decoder.JSONDecodeError:
return host, (await response.text(), response.status)
return host, (await response.text(), response.status, response_time)
except AsyncioTimeoutError:
return host, ("Timeout", 0)
return host, ("Timeout", 0, 0)
except Exception as e:
return host, (str(e), 0)
return host, (str(e), 0, 0)
finally:
response_time = time.time() - start_time
response_time = time.perf_counter() - start_time
self.response_time.labels(
endpoint=endpoint, host=hostname, port=port
).observe(response_time)
Expand Down
Loading
Loading