123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547 |
- from aimacode.planning import Action
- from aimacode.search import Problem
- from aimacode.utils import expr
- from lp_utils import decode_state
- class PgNode():
- """Base class for planning graph nodes.
- includes instance sets common to both types of nodes used in
- a planning graph
- parents: the set of nodes in the previous level
- children: the set of nodes in the subsequent level
- mutex: the set of sibling nodes that are mutually exclusive with this node
- """
- def __init__(self):
- self.parents = set()
- self.children = set()
- self.mutex = set()
- def is_mutex(self, other) -> bool:
- """Boolean test for mutual exclusion
- :param other: PgNode
- the other node to compare with
- :return: bool
- True if this node and the other are marked
- mutually exclusive (mutex)
- """
- if other in self.mutex:
- return True
- return False
- def show(self):
- """helper print for debugging shows counts of
- parents, children, siblings
- :return:
- print only
- """
- print("{} parents".format(len(self.parents)))
- print("{} children".format(len(self.children)))
- print("{} mutex".format(len(self.mutex)))
- class PgNode_s(PgNode):
- """A planning graph node representing a state (literal fluent) from a
- planning problem.
- Args:
- symbol : str
- A string representing a literal expression from a planning problem
- domain.
- is_pos : bool
- Boolean flag indicating whether the literal expression is positive or
- negative.
- """
- def __init__(self, symbol: str, is_pos: bool):
- """S-level Planning Graph node constructor.
- :param symbol: expr
- :param is_pos: bool
- Instance variables calculated:
- literal: expr
- fluent in its literal form including negative operator
- if applicable
- Instance variables inherited from PgNode:
- parents: set of nodes connected to this node in previous A level;
- children: set of nodes connected to this node in next A level;
- mutex: set of sibling S-nodes that this node has
- mutual exclusion with;
- """
- PgNode.__init__(self)
- self.symbol = symbol
- self.is_pos = is_pos
- self.literal = expr(self.symbol)
- if not self.is_pos:
- self.literal = expr('~{}'.format(self.symbol))
- def show(self):
- """Helper print for debugging shows literal plus counts of parents,
- children, siblings.
- :return:
- print only
- """
- print("\n*** {}".format(self.literal))
- PgNode.show(self)
- def __eq__(self, other):
- '''equality test for nodes - compares only the literal for equality
- :param other: PgNode_s
- :return: bool
- '''
- if isinstance(other, self.__class__):
- return (self.symbol == other.symbol) \
- and (self.is_pos == other.is_pos)
- def __hash__(self):
- return hash(self.symbol) ^ hash(self.is_pos)
- class PgNode_a(PgNode):
- """A-type (action) Planning Graph node - inherited from PgNode """
- def __init__(self, action: Action):
- """A-level Planning Graph node constructor
- :param action: Action
- a ground action, i.e. this action cannot contain any variables
- Instance variables calculated:
- An A-level will always have an S-level as its parent and an S-level
- as its child.
- The preconditions and effects will become the parents and children
- of the A-level node. However, when this node is created, it is not
- yet connected to the graph.
- prenodes: set of *possible* parent S-nodes
- effnodes: set of *possible* child S-nodes
- is_persistent: bool True if this is a persistence action,
- i.e. a no-op action
- Instance variables inherited from PgNode:
- parents: set of nodes connected to this node in previous A level;
- children: set of nodes connected to this node in next A level;
- mutex: set of sibling S-nodes that this node has
- mutual exclusion with;
- """
- PgNode.__init__(self)
- self.action = action
- self.prenodes = self.precond_s_nodes()
- self.effnodes = self.effect_s_nodes()
- self.is_persistent = False
- if self.prenodes == self.effnodes:
- self.is_persistent = True
- def show(self):
- """helper print for debugging shows action plus counts of
- parents, children, siblings.
- :return:
- print only
- """
- print("\n*** {}{}".format(self.action.name, self.action.args))
- PgNode.show(self)
- def precond_s_nodes(self):
- """precondition literals as S-nodes
- (represents possible parents for this node).
- It is computationally expensive to call this function;
- it is only called by the class constructor to populate the
- `prenodes` attribute.
- :return: set of PgNode_s
- """
- nodes = set()
- for p in self.action.precond_pos:
- n = PgNode_s(p, True)
- nodes.add(n)
- for p in self.action.precond_neg:
- n = PgNode_s(p, False)
- nodes.add(n)
- return nodes
- def effect_s_nodes(self):
- """effect literals as S-nodes
- (represents possible children for this node).
- It is computationally expensive to call this function;
- it is only called by the class constructor to populate the
- `effnodes` attribute.
- :return: set of PgNode_s
- """
- nodes = set()
- for e in self.action.effect_add:
- n = PgNode_s(e, True)
- nodes.add(n)
- for e in self.action.effect_rem:
- n = PgNode_s(e, False)
- nodes.add(n)
- return nodes
- def __eq__(self, other):
- """equality test for nodes - compares only the action name for equality
- :param other: PgNode_a
- :return: bool
- """
- if isinstance(other, self.__class__):
- return (self.action.name == other.action.name) \
- and (self.action.args == other.action.args)
- def __hash__(self):
- return hash(self.action.name) ^ hash(self.action.args)
- def mutexify(node1: PgNode, node2: PgNode):
- """Adds sibling nodes to each other's mutual exclusion (mutex) set.
- These should be sibling nodes!
- :param node1: PgNode (or inherited PgNode_a, PgNode_s types)
- :param node2: PgNode (or inherited PgNode_a, PgNode_s types)
- :return:
- node mutex sets modified
- """
- if type(node1) != type(node2):
- raise TypeError('Attempted to mutex two nodes of different types')
- node1.mutex.add(node2)
- node2.mutex.add(node1)
- class PlanningGraph():
- """
- A planning graph as described in chapter 10 of the AIMA text. The planning
- graph can be used to reason about
- """
- def __init__(self, problem: Problem, state: str, serial_planning=True):
- """
- :param problem: PlanningProblem
- (or subclass such as AirCargoProblem or HaveCakeProblem)
- :param state: str (will be in form TFTTFF... representing fluent states)
- :param serial_planning: bool
- (whether or not to assume that only one action can occur at a time)
- Instance variable calculated:
- fs: FluentState
- the state represented as positive and
- negative fluent literal lists
- all_actions: list of the PlanningProblem valid ground actions
- combined with calculated no-op actions
- s_levels: list of sets of PgNode_s,
- where each set in the list represents an S-level
- in the planning graph.
- a_levels: list of sets of PgNode_a,
- where each set in the list represents an A-level
- in the planning graph.
- """
- self.problem = problem
- self.fs = decode_state(state, problem.state_map)
- self.serial = serial_planning
- self.all_actions = self.problem.actions_list + \
- self.noop_actions(self.problem.state_map)
- self.s_levels = []
- self.a_levels = []
- self.create_graph()
- def noop_actions(self, literal_list):
- """create persistent action for each possible fluent
- "No-Op" actions are virtual actions (i.e., actions that only exist in
- the planning graph, not in the planning problem domain) that operate
- on each fluent (literal expression) from the problem domain. No op
- actions "pass through" the literal expressions from one level of the
- planning graph to the next.
- The no-op action list requires both a positive and a negative action
- for each literal expression. Positive no-op actions require the literal
- as a positive precondition and add the literal expression as an effect
- in the output, and negative no-op actions require the literal as a
- negative precondition and remove the literal expression as an effect in
- the output.
- This function should only be called by the class constructor.
- :param literal_list:
- :return: list of Action
- """
- action_list = []
- for fluent in literal_list:
- act1 = Action(expr("Noop_pos({})".format(fluent)),\
- ([fluent], []), ([fluent], []))
- action_list.append(act1)
- act2 = Action(expr("Noop_neg({})".format(fluent)),\
- ([], [fluent]), ([], [fluent]))
- action_list.append(act2)
- return action_list
- def create_graph(self):
- """Build a Planning Graph as described in Russell-Norvig 3rd Ed 10.3
- or 2nd Ed 11.4
- The S0 initial level has been implemented for you.
- It has no parents and includes all of the literal fluents that are part
- of the initial state passed to the constructor. At the start
- of a problem planning search, this will be the same as the initial
- state of the problem. However, the planning graph can be built from
- any state in the Planning Problem.
- This function should only be called by the class constructor.
- :return:
- builds the graph by filling s_levels[] and a_levels[] lists with
- node sets for each level.
- """
- # the graph should only be built during class construction
- if (len(self.s_levels) != 0) or (len(self.a_levels) != 0):
- raise Exception(
- 'Planning Graph already created;\
- construct a new planning graph for each new state in the\
- planning sequence')
- # initialize S0 to literals in initial state provided.
- leveled = False
- level = 0
- self.s_levels.append(set()) # S0 set of s_nodes - empty to start
- # for each fluent in the initial state, add the correct literal PgNode_s
- for literal in self.fs.pos:
- self.s_levels[level].add(PgNode_s(literal, True))
- for literal in self.fs.neg:
- self.s_levels[level].add(PgNode_s(literal, False))
- # no mutexes at the first level
- # continue to build the graph alternating A,
- # S levels until last two S levels contain the same literals,
- # i.e. until it is "leveled"
- while not leveled:
- self.add_action_level(level)
- self.update_a_mutex(self.a_levels[level])
- level += 1
- self.add_literal_level(level)
- self.update_s_mutex(self.s_levels[level])
- if self.s_levels[level] == self.s_levels[level - 1]:
- leveled = True
- def add_action_level(self, level):
- """ add an A (action) level to the Planning Graph
- :param level: int
- the level number alternates S0, A0, S1, A1, S2, ...etc.
- the level number is also used as the
- index for the node set lists self.a_levels[] and self.s_levels[]
- :return:
- adds A nodes to the current level in self.a_levels[level]
- """
- previous_s_level = self.s_levels[level]
- # Previous state level (previous to this level of actions)
- actions = self.all_actions # List of all possible actions
- self.a_levels.append(set()) # Initialize this level of actions
- for action in actions:
- a_node = PgNode_a(action)
- if a_node.prenodes.issubset(previous_s_level):
- for s_node in previous_s_level:
- # Connect action level to state and vice-versa
- s_node.children.add(a_node)
- a_node.parents.add(s_node)
- # Add newly created action to this new level of actions
- self.a_levels[level].add(a_node)
- def add_literal_level(self, level):
- """ add an S (literal) level to the Planning Graph
- :param level: int
- the level number alternates S0, A0, S1, A1, S2, ....etc.
- the level number is also used as the
- index for the node set lists self.a_levels[] and self.s_levels[]
- :return:
- adds S nodes to the current level in self.s_levels[level]
- """
- previous_a_level = self.a_levels[level - 1]
- #Previous action level (previous to this level of states)
- self.s_levels.append(set()) # Initialize this level of states
- for a_node in previous_a_level:
- for effect_node in a_node.effnodes:
- # Connect state level to action and vice-versa
- a_node.children.add(effect_node)
- effect_node.parents.add(a_node)
- # Add newly state to this new level of states
- self.s_levels[level].add(effect_node)
- def update_a_mutex(self, nodeset):
- """ Determine and update sibling mutual exclusion for A-level nodes
- Mutex action tests section from 3rd Ed. 10.3 or 2nd Ed. 11.4
- A mutex relation holds between two actions a given level
- if the planning graph is a serial planning graph and the pair are
- nonpersistence actions or if any of the three conditions hold between
- the pair:
- Inconsistent Effects
- Interference
- Competing needs
- :param nodeset: set of PgNode_a (siblings in the same level)
- :return:
- mutex set in each PgNode_a in the set is appropriately updated
- """
- nodelist = list(nodeset)
- for i, n1 in enumerate(nodelist[:-1]):
- for n2 in nodelist[i + 1:]:
- if (self.serialize_actions(n1, n2) or
- self.inconsistent_effects_mutex(n1, n2) or
- self.interference_mutex(n1, n2) or
- self.competing_needs_mutex(n1, n2)):
- mutexify(n1, n2)
- def serialize_actions(self, node_a1: PgNode_a, node_a2: PgNode_a) -> bool:
- '''
- Test a pair of actions for mutual exclusion, returning True if the
- planning graph is serial, and if either action is persistent; otherwise
- return False. Two serial actions are mutually exclusive if they are
- both non-persistent.
- :param node_a1: PgNode_a
- :param node_a2: PgNode_a
- :return: bool
- '''
- #
- if not self.serial:
- return False
- if node_a1.is_persistent or node_a2.is_persistent:
- return False
- return True
- def inconsistent_effects_mutex(self, node_a1: PgNode_a,\
- node_a2: PgNode_a) -> bool:
- '''
- Test a pair of actions for inconsistent effects, returning True if
- one action negates an effect of the other, and False otherwise.
- :param node_a1: PgNode_a
- :param node_a2: PgNode_a
- :return: bool
- '''
- return bool(
- # Are actions from node 1 negated by actions in node 2?
- set(node_a1.action.effect_add) & set(node_a2.action.effect_rem) |
- # Are actions from node 2 negated by actions in node 1?
- set(node_a2.action.effect_add) & set(node_a1.action.effect_rem)
- )
- def interference_mutex(self, node_a1: PgNode_a, node_a2: PgNode_a) -> bool:
- '''
- Test a pair of actions for mutual exclusion, returning True if the
- effect of one action is the negation of a precondition of the other.
- :param node_a1: PgNode_a
- :param node_a2: PgNode_a
- :return: bool
- '''
- return bool(
- # Are actions in node 1 the negation of a precondition of node 2?
- set(node_a1.action.effect_add) & set(node_a2.action.precond_neg) |
- set(node_a1.action.effect_rem) & set(node_a2.action.precond_pos) |
- # Are actions in node 2 the negation of a precondition of node 1?
- set(node_a2.action.effect_add) & set(node_a1.action.precond_neg) |
- set(node_a2.action.effect_rem) & set(node_a1.action.precond_pos)
- )
- def competing_needs_mutex(self, node_a1: PgNode_a,\
- node_a2: PgNode_a) -> bool:
- '''
- Test a pair of actions for mutual exclusion, returning True if one of
- the precondition of one action is mutex with a precondition of the
- other action.
- :param node_a1: PgNode_a
- :param node_a2: PgNode_a
- :return: bool
- '''
- for a1_parent in node_a1.parents:
- for a2_parent in node_a2.parents:
- if a1_parent.is_mutex(a2_parent):
- return True
- return False
- def update_s_mutex(self, nodeset: set):
- ''' Determine and update sibling mutual exclusion for S-level nodes
- Mutex action tests section from 3rd Ed. 10.3 or 2nd Ed. 11.4
- A mutex relation holds between literals at a given level
- if either of the two conditions hold between the pair:
- Negation
- Inconsistent support
- :param nodeset: set of PgNode_a (siblings in the same level)
- :return:
- mutex set in each PgNode_a in the set is appropriately updated
- '''
- nodelist = list(nodeset)
- for i, n1 in enumerate(nodelist[:-1]):
- for n2 in nodelist[i + 1:]:
- if self.negation_mutex(n1, n2) or\
- self.inconsistent_support_mutex(n1, n2):
- mutexify(n1, n2)
- def negation_mutex(self, node_s1: PgNode_s, node_s2: PgNode_s) -> bool:
- '''
- Test a pair of state literals for mutual exclusion, returning True if
- one node is the negation of the other, and False otherwise.
- :param node_s1: PgNode_s
- :param node_s2: PgNode_s
- :return: bool
- '''
- # Verify both state represent the same symbol
- is_same_symbol = node_s1.symbol == node_s2.symbol
- # Verify 1 state is the negation of the other
- is_negation = node_s1.is_pos != node_s2.is_pos
- return is_same_symbol and is_negation
- def inconsistent_support_mutex(self, node_s1: PgNode_s, node_s2: PgNode_s):
- '''
- Test a pair of state literals for mutual exclusion, returning True if
- there are no actions that could achieve the two literals at the same
- time, and False otherwise. In other words, the two literal nodes are
- mutex if all of the actions that could achieve the first literal node
- are pairwise mutually exclusive with all of the actions that could
- achieve the second literal node.
- :param node_s1: PgNode_s
- :param node_s2: PgNode_s
- :return: bool
- '''
- for s1_parent in node_s1.parents:
- for s2_parent in node_s2.parents:
- if not s1_parent.is_mutex(s2_parent):
- return False
- return True
- def h_levelsum(self) -> int:
- '''The sum of the level costs of the individual goals
- (admissible if goals independent)
- :return: int
- '''
- level_sum = 0
- goals = self.problem.goal
- s_levels = self.s_levels
- for goal in goals:
- node = PgNode_s(goal, True)
- s_levels_list = enumerate(s_levels)
- for level, s_nodes in s_levels_list:
- if node in s_nodes:
- level_sum += level
- break
- return level_sum
|