Source code for rafcontpp.logic.mapper

# Copyright (C) 2018-2019 DLR
#
# All rights reserved. This program and the accompanying materials are made
# available under the terms of the 3-Clause BSD License which accompanies this
# distribution, and is available at
# https://opensource.org/licenses/BSD-3-Clause
#
# Contributors:
# Christoph Suerig <christoph.suerig@dlr.de>

# Don't connect with the Copyright comment above!
# Version 05.07.2019
import os

from rafcon.core.config import global_config
from rafcon.core.singleton import library_manager
from rafcon.utils import log

from rafcontpp.logic.pddl_action_parser import PddlActionParser
from rafcontpp.model.datastore import SEMANTIC_DATA_DICT_NAME, PDDL_ACTION_SUB_DICT_NAME

logger = log.get_logger(__name__)


[docs]class Mapper: """Mapper The Mapper maps pddl actions and rafcon states together. """ def __init__(self, datastore): """ :param datastore: A datastore containing state pools. """ if datastore is None: logger.error("Datastore in Mapper can not be None!") raise ValueError("Datastore in Mapper can not be None!") self.__datastore = datastore
[docs] def generate_action_state_map(self): """ generates a map, with pddl action names as key, and RAFCON States as Values. Writes the map into the datastore. :return: void """ state_libs = self.__datastore.get_state_pools() libraries = global_config.get_config_value("LIBRARY_PATHS") lib_names = [] for pool in state_libs: logger.debug("adding library path: " + str(pool)) lib_name = os.path.basename(pool) lib_names.append(lib_name) libraries[lib_name] = os.path.abspath(pool) global_config.set_config_value("LIBRARY_PATHS", libraries) # also refreshes libraries action_state_map = {} for lib_name in lib_names: state_pool = library_manager.libraries[lib_name] for state in state_pool: lib_state = library_manager.get_library_instance(lib_name, state) sem_data = lib_state.state_copy.semantic_data if isinstance(sem_data[SEMANTIC_DATA_DICT_NAME][PDDL_ACTION_SUB_DICT_NAME]['pddl_action'], unicode): action_string = str( sem_data[SEMANTIC_DATA_DICT_NAME][PDDL_ACTION_SUB_DICT_NAME]['pddl_action']).upper() action_name = PddlActionParser(action_string).parse_action_name() if action_name in action_state_map: logger.warning( "Multiple associations of action {}! Associated with states: {} and {}. Using association with {}." .format(action_name, action_state_map[action_name], state, action_state_map[action_name])) else: action_state_map[action_name] = state else: logger.warning("State {} is not associated with any PDDL Action!".format(state)) if not action_state_map: logger.error("No States with semantic PDDL_Action data found!") raise ValueError("No States with semantic PDDL_Action data found!") logger.debug('action_state_map has ' + str(len(action_state_map.keys())) + ' entires.') self.__datastore.set_action_state_map(action_state_map)
[docs] def generate_state_action_map(self): """ Generates a map with RAFCON States as Keys and PDDL Action names as values. If no action_state_map exists, it calls generate action_state_map. It's writing the map into the datastore. :return: void """ if self.__datastore.get_action_state_map() is None: self.generate_action_state_map() action_state_map = self.__datastore.get_action_state_map() state_action_map = {} for action in action_state_map.keys(): c_state = action_state_map[action] if c_state in state_action_map: logger.warning( "Multiple associations of state {}! Associated with actions {} and {}. Using association with {}" .format(c_state, state_action_map[c_state].name, action.name, state_action_map[c_state].name)) else: state_action_map[c_state] = action if not state_action_map: logger.warning("could not generate state_action_map, action_state_map was empty!") logger.debug('state_action_map has ' + str(len(state_action_map.keys())) + ' entries.') self.__datastore.set_state_action_map(state_action_map)
[docs] def generate_available_actions(self): """ Takes the action_state_map, and extracts the keys, in order to get a list of all available PDDL Actions. If no action_state_map exists, it calls generate_action_state_map() first. It writes the list with available actions into the datastore. :return: void """ if self.__datastore.get_action_state_map() is None: self.generate_action_state_map() action_state_map = self.__datastore.get_action_state_map() logger.debug('list of available actions has ' + str(len(action_state_map.keys())) + ' entires.') self.__datastore.set_available_actions(action_state_map.keys())