-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmpvaSetMonitor.py
More file actions
44 lines (40 loc) · 1.71 KB
/
mpvaSetMonitor.py
File metadata and controls
44 lines (40 loc) · 1.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# -----------------------------------------------------------------------------
# Title : mpvaSetMonitor
# -----------------------------------------------------------------------------
# File : mpvaSetMonitor.py
# Author : Kuktae Kim, ktkim@slac.stanford.edu
# Created : 2023-11-02
# Last update: 2023-11-02
# -----------------------------------------------------------------------------
# Description:
# Python class for mpvaSetMonitor function.
# -----------------------------------------------------------------------------
# This file is part of matpva. It is subject to the license terms in the
# LICENSE.txt file found in the top-level directory of this distribution
# and at: https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of matpva, including this file, may be copied, modified,
# propagated, or distributed except according to the terms contained in
# the LICENSE.txt file.
# -----------------------------------------------------------------------------
# mpvaSetMonitor.py
from p4p.client.thread import Context, Disconnected
class ValueCache:
    """Cache the most recent value of a PV received through a p4p monitor.

    A monitor subscription is opened in the constructor; every callback
    from that subscription is routed to the private update handler, which
    maintains two pieces of state:

    * the cached value, exposed read-only through :attr:`current`
      (``None`` while no valid value is held);
    * the public ``updated`` flag, which is set to ``True`` when an update
      arrives while a value is already cached, and is ``False`` right
      after construction, after the first value, and after any error or
      disconnect notification.  Nothing in this class clears the flag on
      read; callers that want edge-triggered behaviour reset ``updated``
      themselves.
    """

    def __init__(self, ctxt: Context, pv: str):
        """Start monitoring *pv* using the client context *ctxt*.

        Args:
            ctxt: p4p thread client context used to create the monitor.
            pv: Name of the PV to subscribe to.
        """
        self.__cache = None
        self.updated = False
        # The subscription object is kept on the instance; with
        # notify_disconnect=True the callback is also handed exception
        # instances, not only values.
        self.__sub = ctxt.monitor(pv, self.__update, notify_disconnect=True)

    def __update(self, V):  # Value or Exception
        """Monitor callback: fold a new value or an error into the cache.

        Args:
            V: Either a new value for the PV or an exception instance
                describing a disconnect or other subscription error.
        """
        # Any exception instance means there is no valid value to expose.
        # The original code only recognised Disconnected, so other
        # exceptions delivered to the callback were cached as if they were
        # data.  Disconnected stays listed explicitly so that case is
        # handled exactly as before.
        if isinstance(V, (Disconnected, Exception)):
            self.__cache = None
            self.updated = False
        elif self.__cache is None:
            # First value since construction or since the last error:
            # store it, but do not report it as a "new" update.
            self.__cache = V
            self.updated = False
        else:
            # NOTE(review): assigning through the None key presumably
            # merges the update into the already-cached Value in place --
            # confirm against the p4p Value documentation.
            self.__cache[None] = V
            self.updated = True

    def mpvaNewMonitorValue(self):
        """Return the current state of the ``updated`` flag."""
        return self.updated

    @property
    def current(self):
        """The cached value, or ``None`` when no valid value is held."""
        return self.__cache