Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 196 additions & 3 deletions custom_components/smartir/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import aiofiles
import aiohttp
import asyncio
from base64 import encodebytes
import binascii
from bisect import bisect
from distutils.version import StrictVersion
import io
from itertools import islice
import json
import logging
import os.path
Expand Down Expand Up @@ -71,7 +75,7 @@ async def _update(hass, branch, do_update=False, notify_if_latest=True):
async with aiohttp.ClientSession() as session:
async with session.get(MANIFEST_URL.format(branch)) as response:
if response.status == 200:

data = await response.json(content_type='text/plain')
min_ha_version = data['homeassistant']
last_version = data['updater']['version']
Expand All @@ -80,7 +84,7 @@ async def _update(hass, branch, do_update=False, notify_if_latest=True):
if StrictVersion(last_version) <= StrictVersion(VERSION):
if notify_if_latest:
hass.components.persistent_notification.async_create(
"You're already using the latest version!",
"You're already using the latest version!",
title='SmartIR')
return

Expand Down Expand Up @@ -168,4 +172,193 @@ def lirc2broadlink(pulses):
remainder = (len(packet) + 4) % 16
if remainder:
packet += bytearray(16 - remainder)
return packet
return packet

@staticmethod
def broadlink2tuya(data, compression_level = 2):
"""
Convert Broadlink remote codes into a format
that can be used in Tuya's IR Blasters (ZS06, ZS08, TS1201, UFO-R11).

Based on:
* @vills (https://gist.github.com/vills/590c154b377ac50acab079328e4ddaf9)
* @mildsunrise (https://gist.github.com/mildsunrise/1d576669b63a260d2cff35fda63ec0b5)
* @elupus (https://github.com/elupus/irgen)
(thank you!)
"""
def decode_broadlink(data):
"""Generate raw values from broadlink data."""
v = iter(data)
code = next(v)
next(v) # repeat

assert code == 0x26 # IR

length = int.from_bytes(islice(v, 2), byteorder="little")
assert length >= 3 # a At least trailer

def decode_iter(x):
while True:
try:
d = next(x)
except StopIteration:
return
if d == 0:
d = int.from_bytes(islice(x, 2), byteorder="big")

ms = int(round(d * 8192 / 269, 0))

# skip last time interval
if ms > 65535:
return

yield ms

yield from decode_iter(islice(v, length))

rem = list(v)
if any(rem):
_LOGGER.warning("Ignored extra data: %s", rem)

def encode_tuya(signal, compression_level):
"""
Encodes an IR signal
into an IR code string for a Tuya blaster.
"""

def compress(out: io.FileIO, data: bytes, level=2):
"""
Takes a byte string and outputs a compressed "Tuya stream".
Implemented compression levels:
0 - copy over (no compression, 3.1% overhead)
1 - eagerly use first length-distance pair found (linear)
2 - eagerly use best length-distance pair found
3 - optimal compression (n^3)
"""
def emit_literal_block(out: io.FileIO, data: bytes):
length = len(data) - 1
assert 0 <= length < (1 << 5)
out.write(bytes([length]))
out.write(data)

def emit_literal_blocks(out: io.FileIO, data: bytes):
for i in range(0, len(data), 32):
emit_literal_block(out, data[i : i + 32])

def emit_distance_block(out: io.FileIO, length: int, distance: int):
distance -= 1
assert 0 <= distance < (1 << 13)
length -= 2
assert length > 0
block = bytearray()
if length >= 7:
assert length - 7 < (1 << 8)
block.append(length - 7)
length = 7
block.insert(0, length << 5 | distance >> 8)
block.append(distance & 0xFF)
out.write(block)

if level == 0:
return emit_literal_blocks(out, data)

W = 2**13 # window size
L = 255 + 9 # maximum length

def distance_candidates():
return range(1, min(pos, W) + 1)

def find_length_for_distance(start: int) -> int:
length = 0
limit = min(L, len(data) - pos)
while length < limit and data[pos + length] == data[start + length]:
length += 1
return length

