|
15 | 15 | import struct |
16 | 16 | import sys |
17 | 17 | import zlib |
18 | | -from typing import Any, List, Tuple |
| 18 | +from typing import Any, List, Tuple, Optional |
19 | 19 |
|
20 | 20 | import numpy as np |
21 | 21 |
|
@@ -568,6 +568,93 @@ def _op_scale(interpreter, args, _arg_nodes, _env, location): |
568 | 568 | return Value(TYPE_TNS, Tensor(shape=[target_h, target_w, 4], data=flat)) |
569 | 569 |
|
570 | 570 |
|
| 571 | +def _op_rotate(interpreter, args, _arg_nodes, _env, location): |
| 572 | + from interpreter import ASMRuntimeError, TYPE_INT, TYPE_TNS, Tensor, Value |
| 573 | + |
| 574 | + if len(args) < 2: |
| 575 | + raise ASMRuntimeError("ROTATE expects 2 arguments", location=location, rewrite_rule="ROTATE") |
| 576 | + src = interpreter._expect_tns(args[0], "ROTATE", location) |
| 577 | + degrees = interpreter._expect_flt(args[1], "ROTATE", location) |
| 578 | + |
| 579 | + if len(src.shape) != 3 or src.shape[2] != 4: |
| 580 | + raise ASMRuntimeError("ROTATE expects a 3D image tensor with 4 channels", location=location, rewrite_rule="ROTATE") |
| 581 | + |
| 582 | + h, w, _ = src.shape |
| 583 | + interpreter.builtins._ensure_tensor_ints(src, "ROTATE", location) |
| 584 | + arr = src.data.reshape(tuple(src.shape)) |
| 585 | + |
| 586 | + # Try Pillow for robust, fast rotation. Fallback to a numpy implementation. |
| 587 | + try: |
| 588 | + from PIL import Image |
| 589 | + |
| 590 | + flat = bytearray() |
| 591 | + for y in range(h): |
| 592 | + for x in range(w): |
| 593 | + r = interpreter._expect_int(arr[y, x, 0], "ROTATE", location) |
| 594 | + g = interpreter._expect_int(arr[y, x, 1], "ROTATE", location) |
| 595 | + b = interpreter._expect_int(arr[y, x, 2], "ROTATE", location) |
| 596 | + a = interpreter._expect_int(arr[y, x, 3], "ROTATE", location) |
| 597 | + flat.extend((r & 0xFF, g & 0xFF, b & 0xFF, a & 0xFF)) |
| 598 | + |
| 599 | + im = Image.frombytes('RGBA', (w, h), bytes(flat)) |
| 600 | + im = im.rotate(float(degrees), resample=Image.BICUBIC, expand=False, fillcolor=(0, 0, 0, 0)) |
| 601 | + out_bytes = im.tobytes('raw', 'RGBA') |
| 602 | + out_vals = [Value(TYPE_INT, int(b)) for b in out_bytes] |
| 603 | + data = np.array(out_vals, dtype=object) |
| 604 | + return Value(TYPE_TNS, Tensor(shape=[h, w, 4], data=data)) |
| 605 | + except Exception: |
| 606 | + # Fall back to numpy bilinear sampling |
| 607 | + import math |
| 608 | + |
| 609 | + cx = (w - 1) / 2.0 |
| 610 | + cy = (h - 1) / 2.0 |
| 611 | + rad = math.radians(float(degrees)) |
| 612 | + c = math.cos(rad) |
| 613 | + s = math.sin(rad) |
| 614 | + |
| 615 | + out_flat: List[int] = [0] * (h * w * 4) |
| 616 | + |
| 617 | + def sample_channel(sx: float, sy: float, ch: int) -> float: |
| 618 | + # Bilinear sample at floating point coordinates, return float |
| 619 | + x0 = math.floor(sx) |
| 620 | + y0 = math.floor(sy) |
| 621 | + wx = sx - x0 |
| 622 | + wy = sy - y0 |
| 623 | + def get(px: int, py: int) -> int: |
| 624 | + if px < 0 or px >= w or py < 0 or py >= h: |
| 625 | + return 0 |
| 626 | + return interpreter._expect_int(arr[py, px, ch], "ROTATE", location) |
| 627 | + v00 = get(x0, y0) |
| 628 | + v10 = get(x0 + 1, y0) |
| 629 | + v01 = get(x0, y0 + 1) |
| 630 | + v11 = get(x0 + 1, y0 + 1) |
| 631 | + return (1 - wx) * (1 - wy) * v00 + wx * (1 - wy) * v10 + (1 - wx) * wy * v01 + wx * wy * v11 |
| 632 | + |
| 633 | + for yy in range(h): |
| 634 | + for xx in range(w): |
| 635 | + dx = xx - cx |
| 636 | + dy = yy - cy |
| 637 | + # inverse rotation to fetch source coordinate |
| 638 | + sx = cx + (c * dx + s * dy) |
| 639 | + sy = cy + (-s * dx + c * dy) |
| 640 | + |
| 641 | + base = (yy * w + xx) * 4 |
| 642 | + if sx < 0 or sx >= w or sy < 0 or sy >= h: |
| 643 | + out_flat[base:base+4] = [0, 0, 0, 0] |
| 644 | + continue |
| 645 | + r = int(round(sample_channel(sx, sy, 0))) |
| 646 | + g = int(round(sample_channel(sx, sy, 1))) |
| 647 | + b = int(round(sample_channel(sx, sy, 2))) |
| 648 | + a = int(round(sample_channel(sx, sy, 3))) |
| 649 | + out_flat[base] = max(0, min(255, r)) |
| 650 | + out_flat[base+1] = max(0, min(255, g)) |
| 651 | + out_flat[base+2] = max(0, min(255, b)) |
| 652 | + out_flat[base+3] = max(0, min(255, a)) |
| 653 | + |
| 654 | + data = np.array([Value(TYPE_INT, int(v)) for v in out_flat], dtype=object) |
| 655 | + return Value(TYPE_TNS, Tensor(shape=[h, w, 4], data=data)) |
| 656 | + |
| 657 | + |
571 | 658 | def _op_crop(interpreter, args, _arg_nodes, _env, location): |
572 | 659 | from interpreter import ASMRuntimeError, TYPE_INT, TYPE_TNS, Tensor, Value |
573 | 660 |
|
@@ -700,15 +787,221 @@ def _op_blur(interpreter, args, _arg_nodes, _env, location): |
700 | 787 | return Value(TYPE_TNS, Tensor(shape=[h, w, 4], data=flat)) |
701 | 788 |
|
702 | 789 |
|
| 790 | +def _write_bmp_file(path: str, width: int, height: int, pixels: List[int]) -> None: |
| 791 | + # Write a simple 32-bit BMP (BGRA) uncompressed |
| 792 | + with open(path, "wb") as handle: |
| 793 | + row_bytes = width * 4 |
| 794 | + pad = 0 |
| 795 | + # File header (14 bytes) |
| 796 | + bfType = b'BM' |
| 797 | + bfOffBits = 14 + 40 # file header + info header |
| 798 | + bfSize = bfOffBits + (row_bytes * height) |
| 799 | + handle.write(struct.pack('<2sIHHI', bfType, bfSize, 0, 0, bfOffBits)) |
| 800 | + # BITMAPINFOHEADER (40 bytes) |
| 801 | + biSize = 40 |
| 802 | + biWidth = width |
| 803 | + biHeight = height # bottom-up |
| 804 | + biPlanes = 1 |
| 805 | + biBitCount = 32 |
| 806 | + biCompression = 0 |
| 807 | + biSizeImage = row_bytes * height |
| 808 | + biXPelsPerMeter = 0 |
| 809 | + biYPelsPerMeter = 0 |
| 810 | + biClrUsed = 0 |
| 811 | + biClrImportant = 0 |
| 812 | + handle.write(struct.pack('<IIIHHIIIIII', biSize, biWidth, biHeight, biPlanes, biBitCount, biCompression, biSizeImage, biXPelsPerMeter, biYPelsPerMeter, biClrUsed, biClrImportant)) |
| 813 | + # Pixel data: BMP stores rows bottom-up, each pixel B G R A |
| 814 | + for y in range(height - 1, -1, -1): |
| 815 | + row_start = y * width * 4 |
| 816 | + for x in range(width): |
| 817 | + i = row_start + x * 4 |
| 818 | + r = pixels[i] |
| 819 | + g = pixels[i + 1] |
| 820 | + b = pixels[i + 2] |
| 821 | + a = pixels[i + 3] |
| 822 | + handle.write(struct.pack('<BBBB', b & 0xFF, g & 0xFF, r & 0xFF, a & 0xFF)) |
| 823 | + |
| 824 | + |
| 825 | +def _save_with_gdiplus(path: str, width: int, height: int, pixels: List[int], fmt: str, quality: Optional[int] = None) -> None: |
| 826 | + gdiplus = _gdiplus_start() |
| 827 | + bitmap = ctypes.c_void_p() |
| 828 | + stride = width * 4 |
| 829 | + buf_len = width * height * 4 |
| 830 | + buf = (ctypes.c_ubyte * buf_len)() |
| 831 | + # pixels are [r,g,b,a] |
| 832 | + for i in range(width * height): |
| 833 | + r = int(pixels[i * 4]) & 0xFF |
| 834 | + g = int(pixels[i * 4 + 1]) & 0xFF |
| 835 | + b = int(pixels[i * 4 + 2]) & 0xFF |
| 836 | + a = int(pixels[i * 4 + 3]) & 0xFF |
| 837 | + idx = i * 4 |
| 838 | + buf[idx] = b |
| 839 | + buf[idx + 1] = g |
| 840 | + buf[idx + 2] = r |
| 841 | + buf[idx + 3] = a |
| 842 | + |
| 843 | + status = gdiplus.GdipCreateBitmapFromScan0(width, height, stride, _PixelFormat32bppARGB, ctypes.cast(buf, ctypes.c_void_p), ctypes.byref(bitmap)) |
| 844 | + if status != 0: |
| 845 | + raise RuntimeError(f"GdipCreateBitmapFromScan0 failed ({status})") |
| 846 | + |
| 847 | + try: |
| 848 | + class GUID(ctypes.Structure): |
| 849 | + _fields_ = [("Data1", ctypes.c_uint32), ("Data2", ctypes.c_uint16), ("Data3", ctypes.c_uint16), ("Data4", ctypes.c_ubyte * 8)] |
| 850 | + |
| 851 | + def _guid_from_str(s: str) -> GUID: |
| 852 | + hexs = s.strip('{}').split('-') |
| 853 | + d1 = int(hexs[0], 16) |
| 854 | + d2 = int(hexs[1], 16) |
| 855 | + d3 = int(hexs[2], 16) |
| 856 | + d4_bytes = bytes.fromhex(hexs[3] + hexs[4]) |
| 857 | + arr = (ctypes.c_ubyte * 8)(*d4_bytes) |
| 858 | + return GUID(d1, d2, d3, arr) |
| 859 | + |
| 860 | + # Known encoder CLSIDs |
| 861 | + if fmt.upper() == "PNG": |
| 862 | + clsid = _guid_from_str('{557CF406-1A04-11D3-9A73-0000F81EF32E}') |
| 863 | + elif fmt.upper() == "JPEG" or fmt.upper() == "JPG": |
| 864 | + clsid = _guid_from_str('{557CF401-1A04-11D3-9A73-0000F81EF32E}') |
| 865 | + else: |
| 866 | + clsid = _guid_from_str('{557CF400-1A04-11D3-9A73-0000F81EF32E}') |
| 867 | + |
| 868 | + status = gdiplus.GdipSaveImageToFile(bitmap, ctypes.c_wchar_p(path), ctypes.byref(clsid), None) |
| 869 | + if status != 0: |
| 870 | + raise RuntimeError(f"GdipSaveImageToFile failed ({status})") |
| 871 | + finally: |
| 872 | + try: |
| 873 | + gdiplus.GdipDisposeImage(bitmap) |
| 874 | + except Exception: |
| 875 | + pass |
| 876 | + |
| 877 | + |
| 878 | +def _op_save_bmp(interpreter, args, _arg_nodes, _env, location): |
| 879 | + from interpreter import ASMRuntimeError, TYPE_TNS, TYPE_STR, Value |
| 880 | + |
| 881 | + if len(args) < 2: |
| 882 | + raise ASMRuntimeError("SAVE_BMP expects 2 arguments", location=location, rewrite_rule="SAVE_BMP") |
| 883 | + t = interpreter._expect_tns(args[0], "SAVE_BMP", location) |
| 884 | + path = _expect_str(args[1], "SAVE_BMP", location) |
| 885 | + if len(t.shape) != 3 or t.shape[2] != 4: |
| 886 | + raise ASMRuntimeError("SAVE_BMP expects a 3D image tensor with 4 channels", location=location, rewrite_rule="SAVE_BMP") |
| 887 | + h, w, _ = t.shape |
| 888 | + interpreter.builtins._ensure_tensor_ints(t, "SAVE_BMP", location) |
| 889 | + flat = [] |
| 890 | + arr = t.data.reshape(tuple(t.shape)) |
| 891 | + for y in range(h): |
| 892 | + for x in range(w): |
| 893 | + flat.append(interpreter._expect_int(arr[y, x, 0], "SAVE_BMP", location)) |
| 894 | + flat.append(interpreter._expect_int(arr[y, x, 1], "SAVE_BMP", location)) |
| 895 | + flat.append(interpreter._expect_int(arr[y, x, 2], "SAVE_BMP", location)) |
| 896 | + flat.append(interpreter._expect_int(arr[y, x, 3], "SAVE_BMP", location)) |
| 897 | + try: |
| 898 | + _write_bmp_file(path, w, h, flat) |
| 899 | + except Exception as exc: |
| 900 | + raise ASMRuntimeError(f"SAVE_BMP failed: {exc}", location=location, rewrite_rule="SAVE_BMP") |
| 901 | + return Value(TYPE_STR, "OK") |
| 902 | + |
| 903 | + |
| 904 | +def _op_save_png(interpreter, args, _arg_nodes, _env, location): |
| 905 | + from interpreter import ASMRuntimeError, TYPE_STR, Value |
| 906 | + |
| 907 | + if len(args) < 3: |
| 908 | + raise ASMRuntimeError("SAVE_PNG expects 3 arguments", location=location, rewrite_rule="SAVE_PNG") |
| 909 | + t = interpreter._expect_tns(args[0], "SAVE_PNG", location) |
| 910 | + path = _expect_str(args[1], "SAVE_PNG", location) |
| 911 | + level = interpreter._expect_int(args[2], "SAVE_PNG", location) |
| 912 | + if len(t.shape) != 3 or t.shape[2] != 4: |
| 913 | + raise ASMRuntimeError("SAVE_PNG expects a 3D image tensor with 4 channels", location=location, rewrite_rule="SAVE_PNG") |
| 914 | + h, w, _ = t.shape |
| 915 | + interpreter.builtins._ensure_tensor_ints(t, "SAVE_PNG", location) |
| 916 | + arr = t.data.reshape(tuple(t.shape)) |
| 917 | + flat = bytearray() |
| 918 | + for y in range(h): |
| 919 | + for x in range(w): |
| 920 | + r = interpreter._expect_int(arr[y, x, 0], "SAVE_PNG", location) |
| 921 | + g = interpreter._expect_int(arr[y, x, 1], "SAVE_PNG", location) |
| 922 | + b = interpreter._expect_int(arr[y, x, 2], "SAVE_PNG", location) |
| 923 | + a = interpreter._expect_int(arr[y, x, 3], "SAVE_PNG", location) |
| 924 | + flat.extend((r & 0xFF, g & 0xFF, b & 0xFF, a & 0xFF)) |
| 925 | + # Try Pillow first |
| 926 | + try: |
| 927 | + from PIL import Image |
| 928 | + |
| 929 | + im = Image.frombytes('RGBA', (w, h), bytes(flat)) |
| 930 | + im.save(path, compress_level=max(0, min(9, int(level)))) |
| 931 | + return Value(TYPE_STR, "OK") |
| 932 | + except Exception: |
| 933 | + pass |
| 934 | + # Try GDI+ on Windows |
| 935 | + if _load_with_gdiplus is not None: |
| 936 | + try: |
| 937 | + _save_with_gdiplus(path, w, h, list(flat), "PNG", quality=None) |
| 938 | + return Value(TYPE_STR, "OK") |
| 939 | + except Exception as exc: |
| 940 | + raise ASMRuntimeError(f"SAVE_PNG failed: {exc}", location=location, rewrite_rule="SAVE_PNG") |
| 941 | + raise ASMRuntimeError("SAVE_PNG not supported on this platform (install Pillow or use Windows)", location=location, rewrite_rule="SAVE_PNG") |
| 942 | + |
| 943 | + |
| 944 | +def _op_save_jpeg(interpreter, args, _arg_nodes, _env, location): |
| 945 | + from interpreter import ASMRuntimeError, TYPE_STR, Value |
| 946 | + |
| 947 | + if len(args) < 3: |
| 948 | + raise ASMRuntimeError("SAVE_JPEG expects 3 arguments", location=location, rewrite_rule="SAVE_JPEG") |
| 949 | + t = interpreter._expect_tns(args[0], "SAVE_JPEG", location) |
| 950 | + path = _expect_str(args[1], "SAVE_JPEG", location) |
| 951 | + quality = interpreter._expect_int(args[2], "SAVE_JPEG", location) |
| 952 | + if len(t.shape) != 3 or t.shape[2] != 4: |
| 953 | + raise ASMRuntimeError("SAVE_JPEG expects a 3D image tensor with 4 channels", location=location, rewrite_rule="SAVE_JPEG") |
| 954 | + h, w, _ = t.shape |
| 955 | + interpreter.builtins._ensure_tensor_ints(t, "SAVE_JPEG", location) |
| 956 | + arr = t.data.reshape(tuple(t.shape)) |
| 957 | + flat = bytearray() |
| 958 | + for y in range(h): |
| 959 | + for x in range(w): |
| 960 | + r = interpreter._expect_int(arr[y, x, 0], "SAVE_JPEG", location) |
| 961 | + g = interpreter._expect_int(arr[y, x, 1], "SAVE_JPEG", location) |
| 962 | + b = interpreter._expect_int(arr[y, x, 2], "SAVE_JPEG", location) |
| 963 | + a = interpreter._expect_int(arr[y, x, 3], "SAVE_JPEG", location) |
| 964 | + flat.extend((r & 0xFF, g & 0xFF, b & 0xFF)) |
| 965 | + # Try Pillow |
| 966 | + try: |
| 967 | + from PIL import Image |
| 968 | + |
| 969 | + im = Image.frombytes('RGB', (w, h), bytes(flat)) |
| 970 | + im.save(path, quality=max(1, min(95, int(quality)))) |
| 971 | + return Value(TYPE_STR, "OK") |
| 972 | + except Exception: |
| 973 | + pass |
| 974 | + # Try GDI+ |
| 975 | + if _load_with_gdiplus is not None: |
| 976 | + try: |
| 977 | + # _save_with_gdiplus expects RGBA list |
| 978 | + rgba = [] |
| 979 | + for y in range(h): |
| 980 | + for x in range(w): |
| 981 | + rgba.append(int(arr[y, x, 0]) & 0xFF) |
| 982 | + rgba.append(int(arr[y, x, 1]) & 0xFF) |
| 983 | + rgba.append(int(arr[y, x, 2]) & 0xFF) |
| 984 | + rgba.append(int(arr[y, x, 3]) & 0xFF) |
| 985 | + _save_with_gdiplus(path, w, h, rgba, "JPEG", quality=int(quality)) |
| 986 | + return Value(TYPE_STR, "OK") |
| 987 | + except Exception as exc: |
| 988 | + raise ASMRuntimeError(f"SAVE_JPEG failed: {exc}", location=location, rewrite_rule="SAVE_JPEG") |
| 989 | + raise ASMRuntimeError("SAVE_JPEG not supported on this platform (install Pillow or use Windows)", location=location, rewrite_rule="SAVE_JPEG") |
| 990 | + |
| 991 | + |
703 | 992 | # ---- Registration ---- |
704 | 993 |
|
705 | 994 | def asm_lang_register(ext: ExtensionAPI) -> None: |
706 | 995 | ext.metadata(name="image", version="0.1.0") |
707 | 996 | ext.register_operator("LOAD_PNG", 1, 1, _op_load_png, doc="LOAD_PNG(path) -> TNS[height][width][r,g,b,a]") |
708 | 997 | ext.register_operator("LOAD_JPEG", 1, 1, _op_load_jpeg, doc="LOAD_JPEG(path) -> TNS[height][width][r,g,b,a]") |
709 | 998 | ext.register_operator("LOAD_BMP", 1, 1, _op_load_bmp, doc="LOAD_BMP(path) -> TNS[height][width][r,g,b,a]") |
| 999 | + ext.register_operator("SAVE_BMP", 2, 2, _op_save_bmp, doc="SAVE_BMP(TNS:img, STR:path) -> STR:OK") |
| 1000 | + ext.register_operator("SAVE_PNG", 3, 3, _op_save_png, doc="SAVE_PNG(TNS:img, STR:path, INT:compression_level) -> STR:OK") |
| 1001 | + ext.register_operator("SAVE_JPEG", 3, 3, _op_save_jpeg, doc="SAVE_JPEG(TNS:img, STR:path, INT:quality) -> STR:OK") |
710 | 1002 | ext.register_operator("BLIT", 4, 5, _op_blit, doc="BLIT(TNS:src, TNS:dest, INT:x, INT:y, INT:mixalpha=1) -> TNS") |
711 | 1003 | ext.register_operator("SCALE", 3, 4, _op_scale, doc="SCALE(TNS:src, INT:scale_x, INT:scale_y, INT:antialiasing=1) -> TNS") |
| 1004 | + ext.register_operator("ROTATE", 2, 2, _op_rotate, doc="ROTATE(TNS:img, FLT:degrees) -> TNS") |
712 | 1005 | ext.register_operator("CROP", 5, 5, _op_crop, doc="CROP(TNS:img, INT:top, INT:right, INT:bottom, INT:left) -> TNS") |
713 | 1006 | ext.register_operator("GRAYSCALE", 1, 1, _op_grayscale, doc="GRAYSCALE(TNS:img) -> TNS (rgb channels set to luminance, alpha preserved)") |
714 | 1007 | ext.register_operator("BLUR", 2, 2, _op_blur, doc="BLUR(TNS:img, INT:radius) -> TNS (gaussian blur, radius in pixels)") |
0 commit comments