diff --git a/.github/workflows/code_quality.yaml b/.github/workflows/code_quality.yaml new file mode 100644 index 0000000..0151a12 --- /dev/null +++ b/.github/workflows/code_quality.yaml @@ -0,0 +1,42 @@ +name: Code Quality + +on: + pull_request: + branches: + - master + push: + +jobs: + code_quality: + name: ${{ matrix.name }} + runs-on: ubuntu-latest + strategy: + matrix: + include: + - id: black + name: Check code with black + - id: isort + name: Check code with isort + - id: pylint + name: Check code with pylint + - id: mypy + name: Check code with mypy + steps: + - name: Checkout the repository + uses: actions/checkout@v4 + + - name: Set up Python 3 + uses: actions/setup-python@v5 + id: python + with: + python-version: "3.11" + + - name: Install workflow dependencies + run: | + pip install -r .github/workflows/requirements.txt + + - name: Install Python dependencies + run: poetry install --no-interaction + + - name: Run ${{ matrix.id }} checks + run: poetry run ${{ matrix.id }} src \ No newline at end of file diff --git a/.github/workflows/codeql.yaml b/.github/workflows/codeql.yaml new file mode 100644 index 0000000..e1d329e --- /dev/null +++ b/.github/workflows/codeql.yaml @@ -0,0 +1,36 @@ +name: "CodeQL" + +on: + push: + branches: [master] + pull_request: + branches: [master] + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + security-events: write + + strategy: + fail-fast: false + matrix: + language: ["python"] + + steps: + - name: Checkout repository + uses: actions/checkout@v5 + + - name: Initialize CodeQL + uses: github/codeql-action/init@v4 + with: + languages: ${{ matrix.language }} + + - name: Autobuild + uses: github/codeql-action/autobuild@v4 + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v4 \ No newline at end of file diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml deleted file mode 100644 index c73e032..0000000 --- a/.github/workflows/pylint.yml +++ /dev/null @@ -1,23 +0,0 @@ -name: Pylint - -on: [push] - -jobs: - build: - runs-on: ubuntu-latest - strategy: - matrix: - python-version: ["3.8", "3.9", "3.10"] - steps: - - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v3 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install pylint - - name: Analysing the code with pylint - run: | - pylint $(git ls-files '*.py') diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..f60e63c --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,43 @@ +name: Publish release + +on: + release: + types: [published] + +jobs: + build-and-publish-pypi: + name: Builds and publishes release to PyPI + runs-on: ubuntu-latest + outputs: + version: ${{ steps.vars.outputs.tag }} + steps: + - uses: actions/checkout@v5 + + - name: Set up Python 3.11 + uses: actions/setup-python@v6 + with: + python-version: "3.11" + + - name: Install workflow dependencies + run: | + pip install -r .github/workflows/requirements.txt + + - name: Install dependencies + run: poetry install --no-interaction + + - name: Set package version + run: | + version="${{ github.event.release.tag_name }}" + version="${version,,}" + version="${version#v}" + poetry version --no-interaction "${version}" + + - name: Build package + run: poetry build --no-interaction + + - name: Publish to PyPi + env: + PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }} + run: | + poetry config pypi-token.pypi "${PYPI_TOKEN}" + poetry publish --no-interaction \ No newline at end of file diff --git a/.github/workflows/requirements.txt b/.github/workflows/requirements.txt new file mode 100644 index 0000000..f2b9c88 --- /dev/null +++ b/.github/workflows/requirements.txt @@ -0,0 +1,2 @@ +pip>=23.3 +poetry==1.5.1 \ No newline at end of file diff --git a/README.md b/README.md index 8d90d0e..c574d6f 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,15 @@ +[![GitHub Latest Release][releases_shield]][latest_release] +[![PyPI][pypi_releases_shield]][pypi_latest_release] +[![PyPI - Downloads][pypi_downloads_shield]][pypi_downloads] + +[latest_release]: https://github.com/maksp86/Python-package-vacuum-map-parser-ijai/releases/latest +[releases_shield]: https://img.shields.io/github/v/release/maksp86/Python-package-vacuum-map-parser-ijai + +[pypi_latest_release]: https://pypi.org/project/vacuum-map-parser-ijai/ +[pypi_releases_shield]: https://img.shields.io/pypi/v/vacuum-map-parser-ijai + +[pypi_downloads]: https://pepy.tech/project/vacuum-map-parser-ijai +[pypi_downloads_shield]: https://static.pepy.tech/badge/vacuum-map-parser-ijai # Vacuum map parser - Ijai @@ -36,6 +48,13 @@ unpacked_map = parser.unpack_map(raw_map, device_mac='**:**:**:**:**:**') parsed_map = parser.parse(unpacked_map) ``` + +## Supported vacuums: +- ijai.vacuum.* (at least v1, v2, v3, v10, v13, v18, v19) +- xiaomi.vacuum.c103 +- xiaomi.vacuum.c104 +- xiaomi.vacuum.b106eu +*If you got another vacuum to work, please tell us* ## Special thanks diff --git a/pyproject.toml b/pyproject.toml index e8252c8..cefc17e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ version = "0.0.0" license = "Apache-2.0" description = "Functionalities for Ijai vacuum map parsing" readme = "README.md" -authors = ["Alexander Vassilyevsky "] +authors = ["maksp86 ", "Alexander Vassilyevsky "] classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", @@ -13,22 +13,22 @@ classifiers = [ "Programming Language :: Python :: 3.11", "Topic :: Home Automation", ] -packages = [ - { include = "vacuum_map_parser_ijai", from = "src" }, -] +packages = [{ include = "vacuum_map_parser_ijai", from = "src" }] [tool.poetry.urls] -"Homepage" = "https://github.com/Tarh-76/Python-package-vacuum-map-parser-ijai" -"Repository" = "https://github.com/Tarh-76/Python-package-vacuum-map-parser-ijai" -"Bug Tracker" = "https://github.com/Tarh-76/Python-package-vacuum-map-parser-ijai/issues" -"Changelog" = "https://github.com/Tarh-76/Python-package-vacuum-map-parser-ijai/releases" +"Homepage" = "https://github.com/maksp86/Python-package-vacuum-map-parser-ijai" +"Repository" = "https://github.com/maksp86/Python-package-vacuum-map-parser-ijai" +"Bug Tracker" = "https://github.com/maksp86/Python-package-vacuum-map-parser-ijai/issues" +"Changelog" = "https://github.com/maksp86/Python-package-vacuum-map-parser-ijai/releases" [tool.poetry.dependencies] python = "^3.11" Pillow = "*" -vacuum-map-parser-base = "0.1.2" +pycryptodome = "*" + +vacuum-map-parser-base = ">=0.1.5" -[tool.poetry.dev-dependencies] +[tool.poetry.group.dev.dependencies] black = "*" mypy = "*" ruff = "*" @@ -45,6 +45,12 @@ line_length = 120 [tool.mypy] platform = "linux" +exclude = '''(?x) + ( ^.*RobotMap_pb2\.py$ + | ^.*RobotMap_pb2\.pyi$ + | ^.*beautify_min\.py$ + ) +''' check_untyped_defs = true disallow_any_generics = true @@ -64,7 +70,18 @@ warn_unused_configs = true warn_unused_ignores = true [tool.pylint] -disable = ["C0103", "C0116", "R0902", "R0903", "R0912", "R0913", "R0914", "R0915", "W0640"] +disable = [ + "C0103", + "C0116", + "R0902", + "R0903", + "R0912", + "R0913", + "R0914", + "R0915", + "W0640", + "R0917", +] max-line-length = 120 [build-system] diff --git a/src/vacuum_map_parser_ijai/RobotMap.proto b/src/vacuum_map_parser_ijai/RobotMap.proto new file mode 100644 index 0000000..2ead1d9 --- /dev/null +++ b/src/vacuum_map_parser_ijai/RobotMap.proto @@ -0,0 +1,159 @@ +syntax = "proto3"; +message RobotMap { + + message HouseInfo { + uint32 id = 1; + string name = 2; + uint32 curMapCount = 3; + uint32 maxMapSize = 4; + repeated AllMapInfo maps = 5; + } + message FurnitureDataInfo { + uint32 id = 1; + uint32 typeId = 2; + repeated DevicePointInfo points = 3; + string url = 4; + uint32 status = 5; + repeated DevicePointInfo react = 6; + } + message ObjectDataInfo { + uint32 objectId = 1; + uint32 objectTypeId = 2; + string objectName = 3; + uint32 confirm = 4; + float x = 5; + float y = 6; + string url = 7; + uint32 notShow = 8; + } + message DeviceChainPointDataInfo { + uint32 x = 1; + uint32 y = 2; + uint32 value = 3; + } + message DeviceRoomChainDataInfo { + uint32 roomId = 1; + repeated DeviceChainPointDataInfo points = 2; + } + message DeviceRoomMatrix { + bytes matrix = 1; + } + message CleanPerferenceDataInfo { + uint32 cleanMode = 1; + uint32 waterLevel = 2; + uint32 windPower = 3; + uint32 twiceClean = 4; + } + message RoomDataInfo { + uint32 roomId = 1; + string roomName = 2; + uint32 roomTypeId = 3; + uint32 meterialId = 4; + uint32 cleanState = 5; + uint32 roomClean = 6; + uint32 roomCleanIndex = 7; + DevicePointInfo roomNamePost = 8; + CleanPerferenceDataInfo cleanPerfer = 9; + uint32 colorId = 10; + } + message DeviceNavigationPointDataInfo { + uint32 pointId = 1; + uint32 status = 2; + uint32 pointType = 3; + float x = 4; + float y = 5; + float phi = 6; + } + message DevicePointInfo { + float x = 1; + float y = 2; + } + message DeviceAreaDataInfo { + uint32 status = 1; + uint32 type = 2; + uint32 areaIndex = 3; + repeated DevicePointInfo points = 4; + } + message DeviceCurrentPoseInfo { + uint32 poseId = 1; + uint32 update = 2; + float x = 3; + float y = 4; + float phi = 5; + } + message DevicePoseDataInfo { + float x = 1; + float y = 2; + float phi = 3; + uint32 roomId = 4; + } + message DeviceCoverPointDataInfo { + uint32 update = 1; + float x = 2; + float y = 3; + } + message DeviceHistoryPoseInfo { + uint32 poseId = 1; + repeated DeviceCoverPointDataInfo points = 2; + uint32 pathType = 3; + } + message AllMapInfo { + uint32 mapHeadId = 1; + string mapName = 2; + } + message MapDataInfo { + bytes mapData = 1; + } + message MapHeadInfo { + uint32 mapHeadId = 1; + uint32 sizeX = 2; + uint32 sizeY = 3; + float minX = 4; + float minY = 5; + float maxX = 6; + float maxY = 7; + float resolution = 8; + } + message CarpetOffsetInfo { + float phi = 1; + float dist = 2; + } + message MapBoundaryInfo { + string mapMd5 = 1; + uint32 vMinX = 2; + uint32 vMaxX = 3; + uint32 vMinY = 4; + uint32 vMaxY = 5; + } + message MapExtInfo { + uint32 taskBeginDate = 1; + uint32 mapUploadDate = 2; + uint32 mapValid = 3; + uint32 radian = 4; + uint32 force = 5; + uint32 cleanPath = 6; + MapBoundaryInfo boudaryInfo = 7; + uint32 mapVersion = 8; + uint32 mapValueType = 9; + CarpetOffsetInfo carpetOffsetInfo = 10; + } + + uint32 mapType = 1; + MapExtInfo mapExtInfo = 2; + MapHeadInfo mapHead = 3; + MapDataInfo mapData = 4; + repeated AllMapInfo mapInfo = 5; + DeviceHistoryPoseInfo historyPose = 6; + DevicePoseDataInfo chargeStation = 7; + DeviceCurrentPoseInfo currentPose = 8; + repeated DeviceAreaDataInfo virtualWalls = 9; + repeated DeviceAreaDataInfo areasInfo = 10; + repeated DeviceNavigationPointDataInfo navigationPoints = 11; + repeated RoomDataInfo roomDataInfo = 12; + DeviceRoomMatrix roomMatrix = 13; + repeated DeviceRoomChainDataInfo roomChain = 14; + repeated ObjectDataInfo objects = 15; + repeated FurnitureDataInfo furnitureInfo = 16; + repeated HouseInfo houseInfos = 17; + repeated DeviceAreaDataInfo backupAreas = 18; +} \ No newline at end of file diff --git a/src/vacuum_map_parser_ijai/RobotMap_pb2.py b/src/vacuum_map_parser_ijai/RobotMap_pb2.py new file mode 100644 index 0000000..e216713 --- /dev/null +++ b/src/vacuum_map_parser_ijai/RobotMap_pb2.py @@ -0,0 +1,70 @@ +# pylint: skip-file +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: RobotMap.proto +# Protobuf Python Version: 5.26.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0eRobotMap.proto\"\xa8\x18\n\x08RobotMap\x12\x0f\n\x07mapType\x18\x01 \x01(\r\x12(\n\nmapExtInfo\x18\x02 \x01(\x0b\x32\x14.RobotMap.MapExtInfo\x12&\n\x07mapHead\x18\x03 \x01(\x0b\x32\x15.RobotMap.MapHeadInfo\x12&\n\x07mapData\x18\x04 \x01(\x0b\x32\x15.RobotMap.MapDataInfo\x12%\n\x07mapInfo\x18\x05 \x03(\x0b\x32\x14.RobotMap.AllMapInfo\x12\x34\n\x0bhistoryPose\x18\x06 \x01(\x0b\x32\x1f.RobotMap.DeviceHistoryPoseInfo\x12\x33\n\rchargeStation\x18\x07 \x01(\x0b\x32\x1c.RobotMap.DevicePoseDataInfo\x12\x34\n\x0b\x63urrentPose\x18\x08 \x01(\x0b\x32\x1f.RobotMap.DeviceCurrentPoseInfo\x12\x32\n\x0cvirtualWalls\x18\t \x03(\x0b\x32\x1c.RobotMap.DeviceAreaDataInfo\x12/\n\tareasInfo\x18\n \x03(\x0b\x32\x1c.RobotMap.DeviceAreaDataInfo\x12\x41\n\x10navigationPoints\x18\x0b \x03(\x0b\x32\'.RobotMap.DeviceNavigationPointDataInfo\x12,\n\x0croomDataInfo\x18\x0c \x03(\x0b\x32\x16.RobotMap.RoomDataInfo\x12.\n\nroomMatrix\x18\r \x01(\x0b\x32\x1a.RobotMap.DeviceRoomMatrix\x12\x34\n\troomChain\x18\x0e \x03(\x0b\x32!.RobotMap.DeviceRoomChainDataInfo\x12)\n\x07objects\x18\x0f \x03(\x0b\x32\x18.RobotMap.ObjectDataInfo\x12\x32\n\rfurnitureInfo\x18\x10 \x03(\x0b\x32\x1b.RobotMap.FurnitureDataInfo\x12\'\n\nhouseInfos\x18\x11 \x03(\x0b\x32\x13.RobotMap.HouseInfo\x12\x31\n\x0b\x62\x61\x63kupAreas\x18\x12 \x03(\x0b\x32\x1c.RobotMap.DeviceAreaDataInfo\x1ar\n\tHouseInfo\x12\n\n\x02id\x18\x01 \x01(\r\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x13\n\x0b\x63urMapCount\x18\x03 \x01(\r\x12\x12\n\nmaxMapSize\x18\x04 \x01(\r\x12\"\n\x04maps\x18\x05 \x03(\x0b\x32\x14.RobotMap.AllMapInfo\x1a\xa1\x01\n\x11\x46urnitureDataInfo\x12\n\n\x02id\x18\x01 \x01(\r\x12\x0e\n\x06typeId\x18\x02 \x01(\r\x12)\n\x06points\x18\x03 \x03(\x0b\x32\x19.RobotMap.DevicePointInfo\x12\x0b\n\x03url\x18\x04 \x01(\t\x12\x0e\n\x06status\x18\x05 \x01(\r\x12(\n\x05react\x18\x06 \x03(\x0b\x32\x19.RobotMap.DevicePointInfo\x1a\x91\x01\n\x0eObjectDataInfo\x12\x10\n\x08objectId\x18\x01 \x01(\r\x12\x14\n\x0cobjectTypeId\x18\x02 \x01(\r\x12\x12\n\nobjectName\x18\x03 \x01(\t\x12\x0f\n\x07\x63onfirm\x18\x04 \x01(\r\x12\t\n\x01x\x18\x05 \x01(\x02\x12\t\n\x01y\x18\x06 \x01(\x02\x12\x0b\n\x03url\x18\x07 \x01(\t\x12\x0f\n\x07notShow\x18\x08 \x01(\r\x1a?\n\x18\x44\x65viceChainPointDataInfo\x12\t\n\x01x\x18\x01 \x01(\r\x12\t\n\x01y\x18\x02 \x01(\r\x12\r\n\x05value\x18\x03 \x01(\r\x1a]\n\x17\x44\x65viceRoomChainDataInfo\x12\x0e\n\x06roomId\x18\x01 \x01(\r\x12\x32\n\x06points\x18\x02 \x03(\x0b\x32\".RobotMap.DeviceChainPointDataInfo\x1a\"\n\x10\x44\x65viceRoomMatrix\x12\x0e\n\x06matrix\x18\x01 \x01(\x0c\x1ag\n\x17\x43leanPerferenceDataInfo\x12\x11\n\tcleanMode\x18\x01 \x01(\r\x12\x12\n\nwaterLevel\x18\x02 \x01(\r\x12\x11\n\twindPower\x18\x03 \x01(\r\x12\x12\n\ntwiceClean\x18\x04 \x01(\r\x1a\x91\x02\n\x0cRoomDataInfo\x12\x0e\n\x06roomId\x18\x01 \x01(\r\x12\x10\n\x08roomName\x18\x02 \x01(\t\x12\x12\n\nroomTypeId\x18\x03 \x01(\r\x12\x12\n\nmeterialId\x18\x04 \x01(\r\x12\x12\n\ncleanState\x18\x05 \x01(\r\x12\x11\n\troomClean\x18\x06 \x01(\r\x12\x16\n\x0eroomCleanIndex\x18\x07 \x01(\r\x12/\n\x0croomNamePost\x18\x08 \x01(\x0b\x32\x19.RobotMap.DevicePointInfo\x12\x36\n\x0b\x63leanPerfer\x18\t \x01(\x0b\x32!.RobotMap.CleanPerferenceDataInfo\x12\x0f\n\x07\x63olorId\x18\n \x01(\r\x1av\n\x1d\x44\x65viceNavigationPointDataInfo\x12\x0f\n\x07pointId\x18\x01 \x01(\r\x12\x0e\n\x06status\x18\x02 \x01(\r\x12\x11\n\tpointType\x18\x03 \x01(\r\x12\t\n\x01x\x18\x04 \x01(\x02\x12\t\n\x01y\x18\x05 \x01(\x02\x12\x0b\n\x03phi\x18\x06 \x01(\x02\x1a\'\n\x0f\x44\x65vicePointInfo\x12\t\n\x01x\x18\x01 \x01(\x02\x12\t\n\x01y\x18\x02 \x01(\x02\x1ap\n\x12\x44\x65viceAreaDataInfo\x12\x0e\n\x06status\x18\x01 \x01(\r\x12\x0c\n\x04type\x18\x02 \x01(\r\x12\x11\n\tareaIndex\x18\x03 \x01(\r\x12)\n\x06points\x18\x04 \x03(\x0b\x32\x19.RobotMap.DevicePointInfo\x1aZ\n\x15\x44\x65viceCurrentPoseInfo\x12\x0e\n\x06poseId\x18\x01 \x01(\r\x12\x0e\n\x06update\x18\x02 \x01(\r\x12\t\n\x01x\x18\x03 \x01(\x02\x12\t\n\x01y\x18\x04 \x01(\x02\x12\x0b\n\x03phi\x18\x05 \x01(\x02\x1aG\n\x12\x44\x65vicePoseDataInfo\x12\t\n\x01x\x18\x01 \x01(\x02\x12\t\n\x01y\x18\x02 \x01(\x02\x12\x0b\n\x03phi\x18\x03 \x01(\x02\x12\x0e\n\x06roomId\x18\x04 \x01(\r\x1a@\n\x18\x44\x65viceCoverPointDataInfo\x12\x0e\n\x06update\x18\x01 \x01(\r\x12\t\n\x01x\x18\x02 \x01(\x02\x12\t\n\x01y\x18\x03 \x01(\x02\x1am\n\x15\x44\x65viceHistoryPoseInfo\x12\x0e\n\x06poseId\x18\x01 \x01(\r\x12\x32\n\x06points\x18\x02 \x03(\x0b\x32\".RobotMap.DeviceCoverPointDataInfo\x12\x10\n\x08pathType\x18\x03 \x01(\r\x1a\x30\n\nAllMapInfo\x12\x11\n\tmapHeadId\x18\x01 \x01(\r\x12\x0f\n\x07mapName\x18\x02 \x01(\t\x1a\x1e\n\x0bMapDataInfo\x12\x0f\n\x07mapData\x18\x01 \x01(\x0c\x1a\x8a\x01\n\x0bMapHeadInfo\x12\x11\n\tmapHeadId\x18\x01 \x01(\r\x12\r\n\x05sizeX\x18\x02 \x01(\r\x12\r\n\x05sizeY\x18\x03 \x01(\r\x12\x0c\n\x04minX\x18\x04 \x01(\x02\x12\x0c\n\x04minY\x18\x05 \x01(\x02\x12\x0c\n\x04maxX\x18\x06 \x01(\x02\x12\x0c\n\x04maxY\x18\x07 \x01(\x02\x12\x12\n\nresolution\x18\x08 \x01(\x02\x1a-\n\x10\x43\x61rpetOffsetInfo\x12\x0b\n\x03phi\x18\x01 \x01(\x02\x12\x0c\n\x04\x64ist\x18\x02 \x01(\x02\x1a]\n\x0fMapBoundaryInfo\x12\x0e\n\x06mapMd5\x18\x01 \x01(\t\x12\r\n\x05vMinX\x18\x02 \x01(\r\x12\r\n\x05vMaxX\x18\x03 \x01(\r\x12\r\n\x05vMinY\x18\x04 \x01(\r\x12\r\n\x05vMaxY\x18\x05 \x01(\r\x1a\x8e\x02\n\nMapExtInfo\x12\x15\n\rtaskBeginDate\x18\x01 \x01(\r\x12\x15\n\rmapUploadDate\x18\x02 \x01(\r\x12\x10\n\x08mapValid\x18\x03 \x01(\r\x12\x0e\n\x06radian\x18\x04 \x01(\r\x12\r\n\x05\x66orce\x18\x05 \x01(\r\x12\x11\n\tcleanPath\x18\x06 \x01(\r\x12.\n\x0b\x62oudaryInfo\x18\x07 \x01(\x0b\x32\x19.RobotMap.MapBoundaryInfo\x12\x12\n\nmapVersion\x18\x08 \x01(\r\x12\x14\n\x0cmapValueType\x18\t \x01(\r\x12\x34\n\x10\x63\x61rpetOffsetInfo\x18\n \x01(\x0b\x32\x1a.RobotMap.CarpetOffsetInfob\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'RobotMap_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_ROBOTMAP']._serialized_start=19 + _globals['_ROBOTMAP']._serialized_end=3131 + _globals['_ROBOTMAP_HOUSEINFO']._serialized_start=873 + _globals['_ROBOTMAP_HOUSEINFO']._serialized_end=987 + _globals['_ROBOTMAP_FURNITUREDATAINFO']._serialized_start=990 + _globals['_ROBOTMAP_FURNITUREDATAINFO']._serialized_end=1151 + _globals['_ROBOTMAP_OBJECTDATAINFO']._serialized_start=1154 + _globals['_ROBOTMAP_OBJECTDATAINFO']._serialized_end=1299 + _globals['_ROBOTMAP_DEVICECHAINPOINTDATAINFO']._serialized_start=1301 + _globals['_ROBOTMAP_DEVICECHAINPOINTDATAINFO']._serialized_end=1364 + _globals['_ROBOTMAP_DEVICEROOMCHAINDATAINFO']._serialized_start=1366 + _globals['_ROBOTMAP_DEVICEROOMCHAINDATAINFO']._serialized_end=1459 + _globals['_ROBOTMAP_DEVICEROOMMATRIX']._serialized_start=1461 + _globals['_ROBOTMAP_DEVICEROOMMATRIX']._serialized_end=1495 + _globals['_ROBOTMAP_CLEANPERFERENCEDATAINFO']._serialized_start=1497 + _globals['_ROBOTMAP_CLEANPERFERENCEDATAINFO']._serialized_end=1600 + _globals['_ROBOTMAP_ROOMDATAINFO']._serialized_start=1603 + _globals['_ROBOTMAP_ROOMDATAINFO']._serialized_end=1876 + _globals['_ROBOTMAP_DEVICENAVIGATIONPOINTDATAINFO']._serialized_start=1878 + _globals['_ROBOTMAP_DEVICENAVIGATIONPOINTDATAINFO']._serialized_end=1996 + _globals['_ROBOTMAP_DEVICEPOINTINFO']._serialized_start=1998 + _globals['_ROBOTMAP_DEVICEPOINTINFO']._serialized_end=2037 + _globals['_ROBOTMAP_DEVICEAREADATAINFO']._serialized_start=2039 + _globals['_ROBOTMAP_DEVICEAREADATAINFO']._serialized_end=2151 + _globals['_ROBOTMAP_DEVICECURRENTPOSEINFO']._serialized_start=2153 + _globals['_ROBOTMAP_DEVICECURRENTPOSEINFO']._serialized_end=2243 + _globals['_ROBOTMAP_DEVICEPOSEDATAINFO']._serialized_start=2245 + _globals['_ROBOTMAP_DEVICEPOSEDATAINFO']._serialized_end=2316 + _globals['_ROBOTMAP_DEVICECOVERPOINTDATAINFO']._serialized_start=2318 + _globals['_ROBOTMAP_DEVICECOVERPOINTDATAINFO']._serialized_end=2382 + _globals['_ROBOTMAP_DEVICEHISTORYPOSEINFO']._serialized_start=2384 + _globals['_ROBOTMAP_DEVICEHISTORYPOSEINFO']._serialized_end=2493 + _globals['_ROBOTMAP_ALLMAPINFO']._serialized_start=2495 + _globals['_ROBOTMAP_ALLMAPINFO']._serialized_end=2543 + _globals['_ROBOTMAP_MAPDATAINFO']._serialized_start=2545 + _globals['_ROBOTMAP_MAPDATAINFO']._serialized_end=2575 + _globals['_ROBOTMAP_MAPHEADINFO']._serialized_start=2578 + _globals['_ROBOTMAP_MAPHEADINFO']._serialized_end=2716 + _globals['_ROBOTMAP_CARPETOFFSETINFO']._serialized_start=2718 + _globals['_ROBOTMAP_CARPETOFFSETINFO']._serialized_end=2763 + _globals['_ROBOTMAP_MAPBOUNDARYINFO']._serialized_start=2765 + _globals['_ROBOTMAP_MAPBOUNDARYINFO']._serialized_end=2858 + _globals['_ROBOTMAP_MAPEXTINFO']._serialized_start=2861 + _globals['_ROBOTMAP_MAPEXTINFO']._serialized_end=3131 +# @@protoc_insertion_point(module_scope) diff --git a/src/vacuum_map_parser_ijai/RobotMap_pb2.pyi b/src/vacuum_map_parser_ijai/RobotMap_pb2.pyi new file mode 100644 index 0000000..c6bd64a --- /dev/null +++ b/src/vacuum_map_parser_ijai/RobotMap_pb2.pyi @@ -0,0 +1,620 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +""" + +import builtins +import collections.abc +import google.protobuf.descriptor +import google.protobuf.internal.containers +import google.protobuf.message +import typing + +DESCRIPTOR: google.protobuf.descriptor.FileDescriptor + +@typing.final +class RobotMap(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + @typing.final + class HouseInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + ID_FIELD_NUMBER: builtins.int + NAME_FIELD_NUMBER: builtins.int + CURMAPCOUNT_FIELD_NUMBER: builtins.int + MAXMAPSIZE_FIELD_NUMBER: builtins.int + MAPS_FIELD_NUMBER: builtins.int + id: builtins.int + name: builtins.str + curMapCount: builtins.int + maxMapSize: builtins.int + @property + def maps(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.AllMapInfo]: ... + def __init__( + self, + *, + id: builtins.int | None = ..., + name: builtins.str | None = ..., + curMapCount: builtins.int | None = ..., + maxMapSize: builtins.int | None = ..., + maps: collections.abc.Iterable[global___RobotMap.AllMapInfo] | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["curMapCount", b"curMapCount", "id", b"id", "maxMapSize", b"maxMapSize", "name", b"name"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["curMapCount", b"curMapCount", "id", b"id", "maps", b"maps", "maxMapSize", b"maxMapSize", "name", b"name"]) -> None: ... + + @typing.final + class FurnitureDataInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + ID_FIELD_NUMBER: builtins.int + TYPEID_FIELD_NUMBER: builtins.int + POINTS_FIELD_NUMBER: builtins.int + URL_FIELD_NUMBER: builtins.int + STATUS_FIELD_NUMBER: builtins.int + REACT_FIELD_NUMBER: builtins.int + id: builtins.int + typeId: builtins.int + url: builtins.str + status: builtins.int + @property + def points(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.DevicePointInfo]: ... + @property + def react(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.DevicePointInfo]: ... + def __init__( + self, + *, + id: builtins.int | None = ..., + typeId: builtins.int | None = ..., + points: collections.abc.Iterable[global___RobotMap.DevicePointInfo] | None = ..., + url: builtins.str | None = ..., + status: builtins.int | None = ..., + react: collections.abc.Iterable[global___RobotMap.DevicePointInfo] | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["id", b"id", "status", b"status", "typeId", b"typeId", "url", b"url"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["id", b"id", "points", b"points", "react", b"react", "status", b"status", "typeId", b"typeId", "url", b"url"]) -> None: ... + + @typing.final + class ObjectDataInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + OBJECTID_FIELD_NUMBER: builtins.int + OBJECTTYPEID_FIELD_NUMBER: builtins.int + OBJECTNAME_FIELD_NUMBER: builtins.int + CONFIRM_FIELD_NUMBER: builtins.int + X_FIELD_NUMBER: builtins.int + Y_FIELD_NUMBER: builtins.int + URL_FIELD_NUMBER: builtins.int + NOTSHOW_FIELD_NUMBER: builtins.int + objectId: builtins.int + objectTypeId: builtins.int + objectName: builtins.str + confirm: builtins.int + x: builtins.float + y: builtins.float + url: builtins.str + notShow: builtins.int + def __init__( + self, + *, + objectId: builtins.int | None = ..., + objectTypeId: builtins.int | None = ..., + objectName: builtins.str | None = ..., + confirm: builtins.int | None = ..., + x: builtins.float | None = ..., + y: builtins.float | None = ..., + url: builtins.str | None = ..., + notShow: builtins.int | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["confirm", b"confirm", "notShow", b"notShow", "objectId", b"objectId", "objectName", b"objectName", "objectTypeId", b"objectTypeId", "url", b"url", "x", b"x", "y", b"y"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["confirm", b"confirm", "notShow", b"notShow", "objectId", b"objectId", "objectName", b"objectName", "objectTypeId", b"objectTypeId", "url", b"url", "x", b"x", "y", b"y"]) -> None: ... + + @typing.final + class DeviceChainPointDataInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + X_FIELD_NUMBER: builtins.int + Y_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + x: builtins.int + y: builtins.int + value: builtins.int + def __init__( + self, + *, + x: builtins.int | None = ..., + y: builtins.int | None = ..., + value: builtins.int | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["value", b"value", "x", b"x", "y", b"y"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["value", b"value", "x", b"x", "y", b"y"]) -> None: ... + + @typing.final + class DeviceRoomChainDataInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + ROOMID_FIELD_NUMBER: builtins.int + POINTS_FIELD_NUMBER: builtins.int + roomId: builtins.int + @property + def points(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.DeviceChainPointDataInfo]: ... + def __init__( + self, + *, + roomId: builtins.int | None = ..., + points: collections.abc.Iterable[global___RobotMap.DeviceChainPointDataInfo] | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["roomId", b"roomId"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["points", b"points", "roomId", b"roomId"]) -> None: ... + + @typing.final + class DeviceRoomMatrix(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MATRIX_FIELD_NUMBER: builtins.int + matrix: builtins.bytes + def __init__( + self, + *, + matrix: builtins.bytes | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["matrix", b"matrix"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["matrix", b"matrix"]) -> None: ... + + @typing.final + class CleanPerferenceDataInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + CLEANMODE_FIELD_NUMBER: builtins.int + WATERLEVEL_FIELD_NUMBER: builtins.int + WINDPOWER_FIELD_NUMBER: builtins.int + TWICECLEAN_FIELD_NUMBER: builtins.int + cleanMode: builtins.int + waterLevel: builtins.int + windPower: builtins.int + twiceClean: builtins.int + def __init__( + self, + *, + cleanMode: builtins.int | None = ..., + waterLevel: builtins.int | None = ..., + windPower: builtins.int | None = ..., + twiceClean: builtins.int | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["cleanMode", b"cleanMode", "twiceClean", b"twiceClean", "waterLevel", b"waterLevel", "windPower", b"windPower"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["cleanMode", b"cleanMode", "twiceClean", b"twiceClean", "waterLevel", b"waterLevel", "windPower", b"windPower"]) -> None: ... + + @typing.final + class RoomDataInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + ROOMID_FIELD_NUMBER: builtins.int + ROOMNAME_FIELD_NUMBER: builtins.int + ROOMTYPEID_FIELD_NUMBER: builtins.int + METERIALID_FIELD_NUMBER: builtins.int + CLEANSTATE_FIELD_NUMBER: builtins.int + ROOMCLEAN_FIELD_NUMBER: builtins.int + ROOMCLEANINDEX_FIELD_NUMBER: builtins.int + ROOMNAMEPOST_FIELD_NUMBER: builtins.int + CLEANPERFER_FIELD_NUMBER: builtins.int + COLORID_FIELD_NUMBER: builtins.int + roomId: builtins.int + roomName: builtins.str + roomTypeId: builtins.int + meterialId: builtins.int + cleanState: builtins.int + roomClean: builtins.int + roomCleanIndex: builtins.int + colorId: builtins.int + @property + def roomNamePost(self) -> global___RobotMap.DevicePointInfo: ... + @property + def cleanPerfer(self) -> global___RobotMap.CleanPerferenceDataInfo: ... + def __init__( + self, + *, + roomId: builtins.int | None = ..., + roomName: builtins.str | None = ..., + roomTypeId: builtins.int | None = ..., + meterialId: builtins.int | None = ..., + cleanState: builtins.int | None = ..., + roomClean: builtins.int | None = ..., + roomCleanIndex: builtins.int | None = ..., + roomNamePost: global___RobotMap.DevicePointInfo | None = ..., + cleanPerfer: global___RobotMap.CleanPerferenceDataInfo | None = ..., + colorId: builtins.int | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["cleanPerfer", b"cleanPerfer", "cleanState", b"cleanState", "colorId", b"colorId", "meterialId", b"meterialId", "roomClean", b"roomClean", "roomCleanIndex", b"roomCleanIndex", "roomId", b"roomId", "roomName", b"roomName", "roomNamePost", b"roomNamePost", "roomTypeId", b"roomTypeId"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["cleanPerfer", b"cleanPerfer", "cleanState", b"cleanState", "colorId", b"colorId", "meterialId", b"meterialId", "roomClean", b"roomClean", "roomCleanIndex", b"roomCleanIndex", "roomId", b"roomId", "roomName", b"roomName", "roomNamePost", b"roomNamePost", "roomTypeId", b"roomTypeId"]) -> None: ... + + @typing.final + class DeviceNavigationPointDataInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + POINTID_FIELD_NUMBER: builtins.int + STATUS_FIELD_NUMBER: builtins.int + POINTTYPE_FIELD_NUMBER: builtins.int + X_FIELD_NUMBER: builtins.int + Y_FIELD_NUMBER: builtins.int + PHI_FIELD_NUMBER: builtins.int + pointId: builtins.int + status: builtins.int + pointType: builtins.int + x: builtins.float + y: builtins.float + phi: builtins.float + def __init__( + self, + *, + pointId: builtins.int | None = ..., + status: builtins.int | None = ..., + pointType: builtins.int | None = ..., + x: builtins.float | None = ..., + y: builtins.float | None = ..., + phi: builtins.float | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["phi", b"phi", "pointId", b"pointId", "pointType", b"pointType", "status", b"status", "x", b"x", "y", b"y"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["phi", b"phi", "pointId", b"pointId", "pointType", b"pointType", "status", b"status", "x", b"x", "y", b"y"]) -> None: ... + + @typing.final + class DevicePointInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + X_FIELD_NUMBER: builtins.int + Y_FIELD_NUMBER: builtins.int + x: builtins.float + y: builtins.float + def __init__( + self, + *, + x: builtins.float | None = ..., + y: builtins.float | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["x", b"x", "y", b"y"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["x", b"x", "y", b"y"]) -> None: ... + + @typing.final + class DeviceAreaDataInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + STATUS_FIELD_NUMBER: builtins.int + TYPE_FIELD_NUMBER: builtins.int + AREAINDEX_FIELD_NUMBER: builtins.int + POINTS_FIELD_NUMBER: builtins.int + status: builtins.int + type: builtins.int + areaIndex: builtins.int + @property + def points(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.DevicePointInfo]: ... + def __init__( + self, + *, + status: builtins.int | None = ..., + type: builtins.int | None = ..., + areaIndex: builtins.int | None = ..., + points: collections.abc.Iterable[global___RobotMap.DevicePointInfo] | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["areaIndex", b"areaIndex", "status", b"status", "type", b"type"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["areaIndex", b"areaIndex", "points", b"points", "status", b"status", "type", b"type"]) -> None: ... + + @typing.final + class DeviceCurrentPoseInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + POSEID_FIELD_NUMBER: builtins.int + UPDATE_FIELD_NUMBER: builtins.int + X_FIELD_NUMBER: builtins.int + Y_FIELD_NUMBER: builtins.int + PHI_FIELD_NUMBER: builtins.int + poseId: builtins.int + update: builtins.int + x: builtins.float + y: builtins.float + phi: builtins.float + def __init__( + self, + *, + poseId: builtins.int | None = ..., + update: builtins.int | None = ..., + x: builtins.float | None = ..., + y: builtins.float | None = ..., + phi: builtins.float | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["phi", b"phi", "poseId", b"poseId", "update", b"update", "x", b"x", "y", b"y"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["phi", b"phi", "poseId", b"poseId", "update", b"update", "x", b"x", "y", b"y"]) -> None: ... + + @typing.final + class DevicePoseDataInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + X_FIELD_NUMBER: builtins.int + Y_FIELD_NUMBER: builtins.int + PHI_FIELD_NUMBER: builtins.int + ROOMID_FIELD_NUMBER: builtins.int + x: builtins.float + y: builtins.float + phi: builtins.float + roomId: builtins.int + def __init__( + self, + *, + x: builtins.float | None = ..., + y: builtins.float | None = ..., + phi: builtins.float | None = ..., + roomId: builtins.int | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["phi", b"phi", "roomId", b"roomId", "x", b"x", "y", b"y"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["phi", b"phi", "roomId", b"roomId", "x", b"x", "y", b"y"]) -> None: ... + + @typing.final + class DeviceCoverPointDataInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + UPDATE_FIELD_NUMBER: builtins.int + X_FIELD_NUMBER: builtins.int + Y_FIELD_NUMBER: builtins.int + update: builtins.int + x: builtins.float + y: builtins.float + def __init__( + self, + *, + update: builtins.int | None = ..., + x: builtins.float | None = ..., + y: builtins.float | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["update", b"update", "x", b"x", "y", b"y"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["update", b"update", "x", b"x", "y", b"y"]) -> None: ... + + @typing.final + class DeviceHistoryPoseInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + POSEID_FIELD_NUMBER: builtins.int + POINTS_FIELD_NUMBER: builtins.int + PATHTYPE_FIELD_NUMBER: builtins.int + poseId: builtins.int + pathType: builtins.int + @property + def points(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.DeviceCoverPointDataInfo]: ... + def __init__( + self, + *, + poseId: builtins.int | None = ..., + points: collections.abc.Iterable[global___RobotMap.DeviceCoverPointDataInfo] | None = ..., + pathType: builtins.int | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["pathType", b"pathType", "poseId", b"poseId"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["pathType", b"pathType", "points", b"points", "poseId", b"poseId"]) -> None: ... + + @typing.final + class AllMapInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MAPHEADID_FIELD_NUMBER: builtins.int + MAPNAME_FIELD_NUMBER: builtins.int + mapHeadId: builtins.int + mapName: builtins.str + def __init__( + self, + *, + mapHeadId: builtins.int | None = ..., + mapName: builtins.str | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["mapHeadId", b"mapHeadId", "mapName", b"mapName"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["mapHeadId", b"mapHeadId", "mapName", b"mapName"]) -> None: ... + + @typing.final + class MapDataInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MAPDATA_FIELD_NUMBER: builtins.int + mapData: builtins.bytes + def __init__( + self, + *, + mapData: builtins.bytes | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["mapData", b"mapData"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["mapData", b"mapData"]) -> None: ... + + @typing.final + class MapHeadInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MAPHEADID_FIELD_NUMBER: builtins.int + SIZEX_FIELD_NUMBER: builtins.int + SIZEY_FIELD_NUMBER: builtins.int + MINX_FIELD_NUMBER: builtins.int + MINY_FIELD_NUMBER: builtins.int + MAXX_FIELD_NUMBER: builtins.int + MAXY_FIELD_NUMBER: builtins.int + RESOLUTION_FIELD_NUMBER: builtins.int + mapHeadId: builtins.int + sizeX: builtins.int + sizeY: builtins.int + minX: builtins.float + minY: builtins.float + maxX: builtins.float + maxY: builtins.float + resolution: builtins.float + def __init__( + self, + *, + mapHeadId: builtins.int | None = ..., + sizeX: builtins.int | None = ..., + sizeY: builtins.int | None = ..., + minX: builtins.float | None = ..., + minY: builtins.float | None = ..., + maxX: builtins.float | None = ..., + maxY: builtins.float | None = ..., + resolution: builtins.float | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["mapHeadId", b"mapHeadId", "maxX", b"maxX", "maxY", b"maxY", "minX", b"minX", "minY", b"minY", "resolution", b"resolution", "sizeX", b"sizeX", "sizeY", b"sizeY"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["mapHeadId", b"mapHeadId", "maxX", b"maxX", "maxY", b"maxY", "minX", b"minX", "minY", b"minY", "resolution", b"resolution", "sizeX", b"sizeX", "sizeY", b"sizeY"]) -> None: ... + + @typing.final + class CarpetOffsetInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + PHI_FIELD_NUMBER: builtins.int + DIST_FIELD_NUMBER: builtins.int + phi: builtins.float + dist: builtins.float + def __init__( + self, + *, + phi: builtins.float | None = ..., + dist: builtins.float | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["dist", b"dist", "phi", b"phi"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["dist", b"dist", "phi", b"phi"]) -> None: ... + + @typing.final + class MapBoundaryInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MAPMD5_FIELD_NUMBER: builtins.int + VMINX_FIELD_NUMBER: builtins.int + VMAXX_FIELD_NUMBER: builtins.int + VMINY_FIELD_NUMBER: builtins.int + VMAXY_FIELD_NUMBER: builtins.int + mapMd5: builtins.str + vMinX: builtins.int + vMaxX: builtins.int + vMinY: builtins.int + vMaxY: builtins.int + def __init__( + self, + *, + mapMd5: builtins.str | None = ..., + vMinX: builtins.int | None = ..., + vMaxX: builtins.int | None = ..., + vMinY: builtins.int | None = ..., + vMaxY: builtins.int | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["mapMd5", b"mapMd5", "vMaxX", b"vMaxX", "vMaxY", b"vMaxY", "vMinX", b"vMinX", "vMinY", b"vMinY"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["mapMd5", b"mapMd5", "vMaxX", b"vMaxX", "vMaxY", b"vMaxY", "vMinX", b"vMinX", "vMinY", b"vMinY"]) -> None: ... + + @typing.final + class MapExtInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + TASKBEGINDATE_FIELD_NUMBER: builtins.int + MAPUPLOADDATE_FIELD_NUMBER: builtins.int + MAPVALID_FIELD_NUMBER: builtins.int + RADIAN_FIELD_NUMBER: builtins.int + FORCE_FIELD_NUMBER: builtins.int + CLEANPATH_FIELD_NUMBER: builtins.int + BOUDARYINFO_FIELD_NUMBER: builtins.int + MAPVERSION_FIELD_NUMBER: builtins.int + MAPVALUETYPE_FIELD_NUMBER: builtins.int + CARPETOFFSETINFO_FIELD_NUMBER: builtins.int + taskBeginDate: builtins.int + mapUploadDate: builtins.int + mapValid: builtins.int + radian: builtins.int + force: builtins.int + cleanPath: builtins.int + mapVersion: builtins.int + mapValueType: builtins.int + @property + def boudaryInfo(self) -> global___RobotMap.MapBoundaryInfo: ... + @property + def carpetOffsetInfo(self) -> global___RobotMap.CarpetOffsetInfo: ... + def __init__( + self, + *, + taskBeginDate: builtins.int | None = ..., + mapUploadDate: builtins.int | None = ..., + mapValid: builtins.int | None = ..., + radian: builtins.int | None = ..., + force: builtins.int | None = ..., + cleanPath: builtins.int | None = ..., + boudaryInfo: global___RobotMap.MapBoundaryInfo | None = ..., + mapVersion: builtins.int | None = ..., + mapValueType: builtins.int | None = ..., + carpetOffsetInfo: global___RobotMap.CarpetOffsetInfo | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["boudaryInfo", b"boudaryInfo", "carpetOffsetInfo", b"carpetOffsetInfo", "cleanPath", b"cleanPath", "force", b"force", "mapUploadDate", b"mapUploadDate", "mapValid", b"mapValid", "mapValueType", b"mapValueType", "mapVersion", b"mapVersion", "radian", b"radian", "taskBeginDate", b"taskBeginDate"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["boudaryInfo", b"boudaryInfo", "carpetOffsetInfo", b"carpetOffsetInfo", "cleanPath", b"cleanPath", "force", b"force", "mapUploadDate", b"mapUploadDate", "mapValid", b"mapValid", "mapValueType", b"mapValueType", "mapVersion", b"mapVersion", "radian", b"radian", "taskBeginDate", b"taskBeginDate"]) -> None: ... + + MAPTYPE_FIELD_NUMBER: builtins.int + MAPEXTINFO_FIELD_NUMBER: builtins.int + MAPHEAD_FIELD_NUMBER: builtins.int + MAPDATA_FIELD_NUMBER: builtins.int + MAPINFO_FIELD_NUMBER: builtins.int + HISTORYPOSE_FIELD_NUMBER: builtins.int + CHARGESTATION_FIELD_NUMBER: builtins.int + CURRENTPOSE_FIELD_NUMBER: builtins.int + VIRTUALWALLS_FIELD_NUMBER: builtins.int + AREASINFO_FIELD_NUMBER: builtins.int + NAVIGATIONPOINTS_FIELD_NUMBER: builtins.int + ROOMDATAINFO_FIELD_NUMBER: builtins.int + ROOMMATRIX_FIELD_NUMBER: builtins.int + ROOMCHAIN_FIELD_NUMBER: builtins.int + OBJECTS_FIELD_NUMBER: builtins.int + FURNITUREINFO_FIELD_NUMBER: builtins.int + HOUSEINFOS_FIELD_NUMBER: builtins.int + BACKUPAREAS_FIELD_NUMBER: builtins.int + mapType: builtins.int + @property + def mapExtInfo(self) -> global___RobotMap.MapExtInfo: ... + @property + def mapHead(self) -> global___RobotMap.MapHeadInfo: ... + @property + def mapData(self) -> global___RobotMap.MapDataInfo: ... + @property + def mapInfo(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.AllMapInfo]: ... + @property + def historyPose(self) -> global___RobotMap.DeviceHistoryPoseInfo: ... + @property + def chargeStation(self) -> global___RobotMap.DevicePoseDataInfo: ... + @property + def currentPose(self) -> global___RobotMap.DeviceCurrentPoseInfo: ... + @property + def virtualWalls(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.DeviceAreaDataInfo]: ... + @property + def areasInfo(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.DeviceAreaDataInfo]: ... + @property + def navigationPoints(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.DeviceNavigationPointDataInfo]: ... + @property + def roomDataInfo(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.RoomDataInfo]: ... + @property + def roomMatrix(self) -> global___RobotMap.DeviceRoomMatrix: ... + @property + def roomChain(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.DeviceRoomChainDataInfo]: ... + @property + def objects(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.ObjectDataInfo]: ... + @property + def furnitureInfo(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.FurnitureDataInfo]: ... + @property + def houseInfos(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.HouseInfo]: ... + @property + def backupAreas(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RobotMap.DeviceAreaDataInfo]: ... + def __init__( + self, + *, + mapType: builtins.int | None = ..., + mapExtInfo: global___RobotMap.MapExtInfo | None = ..., + mapHead: global___RobotMap.MapHeadInfo | None = ..., + mapData: global___RobotMap.MapDataInfo | None = ..., + mapInfo: collections.abc.Iterable[global___RobotMap.AllMapInfo] | None = ..., + historyPose: global___RobotMap.DeviceHistoryPoseInfo | None = ..., + chargeStation: global___RobotMap.DevicePoseDataInfo | None = ..., + currentPose: global___RobotMap.DeviceCurrentPoseInfo | None = ..., + virtualWalls: collections.abc.Iterable[global___RobotMap.DeviceAreaDataInfo] | None = ..., + areasInfo: collections.abc.Iterable[global___RobotMap.DeviceAreaDataInfo] | None = ..., + navigationPoints: collections.abc.Iterable[global___RobotMap.DeviceNavigationPointDataInfo] | None = ..., + roomDataInfo: collections.abc.Iterable[global___RobotMap.RoomDataInfo] | None = ..., + roomMatrix: global___RobotMap.DeviceRoomMatrix | None = ..., + roomChain: collections.abc.Iterable[global___RobotMap.DeviceRoomChainDataInfo] | None = ..., + objects: collections.abc.Iterable[global___RobotMap.ObjectDataInfo] | None = ..., + furnitureInfo: collections.abc.Iterable[global___RobotMap.FurnitureDataInfo] | None = ..., + houseInfos: collections.abc.Iterable[global___RobotMap.HouseInfo] | None = ..., + backupAreas: collections.abc.Iterable[global___RobotMap.DeviceAreaDataInfo] | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["chargeStation", b"chargeStation", "currentPose", b"currentPose", "historyPose", b"historyPose", "mapData", b"mapData", "mapExtInfo", b"mapExtInfo", "mapHead", b"mapHead", "mapType", b"mapType", "roomMatrix", b"roomMatrix"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["areasInfo", b"areasInfo", "backupAreas", b"backupAreas", "chargeStation", b"chargeStation", "currentPose", b"currentPose", "furnitureInfo", b"furnitureInfo", "historyPose", b"historyPose", "houseInfos", b"houseInfos", "mapData", b"mapData", "mapExtInfo", b"mapExtInfo", "mapHead", b"mapHead", "mapInfo", b"mapInfo", "mapType", b"mapType", "navigationPoints", b"navigationPoints", "objects", b"objects", "roomChain", b"roomChain", "roomDataInfo", b"roomDataInfo", "roomMatrix", b"roomMatrix", "virtualWalls", b"virtualWalls"]) -> None: ... + +global___RobotMap = RobotMap diff --git a/src/vacuum_map_parser_ijai/aes_decryptor.py b/src/vacuum_map_parser_ijai/aes_decryptor.py index 92f1d5e..caf9cbe 100644 --- a/src/vacuum_map_parser_ijai/aes_decryptor.py +++ b/src/vacuum_map_parser_ijai/aes_decryptor.py @@ -1,14 +1,16 @@ +"""Module that provides functions for decrypting a map.""" + +import base64 +import binascii + from Crypto.Cipher import AES from Crypto.Hash import MD5 from Crypto.Util.Padding import pad, unpad -import base64 -import logging - -_LOGGER = logging.getLogger(__name__) isEncryptKeyTypeHex = True -def aes_encrypt(data, key: str): + +def aes_encrypt(data: str, key: str) -> str: cipher = AES.new(key.encode("utf-8"), AES.MODE_ECB) encryptedData = cipher.encrypt( @@ -17,21 +19,22 @@ def aes_encrypt(data, key: str): return encryptedBase64Str -def aes_decrypt(data, key: str): + +def aes_decrypt(data: bytes, key: str) -> bytes: parsedKey = key.encode("utf-8") if isEncryptKeyTypeHex: parsedKey = bytes.fromhex(key) cipher = AES.new(parsedKey, AES.MODE_ECB) - decryptedBytes = cipher.decrypt(base64.b64decode(data)) + decryptedBytes = cipher.decrypt(data) decryptedData = unpad(decryptedBytes, AES.block_size, 'pkcs7') - + return bytes.fromhex(decryptedData.decode("utf-8")) -def md5key(string: str, model: str, device_mac: str): +def md5key(string: str, model: str, device_mac: str) -> str: pjstr = "".join(device_mac.lower().split(":")) tempModel = model.split('.')[-1] @@ -40,23 +43,27 @@ def md5key(string: str, model: str, device_mac: str): tempModel = "00" + tempModel elif len(tempModel) == 3: tempModel = "0" + tempModel - + elif len(tempModel) > 4: + tempModel = tempModel[-4:] + tempKey = pjstr + tempModel aeskey = aes_encrypt(string, tempKey) temp = MD5.new(aeskey.encode('utf-8')).hexdigest() if isEncryptKeyTypeHex: return temp - else: - return temp[8:-8].upper() + return temp[8:-8].upper() -def gen_md5_key(wifi_info_sn: str, owner_id: str, device_id: str, model: str, device_mac: str): +def gen_md5_key(wifi_info_sn: str, owner_id: str, device_id: str, model: str, device_mac: str) -> str: arr = [wifi_info_sn, owner_id, device_id] tempString = '+'.join(arr) return md5key(tempString, model, device_mac) -def decrypt(data: str, wifi_info_sn: str, owner_id: str, device_id: str, model: str, device_mac: str) -> bytes: +def decrypt(data: bytes, wifi_info_sn: str, owner_id: str, device_id: str, model: str, device_mac: str) -> bytes: + try: + data = base64.b64decode(data, validate=True) + except binascii.Error: + pass return aes_decrypt(data, gen_md5_key(wifi_info_sn, owner_id, device_id, model, device_mac)) - diff --git a/src/vacuum_map_parser_ijai/beautify_min.py b/src/vacuum_map_parser_ijai/beautify_min.py new file mode 100644 index 0000000..fb4f329 --- /dev/null +++ b/src/vacuum_map_parser_ijai/beautify_min.py @@ -0,0 +1,970 @@ +# pylint: skip-file +import collections + +from vacuum_map_parser_base.map_data import Point + +from vacuum_map_parser_ijai.RobotMap_pb2 import RobotMap + + +class BeautifyMap: + def __init__(self, mapHead: RobotMap.MapHeadInfo): + self.map = [] + self.tRect = { + "x": 0, + "y": 0, + "width": 0, + "height": 0 + } + self.x_min = mapHead.minX + self.x_max = mapHead.maxX + self.y_min = mapHead.minY + self.y_max = mapHead.maxY + self.resolution = mapHead.resolution + self.size_x = mapHead.sizeX + self.size_y = mapHead.sizeY + + def setMap(self, mapData: RobotMap.MapDataInfo): + temp_mapData = mapData.mapData + + tempArray = [0] * len(temp_mapData) + + for i in range(len(temp_mapData)): + if (temp_mapData[i] > 127): + tempArray[i] = -128 + else: + tempArray[i] = temp_mapData[i] + + self.map = tempArray + + def normalizeMap(self) -> None: + # normalizing all data to bytes and values suitable for map_data_parser + for i in range(len(self.map)): + if self.map[i] < 0: + self.map[i] = (256 + self.map[i]) % 256 + elif self.map[i] > 255: + self.map[i] = self.map[i] % 256 + elif self.map[i] == 30: + self.map[i] = 0 + elif self.map[i] == 40: + self.map[i] = 255 + + def getMap(self) -> list[int]: + return self.map + + def transform(self) -> None: + non_boundary_noise = [] + self.findRoiMap() + self.expandBlackRect(4, 4, self.map[0]) + self.expandWhiteRect(4, 4, self.map[0]) + self.refineBoundary(0, 10) + non_boundary_noise = self.eliminateNonBoundaryNoise( + non_boundary_noise, 127, -128, 0) + self.expandSingleConvexBoundary(50, -128, 4, 4) + non_boundary_noise = self.fillNonBoundaryNoise2( + non_boundary_noise) + self.refineBoundary(0, 10) + self.fillBlackComponent([], -128) + + def findRoiMap(self): + top_bound = self.size_x + bottom_bound = 0 + left_bound = self.size_y + right_bound = 0 + for x in range(self.size_x): + for y in range(self.size_y): + if (self.map[y * self.size_x + x] != 0): + if (left_bound > y - 10): + if (x - 10 >= 0): + left_bound = y - 10 + else: + left_bound = 0 + if (top_bound > x - 10): + if (x - 10 >= 0): + top_bound = x - 10 + else: + top_bound = 0 + if (right_bound < y + 10): + if (y + 10 < self.size_y): + right_bound = y + 10 + else: + right_bound = self.size_y - 1 + if (bottom_bound < x + 10): + if (x + 10 < self.size_x): + bottom_bound = x + 10 + else: + bottom_bound = self.size_x - 1 + + width = right_bound - left_bound + 1 + height = bottom_bound - top_bound + 1 + if (width > 0 and height > 0 and width < self.size_y and height < self.size_x): + self.tRect["x"] = top_bound + self.tRect["y"] = left_bound + self.tRect["width"] = width + self.tRect["height"] = height + + def expandBlackRect(self, kernel_size_x, kernel_size_y, threshold): + il, ir, jl, jr = (None, None, None, None) + + if (kernel_size_x % 2 == 1): + ir = kernel_size_x - 1 >> 1 + il = -ir + else: + ir = kernel_size_x >> 1 + il = 1 - ir + + if (kernel_size_y % 2 == 1): + jr = kernel_size_y - 1 >> 1 + jl = -jr + else: + jr = kernel_size_y >> 1 + jl = 1 - jr + + dst = [127] * len(self.map) + + for i in range(self.tRect["y"], self.tRect["y"] + self.tRect["width"]): + for j in range(self.tRect["x"], self.tRect["x"] + self.tRect["height"]): + if (self.map[i * self.size_x + j] < threshold): + for di in range(il, ir + 1): + for dj in range(jl, jr + 1): + if (i + di < 0 or i + di >= self.tRect["y"] + self.tRect["width"] or j + dj < 0 or j + dj >= self.tRect["x"] + self.tRect["height"]): + continue + + if (dst[(i + di) * self.size_x + j + dj] > self.map[i * self.size_x + j]): + dst[(i + di) * self.size_x + j + + dj] = self.map[i * self.size_x + j] + + for offset in range(len(self.map)): + if (dst[offset] == 127): + dst[offset] = self.map[offset] + + self.map = dst + + def expandWhiteRect(self, kernel_size_x, kernel_size_y, threshold): + il, ir, jl, jr = (None, None, None, None) + + if (kernel_size_x % 2 == 1): + ir = kernel_size_x - 1 >> 1 + il = -ir + else: + ir = kernel_size_x >> 1 + il = 1 - ir + + if (kernel_size_y % 2 == 1): + jr = kernel_size_y - 1 >> 1 + jl = -jr + else: + jr = kernel_size_y >> 1 + jl = 1 - jr + + dst = [-128] * len(self.map) + + for i in range(self.tRect["y"], self.tRect["y"] + self.tRect["width"]): + for j in range(self.tRect["x"], self.tRect["x"] + self.tRect["height"]): + if (self.map[i * self.size_x + j] > threshold): + for di in range(il, ir + 1): + for dj in range(jl, jr + 1): + if (i + di < 0 or i + di >= self.tRect["y"] + self.tRect["width"] or j + dj < 0 or j + dj >= self.tRect["x"] + self.tRect["height"]): + continue + + if (dst[(i + di) * self.size_x + j + dj] < self.map[i * self.size_x + j] and self.map[(i + di) * self.size_x + j + dj] < threshold): + dst[(i + di) * self.size_x + j + + dj] = self.map[i * self.size_x + j] + + for offset in range(len(self.map)): + if (dst[offset] == -128): + dst[offset] = self.map[offset] + + self.map = dst + + def refineBoundary(self, threshold_black, threshold_white): + points = [] + hasWhiteNeighbor = None + + for i in range(self.tRect["y"], self.tRect["y"] + self.tRect["width"]): + for j in range(self.tRect["x"], self.tRect["x"] + self.tRect["height"]): + if (self.map[i * self.size_x + j] < threshold_black): + hasWhiteNeighbor = False + + for di in range(-1, 2): + for dj in range(-1, 2): + if (i + di < 0 or i + di >= self.tRect["y"] + self.tRect["width"] or j + dj < 0 or j + dj >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[(i + di) * self.size_x + j + dj] > threshold_white): + hasWhiteNeighbor = True + + if (not hasWhiteNeighbor): + points.append((i, j)) + + for x, y in points: + self.map[x * self.size_x + y] = 0 + + def eliminateNonBoundaryNoise(self, nonBoundaryNoise, noise_color, border_color, outer_border_color): + tempnonBoundaryNoise = nonBoundaryNoise + + for i in range(self.tRect["y"], self.tRect["y"] + self.tRect["width"]): + for j in range(self.tRect["x"], self.tRect["x"] + self.tRect["height"]): + if (self.map[i * self.size_x + j] == border_color): + if (i - 1 < 0 or i + 1 >= self.tRect["y"] + self.tRect["width"] or j - 1 < 0 or j + 1 >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[(i - 1) * self.size_x + j] != outer_border_color and self.map[(i + 1) * self.size_x + j] != outer_border_color and self.map[i * self.size_x + j - 1] != outer_border_color and self.map[i * self.size_x + j + 1] != outer_border_color and self.map[(i - 1) * self.size_x + j - 1] != outer_border_color and self.map[(i - 1) * self.size_x + j + 1] != outer_border_color and self.map[(i + 1) * self.size_x + j - 1] != outer_border_color and self.map[(i + 1) * self.size_x + j + 1] != outer_border_color): + self.map[i * self.size_x + j] = noise_color + tempnonBoundaryNoise.append(Point(i, j)) + + return tempnonBoundaryNoise + + def expandSingleConvexBoundary(self, external_corner_value, fill_value, valid_length, times): + contour = self.extractExternalContoursNewStrategy([]) + + for _ in range(times): + fill_edges = [] + inner_corner_value = external_corner_value + 5 + four_neighbourhood = [[-1, 0], [1, 0], [0, -1], [0, 1]] + + extract_corner = self.extractCorners( + [], contour, external_corner_value, inner_corner_value) + + for p in extract_corner: + is_valid_length = False + + for k in range(4): + currpoint = Point( + p.x + four_neighbourhood[k][0], p.y + four_neighbourhood[k][1]) + + if (currpoint.x < self.tRect["y"] or currpoint.x >= self.tRect["y"] + self.tRect["width"] or currpoint.y < self.tRect["x"] or currpoint.y >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[currpoint.x * self.size_x + currpoint.y] == inner_corner_value): + is_valid_length = self.statisticalLineLength( + currpoint, external_corner_value, inner_corner_value, valid_length) + break + + if (k == 3): + is_valid_length = True + + _, fill_edges = self.fourNeighbourhoodSearchForExtractCorners( + p, [], [], external_corner_value, inner_corner_value, valid_length, is_valid_length) + + contour = self.fillEdges(contour, fill_edges, fill_value) + + def extractExternalContoursNewStrategy(self, contour): + gray_region = self.findGrayConnectComponent([]) + return self.findExternalContoursNewStrategy(gray_region, contour) + + def findGrayConnectComponent(self, gray_region) -> tuple[list[Point], list[int]]: + four_neighbourhood = [[-1, 0], [0, 1], [1, 0], [0, -1]] + findOnePoint = False + + for y in range(self.tRect["y"], self.tRect["y"] + self.tRect["width"]): + for x in range(self.tRect["x"], self.tRect["x"] + self.tRect["height"]): + if (self.map[y * self.size_x + x] == 0): + findOnePoint = True + points_for_search = [Point(y, x)] + gray_region.append(Point(y, x)) + self.map[y * self.size_x + x] = 30 + + while (len(points_for_search) > 0): + seed = points_for_search.pop(0) + + for k in range(4): + currpoint = Point( + seed.x + four_neighbourhood[k][0], seed.y + four_neighbourhood[k][1]) + + if (currpoint.x < self.tRect["y"] or currpoint.x >= self.tRect["y"] + self.tRect["width"] or currpoint.y < self.tRect["x"] or currpoint.y >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[currpoint.x * self.size_x + currpoint.y] == 0): + self.map[currpoint.x * + self.size_x + currpoint.y] = 30 + points_for_search.append(currpoint) + gray_region.append(currpoint) + + if findOnePoint: + break + + if findOnePoint: + findOnePoint = False + break + + return gray_region + + def findExternalContoursNewStrategy(self, gray_region, contour) -> list[Point]: + eight_neighbourhood = [[-1, 0], [1, 0], [0, -1], + [0, 1], [-1, 1], [1, 1], [1, -1], [-1, -1]] + + for i in range(len(gray_region)): + for k in range(8): + temp_idy = gray_region[i].x + eight_neighbourhood[k][0] + temp_idx = gray_region[i].y + eight_neighbourhood[k][1] + + if (temp_idy < self.tRect["y"] or temp_idy >= self.tRect["y"] + self.tRect["width"] or temp_idx < self.tRect["x"] or temp_idx >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[temp_idy * self.size_x + temp_idx] == -128): + self.map[temp_idy * self.size_x + temp_idx] = 40 + contour.append(Point(temp_idy, temp_idx)) + + return contour + + def extractCorners(self, extract_corner, contour, external_corner_value, inner_corner_value): + four_neighbourhood = [[-1, 0], [0, 1], [1, 0], [0, -1]] + + for i in range(len(contour)): + black_count = 0 + white_count = 0 + gray_count = 0 + + for k in range(4): + currpoint = Point( + contour[i].x + four_neighbourhood[k][0], contour[i].y + four_neighbourhood[k][1]) + + if (currpoint.x < self.tRect["y"] or currpoint.x >= self.tRect["y"] + self.tRect["width"] or currpoint.y < self.tRect["x"] or currpoint.y >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[currpoint.x * self.size_x + currpoint.y] == -128): + black_count += 1 + elif (self.map[currpoint.x * self.size_x + currpoint.y] == 0): + gray_count += 1 + elif (self.map[currpoint.x * self.size_x + currpoint.y] == 127): + white_count += 1 + + if (gray_count == 2 and black_count == 2): + extract_corner.append(currpoint) + self.map[contour[i].x * self.size_x + + contour[i].y] = external_corner_value + elif (white_count == 2 and black_count == 2): + self.map[contour[i].x * self.size_x + + contour[i].y] = inner_corner_value + + return extract_corner + + def statisticalLineLength(self, point, external_corner_value, inner_corner_value, valid_length): + if (self.upSearchStatisticalLineLength(point, external_corner_value, inner_corner_value, valid_length)): + return True + elif (self.downSearchStatisticalLineLength(point, external_corner_value, inner_corner_value, valid_length)): + return True + elif (self.leftSearchStatisticalLineLength(point, external_corner_value, inner_corner_value, valid_length)): + return True + elif (self.rightSearchStatisticalLineLength(point, external_corner_value, inner_corner_value, valid_length)): + return True + + return False + + def upSearchStatisticalLineLength(self, point, external_corner_value, inner_corner_value, valid_length): + if (point.x + 1 < self.tRect["y"] + self.tRect["width"] and self.map[(point.x + 1) * self.size_x + point.y] == 127): + idy = point.x + 1 + idx = point.y + line = [] + line.append(Point(idy, idx)) + + for j in range(idy, self.tRect["y"] + self.tRect["width"]): + if (self.map[j * self.size_x + idx] == 127): + black_count = 0 + left_and_right_neighbourhood = [[0, -1], [0, 1]] + + for k in range(2): + tmp_idy = j + left_and_right_neighbourhood[k][0] + tmp_idx = idx + left_and_right_neighbourhood[k][1] + + if (tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"] or tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value or self.map[tmp_idy * self.size_x + tmp_idx] == external_corner_value): + black_count += 1 + + if (black_count == 1): + line.append(Point(j, idx)) + else: + break + else: + break + + if (len(line) > valid_length): + return True + else: + return False + + return False + + def downSearchStatisticalLineLength(self, point, external_corner_value, inner_corner_value, valid_length): + if (point.x - 1 > self.tRect["y"] and self.map[(point.x - 1) * self.size_x + point.y] == 127): + idy = point.x - 1 + idx = point.y + line = [] + line.append(Point(idy, idx)) + + for j in range(idy, self.tRect["y"], -1): + if (self.map[j * self.size_x + idx] == 127): + black_count = 0 + left_and_right_neighbourhood = [[0, -1], [0, 1]] + + for k in range(2): + tmp_idy = j + left_and_right_neighbourhood[k][0] + tmp_idx = idx + left_and_right_neighbourhood[k][1] + + if (tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"] or tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value or self.map[tmp_idy * self.size_x + tmp_idx] == external_corner_value): + black_count += 1 + + if (black_count == 1): + line.append(Point(j, idx)) + else: + break + else: + break + + if (len(line) > valid_length): + return True + else: + return False + return False + + def leftSearchStatisticalLineLength(self, point, external_corner_value, inner_corner_value, valid_length): + if (point.y + 1 < self.tRect["x"] + self.tRect["height"] and self.map[point.x * self.size_x + point.y + 1] == 127): + idy = point.x + idx = point.y + 1 + line = [] + line.append(Point(idy, idx)) + + for j in range(idx, self.tRect["x"] + self.tRect["height"]): + if (self.map[idy * self.size_x + j] == 127): + black_count = 0 + up_and_down_neighbourhood = [[-1, 0], [1, 0]] + + for k in range(2): + tmp_idy = idy + up_and_down_neighbourhood[k][0] + tmp_idx = j + up_and_down_neighbourhood[k][1] + + if (tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"] or tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value or self.map[tmp_idy * self.size_x + tmp_idx] == external_corner_value): + black_count += 1 + + if (black_count == 1): + line.append(Point(idy, j)) + else: + break + else: + break + + if (len(line) > valid_length): + return True + else: + return False + return False + + def rightSearchStatisticalLineLength(self, point, external_corner_value, inner_corner_value, valid_length): + if (point.y - 1 > self.tRect["x"] and self.map[point.x * self.size_x + point.y - 1] == 127): + idy = point.x + idx = point.y - 1 + line = [] + line.append(Point(idy, idx)) + + for j in range(idx, self.tRect["x"], -1): + if (self.map[idy * self.size_x + j] == 127): + black_count = 0 + up_and_down_neighbourhood = [[-1, 0], [1, 0]] + + for k in range(2): + tmp_idy = idy + up_and_down_neighbourhood[k][0] + tmp_idx = j + up_and_down_neighbourhood[k][1] + + if (tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"] or tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value or self.map[tmp_idy * self.size_x + tmp_idx] == external_corner_value): + black_count += 1 + + if (black_count == 1): + line.append(Point(idy, j)) + else: + break + else: + break + + if (len(line) > valid_length): + return True + else: + return False + + return False + + def fourNeighbourhoodSearchForExtractCorners(self, point, fill_edges, delete_point, external_corner_value, inner_corner_value, valid_length, is_valid_length) -> tuple[list[Point], list[Point]]: + delete_point, fill_edges = self.upSearchForExtractCorners(point, fill_edges, delete_point, + external_corner_value, inner_corner_value, valid_length, is_valid_length) + delete_point, fill_edges = self.downSearchForExtractCorners( + point, fill_edges, delete_point, external_corner_value, inner_corner_value, valid_length, is_valid_length) + delete_point, fill_edges = self.leftSearchForExtractCorners( + point, fill_edges, delete_point, external_corner_value, inner_corner_value, valid_length, is_valid_length) + delete_point, fill_edges = self.rightSearchForExtractCorners( + point, fill_edges, delete_point, external_corner_value, inner_corner_value, valid_length, is_valid_length) + return delete_point, fill_edges + + def upSearchForExtractCorners(self, point, fill_edges, delete_point, external_corner_value, inner_corner_value, valid_length, is_valid_length) -> tuple[list[Point], list[Point]]: + if (point.x + 1 < self.tRect["y"] + self.tRect["width"] and self.map[(point.x + 1) * self.size_x + point.y] == 0): + idy = point.x + 1 + idx = point.y + line = [] + line.append(Point(idy, idx)) + + for j in range(idy, self.tRect["y"] + self.tRect["width"]): + if (self.map[j * self.size_x + idx] == 0): + black_count = 0 + left_and_right_neighbourhood = [[0, -1], [0, 1]] + + for k in range(2): + tmp_idy = j + left_and_right_neighbourhood[k][0] + tmp_idx = idx + left_and_right_neighbourhood[k][1] + + if (tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"] or tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == external_corner_value or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value): + black_count += 1 + + if (black_count == 1): + line.append(Point(j, idx)) + else: + break + else: + break + + if (is_valid_length and len(line) > 1): + line.append(point) + fill_edges.append(line) + + for i in range(len(line)): + _left_and_right_neighbourhood = [[0, -1], [0, 1]] + + for k in range(2): + tmp_idy = line[i].x + \ + _left_and_right_neighbourhood[k][0] + tmp_idx = line[i].y + \ + _left_and_right_neighbourhood[k][1] + + if (tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"] or tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value): + self.map[tmp_idy * self.size_x + tmp_idx] = 127 + delete_point.append(Point(tmp_idy, tmp_idx)) + elif (len(line) > valid_length): + line.append(point) + fill_edges.append(line) + + for i in range(len(line)): + _left_and_right_neighbourhood2 = [[0, -1], [0, 1]] + + for k in range(2): + tmp_idy = line[i].x + \ + _left_and_right_neighbourhood2[k][0] + tmp_idx = line[i].y + \ + _left_and_right_neighbourhood2[k][1] + + if (tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"] or tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value): + self.map[tmp_idy * self.size_x + tmp_idx] = 127 + delete_point.append(Point(tmp_idy, tmp_idx)) + else: + line = [] + + return delete_point, fill_edges + + def downSearchForExtractCorners(self, point, fill_edges, delete_point, external_corner_value, inner_corner_value, valid_length, is_valid_length) -> tuple[list[Point], list[Point]]: + if (point.x - 1 > self.tRect["y"] and self.map[(point.x - 1) * self.size_x + point.y] == 0): + idy = point.x - 1 + idx = point.y + line = [] + line.append(Point(idy, idx)) + + for j in range(idy, self.tRect["y"], -1): + if (self.map[j * self.size_x + idx] == 0): + black_count = 0 + left_and_right_neighbourhood = [[0, -1], [0, 1]] + + for k in range(2): + tmp_idy = j + left_and_right_neighbourhood[k][0] + tmp_idx = idx + left_and_right_neighbourhood[k][1] + + if (tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"] or tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == external_corner_value or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value): + black_count += 1 + + if (black_count == 1): + line.append(Point(j, idx)) + else: + break + else: + break + + if (is_valid_length and len(line) > 1): + line.append(point) + fill_edges.append(line) + + for i in range(len(line)): + _left_and_right_neighbourhood3 = [[0, -1], [0, 1]] + + for k in range(2): + tmp_idy = line[i].x + \ + _left_and_right_neighbourhood3[k][0] + tmp_idx = line[i].y + \ + _left_and_right_neighbourhood3[k][1] + + if (tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"] or tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value): + self.map[tmp_idy * self.size_x + tmp_idx] = 127 + delete_point.append(Point(tmp_idy, tmp_idx)) + elif (len(line) > valid_length): + line.append(point) + fill_edges.append(line) + + for i in range(len(line)): + _left_and_right_neighbourhood4 = [[0, -1], [0, 1]] + + for k in range(2): + tmp_idy = line[i].x + \ + _left_and_right_neighbourhood4[k][0] + tmp_idx = line[i].y + \ + _left_and_right_neighbourhood4[k][1] + + if (tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"] or tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value): + self.map[tmp_idy * self.size_x + tmp_idx] = 127 + delete_point.append(Point(tmp_idy, tmp_idx)) + else: + line = [] + + return delete_point, fill_edges + + def leftSearchForExtractCorners(self, point, fill_edges, delete_point, external_corner_value, inner_corner_value, valid_length, is_valid_length) -> tuple[list[Point], list[Point]]: + if (point.y + 1 < self.tRect["x"] + self.tRect["height"] and self.map[point.x * self.size_x + point.y + 1] == 0): + idy = point.x + idx = point.y + 1 + line = [] + line.append(Point(idy, idx)) + + for j in range(idx, self.tRect["x"] + self.tRect["height"]): + if (self.map[idy * self.size_x + j] == 0): + black_count = 0 + up_and_down_neighbourhood = [[-1, 0], [1, 0]] + + for k in range(2): + tmp_idy = idy + up_and_down_neighbourhood[k][0] + tmp_idx = j + up_and_down_neighbourhood[k][1] + + if (tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"] or tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == external_corner_value or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value): + black_count += 1 + + if (black_count == 1): + line.append(Point(idy, j)) + else: + break + else: + break + + if (is_valid_length and len(line) > 1): + line.append(point) + fill_edges.append(line) + + for i in range(len(line)): + _up_and_down_neighbourhood = [[-1, 0], [1, 0]] + + for k in range(2): + tmp_idy = line[i].x + _up_and_down_neighbourhood[k][0] + tmp_idx = line[i].y + _up_and_down_neighbourhood[k][1] + + if (tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"] or tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value): + self.map[tmp_idy * self.size_x + tmp_idx] = 127 + delete_point.append(Point(tmp_idy, tmp_idx)) + elif (len(line) > valid_length): + line.append(point) + fill_edges.append(line) + + for i in range(len(line)): + _up_and_down_neighbourhood2 = [[-1, 0], [1, 0]] + + for k in range(2): + tmp_idy = line[i].x + _up_and_down_neighbourhood2[k][0] + tmp_idx = line[i].y + _up_and_down_neighbourhood2[k][1] + + if (tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"] or tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value): + self.map[tmp_idy * self.size_x + tmp_idx] = 127 + delete_point.append(Point(tmp_idy, tmp_idx)) + else: + line = [] + + return delete_point, fill_edges + + def rightSearchForExtractCorners(self, point, fill_edges, delete_point: list[Point], external_corner_value, inner_corner_value, valid_length, is_valid_length) -> tuple[list[Point], list[Point]]: + if (point.y - 1 > self.tRect["x"] and self.map[point.x * self.size_x + point.y - 1] == 0): + idy = point.x + idx = point.y - 1 + line = [] + line.append(Point(idy, idx)) + + for j in range(idx, self.tRect["x"], -1): + if (self.map[idy * self.size_x + j] == 0): + black_count = 0 + up_and_down_neighbourhood = [[-1, 0], [1, 0]] + + for k in range(2): + tmp_idy = idy + up_and_down_neighbourhood[k][0] + tmp_idx = j + up_and_down_neighbourhood[k][1] + + if (tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"] or tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == external_corner_value or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value): + black_count += 1 + + if (black_count == 1): + line.append(Point(idy, j)) + else: + break + else: + break + + if (is_valid_length and len(line) > 1): + line.append(point) + fill_edges.append(line) + + for i in range(len(line)): + _up_and_down_neighbourhood3 = [[-1, 0], [1, 0]] + + for k in range(2): + tmp_idy = line[i].x + _up_and_down_neighbourhood3[k][0] + tmp_idx = line[i].y + _up_and_down_neighbourhood3[k][1] + + if (tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"] or tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value): + self.map[tmp_idy * self.size_x + tmp_idx] = 127 + delete_point.append(Point(tmp_idy, tmp_idx)) + elif (len(line) > valid_length): + line.append(point) + fill_edges.append(line) + + for i in range(len(line)): + _up_and_down_neighbourhood4 = [[-1, 0], [1, 0]] + + for k in range(2): + tmp_idy = line[i].x + _up_and_down_neighbourhood4[k][0] + tmp_idx = line[i].y + _up_and_down_neighbourhood4[k][1] + + if (tmp_idx < self.tRect["x"] or tmp_idx >= self.tRect["x"] + self.tRect["height"] or tmp_idy < self.tRect["y"] or tmp_idy >= self.tRect["y"] + self.tRect["width"]): + continue + + if (self.map[tmp_idy * self.size_x + tmp_idx] == -128 or self.map[tmp_idy * self.size_x + tmp_idx] == inner_corner_value): + self.map[tmp_idy * self.size_x + tmp_idx] = 127 + delete_point.append(Point(tmp_idy, tmp_idx)) + else: + line = [] + + return delete_point, fill_edges + + def fillEdges(self, contour: list[Point], fill_edges: list[Point], value): + for i in range(len(fill_edges)): + edge = fill_edges[i] + + for j in range(len(edge)): + self.map[edge[j].x * self.size_x + edge[j].y] = value + contour.append(edge[j]) + return contour + + def fillBlackComponent(self, black_region: list[Point], value): + for i in range(len(black_region)): + self.map[black_region[i].x * + self.size_x + black_region[i].y] = value + + def fillNonBoundaryNoise2(self, nonBoundaryNoise: list[Point]): + four_neighbourhood = [[5, 0, 4, 0, 3, 0, 2, 0, 1, 0], [0, 5, 0, 4, 0, 3, 0, 2, 0, 1], + [-5, 0, -4, 0, -3, 0, -2, 0, -1, 0], [0, -5, 0, -4, 0, -3, 0, -2, 0, -1]] + + for i in range(len(nonBoundaryNoise)): + p = nonBoundaryNoise[i] + self.map[p.x * self.size_x + p.y] = 28 + + for neighbourhood in four_neighbourhood: + tmp_p5 = Point(p.x + neighbourhood[1], p.y + neighbourhood[0]) + tmp_p4 = Point(p.x + neighbourhood[3], p.y + neighbourhood[2]) + tmp_p3 = Point(p.x + neighbourhood[5], p.y + neighbourhood[4]) + tmp_p2 = Point(p.x + neighbourhood[7], p.y + neighbourhood[6]) + tmp_p1 = Point(p.x + neighbourhood[9], p.y + neighbourhood[8]) + + if (tmp_p5.x < self.tRect["y"] or tmp_p5.x >= self.tRect["y"] + self.tRect["width"] or tmp_p5.y < self.tRect["x"] or tmp_p5.y >= self.tRect["x"] + self.tRect["height"] or tmp_p4.x < self.tRect["y"] or tmp_p4.x >= self.tRect["y"] + self.tRect["width"] or tmp_p4.y < self.tRect["x"] or tmp_p4.y >= self.tRect["x"] + self.tRect["height"] or tmp_p3.x < self.tRect["y"] or tmp_p3.x >= self.tRect["y"] + self.tRect["width"] or tmp_p3.y < self.tRect["x"] or tmp_p3.y >= self.tRect["x"] + self.tRect["height"] or tmp_p2.x < self.tRect["y"] or tmp_p2.x >= self.tRect["y"] + self.tRect["width"] or tmp_p2.y < self.tRect["x"] or tmp_p2.y >= self.tRect["x"] + self.tRect["height"] or tmp_p3.x < self.tRect["y"] or tmp_p3.x >= self.tRect["y"] + self.tRect["width"] or tmp_p1.y < self.tRect["x"] or tmp_p1.y >= self.tRect["x"] + self.tRect["height"]): + continue + if self.map[tmp_p3.x * self.size_x + tmp_p1.y] == 127: + if self.map[tmp_p5.x * self.size_x + tmp_p5.y] == -128 and self.map[tmp_p4.x * self.size_x + tmp_p4.y] == 127 and self.map[tmp_p3.x * self.size_x + tmp_p3.y] == 127 and self.map[tmp_p2.x * self.size_x + tmp_p2.y] == 127: + nonBoundaryNoise.append(tmp_p4) + nonBoundaryNoise.append(tmp_p3) + nonBoundaryNoise.append(tmp_p2) + nonBoundaryNoise.append(tmp_p1) + break + elif self.map[tmp_p4.x * self.size_x + tmp_p4.y] == -128 and self.map[tmp_p3.x * self.size_x + tmp_p3.y] == 127 and self.map[tmp_p2.x * self.size_x + tmp_p2.y] == 127: + nonBoundaryNoise.append(tmp_p3) + nonBoundaryNoise.append(tmp_p2) + nonBoundaryNoise.append(tmp_p1) + break + elif self.map[tmp_p3.x * self.size_x + tmp_p3.y] == -128 and self.map[tmp_p2.x * self.size_x + tmp_p2.y] == 127: + nonBoundaryNoise.append(tmp_p2) + nonBoundaryNoise.append(tmp_p1) + break + elif self.map[tmp_p2.x * self.size_x + tmp_p2.y] == -128: + nonBoundaryNoise.append(tmp_p1) + break + + for p in nonBoundaryNoise: + self.map[p.x * self.size_x + p.y] = -128 + + return nonBoundaryNoise + + def roomColorByChain(self, roomChain: collections.abc.Iterable[RobotMap.DeviceRoomChainDataInfo]): + for offset in range(len(self.map)): + match self.map[offset]: + case -128: self.map[offset] = -1 + case 127: self.map[offset] = 1 + + for room in roomChain: + self.floodFillSingleChain(room.points, room.roomId) + + def floodFillSingleChain(self, room_points, roomId): + dst = [roomId] * len(self.map) + + for p in room_points: + dst[p.y * self.size_x + p.x] = 0 + + dst = self.scanLineFloodFill(dst, Point(1, 1), roomId, 0) + + for p in room_points: + dst[p.y * self.size_x + p.x] = roomId + + for offset in range(len(self.map)): + if (dst[offset] == roomId and self.map[offset] not in [-1, 0, -9]): + self.map[offset] = dst[offset] + + if (len(room_points) > 3): + for p in room_points[1:-1]: + for di in range(-2, 3): + for dj in range(-2, 3): + offset = (p.y + di) * self.size_x + p.x + dj + if (p.y + di >= 0 and p.y + di < self.size_y and p.x + dj >= 0 and p.x + dj < self.size_x) and self.map[offset] == 1: + self.map[offset] = roomId + + def scanLineFloodFill(self, dst: list[int], initial_seed: Point, raw_value, new_value): + scan_line_seed = [initial_seed] + tempDst = None + + while (len(scan_line_seed) > 0): + seed = scan_line_seed[0] + scan_line_seed.pop(0) + tempDst, x_left = self.floodFillLine( + dst, seed, -1, raw_value, new_value) + + tempDst, x_right = self.floodFillLine( + tempDst, seed, 1, raw_value, new_value) + + scan_line_seed = self.searchLineForNewSeed( + tempDst, x_left, x_right, seed.y - 1, raw_value, scan_line_seed) + scan_line_seed = self.searchLineForNewSeed( + tempDst, x_left, x_right, seed.y + 1, raw_value, scan_line_seed) + + return tempDst + + def floodFillLine(self, dst: list[int], initial_seed, direction, raw_value, new_value) -> tuple[list, int]: + row = initial_seed.y + col = initial_seed.x + boundary = col + + if (direction > 0): + col += direction + + while (col >= 0 and col < self.size_x): + if (dst[row * self.size_x + col] == raw_value): + boundary = col + dst[row * self.size_x + col] = new_value + col += direction + else: + break + + return dst, boundary + + def searchLineForNewSeed(self, dst: list[int], x_left, x_right, line_row, raw_value, scan_line_seed: list[Point]): + if (line_row < 0 or line_row > self.size_y - 1): + return scan_line_seed + + x_right_copy = x_right + is_find_seed = False + + while (x_right_copy >= x_left): + if (dst[line_row * self.size_x + x_right_copy] == raw_value): + if (not is_find_seed): + seed = Point(x_right_copy, line_row) + scan_line_seed.append(seed) + is_find_seed = True + else: + is_find_seed = False + + x_right_copy -= 1 + + return scan_line_seed + + def fillInternalObstacles(self) -> None: + if (self.tRect["width"] == 0 and self.tRect["height"] == 0): + self.findRoiMap() + + contour = self.extractExternalContoursNewStrategy([]) + + contour = self.findContourConnectComponent(contour) + + self.fillBlackComponent(contour, 30) + internal_obstacles = self.findInternalObstacles([]) + + self.fillBlackComponent(internal_obstacles, -9) + + def findContourConnectComponent(self, contour: list[Point]): + eight_neighbourhood = [[-1, 0], [1, 0], [0, -1], + [0, 1], [-1, 1], [1, 1], [1, -1], [-1, -1]] + + while (len(contour) != 0): + seed = contour.pop(0) + + for k in range(8): + currpoint = Point( + seed.x + eight_neighbourhood[k][0], seed.y + eight_neighbourhood[k][1]) + + if (self.map[currpoint.x * self.size_x + currpoint.y] != -128 or currpoint.x < self.tRect["y"] or currpoint.x >= self.tRect["y"] + self.tRect["width"] or currpoint.y < self.tRect["x"] or currpoint.y >= self.tRect["x"] + self.tRect["height"]): + continue + + self.map[currpoint.x * self.size_x + currpoint.y] = 30 + contour.append(currpoint) + + return contour + + def findInternalObstacles(self, point_deque: list[Point]): + for idy in range(self.tRect["y"], self.tRect["y"] + self.tRect["width"]): + for idx in range(self.tRect["x"], self.tRect["x"] + self.tRect["height"]): + if (self.map[idy * self.size_x + idx] == -128): + point_deque.append(Point(idy, idx)) + return point_deque diff --git a/src/vacuum_map_parser_ijai/ijai_coordinate_transforms.py b/src/vacuum_map_parser_ijai/ijai_coordinate_transforms.py new file mode 100644 index 0000000..815e236 --- /dev/null +++ b/src/vacuum_map_parser_ijai/ijai_coordinate_transforms.py @@ -0,0 +1,23 @@ +"""Module for transforming coordinates.""" +from vacuum_map_parser_base.map_data import Point + +import vacuum_map_parser_ijai.RobotMap_pb2 as RobotMap + + +class Transformer: # pylint: disable=E1101 + """Class for transforming coordinates.""" + + def __init__(self, robotmap: RobotMap.RobotMap): + self.map_head = robotmap.mapHead + self.to_image_multiplier = Point(self.map_head.sizeX/(self.map_head.maxX - self.map_head.minX), + self.map_head.sizeY/(self.map_head.maxY - self.map_head.minY)) + + def map_to_image(self, pt: Point) -> Point: + return Point((pt.x - self.map_head.minX) * self.to_image_multiplier.x, + (pt.y - self.map_head.minY) * self.to_image_multiplier.y) + + def image_to_map_x(self, x: int) -> float: + return x/self.to_image_multiplier.x + self.map_head.minX + + def image_to_map_y(self, y: int) -> float: + return y/self.to_image_multiplier.y + self.map_head.minY diff --git a/src/vacuum_map_parser_ijai/image_parser.py b/src/vacuum_map_parser_ijai/image_parser.py index d13eccf..0126fe4 100644 --- a/src/vacuum_map_parser_ijai/image_parser.py +++ b/src/vacuum_map_parser_ijai/image_parser.py @@ -5,19 +5,16 @@ from PIL import Image from PIL.Image import Image as ImageType from PIL.Image import Resampling - from vacuum_map_parser_base.config.color import ColorsPalette, SupportedColor from vacuum_map_parser_base.config.drawable import Drawable from vacuum_map_parser_base.config.image_config import ImageConfig from vacuum_map_parser_base.map_data import Point -from .parsing_buffer import ParsingBuffer - _LOGGER = logging.getLogger(__name__) class IjaiImageParser: - + """Ijai map image parser.""" MAP_OUTSIDE = 0x00 @@ -33,17 +30,18 @@ def __init__(self, palette: ColorsPalette, image_config: ImageConfig, drawables: self._palette = palette self._image_config = image_config self._drawables = drawables + self.color_map = { + IjaiImageParser.MAP_OUTSIDE: palette.get_color(SupportedColor.MAP_OUTSIDE), + IjaiImageParser.MAP_WALL: palette.get_color(SupportedColor.MAP_WALL_V2), + IjaiImageParser.MAP_SCAN: palette.get_color(SupportedColor.SCAN), + IjaiImageParser.MAP_NEW_DISCOVERED_AREA: palette.get_color(SupportedColor.NEW_DISCOVERED_AREA)} def parse( - self, buf: ParsingBuffer, width: int, height: int - ) -> tuple[ImageType | None, dict[int, tuple[int, int, int, int]], set[int], ImageType | None]: - color_map = {\ - IjaiImageParser.MAP_OUTSIDE: self._palette.get_color(SupportedColor.MAP_OUTSIDE),\ - IjaiImageParser.MAP_WALL:self._palette.get_color(SupportedColor.MAP_WALL_V2),\ - IjaiImageParser.MAP_SCAN: self._palette.get_color(SupportedColor.SCAN),\ - IjaiImageParser.MAP_NEW_DISCOVERED_AREA: self._palette.get_color(SupportedColor.NEW_DISCOVERED_AREA)} - rooms = {} + self, map_data: bytes, width: int, height: int + ) -> tuple[ImageType | None, dict[int, tuple[int, int, int, int]], set[int]]: + rooms: dict[int, tuple[int, int, int, int]] = {} cleaned_areas = set() + _LOGGER.debug("ijai parser: image_config = %s", self._image_config) scale = self._image_config.scale trim_left = int(self._image_config.trim.left * width / 100) trim_right = int(self._image_config.trim.right * width / 100) @@ -52,62 +50,54 @@ def parse( trimmed_height = height - trim_top - trim_bottom trimmed_width = width - trim_left - trim_right if trimmed_width == 0 or trimmed_height == 0: - return None, {}, set(), None + return None, {}, set() + image = Image.new('RGBA', (trimmed_width, trimmed_height)) pixels = image.load() - cleaned_areas_layer = None - cleaned_areas_pixels = None - draw_cleaned_area = Drawable.CLEANED_AREA in self._drawables - if draw_cleaned_area: - cleaned_areas_layer = Image.new("RGBA", (trimmed_width, trimmed_height)) - cleaned_areas_pixels = cleaned_areas_layer.load() - _LOGGER.debug(f"trim_bottom = {trim_bottom}, trim_top = {trim_top}, trim_left = {trim_left}, trim_right = {trim_right}") - buf.skip('trim_bottom', trim_bottom * width) + _LOGGER.debug("trim_bottom = %s, trim_top = %s, trim_left = %s, trim_right = %s", + trim_bottom, trim_top, trim_left, trim_right) unknown_pixels = set() for img_y in range(trimmed_height): - buf.skip('trim_left', trim_left) + y = trimmed_height - 1 - img_y for img_x in range(trimmed_width): - pixel_type = buf.get_uint8('pixel') x = img_x - y = trimmed_height - 1 - img_y - if pixel_type in color_map.keys(): - pixels[x, y] = color_map[pixel_type] + pixel_type = map_data[(img_y + trim_bottom) + * width + x + trim_left] + if pixel_type in self.color_map: + pixels[x, y] = self.color_map[pixel_type] elif IjaiImageParser.MAP_ROOM_MIN <= pixel_type <= IjaiImageParser.MAP_SELECTED_ROOM_MAX: room_x = img_x + trim_left room_y = img_y + trim_bottom - if pixel_type < IjaiImageParser.MAP_SELECTED_ROOM_MIN: - room_number = pixel_type - else: - room_number = pixel_type - IjaiImageParser.MAP_SELECTED_ROOM_MIN + IjaiImageParser.MAP_ROOM_MIN + room_number = pixel_type + if pixel_type >= IjaiImageParser.MAP_SELECTED_ROOM_MIN: + room_number = pixel_type - IjaiImageParser.MAP_SELECTED_ROOM_MIN + \ + IjaiImageParser.MAP_ROOM_MIN cleaned_areas.add(room_number) - if draw_cleaned_area: - cleaned_areas_pixels[x, y] = IjaiImageParser.get_color(SupportedColor.CLEANED_AREA) - if room_number not in rooms: - rooms[room_number] = (room_x, room_y, room_x, room_y) - else: - rooms[room_number] = (min(rooms[room_number][0], room_x), - min(rooms[room_number][1], room_y), - max(rooms[room_number][2], room_x), - max(rooms[room_number][3], room_y)) + rooms[room_number] = (room_x, room_y, room_x, room_y) \ + if room_number not in rooms \ + else (min(rooms[room_number][0], room_x), + min(rooms[room_number][1], room_y), + max(rooms[room_number][2], room_x), + max(rooms[room_number][3], room_y)) pixels[x, y] = self._palette.get_room_color(room_number) else: - pixels[x, y] = IjaiImageParser.get_color(SupportedColor.UNKNOWN) + pixels[x, y] = self._palette.get_color( + SupportedColor.UNKNOWN) unknown_pixels.add(pixel_type) - _LOGGER.debug(f"unknown pixel [{x},{y}] = {pixel_type}") - buf.skip('trim_right', trim_right) - buf.skip('trim_top', trim_top * width) + _LOGGER.debug( + "unknown pixel [%s,%s] = %s", x, y, pixel_type) if self._image_config.scale != 1 and trimmed_width != 0 and trimmed_height != 0: - image = image.resize((int(trimmed_width * scale), int(trimmed_height * scale)), resample=Resampling.NEAREST) - if draw_cleaned_area: - cleaned_areas_layer = cleaned_areas_layer.resize( - (int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST) + image = image.resize( + (int(trimmed_width * scale), int(trimmed_height * scale)), resample=Resampling.NEAREST) if len(unknown_pixels) > 0: _LOGGER.warning('unknown pixel_types: %s', unknown_pixels) - return image, rooms, cleaned_areas, cleaned_areas_layer + return image, rooms, cleaned_areas @staticmethod - def get_current_vacuum_room(buf: ParsingBuffer, vacuum_position_on_image: Point) -> int | None: - pixel_type = buf.get_at_image(int(vacuum_position_on_image.y) * 800 + int(vacuum_position_on_image.x)) + def get_current_vacuum_room(map_data: bytes, vacuum_position_on_image: Point, image_width: int) -> int | None: + _LOGGER.debug("pos on image: %s", vacuum_position_on_image) + pixel_type = map_data[int(vacuum_position_on_image.y) + * image_width + int(vacuum_position_on_image.x)] if IjaiImageParser.MAP_ROOM_MIN <= pixel_type <= IjaiImageParser.MAP_ROOM_MAX: return pixel_type if IjaiImageParser.MAP_SELECTED_ROOM_MIN <= pixel_type <= IjaiImageParser.MAP_SELECTED_ROOM_MAX: diff --git a/src/vacuum_map_parser_ijai/map_data_parser.py b/src/vacuum_map_parser_ijai/map_data_parser.py index 3260e07..a9690fe 100644 --- a/src/vacuum_map_parser_ijai/map_data_parser.py +++ b/src/vacuum_map_parser_ijai/map_data_parser.py @@ -13,27 +13,26 @@ from vacuum_map_parser_base.map_data import Area, ImageData, MapData, Path, Point, Room, Wall, Zone from vacuum_map_parser_base.map_data_parser import MapDataParser -from .image_parser import IjaiImageParser -from .parsing_buffer import ParsingBuffer +import vacuum_map_parser_ijai.beautify_min as Beautify +import vacuum_map_parser_ijai.RobotMap_pb2 as RobotMap + from .aes_decryptor import decrypt +from .ijai_coordinate_transforms import Transformer +from .image_parser import IjaiImageParser _LOGGER = logging.getLogger(__name__) class IjaiMapDataParser(MapDataParser): """Ijai map parser.""" - - FEATURE_ROBOT_STATUS = 0x00000001 - FEATURE_IMAGE = 0x00000002 - FEATURE_HISTORY = 0x00000004 - FEATURE_CHARGE_STATION = 0x00000008 - FEATURE_RESTRICTED_AREAS = 0x00000010 - FEATURE_CLEANING_AREAS = 0x00000020 - FEATURE_NAVIGATE = 0x00000040 - FEATURE_REALTIME = 0x00000080 - FEATURE_ROOMS = 0x00001000 POSITION_UNKNOWN = 1100 + VIRTUALWALL_TYPE_WALL = 2 + VIRTUALWALL_TYPE_NO_MOP = 6 + VIRTUALWALL_TYPE_NO_GO = 3 + + # pylint: disable=E1101 + robot_map = RobotMap.RobotMap() def __init__( self, @@ -41,136 +40,115 @@ def __init__( sizes: Sizes, drawables: list[Drawable], image_config: ImageConfig, - texts: list[Text], + texts: list[Text] ): super().__init__(palette, sizes, drawables, image_config, texts) self._image_parser = IjaiImageParser(palette, image_config, drawables) def unpack_map(self, raw_encoded: bytes, *args: Any, **kwargs: Any) -> bytes: - return zlib.decompress( decrypt( - raw_encoded, - kwargs['wifi_sn'], - kwargs['owner_id'], - kwargs['device_id'], - kwargs['model'], + raw_encoded, + kwargs['wifi_sn'], + kwargs['owner_id'], + kwargs['device_id'], + kwargs['model'], kwargs['device_mac'])) def parse(self, raw: bytes, *args: Any, **kwargs: Any) -> MapData: map_data = MapData(0, 1) - buf = ParsingBuffer('header', raw, 0, len(raw)) - - buf.get_uint8('id1') - buf.get_uint16('magic1') - offset1 = buf.get_uint8('offset1') - 1 - - buf.skip('unknown_hdr1', offset1) - _LOGGER.debug(f"Skipping {offset1} bytes, value: {buf.data[buf.offs]:#x}") - - feature_flags = IjaiMapDataParser.FEATURE_IMAGE - map_id = buf.peek_uint32('map_id') - - - if feature_flags & IjaiMapDataParser.FEATURE_ROBOT_STATUS != 0: - IjaiMapDataParser.parse_section(buf, 'robot_status', map_id) - buf.skip('unknown1', 0x28) - - if feature_flags & IjaiMapDataParser.FEATURE_IMAGE != 0: - buf.set_name('image') - IjaiMapDataParser._parse_section(buf, "image", map_id) - map_data.image, map_data.rooms, map_data.cleaned_rooms = self._parse_image(buf) - - if feature_flags & IjaiMapDataParser.FEATURE_HISTORY != 0: - IjaiMapDataParser._parse_section(buf, "history", map_id) - map_data.path = IjaiMapDataParser._parse_history(buf) - - if feature_flags & IjaiMapDataParser.FEATURE_CHARGE_STATION != 0: - IjaiMapDataParser._parse_section(buf, "charge_station", map_id) - map_data.charger = IjaiMapDataParser._parse_position(buf, "pos", with_angle=True) + + self.robot_map.ParseFromString(raw) + # pylint: disable=W0201 + self.coord_transformer = Transformer(self.robot_map) + + if hasattr(self.robot_map, "mapData"): + map_data.image, map_data.rooms, map_data.cleaned_rooms = self._parse_image() + + if hasattr(self.robot_map, "historyPose"): + map_data.path = IjaiMapDataParser._parse_history() + + if hasattr(self.robot_map, "chargeStation"): + pos_info = self.robot_map.chargeStation + map_data.charger = Point( + x=pos_info.x, y=pos_info.y, a=pos_info.phi * 180 / math.pi) _LOGGER.debug("pos: %s", map_data.charger) - if feature_flags & IjaiMapDataParser.FEATURE_RESTRICTED_AREAS != 0: - IjaiMapDataParser._parse_section(buf, "restricted_areas", map_id) - map_data.walls, map_data.no_go_areas = IjaiMapDataParser._parse_restricted_areas(buf) - - if feature_flags & IjaiMapDataParser.FEATURE_CLEANING_AREAS != 0: - IjaiMapDataParser._parse_section(buf, "cleaning_areas", map_id) - map_data.zones = IjaiMapDataParser._parse_cleaning_areas(buf) - - if feature_flags & IjaiMapDataParser.FEATURE_NAVIGATE != 0: - IjaiMapDataParser._parse_section(buf, "navigate", map_id) - buf.skip("unknown1", 4) - map_data.goto = IjaiMapDataParser._parse_position(buf, "pos") - value = buf.get_float32("value") - _LOGGER.debug("pos: %s, value: %f", map_data.goto, value) - - if feature_flags & IjaiMapDataParser.FEATURE_REALTIME != 0: - IjaiMapDataParser._parse_section(buf, "realtime", map_id) - buf.skip("unknown1", 5) - map_data.vacuum_position = IjaiMapDataParser._parse_position(buf, "pos", with_angle=True) + if hasattr(self.robot_map, "currentPose"): + pos_info = self.robot_map.currentPose + map_data.vacuum_position = Point( + x=pos_info.x, y=pos_info.y, a=pos_info.phi * 180 / math.pi) _LOGGER.debug("pos: %s", map_data.vacuum_position) - if feature_flags & 0x00000800 != 0: - IjaiMapDataParser._parse_section(buf, "unknown1", map_id) - IjaiMapDataParser._parse_unknown_section(buf) - - if feature_flags & IjaiMapDataParser.FEATURE_ROOMS != 0 and map_data.rooms is not None: - IjaiMapDataParser._parse_section(buf, "rooms", map_id) - IjaiMapDataParser._parse_rooms(buf, map_data.rooms) + if ( + hasattr(self.robot_map, "mapInfo") + and hasattr(self.robot_map, "roomDataInfo") + and map_data.rooms is not None): + IjaiMapDataParser._parse_rooms(map_data.rooms) - if feature_flags & 0x00002000 != 0: - IjaiMapDataParser._parse_section(buf, "unknown2", map_id) - IjaiMapDataParser._parse_unknown_section(buf) + if hasattr(self.robot_map, "virtualWalls"): + (map_data.walls, + map_data.no_go_areas, + map_data.no_mopping_areas) = IjaiMapDataParser._parse_restricted_areas() - if feature_flags & 0x00004000 != 0: - IjaiMapDataParser._parse_section(buf, "room_outlines", map_id) - IjaiMapDataParser._parse_room_outlines(buf) + if hasattr(self.robot_map, "areasInfo"): + map_data.zones = IjaiMapDataParser._parse_cleaning_zones() - buf.check_empty() + if hasattr(self.robot_map, "navigationPoints"): + map_data.goto = IjaiMapDataParser._parse_goto_point() if map_data.rooms is not None: - _LOGGER.debug("rooms: %s", [str(room) for number, room in map_data.rooms.items()]) - if map_data.image is not None and not map_data.image.is_empty: - self._image_generator.draw_map(map_data) + _LOGGER.debug("rooms: %s", [str(room) + for number, room in map_data.rooms.items()]) if map_data.rooms is not None and len(map_data.rooms) > 0 and map_data.vacuum_position is not None: - vacuum_position_on_image = IjaiMapDataParser._map_to_image(map_data.vacuum_position) - map_data.vacuum_room = IjaiImageParser.get_current_vacuum_room(buf, vacuum_position_on_image) + vacuum_position_on_image = self.coord_transformer.map_to_image( + map_data.vacuum_position) + map_data.vacuum_room = IjaiImageParser.get_current_vacuum_room( + self.robot_map.mapData.mapData, vacuum_position_on_image, IjaiMapDataParser.robot_map.mapHead.sizeX) if map_data.vacuum_room is not None: map_data.vacuum_room_name = map_data.rooms[map_data.vacuum_room].name _LOGGER.debug("current vacuum room: %s", map_data.vacuum_room) - return map_data - @staticmethod - def _map_to_image(p: Point) -> Point: - return Point(p.x * 20 + 400, p.y * 20 + 400) + if map_data.image is not None and not map_data.image.is_empty: + self._image_generator.draw_map(map_data) - @staticmethod - def _image_to_map(x: float) -> float: - return (x - 400) / 20 + return map_data - def _parse_image(self, buf: ParsingBuffer) -> tuple[ImageData, dict[int, Room], set[int]]: - buf.skip('unknown1', 0x6) - image_top = 0 + def _parse_image(self) -> tuple[ImageData, dict[int, Room], set[int]]: image_left = 0 - image_width = buf.get_uint16_remove_parity('image_width') - buf.skip('unknown2', 1) - image_height = buf.get_uint16_remove_parity('image_height') - buf.skip('unknown3', 0x21) + image_top = 0 + image_width = self.robot_map.mapHead.sizeX + image_height = self.robot_map.mapHead.sizeY image_size = image_height * image_width _LOGGER.debug("width: %d, height: %d", image_width, image_height) - buf.mark_as_image_beginning() - image, rooms_raw, cleaned_areas, cleaned_areas_layer = self._image_parser.parse(buf, image_width, image_height) + + # Non painted map tranformation + if ( + len(set(self.robot_map.mapData.mapData).symmetric_difference( + [0, 128, 127])) == 0 + and len(self.robot_map.roomChain) > 0 + and self.robot_map.mapType == 0): + buautify_obj = Beautify.BeautifyMap(self.robot_map.mapHead) + buautify_obj.setMap(self.robot_map.mapData) + buautify_obj.transform() + buautify_obj.roomColorByChain(self.robot_map.roomChain) + buautify_obj.fillInternalObstacles() + buautify_obj.normalizeMap() + self.robot_map.mapData.mapData = bytes(buautify_obj.getMap()) + + image, rooms_raw, cleaned_areas = self._image_parser.parse( + self.robot_map.mapData.mapData, image_width, image_height) if image is None: image = self._image_generator.create_empty_map_image() - _LOGGER.debug("img: number of rooms: %d, numbers: %s", len(rooms_raw), rooms_raw.keys()) + _LOGGER.debug("img: number of rooms: %d, numbers: %s", + len(rooms_raw), rooms_raw.keys()) rooms = {} for number, room in rooms_raw.items(): rooms[number] = Room( - IjaiMapDataParser._image_to_map(room[0] + image_left), - IjaiMapDataParser._image_to_map(room[1] + image_top), - IjaiMapDataParser._image_to_map(room[2] + image_left), - IjaiMapDataParser._image_to_map(room[3] + image_top), + self.coord_transformer.image_to_map_x(room[0] + image_left), + self.coord_transformer.image_to_map_y(room[1] + image_top), + self.coord_transformer.image_to_map_x(room[2] + image_left), + self.coord_transformer.image_to_map_y(room[3] + image_top), number, ) return ( @@ -182,121 +160,77 @@ def _parse_image(self, buf: ParsingBuffer) -> tuple[ImageData, dict[int, Room], image_width, self._image_config, image, - IjaiMapDataParser._map_to_image, - additional_layers={Drawable.CLEANED_AREA: cleaned_areas_layer}, + self.coord_transformer.map_to_image, ), rooms, cleaned_areas, ) @staticmethod - def _parse_history(buf: ParsingBuffer) -> Path: + def _parse_history() -> Path: path_points = [] - buf.skip("unknown1", 4) - history_count = buf.get_uint32("history_count") - for _ in range(history_count): - buf.get_uint8("mode") # 0: taxi, 1: working - position = IjaiMapDataParser._parse_position(buf, "path") - if position is not None: - path_points.append(position) + for pt in IjaiMapDataParser.robot_map.historyPose.points: + # 0: taxi, 1: working + path_points.append(Point(x=pt.x, y=pt.y)) return Path(len(path_points), 1, 0, [path_points]) @staticmethod - def _parse_restricted_areas(buf: ParsingBuffer) -> tuple[list[Wall], list[Area]]: + def _parse_restricted_areas() -> tuple[list[Wall], list[Area], list[Area]]: walls = [] - areas = [] - buf.skip("unknown1", 4) - area_count = buf.get_uint32("area_count") - for _ in range(area_count): - buf.skip("restricted.unknown1", 12) - p1 = IjaiMapDataParser._parse_position(buf, "p1") - p2 = IjaiMapDataParser._parse_position(buf, "p2") - p3 = IjaiMapDataParser._parse_position(buf, "p3") - p4 = IjaiMapDataParser._parse_position(buf, "p4") - buf.skip("restricted.unknown2", 48) - _LOGGER.debug("restricted: %s %s %s %s", p1, p2, p3, p4) - if p1 is not None and p2 is not None and p3 is not None and p4 is not None: - if p1 == p2 and p3 == p4: - walls.append(Wall(p1.x, p1.y, p3.x, p3.y)) - else: - areas.append(Area(p1.x, p1.y, p2.x, p2.y, p3.x, p3.y, p4.x, p4.y)) - return walls, areas + no_go_areas = [] + no_mop_areas = [] + + for virtualWall in IjaiMapDataParser.robot_map.virtualWalls: + p1, p2, p3, p4 = virtualWall.points + + if virtualWall.type == IjaiMapDataParser.VIRTUALWALL_TYPE_WALL: + walls.append(Wall(p1.x, p1.y, p3.x, p3.y)) + elif virtualWall.type == IjaiMapDataParser.VIRTUALWALL_TYPE_NO_GO: + no_go_areas.append( + Area(p1.x, p1.y, p2.x, p2.y, p3.x, p3.y, p4.x, p4.y)) + elif virtualWall.type == IjaiMapDataParser.VIRTUALWALL_TYPE_NO_MOP: + no_mop_areas.append( + Area(p1.x, p1.y, p2.x, p2.y, p3.x, p3.y, p4.x, p4.y)) + return walls, no_go_areas, no_mop_areas @staticmethod - def _parse_cleaning_areas(buf: ParsingBuffer) -> list[Zone]: - buf.skip("unknown1", 4) - area_count = buf.get_uint32("area_count") + def _parse_cleaning_zones() -> list[Zone]: zones = [] - for _ in range(area_count): - buf.skip("area.unknown1", 12) - p1 = IjaiMapDataParser._parse_position(buf, "p1") - IjaiMapDataParser._parse_position(buf, "p2") - p3 = IjaiMapDataParser._parse_position(buf, "p3") - IjaiMapDataParser._parse_position(buf, "p4") - buf.skip("area.unknown2", 48) - if p1 is not None and p3 is not None: - zones.append(Zone(p1.x, p1.y, p3.x, p3.y)) + for areaInfo in IjaiMapDataParser.robot_map.areasInfo: + zones.append(Zone(areaInfo.points[0].x, + areaInfo.points[0].y, + areaInfo.points[2].x, + areaInfo.points[2].y)) return zones @staticmethod - def _parse_rooms(buf: ParsingBuffer, map_data_rooms: dict[int, Room]) -> None: - map_name = buf.get_string_len8("map_name") - map_arg = buf.get_uint32("map_arg") - _LOGGER.debug("map#%d: %s", map_arg, map_name) - while map_arg > 1: - map_name = buf.get_string_len8("map_name") - map_arg = buf.get_uint32("map_arg") - _LOGGER.debug("map#%d: %s", map_arg, map_name) - room_count = buf.get_uint32("room_count") - for _ in range(room_count): - room_id = buf.get_uint8("room.id") - room_name = buf.get_string_len8("room.name") - if map_data_rooms is not None and room_id in map_data_rooms: - map_data_rooms[room_id].name = room_name - buf.skip("room.unknown1", 1) - room_text_pos = IjaiMapDataParser._parse_position(buf, "room.text_pos") - _LOGGER.debug("room#%d: %s %s", room_id, room_name, room_text_pos) - buf.skip("unknown1", 6) - - @staticmethod - def _parse_room_outlines(buf: ParsingBuffer) -> None: - buf.skip("unknown1", 51) - room_count = buf.get_uint32("room_count") - for _ in range(room_count): - room_id = buf.get_uint32("room.id") - segment_count = buf.get_uint32("room.segment_count") - for _ in range(segment_count): - buf.skip("unknown2", 5) - _LOGGER.debug("room#%d: segment_count: %d", room_id, segment_count) - - @staticmethod - def _parse_section(buf: ParsingBuffer, name: str, map_id: int) -> None: - buf.set_name(name) - magic = buf.get_uint32("magic") - if magic != map_id: - raise ValueError( - f"error parsing section {name} at offset {buf.offs - 4:#x}: magic check failed. " - + f"Magic: {magic:#x}, Map ID: {map_id:#x}" - ) - - @staticmethod - def _parse_position(buf: ParsingBuffer, name: str, with_angle: bool = False) -> Point | None: - x = buf.get_float32(name + ".x") - y = buf.get_float32(name + ".y") - if IjaiMapDataParser.POSITION_UNKNOWN in (x, y): - return None - a = None - if with_angle: - a = buf.get_float32(name + ".a") * 180 / math.pi - return Point(x, y, a) + def _parse_goto_point() -> Point | None: + for navigationPoint in IjaiMapDataParser.robot_map.navigationPoints: + if ( + navigationPoint.status == 0 + and navigationPoint.pointType == 1 + and navigationPoint.x != 1100.0 # outside map + and navigationPoint.y != 1100.0): + return Point(navigationPoint.x, + navigationPoint.y, + navigationPoint.phi * 180 / math.pi) + return None @staticmethod - def _parse_unknown_section(buf: ParsingBuffer) -> bool: - n = buf.data[buf.offs :].find(buf.data[4:8]) - if n >= 0: - buf.offs += n - buf.length -= n - return True - buf.offs += buf.length - buf.length = 0 - return False + def _parse_rooms(map_data_rooms: dict[int, Room]) -> None: + map_id = IjaiMapDataParser.robot_map.mapHead.mapHeadId + for map_data in IjaiMapDataParser.robot_map.mapInfo: + if map_data.mapHeadId == map_id: + current_map = map_data + break + map_name = current_map.mapName + _LOGGER.debug("map#%d: %s", current_map.mapHeadId, map_name) + for r in IjaiMapDataParser.robot_map.roomDataInfo: + if map_data_rooms is not None and r.roomId in map_data_rooms: + map_data_rooms[r.roomId].name = r.roomName + map_data_rooms[r.roomId].pos_x = r.roomNamePost.x + map_data_rooms[r.roomId].pos_y = r.roomNamePost.y + + room_text_pos = Point(r.roomNamePost.x, r.roomNamePost.y) + _LOGGER.debug("room#%d: %s %s", r.roomId, + r.roomName, room_text_pos) diff --git a/src/vacuum_map_parser_ijai/parsing_buffer.py b/src/vacuum_map_parser_ijai/parsing_buffer.py deleted file mode 100644 index a1abc75..0000000 --- a/src/vacuum_map_parser_ijai/parsing_buffer.py +++ /dev/null @@ -1,96 +0,0 @@ -"""Parsing buffer for Viomi map data.""" - -import logging -from struct import unpack_from - -_LOGGER = logging.getLogger(__name__) - - -class ParsingBuffer: - """Parsing buffer for Viomi map data.""" - - def __init__(self, name: str, data: bytes, start_offs: int, length: int): - self._name = name - self.data = data - self.offs = start_offs - self.length = length - self._image_beginning: int = 0 - - def set_name(self, name: str) -> None: - self._name = name - _LOGGER.debug("SECTION %s: offset 0x%x", self._name, self.offs) - - def mark_as_image_beginning(self) -> None: - self._image_beginning = self.offs - - def get_at_image(self, offset: int) -> int: - return self.data[self._image_beginning + offset - 1] - - def skip(self, field: str, n: int) -> None: - if self.length < n: - raise ValueError(f"error parsing {self._name}.{field} at offset {self.offs:#x}: buffer underrun") - self.offs += n - self.length -= n - - def get_uint8(self, field: str) -> int: - if self.length < 1: - raise ValueError(f"error parsing {self._name}.{field} at offset {self.offs:#x}: buffer underrun") - self.offs += 1 - self.length -= 1 - return self.data[self.offs - 1] - - def get_uint16(self, field: str) -> int: - if self.length < 2: - raise ValueError(f"error parsing {self._name}.{field} at offset {self.offs:#x}: buffer underrun") - value = self._unpack_int(" int: - if self.length < 2: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - lo = self.data[self.offs] - hi = self.data[self.offs + 1] - self.offs += 2 - self.length -= 2 - return ((hi^1) << 7) ^ lo - - def get_uint32(self, field: str) -> int: - value = self.peek_uint32(field) - self.offs += 4 - self.length -= 4 - return value - - def get_float32(self, field: str) -> float: - if self.length < 4: - raise ValueError(f"error parsing {self._name}.{field} at offset {self.offs:#x}: buffer underrun") - self.offs += 4 - self.length -= 4 - return float(unpack_from(" str: - n = self.get_uint8(field + ".len") - if self.length < n: - raise ValueError(f"error parsing {self._name}.{field} at offset {self.offs:#x}: buffer underrun") - self.offs += n - self.length -= n - return self.data[self.offs - n : self.offs].decode("UTF-8") - - def peek_uint32(self, field: str) -> int: - if self.length < 4: - raise ValueError(f"error parsing {self._name}.{field} at offset {self.offs:#x}: buffer underrun") - return self._unpack_int(" None: - if self.length == 0: - _LOGGER.debug("all of the data has been processed") - else: - _LOGGER.warning("%d bytes remained in the buffer", self.length) - - def _unpack_int(self, fmt: str) -> int: - return int(unpack_from(fmt, self.data, self.offs)[0]) - - def _unpack_float(self, fmt: str) -> float: - return float(unpack_from(fmt, self.data, self.offs)[0]) diff --git a/src/vacuum_map_parser_ijai/status_mapping.py b/src/vacuum_map_parser_ijai/status_mapping.py new file mode 100644 index 0000000..b3d31e3 --- /dev/null +++ b/src/vacuum_map_parser_ijai/status_mapping.py @@ -0,0 +1,61 @@ +"""Module that provides mapping for status property""" +from dataclasses import dataclass + + +@dataclass +class IjaiVacuumStatusMapping: + """Dataclass containing mapping for status property""" + # vacuum service id + siid: int = 2 + + # status property id in vacuum service + piid: int = 1 + + # idle_at is status property values from https://home.miot-spec.com/spec/model + # 0,1,2,4,8,10 are common idle states for most ijai/xiaomi miot robot-vacuums + idle_at: tuple[int, ...] = (0, 1, 2, 4, 8, 10) + + +_NON_STANDARD_STATUS_PROP = [ + ( + [ + "xiaomi.vacuum.c107", + "xiaomi.vacuum.d101", + "xiaomi.vacuum.d102gl", + "xiaomi.vacuum.d102ev", + "xiaomi.vacuum.d109gl", + ], + IjaiVacuumStatusMapping(idle_at=(1, 2, 5, 9, 11, 12, 13, 14, 15, 18)) + ), + + ( + [ + "xiaomi.vacuum.c108" + ], + IjaiVacuumStatusMapping(idle_at=(1, 3, 4, 5, 7)) + ), + + ( + [ + "xiaomi.vacuum.b108gl" + ], + IjaiVacuumStatusMapping(idle_at=(1, 2, 5, 8, 10)) + ), + + ( + [ + "xiaomi.vacuum.c102gl", + "xiaomi.vacuum.c102cn", + "xiaomi.vacuum.d103cn", + "xiaomi.vacuum.d110ch", + ], + IjaiVacuumStatusMapping(piid=2, + idle_at=(2, 3, 4, 6, 8, 9, 13, 19, 21, 22, 30)) + ) +] + + +def get_status_mapping(model: str) -> IjaiVacuumStatusMapping: + return next((mapping for models, + mapping in _NON_STANDARD_STATUS_PROP if model in models), + IjaiVacuumStatusMapping())