Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
3097b60
initial muxer implementation
Feb 6, 2025
5a36087
two files to connect muxer to dripline service
miniwynne Mar 19, 2025
37d26bd
added RelayService by copying old spime_endpoints.py
miniwynne Apr 2, 2025
23e118e
closed the comment that was commenting out all of RelayService
miniwynne Apr 30, 2025
d0eb97c
commenting out the error lines
miniwynne May 1, 2025
32b43e2
added an 'append.MuxerRelay' above the relay class and fixed a commen…
miniwynne May 5, 2025
52e578c
added an 'Enitity.__init__(self,**kwargs) and hopefully fixed the ind…
miniwynne May 5, 2025
b30672a
changed the tabs around using 'expand -t 4'
miniwynne May 5, 2025
1f9d282
changed 'Entity.__init__(self,**kwargs)' to be at the end of Relay
miniwynne May 6, 2025
4722d3b
trying to address the get_str eror using pop
miniwynne May 6, 2025
d21d26b
added self. in front of any kwargs, hoping I didnt break spacing
miniwynne May 6, 2025
cf1000b
did away with adding self. to all objects, ended up doing the kwargs.…
miniwynne May 6, 2025
8ae9e0b
changing the input to MuxerRelay from Entity to FormatEntity
miniwynne May 7, 2025
3985355
importing FormatEntity from implementations
miniwynne May 7, 2025
854ae0b
removed ' ' from get_str to see if that is the issue
miniwynne May 7, 2025
62e4524
putting self. in front get_str
miniwynne May 7, 2025
97cfa8b
adding the kwargs.pop thing again to get_str
miniwynne May 7, 2025
c01ccbe
Fixing super __init__ for Relay
May 7, 2025
da79744
indentation issue at line 147 w calibration
miniwynne May 7, 2025
21518e0
putting get_str back into a string
miniwynne May 7, 2025
18f2eed
Pid loop from phase 2 changed certain syntax, mostly updated throw re…
miniwynne Jul 16, 2025
18dccc0
added ServiceAttributeEntity, which is the module in the config file …
miniwynne Jul 16, 2025
67c54f7
changing this_time def to match Mainz and removing constants
miniwynne Jul 23, 2025
d9ec4f3
removed exceptions from import
miniwynne Jul 23, 2025
9fafe4c
remove fancy docs
miniwynne Jul 23, 2025
2aedcb2
added the same imports as muxer_service.py
miniwynne Jul 23, 2025
c95e9c9
had a trailing comma rip
miniwynne Jul 23, 2025
41ea9a2
changed def of __validate_status() to be like Mainz
miniwynne Jul 23, 2025
35869cc
import interface from dripline.core
miniwynne Jul 23, 2025
da00834
debugging __validate_status
miniwynne Jul 23, 2025
a8b5e8d
change validate status back to normal
miniwynne Jul 23, 2025
00ea3ac
commenting out validate_status as a whole
miniwynne Jul 23, 2025
7bfbb0d
commenting out all oisntances of valid status
miniwynne Jul 23, 2025
6178245
missing v on value
miniwynne Jul 23, 2025
675078b
change .service back to .provider
miniwynne Jul 23, 2025
e09b189
copying the Mainz method and seeing what sticks
miniwynne Jul 23, 2025
fa887f2
import Simple_PID
miniwynne Jul 23, 2025
8308572
I found this in a folder called simple_pid on my computer
miniwynne Jul 23, 2025
df7880f
getting PID.py to come from this repo hopefully
miniwynne Jul 23, 2025
52cb318
missing an I in import
miniwynne Jul 23, 2025
3dcf0dd
commenting out simple_pid
miniwynne Jul 23, 2025
37fdbb1
still trying to import this PID
miniwynne Jul 23, 2025
072b599
back to doing phase 2 way
miniwynne Jul 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions dripline/extensions/PID.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import time
import warnings


def _clamp(value, limits):
lower, upper = limits
if value is None:
return None
elif upper is not None and value > upper:
return upper
elif lower is not None and value < lower:
return lower
return value


