diff --git a/.gitignore b/.gitignore index c06f08c..dea3d22 100644 --- a/.gitignore +++ b/.gitignore @@ -142,4 +142,7 @@ d2vs/ocr_training_batch_*/ # Configuration .env -pickit.txt \ No newline at end of file +pickit.txt + +# mapping.. +d2vs/mapping/captures/* diff --git a/README.md b/README.md index 27929b4..be2d780 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ $ pip install d2vs ``` _NOTE: this can run via CPU, but via GPU is far superior. You must install CUDA and the appropriate python jazz -to get that working, for me with CUDA10.1:_ +to get that all going. This worked for me with CUDA10.1:_ ```bash $ conda install torch torchvision cudatoolkit=10.1 -c pytorch @@ -30,33 +30,39 @@ $ conda install torch torchvision cudatoolkit=10.1 -c pytorch ```py ->>> import numpy as np ->>> from d2vs.ocr import OCR ->>> from PIL import Image ->>> ->>> # Initiate OCR ->>> ocr = OCR() ->>> ->>> # Load an Image ->>> img = Image.open("586_gold.png") ->>> ->>> # Scan the image ->>> bounds, text, item_type = ocr.read(np.asarray(img, dtype='uint8')) ->>> print(text) -'586 Gold' ->>> print(item_type) -'Normal' +import numpy as np +from d2vs.ocr import OCR +from PIL import Image + +# Initiate OCR +ocr = OCR() + +# Load an image +image_rgb_data = np.asarray(Image.open("586_gold.png"), dtype='uint8') + +# Scan the image +bounds, text, item_type = ocr.read(image_rgb_data) + +# What do we have to work with? +print(bounds) +# ([2, 2], [158, 2], [158, 32], [2, 32]) +# which are top_left, top_right, bottom_right, bottom_left + +print(text) +# '586 Gold' + +print(item_type) +# 'Normal' ``` # project goals - - Have fun automating single player! Not for profit - - OCR with near 100% accuracy - - Visually determine where you are in game, area level and world coordinate system - - Click from world coords to screen coords + - ~~Have fun automating single player! 
Not for profit~~ + - ~~OCR with near 100% accuracy~~ + - ~~Visually determine where you are in game and use that for navigation~~ - Path through unexplored areas to a goal + - Pick it for identified items/gambling/etc. - Facilitate complete d2 bot from lvl 1 to 99 - - Pick it # development diff --git a/d2vs/mapping/OLD_pathing.py b/d2vs/mapping/OLD_pathing.py new file mode 100644 index 0000000..65ce7f6 --- /dev/null +++ b/d2vs/mapping/OLD_pathing.py @@ -0,0 +1,173 @@ +from time import sleep + +from cv2 import cv2 + +from capture2 import map_capture, map_merge_features, map_diff + + +DIRECTION_NORTH_WEST = 1 +DIRECTION_NORTH_EAST = 2 +DIRECTION_SOUTH_EAST = 3 +DIRECTION_SOUTH_WEST = 4 + + +class Node: + def __init__(self, x, y, diff, connections=None, unwalkable=False, is_start=False, is_end=False): + # Sanity checks.. + # assert not is_start and connections, "Must set at least one direction, we had to have come from " \ + # "somewhere?! Unless we just started, then mark is_start = True" + + assert not (is_start and is_end), "Cannot be start and end node at the same time!? or can you.. maybe!?" + + # Coords + self.x = x + self.y = y + + # Are we walkable? I.e., we tried to tele here and detected that tele failed.. not walkable! + self.unwalkable = unwalkable + + # Setup connections to other nodes + self.connections = connections or {} + + # Beginning or next to our goal? + self.is_start = is_start + self.is_end = is_end + + # The image of the map difference screenshot taken at this step + self.diff = diff + + +def find_and_enter_warp(text, preferred_direction=DIRECTION_NORTH_WEST): + """Flood fills area looking for warps. When a warp is found, mouse hovers over it to read + the text. If warp text matches given text, the warp is entered.""" + + """ + # "hidden" stop clause - not reinvoking for "c" or "b", only for "a". 
+ if matrix[x][y] == "a": + matrix[x][y] = "c" + #recursively invoke flood fill on all surrounding cells: + if x > 0: + floodfill(matrix,x-1,y) + if x < len(matrix[y]) - 1: + floodfill(matrix,x+1,y) + if y > 0: + floodfill(matrix,x,y-1) + if y < len(matrix) - 1: + floodfill(matrix,x,y+1) + """ + + + + + + + + """ + + 3. Take a picture: + a. diff happens first to calc x/y + a. Create `new_node` (`if counter == 0 then is_start = True`, skip rest) + + a. Detect if we moved at all + - If no movement, `node.is_walkable = False` + - Change `preferred_direction` + a. Detect if warp is on minimap + - If so change `preferred_direction` to point to it + a. Detect if warp is on screen + - Scan mouse over area to look for it and try to click + - Mark `node.is_end = True` + - Return + a. go back to start ? + 4. + """ + + + + + + # pre, during_1, during_2 = map_capture() + # cv2.imshow("Result", pre) + # cv2.waitKey(0) + # exit() + + + + + + + + # Start... + counter = 0 + + sleep(2) + + map = map_diff(*map_capture(), is_start=True) + prev_node = Node( + 10_000, + 10_000, + map, + is_start=True, + ) + + + while True: + sleep(1) + + + diff = map_diff(*map_capture()) + map, x, y = map_merge_features(map, diff) + + + + + + + # TODO: Did we not move very far? If so, we should change to a new preferred_direction and go until we can't any more + + + + + + + + + + + + + new_node = Node(x, y, diff) + + # if counter != 0: + # prev_node.connections[what direction were we coming from??] + + counter += 1 + + if counter == 10: + break + + prev_node = new_node + + cv2.imshow("Result", map) + cv2.waitKey(0) + + + + + + + + + + + + + # can we see a warp? 
+ # mouse over + check name + # if match: enter warp, return + # otherwise: we need to pick a direction to go and continue looking + # look based on preferred_direction + # + + +if __name__ == "__main__": + find_and_enter_warp("Durance of Hate Level 2", preferred_direction=DIRECTION_NORTH_EAST) diff --git a/d2vs/mapping/__init__.py b/d2vs/mapping/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/d2vs/mapping/areas/__init__.py b/d2vs/mapping/areas/__init__.py new file mode 100644 index 0000000..7fcd029 --- /dev/null +++ b/d2vs/mapping/areas/__init__.py @@ -0,0 +1 @@ +from .harrogath import Harrogath \ No newline at end of file diff --git a/d2vs/mapping/areas/base.py b/d2vs/mapping/areas/base.py new file mode 100644 index 0000000..e69de29 diff --git a/d2vs/mapping/areas/harrogath.py b/d2vs/mapping/areas/harrogath.py new file mode 100644 index 0000000..f6ef1dd --- /dev/null +++ b/d2vs/mapping/areas/harrogath.py @@ -0,0 +1,18 @@ +from d2vs.mapping.base_maps import StaticMap +# from d2vs.mapping.pathing import StaticPather, Node + + +class Harrogath(StaticMap): + area_name = "Harrogath" + # pathfinder = StaticPather + # threshold = .2 # when doing map diffs TODO: use this! + + def __init__(self, *args, **kwargs): + # self.nodes = [ + # Node(10_000, 10_000, is_start=True) + # ] + + # TODO: Defining nodes this way blows ass. do it via JSON! 
+ + super().__init__(*args, **kwargs) + diff --git a/d2vs/mapping/areas/static_data/.gitkeep b/d2vs/mapping/areas/static_data/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/d2vs/mapping/base_maps.py b/d2vs/mapping/base_maps.py new file mode 100644 index 0000000..755a22c --- /dev/null +++ b/d2vs/mapping/base_maps.py @@ -0,0 +1,157 @@ +import json +import os +import d2vs + +from abc import ABC, abstractmethod + +from cv2 import cv2 + +from d2vs.mapping.pathing import Node +from d2vs.mapping.pathing.node import Interactable, InteractableType + + +class BaseMap(ABC): + def __init__(self): + assert self.area_name, "You must set an area_name on each Map class" + # assert self.pathfinder, "You must set a pathfinder on each Map class" + + + + + + + # TODO: Load nodes + interactables and all that shit from json + + + @abstractmethod + def find_point(self, x, y): + # TODO: Find node closest to x/y, if it's NOT already an existing node, then path to that node and return?? + # . + # Like, this would be useful for going to a certain spot close to red portal and using TK on it + pass + + @abstractmethod + def find_interactable(self, interactable): + pass + + @abstractmethod + def find_interactable_type(self, interactable_type: InteractableType): + pass + + + + + + +class StaticMap(BaseMap): + """I.e. 
Harrogath.""" + def __init__(self): + super().__init__() + + assert os.path.exists(self.png_path), f"{self.area_name} png_path file ({self.png_path}) is missing" + assert os.path.exists(self.debug_png_path), f"{self.area_name} debug_png_path ({self.debug_png_path}) is missing" + assert os.path.exists(self.json_path), f"{self.area_name} json_path ({self.json_path}) is missing" + + # Load map png (has red circle showing start point typically) + self.map = cv2.imread(self.png_path) + + # Nodes have keys as (x, y) tuple and values are Nodes + self.nodes = {} + + # Quick lookup dict for interactables, keys are (x, y) as well + self.interactables = {} + + self.start_node = None # this is set when loading json, should always be 10_000, 10_000 + + self._load_json() + + def _load_json(self): + """This is its own method in case we need to re-load nodes from json .. maybe for testing + or realtime recalculating nodes?""" + data = json.loads(open(self.json_path, "r").read()) + + # Loop over data once and make initial set of nodes + for n in data["nodes"]: + # pop data not used in class creation + n_copy = n.copy() + n_copy.pop("connections") + n_copy.pop("interactables") + node = Node(**n_copy) + self.nodes[(node.x, node.y)] = node + + if node.is_start: + assert self.start_node is None, "We found multiple start nodes?! 
tried setting it twice" + self.start_node = node + + # Loop over data again and make connections + for n in data["nodes"]: + for c_x, c_y in n["connections"]: + self.nodes[(n["x"], n["y"])].add_connection(self.nodes[(c_x, c_y)]) + + for interactable_data in n["interactables"]: + print(interactable_data) + interactable = Interactable(**interactable_data) + self.nodes[(n["x"], n["y"])].add_interactable(interactable) + self.interactables[(interactable.x, interactable.y)] = interactable + + @property + def png_path(self): + return os.path.join(os.path.dirname(d2vs.__file__), f"mapping", "areas", "static_data", f"{self.area_name}.png") + + @property + def debug_png_path(self): + # return f"areas/static_data/{self.area_name}_debug.png" + return os.path.join(os.path.dirname(d2vs.__file__), f"mapping", "areas", "static_data", f"{self.area_name}_debug.png") + + @property + def json_path(self): + # return f"areas/static_data/{self.area_name}.json" + return os.path.join(os.path.dirname(d2vs.__file__), f"mapping", "areas", "static_data", f"{self.area_name}.json") + + def find_interactable_type(self, interactable_type): + """Convenience wrapper for self.find_interactable that automatically goes to the first matching + interactable in this area.""" + interactable = None + for i in self.interactables.values(): + if i.interactable_type == interactable_type: + interactable = i + + assert interactable, f"Unable to find interactable of type {interactable_type}!" + + return self.find_interactable(interactable) + + def find_interactable(self, interactable): + explored = [] + queue = [[self.start_node]] # TODO: start should be some kind of param? + + while queue: + path = queue.pop(0) + node = path[-1] + + if node not in explored: + for conn in node.get_connections(): + new_path = list(path) # copy path + new_path.append(conn) + queue.append(new_path) + + for i in conn.get_interactables(): + if i == interactable: # We found it! 
return path with this interactable node attached + new_path.append(interactable) + return new_path + + explored.append(node) + + +class StaticMapWithVariations(BaseMap): + """I.e. Rogue Encampment, having many variations you can tell from some initial template. + + Another example: Halls of Pain 3 variations.""" + pass + + +class DynamicMap(BaseMap): + """Start position is whereever we start scanning. Look for a goal. + + TODO: Make a DynamicMapEnclosed vs Open for like durance vs black marsh, open area vs not? + """ + pass \ No newline at end of file diff --git a/d2vs/mapping/capture.py b/d2vs/mapping/capture.py new file mode 100644 index 0000000..e23f859 --- /dev/null +++ b/d2vs/mapping/capture.py @@ -0,0 +1,284 @@ +import keyboard +import mss +import mss.tools +import numpy as np + +from cv2 import cv2 +from glob import glob +from time import sleep + + +def mask_image(img, color, background="None"): + range_start, range_end = color_rgb_to_bgr_range(color) + img_mask = cv2.inRange(img, range_start, range_end) + + # expand what we found to cover the whole thing, make the whole blur part of the mask via threshold + img_mask = cv2.blur(img_mask, (4, 4)) + _, img_mask = cv2.threshold(img_mask, int(0.1 * 255), 255, cv2.THRESH_BINARY) + + if background == "None": + return cv2.bitwise_and(img, img, mask=255 - img_mask) + else: + return cv2.bitwise_and(img, img, mask=255 - img_mask) + + +def color_rgb_to_bgr_range(color, range=1): + r, g, b = color + offset = int(range / 2) + # return (b - offset, g - offset, r - offset), (b + offset, g + offset, r + offset) + return (b - (12 * range), g - (8 * range), r - (8 * range)), (b + (12 * range), g + (8 * range), r + (8 * range)) + + +def map_capture(label=0, map=None, unstitched_pool=None): + # we want to return x/y as best as we can tell it + current_x = None + current_y = None + + unstitched_pool = unstitched_pool or [] + + with mss.mss() as sct: + # The screen part to capture + # monitor = {"top": 50, "left": 1705, "width": 
850, "height": 480} # this gets everything including game name + monitor = {"top": 65, "left": 1705, "width": 850, "height": 465} # this skips over difficulty/area, if game name/pass already hidden + center_x = int(monitor["width"] / 2) + center_y = int(monitor["height"] / 2) + + # center seems to be... x=2130 y=290 + + # i = 1 # setting this manually for now.. + before_image_name = f"captures/{label}-pre-map.png" + after_image_name = f"captures/{label}-post-map.png" + diff_image_name = f"captures/{label}-difference-map.png" + result_image_name = f"captures/{label}-result.png" + map_image_name = f"captures/{label}-map.png" + + while True: # TODO: set some number of max retries instead.. + # Grab the data + sct_img = sct.grab(monitor) + mss.tools.to_png(sct_img.rgb, sct_img.size, output=before_image_name) + keyboard.press_and_release("tab") + + sleep(.075) + + sct_img = sct.grab(monitor) + mss.tools.to_png(sct_img.rgb, sct_img.size, output=after_image_name) + keyboard.press_and_release("tab") + + # read our original image without map + f1 = cv2.imread(before_image_name) + f1 = cv2.cvtColor(f1, cv2.COLOR_BGR2GRAY) + + # read our image with map + f2 = cv2.imread(after_image_name) + + # remove shit from mini map + # f2 = mask_image(f2, (0x1F, 0x7F, 0xEC)) # player marker + # f2 = mask_image(f2, (0x21, 0x88, 0xFD)) # player marker + f2 = mask_image(f2, (0x20, 0x84, 0xF6)) # player marker + f2 = mask_image(f2, (0x44, 0x70, 0x74)) # merc marker + + # After removing stuff add + original_f2 = f2.copy() + + # we removed colored shit from map, back to grayscale + f2 = cv2.cvtColor(f2, cv2.COLOR_BGR2GRAY) + + threshold = 0.11 + absdiff = cv2.absdiff(f1, f2) + _, thresholded = cv2.threshold(absdiff, int(threshold * 255), 255, cv2.THRESH_BINARY) + + + # done pulling out mini map, now work with colors.. 
+ thresholded = cv2.cvtColor(thresholded, cv2.COLOR_GRAY2BGR) + + + + + + + + + + + + # TODO: Nov 20, try to do thresholded = cv2.cvtColor(thresholded, cv2.COLOR_GRAY2BGRA) + # (****with an alpha channel!****) + + + + + + + + + + + + # were there any warps here? highlight them! + # range_start, range_end = color_rgb_to_bgr_range((0xD9, 0x58, 0xEB)) # gets top of warp + # range_start, range_end = color_rgb_to_bgr_range((0xA2, 0x46, 0xEA)) # middle of warp + # range_start, range_end = color_rgb_to_bgr_range((0xB5, 0x4C, 0xEB)) # middle of warp + range_start, range_end = color_rgb_to_bgr_range((0x8D, 0x3C, 0xB2), range=1.5) # middle of warp + # range_start, range_end = (0xEB - 15, 0x58 - 15, 0xD9 - 15), (0xEA + 15, 0x46 + 15, 0xA2 + 15) + warp_mask = cv2.inRange(original_f2, range_start, range_end) + warp_mask = cv2.blur(warp_mask, (5, 5)) + _, warp_mask = cv2.threshold(warp_mask, int(0.1 * 255), 255, cv2.THRESH_BINARY) + + thresholded[warp_mask > 0] = [0xD9, 0x58, 0xEB] # Where ever there is a warp color it in with da purps + + different_pixels = np.count_nonzero(thresholded) + print(f"Different pixels: {different_pixels}") + + # If this is our first scan .. this is our "base point" where we pivot from and start our lil coordinate system. + if label == 0: + # Draw a big red circle that will stick around between images + cv2.circle(thresholded, (center_x, center_y), 5, (0, 0, 255), -1) + + # cv2.imshow('Result', thresholded) + # cv2.waitKey(0) + # exit() + current_x = 10_000 + current_y = 10_000 + + map = thresholded + + # successful stitch means we can clear the list + unstitched_pool = [] + else: + # remove old green circles from stitched master image + old_green_mask = cv2.inRange(map, (0, 255, 0), (0, 255, 0)) + map[old_green_mask > 0] = [0, 0, 0] + if np.any(old_green_mask > 0): + print("%%%%%%%%%%%%%% seen ag reens") + + # draw new green circle in center where our player is + cv2.circle(thresholded, (center_x, center_y), 5, (0, 255, 0), -1) + + # do stitching? 
+ stitcher = cv2.Stitcher_create(cv2.Stitcher_SCANS) # scans instead of panorama mode + stitcher.setPanoConfidenceThresh(0.65) # 0.7 seems to work great for black marsh + # stitcher.setSeamEstimationResol(1) + # stitcher.setCompositingResol(1) + + status, stitched = stitcher.stitch([map, thresholded] + unstitched_pool) + + if status == cv2.Stitcher_OK: + map = stitched + else: + print("Can't stitch images, error code = %d" % status) + # exit(-1) + + unstitched_pool.append(thresholded) + + + + # location of green/red? + # cv2.imshow('Result', map) + # cv2.waitKey(0) + # cv2.imshow('Result', cv2.inRange(map, (0, 255, 0), (0, 255, 0))) + # cv2.waitKey(0) + # exit() + green_circle_coord = cv2.findNonZero(cv2.inRange(map, (0, 255, 0), (0, 255, 0))) + red_circle_coord = cv2.findNonZero(cv2.inRange(map, (0, 0, 255), (0, 0, 255))) + + print(f"Green Circle Coord: {green_circle_coord}") + print(f"Red Circle Coord: {red_circle_coord}") + + # dist_from_green_to_red = ... + # direction = ... + # + # calculate_from_dist_and_dir_new_x_y = ... + cv2.imwrite(map_image_name, map) + + # if different_pixels > 110000: + # print("??? Bad capture? during a teleport? monster walked in the way? retry...") + # exit() + # continue + # else: + # break + break + + cv2.imwrite(diff_image_name, thresholded) + + # # weird edge post processing stuff, probably removing this? 
+ # def unsharp_mask(img, blur_size=(9, 9), imgWeight=1.5, gaussianWeight=-0.5): + # gaussian = cv2.GaussianBlur(img, (5, 5), 0) + # return cv2.addWeighted(img, imgWeight, gaussian, gaussianWeight, 0) + # img = cv2.blur(thresholded, (5, 5)) + # img = unsharp_mask(img) + # img = unsharp_mask(img) + # img = unsharp_mask(img) + # + # # hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) + # # h, s, v = cv2.split(hsv) + # + # img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + # + # thresh = cv2.adaptiveThreshold(img, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 11, 2) + # contours, heirarchy = cv2.findContours(thresh.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) + # cnts = sorted(contours, key=cv2.contourArea, reverse=True) + # # for cnt in cnts: + # canvas_for_contours = thresh.copy() + # cv2.drawContours(thresh, cnts[:-1], 0, (0, 255, 0), 3) + # cv2.drawContours(canvas_for_contours, contours, 0, (0, 255, 0), 3) + # # # cv2.imshow('Result', canvas_for_contours - thresh) + # cv2.imwrite(result_image_name, canvas_for_contours - thresh) + # # # cv2.waitKey(0) + return current_x, current_y, map, unstitched_pool + + +def map_stitch(*files): + print(files) + images = [] + for f in files: + image = cv2.imread(f) + images.append(image) + + stitcher = cv2.Stitcher_create(cv2.Stitcher_SCANS) # scans instead of panorama mode + stitcher.setPanoConfidenceThresh(0.7) # 0.7 seems to work great for black marsh + # stitcher.setSeamEstimationResol(1) + # stitcher.setCompositingResol(1) + + + (status, stitched) = stitcher.stitch(images) + + if status != cv2.Stitcher_OK: + print("Can't stitch images, error code = %d" % status) + exit(-1) + + cv2.imshow('Result', stitched) + cv2.waitKey(0) + + +# Do captures +# map = None +# unstitched_pool = [] +# for n in range(15): +# sleep(2) +# # TODO: map_capture could return current_x, current_y +# x, y, map, unstitched_pool = map_capture(n, map=map, unstitched_pool=unstitched_pool) +# +# print(f"Current x = {x}, y = {y}") +# +# 
cv2.imshow('Result', map) +# cv2.waitKey(0) +# exit() + + +# Do stitching +# map_stitch(*glob("captures/*-result.png")) +map_stitch(*glob("captures/*-difference*.png")[:10]) + +# +# while True: +# with mss.mss() as sct: +# # The screen part to capture +# monitor = {"top": 52, "left": 1707, "width": 850, "height": 480} +# output = "sct-{top}x{left}_{width}x{height}.png".format(**monitor) +# +# # Grab the data +# sct_img = sct.grab(monitor) +# +# # Save to the picture file +# mss.tools.to_png(sct_img.rgb, sct_img.size, output=output) +# print(output) diff --git a/d2vs/mapping/capture2.py b/d2vs/mapping/capture2.py new file mode 100644 index 0000000..851fc5b --- /dev/null +++ b/d2vs/mapping/capture2.py @@ -0,0 +1,467 @@ +import os +from glob import glob +from typing import NamedTuple + +import imageio +import keyboard +import mss +import mss.tools + +from time import sleep, time + +from d2vs.utils import ImageMergeException +from padtransf import warpAffinePadded + +import numpy as np +import cv2 +#from matplotlib import pyplot as plt + +monitor = {"top": 65, "left": 1705, "width": 850, "height": 465} # this skips over difficulty/area, if game name/pass already hidden +center_x = int(monitor["width"] / 2) +center_y = int(monitor["height"] / 2) + + +def map_capture(): + """Gets 3 captures of the map: + 1. Before map displayed + 2. Map displayed (tab pressed) + 3. Map displayed a few frames later (tab still pressed) + + Tab is then depressed. The purpose for the third grab is to filter out any animations, later. 
+ """ + with mss.mss() as sct: + # Before map is shown + pre = np.array(sct.grab(monitor)) + pre = cv2.cvtColor(pre, cv2.COLOR_BGRA2BGR) + keyboard.press_and_release("tab") + sleep(.075) + + # Map is shown + during_1 = np.array(sct.grab(monitor)) + during_1 = cv2.cvtColor(during_1, cv2.COLOR_BGRA2BGR) + sleep(.075) + + # Map is still there, but we can tell if any carvers/flames are underneath fucking up the diff + during_2 = np.array(sct.grab(monitor)) + during_2 = cv2.cvtColor(during_2, cv2.COLOR_BGRA2BGR) + keyboard.press_and_release("tab") + + # Debug showing captures + # cv2.imshow('pre', pre) + # cv2.waitKey(0) + # cv2.imshow('during 1', during_1) + # cv2.waitKey(0) + # cv2.imshow('during 2', during_2) + # cv2.waitKey(0) + return pre, during_1, during_2 + + +# def _mask_image(img, color, background="None"): +# range_start, range_end = _color_rgb_to_bgr_range(color) +# img_mask = cv2.inRange(img, range_start, range_end) +# +# # expand what we found to cover the whole thing, make the whole blur part of the mask via threshold +# img_mask = cv2.blur(img_mask, (4, 4)) +# _, img_mask = cv2.threshold(img_mask, int(0.1 * 255), 255, cv2.THRESH_BINARY) +# +# if background == "None": +# return cv2.bitwise_and(img, img, mask=255 - img_mask) +# else: +# return cv2.bitwise_and(img, img, mask=255 - img_mask) +# +# +# def _color_rgb_to_bgr_range(color, range=1.0): +# r, g, b = color +# offset = int(range / 2) +# # return (b - offset, g - offset, r - offset), (b + offset, g + offset, r + offset) +# return (b - (12 * range), g - (8 * range), r - (8 * range)), (b + (12 * range), g + (8 * range), r + (8 * range)) + + +# def _remove_range(img, range_start, range_end): +# img_mask = cv2.inRange(img, range_start, range_end) +# +# # Debug showing mask +# # cv2.imshow('mask', img_mask) +# # cv2.waitKey(0) +# # cv2.destroyAllWindows() +# +# # expand what we found to cover the whole thing, make the whole blur part of the mask via threshold +# img_mask = cv2.blur(img_mask, (4, 4)) +# _, 
img_mask = cv2.threshold(img_mask, int(0.1 * 255), 255, cv2.THRESH_BINARY) +# +# return cv2.bitwise_and(img, img, mask=255 - img_mask) + + +def map_diff(pre, during_1, during_2, is_start=False, show_current_location=True, threshold=0.11): + """Takes the 3 stages of map capture and outputs a final diff, removing carvers and adding our own markers""" + + # image without map + pre = cv2.cvtColor(pre, cv2.COLOR_BGR2GRAY) + + # images displaying the map, clean up some things from this display so it's less cluttered + original_during_1 = during_1.copy() + + + # during_1 = _mask_image(during_1, (0x20, 0x84, 0xF6)) # player marker + # during_1 = _mask_image(during_1, (0x44, 0x70, 0x74)) # merc marker + # during_1 = _mask_image(during_1, (0xff, 0xff, 0xff)) # npc marker + + + + + + # TODO: HSV it up.. + # 1. White (i.e. npc) is HSV (0, 1, 75%) through (0, 0, 100) + # 2. Blue on minimap (i.e. you) is HSV (210, 85%, 85%) through (215, 87%, 99%) + # 3. Greenish on minimap (i.e. merc) is HSV (180, 40%, 40%) through (190, 42%, 42%) + during_1 = cv2.cvtColor(during_1, cv2.COLOR_BGR2HSV) + # during_1 = _remove_range(during_1, (0, 0, .75 * 255), (0, 25, 255)) # white npcs + # during_1 = _remove_range(during_1, (210, .85 * 255, .85 * 255), (215, .90 * 255, 255)) # blue, current player marker + # during_1 = _remove_range(during_1, (180, .4 * 255, .4 * 255), (190, .42 * 255, .42 * 255)) # green mercs + masks_to_remove = [ + cv2.inRange(during_1, (0, 0, .75 * 255), (0, 25, 255)), # white npcs + # cv2.inRange(during_1, (200, .80 * 255, .85 * 255), (215, .95 * 255, 255)), # blue, current player marker + cv2.inRange(during_1, (105, int(.85 * 255), int(.8 * 255)), (110, int(.90 * 255), int(1 * 255))), # blue, current player marker + # (185,41,45) .. (183,37,41) .. (184,40,39) + cv2.inRange(during_1, (90, int(.35 * 255), int(.35 * 255)), (95, int(.45 * 255), int(.50 * 255))), # green mercs + + # TODO: Yellow warps ??? Red portal ?? remove it, but re-add it colored as warp? 
+ ] + + + # Debug showing masked things being removed + # cv2.imshow('mask', during_1) + # cv2.waitKey(0) + # cv2.destroyAllWindows() + + + + + + + + + during_1 = cv2.cvtColor(during_1, cv2.COLOR_HSV2BGR) # convert it to bgr which lets us convert to gray.. + during_1 = cv2.cvtColor(during_1, cv2.COLOR_BGR2GRAY) + # during_2 = cv2.cvtColor(during_2, cv2.COLOR_BGR2GRAY) + + # Get diff of original pre-map image vs both map snapshots, combine the artifacts from both snapshots + absdiff_1 = cv2.absdiff(pre, during_1) + # _, thresholded_1 = cv2.threshold(absdiff_1, int(threshold * 255), 255, cv2.THRESH_BINARY) + + # absdiff_2 = cv2.absdiff(pre, during_2) + # _, thresholded_2 = cv2.threshold(absdiff_2, int(threshold * 255), 255, cv2.THRESH_BINARY) + + # diffed = cv2.bitwise_and(thresholded_1, thresholded_2) + # diffed = thresholded_1 + diffed = absdiff_1 + + # earlier we masked some things from the minimap, remove them now post-diff + for mask_locations in masks_to_remove: + img_mask = cv2.blur(mask_locations, (2, 2)) + _, img_mask = cv2.threshold(img_mask, int(0.1 * 255), 255, cv2.THRESH_BINARY) + + diffed = cv2.bitwise_and(diffed, diffed, mask=255 - img_mask) + + # Debug showing diff before adding circles + # cv2.imshow('absdiff_1', absdiff_1) + # cv2.waitKey(0) + # cv2.imshow('absdiff_2', absdiff_2) + # cv2.waitKey(0) + # cv2.imshow('diffed', diffed) + # cv2.waitKey(0) + # cv2.destroyAllWindows() + + + + + + + + + + + + # Draw a big red/green circle + warps that will stick around between images + diffed = cv2.cvtColor(diffed, cv2.COLOR_GRAY2BGR) + + + + + + # TODO: RE-DO WARP MASKS WITH HSV! + + + + # # were there any warps here? highlight them! 
+ # # range_start, range_end = color_rgb_to_bgr_range((0xD9, 0x58, 0xEB)) # gets top of warp + # # range_start, range_end = color_rgb_to_bgr_range((0xA2, 0x46, 0xEA)) # middle of warp + # # range_start, range_end = color_rgb_to_bgr_range((0xB5, 0x4C, 0xEB)) # middle of warp + # range_start, range_end = _color_rgb_to_bgr_range((0x8D, 0x3C, 0xB2), range=1.5) # middle of warp + # # range_start, range_end = (0xEB - 15, 0x58 - 15, 0xD9 - 15), (0xEA + 15, 0x46 + 15, 0xA2 + 15) + # warp_mask = cv2.inRange(original_during_1, range_start, range_end) + # warp_mask = cv2.blur(warp_mask, (5, 5)) + # _, warp_mask = cv2.threshold(warp_mask, int(0.1 * 255), 255, cv2.THRESH_BINARY) + # + # diffed[warp_mask > 0] = [0xD9, 0x58, 0xEB] # Where ever there is a warp color it in with da purps + + if show_current_location: + if is_start: + color = (0, 0, 255) # red + else: + color = (0, 255, 0) # green + + # I don't know why I have to offset the center x/y, but if I don't it is .. offset! + cv2.circle(diffed, (center_x + 12, center_y - 12), 2, color, -1) + + # Debug showing diff post circles + # cv2.imshow('diffed', diffed) + # cv2.waitKey(0) + + return diffed + + +def map_get_features(diff): + # sift = cv2.SIFT_create() + # surf = cv2.SURF + + # Fast style? + # fast = cv2.FastFeatureDetector_create() + # fast.setNonmaxSuppression(0) + # kp = fast.detect(img, None) + # features = cv2.drawKeypoints(img, kp, None, color=(255, 0, 0)) + + # ORB style? 
+ # orb = cv2.ORB_create() + orb = cv2.ORB_create(nfeatures=7500, edgeThreshold=0, scoreType=cv2.ORB_FAST_SCORE) + # orb = cv2.ORB_create(nfeatures=1500, edgeThreshold=0, scoreType=cv2.ORB_FAST_SCORE) + keypoints, descriptors = orb.detectAndCompute(diff, None) + # features = cv2.drawKeypoints(diff, keypoints, None, color=(255, 0, 0)) + + # Debug showing features + # cv2.imshow('Features', features) + # cv2.waitKey(0) + + return keypoints, descriptors + + +def map_merge_features(diff_1, diff_2): + keypoints_1, descriptors_1 = map_get_features(diff_1) + keypoints_2, descriptors_2 = map_get_features(diff_2) + + # Debug showing keypoints + # cv2.imshow("Result", cv2.drawKeypoints(diff_1, keypoints_1, None, color=(255, 0, 0))) + # cv2.waitKey(0) + # cv2.imshow("Result", cv2.drawKeypoints(diff_2, keypoints_2, None, color=(255, 0, 0))) + # cv2.waitKey(0) + + # *** BF MATCHER OLD + # Match descriptors between images + bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True) + matches = bf.match(descriptors_1, descriptors_2) + + # Debug outputting first 10 matches sorted by dist + # matches = sorted(matches, key=lambda x: x.distance) + # img3 = cv2.drawMatches(diff_1, keypoints_1, diff_2, keypoints_2, matches[:10], None, flags=2) + # cv2.imshow("Result", img3) + # cv2.waitKey(0) + + # Extract location of good matches (wtf are good matches?!) 
+ points1 = np.zeros((len(matches), 2), dtype=np.float32) + points2 = np.zeros((len(matches), 2), dtype=np.float32) + + for i, match in enumerate(matches): + points1[i, :] = keypoints_1[match.queryIdx].pt + points2[i, :] = keypoints_2[match.trainIdx].pt + + # # *** Flann matcher + # Match descriptors between images + # FLANN_INDEX_LSH = 6 + # index_params = dict( + # algorithm=FLANN_INDEX_LSH, + # table_number=12, # 12 + # key_size=20, # 20 + # multi_probe_level=2, # 2 + # ) + # flann = cv2.FlannBasedMatcher(index_params, {"checks": 50}) + # matches = flann.knnMatch(descriptors_1, descriptors_2, k=2) + # + # # ratio test as per Lowe's paper + # points1 = np.zeros((len(matches), 2), dtype=np.float32) + # points2 = np.zeros((len(matches), 2), dtype=np.float32) + # for i, (m, n) in enumerate(matches): + # if m.distance < 0.7 * n.distance: + # points1[i, :] = keypoints_1[m.queryIdx].pt + # points2[i, :] = keypoints_2[m.trainIdx].pt + + + + + + + + + + + + + # This works... + H, mask = cv2.estimateAffine2D(points2, points1) + # H, mask = cv2.estimateAffinePartial2D(points1, points2) # ??? don't think we need this? doesn't seem to work + original_with_padding, new_with_padding = warpAffinePadded(diff_2, diff_1, H, flags=cv2.INTER_NEAREST) + # original_with_padding, new_with_padding = warpAffinePadded(diff_2, diff_1, H, flags=cv2.INTER_LANCZOS4) # slow ? + # original_with_padding, new_with_padding = warpAffinePadded(diff_2, diff_1, H, flags=cv2.INTER_CUBIC) # worked OK? 
+ # original_with_padding, new_with_padding = warpAffinePadded(diff_2, diff_1, H, flags=cv2.INTER_NEAREST_EXACT) + + # Debug showing padding results + # cv2.imshow("new_with_padding", new_with_padding) + # cv2.imshow("original_with_padding", original_with_padding) + # cv2.waitKey(0) + # cv2.destroyAllWindows() + + # Let's find red starting point, which may be overwritten by waypoints/other things + # so we can highlight over it again later + red_starting_point_mask = np.all(original_with_padding == [0, 0, 255], axis=-1) + + # Delete any old green markers for "current location" + original_with_padding[np.all(original_with_padding == [0, 255, 0], axis=-1)] = [0, 0, 0] + + # Take black areas from newest diff and override old white areas in old image # TODO: make this a mode for areas we're taking MANY shots in? + # original_with_padding[np.all(original_with_padding == [127, 127, 127], axis=-1)] = [0, 0, 0] # anything that was gray -> remove it, black now + # original_with_padding[np.all(new_with_padding == [0, 0, 0], axis=-1)] = [127, 127, 127] # anything black in new image -> gray, prepped to be removed if not repeated + + # Merge original with new + # map = cv2.bitwise_or(original_with_padding, new_with_padding) + map = cv2.bitwise_or(new_with_padding, original_with_padding) # TODO: trying to swap ordering?? + # map = cv2.bitwise_or(new_with_padding, original_with_padding, mask=new_with_padding[np.all(new_with_padding == [0, 0, 0])]) + # map = cv2.bitwise_and(new_with_padding, original_with_padding, mask=excluding areas in padding or something?) + + + # TODO: Merge together AND the overlapping areas, but OR on areas not overlapping ?? + + + + + + + + # Re-add red mask so it's super clear + map[red_starting_point_mask] = [0, 0, 255] + + + + + + + + # where are we? 
if we're right on the red X, that'd be (10_000, 10_000) + # # green_current_point_mask = np.all(map == [0, 255, 0], axis=-1) + # green_current_point_mask = np.any(map == [0, 255, 0], axis=-1) + # # M = cv2.moments(green_current_point_mask) + # # cX = int(M["m10"] / M["m00"]) + # # cY = int(M["m01"] / M["m00"]) + # # print(cX, cY) + # import pdb; pdb.set_trace() + # coordinates = list(zip(green_current_point_mask[0], green_current_point_mask[1])) + # print(green_current_point_mask) + # print(coordinates) + + # Look in original image for red coordinate + # red_coords = np.where(np.all(map == [0, 0, 255], axis=-1)) + # red_coords = np.transpose(red_coords) # faster than 'zip' but does same thing ??? + # red_y, red_x = red_coords[0] + red_x, red_y = map_get_coordinates(map, [0, 0, 255]) + + try: + green_coords = np.where(np.all(new_with_padding == [0, 255, 0], axis=-1)) + green_coords = np.transpose(green_coords) # faster than 'zip' but does same thing ??? + green_y, green_x = green_coords[0] + except IndexError: + raise ImageMergeException("Could not find green marker indicating current position") from None + + # red is start at 10_000, 10_000 so base it off that... + current_x, current_y = 10_000 + green_x - red_x, 10_000 + green_y - red_y + + base_x, base_y = red_x, red_y + + + + + # Debug showing final map!!! + # cv2.imshow("Result", map) + # cv2.waitKey(0) + return map, current_x, current_y, base_x, base_y + + +def map_get_coordinates(map, color): + coords = np.where(np.all(map == color, axis=-1)) + coords = np.transpose(coords) # faster than 'zip' but does same thing ??? 
+ y, x = coords[0] + return x, y + + +def map_process(): + """""" + pass + + + + + +if __name__ == "__main__": + images = [] + map = None + # diff_files = glob("captures/*-difference*.png") + # print("\n".join(diff_files)) + + + # diff_1 = cv2.imread("captures/0-difference-map.png") + # diff_2 = cv2.imread("captures/1-difference-map.png") + # diff_3 = cv2.imread("captures/2-difference-map.png") + # diff_4 = cv2.imread("captures/3-difference-map.png") + # + # map = map_merge_features(diff_1, diff_2) + # map = map_merge_features(map, diff_3) + # map = map_merge_features(map, diff_4) + + counter = 0 + # try: + while True: + if counter == 0: + map = cv2.imread(f"captures/{counter}-difference-map.png") + + next_img_path = f"captures/{counter + 1}-difference-map.png" + if not os.path.exists(next_img_path): + break + diff = cv2.imread(next_img_path) + + images.append(map) + + start = time() + map, x, y = map_merge_features(map, diff) + + counter += 1 + print(f"Capture {counter} @ ({x}, {y}) in {(time() - start) * 1000}ms") + + # Debug showing final map!!! + cv2.imshow("Result", map) + cv2.waitKey(0) + + + # except: + # pass # lazy exit on file read error + + + + + + + + + + # for diff in diffs: + # + # + # cv2.imshow('Result', map) + # cv2.waitKey(0) + diff --git a/d2vs/mapping/captures/.gitkeep b/d2vs/mapping/captures/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/d2vs/mapping/padtransf.py b/d2vs/mapping/padtransf.py new file mode 100644 index 0000000..618eac3 --- /dev/null +++ b/d2vs/mapping/padtransf.py @@ -0,0 +1,239 @@ +"""Padded transformation module. + +This module provides two functions, warpPerspectivePadded() and +warpAffinePadded(), which compliment the built-in OpenCV functions +warpPerspective() and warpAffine(). These functions calculate the +extent of the warped image and pads both the destination and the +warped image so both images can be fully displayed together. 
+ +References +---------- +See the following question and my answer on Stack Overflow for an +idea of how this was conceptualized and to read the mathematics +behind the functions: https://stackoverflow.com/a/44459869/5087436 +""" + +import cv2 +import numpy as np + +from d2vs.utils import ImageMergeException + + +# def warpPerspectivePadded( +# src, dst, M, +# flags=cv2.INTER_LINEAR, +# borderMode=cv2.BORDER_CONSTANT, +# borderValue=0): +# """Performs a perspective warp with padding. +# +# Parameters +# ---------- +# src : array_like +# source image, to be warped. +# dst : array_like +# destination image, to be padded. +# M : array_like +# `3x3` perspective transformation matrix. +# +# Returns +# ------- +# src_warped : ndarray +# padded and warped source image +# dst_padded : ndarray +# padded destination image, same size as src_warped +# +# Optional Parameters +# ------------------- +# flags : int, optional +# combination of interpolation methods (`cv2.INTER_LINEAR` or +# `cv2.INTER_NEAREST`) and the optional flag `cv2.WARP_INVERSE_MAP`, +# that sets `M` as the inverse transformation (`dst` --> `src`). +# borderMode : int, optional +# pixel extrapolation method (`cv2.BORDER_CONSTANT` or +# `cv2.BORDER_REPLICATE`). +# borderValue : numeric, optional +# value used in case of a constant border; by default, it equals 0. +# +# See Also +# -------- +# warpAffinePadded() : for `2x3` affine transformations +# cv2.warpPerspective(), cv2.warpAffine() : original OpenCV functions +# """ +# +# assert M.shape == (3, 3), \ +# 'Perspective transformation shape should be (3, 3).\n' \ +# + 'Use warpAffinePadded() for (2, 3) affine transformations.' 
+# +# M = M / M[2, 2] # ensure a legal homography +# if flags in (cv2.WARP_INVERSE_MAP, +# cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP, +# cv2.INTER_NEAREST + cv2.WARP_INVERSE_MAP): +# M = cv2.invert(M)[1] +# flags -= cv2.WARP_INVERSE_MAP +# +# # it is enough to find where the corners of the image go to find +# # the padding bounds; points in clockwise order from origin +# src_h, src_w = src.shape[:2] +# lin_homg_pts = np.array([ +# [0, src_w, src_w, 0], +# [0, 0, src_h, src_h], +# [1, 1, 1, 1] +# ]) +# +# # transform points +# transf_lin_homg_pts = M.dot(lin_homg_pts) +# transf_lin_homg_pts /= transf_lin_homg_pts[2, :] +# +# # find min and max points +# min_x = np.floor(np.min(transf_lin_homg_pts[0])).astype(int) +# min_y = np.floor(np.min(transf_lin_homg_pts[1])).astype(int) +# max_x = np.ceil(np.max(transf_lin_homg_pts[0])).astype(int) +# max_y = np.ceil(np.max(transf_lin_homg_pts[1])).astype(int) +# +# # add translation to the transformation matrix to shift to positive values +# anchor_x, anchor_y = 0, 0 +# transl_transf = np.eye(3, 3) +# if min_x < 0: +# anchor_x = -min_x +# transl_transf[0, 2] += anchor_x +# if min_y < 0: +# anchor_y = -min_y +# transl_transf[1, 2] += anchor_y +# shifted_transf = transl_transf.dot(M) +# shifted_transf /= shifted_transf[2, 2] +# +# # create padded destination image +# dst_h, dst_w = dst.shape[:2] +# +# pad_widths = [ +# anchor_y, max(max_y, dst_h) - dst_h, +# anchor_x, max(max_x, dst_w) - dst_w +# ] +# +# dst_padded = cv2.copyMakeBorder(dst, *pad_widths, borderType=borderMode, value=borderValue) +# +# dst_pad_h, dst_pad_w = dst_padded.shape[:2] +# src_warped = cv2.warpPerspective(src, shifted_transf, (dst_pad_w, dst_pad_h), flags=flags, borderMode=borderMode, borderValue=borderValue) +# +# return dst_padded, src_warped + + +def warpAffinePadded(src, dst, M, flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT, borderValue=0): + """Performs an affine or Euclidean/rigid warp with padding. 
+ + Parameters + ---------- + src : array_like + source image, to be warped. + dst : array_like + destination image, to be padded. + M : array_like + `2x3` affine transformation matrix. + + Returns + ------- + src_warped : ndarray + padded and warped source image + dst_padded : ndarray + padded destination image, same size as src_warped + + Optional Parameters + ------------------- + flags : int, optional + combination of interpolation methods (`cv2.INTER_LINEAR` or + `cv2.INTER_NEAREST`) and the optional flag `cv2.WARP_INVERSE_MAP`, + that sets `M` as the inverse transformation (`dst` --> `src`). + borderMode : int, optional + pixel extrapolation method (`cv2.BORDER_CONSTANT` or + `cv2.BORDER_REPLICATE`). + borderValue : numeric, optional + value used in case of a constant border; by default, it equals 0. + + See Also + -------- + warpPerspectivePadded() : for `3x3` perspective transformations + cv2.warpPerspective(), cv2.warpAffine() : original OpenCV functions + """ + assert M.shape == (2, 3), \ + 'Affine transformation shape should be (2, 3).\n' \ + + 'Use warpPerspectivePadded() for (3, 3) homography transformations.' 
+ + if flags in (cv2.WARP_INVERSE_MAP, + cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP, + cv2.INTER_NEAREST + cv2.WARP_INVERSE_MAP): + M = cv2.invertAffineTransform(M) + flags -= cv2.WARP_INVERSE_MAP + + # it is enough to find where the corners of the image go to find + # the padding bounds; points in clockwise order from origin + src_h, src_w = src.shape[:2] + lin_pts = np.array([ + [0, src_w, src_w, 0], + [0, 0, src_h, src_h] + ]) + + # transform points + transf_lin_pts = M[:, :2].dot(lin_pts) + M[:, 2].reshape(2, 1) + + # find min and max points + min_x = np.floor(np.min(transf_lin_pts[0])).astype(int) + min_y = np.floor(np.min(transf_lin_pts[1])).astype(int) + max_x = np.ceil(np.max(transf_lin_pts[0])).astype(int) + max_y = np.ceil(np.max(transf_lin_pts[1])).astype(int) + + # add translation to the transformation matrix to shift to positive values + anchor_x, anchor_y = 0, 0 + if min_x < 0: + anchor_x = -min_x + if min_y < 0: + anchor_y = -min_y + shifted_transf = M + [[0, 0, anchor_x], [0, 0, anchor_y]] + + # if np.any(shifted_transf >= 4) or np.any(shifted_transf <= -4): + # raise ImageMergeException("We way outta da bounds!") + + if anchor_x >= 200 or anchor_y >= 200: + raise ImageMergeException("It seems like we had a bad stitch? try again, maybe someone walked in front of minimap?") + + # create padded destination image + dst_h, dst_w = dst.shape[:2] + + pad_widths = [anchor_y, max(max_y, dst_h) - dst_h, + anchor_x, max(max_x, dst_w) - dst_w] + + dst_padded = cv2.copyMakeBorder(dst, *pad_widths, borderType=borderMode, value=borderValue) + + dst_pad_h, dst_pad_w = dst_padded.shape[:2] + + + + + + + + + + + + # TODO: check old width vs new width, are we increasing > 10%? 
+ + + + + + + + + + + + print("are we doing a huge shift here?") + print(shifted_transf) + print("anchors:", anchor_x, anchor_y) + print("src_h, src_w:", src_h, src_w) + print("pad h,w:", dst_pad_h, dst_pad_w) + + + src_warped = cv2.warpAffine(src, shifted_transf, (dst_pad_w, dst_pad_h), flags=flags, borderMode=borderMode, borderValue=borderValue) + + return dst_padded, src_warped diff --git a/d2vs/mapping/pathing/__init__.py b/d2vs/mapping/pathing/__init__.py new file mode 100644 index 0000000..c892754 --- /dev/null +++ b/d2vs/mapping/pathing/__init__.py @@ -0,0 +1,2 @@ +from .node import Node, DynamicNode +# from .static import StaticPather \ No newline at end of file diff --git a/d2vs/mapping/pathing/node.py b/d2vs/mapping/pathing/node.py new file mode 100644 index 0000000..e26ffa7 --- /dev/null +++ b/d2vs/mapping/pathing/node.py @@ -0,0 +1,101 @@ +import json +from enum import Enum, auto + + +class InteractableType(Enum): + WAYPOINT = "waypoint" + WARP = "warp" + HEALER = "healer" + REPAIRER = "repairer" + + +class Interactable: + """Appears near a node, not a "walkable point" but something you interact with typically. + + Examples: + - Red Portal + - Malah (the Act 5 healer) + """ + def __init__(self, x, y, name, interactable_type: InteractableType): + self.x = x + self.y = y + self.name = name + + assert interactable_type in [i.value for i in InteractableType], f"{interactable_type} not in InteractableTypes, " \ + f"known options are {InteractableType.__members__.values()}" + self.interactable_type = InteractableType(interactable_type) + + def __str__(self): + return f"{self.name} @ {self.x}, {self.y}" + + def to_dict(self): + """for json serialization""" + return { + "x": self.x, + "y": self.y, + "name": self.name or "", + "interactable_type": self.interactable_type.value, + } + + +class Node: + def __init__(self, x, y, unwalkable=False, is_start=False, is_end=False): + # Sanity checks.. 
+ # assert not is_start and connections, "Must set at least one direction, we had to have come from " \ + # "somewhere?! Unless we just started, then mark is_start = True" + + assert not (is_start and is_end), "Cannot be start and end node at the same time!? or can you.. maybe!?" + + # Coords + self.x = x + self.y = y + + # Are we walkable? I.e., we tried to tele here and detected that tele failed.. not walkable! + self.unwalkable = unwalkable + + # Setup connections (list of x, y tuples) to other nodes + self._connections = {} + + # Things you can interact with near this node.. + self._interactables = [] + + # Beginning or next to our goal? Sometimes maybe we have no goal? + self.is_start = is_start + self.is_end = is_end + + def __str__(self): + return f"{self.x}, {self.y}" + + def to_dict(self): + """for json serialization""" + return { + "x": self.x, + "y": self.y, + "unwalkable": self.unwalkable, + "connections": [(x, y) for x, y in self._connections.keys()], + "interactables": [i.to_dict() for i in self._interactables], + "is_start": self.is_start, + "is_end": self.is_end, + } + + def get_connections(self): + return self._connections.values() + + def add_connection(self, node): + assert (node.x, node.y) not in self._connections, "Cannot add existing connection to node?!" + self._connections[(node.x, node.y)] = node + + def get_interactables(self): + return self._interactables + + def add_interactable(self, interactable): + # assert (interactable.x, interactable.y) not in self._connections, "Cannot add existing interactable to node?!" 
+ self._interactables.append(interactable) + + +class DynamicNode(Node): + def __init__(self, x, y, diff, **kwargs): + super().__init__(x, y, **kwargs) + + # The image of the map difference screenshot taken at this step + self.diff = diff diff --git a/d2vs/mapping/pathing/static.py b/d2vs/mapping/pathing/static.py new file mode 100644 index 0000000..0b7a7bc --- /dev/null +++ b/d2vs/mapping/pathing/static.py @@ -0,0 +1,19 @@ +from d2vs.mapping.areas import Harrogath +from d2vs.mapping.pathing.node import InteractableType + + +# class StaticPather: +# """For pathfinding on static maps, like Harrogath""" +# pass + + +if __name__ == "__main__": + # from anywhere in a5 town, go to Malah + + # load a5 town nodes/interactables + area = Harrogath() + path = area.find_interactable_type(InteractableType.HEALER) + + print("Found path!") + for p in path: + print(p) diff --git a/d2vs/mapping/static_node_rebuilder_tool.py b/d2vs/mapping/static_node_rebuilder_tool.py new file mode 100644 index 0000000..5efd3ab --- /dev/null +++ b/d2vs/mapping/static_node_rebuilder_tool.py @@ -0,0 +1,19 @@ +import keyboard + +from time import sleep + +import numpy as np +from cv2 import cv2 + +from d2vs.mapping.capture2 import map_diff, map_capture, map_merge_features +from d2vs.mapping.pathing import Node + + + + +if __name__ == "__main__": + + # read json of map + + # say which node and interactables we're close to + pass diff --git a/d2vs/mapping/static_node_recorder_tool.py b/d2vs/mapping/static_node_recorder_tool.py new file mode 100644 index 0000000..8388f67 --- /dev/null +++ b/d2vs/mapping/static_node_recorder_tool.py @@ -0,0 +1,293 @@ +import json + +import keyboard + +from time import sleep + +import numpy as np +import cv2 + +from d2vs.mapping.capture2 import map_diff, map_capture, map_merge_features, map_get_coordinates +from d2vs.mapping.padtransf import ImageMergeException +from d2vs.mapping.pathing import Node +from d2vs.mapping.pathing.node import Interactable +from d2vs.utils 
import NpEncoder, windows_say + + +class AutoRecorder: + def __init__(self, area_name, load_existing=False, prev_node=None): + """ + + :param area_name: + :param load_existing: + :param prev_node: (x, y) coords of the node to start drawing from, empty to make a new (10_000, 10_000) start node + """ + # if start_node or prev_node: + # assert start_node and prev_node, "If you supply a start_node/prev_node/nodes, you must also supply the others" + + self.area_name = area_name + + self.load_existing = load_existing + + # Node the level starts from/is relative to .. the (10_000, 10_000) coordinate Node + self.start_node = None + + # We make map in record_first_node, or load existing map + self.map = None + + # Holds our nodes, keys are tuple of (x, y) value is Node + self.nodes = {} + if load_existing: + try: + data = json.loads(open(self._get_area_level_json_path(), "r").read()) + except FileNotFoundError: + print("wtf area missing:", area_name) + + # Loop over data once and make initial set of nodes + for n in data["nodes"]: + # pop data not used in class creation + n_copy = n.copy() + n_copy.pop("connections") + n_copy.pop("interactables") + node = Node(**n_copy) + self.nodes[(node.x, node.y)] = node + + if node.is_start: + assert self.start_node is None, "We found multiple start nodes?! 
tried setting it twice" + self.start_node = node + + # Loop over data again and make connections + for n in data["nodes"]: + for c_x, c_y in n["connections"]: + self.nodes[(n["x"], n["y"])].add_connection(self.nodes[(c_x, c_y)]) + + for interactable_data in n["interactables"]: + interactable = Interactable(**interactable_data) + self.nodes[(n["x"], n["y"])].add_interactable(interactable) + + try: + self.map = cv2.imread(self._get_area_level_png_path()) + except FileNotFoundError: + self.map = None + + + # Points to last created node + if prev_node: + self.prev_node = self.nodes[tuple(prev_node)] + else: + self.prev_node = None + + # Location of red dot (start) on map + self.last_base_x, self.last_base_y = None, None + + def _get_area_level_png_path(self): + return f"areas/static_data/{self.area_name}.png" + + def _get_area_level_debug_png_path(self): + return f"areas/static_data/{self.area_name}_debug.png" + + def _get_area_level_json_path(self): + return f"areas/static_data/{self.area_name}.json" + + def record_first_node(self): + self.map = map_diff(*map_capture(), is_start=True) + + # Debug: show map + # cv2.imshow("map map", self.map) + # cv2.waitKey(0) + + self.start_node = Node( + 10_000, + 10_000, + is_start=True, + ) + self.nodes[(self.start_node.x, self.start_node.y)] = self.start_node + self.last_base_x, self.last_base_y = (0, 0) + self.prev_node = self.start_node + + def record_new_node(self): + # Maybe we're trying to record the first node? do that instead! + if isinstance(self.map, type(None)) and not self.load_existing: + # no map supplied, we must have skipped record_first_node? 
+ # self.map = map_diff(*map_capture(), is_start=True) + # self.prev_node = + self.record_first_node() + return + + assert not isinstance(self.map, type(None)), "You have to have a base static map to work from, by calling record_first_node or saving static_data/area_name.png" + + diff = map_diff(*map_capture()) + + # Debug: show diff + # cv2.imshow("map diff", diff) + # cv2.waitKey(0) + + try: + new_map, x, y, self.last_base_x, self.last_base_y = map_merge_features(self.map, diff) + + if not self.load_existing: + self.map = new_map + + # TODO: sanity check, is this node too close to a previous one? may min distance is like 20 pixels? + + new_node = Node(x, y) + + self.nodes[(new_node.x, new_node.y)] = new_node + + # TODO: also connect all nodes within ??? range? + self.prev_node.add_connection(new_node) # connect previous to new + self.prev_node = new_node # new is now the old! + + print(x, y) + except ImageMergeException as e: + print("!!!!!!!!!!!!!!!!!!!!!!!!!!!") + print(e) + windows_say("Failed") + + def finish(self): + # node = self.start_node + # while True: + # print(node) + + # print(self.start_node) + + + + + + # Dump the static map, TODO: make this optional? don't need to do this after a while? + if not self.load_existing: + cv2.imwrite(self._get_area_level_png_path(), self.map) + + # Dump the node data to json + self.dump_nodes() + + self.view_map() + + # TODO: rename to dump since it dumps interactables n such ? 
+ def dump_nodes(self): + print("Dumping nodes...") + data = {} + + # add nodes + data["nodes"] = [] + for node in self.nodes.values(): + node_data = node.to_dict() + print(f"Adding node_data: {node_data}") + data["nodes"].append(node_data) + + with open(self._get_area_level_json_path(), "w") as f: + f.write(json.dumps(data, cls=NpEncoder, indent=4)) + + print("..done!") + + def draw_map_with_nodes(self): + map_copy = self.map.copy() + + # lighten map color so easier to see debug shit + map_copy[np.where((map_copy == [255, 255, 255]).all(axis=2))] = [0x70, 0x70, 0x70] + + # Delete any old green markers for "current location" + map_copy[np.all(map_copy == [0, 255, 0], axis=-1)] = [0, 0, 0] + + # maybe we need to initialiez these if no merge has been done yet? + if not self.last_base_x: + self.last_base_x, self.last_base_y = map_get_coordinates(map_copy, [0, 0, 255]) + print(self.last_base_x, self.last_base_y) + + + # keep track of drawn interactables, since nodes may point to the same thing we don't want to draw the txt label twice + seen_interactables = {} + + for node in self.nodes.values(): + # Go back from 10_000, 10_000 based coordinate system to 0, 0 based for drawing this on our diff + x = node.x + self.last_base_x - 10_000 + y = node.y + self.last_base_y - 10_000 + print(f"Drawing node from ({node.x}, {node.y}) to ({x}, {y})") + FONT_HERSHEY_COMPLEX_SMALL = 5 + cv2.putText(map_copy, f"{node.x}, {node.y}", (x - 20, y - 10), FONT_HERSHEY_COMPLEX_SMALL, .66, (0x00, 0xff, 0x00), 1) + + if node.is_start: + color = (0x00, 0x00, 0xff) + else: + color = (0x00, 0xff, 0x00) + + # NOTE: this is 3px radius, red circle marking start location is only 2px .. shouldn't matter much .. 
+ cv2.circle(map_copy, (x, y), 3, color, -1) + + for conn in node.get_connections(): + conn_new_x = conn.x + self.last_base_x - 10_000 + conn_new_y = conn.y + self.last_base_y - 10_000 + + cv2.line(map_copy, (x, y), (conn_new_x, conn_new_y), (0x00, 0xff, 0x00)) # green + + # Add our interactables and draw lines and such to them, only draw the text label one time! + for interactable in node.get_interactables(): + interactable_new_x = interactable.x + self.last_base_x - 10_000 + interactable_new_y = interactable.y + self.last_base_y - 10_000 + + cv2.line(map_copy, (x, y), (interactable_new_x, interactable_new_y), (0, 165, 255)) # orange + cv2.circle(map_copy, (interactable_new_x, interactable_new_y), 3, (0, 165, 255), -1) + + if (interactable.x, interactable.y) not in seen_interactables: + cv2.putText(map_copy, f"{interactable.name}", (x - 35, y - 35), FONT_HERSHEY_COMPLEX_SMALL, .66, (0, 165, 255), 1) + seen_interactables[(interactable.x, interactable.y)] = True + + cv2.imwrite(self._get_area_level_debug_png_path(), map_copy) + + return map_copy + + def view_map(self): + map = self.draw_map_with_nodes() + cv2.imshow("map with nodes", map) + cv2.waitKey(0) + cv2.destroyAllWindows() + + +if __name__ == "__main__": + + # TODO: Make the first capture not automatic? Either auto-first node or you can select a node to extend + + import tkinter as tk + from tkinter import messagebox, simpledialog, Entry, END + + + # Have to do this to start Tk + tk_root = tk.Tk() + tk_root.overrideredirect(1) + tk_root.withdraw() + + kwargs = { + # "load_existing": messagebox.askokcancel("Overwrite", "Load existing area? 
(if no, you may overwrite something!)"), + # "area_name": simpledialog.askstring(title="Area name?", prompt="What's the AreaLevel you're working on?", initialvalue="Harrogath") + # "load_existing": False, + + # "load_existing": True, + # "area_name": "Harrogath", + # "prev_node": (10_000, 10_000), + # # "prev_node": (9966, 10034), + + "area_name": "Rogue Encampment", + } + + # if kwargs["load_existing"]: + # prev_node_coords = simpledialog.askstring( + # title="Area name?", + # prompt="Enter 'x, y' for the node would you like to add to. Ie '10_000, 10_000' to start from the original " + # "start (underscores deleted before processing)", + # initialvalue="10_000,10_000" + # ) + # prev_x, prev_y = prev_node_coords.replace("_", "").replace(" ", "").split(",") # tuple (x, y) + # kwargs["prev_node"] = (int(prev_x), int(prev_y)) + + recorder = AutoRecorder(**kwargs) + + keyboard.add_hotkey("f11", recorder.record_new_node) + keyboard.add_hotkey("f12", recorder.view_map) + keyboard.add_hotkey("f13", recorder.finish) + + # TODO: Select some particular node to start connecting from? 
+
+    # wait forever
+    while True:
+        sleep(.1)
diff --git a/d2vs/utils.py b/d2vs/utils.py
new file mode 100644
index 0000000..580a4b3
--- /dev/null
+++ b/d2vs/utils.py
@@ -0,0 +1,24 @@
+import json
+import os
+
+import numpy as np
+
+
+class NpEncoder(json.JSONEncoder):
+    """For serializing numpy types into json properly"""
+    def default(self, obj):
+        if isinstance(obj, np.integer):
+            return int(obj)
+        if isinstance(obj, np.floating):
+            return float(obj)
+        if isinstance(obj, np.ndarray):
+            return obj.tolist()
+        return super(NpEncoder, self).default(obj)
+
+
+class ImageMergeException(Exception):
+    pass
+
+
+def windows_say(text):
+    os.system(f'PowerShell -Command "Add-Type –AssemblyName System.Speech; (New-Object System.Speech.Synthesis.SpeechSynthesizer).Speak(\'{text}\');"')
diff --git a/requirements.txt b/requirements.txt
index 1b04576..6b3837c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,7 @@
-easyocr==1.4.1
-numpy==1.21.4
+easyocr
+numpy
+mss==6.1.0
+keyboard==0.13.5
+opencv-python
+imutils==0.5.4
+tk
diff --git a/test/base.py b/test/base.py
index a9d679a..58955e0 100644
--- a/test/base.py
+++ b/test/base.py
@@ -16,9 +16,14 @@
         return np.asarray(img, dtype='uint8')
 
     def _check_scan(self, path, expected_text, expected_item_type=None):
+        """Helper that loads an image, converts it to np array, attempts to OCR it and
+        returns the result"""
         readings = self.ocr.read(self._img_to_np(path))
+
         assert len(readings) == 1
-        _, text, item_type = readings[0]
+
+        bounds, text, item_type = readings[0]
 
         assert text == expected_text
+
         if item_type:
             assert item_type == expected_item_type