agent_test.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
"""
This file contains test cases to verify the correct implementation of the
functions required for this project including minimax, alphabeta, and iterative
deepening.  The heuristic function is tested for conformance to the expected
interface, but cannot be automatically assessed for correctness.

STUDENTS SHOULD NOT NEED TO MODIFY THIS CODE.  IT WOULD BE BEST TO TREAT THIS
FILE AS A BLACK BOX FOR TESTING.
"""
import sys
import timeit
import unittest
from collections import Counter
from copy import copy
from functools import wraps
from importlib import reload
from multiprocessing import TimeoutError
from queue import Empty as QueueEmptyError
from queue import Queue
from threading import Thread

import game_agent
import isolation
  22. TIMEOUT = 60
  23. WRONG_MOVE = """
  24. The {} function failed because it returned a non-optimal move at search
  25. depth {}.
  26. Valid choices: {}
  27. Your selection: {}
  28. """
  29. WRONG_NUM_EXPLORED = """
  30. Your {} search visited the wrong nodes at search depth {}. If the number
  31. of visits is too large, make sure that iterative deepening is only
  32. running when the `iterative` flag is set in the agent constructor.
  33. Max explored size: {}
  34. Number you explored: {}
  35. """
  36. UNEXPECTED_VISIT = """
  37. Your {} search did not visit the number of expected unique nodes at search
  38. depth {}.
  39. Max explored size: {}
  40. Number you explored: {}
  41. """
  42. ID_FAIL = """
  43. Your agent explored the wrong number of nodes using Iterative Deepening and
  44. minimax. Remember that ID + MM should check every node in each layer of the
  45. game tree before moving on to the next layer.
  46. """
  47. INVALID_MOVE = """
  48. Your agent returned an invalid move. Make sure that your function returns
  49. a selection when the search times out during iterative deepening.
  50. Valid choices: {!s}
  51. Your choice: {}
  52. """
  53. WRONG_HEURISTIC = """
  54. Your agent did not use the heuristic provided to the agent constructor
  55. (accessed with self.score()). Make sure that you're not directly calling
  56. your custom_score() heuristic function in alpha-beta or minimax search.
  57. """
  58. TIMER_MARGIN = 15 # time (in ms) to leave on the timer to avoid timeout
  59. def curr_time_millis():
  60. """Simple timer to return the current clock time in milliseconds."""
  61. return 1000 * timeit.default_timer()
  62. def handler(obj, testcase, queue):
  63. """Handler to pass information between threads; used in the timeout
  64. function to abort long-running (i.e., probably hung) test cases.
  65. """
  66. try:
  67. queue.put((None, testcase(obj)))
  68. except:
  69. queue.put((sys.exc_info(), None))
  70. def timeout(time_limit):
  71. """Function decorator for unittest test cases to specify test case timeout.
  72. The timer mechanism works by spawning a new thread for the test to run in
  73. and using the timeout handler for the thread-safe queue class to abort and
  74. kill the child thread if it doesn't return within the timeout.
  75. It is not safe to access system resources (e.g., files) within test cases
  76. wrapped by this timer.
  77. """
  78. def wrapUnitTest(testcase):
  79. @wraps(testcase)
  80. def testWrapper(self):
  81. queue = Queue()
  82. try:
  83. p = Thread(target=handler, args=(self, testcase, queue))
  84. p.daemon = True
  85. p.start()
  86. err, res = queue.get(timeout=time_limit)
  87. p.join()
  88. if err:
  89. raise err[0](err[1]).with_traceback(err[2])
  90. return res
  91. except QueueEmptyError:
  92. raise TimeoutError(
  93. ("Test aborted due to timeout. Test was " +
  94. "expected to finish in less than {} second(s).").format(
  95. time_limit))
  96. return testWrapper
  97. return wrapUnitTest
  98. def makeEvalTable(table):
  99. """Use a closure to create a heuristic function that returns values from
  100. a table that maps board locations to constant values. This supports testing
  101. the minimax and alphabeta search functions.
  102. THIS HEURISTIC IS ONLY USEFUL FOR TESTING THE SEARCH FUNCTIONALITY -
  103. IT IS NOT MEANT AS AN EXAMPLE OF A USEFUL HEURISTIC FOR GAME PLAYING.
  104. """
  105. def score(game, player):
  106. row, col = game.get_player_location(player)
  107. return table[row][col]
  108. return score
  109. def makeEvalStop(limit, timer):
  110. """Use a closure to create a heuristic function that forces the search
  111. timer to expire when a fixed number of node expansions have been perfomred
  112. during the search. This ensures that the search algorithm should always be
  113. in a predictable state regardless of node expansion order.
  114. THIS HEURISTIC IS ONLY USEFUL FOR TESTING THE SEARCH FUNCTIONALITY -
  115. IT IS NOT MEANT AS AN EXAMPLE OF A USEFUL HEURISTIC FOR GAME PLAYING.
  116. """
  117. def score(game, player):
  118. timer.invoked = True
  119. if timer.time_left() < 0:
  120. raise TimeoutError("Timer expired during search. You must " +
  121. "return an answer before the timer reaches 0.")
  122. if limit == game.counts[0]:
  123. timer.time_limit = 0
  124. return 0
  125. return score
  126. def makeBranchEval(first_branch):
  127. """Use a closure to create a heuristic function that evaluates to a nonzero
  128. score when the root of the search is the first branch explored, and
  129. otherwise returns 0. This heuristic is used to force alpha-beta to prune
  130. some parts of a game tree for testing.
  131. THIS HEURISTIC IS ONLY USEFUL FOR TESTING THE SEARCH FUNCTIONALITY -
  132. IT IS NOT MEANT AS AN EXAMPLE OF A USEFUL HEURISTIC FOR GAME PLAYING.
  133. """
  134. def score(game, player):
  135. if not first_branch:
  136. first_branch.append(game.root)
  137. if game.root in first_branch:
  138. return 1.
  139. return 0.
  140. return score
  141. class CounterBoard(isolation.Board):
  142. """Subclass of the isolation board that maintains counters for the number
  143. of unique nodes and total nodes visited during depth first search.
  144. Some functions from the base class must be overridden to maintain the
  145. counters during search.
  146. """
  147. def __init__(self, *args, **kwargs):
  148. super(CounterBoard, self).__init__(*args, **kwargs)
  149. self.counter = Counter()
  150. self.visited = set()
  151. self.root = None
  152. def copy(self):
  153. new_board = CounterBoard(self._player_1, self._player_2,
  154. width=self.width, height=self.height)
  155. new_board.move_count = self.move_count
  156. new_board._active_player = self._active_player
  157. new_board._inactive_player = self._inactive_player
  158. new_board._board_state = copy(self._board_state)
  159. new_board.counter = self.counter
  160. new_board.visited = self.visited
  161. new_board.root = self.root
  162. return new_board
  163. def forecast_move(self, move):
  164. self.counter[move] += 1
  165. self.visited.add(move)
  166. new_board = self.copy()
  167. new_board.apply_move(move)
  168. if new_board.root is None:
  169. new_board.root = move
  170. return new_board
  171. @property
  172. def counts(self):
  173. """ Return counts of (total, unique) nodes visited """
  174. return sum(self.counter.values()), len(self.visited)
  175. class Project1Test(unittest.TestCase):
  176. def initAUT(self, depth, eval_fn, iterative=False,
  177. method="minimax", loc1=(3, 3), loc2=(0, 0), w=7, h=7):
  178. """Generate and initialize player and board objects to be used for
  179. testing.
  180. """
  181. reload(game_agent)
  182. agentUT = game_agent.CustomPlayer(depth, eval_fn, iterative, method)
  183. board = CounterBoard(agentUT, 'null_agent', w, h)
  184. board.apply_move(loc1)
  185. board.apply_move(loc2)
  186. return agentUT, board
  187. @timeout(TIMEOUT)
  188. # @unittest.skip("Skip eval function test.") # Uncomment this line to skip test
  189. def test_heuristic(self):
  190. """Test output interface of heuristic score function interface."""
  191. player1 = "Player1"
  192. player2 = "Player2"
  193. p1_location = (0, 0)
  194. p2_location = (1, 1) # top left corner
  195. game = isolation.Board(player1, player2)
  196. game.apply_move(p1_location)
  197. game.apply_move(p2_location)
  198. self.assertIsInstance(game_agent.custom_score(game, player1), float,
  199. "The heuristic function should return a floating point")
  200. @timeout(TIMEOUT)
  201. # @unittest.skip("Skip simple minimax test.") # Uncomment this line to skip test
  202. def test_minimax_interface(self):
  203. """Test CustomPlayer.minimax interface with simple input """
  204. h, w = 7, 7 # board size
  205. test_depth = 1
  206. starting_location = (5, 3)
  207. adversary_location = (0, 0) # top left corner
  208. iterative_search = False
  209. search_method = "minimax"
  210. heuristic = lambda g, p: 0. # return 0 everywhere
  211. # create a player agent & a game board
  212. agentUT = game_agent.CustomPlayer(
  213. test_depth, heuristic, iterative_search, search_method)
  214. agentUT.time_left = lambda: 99 # ignore timeout for fixed-depth search
  215. board = isolation.Board(agentUT, 'null_agent', w, h)
  216. # place two "players" on the board at arbitrary (but fixed) locations
  217. board.apply_move(starting_location)
  218. board.apply_move(adversary_location)
  219. for move in board.get_legal_moves():
  220. next_state = board.forecast_move(move)
  221. v, _ = agentUT.minimax(next_state, test_depth)
  222. self.assertTrue(type(v) == float,
  223. ("Minimax function should return a floating " +
  224. "point value approximating the score for the " +
  225. "branch being searched."))
  226. @timeout(TIMEOUT)
  227. # @unittest.skip("Skip alphabeta test.") # Uncomment this line to skip test
  228. def test_alphabeta_interface(self):
  229. """Test CustomPlayer.alphabeta interface with simple input """
  230. h, w = 9, 9 # board size
  231. test_depth = 1
  232. starting_location = (2, 7)
  233. adversary_location = (0, 0) # top left corner
  234. iterative_search = False
  235. search_method = "alphabeta"
  236. heuristic = lambda g, p: 0. # return 0 everywhere
  237. # create a player agent & a game board
  238. agentUT = game_agent.CustomPlayer(
  239. test_depth, heuristic, iterative_search, search_method)
  240. agentUT.time_left = lambda: 99 # ignore timeout for fixed-depth search
  241. board = isolation.Board(agentUT, 'null_agent', w, h)
  242. # place two "players" on the board at arbitrary (but fixed) locations
  243. board.apply_move(starting_location)
  244. board.apply_move(adversary_location)
  245. for move in board.get_legal_moves():
  246. next_state = board.forecast_move(move)
  247. v, _ = agentUT.alphabeta(next_state, test_depth)
  248. self.assertTrue(type(v) == float,
  249. ("Alpha Beta function should return a floating " +
  250. "point value approximating the score for the " +
  251. "branch being searched."))
  252. @timeout(TIMEOUT)
  253. # @unittest.skip("Skip get_move test.") # Uncomment this line to skip test
  254. def test_get_move_interface(self):
  255. """Test CustomPlayer.get_move interface with simple input """
  256. h, w = 9, 9 # board size
  257. test_depth = 1
  258. starting_location = (2, 7)
  259. adversary_location = (0, 0) # top left corner
  260. iterative_search = False
  261. search_method = "minimax"
  262. heuristic = lambda g, p: 0. # return 0 everywhere
  263. # create a player agent & a game board
  264. agentUT = game_agent.CustomPlayer(
  265. test_depth, heuristic, iterative_search, search_method)
  266. # Test that get_move returns a legal choice on an empty game board
  267. board = isolation.Board(agentUT, 'null_agent', w, h)
  268. legal_moves = board.get_legal_moves()
  269. move = agentUT.get_move(board, legal_moves, lambda: 99)
  270. self.assertIn(move, legal_moves,
  271. ("The get_move() function failed as player 1 on an " +
  272. "empty board. It should return coordinates on the " +
  273. "game board for the location of the agent's next " +
  274. "move. The move must be one of the legal moves on " +
  275. "the current game board."))
  276. # Test that get_move returns a legal choice for first move as player 2
  277. board = isolation.Board('null_agent', agentUT, w, h)
  278. board.apply_move(starting_location)
  279. legal_moves = board.get_legal_moves()
  280. move = agentUT.get_move(board, legal_moves, lambda: 99)
  281. self.assertIn(move, legal_moves,
  282. ("The get_move() function failed making the first " +
  283. "move as player 2 on a new board. It should return " +
  284. "coordinates on the game board for the location " +
  285. "of the agent's next move. The move must be one " +
  286. "of the legal moves on the current game board."))
  287. # Test that get_move returns a legal choice after first move
  288. board = isolation.Board(agentUT, 'null_agent', w, h)
  289. board.apply_move(starting_location)
  290. board.apply_move(adversary_location)
  291. legal_moves = board.get_legal_moves()
  292. move = agentUT.get_move(board, legal_moves, lambda: 99)
  293. self.assertIn(move, legal_moves,
  294. ("The get_move() function failed as player 1 on a " +
  295. "game in progress. It should return coordinates on" +
  296. "the game board for the location of the agent's " +
  297. "next move. The move must be one of the legal moves " +
  298. "on the current game board."))
  299. @timeout(TIMEOUT)
  300. # @unittest.skip("Skip minimax test.") # Uncomment this line to skip test
  301. def test_minimax(self):
  302. """Test CustomPlayer.minimax
  303. This test uses a scoring function that returns a constant value based
  304. on the location of the search agent on the board to force minimax to
  305. choose a branch that visits those cells at a specific fixed-depth.
  306. If minimax is working properly, it will visit a constant number of
  307. nodes during the search and return one of the acceptable legal moves.
  308. """
  309. h, w = 7, 7 # board size
  310. starting_location = (2, 3)
  311. adversary_location = (0, 0) # top left corner
  312. iterative_search = False
  313. method = "minimax"
  314. # The agent under test starts at position (2, 3) on the board, which
  315. # gives eight (8) possible legal moves [(0, 2), (0, 4), (1, 1), (1, 5),
  316. # (3, 1), (3, 5), (4, 2), (4, 4)]. The search function will pick one of
  317. # those moves based on the estimated score for each branch. The value
  318. # only changes on odd depths because even depths end on when the
  319. # adversary has initiative.
  320. value_table = [[0] * w for _ in range(h)]
  321. value_table[1][5] = 1 # depth 1 & 2
  322. value_table[4][3] = 2 # depth 3 & 4
  323. value_table[6][6] = 3 # depth 5
  324. heuristic = makeEvalTable(value_table)
  325. # These moves are the branches that will lead to the cells in the value
  326. # table for the search depths.
  327. expected_moves = [set([(1, 5)]),
  328. set([(3, 1), (3, 5)]),
  329. set([(3, 5), (4, 2)])]
  330. # Expected number of node expansions during search
  331. counts = [(8, 8), (24, 10), (92, 27), (418, 32), (1650, 43)]
  332. # Test fixed-depth search; note that odd depths mean that the searching
  333. # player (student agent) has the last move, while even depths mean that
  334. # the adversary has the last move before calling the heuristic
  335. # evaluation function.
  336. for idx in range(5):
  337. test_depth = idx + 1
  338. agentUT, board = self.initAUT(test_depth, heuristic,
  339. iterative_search, method,
  340. loc1=starting_location,
  341. loc2=adversary_location)
  342. # disable search timeout by returning a constant value
  343. agentUT.time_left = lambda: 1e3
  344. _, move = agentUT.minimax(board, test_depth)
  345. num_explored_valid = board.counts[0] == counts[idx][0]
  346. num_unique_valid = board.counts[1] == counts[idx][1]
  347. self.assertTrue(num_explored_valid, WRONG_NUM_EXPLORED.format(
  348. method, test_depth, counts[idx][0], board.counts[0]))
  349. self.assertTrue(num_unique_valid, UNEXPECTED_VISIT.format(
  350. method, test_depth, counts[idx][1], board.counts[1]))
  351. self.assertIn(move, expected_moves[idx // 2], WRONG_MOVE.format(
  352. method, test_depth, expected_moves[idx // 2], move))
  353. @timeout(TIMEOUT)
  354. # @unittest.skip("Skip alpha-beta test.") # Uncomment this line to skip test
  355. def test_alphabeta(self):
  356. """Test CustomPlayer.alphabeta
  357. This test uses a scoring function that returns a constant value based
  358. on the branch being searched by alphabeta in the user agent, and forces
  359. the search to prune on every other branch it visits. By using a huge
  360. board where the players are too far apart to interact and every branch
  361. has the same growth factor, the expansion and pruning must result in
  362. an exact number of expanded nodes.
  363. """
  364. h, w = 101, 101 # board size
  365. starting_location = (50, 50)
  366. adversary_location = (0, 0) # top left corner
  367. iterative_search = False
  368. method = "alphabeta"
  369. # The agent under test starts in the middle of a huge board so that
  370. # every branch has the same number of possible moves, so pruning any
  371. # branch has the same effect during testing
  372. # These are the expected number of node expansions for alphabeta search
  373. # to explore the game tree to fixed depth. The custom eval function
  374. # used for this test ensures that some branches must be pruned, while
  375. # the search should still return an optimal move.
  376. counts = [(8, 8), (17, 10), (74, 42), (139, 51), (540, 119)]
  377. for idx in range(len(counts)):
  378. test_depth = idx + 1 # pruning guarantee requires min depth of 3
  379. first_branch = []
  380. heuristic = makeBranchEval(first_branch)
  381. agentUT, board = self.initAUT(test_depth, heuristic,
  382. iterative_search, method,
  383. loc1=starting_location,
  384. loc2=adversary_location,
  385. w=w, h=h)
  386. # disable search timeout by returning a constant value
  387. agentUT.time_left = lambda: 1e3
  388. _, move = agentUT.alphabeta(board, test_depth)
  389. num_explored_valid = board.counts[0] == counts[idx][0]
  390. num_unique_valid = board.counts[1] == counts[idx][1]
  391. self.assertTrue(num_explored_valid, WRONG_NUM_EXPLORED.format(
  392. method, test_depth, counts[idx][0], board.counts[0]))
  393. self.assertTrue(num_unique_valid, UNEXPECTED_VISIT.format(
  394. method, test_depth, counts[idx][1], board.counts[1]))
  395. self.assertIn(move, first_branch, WRONG_MOVE.format(
  396. method, test_depth, first_branch, move))
  397. @timeout(TIMEOUT)
  398. # @unittest.skip("Skip iterative deepening test.") # Uncomment this line to skip test
  399. def test_get_move(self):
  400. """Test iterative deepening in CustomPlayer.get_move
  401. Placing an agent on the game board and performing ID minimax search,
  402. which should visit a specific number of unique nodes while expanding.
  403. By forcing the search to timeout when a predetermined number of nodes
  404. have been expanded, we can then verify that the expected number of
  405. unique nodes have been visited.
  406. """
  407. class DynamicTimer():
  408. """Dynamic Timer allows the time limit to be changed after the
  409. timer is initialized so that the search timeout can be triggered
  410. before the timer actually expires. This allows the timer to expire
  411. when an event occurs, regardless of the clock time required until
  412. the event happens.
  413. """
  414. def __init__(self, time_limit):
  415. self.time_limit = time_limit
  416. self.invoked = False
  417. self.start_time = curr_time_millis()
  418. def time_left(self):
  419. return self.time_limit - (curr_time_millis() - self.start_time)
  420. w, h = 11, 11 # board size
  421. adversary_location = (0, 0)
  422. method = "minimax"
  423. # The agent under test starts at the positions indicated below, and
  424. # performs an iterative deepening minimax search (minimax is easier to
  425. # test because it always visits all nodes in the game tree at every
  426. # level).
  427. origins = [(2, 3), (6, 6), (7, 4), (4, 2), (0, 5), (10, 10)]
  428. exact_counts = [(8, 8), (32, 10), (160, 39), (603, 35), (1861, 54), (3912, 62)]
  429. for idx in range(len(origins)):
  430. # set the initial timer high enough that the search will not
  431. # timeout before triggering the dynamic timer to halt by visiting
  432. # the expected number of nodes
  433. time_limit = 1e4
  434. timer = DynamicTimer(time_limit)
  435. eval_fn = makeEvalStop(exact_counts[idx][0], timer)
  436. agentUT, board = self.initAUT(-1, eval_fn, True, method,
  437. origins[idx], adversary_location,
  438. w, h)
  439. legal_moves = board.get_legal_moves()
  440. chosen_move = agentUT.get_move(board, legal_moves, timer.time_left)
  441. diff_total = abs(board.counts[0] - exact_counts[idx][0])
  442. diff_unique = abs(board.counts[1] - exact_counts[idx][1])
  443. self.assertTrue(timer.invoked, WRONG_HEURISTIC)
  444. self.assertTrue(diff_total <= 1 and diff_unique == 0, ID_FAIL)
  445. self.assertTrue(chosen_move in legal_moves, INVALID_MOVE.format(
  446. legal_moves, chosen_move))
  447. if __name__ == '__main__':
  448. unittest.main()