Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
ARG img_user=ghcr.io/driplineorg
ARG img_repo=dripline-cpp
#ARG img_tag=develop
ARG img_tag=v2.10.0
ARG img_tag=v2.10.1

FROM ${img_user}/${img_repo}:${img_tag}

Expand Down
8 changes: 5 additions & 3 deletions dripline/core/calibrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ def wrapper(self, *args, **kwargs):
val_dict['value_cal'] = cal
elif isinstance(self._calibration, dict):
logger.debug('calibration is dictionary, looking up value')
if val_dict['value_raw'] in self._calibration:
val_dict['value_cal'] = self._calibration[val_dict['value_raw']]
value_raw_str = str(val_dict['value_raw'])
if value_raw_str in self._calibration:
val_dict['value_cal'] = self._calibration[str(val_dict['value_raw'])]
else:
raise ThrowReply('service_error_invalid_value', f"raw value <{repr(val_dict['value_raw'])}> not in cal dict")
raise ThrowReply('service_error_invalid_value', f"raw value <{str(val_dict['value_raw'])}> {type(val_dict['value_raw'])} not in cal dict with calibrate dict {self._calibration}")
logger.debug(f"formatted cal is:\n{ val_dict['value_cal'] }")
else:
logger.warning('the _calibration property is of unknown type')
return val_dict
Expand Down
58 changes: 50 additions & 8 deletions dripline/core/entity.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import datetime
import functools
import types

import numbers

import scarab

from _dripline.core import MsgAlert
from .endpoint import Endpoint
from dripline.core import MsgAlert
from .throw_reply import ThrowReply
__all__ = []

import logging
Expand All @@ -31,6 +31,11 @@ def wrapper(*args, **kwargs):
values.update({'value_raw': args[0]})
logger.debug('set done, now log')
self.log_a_value(values)
try:
this_value = float(values[self._check_field])
except (TypeError, ValueError):
this_value = False
self._last_log_value = this_value
return result
return wrapper