try:
# get monotonic time to ensure that time deltas are always positive
_current_time = time.monotonic
except AttributeError:
# time.monotonic() not available (using python < 3.3), fallback to time.time()
_current_time = time.time
warnings.warn('time.monotonic() not available, using time.time() as fallback. Consider using Python 3.3 or newer to get monotonic time measurements.')


class PID(object):
"""
A simple PID controller. No fuss.
"""
def __init__(self, Kp=1.0, Ki=0.0, Kd=0.0, setpoint=0, sample_time=0.01, output_limits=(None, None), auto_mode=True, proportional_on_measurement=False):
"""
:param Kp: The value for the proportional gain Kp
:param Ki: The value for the integral gain Ki
:param Kd: The value for the derivative gain Kd
:param setpoint: The initial setpoint that the PID will try to achieve
:param sample_time: The time in seconds which the controller should wait before generating a new output value. The PID works best when it is constantly called (eg. during a loop), but with a sample time set so that the time difference between each update is (close to) constant. If set to None, the PID will compute a new output value every time it is called.
:param output_limits: The initial output limits to use, given as an iterable with 2 elements, for example: (lower, upper). The output will never go below the lower limit or above the upper limit. Either of the limits can also be set to None to have no limit in that direction. Setting output limits also avoids integral windup, since the integral term will never be allowed to grow outside of the limits.
:param auto_mode: Whether the controller should be enabled (in auto mode) or not (in manual mode)
:param proportional_on_measurement: Whether the proportional term should be calculated on the input directly rather than on the error (which is the traditional way). Using proportional-on-measurement avoids overshoot for some types of systems.
"""
self.Kp, self.Ki, self.Kd = Kp, Ki, Kd
self.setpoint = setpoint
self.sample_time = sample_time

self._min_output, self._max_output = output_limits
self._auto_mode = auto_mode
self.proportional_on_measurement = proportional_on_measurement

self._error_sum = 0

self._last_time = _current_time()
self._last_output = None
self._proportional = 0
self._last_input = None

def __call__(self, input_):
"""
Call the PID controller with *input_* and calculate and return a control output if sample_time seconds has passed
since the last update. If no new output is calculated, return the previous output instead (or None if no value
has been calculated yet).
"""
if not self.auto_mode:
return self._last_output

now = _current_time()
dt = now - self._last_time

if self.sample_time is not None and dt < self.sample_time and self._last_output is not None:
# only update every sample_time seconds
return self._last_output

# compute error terms
error = self.setpoint - input_
self._error_sum += self.Ki * error * dt
d_input = input_ - (self._last_input if self._last_input is not None else input_)

# compute the proportional term
if not self.proportional_on_measurement:
# regular proportional-on-error, simply set the proportional term
self._proportional = self.Kp * error
else:
# add the proportional error on measurement to error_sum
self._error_sum -= self.Kp * d_input
self._proportional = 0

# clamp error sum to avoid integral windup (and proportional, if proportional-on-measurement is used)
self._error_sum = _clamp(self._error_sum, self.output_limits)

# compute final output
output = self._proportional + self._error_sum - self.Kd * d_input
output = _clamp(output, self.output_limits)

# keep track of state
self._last_output = output
self._last_input = input_
self._last_time = now

return output

@property
def tunings(self):
"""The tunings used by the controller as a tuple: (Kp, Ki, Kd)"""
return self.Kp, self.Ki, self.Kd

@tunings.setter
def tunings(self, tunings):
"""Setter for the PID tunings"""
self.Kp, self.Ki, self.Kd = tunings

@property
def auto_mode(self):
"""Whether the controller is currently enabled (in auto mode) or not"""
return self._auto_mode

@auto_mode.setter
def auto_mode(self, enabled):
"""Enable or disable the PID controller"""
if enabled and not self._auto_mode:
# switching from manual mode to auto, reset
self._last_output = None
self._last_input = None
self._error_sum = 0
self._error_sum = _clamp(self._error_sum, self.output_limits)

self._auto_mode = enabled

@property
def output_limits(self):
"""The current output limits as a 2-tuple: (lower, upper). See also the *output_limts* parameter in :meth:`PID.__init__`."""
return (self._min_output, self._max_output)

@output_limits.setter
def output_limits(self, limits):
"""Setter for the output limits"""
if limits is None:
self._min_output, self._max_output = None, None
return

