-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathRepetierOutputDevicePlugin.py
More file actions
178 lines (146 loc) · 7.99 KB
/
RepetierOutputDevicePlugin.py
File metadata and controls
178 lines (146 loc) · 7.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# Copyright (c) 2020 Aldo Hoeben / fieldOfView & Shane Bumpurs
# OctoPrintPlugin is released under the terms of the AGPLv3 or higher.
from UM.OutputDevice.OutputDevicePlugin import OutputDevicePlugin
from .RepetierOutputDevice import RepetierOutputDevice
from UM.Signal import Signal, signalemitter
from UM.Application import Application
from UM.Logger import Logger
from UM.Util import parseBool
from PyQt6.QtCore import QTimer
import time
import json
import re
import base64
import os.path
import ipaddress
from typing import Any, Callable, Dict, List, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel
## This plugin handles the connection detection & creation of output device objects for Repetier-connected printers.
# Zero-Conf is used to detect printers, which are saved in a dict.
# If we discover an instance that has the same key as the active machine instance a connection is made.
@signalemitter
class RepetierOutputDevicePlugin(OutputDevicePlugin):
    ## Signals exposed by this plugin.
    #  addInstanceSignal / removeInstanceSignal are wired to addInstance() / removeInstance()
    #  in __init__; instanceListChanged is emitted whenever the set of known instances is rebuilt
    #  or a manual instance is added or removed.
    addInstanceSignal = Signal()
    removeInstanceSignal = Signal()
    instanceListChanged = Signal()

    ## Set up empty discovery state, wire the signals and load the manually configured instances
    #  from the "Repetier/manual_instances" preference.
    def __init__(self) -> None:
        super().__init__()
        self._zero_conf = None
        self._browser = None
        # Known instances, keyed by the id reported by RepetierOutputDevice.getId().
        self._instances: Dict[str, RepetierOutputDevice] = {}

        # Because the model needs to be created in the same thread as the QMLEngine, we use a signal.
        self.addInstanceSignal.connect(self.addInstance)
        self.removeInstanceSignal.connect(self.removeInstance)
        Application.getInstance().globalContainerStackChanged.connect(self.reCheckConnections)

        # Load custom instances from preferences
        self._preferences = Application.getInstance().getPreferences()
        self._preferences.addPreference("Repetier/manual_instances", "{}")
        try:
            self._manual_instances = json.loads(self._preferences.getValue("Repetier/manual_instances"))
        except ValueError:
            self._manual_instances = {}
        # The preference is expected to hold a JSON object; anything else is discarded.
        if not isinstance(self._manual_instances, dict):
            self._manual_instances = {}

        # Raw string: "\." is a regex escape, not a Python string escape.
        self._name_regex = re.compile(r'Repetier instance (".*"\.|on )(.*)\.')

        # Single-shot 2000 ms timer whose timeout triggers _keepDiscoveryAlive().
        self._keep_alive_timer = QTimer()
        self._keep_alive_timer.setInterval(2000)
        self._keep_alive_timer.setSingleShot(True)
        self._keep_alive_timer.timeout.connect(self._keepDiscoveryAlive)

    ## Build the extra property dict passed to RepetierOutputDevice for a manual instance.
    #  Keys and values are bytes to mimic the output of zeroconf.
    #  \param path URL path of the server.
    #  \param useHttps Truthy when the instance should be marked as using HTTPS.
    #  \param userName User name, may be empty.
    #  \param password Password, may be empty.
    #  \param repetier_id Repetier identifier of the printer, may be empty.
    #  \return The bytes-keyed property dict, including b"manual": b"true".
    @staticmethod
    def _manualProperties(path: str, useHttps: bool, userName: str, password: str, repetier_id: str) -> Dict[bytes, bytes]:
        return {
            b"path": path.encode("utf-8"),
            b"useHttps": b"true" if useHttps else b"false",
            b"userName": userName.encode("utf-8"),
            b"password": password.encode("utf-8"),
            b"repetier_id": repetier_id.encode("utf-8"),
            b"manual": b"true",
        }

    ## Apply the per-machine settings stored on the container stack to an instance and connect it.
    #  \param instance The output device to configure.
    #  \param global_container_stack The active global container stack holding the metadata.
    def _configureAndConnect(self, instance: RepetierOutputDevice, global_container_stack: Any) -> None:
        api_key = global_container_stack.getMetaDataEntry("repetier_api_key", "")
        instance.setApiKey(api_key)
        instance.setShowCamera(parseBool(global_container_stack.getMetaDataEntry("repetier_show_camera", "true")))
        instance.connectionStateChanged.connect(self._onInstanceConnectionStateChanged)
        instance.connect()

    ## Start looking for devices on network.
    def start(self) -> None:
        self.startDiscovery()

    ## Drop all known instances and re-create them from the stored manual instances.
    #  Entries in the preference that are malformed (missing "address", "port" or "path", or
    #  holding non-string values) are skipped with a warning instead of aborting the whole run.
    def startDiscovery(self) -> None:
        if self._browser:
            self._browser.cancel()
            self._browser = None
            self._printers = {}

        # Iterate over a snapshot of the keys: removeInstance() pops from self._instances.
        for key in list(self._instances.keys()):
            self.removeInstance(key)

        # Add manual instances from preference
        for name, properties in self._manual_instances.items():
            try:
                address = properties["address"]
                port = properties["port"]
                additional_properties = self._manualProperties(
                    properties["path"],
                    properties.get("useHttps", False),
                    properties.get("userName", ""),
                    properties.get("password", ""),
                    properties.get("repetier_id", ""),
                )
            except (AttributeError, KeyError, TypeError) as error:
                Logger.log("w", "Skipping malformed manual Repetier instance %s: %s", name, repr(error))
                continue
            self.addInstance(name, address, port, additional_properties)

        self.instanceListChanged.emit()

    ## Timer callback: restart discovery when there is no browser or it is no longer alive.
    def _keepDiscoveryAlive(self) -> None:
        if not self._browser or not self._browser.is_alive():
            Logger.log("w", "Zeroconf discovery has died, restarting discovery of Repetier instances.")
            self.startDiscovery()

    ## Store a manually configured instance in the preferences and (re)create its output device.
    #  \param name Key under which the instance is stored.
    #  \param address Address of the server.
    #  \param port Port of the server.
    #  \param path URL path of the server.
    #  \param useHttps Whether the instance is marked as using HTTPS.
    #  \param userName User name, may be empty.
    #  \param password Password, may be empty.
    #  \param repetierid Repetier identifier of the printer, may be empty.
    def addManualInstance(self, name: str, address: str, port: int, path: str, useHttps: bool = False, userName: str = "", password: str = "", repetierid: str = "") -> None:
        self._manual_instances[name] = {
            "address": address,
            "port": port,
            "path": path,
            "useHttps": useHttps,
            "userName": userName,
            "password": password,
            "repetier_id": repetierid,
        }
        self._preferences.setValue("Repetier/manual_instances", json.dumps(self._manual_instances))

        properties = self._manualProperties(path, useHttps, userName, password, repetierid)

        # Replace an existing instance stored under the same key.
        if name in self._instances:
            self.removeInstance(name)

        self.addInstance(name, address, port, properties)
        self.instanceListChanged.emit()

    ## Remove a manually configured instance from both the live instances and the preferences.
    #  \param name Key of the instance to remove.
    def removeManualInstance(self, name: str) -> None:
        if name in self._instances:
            self.removeInstance(name)
            self.instanceListChanged.emit()

        if name in self._manual_instances:
            self._manual_instances.pop(name, None)
            self._preferences.setValue("Repetier/manual_instances", json.dumps(self._manual_instances))

    ## Stop looking for devices on network.
    def stop(self) -> None:
        self._keep_alive_timer.stop()

        if self._browser:
            self._browser.cancel()
        self._browser = None

        if self._zero_conf:
            self._zero_conf.close()
            # Drop the closed handle so a repeated stop() does not close it a second time.
            self._zero_conf = None

    ## \return The dict of known instances (the live dict, not a copy).
    def getInstances(self) -> Dict[str, Any]:
        return self._instances

    ## Look up a known instance.
    #  \param instance_id Key of the instance in the instance dict.
    #  \return The instance, or None (after logging a warning) when it is unknown.
    def getInstanceById(self, instance_id: str) -> Optional[RepetierOutputDevice]:
        instance = self._instances.get(instance_id, None)
        if instance:
            return instance
        Logger.log("w", "No instance found with id %s", instance_id)
        return None

    ## Connect the instance whose key matches the "id" metadata entry of the active global
    #  container stack, and close every other instance that is currently connected.
    #  Does nothing when there is no global container stack.
    def reCheckConnections(self) -> None:
        global_container_stack = Application.getInstance().getGlobalContainerStack()
        if not global_container_stack:
            return

        # The active id does not depend on the instance, so read it once.
        active_id = global_container_stack.getMetaDataEntry("id")
        for key, instance in self._instances.items():
            if key == active_id:
                self._configureAndConnect(instance, global_container_stack)
            elif instance.isConnected():
                instance.close()

    ## Because the model needs to be created in the same thread as the QMLEngine, we use a signal.
    #  Creates the output device, registers it under its own id and connects it right away when
    #  that id matches the "id" metadata entry of the active global container stack.
    #  \param name Name passed to RepetierOutputDevice.
    #  \param address Address passed to RepetierOutputDevice.
    #  \param port Port passed to RepetierOutputDevice.
    #  \param properties Bytes-keyed property dict passed to RepetierOutputDevice.
    def addInstance(self, name: str, address: str, port: int, properties: Dict[bytes, bytes]) -> None:
        instance = RepetierOutputDevice(name, address, port, properties)
        instance_id = instance.getId()
        self._instances[instance_id] = instance

        global_container_stack = Application.getInstance().getGlobalContainerStack()
        if global_container_stack and instance_id == global_container_stack.getMetaDataEntry("id"):
            self._configureAndConnect(instance, global_container_stack)

    ## Forget an instance; when it is connected, unhook the state handler and disconnect it.
    #  \param name Key of the instance in the instance dict. Unknown keys are ignored.
    def removeInstance(self, name: str) -> None:
        instance = self._instances.pop(name, None)
        if instance:
            if instance.isConnected():
                instance.connectionStateChanged.disconnect(self._onInstanceConnectionStateChanged)
                instance.disconnect()

    ## Handler for when the connection state of one of the detected instances changes
    #  \param key Key of the instance whose state changed. Unknown keys are ignored.
    def _onInstanceConnectionStateChanged(self, key: str) -> None:
        if key not in self._instances:
            return

        if self._instances[key].isConnected():
            self.getOutputDeviceManager().addOutputDevice(self._instances[key])
        else:
            self.getOutputDeviceManager().removeOutputDevice(key)