# planning.py (9.68 KiB)
# UPDATE THIS WHEN JOSH SENDS FEN AND BEST MOVE
# from chess_engine import fen_notation, best_move
import numpy as np
import heapq
import serial
from chess_ai import chess_AI, cheat_check
import chess
import time
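'''
Layout of the AI output as it is read by Path_planning.chessExecuteMove:
AI_output[0] = best move: [from, to] squares, each written like "a 8"
AI_output[1] = secondary move squares: one entry (a piece to carry off the
               board) or two entries (from, to); anything else is ignored
AI_output[2] = flags: [1] en passant, [2] castling, [3] capture,
               [4] promotion, [5] cheating ([0] is not read here)
AI_output[3] = fen_string
'''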
class Path_planning():
    """Plan piece paths across a chessboard and send them over serial.

    The board is modelled as a 17x17 occupancy grid (``walls_array``). Odd
    indices (1, 3, ..., 15) address the 8x8 squares; even indices
    (0, 2, ..., 16) address the lines between and around the squares. A value
    of 1 marks a square that holds a piece.
    """

    # Side length of the occupancy grid (8 squares plus the 9 lines around them).
    GRID_SIZE = 17

    # Serial devices that receive every message. CHANGE ON RASPBERRY PI.
    PRIMARY_PORT = "/dev/serial/by-id/usb-Silicon_Labs_CP2102N_USB_to_UART_Bridge_Controller_3ca0a1e14c55ed11b2b78f131d62bc44-if00-port0"
    PRIMARY_BAUD = 9600
    SECONDARY_PORT = "/dev/serial/by-id/usb-FTDI_FT232R_USB_UART_A5XK3RJT-if00-port0"
    SECONDARY_BAUD = 19200

    # Orthogonal steps (row, col); diagonal moves are never considered.
    _STEPS = ((-1, 0), (0, -1), (0, 1), (1, 0))

    def __init__(self, board_width):
        """Create a planner with an empty occupancy grid.

        Args:
            board_width: Width of the board; ``box_width`` is one eighth of it.
        """
        self.board_width = board_width
        self.box_width = board_width / 8
        self.column_letters = np.array(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])
        self.row_numbers = np.array(['8', '7', '6', '5', '4', '3', '2', '1'])
        self.walls_array = np.zeros((self.GRID_SIZE, self.GRID_SIZE))

    def filling_board(self, fen_string):
        """Rebuild ``walls_array`` from the piece placement of a FEN string.

        The grid is cleared first so that squares vacated since the previous
        call do not linger as walls. Only the text before the first space
        (the piece-placement field) is read.

        Args:
            fen_string: FEN text; ranks are separated by ``/``.
        """
        self.walls_array = np.zeros((self.GRID_SIZE, self.GRID_SIZE))
        placement = fen_string.strip().split(" ", 1)[0]
        for rank_index, rank in enumerate(placement.split("/")):
            rowNum = 2 * rank_index + 1
            colNum = 1
            for character in rank:
                if character.isdigit():
                    # A digit is a run of empty squares; each square is 2 cells wide.
                    colNum += 2 * int(character)
                else:
                    self.walls_array[rowNum, colNum] = 1
                    colNum += 2

    def _square_to_cell(self, square):
        """Convert a square written like ``"a 8"`` to a (row, col) grid cell."""
        letter_num = square.split(" ")
        # Multiplied by 2 (plus 1) to get the square's index in the wall array.
        col = int(np.where(self.column_letters == letter_num[0])[0][0]) * 2 + 1
        row = int(np.where(self.row_numbers == letter_num[1])[0][0]) * 2 + 1
        return row, col

    def _adjacent_cells(self, cell):
        """Yield the orthogonal neighbours of ``cell`` that lie inside the grid."""
        rows, cols = self.walls_array.shape
        for d_row, d_col in self._STEPS:
            row, col = cell[0] + d_row, cell[1] + d_col
            # Without this check -1 would wrap to the far edge and 17 would raise.
            if 0 <= row < rows and 0 <= col < cols:
                yield (row, col)

    def A_star_traversal(self, move_to_execute, capture):
        """Find a path between two squares that avoids occupied cells.

        Args:
            move_to_execute: Sequence of squares written like ``"a 8"``. The
                first entry is the origin; the last of the remaining entries
                is the destination.
            capture: When truthy, the destination is shifted one cell up and
                one cell left, to the top-left corner of the target square.

        Returns:
            List of (row, col) grid cells from origin to destination inclusive.

        Raises:
            ValueError: If no path to the destination exists.
        """
        row_from = col_from = row_to = col_to = 0
        for move in enumerate(move_to_execute):
            print(move)
            row, col = self._square_to_cell(move[1])
            if move[0] == 0:
                row_from, col_from = row, col
            else:
                row_to, col_to = row, col
                if capture:
                    row_to -= 1  # We want the top left corner
                    col_to -= 1
        print(row_from, col_from, row_to, col_to)

        start = (row_from, col_from)
        goal = (row_to, col_to)

        # Heap entries: (f value, g value, cell, parent cell).
        frontier = [(0, 0, start, None)]
        # Parent of every expanded cell; doubles as the visited set.
        came_from = {}
        while frontier:
            _, cost, cell, parent = heapq.heappop(frontier)
            if cell in came_from:
                continue
            came_from[cell] = parent
            if cell == goal:
                break
            step_cost = cost + 1
            for candidate in self._adjacent_cells(cell):
                if candidate in came_from:
                    continue
                # The goal may be entered even when it is occupied.
                if self.walls_array[candidate] != 0 and candidate != goal:
                    continue
                # Penalise cells that sit next to pieces (other than the cell
                # we are coming from) so paths keep clear of them.
                crowding = sum(
                    1
                    for other in self._adjacent_cells(candidate)
                    if other != cell and self.walls_array[other] == 1
                )
                estimate = (abs(goal[0] - candidate[0])
                            + abs(goal[1] - candidate[1])
                            + crowding)
                heapq.heappush(
                    frontier,
                    (step_cost + estimate, step_cost, candidate, cell),
                )

        if goal not in came_from:
            raise ValueError(
                "no path from {} to {}".format(start, goal)
            )

        path = []
        cell = goal
        while cell is not None:
            path.append(cell)
            cell = came_from[cell]
        path.reverse()
        print(path)
        return path

    def serialSend(self, message, cheating, promotion, castling):
        """Write ``message`` to both serial ports and wait for completion.

        ``'^;'`` is appended when ``promotion`` is truthy and ``'>'`` when
        ``cheating`` is falsy. Unless ``cheating`` is truthy, the call blocks
        until the first port answers ``b'Done'``. Both ports are closed on
        exit, including when an error is raised.

        Args:
            message: Text to send.
            cheating: Truthy to skip the ``'>'`` terminator and the wait.
            promotion: Truthy to append the promotion marker.
            castling: Accepted for interface compatibility; not used.
        """
        with serial.Serial(self.PRIMARY_PORT, self.PRIMARY_BAUD) as ser, \
                serial.Serial(self.SECONDARY_PORT, self.SECONDARY_BAUD) as ser1:
            if promotion:
                message += '^'
                message += ';'
            if not cheating:
                message += '>'
            print(message)
            payload = message.encode()
            ser.write(payload)
            ser1.write(payload)
            if not cheating:
                while True:
                    retVal = ser.readline(4)
                    print(retVal)
                    if retVal == b'Done':
                        break

    def pathToSerial(self, path, capLength):
        """Serialise a path as ``"x;y;"`` pairs.

        Args:
            path: Iterable of (row, col) cells; each is emitted column first.
            capLength: Number of leading cells after which ``"<;"`` is
                inserted; a value that never matches (e.g. -1) inserts nothing.

        Returns:
            The serialised message (also printed).
        """
        parts = []
        for counter, (y, x) in enumerate(path, start=1):
            parts.append(str(x) + ';' + str(y) + ';')
            if counter == capLength:
                parts.append('<;')
        serialMessage = "".join(parts)
        print(serialMessage)
        return serialMessage

    def sendCheating(self, is_cheating):
        """Send ``'?'`` when ``is_cheating`` is truthy, otherwise ``'@'``.

        The message is sent with the cheating flag set, so no ``'>'`` is
        appended and the call does not wait for a reply.
        """
        if is_cheating:
            self.serialSend('?', 1, 0, 0)
        else:
            self.serialSend('@', 1, 0, 0)

    def chessExecuteMove(self, AI_output):
        """Plan and send the path(s) for one AI move.

        Args:
            AI_output: Indexable with ``[0]`` the best move (squares),
                ``[1]`` the secondary move squares, ``[2]`` the flags
                (``[1]`` en passant, ``[2]`` castling, ``[3]`` capture,
                ``[4]`` promotion, ``[5]`` cheating) and ``[3]`` the FEN.
        """
        print(AI_output)
        captureLocation = "a 8"
        example_fen = AI_output[3]
        example_best_move = AI_output[0]
        secondary = AI_output[1]
        if len(secondary) == 2:
            example_capture = secondary
        elif len(secondary) == 1:
            # Build a new list so the caller's data is not modified.
            example_capture = [secondary[0], captureLocation]
            print(example_capture)
        else:
            example_capture = ''

        flags = AI_output[2]
        cheating = flags[5]
        castling = flags[2]
        capture = flags[3]
        enPasant = flags[1]
        promotion = flags[4]

        self.filling_board(example_fen)

        if not (capture or castling or enPasant):
            piecePath = self.A_star_traversal(example_best_move, capture)
            self.serialSend(self.pathToSerial(piecePath, -1), cheating, promotion, castling)
            return

        capPath = self.A_star_traversal(example_capture, capture)
        capLength = len(capPath)
        if castling:
            captureString = example_capture[0] + " " + example_capture[1]
            print(captureString)
            captureString = captureString.replace(" ", "")
            print(captureString)
            # Apply the first move to the given position so the second path
            # is planned around the piece's new square.
            temp_board = chess.Board(example_fen)
            temp_board.push(chess.Move.from_uci(captureString))
            self.filling_board(temp_board.fen())
        # The main piece always targets the centre of its square.
        piecePath = self.A_star_traversal(example_best_move, False)
        capPath.extend(piecePath)
        self.serialSend(self.pathToSerial(capPath, capLength), cheating, promotion, castling)
def main():
    """Ask the chess AI for a move from a fixed test position and execute it.

    The first element of the ``chess_AI`` result is handed to a
    ``Path_planning`` instance created with a board width of 23.
    """
    # Alternative test position (en passant possible):
    # "r1bq1r2/pp2n3/4N2k/3pPppP/1b1n2Q1/2N5/PP3PP1/R1B1K2R w KQ g6 0 15"
    castle_fen = "r3k2K/1b1p1p2/p3p1q1/6p1/2P5/P3P3/1P2B3/R1R1Q3 b q - 0 1"  # castle possible
    ai_result = chess_AI(castle_fen, "", 0)[0]
    planner = Path_planning(23)
    planner.chessExecuteMove(ai_result)
# Run the demo move only when this file is executed as a script.
if __name__ == "__main__":
    main()