diff --git a/Part01/Animation_Output/AnimationVideo.mp4 b/Part01/Animation_Output/AnimationVideo.mp4 new file mode 100644 index 0000000..a99181a Binary files /dev/null and b/Part01/Animation_Output/AnimationVideo.mp4 differ diff --git a/Part01/Animation_Output/TerminalOutput.png b/Part01/Animation_Output/TerminalOutput.png new file mode 100644 index 0000000..9f29d21 Binary files /dev/null and b/Part01/Animation_Output/TerminalOutput.png differ diff --git a/Part01/__pycache__/a_star.cpython-38.pyc b/Part01/__pycache__/a_star.cpython-38.pyc new file mode 100644 index 0000000..ab2ed9b Binary files /dev/null and b/Part01/__pycache__/a_star.cpython-38.pyc differ diff --git a/Part01/__pycache__/a_star.cpython-39.pyc b/Part01/__pycache__/a_star.cpython-39.pyc new file mode 100644 index 0000000..faee38e Binary files /dev/null and b/Part01/__pycache__/a_star.cpython-39.pyc differ diff --git a/Part01/__pycache__/action_handler.cpython-38.pyc b/Part01/__pycache__/action_handler.cpython-38.pyc new file mode 100644 index 0000000..1afcbf0 Binary files /dev/null and b/Part01/__pycache__/action_handler.cpython-38.pyc differ diff --git a/Part01/__pycache__/action_handler.cpython-39.pyc b/Part01/__pycache__/action_handler.cpython-39.pyc new file mode 100644 index 0000000..1747bee Binary files /dev/null and b/Part01/__pycache__/action_handler.cpython-39.pyc differ diff --git a/Part01/__pycache__/environment.cpython-38.pyc b/Part01/__pycache__/environment.cpython-38.pyc new file mode 100644 index 0000000..0087259 Binary files /dev/null and b/Part01/__pycache__/environment.cpython-38.pyc differ diff --git a/Part01/__pycache__/environment.cpython-39.pyc b/Part01/__pycache__/environment.cpython-39.pyc new file mode 100644 index 0000000..ec68601 Binary files /dev/null and b/Part01/__pycache__/environment.cpython-39.pyc differ diff --git a/Part01/__pycache__/node.cpython-38.pyc b/Part01/__pycache__/node.cpython-38.pyc new file mode 100644 index 0000000..2dbbb2a Binary files /dev/null and b/Part01/__pycache__/node.cpython-38.pyc differ diff --git a/Part01/__pycache__/node.cpython-39.pyc b/Part01/__pycache__/node.cpython-39.pyc new file mode 100644 index 0000000..e9972c4 Binary files /dev/null and b/Part01/__pycache__/node.cpython-39.pyc differ diff --git a/Part01/__pycache__/node_manager.cpython-38.pyc b/Part01/__pycache__/node_manager.cpython-38.pyc new file mode 100644 index 0000000..26d74be Binary files /dev/null and b/Part01/__pycache__/node_manager.cpython-38.pyc differ diff --git a/Part01/__pycache__/node_manager.cpython-39.pyc b/Part01/__pycache__/node_manager.cpython-39.pyc new file mode 100644 index 0000000..c01d96f Binary files /dev/null and b/Part01/__pycache__/node_manager.cpython-39.pyc differ diff --git a/Part01/__pycache__/obstacle_model.cpython-38.pyc b/Part01/__pycache__/obstacle_model.cpython-38.pyc new file mode 100644 index 0000000..b59d514 Binary files /dev/null and b/Part01/__pycache__/obstacle_model.cpython-38.pyc differ diff --git a/Part01/__pycache__/obstacle_model.cpython-39.pyc b/Part01/__pycache__/obstacle_model.cpython-39.pyc new file mode 100644 index 0000000..3d4e496 Binary files /dev/null and b/Part01/__pycache__/obstacle_model.cpython-39.pyc differ diff --git a/a_star_arshad_dhinesh_tej.py b/Part01/a_star.py similarity index 66% rename from a_star_arshad_dhinesh_tej.py rename to Part01/a_star.py index 60c8252..c1fca63 100644 --- a/a_star_arshad_dhinesh_tej.py +++ b/Part01/a_star.py @@ -44,21 +44,21 @@ def visited_node_check(self, node, list_x, list_y, list_theta, robot_radius): theta_threshold_min = node[2] - 10 #theta threshold min = node's theta -30 theta_threshold_max = node[2] + 10 #theta threshold max = node's theta +30 - if _x.size > 0: - thetas = list_theta[_x.ravel()] + # if _x.size > 0: + # thetas = list_theta[_x.ravel()] - range_thetas = np.where(np.logical_and(thetas>=theta_threshold_min, thetas<=theta_threshold_max)) + # range_thetas = np.where(np.logical_and(thetas>=theta_threshold_min, thetas<=theta_threshold_max)) - _len = len(range_thetas[0]) - if _len > 0: - return True, _x[range_thetas[0][0]] + # _len = len(range_thetas[0]) + # if _len > 0: + # return True, _x[range_thetas[0][0]] if len(_x) > 0: #if there are elements in dist_matrix that are less than 0.5 - for i in _x: #loop through all elements in dist_matrix that are less than 0.5 - if list_theta[i] > theta_threshold_min and list_theta[i] < theta_threshold_max: #if theta of node in list is within theta threshold - return True, i #Then node is in the given list - # return True, _x[0][0] - return False, None #Else node is not in the given list + # for i in _x: #loop through all elements in dist_matrix that are less than 0.5 + # if list_theta[i] > theta_threshold_min and list_theta[i] < theta_threshold_max: #if theta of node in list is within theta threshold + # return True, i #Then node is in the given list + return True, _x[0] + # return False, None #Else node is not in the given list else: return False, None #Else node is not in the given list @@ -95,7 +95,6 @@ def explore_actions(self, parent_node) -> bool: if Status: simulated_position , Total_Cost_To_Come, Total_Cost_To_Go = Sim_Pose - #else of the state has already been explored then ignore the action status, idx = self.visited_node_check(simulated_position, node_manager.node_x[self.visited_node_list], \ node_manager.node_y[self.visited_node_list], node_manager.node_theta[self.visited_node_list], self.robot_radius) @@ -110,6 +109,7 @@ def explore_actions(self, parent_node) -> bool: NewNode = node_manager.make_node(simulated_position, Total_Cost_To_Come, Total_Cost_To_Go) #Update the parent for the node NewNode.Parent_Node_hash = parent_node.Node_hash + NewNode.Action_to_Node = action heapq.heappush(self.pending_state_que, NewNode) self.node_arrow_list[parent_node.Node_hash].append(NewNode.Node_hash) @@ -128,6 +128,7 @@ def explore_actions(self, parent_node) -> bool: node_manager.global_node_directory[idx].Node_State = simulated_position NewNode = node_manager.global_node_directory[idx] + NewNode.Action_to_Node = action heapq.heapify(self.pending_state_que) #Push new node to the pending que for future expoloration @@ -136,7 +137,7 @@ def explore_actions(self, parent_node) -> bool: # heapq.heappush(self.pending_state_que, NewNode) - def find_goal_node(self, start_state, goal_state, robot_radius, goal_dist_threshold) -> bool: + def find_goal_node(self, start_state, goal_state, robot_radius, goal_dist_threshold, display_env) -> bool: """Takes start stat and goal state for the environement and computes the tree using _Breadth first search algorithm till the goal state is reached @@ -154,6 +155,7 @@ def find_goal_node(self, start_state, goal_state, robot_radius, goal_dist_thresh self.initial_state = start_state self.goal_dist_threshold = goal_dist_threshold self.Final_Node = None + self.rendered_nodes = 0 #Initialize search que. This stores the nodes that needs to be explored start_node = node_manager.make_node(self.initial_state) @@ -165,7 +167,15 @@ def find_goal_node(self, start_state, goal_state, robot_radius, goal_dist_thresh #Perform search till a goal state is reached or maximum number of iterations reached + render_freq = 100 + render_freq_counter = 0 while True: + render_freq_counter += 1 + if render_freq_counter % render_freq == 0: + self.visualize_exploration(start_state, goal_state, display_env, self.robot_radius, self.rendered_nodes) + self.rendered_nodes = len(self.visited_node_list) + + if len(self.pending_state_que) > 0: #fetch next node to be explored next_item = heapq.heappop(self.pending_state_que) @@ -186,6 +196,40 @@ def find_goal_node(self, start_state, goal_state, robot_radius, goal_dist_thresh else: print("Unable to find the goal state!!!") return False + + def back_track_actions(self): + """Funciton to find the final trajectory given the final goal node + + Returns: + list of tuples: list of trajectory points + """ + if self.Final_Node != None: + ul = [] + ur = [] + theta = [] + + last_node = self.Final_Node + ul.append(last_node.Action_to_Node[0]) + ur.append(last_node.Action_to_Node[1]) + theta.append(last_node.Node_State[2]) + + #back track untill the start node has been reached + while True: + last_node = node_manager.global_node_directory[last_node.Parent_Node_hash] + + if last_node.Parent_Node_hash == None: + break + + if last_node.Action_to_Node != None: + ul.append(last_node.Action_to_Node[0]) + ur.append(last_node.Action_to_Node[1]) + theta.append(node_manager.global_node_directory[last_node.Parent_Node_hash].Node_State[2]) + + + ul.reverse(); ur.reverse(); theta.reverse() + return ul, ur, theta + + return None def back_track(self): """Funciton to find the final trajectory given the final goal node @@ -200,9 +244,9 @@ def back_track(self): #back track untill the start node has been reached while True: last_node = node_manager.global_node_directory[last_node.Parent_Node_hash] - if last_node.Parent_Node_hash != None: - trajectory.append(last_node.Node_State) - else: + trajectory.append(last_node.Node_State) + + if last_node.Parent_Node_hash == None: break trajectory.reverse() @@ -210,7 +254,7 @@ def back_track(self): return None - def visualize_exploration(self, display_environment, robot_radius): + def visualize_exploration(self, start_state, goal_state, display_environment, robot_radius, render_node_start_index = 0): """Visualize exploration of the nodes """ #indicate start and goal nodes with circles5 @@ -222,10 +266,13 @@ def visualize_exploration(self, display_environment, robot_radius): #variable to control gui update rate update_count = 0 # Loop through all visited nodes and show on the cv window - for start in self.node_arrow_list: - for end in self.node_arrow_list[start]: - display_environment.update_action(node_manager.global_node_directory[start].Node_State, \ - node_manager.global_node_directory[end].Node_State) + for idx in self.visited_node_list[render_node_start_index:]: + if node_manager.global_node_directory[idx].Parent_Node_hash != None: + parent_idx = node_manager.global_node_directory[idx].Parent_Node_hash + pose = (node_manager.node_x[parent_idx], node_manager.node_y[parent_idx], \ + node_manager.node_theta[parent_idx]) + display_environment.update_action(pose, \ + node_manager.global_node_directory[idx].Action_to_Node, self.env.is_valid_position) update_count +=1 #Update gui every 300 iteration @@ -244,6 +291,24 @@ def visualize_trajectory(self, trajectory, display_environment, robot_radius): Args: trajectory (lsit of tuples): trajectory """ + update_count = 0 + # Loop through all visited nodes and show on the cv window + for node in self.pending_state_que: + + if node.Parent_Node_hash != None: + parent_idx = node.Parent_Node_hash + pose = (node_manager.node_x[parent_idx], node_manager.node_y[parent_idx], \ + node_manager.node_theta[parent_idx]) + display_environment.update_action(pose, \ + node.Action_to_Node, self.env.is_valid_position) + + update_count +=1 + #Update gui every 300 iteration + if update_count == 500: + update_count = 0 + cv.waitKey(1) + + #Loop through all points in the trajectory for point in trajectory: display_environment.highlight_point(point) @@ -273,85 +338,5 @@ def animate_trajectory(self, trajectory, display_environment, robot_radius): cv.waitKey(1) -if __name__ == "__main__": - - - print("Enter obstacle inflation size : ") - obstacle_inflation = int(input()) - print("Enter robot radius : ") - robot_radius = int(input()) - goal_dist_threshold = 1.5 * robot_radius - - print("Note: Enter step greater than 0.5 * robot radius") - print(f"Enter robot's action step size: [{ceil(0.5 * robot_radius)} - 10]") - step_size = int(input()) - - while step_size < 1 or step_size > 10: #check if the start node is outside the map - print("\nAction step size out of range [1-10]. Re-Enter the action Step size: ") - step_size = int(input()) - - #Create environment - print("Please wait while creating the environment.") - #Crate environment for showing the final visualization-------------------------- - #This does not show the additional inflation for robot radius. Insted, it will inflate the robot size - display_environment = environment(250, 600, obstacle_inflation) - display_environment.create_map() - #------------------------------------------------------------------------------- - - _environment = environment(250, 600, robot_radius+obstacle_inflation) - _environment.create_map() - print("Environment created successfully.") - - # #Getting start node from the user: - print("\nEnter Start Node as integers: x,y,theta") - start_y, start_x, start_theta = input().replace(" ", "").split(',') - start_state = (int(start_x.strip()), int(start_y.strip()), int(start_theta.strip())) - - - while(not _environment.is_valid_position(start_state)) or start_state[2] % 30 != 0: #check if the start node is outside the map - print("\nStart Node is out of the Map or theta is not a multiple of 30. Re-Enter the Start node: ") - start_y, start_x, start_theta = input().replace(" ", "").split(',') - start_state = (int(start_x.strip()), int(start_y.strip()), int(start_theta.strip())) - - - # Getting goal node from the user: - print("Enter Goal Node as integers: x,y,theta") - goal_y, goal_x, goal_theta = input().replace(" ", "").split(',') - goal_state = (int(goal_x.strip()), int(goal_y.strip()), int(goal_theta.strip())) - - while(not _environment.is_valid_position(goal_state)) or goal_state[2] % 30 != 0: #check if the goal node is outside the map - print("\nGoal Node is out of the Map or theta is not a multiple of 30 Re-Enter the Goal node: ") - goal_y, goal_x,goal_theta = input().replace(" ", "").split(',') - goal_state = (int(goal_x.strip()), int(goal_y.strip()), int(goal_theta.strip())) - - - #Create action handler for the environment - _actionHandler = action_handler(_environment, step_size, start_state, goal_state) - - #create planner - _astar_planner = astar_planner(_environment, _actionHandler) - - #Request planner to find a plan - print("Planning started.") - print(f"Start Position : {[start_state[1], start_state[0]]}, Goal Postion : {[goal_state[1], goal_state[0]]}") - print("Please wait while searching for the goal state...") - start_time = time.time() - _status = _astar_planner.find_goal_node(start_state, goal_state, robot_radius, goal_dist_threshold) - elspsed_time = time.time() - start_time - - print(f"Total time taken to run the algorithm : {elspsed_time} seconds\n") - - #If Planner is successfull, visualize the plan - if _status: - trajectory = _astar_planner.back_track() - if trajectory != None: - # display_environment.begin_video_writer() - _astar_planner.visualize_exploration(display_environment, robot_radius) - _astar_planner.visualize_trajectory(trajectory, display_environment, robot_radius) - _astar_planner.animate_trajectory(trajectory, display_environment, robot_radius) - # display_environment.close_video_writer() - - display_environment.save_image("Solution.png") - cv.waitKey(0) diff --git a/Part01/action_handler.py b/Part01/action_handler.py new file mode 100644 index 0000000..0ad02d0 --- /dev/null +++ b/Part01/action_handler.py @@ -0,0 +1,92 @@ +import math +import time +import numpy as np + +class action_handler: +#Class to handle the action supported by the environment + + + def __init__(self, env, step_size, start_pose, goal_pose, omega1, omega2, wheel_radius, wheel_base_width) -> None: + """Initilaize action handler parameters + Args: + env (environment): referance ot environment + """ + self.env = env + self.start_pose = start_pose + self.goal_pose = goal_pose + self.step_size = step_size + self.robot_wheel_radius = wheel_radius + self.wheel_base_width = wheel_base_width + + #define al the actions possible in the environment + self.Actions = [(0, omega1), (omega1,0), (omega1, omega1), (0, omega2), (omega2, 0), (omega2,omega2), (omega1, omega2), (omega2, omega1)] + print(f"ACTION SET : {self.Actions}") + + + def cost_norm(self, pose1, pose2): + x1, y1, _ = pose1 + x2, y2, _ = pose2 + return np.linalg.norm([x1 - x2, y1 - y2]) + + def ActionValues(self, pose, action): + Xi,Yi,Thetai = pose + UL, UR = action + + t = 0 + dt = 0.1 + + Xn=Xi + Yn=Yi + Thetan = 3.14 * Thetai / 180 + + + cost_to_go=0 + while t None: - """Initialize environment parameters - - Args: - height (int): height of the map - width (int): width of the map - """ - self.height = height - self.width = width - self.inflation_radius = inflation_radius - #create a map fo given dimentions. 3 channels for opencv BGR - self.map = np.ones((height, width, 3)) - - - - #create obstacle models for all the objects in the environment - - #create original boundary obstacle model - self.boundary_model = obstacle_model([ - [(600,0), (600,250), (0,250), (0,0)] - ]) - - #create inflated boundary obstacle model - self.inflated_boundary_model = obstacle_model([ - self.inflate_polygon([(600,0), (600,250), (0,250), (0,0)], -1*self.inflation_radius), - ]) - - #create original polygon objects obstacle model - self.original_obstacle_model = obstacle_model([ - [(150,0), (150,100), (100,100), (100,0)], # Polygon corresponding to botom rectangular pillar - [(150,150), (150,250), (100,250), (100,150)], # Polygon corresponding to Top rectangular pillar - [(360,87), (360,163), (300,200), (240,163), (240,87), (300,50)], # Polygon corresponding to hexgon - [(510,125), (460,225), (460,25) ], # Polygon corresponding to Triangle - ]) - - - #create inflated polygon objects obstacle model - self.inflated_obstacle_model = obstacle_model([ - self.inflate_polygon([(150,0), (150,100), (100,100), (100,0)], self.inflation_radius), - self.inflate_polygon([(150,150), (150,250), (100,250), (100,150)], self.inflation_radius), - self.inflate_polygon([(360,87), (360,163), (300,200), (240,163), (240,87), (300,50)], self.inflation_radius), - self.inflate_polygon([(510,125), (460,225), (460,25) ], self.inflation_radius), - ]) - - - - def create_map(self): - """Idnetify obstacles and free space in the map uisng the obstacle models - """ - #Iterate through all the states in the enviornement - for i in range(self.map.shape[0]): - for j in range(self.map.shape[1]): - #Checks if state present inside the non-inflated obstacle - if self.original_obstacle_model.is_inside_obstacle((j,i)): - self.map[i,j] = [0x80, 0x80, 0xff] - - #Checks if state present inside an nflated obstacle - elif self.inflated_obstacle_model.is_inside_obstacle((j,i)): - self.map[i,j] = [0x7a, 0x70, 0x52] - - #Checks if state present outside the inflated boundary - elif not self.inflated_boundary_model.is_inside_obstacle((j,i)): - self.map[i,j] = [0x7a, 0x70, 0x52] - - #Identify as stae belongs to the free space - else: - self.map[i,j] = [0xc2, 0xba, 0xa3] - - def is_valid_position(self, position): - """Checks if a given position belongs to free space or obstacle space - - Args: - position (tuple): state of the agent to be verified - - Returns: - bool: True if pixel belongs to free space else False - """ - if position[0] >=0 and position[0] < 250 and position[1] >=0 and position[1] < 600: - #Check if a state belongs to free psace by comparing the color of the respective pixel in the environment - if self.map[position[0], position[1]][0] == 0xc2 and \ - self.map[position[0], position[1]][1] == 0xba and \ - self.map[position[0], position[1]][2] == 0xa3: - return True - - #return false of state is in obstacle space - return False - - - def refresh_map(self): - """Refreshes map with updated image map - """ - #Flip the map to satisfy the environment direction - image = cv.flip(self.map.astype('uint8'), 0) - image = cv.resize(image, (0,0), fx = 2, fy = 2) - cv.imshow("map", image) - - def update_map(self, position): - """Highlights a point in the environment at the given locaiton - - Args: - position (tuple): pixel location - """ - i, j, _ = position - self.map[i, j] = [255, 255, 255] - self.refresh_map() - - def update_action(self, start, end): - """Update map with explored nodes colour - - Args: - explored_node (tuple): state that has been visited - """ - x1, y1, theta1 = start - x2, y2, theta2 = end - self.map = cv.arrowedLine(self.map, (y1, x1), (y2, x2),[0, 255, 255], 1, cv.LINE_AA) - self.refresh_map() - - - def save_image(self, file_path): - """saves current state of the environment in the file location as image - - Args: - file_path (string): absolute path of the file where the image needs to be saved - """ - #Flip the map to satisfy the environment direction - image = cv.flip(self.map.astype('uint8'), 0) - image = cv.resize(image, (0,0), fx = 2, fy = 2) - cv.imwrite(file_path, image) - - def highlight_state(self, position, size, color): - """Draws a circle at the given location in the environment - - Args: - position (tuple): pixel location - """ - self.map = cv.circle(self.map, (position[1],position[0]), size, color, -1) - self.refresh_map() - - def highlight_point(self, position): - """Highlights a point in the environment at the given locaiton - - Args: - position (tuple): pixel location - """ - i, j, _ = position - self.map[i, j] = [255, 0, 0] - - def show_robot(self, position, size): - """Highlights a point in the environment at the given locaiton - - Args: - position (tuple): pixel location - """ - _map = self.map.copy() - image = cv.circle(_map, (position[1],position[0]), size, (255, 0, 0), -1) - image = cv.flip(image.astype('uint8'), 0) - resized = cv.resize(image, (0,0), fx = 2, fy = 2) - cv.imshow("map", resized) - return image - - #primitives to save video of the jplanning environment----------------------- - def begin_video_writer(self): - self.writer= cv.VideoWriter('Animation Video.mp4', cv.VideoWriter_fourcc(*'DIVX'), 10, (self.width, self.height)) - - - def insert_video_frame(self, image): - self.writer.write(image) - - def write_video_frame(self): - image = cv.flip(self.map.astype('uint8'), 0) - # image = cv.resize(image, (0,0), fx = 2, fy = 2) - self.writer.write(image) - - def close_video_writer(self): - self.writer.release() - #-------------------------------------------------------------------------------- - - -if __name__ == "__main__": - _map_viz = environment(250, 600, 10) - _map_viz.create_map() - _map_viz.refresh_map() - cv.waitKey(0) +import math +import numpy as np +import cv2 as cv + +import pyclipper +from obstacle_model import * + +class environment: + """Class contains all the funcitonality to create and manipulate environemnt and its visualization + """ + + def inflate_polygon(self, vertices, radius): + if(len(vertices) > 2): + pco = pyclipper.PyclipperOffset() + pco.AddPath(vertices, pyclipper.PT_CLIP, pyclipper.ET_CLOSEDPOLYGON) + vertices_inflated = pco.Execute(radius) + vertices_inflated = [tuple(x) for x in vertices_inflated[0]] + else: + vertices_inflated = vertices.copy() + vertices_inflated[1] += radius + + return vertices_inflated + + def __init__(self, height, width, inflation_radius, step_size, robot_wheel_radius, wheel_base_width) -> None: + """Initialize environment parameters + + Args: + height (int): height of the map + width (int): width of the map + """ + self.height = height + self.width = width + self.inflation_radius = inflation_radius + + self.step_size = step_size + self.robot_wheel_radius = robot_wheel_radius + self.wheel_base_width = wheel_base_width + + #create a map fo given dimentions. 3 channels for opencv BGR + self.map = np.ones((height, width, 3)) + + #create obstacle models for all the objects in the environment + + #create original boundary obstacle model + self.boundary_model = obstacle_model([ + [(600,0), (600,200), (0,200), (0,0)] + ]) + + #create inflated boundary obstacle model + self.inflated_boundary_model = obstacle_model([ + self.inflate_polygon([(600,0), (600,200), (0,200), (0,0)], -1*self.inflation_radius), + ]) + + #create original polygon objects obstacle model + self.original_obstacle_model = obstacle_model([ + [(150,200), (150,75), (165,75), (165,200), (150,200)], # Polygon corresponding to botom rectangular pillar + [(250,125), (250,0), (265,0), (265,125), (250,125)], + [(400,110), 50] # Polygon corresponding to Top rectangular pillar + ]) + + + #create inflated polygon objects obstacle model + self.inflated_obstacle_model = obstacle_model([ + self.inflate_polygon([(150,200), (150,75), (165,75), (165,200), (150,200)], self.inflation_radius), + self.inflate_polygon([(250,125), (250,0), (265,0), (265,125), (250,125)], self.inflation_radius), + self.inflate_polygon([(250,125), (250,0), (265,0), (265,125), (250,125)], self.inflation_radius), + self.inflate_polygon([(400,110), 50], self.inflation_radius), + ]) + + + def create_map(self): + """Idnetify obstacles and free space in the map uisng the obstacle models + """ + #Iterate through all the states in the enviornement + for i in range(self.map.shape[0]): + for j in range(self.map.shape[1]): + #Checks if state present inside the non-inflated obstacle + if self.original_obstacle_model.is_inside_obstacle((j,i)): + self.map[i,j] = [0x80, 0x80, 0xff] + + #Checks if state present inside an inflated obstacle + elif self.inflated_obstacle_model.is_inside_obstacle((j,i)): + self.map[i,j] = [0x7a, 0x70, 0x52] + + #Checks if state present outside the inflated boundary + elif not self.inflated_boundary_model.is_inside_obstacle((j,i)): + self.map[i,j] = [0x7a, 0x70, 0x52] + + #Identify as state belongs to the free space + else: + self.map[i,j] = [0xc2, 0xba, 0xa3] + + + def is_valid_position(self, position): + """Checks if a given position belongs to free space or obstacle space + Args: + position (tuple): state of the agent to be verified + Returns: + bool: True if pixel belongs to free space else False + """ + if position[0] >=0 and position[0] < 200 and position[1] >=0 and position[1] < 600: + #Check if a state belongs to free psace by comparing the color of the respective pixel in the environment + if self.map[position[0], position[1]][0] == 0xc2 and \ + self.map[position[0], position[1]][1] == 0xba and \ + self.map[position[0], position[1]][2] == 0xa3: + return True + + #return false of state is in obstacle space + return False + + + def refresh_map(self): + """Refreshes map with updated image map + """ + #Flip the map to satisfy the environment direction + image = cv.flip(self.map.astype('uint8'), 0) + image = cv.resize(image, (0,0), fx = 2, fy = 2) + cv.imshow("map", image) + + def update_map(self, position): + """Highlights a point in the environment at the given locaiton + Args: + position (tuple): pixel location + """ + i, j, _ = position + self.map[i, j] = [255, 255, 255] + self.refresh_map() + + def update_action(self, pose, action, validity_checker): + Xi,Yi,Thetai = pose + UL, UR = action + t = 0 + # r = 0.038 + # L = 0.354 + r = self.robot_wheel_radius + L = self.wheel_base_width + dt = 0.1 + Xn=Xi + Yn=Yi + Thetan = 3.14 * Thetai / 180 + # Xi, Yi,Thetai: Input point's coordinates + # Xs, Ys: Start point coordinates for plot function + # Xn, Yn, Thetan: End point coordintes + cost_to_go=0 + while t None: #Contains total-cost to the goal self.Total_Cost = self.Cost_to_Come + self.Cost_to_Go + + #Contains the action taken to reach this node + self.Action_to_Node = None #Funciton to help heapq for comparing two node objects def __lt__(self, other): diff --git a/node_manager.py b/Part01/node_manager.py similarity index 100% rename from node_manager.py rename to Part01/node_manager.py diff --git a/Part01/obstacle_model.py b/Part01/obstacle_model.py new file mode 100644 index 0000000..6e819bf --- /dev/null +++ b/Part01/obstacle_model.py @@ -0,0 +1,89 @@ +import unittest +import numpy as np + + +class obstacle_model: + """Class to detect obstacles from a given set of polygon vertices + """ + + def __init__(self, _obstacles) -> None: + """Constructor takes a lists of vertices belonging to multiple polygons representing the obstacles + + Args: + _obstacles (list of lists of tuples): [[poly1_vertices], [poly2_vertices]] + """ + self.obstacles = _obstacles + + + def is_inside_obstacle(self, point): + """Given a point, the method computes if the point lies inside any of the obstacles + Loaded in the obstacle model + + Args: + point (tuple): point + + Returns: + bool: True of point lies inside any obstacle, False if point is outside + all the loaded obstacles + """ + + # lopp through each polygon + for vertices in self.obstacles: + is_inside_obstacle = True + #loop through pairs of vertices and find if point lies on which side of the polygon + if len(vertices) > 2: # if not a circle + for i in range(len(vertices)): + if is_inside_obstacle: + + #If final vertex then pair it with the initial vertex + if i == len(vertices) - 1: + x2, y2 = vertices[i][0], vertices[i][1] + x1, y1 = vertices[0][0], vertices[0][1] + else: + #Chose vertices order so the the point on the left side of the line gives a negative value + x2, y2 = vertices[i][0], vertices[i][1] + x1, y1 = vertices[i+1][0], vertices[i+1][1] + + #Compute line coefficients for the vetex pair (ax+by+c = 0) + a = (y1 - y2) + b = (x1 - x2) * -1 + c = x1*y2 - x2*y1 + + #substitute the point and compute the direction + l = a*point[0] + b*point[1] + c + + #if vertex lies outside a polygon line then break the check + if l > 0: + is_inside_obstacle = False + break + + else: # if circle + l = (point[0] - vertices[0][0]) ** 2 + (point[1] - vertices[0][1]) ** 2 - vertices[1] ** 2 + + if l > 0: + is_inside_obstacle = False + + #if the point is found inside any polygon return true + if is_inside_obstacle: + return True + + #if the point lies outside all the poygons return false + return False + + +#test case to check the obstacle_model class +class obstacle_model_test(unittest.TestCase): + def setUp(self): + self.obstacleModels = obstacle_model([ [(60, 0), (60, 50), (50,50), (50,0) ] ]) + + def test_free_space(self): + self.assertEqual(self.obstacleModels.check_obstacle((70,70)), False, "free_space error!!") + + def test_obstacle(self): + self.assertEqual(self.obstacleModels.check_obstacle((55,40)), True, "obstacle test failed!!") + + def tearDown(self): + pass + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/Part01/planner.py b/Part01/planner.py new file mode 100644 index 0000000..bf36eb8 --- /dev/null +++ b/Part01/planner.py @@ -0,0 +1,120 @@ +from a_star import * + + +class planner: + + wheel_radius = 3.3 + robot_radius = 10.5 + wheel_base_width = 16.0 + + goal_dist_threshold = 0 + obstacle_inflation = 5 + step_size = 1 + + + def __init__(self) -> None: + + + print("Enter RPM1 : ") + self.RPM1 = int(input()) + + print("Enter RPM2 : ") + self.RPM2 = int(input()) + + + self.omega1 = (self.RPM1*2*math.pi)/60 + self.omega2 = (self.RPM2 *2*math.pi)/60 + + print("Enter obstacle inflation size : ") + self.obstacle_inflation = int(input()) + + #Create environment + print("Please wait while creating the environment.") + #Create environment for showing the final visualization-------------------------- + #This does not show the additional inflation for robot radius. Insted, it will inflate the robot size + self.display_environment = environment(200, 600, self.obstacle_inflation, self.step_size, self.wheel_radius, self.wheel_base_width) + self.display_environment.create_map() + #------------------------------------------------------------------------------- + + self._environment = environment(200, 600, self.robot_radius+self.obstacle_inflation, self.step_size, self.wheel_radius, self.wheel_base_width) + self._environment.create_map() + print("Environment created successfully.") + + # #Getting start node from the user: + print("\nEnter Start Node as integers: x,y,theta") + start_y, start_x, start_theta = input().replace(" ", "").split(',') + self.start_state = (int(start_x.strip()), int(start_y.strip()), int(start_theta.strip())) + + + while(not self._environment.is_valid_position(self.start_state)) or self.start_state[2] % 30 != 0: #check if the start node is outside the map + print("\nStart Node is out of the Map or theta is not a multiple of 30. Re-Enter the Start node: ") + start_y, start_x, start_theta = input().replace(" ", "").split(',') + self.start_state = (int(start_x.strip()), int(start_y.strip()), int(start_theta.strip())) + + + # Getting goal node from the user: + print("Enter Goal Node as integers: x,y,theta") + goal_y, goal_x, goal_theta = input().replace(" ", "").split(',') + self.goal_state = (int(goal_x.strip()), int(goal_y.strip()), int(goal_theta.strip())) + + while(not self._environment.is_valid_position(self.goal_state)) or self.goal_state[2] % 30 != 0: #check if the goal node is outside the map + print("\nGoal Node is out of the Map or theta is not a multiple of 30 Re-Enter the Goal node: ") + goal_y, goal_x,goal_theta = input().replace(" ", "").split(',') + self.goal_state = (int(goal_x.strip()), int(goal_y.strip()), int(goal_theta.strip())) + + def plan(self): + + + self.robot_radius = round(self.robot_radius) + if self.robot_radius < 1: + self.robot_radius = 1 + + self.goal_dist_threshold = self.robot_radius + + + + #Create action handler for the environment + _actionHandler = action_handler(self._environment, self.step_size, self.start_state, self.goal_state, self.omega1, self.omega2, self.wheel_radius, self.wheel_base_width) + + #create planner + self._astar_planner = astar_planner(self._environment, _actionHandler) + + #Request planner to find a plan + print("Planning started.") + print(f"Start Position : {[self.start_state[1], self.start_state[0]]}, Goal Postion : {[self.goal_state[1], self.goal_state[0]]}") + print("Please wait while searching for the goal state...") + start_time = time.time() + _status = self._astar_planner.find_goal_node(self.start_state, self.goal_state, self.robot_radius, self.goal_dist_threshold, self.display_environment) + elspsed_time = time.time() - start_time + + print(f"Total time taken to run the algorithm : {elspsed_time} seconds\n") + + #If Planner is successfull, visualize the plan + return _status + + def get_trajectory(self): + return self._astar_planner.back_track() + + def get_action_set(self): + return self._astar_planner.back_track_actions() + + def view_plan(self): + trajectory = self.get_trajectory() + if trajectory != None: + # display_environment.begin_video_writer() + self._astar_planner.visualize_exploration(self.start_state, self.goal_state, self.display_environment, self.robot_radius) + self._astar_planner.visualize_trajectory(trajectory, self.display_environment, self.robot_radius) + self._astar_planner.animate_trajectory(trajectory, self.display_environment, self.robot_radius) + # display_environment.close_video_writer() + + cv.waitKey(0) + + + + +if __name__ == "__main__": + + + _planner = planner() + if _planner.plan(): + _planner.view_plan() \ No newline at end of file diff --git a/Part01/requirements.txt b/Part01/requirements.txt new file mode 100644 index 0000000..874436e --- /dev/null +++ b/Part01/requirements.txt @@ -0,0 +1,3 @@ +numpy==1.24.2 +opencv-python==4.7.0.72 +pyclipper==1.3.0.post4 diff --git a/Part02/Animation_Output/Gazebo_Turtlebot_AStar.mp4 b/Part02/Animation_Output/Gazebo_Turtlebot_AStar.mp4 new file mode 100644 index 0000000..d0ade05 Binary files /dev/null and b/Part02/Animation_Output/Gazebo_Turtlebot_AStar.mp4 differ diff --git a/Part02/src/.vscode/c_cpp_properties.json b/Part02/src/.vscode/c_cpp_properties.json new file mode 100644 index 0000000..5823637 --- /dev/null +++ b/Part02/src/.vscode/c_cpp_properties.json @@ -0,0 +1,25 @@ +{ + "configurations": [ + { + "browse": { + "databaseFilename": "${default}", + "limitSymbolsToIncludedHeaders": false + }, + "includePath": [ + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_fake_node/include/**", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_msgs/include/**", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_gazebo/include/**", + "/home/dhinesh/ros2_ws_turtlebot/install/dynamixel_sdk/include/**", + "/opt/ros/noetic/include/**", + "/home/dhinesh/ros2_ws_turtlebot/install/dynamixel_sdk_custom_interfaces/include/**", + "/usr/include/**" + ], + "name": "ROS", + "intelliSenseMode": "gcc-x64", + "compilerPath": "/usr/bin/gcc", + "cStandard": "gnu11", + "cppStandard": "c++14" + } + ], + "version": 4 +} \ No newline at end of file diff --git a/Part02/src/.vscode/settings.json b/Part02/src/.vscode/settings.json new file mode 100644 index 0000000..fc68b75 --- /dev/null +++ b/Part02/src/.vscode/settings.json @@ -0,0 +1,18 @@ +{ + "python.autoComplete.extraPaths": [ + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_teleop/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_example/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_msgs/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/dynamixel_sdk_custom_interfaces/lib/python3.8/site-packages", + "/opt/ros/galactic/lib/python3.8/site-packages", + "/opt/ros/noetic/lib/python3/dist-packages" + ], + "python.analysis.extraPaths": [ + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_teleop/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_example/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_msgs/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/dynamixel_sdk_custom_interfaces/lib/python3.8/site-packages", + "/opt/ros/galactic/lib/python3.8/site-packages", + "/opt/ros/noetic/lib/python3/dist-packages" + ] +} \ No newline at end of file diff --git a/Part02/src/turtlebot3.repos b/Part02/src/turtlebot3.repos new file mode 100644 index 0000000..169400c --- /dev/null +++ b/Part02/src/turtlebot3.repos @@ -0,0 +1,21 @@ +repositories: + turtlebot3/turtlebot3: + type: git + url: https://github.com/ROBOTIS-GIT/turtlebot3.git + version: galactic-devel + turtlebot3/turtlebot3_msgs: + type: git + url: https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git + version: galactic-devel + turtlebot3/turtlebot3_simulations: + type: git + url: https://github.com/ROBOTIS-GIT/turtlebot3_simulations.git + version: galactic-devel + utils/DynamixelSDK: + type: git + url: https://github.com/ROBOTIS-GIT/DynamixelSDK.git + version: galactic-devel + utils/hls_lfcd_lds_driver: + type: git + url: https://github.com/ROBOTIS-GIT/hls_lfcd_lds_driver.git + version: galactic-devel diff --git a/Part02/src/turtlebot3/turtlebot3 b/Part02/src/turtlebot3/turtlebot3 new file mode 160000 index 0000000..b1eeeb5 --- /dev/null +++ b/Part02/src/turtlebot3/turtlebot3 @@ -0,0 +1 @@ +Subproject commit b1eeeb587f616bb355ab4ac20db337d352b0e299 diff --git a/Part02/src/turtlebot3/turtlebot3_msgs b/Part02/src/turtlebot3/turtlebot3_msgs new file mode 160000 index 0000000..8d676fd --- /dev/null +++ b/Part02/src/turtlebot3/turtlebot3_msgs @@ -0,0 +1 @@ +Subproject commit 8d676fdd80f49d8a7b777ba2cedead89b605f930 diff --git a/Part02/src/turtlebot3/turtlebot3_simulations b/Part02/src/turtlebot3/turtlebot3_simulations new file mode 160000 index 0000000..04a2bbc --- /dev/null +++ b/Part02/src/turtlebot3/turtlebot3_simulations @@ -0,0 +1 @@ +Subproject commit 04a2bbc2b51d075fe5e2170716edc9a348d61a8b diff --git a/Part02/src/turtlebot_path_planning/.vscode/c_cpp_properties.json b/Part02/src/turtlebot_path_planning/.vscode/c_cpp_properties.json new file mode 100644 index 0000000..9ae4379 --- /dev/null +++ b/Part02/src/turtlebot_path_planning/.vscode/c_cpp_properties.json @@ -0,0 +1,25 @@ +{ + "configurations": [ + { + "browse": { + "databaseFilename": "${default}", + "limitSymbolsToIncludedHeaders": false + }, + "includePath": [ + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_fake_node/include/**", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_msgs/include/**", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_gazebo/include/**", + "/home/dhinesh/ros2_ws_turtlebot/install/dynamixel_sdk_custom_interfaces/include/**", + "/home/dhinesh/ros2_ws_turtlebot/install/dynamixel_sdk/include/**", + "/opt/ros/noetic/include/**", + "/usr/include/**" + ], + "name": "ROS", + "intelliSenseMode": "gcc-x64", + "compilerPath": "/usr/bin/gcc", + "cStandard": "gnu11", + "cppStandard": "c++14" + } + ], + "version": 4 +} \ No newline at end of file diff --git a/Part02/src/turtlebot_path_planning/.vscode/settings.json b/Part02/src/turtlebot_path_planning/.vscode/settings.json new file mode 100644 index 0000000..efccc2b --- /dev/null +++ b/Part02/src/turtlebot_path_planning/.vscode/settings.json @@ -0,0 +1,20 @@ +{ + "python.autoComplete.extraPaths": [ + "/home/dhinesh/ros2_ws_turtlebot/install/velocity_publisher/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_teleop/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_example/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_msgs/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/dynamixel_sdk_custom_interfaces/lib/python3.8/site-packages", + "/opt/ros/galactic/lib/python3.8/site-packages", + "/opt/ros/noetic/lib/python3/dist-packages" + ], + "python.analysis.extraPaths": [ + "/home/dhinesh/ros2_ws_turtlebot/install/velocity_publisher/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_teleop/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_example/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/turtlebot3_msgs/lib/python3.8/site-packages", + "/home/dhinesh/ros2_ws_turtlebot/install/dynamixel_sdk_custom_interfaces/lib/python3.8/site-packages", + "/opt/ros/galactic/lib/python3.8/site-packages", + "/opt/ros/noetic/lib/python3/dist-packages" + ] +} \ No newline at end of file diff --git a/Part02/src/turtlebot_path_planning/CMakeLists.txt b/Part02/src/turtlebot_path_planning/CMakeLists.txt new file mode 100644 index 0000000..b8020dd --- /dev/null +++ b/Part02/src/turtlebot_path_planning/CMakeLists.txt @@ -0,0 +1,33 @@ +cmake_minimum_required(VERSION 3.8) +project(turtlebot_path_planning) + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake REQUIRED) +# uncomment the following section in order to fill in +# further dependencies manually. +# find_package( REQUIRED) + + +install(DIRECTORY + launch + DESTINATION share/${PROJECT_NAME} +) + + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + # the following line skips the linter which checks for copyrights + # uncomment the line when a copyright and license is not present in all source files + #set(ament_cmake_copyright_FOUND TRUE) + # the following line skips cpplint (only works in a git repo) + # uncomment the line when this package is not in a git repo + #set(ament_cmake_cpplint_FOUND TRUE) + ament_lint_auto_find_test_dependencies() +endif() + + +ament_package() diff --git a/Part02/src/turtlebot_path_planning/launch/turtlebot_astar_simulation.launch.py b/Part02/src/turtlebot_path_planning/launch/turtlebot_astar_simulation.launch.py new file mode 100644 index 0000000..20cd79f --- /dev/null +++ b/Part02/src/turtlebot_path_planning/launch/turtlebot_astar_simulation.launch.py @@ -0,0 +1,141 @@ +from launch import LaunchDescription, LaunchContext +from launch_ros.actions import Node +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch.actions import OpaqueFunction +from launch.actions import IncludeLaunchDescription +from launch.launch_description_sources import PythonLaunchDescriptionSource +from ament_index_python.packages import get_package_share_directory +from math import pi +import os + + +#This function decrustrates the start_node launch argument and passes it to the spawn_turtlebot3.launch.py file: +def turtlebot_spawn(context: LaunchContext, start_node): + arr = context.perform_substitution(start_node) + x = arr.replace('[', "").replace(']',"").split(',')[0] #Remove the brackets and split the string + y = arr.replace('[', "").replace(']',"").split(',')[1] + angle = arr.replace('[', "").replace(']',"").split(',')[2] + rad = (90.0-float(angle)) * pi / 180 #Subtract 90 degrees from the anlge for proper spawning & Convert angle to radians + + launch_file_dir = os.path.join(get_package_share_directory('turtlebot3_gazebo'), 'launch') + + spawn_turtlebot_cmd = IncludeLaunchDescription( + PythonLaunchDescriptionSource( + os.path.join(launch_file_dir, 'spawn_turtlebot3.launch.py') + ), + launch_arguments={ + 'x_pose': x, + 'y_pose': y, + 'theta': str(rad) + }.items() + ) + + return [spawn_turtlebot_cmd] #Return the spawn_turtlebot_cmd to be added to the launch description + + + +#This function generates the launch description - basic structure of the launch file +def generate_launch_description(): + + #Declare the launch arguments: + start_node = DeclareLaunchArgument( + 'start_node', + default_value='[0.0, 0.0, 0.0]', + description='start node for turtlebot' + ) + + goal_node = DeclareLaunchArgument( + 'goal_node', + default_value='[0.0, 0.0, 0.0]', + description='goal node for turtlebot' + ) + + RPM1 = DeclareLaunchArgument( + 'RPM1', + default_value='0.0', + description='RPM1 for turtlebot' + ) + + RPM2 = DeclareLaunchArgument( + 'RPM2', + default_value='0.0', + description='RPM2 for turtlebot' + ) + + clearance = DeclareLaunchArgument( + 'clearance', + default_value='0.0', + description='clearance for turtlebot' + ) + + + #Declare nodes - vel_pub node which performs the path planning & publishes the velocity to the turtlebot + #It accepts the start_node, goal_node, RPM1, RPM2, and clearance as launch arguments + velocity_publisher = Node( + package='velocity_publisher', + executable='vel_pub', + parameters=[ + {'start_node': LaunchConfiguration('start_node')}, + {'goal_node': LaunchConfiguration('goal_node')}, + {'RPM1': LaunchConfiguration('RPM1')}, + {'RPM2': LaunchConfiguration('RPM2')}, + {'clearance': LaunchConfiguration('clearance')} + ], + output='screen' + ) + + + #Create the launch description + ld = LaunchDescription() + + #Declare launch files to be launch - within this launch file + #Gazebo Turtlebot3 World is launched with the map.world file + launch_file_dir = os.path.join(get_package_share_directory('turtlebot3_gazebo'), 'launch') + pkg_gazebo_ros = get_package_share_directory('gazebo_ros') + + use_sim_time = LaunchConfiguration('use_sim_time', default='true') + + world = os.path.join( + get_package_share_directory('turtlebot3_gazebo'), + 'worlds', + 'map.world' + ) + + gzserver_cmd = IncludeLaunchDescription( + PythonLaunchDescriptionSource( + os.path.join(pkg_gazebo_ros, 'launch', 'gzserver.launch.py') + ), + launch_arguments={'world': world}.items() + ) + + gzclient_cmd = IncludeLaunchDescription( + PythonLaunchDescriptionSource( + os.path.join(pkg_gazebo_ros, 'launch', 'gzclient.launch.py') + ) + ) + + robot_state_publisher_cmd = IncludeLaunchDescription( + PythonLaunchDescriptionSource( + os.path.join(launch_file_dir, 'robot_state_publisher.launch.py') + ), + launch_arguments={'use_sim_time': use_sim_time}.items() + ) + + + #Add the launch actions to the launch description - do it for every launch action + ld.add_action(gzserver_cmd) + ld.add_action(gzclient_cmd) + ld.add_action(robot_state_publisher_cmd) + + ld.add_action(OpaqueFunction(function=turtlebot_spawn, args=[LaunchConfiguration('start_node')])) #Add the spawn_turtlebot_cmd to the launch description + + ld.add_action(start_node) + ld.add_action(goal_node) + ld.add_action(RPM1) + ld.add_action(RPM2) + ld.add_action(clearance) + + ld.add_action(velocity_publisher) #Add the velocity_publisher package + + return ld #Return the launch description diff --git a/Part02/src/turtlebot_path_planning/package.xml b/Part02/src/turtlebot_path_planning/package.xml new file mode 100644 index 0000000..80fd1c6 --- /dev/null +++ b/Part02/src/turtlebot_path_planning/package.xml @@ -0,0 +1,21 @@ + + + + turtlebot_path_planning + 0.0.0 + TODO: Package description + dhinesh + TODO: License declaration + + ament_cmake + + ament_lint_auto + ament_lint_common + + velocity_publisher + + + ament_cmake + + + diff --git a/Part02/src/utils/DynamixelSDK b/Part02/src/utils/DynamixelSDK new file mode 160000 index 0000000..4c37f8f --- /dev/null +++ b/Part02/src/utils/DynamixelSDK @@ -0,0 +1 @@ +Subproject commit 4c37f8fb65569b233d2a95159ad50147d9ef1321 diff --git a/Part02/src/utils/hls_lfcd_lds_driver b/Part02/src/utils/hls_lfcd_lds_driver new file mode 160000 index 0000000..a55a46b --- /dev/null +++ b/Part02/src/utils/hls_lfcd_lds_driver @@ -0,0 +1 @@ +Subproject commit a55a46bf24acc30fb1f1f0a43d00a48f404193a1 diff --git a/Part02/src/velocity_publisher/package.xml b/Part02/src/velocity_publisher/package.xml new file mode 100644 index 0000000..66d5193 --- /dev/null +++ b/Part02/src/velocity_publisher/package.xml @@ -0,0 +1,18 @@ + + + + velocity_publisher + 0.0.0 + Astar Implementation using Turtlebot3 + dhinesh + Apache License 2.0 + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/Part02/src/velocity_publisher/resource/velocity_publisher b/Part02/src/velocity_publisher/resource/velocity_publisher new file mode 100644 index 0000000..e69de29 diff --git a/Part02/src/velocity_publisher/setup.cfg b/Part02/src/velocity_publisher/setup.cfg new file mode 100644 index 0000000..c351215 --- /dev/null +++ b/Part02/src/velocity_publisher/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/velocity_publisher +[install] +install_scripts=$base/lib/velocity_publisher diff --git a/Part02/src/velocity_publisher/setup.py b/Part02/src/velocity_publisher/setup.py new file mode 100644 index 0000000..563b7db --- /dev/null +++ b/Part02/src/velocity_publisher/setup.py @@ -0,0 +1,26 @@ +from setuptools import setup + +package_name = 'velocity_publisher' + +setup( + name=package_name, + version='0.0.0', + packages=[package_name], + data_files=[ + ('share/ament_index/resource_index/packages', + ['resource/' + package_name]), + ('share/' + package_name, ['package.xml']), + ], + install_requires=['setuptools'], + zip_safe=True, + maintainer='dhinesh', + maintainer_email='dhineshrajasekaran@gmail.com', + description='Astar Implementation using Turtlebot3', + license='Apache License 2.0', + tests_require=['pytest'], + entry_points={ + 'console_scripts': [ + 'vel_pub = velocity_publisher.vel_pub:main' + ], + }, +) diff --git a/Part02/src/velocity_publisher/test/test_copyright.py b/Part02/src/velocity_publisher/test/test_copyright.py new file mode 100644 index 0000000..cc8ff03 --- /dev/null +++ b/Part02/src/velocity_publisher/test/test_copyright.py @@ -0,0 +1,23 @@ +# Copyright 2015 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_copyright.main import main +import pytest + + +@pytest.mark.copyright +@pytest.mark.linter +def test_copyright(): + rc = main(argv=['.', 'test']) + assert rc == 0, 'Found errors' diff --git a/Part02/src/velocity_publisher/test/test_flake8.py b/Part02/src/velocity_publisher/test/test_flake8.py new file mode 100644 index 0000000..27ee107 --- /dev/null +++ b/Part02/src/velocity_publisher/test/test_flake8.py @@ -0,0 +1,25 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_flake8.main import main_with_errors +import pytest + + +@pytest.mark.flake8 +@pytest.mark.linter +def test_flake8(): + rc, errors = main_with_errors(argv=[]) + assert rc == 0, \ + 'Found %d code style errors / warnings:\n' % len(errors) + \ + '\n'.join(errors) diff --git a/Part02/src/velocity_publisher/test/test_pep257.py b/Part02/src/velocity_publisher/test/test_pep257.py new file mode 100644 index 0000000..b234a38 --- /dev/null +++ b/Part02/src/velocity_publisher/test/test_pep257.py @@ -0,0 +1,23 @@ +# Copyright 2015 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_pep257.main import main +import pytest + + +@pytest.mark.linter +@pytest.mark.pep257 +def test_pep257(): + rc = main(argv=['.', 'test']) + assert rc == 0, 'Found code style errors / warnings' diff --git a/Part02/src/velocity_publisher/velocity_publisher/__init__.py b/Part02/src/velocity_publisher/velocity_publisher/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Part02/src/velocity_publisher/velocity_publisher/a_star.py b/Part02/src/velocity_publisher/velocity_publisher/a_star.py new file mode 100644 index 0000000..4fef48c --- /dev/null +++ b/Part02/src/velocity_publisher/velocity_publisher/a_star.py @@ -0,0 +1,342 @@ +import sys +import time + +import heapq + +from .node import * +from .node_manager import * +from .environment import * +from .action_handler import * +from math import ceil + + + +#Git repo link https://github.com/itej89/AstarPathPlanner + + +class astar_planner: + + def __init__(self, env, action_handler) -> None: + self.env = env + self.action_handler = action_handler + + def goal_check(self, node, list_x, list_y, goal_dist_threshold): + #Eucledian distance threshold = 1.5 * radius + #Theta threshold = 30 + dist_matrix = np.sqrt((list_y - node[1])**2 + (list_x - node[0])**2) #dist formula = ((y2 - y1)^2 + (x2 - x1)^2)^1/2 + dist_matrix = dist_matrix.reshape(len(list_y), 1) #reshape to (n,1) + + _x = np.argwhere(dist_matrix < goal_dist_threshold) #get index of all elements in dist_matrix that are less than 0.5 + + + if len(_x) > 0: + return True + + def visited_node_check(self, node, list_x, list_y, list_theta, robot_radius): + #Eucledian distance threshold = 0.5 + #Theta threshold = 30 + + dist_matrix = np.sqrt((list_y - node[1])**2 + (list_x - node[0])**2) #dist formula = ((y2 - y1)^2 + (x2 - x1)^2)^1/2 + dist_matrix = dist_matrix.reshape(len(list_y), 1) #reshape to (n,1) + + _x = np.argwhere(dist_matrix < 0.5*robot_radius).ravel() #get index of all elements in dist_matrix that are less than 0.5 + + theta_threshold_min = node[2] - 10 #theta threshold min = node's theta -30 + theta_threshold_max = node[2] + 10 #theta threshold max = node's theta +30 + + # if _x.size > 0: + # thetas = list_theta[_x.ravel()] + + # range_thetas = np.where(np.logical_and(thetas>=theta_threshold_min, thetas<=theta_threshold_max)) + + # _len = len(range_thetas[0]) + # if _len > 0: + # return True, _x[range_thetas[0][0]] + + if len(_x) > 0: #if there are elements in dist_matrix that are less than 0.5 + # for i in _x: #loop through all elements in dist_matrix that are less than 0.5 + # if list_theta[i] > theta_threshold_min and list_theta[i] < theta_threshold_max: #if theta of node in list is within theta threshold + # return True, i #Then node is in the given list + return True, _x[0] + # return False, None #Else node is not in the given list + else: + return False, None #Else node is not in the given list + + def explore_actions(self, parent_node) -> bool: + """Function takes an environmet state and runs + possible actions for that state and extract + possible unique future states and stores them in the que + + Args: + parent_node (node): node representing a state of the + environment + + """ + + #get the position of the agent location + # this represents the (x,y) position on the grid + agents_postion = parent_node.Node_State + + #move the input node into explored list + self.visited_node_list.append(parent_node.Node_hash) + self.node_arrow_list[parent_node.Node_hash] = [] + + Sim_Pose = None + Status = False + + #iterate through all possible actions + for action in self.action_handler.Actions: + #make a copy of the current agents position + + #Compute agents possible furture position + Status ,Sim_Pose = self.action_handler.PerformAction(parent_node, action) + + #If the future position is in the bounds of the environemnt + if Status: + + simulated_position , Total_Cost_To_Come, Total_Cost_To_Go = Sim_Pose + #else of the state has already been explored then ignore the action + status, idx = self.visited_node_check(simulated_position, node_manager.node_x[self.visited_node_list], \ + node_manager.node_y[self.visited_node_list], node_manager.node_theta[self.visited_node_list], self.robot_radius) + if status: + continue + + NewNode = None + #if state has been visited for the first time then create anew node for the state + status, idx = self.visited_node_check(simulated_position, node_manager.node_x, node_manager.node_y, node_manager.node_theta, self.robot_radius) + if not status: + #Create a new node + NewNode = node_manager.make_node(simulated_position, Total_Cost_To_Come, Total_Cost_To_Go) + #Update the parent for the node + NewNode.Parent_Node_hash = parent_node.Node_hash + NewNode.Action_to_Node = action + heapq.heappush(self.pending_state_que, NewNode) + self.node_arrow_list[parent_node.Node_hash].append(NewNode.Node_hash) + + + #else if the state is in pending que then verify its current cost + # and update its cost and parent if necessary + else: + #If node can be reached in less cost + if node_manager.global_node_directory[idx].Total_Cost > Total_Cost_To_Come + Total_Cost_To_Go: + #update cost + node_manager.global_node_directory[idx].cost_to_come = Total_Cost_To_Come + node_manager.global_node_directory[idx].cost_to_go = Total_Cost_To_Go + node_manager.global_node_directory[idx].Total_Cost = Total_Cost_To_Come + Total_Cost_To_Go + #update new parent node + node_manager.global_node_directory[idx].Parent_Node_hash = parent_node.Node_hash + node_manager.global_node_directory[idx].Node_State = simulated_position + + NewNode = node_manager.global_node_directory[idx] + NewNode.Action_to_Node = action + heapq.heapify(self.pending_state_que) + + #Push new node to the pending que for future expoloration + # if NewNode != None: + # #Found an unique state that needs to be pushed in to the que + # heapq.heappush(self.pending_state_que, NewNode) + + + def find_goal_node(self, start_state, goal_state, robot_radius, goal_dist_threshold, display_env) -> bool: + """Takes start stat and goal state for the environement + and computes the tree using _Breadth first search algorithm till the goal state is reached + + Args: + start_state (2D numpy array): Start state for the environment + goal_state (2D numpy array): Goal state for the environment + """ + + #Call this fuction for auto id generation of nodes for a new tree + node_manager.initialize() + + #initailize states + self.robot_radius = robot_radius + self.goal_state = goal_state + self.initial_state = start_state + self.goal_dist_threshold = goal_dist_threshold + self.Final_Node = None + self.rendered_nodes = 0 + + #Initialize search que. This stores the nodes that needs to be explored + start_node = node_manager.make_node(self.initial_state) + self.pending_state_que = [start_node] + heapq.heapify(self.pending_state_que) + + self.visited_node_list = [] + self.node_arrow_list = {} + + #Perform search till a goal state is reached or maximum number of iterations reached + + render_freq = 100 + render_freq_counter = 0 + while True: + render_freq_counter += 1 + if render_freq_counter % render_freq == 0: + # self.visualize_exploration(start_state, goal_state, display_env, self.robot_radius, self.rendered_nodes) + self.rendered_nodes = len(self.visited_node_list) + + + if len(self.pending_state_que) > 0: + #fetch next node to be explored + next_item = heapq.heappop(self.pending_state_que) + else: + next_item = None + + + if next_item!= None: + next_node = next_item + # print(next_node.Node_State) + #Check if the next node is the goal node, then return success + if self.goal_check(next_node.Node_State, np.array([goal_state[0]]), np.array([goal_state[1]]), self.goal_dist_threshold): + self.Final_Node = next_node + print("Found the goal state!!!") + return True + + self.explore_actions(next_node) + else: + print("Unable to find the goal state!!!") + return False + + def back_track_actions(self): + """Funciton to find the final trajectory given the final goal node + + Returns: + list of tuples: list of trajectory points + """ + if self.Final_Node != None: + ul = [] + ur = [] + theta = [] + + last_node = self.Final_Node + ul.append(last_node.Action_to_Node[0]) + ur.append(last_node.Action_to_Node[1]) + theta.append(last_node.Node_State[2]) + + #back track untill the start node has been reached + while True: + last_node = node_manager.global_node_directory[last_node.Parent_Node_hash] + + if last_node.Parent_Node_hash == None: + break + + if last_node.Action_to_Node != None: + ul.append(last_node.Action_to_Node[0]) + ur.append(last_node.Action_to_Node[1]) + theta.append(node_manager.global_node_directory[last_node.Parent_Node_hash].Node_State[2]) + + + ul.reverse(); ur.reverse(); theta.reverse() + return ul, ur, theta + + return None + + def back_track(self): + """Funciton to find the final trajectory given the final goal node + + Returns: + list of tuples: list of trajectory points + """ + if self.Final_Node != None: + last_node = self.Final_Node + trajectory = [last_node.Node_State] + + #back track untill the start node has been reached + while True: + last_node = node_manager.global_node_directory[last_node.Parent_Node_hash] + trajectory.append(last_node.Node_State) + + if last_node.Parent_Node_hash == None: + break + + trajectory.reverse() + return trajectory + + return None + + def visualize_exploration(self, start_state, goal_state, display_environment, robot_radius, render_node_start_index = 0): + """Visualize exploration of the nodes + """ + #indicate start and goal nodes with circles5 + # display_environment.highlight_state(start_state, robot_radius, (255, 0, 0)) + display_environment.highlight_state(goal_state, robot_radius, (0, 255, 0)) + display_environment.refresh_map() + cv.waitKey(1) + + #variable to control gui update rate + update_count = 0 + # Loop through all visited nodes and show on the cv window + for idx in self.visited_node_list[render_node_start_index:]: + if node_manager.global_node_directory[idx].Parent_Node_hash != None: + parent_idx = node_manager.global_node_directory[idx].Parent_Node_hash + pose = (node_manager.node_x[parent_idx], node_manager.node_y[parent_idx], \ + node_manager.node_theta[parent_idx]) + display_environment.update_action(pose, \ + node_manager.global_node_directory[idx].Action_to_Node, self.env.is_valid_position) + + update_count +=1 + #Update gui every 300 iteration + if update_count == 500: + update_count = 0 + # display_environment.write_video_frame() + cv.waitKey(1) + + + + + + def visualize_trajectory(self, trajectory, display_environment, robot_radius): + """Funciton to visualize trajectory + + Args: + trajectory (lsit of tuples): trajectory + """ + update_count = 0 + # Loop through all visited nodes and show on the cv window + for node in self.pending_state_que: + + if node.Parent_Node_hash != None: + parent_idx = node.Parent_Node_hash + pose = (node_manager.node_x[parent_idx], node_manager.node_y[parent_idx], \ + node_manager.node_theta[parent_idx]) + display_environment.update_action(pose, \ + node.Action_to_Node, self.env.is_valid_position) + + update_count +=1 + #Update gui every 300 iteration + if update_count == 500: + update_count = 0 + cv.waitKey(1) + + + #Loop through all points in the trajectory + for point in trajectory: + display_environment.highlight_point(point) + #upodate map with the trajectory + # display_environment.refresh_map() + # for i in range(20): + # display_environment.write_video_frame() + # cv.waitKey(1) + + + + + + def animate_trajectory(self, trajectory, display_environment, robot_radius): + """Funciton to visualize trajectory + Args: + trajectory (lsit of tuples): trajectory + """ + #Loop through all points in the trajectory + frame = None + for point in trajectory: + frame = display_environment.show_robot(point, robot_radius) + # for i in range(2): + # display_environment.insert_video_frame(frame) + cv.waitKey(100) + + + cv.waitKey(1) + + + diff --git a/Part02/src/velocity_publisher/velocity_publisher/action_handler.py b/Part02/src/velocity_publisher/velocity_publisher/action_handler.py new file mode 100644 index 0000000..0ad02d0 --- /dev/null +++ b/Part02/src/velocity_publisher/velocity_publisher/action_handler.py @@ -0,0 +1,92 @@ +import math +import time +import numpy as np + +class action_handler: +#Class to handle the action supported by the environment + + + def __init__(self, env, step_size, start_pose, goal_pose, omega1, omega2, wheel_radius, wheel_base_width) -> None: + """Initilaize action handler parameters + Args: + env (environment): referance ot environment + """ + self.env = env + self.start_pose = start_pose + self.goal_pose = goal_pose + self.step_size = step_size + self.robot_wheel_radius = wheel_radius + self.wheel_base_width = wheel_base_width + + #define al the actions possible in the environment + self.Actions = [(0, omega1), (omega1,0), (omega1, omega1), (0, omega2), (omega2, 0), (omega2,omega2), (omega1, omega2), (omega2, omega1)] + print(f"ACTION SET : {self.Actions}") + + + def cost_norm(self, pose1, pose2): + x1, y1, _ = pose1 + x2, y2, _ = pose2 + return np.linalg.norm([x1 - x2, y1 - y2]) + + def ActionValues(self, pose, action): + Xi,Yi,Thetai = pose + UL, UR = action + + t = 0 + dt = 0.1 + + Xn=Xi + Yn=Yi + Thetan = 3.14 * Thetai / 180 + + + cost_to_go=0 + while t 2): + pco = pyclipper.PyclipperOffset() + pco.AddPath(vertices, pyclipper.PT_CLIP, pyclipper.ET_CLOSEDPOLYGON) + vertices_inflated = pco.Execute(radius) + vertices_inflated = [tuple(x) for x in vertices_inflated[0]] + else: + vertices_inflated = vertices.copy() + vertices_inflated[1] += radius + + return vertices_inflated + + def __init__(self, height, width, inflation_radius, step_size, robot_wheel_radius, wheel_base_width) -> None: + """Initialize environment parameters + + Args: + height (int): height of the map + width (int): width of the map + """ + self.height = height + self.width = width + self.inflation_radius = inflation_radius + + self.step_size = step_size + self.robot_wheel_radius = robot_wheel_radius + self.wheel_base_width = wheel_base_width + + #create a map fo given dimentions. 3 channels for opencv BGR + self.map = np.ones((height, width, 3)) + + #create obstacle models for all the objects in the environment + + #create original boundary obstacle model + self.boundary_model = obstacle_model([ + [(600,0), (600,200), (0,200), (0,0)] + ]) + + #create inflated boundary obstacle model + self.inflated_boundary_model = obstacle_model([ + self.inflate_polygon([(600,0), (600,200), (0,200), (0,0)], -1*self.inflation_radius), + ]) + + #create original polygon objects obstacle model + self.original_obstacle_model = obstacle_model([ + [(150,200), (150,75), (165,75), (165,200), (150,200)], # Polygon corresponding to botom rectangular pillar + [(250,125), (250,0), (265,0), (265,125), (250,125)], + [(400,110), 50] # Polygon corresponding to Top rectangular pillar + ]) + + + #create inflated polygon objects obstacle model + self.inflated_obstacle_model = obstacle_model([ + self.inflate_polygon([(150,200), (150,75), (165,75), (165,200), (150,200)], self.inflation_radius), + self.inflate_polygon([(250,125), (250,0), (265,0), (265,125), (250,125)], self.inflation_radius), + self.inflate_polygon([(250,125), (250,0), (265,0), (265,125), (250,125)], self.inflation_radius), + self.inflate_polygon([(400,110), 50], self.inflation_radius), + ]) + + + def create_map(self): + """Idnetify obstacles and free space in the map uisng the obstacle models + """ + #Iterate through all the states in the enviornement + for i in range(self.map.shape[0]): + for j in range(self.map.shape[1]): + #Checks if state present inside the non-inflated obstacle + if self.original_obstacle_model.is_inside_obstacle((j,i)): + self.map[i,j] = [0x80, 0x80, 0xff] + + #Checks if state present inside an inflated obstacle + elif self.inflated_obstacle_model.is_inside_obstacle((j,i)): + self.map[i,j] = [0x7a, 0x70, 0x52] + + #Checks if state present outside the inflated boundary + elif not self.inflated_boundary_model.is_inside_obstacle((j,i)): + self.map[i,j] = [0x7a, 0x70, 0x52] + + #Identify as state belongs to the free space + else: + self.map[i,j] = [0xc2, 0xba, 0xa3] + + + def is_valid_position(self, position): + """Checks if a given position belongs to free space or obstacle space + Args: + position (tuple): state of the agent to be verified + Returns: + bool: True if pixel belongs to free space else False + """ + if position[0] >=0 and position[0] < 200 and position[1] >=0 and position[1] < 600: + #Check if a state belongs to free psace by comparing the color of the respective pixel in the environment + if self.map[position[0], position[1]][0] == 0xc2 and \ + self.map[position[0], position[1]][1] == 0xba and \ + self.map[position[0], position[1]][2] == 0xa3: + return True + + #return false of state is in obstacle space + return False + + + def refresh_map(self): + """Refreshes map with updated image map + """ + #Flip the map to satisfy the environment direction + image = cv.flip(self.map.astype('uint8'), 0) + image = cv.resize(image, (0,0), fx = 2, fy = 2) + cv.imshow("map", image) + + def update_map(self, position): + """Highlights a point in the environment at the given locaiton + Args: + position (tuple): pixel location + """ + i, j, _ = position + self.map[i, j] = [255, 255, 255] + self.refresh_map() + + def update_action(self, pose, action, validity_checker): + Xi,Yi,Thetai = pose + UL, UR = action + t = 0 + # r = 0.038 + # L = 0.354 + r = self.robot_wheel_radius + L = self.wheel_base_width + dt = 0.1 + Xn=Xi + Yn=Yi + Thetan = 3.14 * Thetai / 180 + # Xi, Yi,Thetai: Input point's coordinates + # Xs, Ys: Start point coordinates for plot function + # Xn, Yn, Thetan: End point coordintes + cost_to_go=0 + while t None: + """Constructor for node object + Args: + state (numpy array): state of the environemnt + """ + + # an unique id is generated for the given state by taking the + # numpy array ordered elements in sequence + self.Node_hash = hash + + #node's parent id + # contains "None" if the node is top of the tree + self.Parent_Node_hash = None + + #Contains state that this node represents + self.Node_State = state + + #Contains cost-to-come to the node + self.Cost_to_Come = cost_to_come + + #Contains cost-to-Go to the goal + self.Cost_to_Go = cost_to_go + + #Contains total-cost to the goal + self.Total_Cost = self.Cost_to_Come + self.Cost_to_Go + + #Contains the action taken to reach this node + self.Action_to_Node = None + + #Funciton to help heapq for comparing two node objects + def __lt__(self, other): + return self.Total_Cost < other.Total_Cost \ No newline at end of file diff --git a/Part02/src/velocity_publisher/velocity_publisher/node_manager.py b/Part02/src/velocity_publisher/velocity_publisher/node_manager.py new file mode 100644 index 0000000..8bad096 --- /dev/null +++ b/Part02/src/velocity_publisher/velocity_publisher/node_manager.py @@ -0,0 +1,55 @@ + +from .node import * +import numpy as np + +class node_manager: +#Class to mange all the node creation and maintainance + + #A global node directory where all the nodes that has been creates will be stored + global_node_directory = {} + + node_x = None + node_y = None + node_theta = None + + + #call this funciton before starting the search to reset id + def initialize(): + node_manager.global_node_directory.clear() + + + #Funciton to create a hash map for a given state of the node + def make_hash(state): + #For the current use case we can just use the lcoation of the pixel as hash + if node_manager.node_x is None: + node_manager.node_x = np.array([state[0]]) + node_manager.node_y = np.array([state[1]]) + node_manager.node_theta = np.array([state[2]]) + else: + node_manager.node_x = np.append(node_manager.node_x, state[0]) + node_manager.node_y = np.append(node_manager.node_y, state[1]) + node_manager.node_theta = np.append(node_manager.node_theta, state[2]) + + return len(node_manager.node_x) - 1 + + + def make_node(state, cost_to_come = 0, cost_to_go = 0): + """Funciton creates a Node using the given parameterts + + Args: + state (tuple): location of the node + cost_to_come (int, optional): cost to come to the node. Defaults to 0. + cost_to_go (int, optional): cost to go from the node. Defaults to 0. + + Returns: + node: referance to the new node stored the global directory + """ + #Make hash for the node + hash = node_manager.make_hash(state) + + #Create a node and store it in global node directory + node_manager.global_node_directory[hash] = node(state, hash, cost_to_come, cost_to_go) + + #Return referance to the new node + return node_manager.global_node_directory[hash] + diff --git a/Part02/src/velocity_publisher/velocity_publisher/obstacle_model.py b/Part02/src/velocity_publisher/velocity_publisher/obstacle_model.py new file mode 100644 index 0000000..6e819bf --- /dev/null +++ b/Part02/src/velocity_publisher/velocity_publisher/obstacle_model.py @@ -0,0 +1,89 @@ +import unittest +import numpy as np + + +class obstacle_model: + """Class to detect obstacles from a given set of polygon vertices + """ + + def __init__(self, _obstacles) -> None: + """Constructor takes a lists of vertices belonging to multiple polygons representing the obstacles + + Args: + _obstacles (list of lists of tuples): [[poly1_vertices], [poly2_vertices]] + """ + self.obstacles = _obstacles + + + def is_inside_obstacle(self, point): + """Given a point, the method computes if the point lies inside any of the obstacles + Loaded in the obstacle model + + Args: + point (tuple): point + + Returns: + bool: True of point lies inside any obstacle, False if point is outside + all the loaded obstacles + """ + + # lopp through each polygon + for vertices in self.obstacles: + is_inside_obstacle = True + #loop through pairs of vertices and find if point lies on which side of the polygon + if len(vertices) > 2: # if not a circle + for i in range(len(vertices)): + if is_inside_obstacle: + + #If final vertex then pair it with the initial vertex + if i == len(vertices) - 1: + x2, y2 = vertices[i][0], vertices[i][1] + x1, y1 = vertices[0][0], vertices[0][1] + else: + #Chose vertices order so the the point on the left side of the line gives a negative value + x2, y2 = vertices[i][0], vertices[i][1] + x1, y1 = vertices[i+1][0], vertices[i+1][1] + + #Compute line coefficients for the vetex pair (ax+by+c = 0) + a = (y1 - y2) + b = (x1 - x2) * -1 + c = x1*y2 - x2*y1 + + #substitute the point and compute the direction + l = a*point[0] + b*point[1] + c + + #if vertex lies outside a polygon line then break the check + if l > 0: + is_inside_obstacle = False + break + + else: # if circle + l = (point[0] - vertices[0][0]) ** 2 + (point[1] - vertices[0][1]) ** 2 - vertices[1] ** 2 + + if l > 0: + is_inside_obstacle = False + + #if the point is found inside any polygon return true + if is_inside_obstacle: + return True + + #if the point lies outside all the poygons return false + return False + + +#test case to check the obstacle_model class +class obstacle_model_test(unittest.TestCase): + def setUp(self): + self.obstacleModels = obstacle_model([ [(60, 0), (60, 50), (50,50), (50,0) ] ]) + + def test_free_space(self): + self.assertEqual(self.obstacleModels.check_obstacle((70,70)), False, "free_space error!!") + + def test_obstacle(self): + self.assertEqual(self.obstacleModels.check_obstacle((55,40)), True, "obstacle test failed!!") + + def tearDown(self): + pass + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/Part02/src/velocity_publisher/velocity_publisher/planner.py b/Part02/src/velocity_publisher/velocity_publisher/planner.py new file mode 100644 index 0000000..a47e1c3 --- /dev/null +++ b/Part02/src/velocity_publisher/velocity_publisher/planner.py @@ -0,0 +1,82 @@ +from .a_star import * + + +class planner: + + wheel_radius = 3.3 + robot_radius = 10.5 + wheel_base_width = 16.0 + + goal_dist_threshold = 0 + obstacle_inflation = 5 + step_size = 1 + def plan(self, omega1, omega2, start_state, goal_state, clearance): + + self.obstacle_inflation = clearance + + self.robot_radius = round(self.robot_radius) + if self.robot_radius < 1: + self.robot_radius = 1 + + self.goal_dist_threshold = self.robot_radius + #Create environment + print("Please wait while creating the environment.") + #Create environment for showing the final visualization-------------------------- + #This does not show the additional inflation for robot radius. Insted, it will inflate the robot size + self.display_environment = environment(200, 600, self.obstacle_inflation, self.step_size, self.wheel_radius, self.wheel_base_width) + self.display_environment.create_map() + #------------------------------------------------------------------------------- + + self._environment = environment(200, 600, self.robot_radius+self.obstacle_inflation, self.step_size, self.wheel_radius, self.wheel_base_width) + self._environment.create_map() + print("Environment created successfully.") + + + #Create action handler for the environment + _actionHandler = action_handler(self._environment, self.step_size, start_state, goal_state, omega1, omega2, self.wheel_radius, self.wheel_base_width) + + #create planner + self._astar_planner = astar_planner(self._environment, _actionHandler) + + #Request planner to find a plan + print("Planning started.") + print(f"Start Position : {[start_state[1], start_state[0]]}, Goal Postion : {[goal_state[1], goal_state[0]]}") + print("Please wait while searching for the goal state...") + start_time = time.time() + _status = self._astar_planner.find_goal_node(start_state, goal_state, self.robot_radius, self.goal_dist_threshold, self.display_environment) + elspsed_time = time.time() - start_time + + print(f"Total time taken to run the algorithm : {elspsed_time} seconds\n") + + #If Planner is successfull, visualize the plan + return _status + + def get_trajectory(self): + return self._astar_planner.back_track() + + def get_action_set(self): + return self._astar_planner.back_track_actions() + + def view_plan(self, start_state, goal_state, trajectory): + if trajectory != None: + # display_environment.begin_video_writer() + self._astar_planner.visualize_exploration(start_state, goal_state, self.display_environment, self.robot_radius) + self._astar_planner.visualize_trajectory(trajectory, self.display_environment, self.robot_radius) + self._astar_planner.animate_trajectory(trajectory, self.display_environment, self.robot_radius) + # display_environment.close_video_writer() + + + + + +if __name__ == "__main__": + rpm1 = (50*2*math.pi)/60 + rpm2 = (100*2*map.pi)/60 + + start_state = (30, 30, 30) + goal_state = (100, 500, 30) + + _planner = planner() + if _planner.plan(rpm1, rpm2, start_state, goal_state): + _planner.view_plan(start_state, goal_state, _planner.get_trajectory()) + print(_planner.get_action_set()) \ No newline at end of file diff --git a/Part02/src/velocity_publisher/velocity_publisher/vel_pub.py b/Part02/src/velocity_publisher/velocity_publisher/vel_pub.py new file mode 100644 index 0000000..d1a473b --- /dev/null +++ b/Part02/src/velocity_publisher/velocity_publisher/vel_pub.py @@ -0,0 +1,293 @@ +import rclpy +from geometry_msgs.msg import Twist +from rclpy.node import Node +from rclpy.qos import QoSProfile #qos profile for publisher +from math import cos, sin, sqrt, pi +from example_interfaces.msg import String +import time + +from .planner import planner + + +class velocity_publsiher(): + def __init__(self): + self.BURGER_MAX_LIN_VEL = 0.22 # in m/s + self.BURGER_MAX_ANG_VEL = 2.84 # in rad/s + self.r = 0.033 # in meter + self.L = 0.16 # in meter + self.map_width = 6 # in meter + self.map_height = 2 # in meter + + self.node = Node("velocity_publisher") #Node name + qos = QoSProfile(depth=10) #qos profile for publisher is 10 + self.pub = self.node.create_publisher(Twist, '/cmd_vel', qos) #Topic name is /cmd_vel, queue size is qos, message type is Twist + + self.start_node_pub = self.node.create_publisher(String, '/start_node', 10) #create a publisher to publish the start node to /start_node topic + + self.start_node = self.node.declare_parameter('start_node') #declare a parameter to get the start node from launch file + self.goal_node = self.node.declare_parameter('goal_node') + self.RPM1 = self.node.declare_parameter('RPM1') + self.RPM2 = self.node.declare_parameter('RPM2') + self.clearance = self.node.declare_parameter('clearance') + + + #Function to constrain robott velocity between low_bound and high_bound + #Turtlebot has a max linear velocity of 0.22 m/s and max angular velocity of 2.84 rad/s + #Any vel above 0.22 m/s or 2.84 rad/s will be set to 0.22 m/s or 2.84 rad/s + #Input: velocity, low_bound, high_bound + #Output: constrained velocity + def vel_constrain(self, input_vel, low_bound, high_bound): + if input_vel < low_bound: + input_vel = low_bound + elif input_vel > high_bound: + input_vel = high_bound + else: + input_vel = input_vel + + return input_vel + + #Function to check if linear velocity is within the limit and call vel_constrain function + #Input: linear velocity list + #Output: checked linear velocity list + def check_linear_limit_velocity(self, lin_velocity_list): + for i in range(len(lin_velocity_list)): + lin_velocity_list[i] = self.vel_constrain(lin_velocity_list[i], -self.BURGER_MAX_LIN_VEL, self.BURGER_MAX_LIN_VEL) + + return lin_velocity_list + + #Function to check if angular velocity is within the limit and call vel_constrain function + #Input: angular velocity list + #Output: checked angular velocity list + def check_angular_limit_velocity(self, ang_velocity_list): + for i in range(len(ang_velocity_list)): + ang_velocity_list[i] = self.vel_constrain(ang_velocity_list[i], -self.BURGER_MAX_ANG_VEL, self.BURGER_MAX_ANG_VEL) + + return ang_velocity_list + + + + #Function to publish velocity to "/cmd_vel" topic + #Input: linear velocity list and angular velocity list + #Output: publish to /cmd_vel topic + def ros_pub(self, lin_vel, ang_vel): + move = Twist() #create a Twist message + for i in range(len(lin_vel)): + # for i in range(3): + move.linear.x = lin_vel[i] #set x linear velocity + move.angular.z = -1*ang_vel[i] #set z angular velocity + self.pub.publish(move) #publish the message + time.sleep(1.09) + + move.linear.x = 0.0 #set x linear velocity + move.angular.z = 0.0 #set z angular velocity + self.pub.publish(move) #publish the message + time.sleep(3) + + + #Function to convert wheel velocity to robot velocity + #Based on the formula: x = r/2 * (ul+ur) * cos(theta), y = r/2 * (ul+ur) * sin(theta), theta = r/L * (ur - ul) + #Input: ul, ur, theta + #Output: linear velocity list and angular velocity list + def convert_wheel_vel_to_robot_vel(self, ul, ur, theta): + lin__vel = [] + ang_vel = [] + + for i in range(len(ul)): + x_dot = (self.r/2) * (ul[i]+ur[i]) * cos(theta[i]) #mobile robot kinematics equation x_dot = (r/2) * (ul+ur) * cos(theta) + y_dot = (self.r/2) * (ul[i]+ur[i]) * sin(theta[i]) #mobile robot kinematics equation y_dot = (r/2) * (ul+ur) * sin(theta) + theta_dot = (self.r/self.L) * (ur[i] - ul[i]) #mobile robot kinematics equation theta_dot = (r/L) * (ur - ul) + + lin__vel.append(sqrt(x_dot ** 2 + y_dot ** 2)) #linear velocity = sqrt(x_dot^2 + y_dot^2) + ang_vel.append(theta_dot) + + return lin__vel, ang_vel + + #Function to convert meter to centimeter for a node + #Input: node + #Output: converted node + def convert_m_to_cm(self, node): + node[0] = node[0] * 100 + node[1] = node[1] * 100 + return node + + #Function to convert gazebo coordinate to opencv coordinate + #Input: node/cooridnate + #Output: converted node/coordinate + def convert_xy_gazebo_to_opencv(self, node): + node[0] = node[0]+1 + node[1] = node[1]+0.5 + return node + + #Function to convert centimeter to meter for linear velocity list + #Input: linear velocity list + #Output: converted linear velocity list + def convert_cms_to_ms(self, lin_velocity_list): + for i in range(len(lin_velocity_list)): + lin_velocity_list[i] = lin_velocity_list[i] / 100 + + return lin_velocity_list + + #Function to convert degree to radian for angular velocity list + #Input: angular velocity list + #Output: converted angular velocity list + def convert_deg_to_rad(self, ang_velocity_list): + for i in range(len(ang_velocity_list)): + ang_velocity_list[i] = (ang_velocity_list[i]) * pi/180 + + return ang_velocity_list + + + #Function to process node from user input to opencv requirement + #Using above functions + #Input: node + #Output: processed node + def node_process_user_to_opencv(self, node): + node = self.convert_xy_gazebo_to_opencv(node) + node = self.convert_m_to_cm(node) + return node + + #Function to process node from opencv output to ROS requirement + #Using above functions + #Input: ul, ur, theta lists + #Output: processed ul, ur, theta lists + def vel_process_opencv_to_ROS(self, ul, ur, theta): + theta = self.convert_deg_to_rad(theta) + return ul, ur, theta + + +##Requirements: + ####A* & opencv: + ##Input: + #start node (x,y,theta_s) - x,y in cm, theta_s in degree + #goal node (x,y,theta_s) - x,y in cm, theta_s in degree + #RPM1 & RPM2 is revoluation per minute + #clearance is in cm + + ##Output: + #ul,ur in cm/s + #theta in degree + + + ####Gazebo & ROS: + ##What I need: + #start node (x,y,theta_s) - x,y in m, theta_s in rad + #ul,ur in m/s + #theta in rad + + ####User Input: + #start node (x,y,theta_s) - x,y in m, theta_s in degree (theta_s is in degree for user convenience) + #goal node (x,y,theta_s) - x,y in m, theta_s in degree + #RPM1 & RPM2 - revoluation per minute + #clearance - in mm (for user convenience) + + +def main(): + rclpy.init() #initiate ros2 + p1 = velocity_publsiher() #create a object of class velocity_publisher + + p1.start_node = p1.start_node.get_parameter_value().double_array_value + print("User start_node: ", p1.start_node) #Print the obtained data to terminal + p1.node.get_logger().info("User start_node: %s" % p1.start_node) #print and log the user input to ROS2 + + p1.goal_node = p1.goal_node.get_parameter_value().double_array_value + print("User goal_node: ", p1.goal_node) + p1.node.get_logger().info("User goal_node: %s" % p1.goal_node) + + p1.clearance = p1.clearance.get_parameter_value().double_value + print("User clearance: ", p1.clearance) + p1.node.get_logger().info("User clearance: %s" % p1.clearance) + + p1.RPM1 = p1.RPM1.get_parameter_value().double_value + print("User RPM1: ", p1.RPM1) + p1.node.get_logger().info("User RPM1: %s" % p1.RPM1) + + p1.RPM2 = p1.RPM2.get_parameter_value().double_value + print("User RPM2: ", p1.RPM2) + p1.node.get_logger().info("User RPM2: %s" % p1.RPM2) + + # print("Enter start node (x,y,theta_s) (x,y) in m, theta_s in degree: ") + # start_node = [float(x) for x in input().split()] + + # print("Enter goal node (x,y,theta_s) (x,y) in m, theta_s in degree: ") + # goal_node = [float(x) for x in input().split()] + + # print("Enter RPM1 (revoluation per minute): ") + # RPM1 = int(input()) + + # print("Enter RPM2 (revoluation per minute): ") + # RPM2 = int(input()) + + # print("Enter clearance (in mm): ") + # clearance = int(input()) + + start_node = [p1.start_node[1],p1.start_node[0], p1.start_node[2]] #convert x,y to y,x for opencv + goal_node = [p1.goal_node[1],p1.goal_node[0], p1.goal_node[2]] + + #Process User Input: + print("Processing User Input...") + start_node = p1.node_process_user_to_opencv(start_node) #Convert to cm and opencv coordinate system + goal_node = p1.node_process_user_to_opencv(goal_node) + clearance = int(p1.clearance) / 10 #convert to cm + RPM1 = (p1.RPM1*2*pi)/60 #convert to rev/s and rev/s * 2pi = angular vel of wheel (any) + RPM2 = (p1.RPM2*2*pi)/60 #convert to rev/s and rev/s * 2pi = angular vel of wheel (any) + + # #Test Case 3: + # start_node = [30,30,30] + # goal_node = [180,400,30] + # clearance = 5 + # RPM1 = (50*2*pi)/60 + # RPM2 = (100*2*pi)/60 + + start_node = [int(start_node[0]), int(start_node[1]), int(start_node[2])] #convert to int + goal_node = [int(goal_node[0]), int(goal_node[1]), int(goal_node[2])] + + #Print User Input fed to A* Algorithm: + print("Processed User Input for A* Algorithm:") + print("Start Node: ", start_node) + print("Goal Node: ", goal_node) + print("RPM1: ", RPM1) + print("RPM2: ", RPM2) + print("Clearance: ", clearance) + print("Running A* Algorithm...") + + + _planner = planner() + if _planner.plan(RPM1, RPM2, start_node, goal_node, int(clearance)): #if A* Algorithm is successf + ul,ur,theta = _planner.get_action_set() #get path found + _planner.view_plan(start_node, goal_node,_planner.get_trajectory()) #view the trajectory 2D + + # #Test Case 1: + # ul = [0.5,0.4,0.3,0.2,0.1,0.0,-0.1,-0.2,-0.3,-0.4,-0.5] #in m/s + # ur = [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0] #in m/s + # theta = [0.5,0.4,0.3,0.2,0.1,0.0,-0.1,-0.2,-0.3,-0.4,-0.5] #in rad + + #Process A* Output: + print("Processing A* Output...") + ul, ur, theta = p1.vel_process_opencv_to_ROS(ul, ur, theta) #convert to ROS coordinate system and m/s + + #Convert wheel velocity to robot velocity: + print("Converting wheel velocity to robot velocity...") + lin_vel, ang_vel = p1.convert_wheel_vel_to_robot_vel(ul, ur, theta) + + # # Check velocity limit: + # print("Checking velocity limit for Turtlebot...") + # lin_vel = p1.check_linear_limit_velocity(lin_vel) + # ang_vel = p1.check_angular_limit_velocity(ang_vel) + + # #Test Case 2: + # lin_vel = [0.5,0.4,0.3,0.2,0.1,0.0,-0.1,-0.2,-0.3,-0.4,-0.5] #in m/s + # ang_vel = [0.5,0.4,0.3,0.2,0.1,0.0,-0.1,-0.2,-0.3,-0.4,-0.5] #in rad/s + + #Publish velocity: + print("Publishing velocity...") + p1.ros_pub(lin_vel, ang_vel) #publish velocity to Turtlebot + print("Velocity published") + + + rclpy.spin(p1.node) #spin the node + p1.node.destroy_node() #destroy the node explicitly + rclpy.shutdown() #shutdown ros2 + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/Part02/src/velocity_publisher/velocity_publisher/visualization_ph2.py b/Part02/src/velocity_publisher/velocity_publisher/visualization_ph2.py new file mode 100644 index 0000000..b080519 --- /dev/null +++ b/Part02/src/velocity_publisher/velocity_publisher/visualization_ph2.py @@ -0,0 +1,172 @@ +import numpy as np +import cv2 +from statistics import mean +import math + +#Map Creation using Half Plane Method & OpenCV +class visualization(): + def __init__(self): + + # Normal Obstacle Space: (points taken in clock-wise order) + self.rect1_n = [(150,200), (150,75), (165,75), (165,200), (150,200)] #Rect 1 in the given map + self.rect2_n = [(250,125), (250,0), (265,0), (265,125), (250,125)] #Rect 2 in the given map + self.cir_n = [(400,110), 50] + self.wall_n = [(0, 0), (600,0), (600,200), (0,200), (0,0)] #Boundary of the map + + #Empty Lists to store the equations of the lines + self.rect1_n_eq = [] + self.rect2_n_eq = [] + self.cir_n_eq = [] + self.wall_n_eq = [] #Wall in given map + + self.rect1_b_eq = [] + self.rect2_b_eq = [] + self.cir_b_eq = [] + self.wall_b_eq = [] + + #Function to calculate the equation of a line given two points + # ax+by+c=0 + def line_eq(self, pt1, pt2): + a = pt2[1] - pt1[1] #a = y2 - y1 + b = pt1[0] - pt2[0] #b = x1 - x2 + c = a*(pt1[0]) + b*(pt1[1]) #-c = ax1 + by1 + return [a, b, -c] #return the equation of the line + + def check_poly(self, poly, x, y): #Function to check if a node is inside any polygon obstacle space + count = 0 + if len(poly) > 2: + for i in range(len(poly)): + if poly[i][0] * x + poly[i][1] * y + poly[i][2] <= 0: #Check if the node is on the left side of the line + count+=1 #If yes, increment the count + + if count == len(poly): #If the count is equal to the number of lines, the node is inside the polygon + return True + else: + return False + + else: + for i in range(len(poly)-1): + if ((x - poly[i][0]) ** 2 + (y - poly[i][1]) ** 2 - poly[i+1] ** 2) <= 0: + return True + else: + return False + + # Find the centroid a shape + def centroid(self, shape): + x_coor = [x for x, y in shape] + y_coor = [y for x, y in shape] + centroid_x = int(mean(x_coor)) + centroid_y = int(mean(y_coor)) + + return (centroid_x, centroid_y) + + # Find the bloated line equations of a given shape + def bloat_eqns(self, shape, clearance): + shape_b_eq = [] + if len(shape) > 2: + centroid_x, centroid_y = self.centroid(shape) + for i in range(len(shape)-1): #Find the equation of all the lines of obstacle + # bloating line equations + op = [] + op1 = [] + + op = self.line_eq(shape[i], shape[i+1]) + #print("For points ","(",rect1_n[i], rect1_n[i+1], ")","NOrmal obstacle eqn: ", op[0],"x"," +", op[1],"y"," +",op[2]) + + op1 = op.copy() + new_op = [] + + # find out if the line is towards the center or away from the center + loc = op1[0]*centroid_x + op1[1]*centroid_y + op1[2] + + new_op.append(op1[0]) # append a + new_op.append(op1[1]) # append b + + if loc < 0: # line is away from center + op1[2] = op1[2] - (clearance * math.sqrt((op1[0]**2) + (op1[1]**2))) + else: # line is towards center + op1[2] = op1[2] + (clearance * math.sqrt((op1[0]**2) + (op1[1]**2))) + new_op.append(op1[2]) # append c + + shape_b_eq.append(new_op) #Bloated obstacle eqn + else: + shape_b_eq = shape.copy() + shape_b_eq[1] += clearance + + return shape_b_eq + + def normal_eqns(self, shape): + shape_n_eq = [] + if len(shape) > 2: + for i in range(len(shape)-1): #Find the equation of all the lines of shape obstacles + shape_n_eq.append(self.line_eq(shape[i], shape[i+1])) #Normal shape obstacle eqn + else: + shape_n_eq = shape.copy() + + return shape_n_eq + + #Function to check if a point lies inside a shape - Half Plane Method + #Each shape's points as per map is taken in clockwise direction and given as a list + def map_half_plane(self, clearance, robot_radius): + + clearance = clearance + robot_radius + self.image = np.zeros((600, 200, 3), np.uint8) #create a new image with a black background + + self.rect1_n_eq = self.normal_eqns(self.rect1_n) + self.rect1_b_eq = self.bloat_eqns(self.rect1_n, clearance) + + self.rect2_n_eq = self.normal_eqns(self.rect2_n) + self.rect2_b_eq = self.bloat_eqns(self.rect2_n, clearance) + + self.cir_n_eq = self.normal_eqns(self.cir_n) + self.cir_b_eq = self.bloat_eqns(self.cir_n, clearance) + + self.wall_n_eq = self.normal_eqns(self.wall_n) + self.wall_b_eq = self.bloat_eqns(self.wall_n, -clearance) + + for i in range(0, 600): #Loop through all the nodes (pixels) in the map + for j in range(0, 200): + if (self.check_poly(self.rect1_b_eq, i, j) == True): #Check if the node is inside the bloated rect 1 obstacle space + if (self.check_poly(self.rect1_n_eq, i, j) == True): #Check if the node is inside the normal rect 1 obstacle space + self.image[i][j] = [255, 255, 255] + else: + self.image[i][j] = [255, 191, 0] + + elif (self.check_poly(self.rect2_b_eq, i, j) == True): #Check if the node is inside the bloated rect 1 obstacle space + if (self.check_poly(self.rect2_n_eq, i, j) == True): #Check if the node is inside the normal rect 1 obstacle space + self.image[i][j] = [255, 255, 255] + else: + self.image[i][j] = [255, 191, 0] + + elif (self.check_poly(self.cir_b_eq, i, j) == True): #Check if the node is inside the bloated rect 1 obstacle space + if (self.check_poly(self.cir_n_eq, i, j) == True): #Check if the node is inside the normal rect 1 obstacle space + self.image[i][j] = [255, 255, 255] + else: + self.image[i][j] = [255, 191, 0] + + if (self.check_poly(self.wall_n_eq, i, j) == True): #Check if the node is inside the normal wall obstacle space + if (self.check_poly(self.wall_b_eq, i, j) == False): #Check if the node is not inside the bloated wall obstacle space + self.image[i][j] = [255, 191, 0] + + return self.image #Return the image with the map + + def highLight_node(self, node, color): #Function to highlight a node in the map + self.image[node[0],node[1]] = color #Change the color of the node to the specified color + + def highLight_position(self, node, color): #Function to highlight the point robot in the map + plot = self.image.copy() + cv2.circle(plot, (node[1],node[0]), 5, color, -1) #Draw a circle at the specified node + return plot + +def main(): + map = visualization() #Create a map object + image = map.map_half_plane(5, 5) #Create the map + + image = cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE, image) #Rotate the map by 90 degrees + image = cv2.resize(image, (0,0), fx=2, fy=2) #scale the map to bigger size + cv2.imshow("Map", image) #Display the map + cv2.waitKey(0) #Wait for a key press + cv2.destroyAllWindows() #Destroy all windows + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/README.md b/README.md index 35139a3..d76ba8c 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Implementation-of-A*-Path-Planning-Algorithm +# Implementation-of-A*-Path-Planning-Algorithm using TurtleBot Gazebo ROS2 ## Team Members: - Tej (119197066 - itej89) @@ -7,19 +7,20 @@ ## Google Drive Link: - Below link has the Animation Video output: - - https://drive.google.com/file/d/1Xx3JgBlFZ-htePhxba8Yi7xv2LvUGVkv/view + - Part01: https://drive.google.com/file/d/1BFqM_ZlW0QAr9SYCJtw362kum45erf2Z/view?usp=share_link + - Part02: https://drive.google.com/file/d/13a0W3pZDtqHQqK6NQ6CZKaSkKELC-JiH/view?usp=share_link ## Github Link: -- https://github.com/stark-2000/AstarPathPlanner +- https://github.com/stark-2000/AstarPathPlanner/tree/AStar_Turtlebot3 + ## Instructions: - Clone the repository to your local machine using the following command: ``` - git clone https://github.com/stark-2000/AstarPathPlanner.git + git clone -b AStar_Turtlebot3 https://github.com/stark-2000/AstarPathPlanner.git ``` - cd into the cloned repository ``` - cd AstarPathPlanner + cd /AstarPathPlanner ``` - Alternatively, you can download the zip file of the repository and extract it to your local machine: @@ -27,48 +28,125 @@ cd AstarPathPlanner-master ``` -- Open Terminal and Run the following commands & test with the following test cases: (make sure visulization.py file is in the same folder) - ``` - python3 a_star_arshad_dhinesh_tej.py - ``` -- Now for Test Case 1, Enter the following values when prompted or any random values of your choice: - - Enter Radius of the robot as "5" - - Enter the clearance as "5" - - Enter step size for robot movement as "5" - - Enter the start node as "30,30,30" - - Enter the goal node as "418,120,30" - -- Repeat the above step for Test Case 2 with the following values or any random values of your choice: - - Enter Radius of the robot as "5" - - Enter the clearance as "5" - - Enter step size for robot movement as "10" - - Enter the start node as "30,30,30" - - Enter the goal node as "120,120,30" +- Part1: Open Terminal and Run the following commands + - Note: Ensure your inside the repository folder + ``` + cd Part01 + python3 planner.py + ``` + - Note: Follow the same syntax & units for input arguments for random test cases + - RPM1 & RPM2 are in Revolutions per minute + - Clearance is in cm + - Start Node & Goal Node is in the format: x,y,theta, where x & y are in cm and theta is in degrees + + - When promted for input values, enter any random test cases or the following test cases: + - Test Case 1: + - start_node = 30,30,30 + - goal_node = 400,180,30 + - RPM1 = 50 + - RPM2 = 100 + - clearance = 5 + + - Test Case 2: + - start_node = 30,30,30 + - goal_node = 350,160,30 + - RPM1 = 50 + - RPM2 = 100 + - clearance = 50 + + +- Part2: Open Terminal & Run the following commands + - Note: Ensure your inside the repository folder + ``` + cd Part02 + ``` + - Source the ROS2 environment + ``` + source /opt/ros/galactic/setup.bash + ``` + or + ``` + source /ros/galactic/setup.bash + ``` + - Build the packages + ``` + colcon build + ``` + - Source the workspace + ``` + source . install/setup.bash + ``` + - Run the launch file with the following arguments: + - General Syntax: + ``` + ros2 launch turtlebot_path_planning turtlebot_astar_simulation.launch.py 'start_node:=[]' 'goal_node:=[]' 'RPM1:=' 'RPM2:=' 'clearance:=' + ``` + + - Note: Follow the same syntax & units for input arguments for random test cases + - RPM1 & RPM2 are in Revolutions per minute + - Clearance is in mm + - Start Node & Goal Node are in the format: [x,y,theta], where x & y are in meters and theta is in degrees + + - Test Case 1: + - Start Node: [-0.2,-0.7,30.0] + - Goal Node: [3.0,0.6,30.0] + - RPM1: 50.0 + - RPM2: 100.0 + - Clearance: 50.0 + + ``` + ros2 launch turtlebot_path_planning turtlebot_astar_simulation.launch.py 'start_node:=[-0.2,-0.7,30.0]' 'goal_node:=[3.0,0.6,30.0]' 'RPM1:=50.0' 'RPM2:=100.0' 'clearance:=50.0' + ``` + - Test Case 2: + - Start Node: [0.0,0.0,30.0] + - Goal Node: [1.7,0.5,30.0] + - RPM1: 50.0 + - RPM2: 100.0 + - Clearance: 50.0 + + ``` + ros2 launch turtlebot_path_planning turtlebot_astar_simulation.launch.py 'start_node:=[0.0,0.0,30.0]' 'goal_node:=[1.7,0.5,30.0]' 'RPM1:=50.0' 'RPM2:=100.0' 'clearance:=50.0' + ``` - Note: - - To find the optimal path for goal node placed on the right side of hexagon, it takes approx 6 to 7mins depending on your cpu speed. So, please be patient. - - The threshold used for node closeness comparison is 2.5 units (0.5 * radius of robot), so please provide the step size above 2.5 units for finding the path. It changes dynamicallyy based on the radius of the robot provided by user. Make sure the step size is above 0.5 * radius of the robot. + - The simulation performs well for both the above test cases. In general it works well for all the test cases except a few. Like the turtlebot is not able to follow the trajectory accurately due to lack of a controller hence, it wobbles and deviates a lot while traversing the waypoints. + - As a result sometimes it hits obstacles or the boundary of the map and the simulation stops. + - Hence, to verify whether path is generated by A* algorithm, the 2D map is also enabled in the simulation. So that you can cross compare how the robot traverses way points indicated in blue in the 2D map. + - The simuation works for a set of goal points above the circle but only works for very few cases when the goal is below or righ side of the circle. ## Obstacle Space - Map: -- Adobe Photoshop generated: - -![Canvas](https://user-images.githubusercontent.com/78305300/226239170-55cbf178-080e-4550-a2e5-8734c773b690.png) +- OpenCV 2D Visualization: -- Desmos Visualization: - -![Canvas_Desmos](https://user-images.githubusercontent.com/78305300/226239180-21341f99-308c-4c84-a3f6-ec63ddbf447d.png) +![Obstacle_Space_Map](https://user-images.githubusercontent.com/112987383/230250741-1ed85db1-178c-4c36-87fe-f6015a5a2674.png) -## Demo Video: - - Video shows exploration of nodes and finding the goal node for a circular robot given start node, goal node, robot radius and clerance from obstacles. Once the shortest path is found, it is backtracked and visualized using openCV. - -https://user-images.githubusercontent.com/78305300/226238687-4ec5a996-c4e9-45fa-b35a-5d853e70b8a6.mp4 +- Gazebo 3D Visualization: + +![Screenshot from 2023-04-08 10-38-59](https://user-images.githubusercontent.com/78305300/230730573-b3d242b9-1a3f-4eff-bd41-194b1bf921b9.png) +## Demo Video: + - Video shows exploration of nodes and finding the goal node for a turtlebot3 burger given start node, goal node, 2 set of RPM, and clerance from obstacles. Robot radius 'r' and Length between 2 wheels 'L' is obtained from Turtlebot3 datasheet. Once the shortest path is found, it is backtracked and visualized using openCV 2D visualization. + - ROS logger info and print commands are utilized in regular intervals to show the processing of the obtained launch arguements and progress of the A* algorithm. + - Once the path is visualized in 2D, the linear & angular velocities are calculated and published to the turtlebot3 burger and the robot is made to follow the path which is visualized in 3D using Gazebo. + - First video shows only the 2D visualization and the second video shows the 2D and 3D visualization of the robot following the path. + - Video 1: + +
+