min_output, max_output = limits

if None not in limits and max_output < min_output:
raise ValueError('lower limit must be less than upper limit')

self._min_output = min_output
self._max_output = max_output

self._error_sum = _clamp(self._error_sum, self.output_limits)
self._last_output = _clamp(self._last_output, self.output_limits)
1 change: 1 addition & 0 deletions dripline/extensions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@

# Modules in this directory
from .add_auth_spec import *
from .muxer_service import *
229 changes: 229 additions & 0 deletions dripline/extensions/cca_pid_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
'''
Implementation of a PID control loop
'''

from __future__ import print_function
__all__ = []

import time
import datetime

from dripline.core import ThrowReply, Entity, calibrate, AlertConsumer, Interface
from dripline.implementations import EthernetSCPIService, FormatEntity

import logging
logger = logging.getLogger(__name__)

__all__.append('PidController')

class PidController(AlertConsumer):
'''
Implementation of a PID control loop with constant offset. That is, the PID equation
is used to compute the **change** to the value of some channel and not the value
itself. In the case of temperature control, this makes sense if the loop is working
against some fixed load (such as a cryocooler).

The input sensor can be anything which broadcasts regular values on the alerts
exchange (using the standard sensor_value.<name> routing key format). Usually
this would be a temperature sensor, but it could be anything. Similarly, the
output is anything that can be set to a float value, though a current output
is probably most common. After setting the new value of current, this value is checked
to be within a range around the desired value.

**NOTE**
The "exchange" and "keys" arguments list below come from the Service class but
are not valid for this class. Any value provided will be ignored
'''

def __init__(self,
input_channel,
output_channel,
check_channel,
status_channel,
payload_field='value_cal',
tolerance = 0.01,
target_value=110,
proportional=0.0, integral=0.0, differential=0.0,
maximum_out=1.0, minimum_out=1.0, delta_out_min= 0.001,
enable_offset_term=True,
minimum_elapsed_time=0,
**kwargs
):
'''
input_channel (str): name of the logging sensor to use as input to PID (this will override any provided values for keys)
output_channel (str): name of the endpoint to be set() based on PID
check_channel (str): name of the endpoint to be checked() after a set()
status_channel (str): name of the endpoint which controls the status of the heater (enabled/disabled output)
payload_field (str): name of the field in the payload when the sensor logs (default is 'value_cal' and 'value_raw' is the only other expected value)
target_value (float): numerical value to which the loop will try to lock the input_channel
proportional (float): coefficient for the P term in the PID equation
integral (float): coefficient for the I term in the PID equation
differential (float): coefficient for the D term in the PID equation
maximum_out (float): max value to which the output_channel may be set; if the PID equation gives a larger value this value is used instead
delta_out_min (float): minimum value by which to change the output_channel; if the PID equation gives a smaller change, the value is left unchanged (no set is attempted)
tolerance (float): acceptable difference between the set and get values (default: 0.01)
minimum_elapsed_time (float): minimum time interval to perform PID calculation over
'''
kwargs.update({'keys':['sensor_value.'+input_channel]})
AlertConsumer.__init__(self, **kwargs)

self._set_channel = output_channel
self._check_channel = check_channel
self._status_channel = status_channel
self.payload_field = payload_field
self.tolerance = tolerance

self._last_data = {'value':None, 'time':datetime.datetime.utcnow()}
self.target_value = target_value

self.Kproportional = proportional
self.Kintegral = integral
self.Kdifferential = differential

self._integral= 0

self.max_current = maximum_out
self.min_current = minimum_out
self.min_current_change = delta_out_min

self.enable_offset_term = enable_offset_term
self.minimum_elapsed_time = minimum_elapsed_time

self.__validate_status()
self._old_current = self.__get_current()
logger.info('starting current is: {}'.format(self._old_current))

def __get_current(self):
value = self.provider.get(self._check_channel)[self.payload_field]
#value = self.service.get(self._check_channel)[self.payload_field]
logger.info('current get is {}'.format(value))

try:
value = float(value)
except (TypeError, ValueError):
raise ThrowReply('DriplineValueError','value get ({}) is not floatable'.format(value))
return value

