my_air_cargo_problems.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. from aimacode.logic import PropKB
  2. from aimacode.planning import Action
  3. from aimacode.search import (
  4. Node, Problem,
  5. )
  6. from aimacode.utils import expr
  7. from lp_utils import (
  8. FluentState, encode_state, decode_state,
  9. )
  10. from my_planning_graph import PlanningGraph
  11. from functools import lru_cache
  12. class AirCargoProblem(Problem):
  13. def __init__(self, cargos, planes, airports, initial: FluentState, \
  14. goal: list):
  15. """
  16. :param cargos: list of str
  17. cargos in the problem
  18. :param planes: list of str
  19. planes in the problem
  20. :param airports: list of str
  21. airports in the problem
  22. :param initial: FluentState object
  23. positive and negative literal fluents (as expr)
  24. describing initial state
  25. :param goal: list of expr
  26. literal fluents required for goal test
  27. """
  28. self.state_map = initial.pos + initial.neg
  29. self.initial_state_TF = encode_state(initial, self.state_map)
  30. Problem.__init__(self, self.initial_state_TF, goal=goal)
  31. self.cargos = cargos
  32. self.planes = planes
  33. self.airports = airports
  34. self.actions_list = self.get_actions()
  35. def get_actions(self):
  36. """
  37. This method creates concrete actions (no variables) for all actions
  38. in the problem domain action schema and turns them into complete Action
  39. objects as defined in the aimacode.planning module. It is
  40. computationally expensive to call this method directly;
  41. however, it is called in the constructor and the results cached
  42. in the `actions_list` property.
  43. RETURNS:
  44. list<Action>
  45. list of Action objects
  46. """
  47. def load_actions():
  48. """Create all concrete Load actions and return a list.
  49. :return: list of Action objects
  50. """
  51. loads = []
  52. for cargo in self.cargos:
  53. for plane in self.planes:
  54. for airport in self.airports:
  55. precond_pos = [
  56. expr("At({}, {})".format(cargo, airport)),
  57. expr("At({}, {})".format(plane, airport))
  58. ]
  59. precond_neg = []
  60. effect_add = [expr("In({}, {})".format(cargo, plane))]
  61. effect_rem = [expr("At({}, {})".format(cargo, airport))]
  62. load = Action (
  63. expr("Load({}, {}, {})"\
  64. .format(cargo, plane, airport)),
  65. [precond_pos, precond_neg],
  66. [effect_add, effect_rem]
  67. )
  68. loads.append(load)
  69. return loads
  70. def unload_actions():
  71. """Create all concrete Unload actions and return a list.
  72. :return: list of Action objects
  73. """
  74. unloads = []
  75. for cargo in self.cargos:
  76. for plane in self.planes:
  77. for airport in self.airports:
  78. precond_pos = [
  79. expr("In({}, {})".format(cargo, plane)),
  80. expr("At({}, {})".format(plane, airport))
  81. ]
  82. precond_neg = []
  83. effect_add = [expr("At({}, {})".format(cargo, airport))]
  84. effect_rem = [expr("In({}, {})".format(cargo, plane))]
  85. unload = Action (
  86. expr("Unload({}, {}, {})"\
  87. .format(cargo, plane, airport)),
  88. [precond_pos, precond_neg],
  89. [effect_add, effect_rem]
  90. )
  91. unloads.append(unload)
  92. return unloads
  93. def fly_actions():
  94. """Create all concrete Fly actions and return a list.
  95. :return: list of Action objects
  96. """
  97. flys = []
  98. for fr in self.airports:
  99. for to in self.airports:
  100. if fr != to:
  101. for p in self.planes:
  102. precond_pos = [expr("At({}, {})".format(p, fr)),
  103. ]
  104. precond_neg = []
  105. effect_add = [expr("At({}, {})".format(p, to))]
  106. effect_rem = [expr("At({}, {})".format(p, fr))]
  107. fly = Action(expr("Fly({}, {}, {})"\
  108. .format(p, fr, to)),
  109. [precond_pos, precond_neg],
  110. [effect_add, effect_rem])
  111. flys.append(fly)
  112. return flys
  113. return load_actions() + unload_actions() + fly_actions()
  114. def actions(self, state: str) -> list:
  115. """Return the actions that can be executed in the given state.
  116. :param state: str
  117. state represented as T/F string of mapped fluents (state variables)
  118. e.g. 'FTTTFF'
  119. :return: list of Action objects
  120. """
  121. kb = PropKB()
  122. kb.tell(decode_state(state, self.state_map).pos_sentence())
  123. possible_actions = []
  124. for action in self.actions_list:
  125. # Assume action is possible
  126. is_action_possible = True
  127. for c in action.precond_neg:
  128. if c in kb.clauses:
  129. is_action_possible = False
  130. break # No need to continue search
  131. # Only check if action is still possible
  132. if is_action_possible:
  133. for c in action.precond_pos:
  134. if c not in kb.clauses:
  135. is_action_possible = False
  136. break # No need to continue search
  137. if is_action_possible: possible_actions.append(action)
  138. return possible_actions
  139. def result(self, state: str, action: Action):
  140. """ Return the state that results from executing the given
  141. action in the given state. The action must be one of
  142. self.actions(state).
  143. :param state: state entering node
  144. :param action: Action applied
  145. :return: resulting state after action
  146. """
  147. prev_state = decode_state(state, self.state_map)
  148. next_state = FluentState([], [])
  149. for fluent in prev_state.pos:
  150. if fluent not in action.effect_rem:
  151. next_state.pos.append(fluent)
  152. for fluent in prev_state.neg:
  153. if fluent not in action.effect_add:
  154. next_state.neg.append(fluent)
  155. for fluent in action.effect_add:
  156. if fluent not in next_state.pos:
  157. next_state.pos.append(fluent)
  158. for fluent in action.effect_rem:
  159. if fluent not in next_state.neg:
  160. next_state.neg.append(fluent)
  161. return encode_state(next_state, self.state_map)
  162. def goal_test(self, state: str) -> bool:
  163. """ Test the state to see if goal is reached
  164. :param state: str representing state
  165. :return: bool
  166. """
  167. kb = PropKB()
  168. kb.tell(decode_state(state, self.state_map).pos_sentence())
  169. for clause in self.goal:
  170. if clause not in kb.clauses:
  171. return False
  172. return True
  173. def h_1(self, node: Node):
  174. # Note that this is not a true heuristic
  175. h_const = 1
  176. return h_const
  177. # @lru_cache(maxsize=8192)
  178. def h_pg_levelsum(self, node: Node):
  179. """This heuristic uses a planning graph representation of the problem
  180. state space to estimate the sum of all actions that must be carried
  181. out from the current state in order to satisfy each individual goal
  182. condition.
  183. """
  184. # Requires implemented PlanningGraph class
  185. pg = PlanningGraph(self, node.state)
  186. pg_levelsum = pg.h_levelsum()
  187. return pg_levelsum
  188. # @lru_cache(maxsize=8192)
  189. def h_ignore_preconditions(self, node: Node):
  190. """This heuristic estimates the minimum number of actions that must be
  191. carried out from the current state in order to satisfy all of the goal
  192. conditions by ignoring the preconditions required for an action to be
  193. executed.
  194. """
  195. kb = PropKB()
  196. kb.tell(decode_state(node.state, self.state_map).pos_sentence())
  197. kb_clauses = kb.clauses
  198. actions_count = 0
  199. for clause in self.goal:
  200. if clause not in kb_clauses:
  201. actions_count += 1
  202. return actions_count
  203. def air_cargo_p1() -> AirCargoProblem:
  204. cargos = ['C1', 'C2']
  205. planes = ['P1', 'P2']
  206. airports = ['JFK', 'SFO']
  207. pos = [
  208. expr('At(C1, SFO)'),
  209. expr('At(C2, JFK)'),
  210. expr('At(P1, SFO)'),
  211. expr('At(P2, JFK)'),
  212. ]
  213. neg = [
  214. expr('At(C2, SFO)'),
  215. expr('In(C2, P1)'),
  216. expr('In(C2, P2)'),
  217. expr('At(C1, JFK)'),
  218. expr('In(C1, P1)'),
  219. expr('In(C1, P2)'),
  220. expr('At(P1, JFK)'),
  221. expr('At(P2, SFO)'),
  222. ]
  223. init = FluentState(pos, neg)
  224. goal = [
  225. expr('At(C1, JFK)'),
  226. expr('At(C2, SFO)'),
  227. ]
  228. return AirCargoProblem(cargos, planes, airports, init, goal)
  229. def air_cargo_p2() -> AirCargoProblem:
  230. cargos = ['C1', 'C2', 'C3']
  231. planes = ['P1', 'P2', 'P3']
  232. airports = ['JFK', 'SFO', 'ATL']
  233. pos = [
  234. expr('At(C1, SFO)'),
  235. expr('At(C2, JFK)'),
  236. expr('At(C3, ATL)'),
  237. expr('At(P1, SFO)'),
  238. expr('At(P2, JFK)'),
  239. expr('At(P3, ATL)')
  240. ]
  241. neg = [
  242. expr('At(C1, JFK)'),
  243. expr('At(C1, ATL)'),
  244. expr('In(C1, P1)'),
  245. expr('In(C1, P2)'),
  246. expr('In(C1, P3)'),
  247. expr('At(C2, SFO)'),
  248. expr('At(C2, ATL)'),
  249. expr('In(C2, P1)'),
  250. expr('In(C2, P2)'),
  251. expr('In(C2, P3)'),
  252. expr('At(C3, SFO)'),
  253. expr('At(C3, JFK)'),
  254. expr('In(C3, P1)'),
  255. expr('In(C3, P2)'),
  256. expr('In(C3, P3)'),
  257. expr('At(P1, JFK)'),
  258. expr('At(P1, ATL)'),
  259. expr('At(P2, SFO)'),
  260. expr('At(P2, ATL)'),
  261. expr('At(P3, JFK)'),
  262. expr('At(P3, SFO)')
  263. ]
  264. init = FluentState(pos, neg)
  265. goal = [
  266. expr('At(C1, JFK)'),
  267. expr('At(C2, SFO)'),
  268. expr('At(C3, SFO)')
  269. ]
  270. return AirCargoProblem(cargos, planes, airports, init, goal)
  271. def air_cargo_p3() -> AirCargoProblem:
  272. cargos = ['C1', 'C2', 'C3', 'C4']
  273. planes = ['P1', 'P2']
  274. airports = ['JFK', 'SFO', 'ATL', 'ORD']
  275. pos = [
  276. expr('At(C1, SFO)'),
  277. expr('At(C2, JFK)'),
  278. expr('At(C3, ATL)'),
  279. expr('At(C4, ORD)'),
  280. expr('At(P1, SFO)'),
  281. expr('At(P2, JFK)')
  282. ]
  283. neg = [
  284. expr('At(C1, JFK)'),
  285. expr('At(C1, ATL)'),
  286. expr('At(C1, ORD)'),
  287. expr('In(C1, P1)'),
  288. expr('In(C1, P2)'),
  289. expr('At(C2, SFO)'),
  290. expr('At(C2, ATL)'),
  291. expr('At(C2, ORD)'),
  292. expr('In(C2, P1)'),
  293. expr('In(C2, P2)'),
  294. expr('At(C3, SFO)'),
  295. expr('At(C3, JFK)'),
  296. expr('At(C3, ORD)'),
  297. expr('In(C3, P1)'),
  298. expr('In(C3, P2)'),
  299. expr('At(C4, SFO)'),
  300. expr('At(C4, JFK)'),
  301. expr('At(C4, ATL)'),
  302. expr('In(C4, P1)'),
  303. expr('In(C4, P2)'),
  304. expr('At(P1, JFK)'),
  305. expr('At(P1, ATL)'),
  306. expr('At(P1, ORD)'),
  307. expr('At(P2, SFO)'),
  308. expr('At(P2, ATL)'),
  309. expr('At(P2, ORD)')
  310. ]
  311. init = FluentState(pos, neg)
  312. goal = [
  313. expr('At(C1, JFK)'),
  314. expr('At(C3, JFK)'),
  315. expr('At(C2, SFO)'),
  316. expr('At(C4, SFO)')
  317. ]
  318. return AirCargoProblem(cargos, planes, airports, init, goal)