123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357 |
- from aimacode.logic import PropKB
- from aimacode.planning import Action
- from aimacode.search import (
- Node, Problem,
- )
- from aimacode.utils import expr
- from lp_utils import (
- FluentState, encode_state, decode_state,
- )
- from my_planning_graph import PlanningGraph
- from functools import lru_cache
class AirCargoProblem(Problem):
    """Air-cargo planning problem searched over T/F-encoded state strings."""

    def __init__(self, cargos, planes, airports, initial: FluentState,
                 goal: list):
        """Store the problem objects and precompute the concrete actions.

        :param cargos: list of str
            cargos in the problem
        :param planes: list of str
            planes in the problem
        :param airports: list of str
            airports in the problem
        :param initial: FluentState object
            positive and negative literal fluents (as expr)
            describing initial state
        :param goal: list of expr
            literal fluents required for goal test
        """
        # The fluent order fixed here is the one passed to every
        # encode_state / decode_state call made by this class.
        self.state_map = initial.pos + initial.neg
        self.initial_state_TF = encode_state(initial, self.state_map)
        Problem.__init__(self, self.initial_state_TF, goal=goal)
        self.cargos = cargos
        self.planes = planes
        self.airports = airports
        # Computed once here so that actions() can reuse the list.
        self.actions_list = self.get_actions()

    def get_actions(self):
        """Ground every Load, Unload and Fly action of the domain.

        All combinations of the problem's cargos, planes and airports are
        turned into concrete (variable-free) Action objects. The constructor
        calls this once and keeps the result in `actions_list`.

        RETURNS:
        list<Action>
            Load actions, then Unload actions, then Fly actions
        """

        def at(thing, airport):
            """Return the literal stating that `thing` is at `airport`."""
            return expr("At({}, {})".format(thing, airport))

        def inside(cargo, plane):
            """Return the literal stating that `cargo` is in `plane`."""
            return expr("In({}, {})".format(cargo, plane))

        def load_actions():
            """Create all concrete Load actions and return a list.

            :return: list of Action objects
            """
            return [
                Action(
                    expr("Load({}, {}, {})".format(cargo, plane, airport)),
                    # [positive preconditions, negative preconditions]
                    [[at(cargo, airport), at(plane, airport)], []],
                    # [add effects, remove effects]
                    [[inside(cargo, plane)], [at(cargo, airport)]],
                )
                for cargo in self.cargos
                for plane in self.planes
                for airport in self.airports
            ]

        def unload_actions():
            """Create all concrete Unload actions and return a list.

            :return: list of Action objects
            """
            return [
                Action(
                    expr("Unload({}, {}, {})".format(cargo, plane, airport)),
                    [[inside(cargo, plane), at(plane, airport)], []],
                    [[at(cargo, airport)], [inside(cargo, plane)]],
                )
                for cargo in self.cargos
                for plane in self.planes
                for airport in self.airports
            ]

        def fly_actions():
            """Create all concrete Fly actions and return a list.

            Flights from an airport to itself are not generated.

            :return: list of Action objects
            """
            return [
                Action(
                    expr("Fly({}, {}, {})".format(plane, origin, dest)),
                    [[at(plane, origin)], []],
                    [[at(plane, dest)], [at(plane, origin)]],
                )
                for origin in self.airports
                for dest in self.airports
                if origin != dest
                for plane in self.planes
            ]

        return load_actions() + unload_actions() + fly_actions()

    def actions(self, state: str) -> list:
        """Return the actions that can be executed in the given state.

        An action is kept when none of its negative preconditions and all of
        its positive preconditions appear among the clauses told to the
        knowledge base from the state's positive fluents.

        :param state: str
            state represented as T/F string of mapped fluents (state variables)
            e.g. 'FTTTFF'
        :return: list of Action objects
        """
        kb = PropKB()
        kb.tell(decode_state(state, self.state_map).pos_sentence())
        return [
            candidate
            for candidate in self.actions_list
            if not any(lit in kb.clauses for lit in candidate.precond_neg)
            and all(lit in kb.clauses for lit in candidate.precond_pos)
        ]

    def result(self, state: str, action: Action):
        """Return the state that results from executing `action` in `state`.

        The action must be one of self.actions(state).

        :param state: state entering node
        :param action: Action applied
        :return: resulting state after action
        """
        before = decode_state(state, self.state_map)
        after = FluentState([], [])
        # Carry over every fluent that the action does not flip.
        after.pos.extend(
            lit for lit in before.pos if lit not in action.effect_rem
        )
        after.neg.extend(
            lit for lit in before.neg if lit not in action.effect_add
        )
        # Then record the action's own effects, skipping duplicates.
        for lit in action.effect_add:
            if lit not in after.pos:
                after.pos.append(lit)
        for lit in action.effect_rem:
            if lit not in after.neg:
                after.neg.append(lit)
        return encode_state(after, self.state_map)

    def goal_test(self, state: str) -> bool:
        """Test the state to see if goal is reached.

        :param state: str representing state
        :return: bool, True when every goal literal is among the clauses
            derived from the state's positive fluents
        """
        kb = PropKB()
        kb.tell(decode_state(state, self.state_map).pos_sentence())
        return all(literal in kb.clauses for literal in self.goal)

    def h_1(self, node: Node):
        """Return the constant 1 for any node.

        Note that this is not a true heuristic.
        """
        return 1

    # NOTE: an lru_cache(maxsize=8192) decorator was left disabled here.
    def h_pg_levelsum(self, node: Node):
        """Return the level-sum value of a planning graph built at `node`.

        This heuristic uses a planning graph representation of the problem
        state space to estimate the sum of all actions that must be carried
        out from the current state in order to satisfy each individual goal
        condition.
        """
        # Requires implemented PlanningGraph class
        return PlanningGraph(self, node.state).h_levelsum()

    # NOTE: an lru_cache(maxsize=8192) decorator was left disabled here.
    def h_ignore_preconditions(self, node: Node):
        """Count the goal literals that do not yet hold in `node.state`.

        This heuristic estimates the minimum number of actions that must be
        carried out from the current state in order to satisfy all of the goal
        conditions by ignoring the preconditions required for an action to be
        executed.
        """
        kb = PropKB()
        kb.tell(decode_state(node.state, self.state_map).pos_sentence())
        satisfied = kb.clauses
        return sum(1 for literal in self.goal if literal not in satisfied)
