Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,55 @@
# lib-protocol-proxy-bacnet
# Protocol Proxy BACnet Library
![Python 3.10](https://img.shields.io/badge/python-3.10-blue.svg)
![Python 3.11](https://img.shields.io/badge/python-3.11-blue.svg)
[![Passing?](https://github.com/eclipse-volttron/lib-protocol-proxy-bacnet/actions/workflows/run-tests.yml/badge.svg)](https://github.com/eclipse-volttron/lib-protocol-proxy-bacnet/actions/workflows/run-tests.yml)
[![pypi version](https://img.shields.io/pypi/v/protocol-proxy-bacnet.svg)](https://pypi.org/project/protocol-proxy-bacnet/)

This library provides support for communication and management of BACnet devices to a [Protocol Proxy](https://github.com/eclipse-volttron/lib-protocol-proxy) Manager.
Communication with a BACnet device on a network happens via a virtual BACnet device.

## Automatically installed dependencies
- python = ">=3.10,<4.0"
- protocol-proxy = ">=2.0.0rc0"
- bacpypes3 = ">=0.0.102"


[//]: # (# Documentation)

[//]: # (More detailed documentation can be found on [ReadTheDocs]&#40;https://eclipse-volttron.readthedocs.io/en/latest/external-docs/lib-protocol-proxy-bacnet/index.html. The RST source)

[//]: # (of the documentation for this component is located in the "docs" directory of this repository.)

# Installation
This library, along with its dependencies, can be installed using pip:

```shell
pip install lib-protocol-proxy-bacnet
```

# Development
This library is maintained by the VOLTTRON Development Team.

Please see the following [guidelines](https://github.com/eclipse-volttron/volttron-core/blob/develop/CONTRIBUTING.md)
for contributing to this and/or other VOLTTRON repositories.

[//]: # (Please see the following helpful guide about [using the Protocol Proxy]&#40;https://github.com/eclipse-volttron/lib-protocol-proxy/blob/develop/developing_with_protocol_proxy.md&#41;)

[//]: # (in your VOLTTRON agent or other applications.)

# Disclaimer Notice

This material was prepared as an account of work sponsored by an agency of the
United States Government. Neither the United States Government nor the United
States Department of Energy, nor Battelle, nor any of their employees, nor any
jurisdiction or organization that has cooperated in the development of these
materials, makes any warranty, express or implied, or assumes any legal
liability or responsibility for the accuracy, completeness, or usefulness or any
information, apparatus, product, software, or process disclosed, or represents
that its use would not infringe privately owned rights.

Reference herein to any specific commercial product, process, or service by
trade name, trademark, manufacturer, or otherwise does not necessarily
constitute or imply its endorsement, recommendation, or favoring by the United
States Government or any agency thereof, or Battelle Memorial Institute. The
views and opinions of authors expressed herein do not necessarily state or
reflect those of the United States Government or any agency thereof.
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ python = ">=3.10,<4.0"
bacpypes3 = ">=0.0.102"
protocol-proxy = ">=2.0.0rc0"

[tool.poetry.group.dev.dependencies]
# No additional dependencies.

[tool.poetry.group.documentation.dependencies]
Sphinx = "^4.5.0"
sphinx-rtd-theme = "^1.0.0"
Expand Down
12 changes: 8 additions & 4 deletions src/protocol_proxy/protocol/bacnet/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
import logging

from argparse import ArgumentParser
from typing import Callable

from .bacnet_proxy import BACnetProxy

PROXY_CLASS = BACnetProxy

_log = logging.getLogger(__name__)

async def run_proxy(local_device_address, **kwargs):
bp = BACnetProxy(local_device_address, **kwargs)
async def run_proxy(local_interface, **kwargs):
_log.info(f'Launching BACnet Proxy at interface {local_interface} using parameters: {kwargs}.')
bp = BACnetProxy(local_interface, **kwargs)
await bp.start()

def launch_bacnet(parser: ArgumentParser) -> tuple[ArgumentParser, Callable]:
parser.add_argument('--local-device-address', type=str, required=True,
parser.add_argument('--local-interface', type=str, required=True,
help='Address on the local machine of this BACnet Proxy.')
parser.add_argument('--bacnet-network', type=int, default=0,
parser.add_argument('--bacnet-port', type=int, default=0,
help='The BACnet port as an offset from 47808.')
parser.add_argument('--vendor-id', type=int, default=999,
help='The BACnet vendor ID to use for the local device of this BACnet Proxy.')
Expand Down
53 changes: 27 additions & 26 deletions src/protocol_proxy/protocol/bacnet/bacnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,49 +25,52 @@


class BACnet:
def __init__(self, local_device_address, bacnet_network=0, vendor_id=999, object_name='VOLTTRON BACnet Proxy',
def __init__(self, local_interface, bacnet_port=0, vendor_id=999, object_name='VOLTTRON BACnet Proxy',
device_info_cache=None, router_info_cache=None, ase_id=None, **_):
_log.debug('WELCOME BAC')
#_log.debug('WELCOME BAC')
vendor_info = get_vendor_info(vendor_id)
device_object_class = vendor_info.get_object_class(ObjectType.device)
device_object = device_object_class(objectIdentifier=('device', vendor_id), objectName=object_name)
network_port_object_class = vendor_info.get_object_class(ObjectType.networkPort)
network_port_object = network_port_object_class(local_device_address,
objectIdentifier=("network-port", bacnet_network),
objectName="NetworkPort-1", networkNumber=bacnet_network,
network_port_object = network_port_object_class(local_interface,
objectIdentifier=("network-port", bacnet_port),
objectName="NetworkPort-1", networkNumber=bacnet_port,
networkNumberQuality="configured")
self.app = Application.from_object_list(
[device_object, network_port_object],
device_info_cache=device_info_cache, # TODO: If these should be passed in, add to args & launch.
router_info_cache=router_info_cache,
aseID=ase_id
)
_log.debug(f'WE HAVE AN APP: {self.app.device_info_cache}')
#_log.debug(f'WE HAVE AN APP: {self.app.device_info_cache}')

async def query_device(self, address: str, property_name: str = 'object-identifier'):
"""Returns properties about the device at the given address.
If a different property name is not given, this will be the object-id.
This function allows unicast discovery.
This can get everything from device if it is using read_property_multiple and ALL
"""
_log.debug('IN QUERY DEVICE METHOD')
#_log.debug('IN QUERY DEVICE METHOD')
return await self.read_property(device_address=address, object_identifier='device:4194303',
property_identifier=property_name)

async def batch_read(self, device_address: str, read_specifications: dict[str, dict]):
daopr_list = [
DeviceAddressObjectPropertyReference(
key=key,
device_address=device_address,
object_identifier=spec['object_id'],
property_reference=(spec['property'], spec['array_index'])
if spec['array_index'] is not None else spec['property']
) for key, spec in read_specifications.items()
]
results = {}
batch = BatchRead(daopr_list)
# run until the batch is done
await batch.run(self.app, lambda k, v: results.update({k: v}))
try:
daopr_list = [
DeviceAddressObjectPropertyReference(
key=key,
device_address=device_address,
object_identifier=spec['object_id'],
property_reference=(spec['property'], spec['array_index'])
if spec['array_index'] is not None else spec['property']
) for key, spec in read_specifications.items()
]
batch = BatchRead(daopr_list)
# run until the batch is done
await batch.run(self.app, lambda k, v: results.update({k: v}))
except BaseException as e:
_log.warning(f'Exception in BatchRead: {e}')
return results

async def change_of_value(self, device_address: str, object_identifier: str, process_identifier: int | None,
Expand Down Expand Up @@ -110,13 +113,13 @@ async def read_property(self, device_address: str, object_identifier: str, prope
property_identifier,
int(property_array_index) if property_array_index is not None else None
)
_log.debug(f"BACnet.read_property response: {response}")
#_log.debug(f"BACnet.read_property response: {response}")
except ErrorRejectAbortNack as err:
_log.debug(f'Error reading property {err}')
response = err
if isinstance(response, AnyAtomic):
response = response.get_value()
_log.debug(f"BACnet.read_property final response: {response}")
#_log.debug(f"BACnet.read_property final response: {response}")
return response

# async def read_property_multiple(self, device_address: str, read_specifications: dict):
Expand Down Expand Up @@ -157,7 +160,7 @@ async def write_property(self, device_address: str, object_identifier: str, prop
int(priority)
)
except ErrorRejectAbortNack as e:
_log.debug(str(e))
return e

# async def write_property_multiple(self, device_address: str, write_specifications: list):
# # TODO Implement write_property_multiple. Commenting until completed.
Expand Down Expand Up @@ -500,10 +503,8 @@ async def who_is(self, device_instance_low: int, device_instance_high: int, dest
)

try:
# Perform WHO-IS discovery (note: bacpypes3 who_is doesn't accept apdu_timeout parameter)
i_am_responses = await self.app.who_is(device_instance_low, device_instance_high,
destination_addr)
_log.debug(f"Received {len(i_am_responses)} I-Am response(s) from {destination_addr}")
i_am_responses = await self.app.who_is(device_instance_low, device_instance_high, destination_addr)
_log.info(f"Received {len(i_am_responses)} I-Am response(s) from {destination_addr}")

devices_found = []
if i_am_responses:
Expand Down
57 changes: 42 additions & 15 deletions src/protocol_proxy/protocol/bacnet/bacnet_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import traceback

from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timedelta, timezone
from functools import partial
from zoneinfo import ZoneInfo

from protocol_proxy.ipc import callback, ProtocolProxyMessage
from protocol_proxy.proxy import launch
Expand All @@ -33,18 +34,19 @@ def is_unchanged(self, address: str, object_identifier: str, confirmed: bool | N
and confirmed == confirmed and lifetime == lifetime)

class BACnetProxy(AsyncioProtocolProxy):
def __init__(self, local_device_address, bacnet_network=0, vendor_id=999, object_name='VOLTTRON BACnet Proxy',
def __init__(self, local_interface, bacnet_port=0, vendor_id=999, object_name='VOLTTRON BACnet Proxy',
**kwargs):
_log.debug('IN BACNETPROXY __init__')
#_log.debug('IN BACNET PROXY __init__')
super(BACnetProxy, self).__init__(**kwargs)
self.bacnet = BACnet(local_device_address, bacnet_network, vendor_id, object_name, **kwargs)
self.bacnet = BACnet(local_interface, bacnet_port, vendor_id, object_name, **kwargs)
self.loop = asyncio.get_event_loop()

# Cache for object-list to avoid re-reading on every page request
# Format: {device_key: (object_list, timestamp)}
self._object_list_cache = {}
self._cache_timeout = 300
self._subscribed_cov: dict[str, COVSubscription] = {}
self._time_sync_periodics = {}

self.register_callback(self.batch_read_endpoint, 'BATCH_READ', provides_response=True)
#self.register_callback(self.confirmed_private_transfer_endpoint, 'CONFIRMED_PRIVATE_TRANSFER', provides_response=True)
Expand All @@ -54,6 +56,7 @@ def __init__(self, local_device_address, bacnet_network=0, vendor_id=999, object
self.register_callback(self.read_property_endpoint, 'READ_PROPERTY', provides_response=True)
#self.register_callback(self.read_property_multiple_endpoint, 'READ_PROPERTY_MULTIPLE', provides_response=True)
self.register_callback(self.time_synchronization_endpoint, 'TIME_SYNCHRONIZATION', provides_response=True)
self.register_callback(self.setup_time_synchronization_endpoint, 'SETUP_TIME_SYNCHRONIZATION', provides_response=False)
self.register_callback(self.write_property_endpoint, 'WRITE_PROPERTY', provides_response=True)
self.register_callback(self.read_device_all_endpoint, 'READ_DEVICE_ALL', provides_response=True)
self.register_callback(self.who_is_endpoint, 'WHO_IS', provides_response=True)
Expand Down Expand Up @@ -90,7 +93,7 @@ async def cov_callback_function(self, peer, key, value):
method_name='RECEIVE_COV',
payload=serialize({key: value})
)
_log.debug(f'@@@@@@ SENDING COV CALLBACK: {message}')
#_log.debug(f'@@@@@@ SENDING COV CALLBACK: {message}')
await self.send(peer, message)

@callback
Expand Down Expand Up @@ -159,14 +162,39 @@ async def read_property_endpoint(self, _, raw_message: bytes):
async def time_synchronization_endpoint(self, _, raw_message: bytes):
"""Endpoint for setting time on a BACnet device."""
message = json.loads(raw_message.decode('utf8'))
address = message['address']
if date_time_string := message.get('date_time'):
try:
date_time = datetime.fromisoformat(date_time_string)
except ValueError as e:
return serialize(e)
result = await self.bacnet.time_synchronization(address, date_time)
return serialize(result)
address = message['device_address']
date_time_string = message['date_time']
try:
date_time = datetime.fromisoformat(date_time_string)
result = await self.bacnet.time_synchronization(address, date_time)
return serialize(result)
except ValueError as e:
return serialize(e)

@callback
async def setup_time_synchronization_endpoint(self, _, raw_message: bytes):
"""Endpoint for setting time on a BACnet device."""
message = json.loads(raw_message.decode('utf8'))
address = message['device_address']
interval_string = message.get('interval')
time_zone_string = message.get('time_zone')
if interval_string is None:
task = self._time_sync_periodics.pop(address, None)
task.cancel()
else:
interval_seconds = timedelta(seconds=float(interval_string))
time_zone = ZoneInfo(time_zone_string)
async def synchronize_time():
try:
_log.info(f'Starting time synchronization periodic for: {address}')
while True:
date_time = datetime.now(timezone.utc).astimezone(time_zone)
_log.info(f'Synchronizing time on {address} to: {date_time}')
await self.bacnet.time_synchronization(device_address=address, date_time=date_time)
await asyncio.sleep(interval_seconds.total_seconds())
except asyncio.CancelledError:
_log.info(f'Stopping time synchronization periodic for {address}')
self._time_sync_periodics[address] = self.loop.create_task(synchronize_time())

@callback
async def write_property_endpoint(self, _, raw_message: bytes):
Expand Down Expand Up @@ -214,7 +242,6 @@ async def who_is_endpoint(self, _, raw_message: bytes):
device_instance_low = message.get('device_instance_low', 0)
device_instance_high = message.get('device_instance_high', 4194303)
dest = message.get('dest', '255.255.255.255:47808')
apdu_timeout = message.get('apdu_timeout', None) # Keep for backward compatibility but don't use # TODO: Why!?
result = await self.bacnet.who_is(device_instance_low, device_instance_high, dest)
return serialize(result)

Expand Down Expand Up @@ -354,7 +381,7 @@ async def clear_cache_endpoint(self, _, raw_message: bytes):
return json.dumps(result).encode('utf8')

@callback
async def get_cache_stats_endpoint(self, _, raw_message: bytes):
async def get_cache_stats_endpoint(self, _, __):
"""Endpoint for getting cache statistics."""
result = _get_cache_stats(self._object_list_cache, self._cache_timeout)
return json.dumps(result).encode('utf8')
Expand Down
6 changes: 5 additions & 1 deletion src/protocol_proxy/protocol/bacnet/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from enum import Enum

from bacpypes3.apdu import ErrorRejectAbortNack, AbortPDU, ErrorPDU, RejectPDU
from bacpypes3.basetypes import EngineeringUnits
from bacpypes3.basetypes import EngineeringUnits, BinaryPV
from bacpypes3.constructeddata import Sequence
from bacpypes3.primitivedata import Atomic
from bacpypes3.json.util import atomic_encode, sequence_to_json
Expand Down Expand Up @@ -68,6 +68,10 @@ def _serialize(val):
ret_val = str(val)
elif isinstance(val, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
ret_val = str(val)
# TODO: Casting binaries to int for backwards compatability. What is the best way to deal with this permanently?
# Report binary input/output/value as integer.
elif isinstance(val, BinaryPV):
ret_val = int(val)
# Handle BACPypes Atomic and Sequence types:
elif isinstance(val, Atomic):
ret_val = atomic_encode(val)
Expand Down
Loading