def __validate_status(self):

value = self.provider.get(self._status_channel)[self.payload_field]

if value == 'enabled':
logger.debug("{} returns {}".format(self._status_channel,value))
else:
logger.critical("Invalid status of {} for PID control by {}".format(self._status_channel,self.name))
raise ThrowReply('DriplineHardwareError',"{} returns {}".format(self._status_channel,value))




def this_consume(self, message, method):
logger.info('consuming message')
this_value = message.payload[self.payload_field]
if this_value is None:
logger.info('value is None')
return

this_time = datetime.datetime.strptime(message.timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
#this_time = datetime.datetime.strptime(message['timestamp'], constants.TIME_FORMAT)
if (this_time - self._last_data['time']).total_seconds() < self.minimum_elapsed_time:
# handle self._force_reprocess from @target_value.setter
if not self._force_reprocess:
logger.info("not enough time has elasped: {}[{}]".format((this_time - self._last_data['time']).total_seconds(),self.minimum_elapsed_time))
return
logger.info("Forcing process due to changed target_value")
self._force_reprocess = False

self.process_new_value(timestamp=this_time, value=float(this_value))

@property
def target_value(self):
return self._target_value
@target_value.setter
def target_value(self, value):
self._target_value = value
self._integral = 0
self._force_reprocess = True

def set_current(self, value):
logger.info('going to set new current to: {}'.format(value))
#reply = self.provider.set(self._set_channel, value)
reply = self.service.set(self._set_channel, value)
logger.info('set response was: {}'.format(reply))

def process_new_value(self, value, timestamp):

delta = self.target_value - value
logger.info('value is <{}>; delta is <{}>'.format(value, delta))

self._integral += delta * (timestamp - self._last_data['time']).total_seconds()
if (timestamp - self._last_data['time']).total_seconds() < 2*self.minimum_elapsed_time:
try:
derivative = (self._last_data['value'] - value) / (timestamp - self._last_data['time']).total_seconds()
except TypeError:
derivative = 0
else:
logger.warning("invalid time for calculating derivative")
derivative = 0.
self._last_data = {'value': value, 'time': timestamp}

logger.info("proportional <{}>; integral <{}>; differential <{}>".format\
(self.Kproportional*delta, self.Kintegral*self._integral, self.Kdifferential*derivative))
change_to_current = (self.Kproportional * delta +
self.Kintegral * self._integral +
self.Kdifferential * derivative
)
new_current = (self._old_current or 0)*self.enable_offset_term + change_to_current

if abs(change_to_current) < self.min_current_change:
logger.info("current change less than min delta")
logger.info("old[new] are: {}[{}]".format(self._old_current,new_current))
return
logger.info('computed new current to be: {}'.format(new_current))
if new_current > self.max_current:
logger.info("new current above max")
new_current = self.max_current
if new_current < self.min_current:
logger.info("new current below min")
new_current = self.min_current
if new_current < 0.:
logger.info("new current < 0")
new_current = 0.

self.set_current(new_current)
logger.debug("allow settling time and checking the current value")
# FIXME: remove sleep when set_and_check handled properly
time.sleep(1)
current_get = self.__get_current()
if abs(current_get-new_current) < self.tolerance:
logger.debug("current set is equal to current get")
else:
#self.__validate_status()
raise ThrowReply('DriplineValueError',"set value ({}) is not equal to checked value ({})".format(new_current,current_get))

logger.info("current set is: {}".format(new_current))
self._old_current = new_current

__all__.append('ServiceAttributeEntity')
#changed things like self.provider to self.service, idk if this is the move tho
class ServiceAttributeEntity(Entity):
'''
Entity allowing communication with spime property.
'''

def __init__(self,
attribute_name,
disable_set=False,
**kwargs):
Entity.__init__(self, **kwargs)
self._attribute_name = attribute_name
self._disable_set = disable_set

@calibrate()
def on_get(self):
return getattr(self.service, self._attribute_name)

def on_set(self, value):
if self._disable_set:
raise ThrowReply('DriplineMethodNotSupportedError','setting not available for {}'.format(self.name))
setattr(self.service, self._attribute_name, value)
Loading