def find_length_candidates():
return ((find_length_for_distance(pos - d), d) for d in distance_candidates())

def find_length_cheap():
return next((c for c in find_length_candidates() if c[0] >= 3), None)

def find_length_max():
return max(find_length_candidates(), key=lambda c: (c[0], -c[1]), default=None)

if level >= 2:
suffixes = []
next_pos = 0

def key(n):
return data[n:]

def find_idx(n):
return bisect(suffixes, key(n), key=key)

def distance_candidates():
nonlocal next_pos
while next_pos <= pos:
if len(suffixes) == W:
suffixes.pop(find_idx(next_pos - W))
suffixes.insert(idx := find_idx(next_pos), next_pos)
next_pos += 1
idxs = (idx + i for i in (+1, -1)) # try +1 first
return (pos - suffixes[i] for i in idxs if 0 <= i < len(suffixes))

if level <= 2:
find_length = {1: find_length_cheap, 2: find_length_max}[level]
block_start = pos = 0
while pos < len(data):
if (c := find_length()) and c[0] >= 3:
emit_literal_blocks(out, data[block_start:pos])
emit_distance_block(out, c[0], c[1])
pos += c[0]
block_start = pos
else:
pos += 1
emit_literal_blocks(out, data[block_start:pos])
return

# use topological sort to find shortest path
predecessors = [(0, None, None)] + [None] * len(data)

def put_edge(cost, length, distance):
npos = pos + length
cost += predecessors[pos][0]
current = predecessors[npos]
if not current or cost < current[0]:
predecessors[npos] = cost, length, distance

for pos in range(len(data)):
if c := find_length_max():
for length in range(3, c[0] + 1):
put_edge(2 if length < 9 else 3, length, c[1])
for bit_length in range(1, min(32, len(data) - pos) + 1):
put_edge(1 + bit_length, bit_length, 0)

# reconstruct path, emit blocks
blocks = []
pos = len(data)
while pos > 0:
_, length, distance = predecessors[pos]
pos -= length
blocks.append((pos, length, distance))
for pos, length, distance in reversed(blocks):
if not distance:
emit_literal_block(out, data[pos : pos + length])
else:
emit_distance_block(out, length, distance)

payload = b"".join(struct.pack("<H", t) for t in signal)
compress(out := io.BytesIO(), payload, compression_level)
payload = out.getvalue()
return encodebytes(payload).decode("ascii").replace("\n", "")

raw_data = list(decode_broadlink(data))

_LOGGER.info("Raw data: %s", raw_data)

tuya_data = encode_tuya(raw_data, compression_level=compression_level)

_LOGGER.info("Tuya code: %s", tuya_data)

return tuya_data
6 changes: 3 additions & 3 deletions custom_components/smartir/climate.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.restore_state import RestoreEntity
from . import COMPONENT_ABS_DIR, Helper
from .controller import get_controller
from .controller import get_controller, cv_controller_data

_LOGGER = logging.getLogger(__name__)

Expand All @@ -44,7 +44,7 @@
vol.Optional(CONF_UNIQUE_ID): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Required(CONF_DEVICE_CODE): cv.positive_int,
vol.Required(CONF_CONTROLLER_DATA): cv.string,
vol.Required(CONF_CONTROLLER_DATA): cv_controller_data,
vol.Optional(CONF_DELAY, default=DEFAULT_DELAY): cv.positive_float,
vol.Optional(CONF_TEMPERATURE_SENSOR): cv.entity_id,
vol.Optional(CONF_HUMIDITY_SENSOR): cv.entity_id,
Expand Down Expand Up @@ -451,4 +451,4 @@ def _async_update_humidity(self, state):
if state.state != STATE_UNKNOWN and state.state != STATE_UNAVAILABLE:
self._current_humidity = float(state.state)
except ValueError as ex:
_LOGGER.error("Unable to update from humidity sensor: %s", ex)
_LOGGER.error("Unable to update from humidity sensor: %s", ex)
Loading