diff --git a/pyW215/pyW215.py b/pyW215/pyW215.py index 8566e1e..2ef0849 100644 --- a/pyW215/pyW215.py +++ b/pyW215/pyW215.py @@ -1,4 +1,3 @@ - try: from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError @@ -13,10 +12,22 @@ _LOGGER = logging.getLogger(__name__) - ON = 'ON' OFF = 'OFF' +W215_A2_DEV_ID = { + 'SWITCH': 1, + 'THERMAL': 2, + 'POWERMETER': 3 +} + +DEFAULT_DEV_ID = { + 'SWITCH': 1, + 'POWERMETER': 2, + 'THERMAL': 3 +} + + class SmartPlug(object): """ Class to access: @@ -38,8 +49,8 @@ class SmartPlug(object): Class layout is inspired by @rkabadi (https://github.com/rkabadi) for the Edimax Smart plug. """ - def __init__(self, ip, password, user = "admin", - use_legacy_protocol = False): + def __init__(self, ip, password, user="admin", + use_legacy_protocol=False): """ Create a new SmartPlug instance identified by the given URL and password. @@ -57,6 +68,19 @@ def __init__(self, ip, password, user = "admin", if self.use_legacy_protocol: _LOGGER.info("Enabled support for legacy firmware.") self._error_report = False + self.dev_id = DEFAULT_DEV_ID + if self.use_legacy_protocol: + try: + hw_ver = self.fetchMyCgi()['HW Ver'] + # change dev map for HW A2 + if hw_ver == 'A2': + self.dev_id = W215_A2_DEV_ID + # this assume the model number is W215, might be wrong + self.model_name = 'W215_' + hw_ver + except: + self.model_name = 'Unknown' + else: + self.model_name = self.SOAPAction("", "ModelName", "") def moduleParameters(self, module): """Returns moduleID XML. @@ -76,7 +100,7 @@ def controlParameters(self, module, status): :param status: The state to set (i.e. true (on) or false (off)) :return XML string to join with payload """ - if self.use_legacy_protocol : + if self.use_legacy_protocol: return '''{}Socket 1Socket 1 {}1'''.format(self.moduleParameters(module), status) else: @@ -91,7 +115,6 @@ def radioParameters(self, radio): """ return '''{}'''.format(radio) - def requestBody(self, Action, params): """Returns the request payload for an action as XML>. @@ -111,7 +134,7 @@ def requestBody(self, Action, params): '''.format(Action, params, Action) - def SOAPAction(self, Action, responseElement, params = ""): + def SOAPAction(self, Action, responseElement, params=""): """Generate the SOAP action call. :type Action: str @@ -130,15 +153,15 @@ def SOAPAction(self, Action, responseElement, params = ""): payload = self.requestBody(Action, params) # Timestamp in microseconds - time_stamp = str(round(time.time()/1e6)) + time_stamp = str(round(time.time() / 1e6)) action_url = '"http://purenetworks.com/HNAP1/{}"'.format(Action) - AUTHKey = hmac.new(auth[0].encode(), (time_stamp+action_url).encode()).hexdigest().upper() + " " + time_stamp + AUTHKey = hmac.new(auth[0].encode(), (time_stamp + action_url).encode()).hexdigest().upper() + " " + time_stamp - headers = {'Content-Type' : '"text/xml; charset=utf-8"', + headers = {'Content-Type': '"text/xml; charset=utf-8"', 'SOAPAction': '"http://purenetworks.com/HNAP1/{}"'.format(Action), - 'HNAP_AUTH' : '{}'.format(AUTHKey), - 'Cookie' : 'uid={}'.format(auth[1])} + 'HNAP_AUTH': '{}'.format(AUTHKey), + 'Cookie': 'uid={}'.format(auth[1])} try: response = urlopen(Request(self.url, payload.encode(), headers)) @@ -152,7 +175,7 @@ def SOAPAction(self, Action, responseElement, params = ""): # Get value from device try: - value = root.find('.//{http://purenetworks.com/HNAP1/}%s' % (responseElement)).text + value = root.find('.//{http://purenetworks.com/HNAP1/}%s' % responseElement).text except AttributeError: _LOGGER.warning("Unable to find %s in response." % responseElement) return None @@ -168,7 +191,7 @@ def SOAPAction(self, Action, responseElement, params = ""): def fetchMyCgi(self): """Fetches statistics from my_cgi.cgi""" try: - response = urlopen(Request('http://{}/my_cgi.cgi'.format(self.ip), b'request=create_chklst')); + response = urlopen(Request('http://{}/my_cgi.cgi'.format(self.ip), b'request=create_chklst')) except (HTTPError, URLError): _LOGGER.warning("Failed to open url to {}".format(self.ip)) self._error_report = True @@ -180,58 +203,37 @@ def fetchMyCgi(self): @property def current_consumption(self): """Get the current power consumption in Watt.""" - res = 'N/A' - if self.use_legacy_protocol: - # Use /my_cgi.cgi to retrieve current consumption - try: - res = self.fetchMyCgi()['Meter Watt'] - except: - return 'N/A' - else: - try: - res = self.SOAPAction('GetCurrentPowerConsumption', 'CurrentConsumption', self.moduleParameters("2")) - except: - return 'N/A' - - if res is None: + try: + res = self.SOAPAction('GetCurrentPowerConsumption', 'CurrentConsumption', + self.moduleParameters(self.dev_id['POWERMETER'])) + except: return 'N/A' - try: res = float(res) except ValueError: _LOGGER.error("Failed to retrieve current power consumption from SmartPlug") - return res @property def total_consumption(self): """Get the total power consumpuntion in the device lifetime.""" - if self.use_legacy_protocol: - # TotalConsumption currently fails on the legacy protocol and - # creates a mess in the logs. Just return 'N/A' for now. - return 'N/A' - - res = 'N/A' try: - res = self.SOAPAction("GetPMWarningThreshold", "TotalConsumption", self.moduleParameters("2")) + res = self.SOAPAction("GetPMWarningThreshold", "TotalConsumption", + self.moduleParameters(self.dev_id['POWERMETER'])) except: return 'N/A' - - if res is None: - return 'N/A' - try: float(res) except ValueError: _LOGGER.error("Failed to retrieve total power consumption from SmartPlug") - return res @property def temperature(self): """Get the device temperature in celsius.""" try: - res = self.SOAPAction('GetCurrentTemperature', 'CurrentTemperature', self.moduleParameters("3")) + res = self.SOAPAction('GetCurrentTemperature', 'CurrentTemperature', + self.moduleParameters(self.dev_id['THERMAL'])) except: res = 'N/A' @@ -240,7 +242,7 @@ def temperature(self): @property def state(self): """Get the device state (i.e. ON or OFF).""" - response = self.SOAPAction('GetSocketSettings', 'OPStatus', self.moduleParameters("1")) + response = self.SOAPAction('GetSocketSettings', 'OPStatus', self.moduleParameters(self.dev_id['SWITCH'])) if response is None: return 'unknown' elif response.lower() == 'true': @@ -259,9 +261,11 @@ def state(self, value): :param value: Future state (either ON or OFF) """ if value.upper() == ON: - return self.SOAPAction('SetSocketSettings', 'SetSocketSettingsResult', self.controlParameters("1", "true")) + return self.SOAPAction('SetSocketSettings', 'SetSocketSettingsResult', + self.controlParameters(self.dev_id['SWITCH'], "true")) elif value.upper() == OFF: - return self.SOAPAction('SetSocketSettings', 'SetSocketSettingsResult', self.controlParameters("1", "false")) + return self.SOAPAction('SetSocketSettings', 'SetSocketSettingsResult', + self.controlParameters(self.dev_id['SWITCH'], "false")) else: raise TypeError("State %s is not valid." % str(value)) @@ -283,8 +287,8 @@ def auth(self): payload = self.initial_auth_payload() # Build initial header - headers = {'Content-Type' : '"text/xml; charset=utf-8"', - 'SOAPAction': '"http://purenetworks.com/HNAP1/Login"'} + headers = {'Content-Type': '"text/xml; charset=utf-8"', + 'SOAPAction': '"http://purenetworks.com/HNAP1/Login"'} # Request privatekey, cookie and challenge try: @@ -302,21 +306,21 @@ def auth(self): Cookie = root.find('.//{http://purenetworks.com/HNAP1/}Cookie').text Publickey = root.find('.//{http://purenetworks.com/HNAP1/}PublicKey').text - if (Challenge == None or Cookie == None or Publickey == None) and self._error_report is False: + if (Challenge is None or Cookie is None or Publickey is None) and self._error_report is False: _LOGGER.warning("Failed to receive initial authentication from smartplug.") self._error_report = True return None # Generate hash responses - PrivateKey = hmac.new((Publickey+self.password).encode(), (Challenge).encode()).hexdigest().upper() + PrivateKey = hmac.new((Publickey + self.password).encode(), Challenge.encode()).hexdigest().upper() login_pwd = hmac.new(PrivateKey.encode(), Challenge.encode()).hexdigest().upper() response_payload = self.auth_payload(login_pwd) # Build response to initial request - headers = {'Content-Type' : '"text/xml; charset=utf-8"', - 'SOAPAction': '"http://purenetworks.com/HNAP1/Login"', - 'HNAP_AUTH' : '"{}"'.format(PrivateKey), - 'Cookie' : 'uid={}'.format(Cookie)} + headers = {'Content-Type': '"text/xml; charset=utf-8"', + 'SOAPAction': '"http://purenetworks.com/HNAP1/Login"', + 'HNAP_AUTH': '"{}"'.format(PrivateKey), + 'Cookie': 'uid={}'.format(Cookie)} response = urlopen(Request(self.url, response_payload, headers)) xmlData = response.read().decode() root = ET.fromstring(xmlData) @@ -329,8 +333,8 @@ def auth(self): self._error_report = True return None - self._error_report = False # Reset error logging - return (PrivateKey, Cookie) + self._error_report = False # Reset error logging + return PrivateKey, Cookie def initial_auth_payload(self): """Return the initial authentication payload."""