def air_cargo_p1() -> AirCargoProblem:
    """Build air-cargo problem 1: two cargos, two planes, two airports.

    :return: AirCargoProblem whose goal is C1 at JFK and C2 at SFO
    """
    # Fluents that hold in the initial state.
    holds = [expr(text) for text in (
        'At(C1, SFO)',
        'At(C2, JFK)',
        'At(P1, SFO)',
        'At(P2, JFK)',
    )]
    # Fluents that are false initially; the order is kept as-is because it
    # determines the layout of the problem's state map.
    absent = [expr(text) for text in (
        'At(C2, SFO)',
        'In(C2, P1)',
        'In(C2, P2)',
        'At(C1, JFK)',
        'In(C1, P1)',
        'In(C1, P2)',
        'At(P1, JFK)',
        'At(P2, SFO)',
    )]
    targets = [expr(text) for text in (
        'At(C1, JFK)',
        'At(C2, SFO)',
    )]
    return AirCargoProblem(
        ['C1', 'C2'],
        ['P1', 'P2'],
        ['JFK', 'SFO'],
        FluentState(holds, absent),
        targets,
    )
def air_cargo_p2() -> AirCargoProblem:
    """Build air-cargo problem 2: three cargos, three planes, three airports.

    :return: AirCargoProblem whose goal is C1 at JFK, C2 at SFO and C3 at SFO
    """
    # Fluents that hold in the initial state.
    holds = [expr(text) for text in (
        'At(C1, SFO)',
        'At(C2, JFK)',
        'At(C3, ATL)',
        'At(P1, SFO)',
        'At(P2, JFK)',
        'At(P3, ATL)',
    )]
    # Fluents that are false initially; the order is kept as-is because it
    # determines the layout of the problem's state map.
    absent = [expr(text) for text in (
        'At(C1, JFK)',
        'At(C1, ATL)',
        'In(C1, P1)',
        'In(C1, P2)',
        'In(C1, P3)',
        'At(C2, SFO)',
        'At(C2, ATL)',
        'In(C2, P1)',
        'In(C2, P2)',
        'In(C2, P3)',
        'At(C3, SFO)',
        'At(C3, JFK)',
        'In(C3, P1)',
        'In(C3, P2)',
        'In(C3, P3)',
        'At(P1, JFK)',
        'At(P1, ATL)',
        'At(P2, SFO)',
        'At(P2, ATL)',
        'At(P3, JFK)',
        'At(P3, SFO)',
    )]
    targets = [expr(text) for text in (
        'At(C1, JFK)',
        'At(C2, SFO)',
        'At(C3, SFO)',
    )]
    return AirCargoProblem(
        ['C1', 'C2', 'C3'],
        ['P1', 'P2', 'P3'],
        ['JFK', 'SFO', 'ATL'],
        FluentState(holds, absent),
        targets,
    )
def air_cargo_p3() -> AirCargoProblem:
    """Build air-cargo problem 3: four cargos, two planes, four airports.

    :return: AirCargoProblem whose goal is C1 and C3 at JFK, C2 and C4 at SFO
    """
    # Fluents that hold in the initial state.
    holds = [expr(text) for text in (
        'At(C1, SFO)',
        'At(C2, JFK)',
        'At(C3, ATL)',
        'At(C4, ORD)',
        'At(P1, SFO)',
        'At(P2, JFK)',
    )]
    # Fluents that are false initially; the order is kept as-is because it
    # determines the layout of the problem's state map.
    absent = [expr(text) for text in (
        'At(C1, JFK)',
        'At(C1, ATL)',
        'At(C1, ORD)',
        'In(C1, P1)',
        'In(C1, P2)',
        'At(C2, SFO)',
        'At(C2, ATL)',
        'At(C2, ORD)',
        'In(C2, P1)',
        'In(C2, P2)',
        'At(C3, SFO)',
        'At(C3, JFK)',
        'At(C3, ORD)',
        'In(C3, P1)',
        'In(C3, P2)',
        'At(C4, SFO)',
        'At(C4, JFK)',
        'At(C4, ATL)',
        'In(C4, P1)',
        'In(C4, P2)',
        'At(P1, JFK)',
        'At(P1, ATL)',
        'At(P1, ORD)',
        'At(P2, SFO)',
        'At(P2, ATL)',
        'At(P2, ORD)',
    )]
    targets = [expr(text) for text in (
        'At(C1, JFK)',
        'At(C3, JFK)',
        'At(C2, SFO)',
        'At(C4, SFO)',
    )]
    return AirCargoProblem(
        ['C1', 'C2', 'C3', 'C4'],
        ['P1', 'P2'],
        ['JFK', 'SFO', 'ATL', 'ORD'],
        FluentState(holds, absent),
        targets,
    )
|