In part 1, we ran through how to apply the A* path finding algorithm on a grid of data. We discussed that this approach was possibly a little naive in the real world which include more interesting scenarios such as one-way streets.
Note: It's definitely worth going through part 1 before carrying on here, as I won't go into any details on A* in this particular post.
I'd like to step away from relying on grid positions per se, in the real world your data will not fit specifically to a grid, it will no doubt have some funky shape. Of course, even with this funky shape you could turn it into a grid using the max/min of the x/y axes, creating a rectangle and tagging each square outside as a 'no-travel' zone but I feel this could be quite inefficient as it increases the number of 'potential' neighbours that the algorithm needs to check.
I'd rather just include the positions that exist, and then regarding neighbours, only check ones that I know exist and that we can travel between.
Let's take a look at the image below:
Firstly, in this new approach, I don't want to even consider the positions that would be (4,3) and (4,1) as they don't exist in the data. We could never travel to or from them so why should we waste our precious processing time even glancing sideways at them!
Secondly, we have an interesting situation at position (2,2) - it's a one way street! The only scenario where you can travel into position (2,2) is from position (3,2). Position (2,2) can only go on to position (1,2). None of the other positions can be involved in this, and unsurprisingly, as it's a one way street, we can't travel the other way (i.e. from left to right)
In part 1, there wasn't really a way to deal with positions that have 'custom' neighbours...so I want to re-jig the logic here so that we can.
First, let's import the python packages we need:
import numpy as np import heapq import pandas as pd from collections import OrderedDict
Now, let's create the data to represent our example grid above.
We want coordinates that represent all the possible positions at which we can reside, and then the related pairs for each position, of which you can travel to.
x1 = [3,3,3,3,3,3,3,3,2,2,2,2,2,2,2,2,1,1,1,1,1,1,1,1,3,2,4,4,4,3] y1 = [3,3,2,2,2,2,1,1,3,3,3,3,1,1,1,1,3,3,2,2,2,2,1,1,2,2,2,2,4,2] x2 = [3,2,3,3,2,2,3,2,3,3,1,1,3,3,1,1,2,1,3,2,2,1,2,1,2,1,3,3,3,4] y2 = [2,3,3,1,3,1,1,1,3,2,3,2,2,1,2,1,3,2,1,3,1,1,1,2,2,2,2,3,1,2]
coord_pairs = pd.DataFrame( OrderedDict((('x1', pd.Series(x1)), ('y1', pd.Series(y1)), ('x2', pd.Series(x2)), ('y2', pd.Series(y2))))) coord_pairs = coord_pairs.sort_values(['x1', 'y1'], ascending=[True,True]) print(coord_pairs)
x1 and y1 are the positions we can reside. For each of these rows, we have the corresponding positions (x2 and y2) that you are able to travel to from that initial position.
You'll see that position (3,2) is the only one that has a pair of (2,2) as it's the only position that can go on to travel to this point. Also, (2,2) has a pairing with (1,2) but not vice versa.
Note: We use OrderedDict to ensure the columns are in the right order. This only really matters as we want to be able to easily see what is what.
I'll now run through the changes in the code from what we used in part 1. They're fairly minor changes in terms of the code.
First, instead of having a fixed set of neighbours in our astar function, we instead create another function which will lookup the available neighbours when we're at any specific position.
def available_neighbours(current_x,current_y): return list(zip(coord_pairs.loc[(coord_pairs.x1 == current_x) & (coord_pairs.y1 == current_y)][["x2"]].x2, coord_pairs.loc[(coord_pairs.x1 == current_x) & (coord_pairs.y1 == current_y)][["y2"]].y2))
Based on our current x and y coordinate, this function looks up those values in our coord_pairs dataframe and returns a list of available neighbours. This means that it will be dynamic for each location.
The heuristic function we had last time stays exactly as it was:
def heuristic(a, b): return np.sqrt((b[0] - a[0]) ** 2 + (b[1] - a[1]) ** 2)
The code for the updated astar function is as follows:
def astar(start, goal):
close_set = set() came_from = {} gscore = {start:0} fscore = {start:heuristic(start, goal)} oheap = []
heapq.heappush(oheap, (fscore[start], start)) while oheap:
current = heapq.heappop(oheap)[1] neighbours = available_neighbours(current[0],current[1])
if current == goal: data = [] while current in came_from: data.append(current) current = came_from[current] return data
close_set.add(current) for i, j in neighbours: neighbour = i, j tentative_g_score = gscore[current] + heuristic(current, neighbour) if neighbour in close_set and tentative_g_score >= gscore.get(neighbour, 0): continue if tentative_g_score < gscore.get(neighbour, 0) or neighbour not in [i[1]for i in oheap]: came_from[neighbour] = current gscore[neighbour] = tentative_g_score fscore[neighbour] = tentative_g_score + heuristic(neighbour, goal) heapq.heappush(oheap, (fscore[neighbour], neighbour)) return False
It's important to note that the astar function now only takes two elements - the start and goal coordinates. We no longer take in the grid as we're basing our decisions purely on coordinate pairings
Next, we've removed the fixed list of neighbours and replaced it with the call to the available_neighbours function we described earlier. This now lives inside the while oheap loop as it needs to be dynamic.
Lastly, as we're no longer using a grid, we don't need any of the code that ensures our neighbours aren't outside the bounds of that grid. The code that has been removed are these lines:
if 0 <= neighbor[0] < array.shape[0]: if 0 <= neighbor[1] < array.shape[1]: if array[neighbor[0]][neighbor[1]] == 1: continue else: # array bound y walls continue else: # array bound x walls continue
That's it - we're ready to test the new code!
Let's now do a very simple experiment to see this all in action. Remembering that we have a one way street at (2,2) - if we want to go from (4,2) to (1,2) what path will we get? What about the reverse, from (1,2) to (4,2)?
Path 1: (4,2) to (1,2)
Specify our start and goal
start = (4,2) goal = (1,2)
Run our functions and then get the route just like in part 1:
route = astar(start, goal) route = route + [start] route = route[::-1] print(route)
[(4, 2), (3, 2), (2, 2), (1, 2)]
It seems to have worked, it was able to travel straight across from right to left, through the one way road to our destination!
Path 2: (1,2) to (4,2)
Specify our start and goal
start = (1,2) goal = (4,2)
Run our functions and get the route just like before:
[(1, 2), (2, 1), (3, 2), (4, 2)]
Cool, it worked again! It wasn't possible for it to travel the wrong way down the one way road so it had to go around...
Hopefully this has been useful and/or interesting. This was an extremely simple scenario, but I think in general this is a better solution for real world data...
Next up in part 3 - we'll see if we can get this working on 3 dimensions!
FULL CODE:
##############################################################################
# import packages
##############################################################################
import numpy as np
import heapq
import pandas as pd
from collections import OrderedDict
##############################################################################
# coordinate pairs
##############################################################################
x1 = [3,3,3,3,3,3,3,3,2,2,2,2,2,2,2,2,1,1,1,1,1,1,1,1,3,2,4,4,4,3]
y1 = [3,3,2,2,2,2,1,1,3,3,3,3,1,1,1,1,3,3,2,2,2,2,1,1,2,2,2,2,4,2]
x2 = [3,2,3,3,2,2,3,2,3,3,1,1,3,3,1,1,2,1,3,2,2,1,2,1,2,1,3,3,3,4]
y2 = [2,3,3,1,3,1,1,1,3,2,3,2,2,1,2,1,3,2,1,3,1,1,1,2,2,2,2,3,1,2]
coord_pairs = pd.DataFrame( OrderedDict((('x1', pd.Series(x1)), ('y1', pd.Series(y1)), ('x2', pd.Series(x2)), ('y2', pd.Series(y2)))))
coord_pairs = coord_pairs.sort_values(['x1', 'y1'], ascending=[True,True])
print(coord_pairs)
##############################################################################
# specify start and goal positions
##############################################################################
start = (4,2)
goal = (1,2)
#start = (1,2)
#goal = (4,2)
##############################################################################
# a* path finding functions
##############################################################################
def available_neighbours(current_x,current_y):
return list(zip(coord_pairs.loc[(coord_pairs.x1 == current_x) & (coord_pairs.y1 == current_y)][["x2"]].x2, coord_pairs.loc[(coord_pairs.x1 == current_x) & (coord_pairs.y1 == current_y)][["y2"]].y2))
def heuristic(a, b):
return np.sqrt((b[0] - a[0]) ** 2 + (b[1] - a[1]) ** 2)
def astar(start, goal):
close_set = set()
came_from = {}
gscore = {start:0}
fscore = {start:heuristic(start, goal)}
oheap = []
heapq.heappush(oheap, (fscore[start], start))
while oheap:
current = heapq.heappop(oheap)[1]
neighbours = available_neighbours(current[0],current[1])
if current == goal:
data = []
while current in came_from:
data.append(current)
current = came_from[current]
return data
close_set.add(current)
for i, j in neighbours:
neighbour = i, j
tentative_g_score = gscore[current] + heuristic(current, neighbour)
if neighbour in close_set and tentative_g_score >= gscore.get(neighbour, 0):
continue
if tentative_g_score < gscore.get(neighbour, 0) or neighbour not in [i[1]for i in oheap]:
came_from[neighbour] = current
gscore[neighbour] = tentative_g_score
fscore[neighbour] = tentative_g_score + heuristic(neighbour, goal)
heapq.heappush(oheap, (fscore[neighbour], neighbour))
return False
##############################################################################
# calculate route
##############################################################################
route = astar(start, goal)
route = route + [start]
route = route[::-1]
print(route)