Source code for rafcontpp.logic.pddl_action_loader

# Copyright (C) 2018-2019 DLR
#
# All rights reserved. This program and the accompanying materials are made
# available under the terms of the 3-Clause BSD License which accompanies this
# distribution, and is available at
# https://opensource.org/licenses/BSD-3-Clause
#
# Contributors:
# Christoph Suerig <christoph.suerig@dlr.de>

# Don't connect with the Copyright comment above!
# Version 05.07.2019
import os
import unicodedata

from rafcon.core.singleton import library_manager
from rafcon.utils import log

from rafcontpp.logic.pddl_action_parser import PddlActionParser
from rafcontpp.model.datastore import SEMANTIC_DATA_DICT_NAME, PDDL_ACTION_SUB_DICT_NAME
from rafcontpp.model.pddl_action_representation import PddlActionRepresentation
from rafcontpp.model.pddl_action_representation import action_to_upper

logger = log.get_logger(__name__)


[docs]class PddlActionLoader: """ This class reads and parses PDDL actions from RAFCON states, into the PDDLActionRepresentation. """ def __init__(self, datastore): """ :param datastore: A datastore containing state pools, the state action map, and available actions. """ self.__datastore = datastore
[docs] def load_pddl_actions(self): """ load_pddl_actions reads the actions from the states and parses them into PddlActionRepresentations. Then it sets the pddl action map in datastore. :return: void """ state_libs = self.__datastore.get_state_pools() lib_names = [] for pool in state_libs: lib_names.append(os.path.basename(pool)) pddl_actions = {} for lib_name in lib_names: state_pool = library_manager.libraries[lib_name] for state in state_pool: lib_state = library_manager.get_library_instance(lib_name, state) sem_data = lib_state.state_copy.semantic_data if SEMANTIC_DATA_DICT_NAME in sem_data \ and str(state) in self.__datastore.get_state_action_map().keys(): action_dict = sem_data[SEMANTIC_DATA_DICT_NAME][PDDL_ACTION_SUB_DICT_NAME] # parse from unicode to string r means raw r_pred_str = unicodedata.normalize('NFKD', action_dict["pddl_predicates"]).encode('utf-8', 'ignore') r_action = unicodedata.normalize('NFKD', action_dict["pddl_action"]).encode('utf-8', 'ignore') r_types = '' r_reqs = '' if isinstance(action_dict["pddl_types"], unicode): r_types = unicodedata.normalize('NFKD', action_dict["pddl_types"]).encode('utf-8', 'ignore') if isinstance(action_dict["requirements"], unicode): r_reqs = unicodedata.normalize('NFKD', action_dict["requirements"]).encode('utf-8', 'ignore') action_parser = PddlActionParser(r_action) action_name = action_parser.parse_action_name().upper() c_action = action_to_upper(PddlActionRepresentation( action_name, r_action, self.parse_predicate_string(r_pred_str), self.parse_type_string(r_types), self.parse_requirement_string(r_reqs), action_parser.parse_parameters())) if c_action.name in pddl_actions.keys(): logger.warning('Multiple definition of Action: ' + c_action.name + '. Action definition of State ' + str(state) + ' not used.') else: pddl_actions[c_action.name] = c_action # just check, if all needed actions could be parsed. for action_name in self.__datastore.get_available_actions(): if action_name not in pddl_actions.keys(): logger.error("No action found for action called: \"" + action_name + "\"") self.__datastore.set_pddl_action_map(pddl_actions)
[docs] def parse_type_string(self, type_string): """ parse_type_string reveives a type string of fromat type1,type2 or type1 type2 and parses it into an array containing the types. :param type_string: A type string of format type1,type2 or type1 type2. :return: [String]: An array containing the types contained in the string, or an empty array if string is None or empty. """ ts = [] if type_string: ts = type_string.replace(',', ' ') ts = ts.split(' ') ts = list(filter(None, ts)) return ts
[docs] def parse_requirement_string(self, requ_string): """ Reveives an requirements string and parses it into a requirement array. :param requ_string: A string containing requirements. :return: [String]: An array containing requirements. An empty array if requ_string is None or empty. """ req_list = [] if requ_string: rs = requ_string.strip('[').strip(']') rs = rs.replace('\'', '') rs = rs.replace(' ', '') req_list = rs.split(',') return req_list
[docs] def parse_predicate_string(self, pred_string): """ Construct predicate array from predicate string. :param pred_string: A string containing all predicates of the action. :return: [String]: An array containing all predicates of the action. """ predicates = [] predicate_string = pred_string if pred_string is not None else '' start_index = predicate_string.find('(') end_index = predicate_string.find(')') + 1 while start_index > -1 and start_index < end_index: predicates.append(predicate_string[start_index:end_index]) predicate_string = predicate_string[end_index:] start_index = predicate_string.find('(') end_index = predicate_string.find(')') + 1 logger.debug("predicates: " + str(predicates)) return predicates