Expand Down Expand Up @@ -58,16 +63,29 @@ class Entity(Endpoint):
'''
#check_on_set -> allows for more complex logic to confirm successful value updates
# (for example, the success condition may be measuring another endpoint)
def __init__(self, get_on_set=False, log_routing_key_prefix='sensor_value', log_interval=0, log_on_set=False, calibration=None, **kwargs):
def __init__(self,
get_on_set=False,
log_on_set=False,
log_routing_key_prefix='sensor_value',
log_interval=0,
max_interval=0,
max_fractional_change=0,
check_field='value_cal',
calibration=None,
**kwargs):
'''
Args:
get_on_set: if true, calls to on_set are immediately followed by an on_get, which is returned
log_on_set: if true, always call log_a_value() immediately after on_set
**Note:** requires get_on_set be true, overrides must be equivalent
log_routing_key_prefix: first term in routing key used in alert messages which log values
log_interval: how often to log the Entity's value. If 0 then scheduled logging is disabled;
log_interval: how often to check the Entity's value. If 0 then scheduled logging is disabled;
if a number, interpreted as number of seconds; if a dict, unpacked as arguments
to the datetime.time_delta initializer; if a datetime.timedelta taken as the new value
log_on_set: if true, always call log_a_value() immediately after on_set
**Note:** requires get_on_set be true, overrides must be equivalent
max_interval: max allowed time interval between logging, allows usage of conditional logging. If 0,
then logging values occurs every log_interval.
max_fractional_change: max allowed fractional difference between subsequent values to trigger log condition.
check_field: result field to check, 'value_cal' or 'value_raw'
calibration (string || dict) : if string, updated with raw on_get() result via str.format() in
@calibrate decorator, used to populate raw and calibrated values
fields of a result payload. If a dictionary, the raw result is used
Expand All @@ -81,13 +99,16 @@ def __init__(self, get_on_set=False, log_routing_key_prefix='sensor_value', log_
# keep a reference to the on_set (possibly decorated in a subclass), needed for changing *_on_set configurations
self.__initial_on_set = self.on_set

self._get_on_set = None
self._log_on_set = None
self.get_on_set = get_on_set
self.log_on_set = log_on_set

self.log_interval = log_interval
self._max_interval = max_interval
self._max_fractional_change = max_fractional_change
self._check_field = check_field
self._log_action_id = None
self._last_log_time = None

@property
def get_on_set(self):
Expand Down Expand Up @@ -136,10 +157,31 @@ def log_interval(self, new_interval):
def scheduled_log(self):
logger.debug("in a scheduled log event")
result = self.on_get()
try:
this_value = float(result[self._check_field])
except (TypeError, ValueError):
this_value = False
# Various checks for log condition
if self._last_log_time is None:
logger.debug("log because no last log")
elif (datetime.datetime.utcnow() - self._last_log_time).total_seconds() > self._max_interval:
logger.debug("log because too much time")
elif this_value is False:
logger.warning(f"cannot check value change for {self.name}")
return
elif ((self._last_log_value == 0 and this_value != 0) or
(self._last_log_value != 0 and\
abs((self._last_log_value - this_value)/self._last_log_value)>self._max_fractional_change)):
logger.debug("log because change magnitude")
else:
logger.debug("no log condition met, not logging for {self.name}")
return
self._last_log_value = this_value
self.log_a_value(result)

def log_a_value(self, the_value):
logger.debug(f"value to log is:\n{the_value}")
logger.info(f"value to log for {self.name} is:\n{the_value}")
self._last_log_time = datetime.datetime.utcnow()
the_alert = MsgAlert.create(payload=scarab.to_param(the_value), routing_key=f'{self.log_routing_key_prefix}.{self.name}')
alert_sent = self.service.send(the_alert)

Expand Down
29 changes: 16 additions & 13 deletions dripline/core/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Interface(Core):
A class that provides user-friendly methods for dripline interactions in a Python interpreter.
Intended for use as a dripline client in scripts or interactive sessions.
'''
def __init__(self, username: str | dict=None, password: str | dict=None, dripline_mesh: dict=None, timeout_s: int=10, confirm_retcodes: bool=True):
def __init__(self, authentication_obj = None, username: str | dict=None, password: str | dict=None, dripline_mesh: dict=None, timeout_s: int=10, confirm_retcodes: bool=True):
'''
Configures an interface with the necessary parameters.

Expand Down Expand Up @@ -48,18 +48,21 @@ def __init__(self, username: str | dict=None, password: str | dict=None, driplin
if dripline_mesh is not None:
dripline_config.update(dripline_mesh)

dl_auth_spec = create_dripline_auth_spec()
auth_args = {
'username': {} if username is None else username,
'password': {} if password is None else password,
}
dl_auth_spec.merge( scarab.to_param(auth_args) )
auth_spec = scarab.ParamNode()
auth_spec.add('dripline', dl_auth_spec)
logger.debug(f'Loading auth spec:\n{auth_spec}')
auth = scarab.Authentication()
auth.add_groups(auth_spec)
auth.process_spec()
if authentication_obj is not None:
auth = authentication_obj
else:
dl_auth_spec = create_dripline_auth_spec()
auth_args = {
'username': {} if username is None else username,
'password': {} if password is None else password,
}
dl_auth_spec.merge( scarab.to_param(auth_args) )
auth_spec = scarab.ParamNode()
auth_spec.add('dripline', dl_auth_spec)
logger.debug(f'Loading auth spec:\n{auth_spec}')
auth = scarab.Authentication()
auth.add_groups(auth_spec)
auth.process_spec()

Core.__init__(self, config=scarab.to_param(dripline_config), auth=auth)

Expand Down
62 changes: 60 additions & 2 deletions dripline/core/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@

import scarab
from _dripline.core import _Service, DriplineConfig, create_dripline_auth_spec
from _dripline.core import MsgRequest, Receiver, op_t
from .throw_reply import ThrowReply
from .object_creator import ObjectCreator

import datetime
import logging
import types
import datetime
import numbers
import subprocess
import time

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -59,7 +64,10 @@ def __init__(self, name, make_connection=True, endpoints=None, add_endpoints_now
broadcast_key='broadcast', loop_timeout_ms=1000,
message_wait_ms=1000, heartbeat_interval_s=60,
username=None, password=None, authentication_obj=None,
dripline_mesh=None, **kwargs):
dripline_mesh=None,
heartbeat_broker_s=60,
rk_aliveness="state",
**kwargs):
'''
Configures a service with the necessary parameters.

Expand Down Expand Up @@ -97,6 +105,8 @@ def __init__(self, name, make_connection=True, endpoints=None, add_endpoints_now
Authentication information provided as a scarab.Authentication object; this will override the auth parameter.
dripline_mesh : dict, optional
Provide optional dripline mesh configuration information (see dripline_config for more information)
heartbeat_broker_s (int): self defined scheduler interval (seconds) to check the aliveness
rk_aliveness (str): the routing key to check the aliveness (on_get run by default)
'''
# Final dripline_mesh config should be the default updated by the parameters passed by the caller
dripline_config = DriplineConfig().to_python()
Expand Down Expand Up @@ -138,6 +148,54 @@ def __init__(self, name, make_connection=True, endpoints=None, add_endpoints_now
if kwargs:
logger.debug(f'Service received some kwargs that it doesn\'t handle, which will be ignored: {kwargs}')

self._heartbeat_action_id = None
self._message_wait_ms = message_wait_ms
self.heartbeat_broker_s = heartbeat_broker_s
self.rk_aliveness = rk_aliveness
self.broker = config["dripline_mesh"]["broker"]
self.start_heartbeat()

@property
def heartbeat_broker_s(self):
return self._heartbeat_broker_s
@heartbeat_broker_s.setter
def heartbeat_broker_s(self, new_interval):
if isinstance(new_interval, numbers.Number):
self._heartbeat_broker_s = datetime.timedelta(seconds=new_interval)
elif isinstance(new_interval, dict):
self._heartbeat_broker_s = datetime.timedelta(**new_interval)
elif isinstance(new_interval, datetime.timedelta):
self._heartbeat_broker_s = new_interval
else:
raise ThrowReply('service_error_invalid_value', f"unable to interpret a new_interval for heartbeat test of type <{type(new_interval)}>")

def scheduled_heartbeat(self):
logger.info("in a scheduled hearbeat event")
a_node= scarab.ParamNode()
a_receiver = Receiver()
#a_node.add('values',scarab.ParamValue('5'))
the_request = MsgRequest.create(a_node, op_t.get,self.rk_aliveness)
reply_pkg = self.send(the_request)
if not reply_pkg.successful_send:
raise ThrowReply("Failed rabbitmq connection test.")
sig_handler = scarab.SignalHandler()
sig_handler.add_cancelable(a_receiver)
result = a_receiver.wait_for_reply(reply_pkg, self._message_wait_ms) # receiver expects ms
sig_handler.remove_cancelable(a_receiver)
the_value = result.payload
logger.info(f"get {self.rk_aliveness}, result: {the_value}")
logger.info(f"rabbitmq connection is found in this scheduler")

def start_heartbeat(self):
if self._heartbeat_action_id is not None:
self.unschedule(self._heartbeat_action_id)
if self.heartbeat_broker_s:
logger.info(f'should start heart beat connection check every {self.heartbeat_broker_s}')
self._heartbeat_action_id = self.schedule(self.scheduled_heartbeat, self.heartbeat_broker_s, datetime.datetime.now() + self.execution_buffer*3)
else:
raise ValueError('unable to start logging when heartbeat_broker_s evaluates false')
logger.debug(f'heartbeat action id is {self._heartbeat_action_id}')

def add_endpoints_from_config(self):
if self.endpoint_configs is not None:
for an_endpoint_conf in self.endpoint_configs:
Expand Down
8 changes: 7 additions & 1 deletion dripline/implementations/postgres_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ def __init__(self, table_name,
self._optional_insert_names = self._ensure_col_key_map(optional_insert_names)
self._default_insert_dict = default_insert_values

def on_get(self):
result = {"default":"this is a default output"}
return result

def _ensure_col_key_map(self, column_list):
to_return = []
for a_col in column_list:
Expand Down Expand Up @@ -134,7 +138,8 @@ def do_select(self, return_cols=[], where_eq_dict={}, where_lt_dict={}, where_gt
return_cols = self.table.c
else:
return_cols = [sqlalchemy.text(col) for col in return_cols]
this_select = sqlalchemy.select(return_cols)

this_select = sqlalchemy.select(*return_cols)
for c,v in where_eq_dict.items():
this_select = this_select.where(getattr(self.table.c,c)==v)
for c,v in where_lt_dict.items():
Expand All @@ -143,6 +148,7 @@ def do_select(self, return_cols=[], where_eq_dict={}, where_lt_dict={}, where_gt
this_select = this_select.where(getattr(self.table.c,c)>v)
conn = self.service.engine.connect()
result = conn.execute(this_select)
conn.commit()
return (result.keys(), [i for i in result])

def _insert_with_return(self, insert_kv_dict, return_col_names_list):
Expand Down