# Copyright (C) 2018-2019 DLR
#
# All rights reserved. This program and the accompanying materials are made
# available under the terms of the 3-Clause BSD License which accompanies this
# distribution, and is available at
# https://opensource.org/licenses/BSD-3-Clause
#
# Contributors:
# Christoph Suerig <christoph.suerig@dlr.de>
# Don't connect with the Copyright comment above!
# Version 09.09.2019
import copy
import getpass
import json
import os
import threading
import time
from rafcon.utils import log
from rafcontpp.model.pddl_action_representation import PddlActionRepresentation
from rafcontpp.model.plan_step import PlanStep
from rafcontpp.model.type_tree import TypeTree
logger = log.get_logger(__name__)
# a map containing all built in planners e.g. planners with integration script.
built_in_planners = {
'Fast Downward Planning System': ('rafcontpp.planner.fast_downward_integration', 'FdIntegration'),
'Fast-Forward Planning System v2.3': ('rafcontpp.planner.fast_forward_integration', 'FfIntegration')
}
# The version string of the Plug-in.
PLUGIN_VERSION = "v1.5"
# The storage path of the config file.
DATASTORE_STORAGE_PATH = os.path.join(os.path.expanduser('~'), os.path.normpath('.config/rafcon/rafcontpp_conf.json'))
#the temp file save dir, to save files in if files should not be kept.
TMP_FILE_SAVE_DIR = os.path.join('/tmp','rtpp-{}'.format(getpass.getuser()))
# The name of the semantic data dict in rafcon state
SEMANTIC_DATA_DICT_NAME = 'RAFCONTPP'
# The name of the sub dictionary, where the pddl action is stored in.
PDDL_ACTION_SUB_DICT_NAME = 'PDDL_ACTION'
# The key to allow the Override of the State Content
ALLOW_OVERRIDE_NAME = 'Allow_Override'
# A lock to synchronize planning thread map accesses.
planning_threads_lock = threading.Lock()
# tuples of all registered (currently running) planning threads format: (thread,problem name)
planning_threads = {}
[docs]def get_planning_threads():
"""
:return: {long: InterruptableThread}: A copy of the planning_threads dict.
"""
with planning_threads_lock:
threads_copy = copy.copy(planning_threads)
return threads_copy
[docs]def datastore_from_file(file_path):
""" datastore_from_file
datastore_from_file tries to create a partial datastore (just input values, no clculated ones)
from a .json file e.g. config file. if there is no config file present, it returns a datastore with default values
:param file_path: the path to the config file
:return: Datastore: A partial initialized datastore, or a datastore with default values.
"""
ds = None
if not os.path.isfile(file_path):
logger.warning("Can't restore configuration from: {}".format(file_path))
logger.info("Creating default configuration...")
default_dir = str(os.path.expanduser('~'))
ds = Datastore([default_dir], '',
default_dir, built_in_planners.keys()[0], default_dir, [], default_dir, default_dir, False)
else:
data = json.load(open(file_path, "r"))
logger.debug('Loading Configuration form: ' + file_path)
sm_name = data['sm_name'] if 'sm_name' in data else '' # To provide backward compatibility.
ds = Datastore(data['state_pools'],
sm_name,
data['sm_save_dir'],
data['planner'],
data['planner_script_path'],
data['planner_argv'],
data['facts_path'],
data['type_db_path'],
data['keep_related_files'],
data['file_save_dir'])
runtime_data_path = data[
'runtime_data_path'] if 'runtime_data_path' in data else '' # To provide backward compatibility.
runtime_as_ref = data[
'runtime_as_ref'] if 'runtime_as_ref' in data else False # To provide backward compatibility.
plan_into_state = data[
'plan_into_state'] if 'plan_into_state' in data else False # To provide backward compatibility.
ds.set_runtime_data_path(runtime_data_path)
ds.set_use_runtime_path_as_ref(runtime_as_ref)
ds.set_generate_into_state(plan_into_state)
logger.info("Read configuration successfully!")
return ds
[docs]class Datastore:
""" Datastore
Datastore is a datastore, which holds all data of the plugin. Every module can get, and store its data here.
"""
def __init__(self, state_pools, sm_name, sm_save_dir, planner, planner_script_path, planner_argv,
facts_path, type_db_path, keep_related_files, file_save_dir='/tmp'):
"""
Constructor of Datastore
:param state_pools: a list of file paths.
:param sm_name: the name of the state machine which will be generated.
:param sm_save_dir: the directory, where to save the generated state machine.
:param planner: the name / script path of the used planner.
:param planner_argv: a String array, with arguments for the planner.
:param facts_path: path of the facts file.
:param type_db_path: path of the type_db.
:param keep_related_files: true, if generated files e.g. the domain file or the plan should be saved.
:param file_save_dir: a path, where to save all related files.
"""
# a list of directories, containing states with pddl notation.
self.__state_pools = state_pools
# the name of the state machine, which will be generated.
self.__sm_name = sm_name
# the location, to save the generated state machine in.
self.__sm_save_dir = sm_save_dir
# the complete path of the facts file (e.g. /home/facts.pddl).
self.__facts_path = facts_path
# the complete path of the type db file
self.__type_db_path = type_db_path
# True if files should be keeped, false otherwhise
self.__keep_related_files = keep_related_files
# the directory, where to save all produced files in
self.__file_save_dir = file_save_dir
# the shortcut, or the name of the planner script
self.__planner = planner
# additional arguments for the planner as string array.
self.__planner_argv = planner_argv
# the path of a custom planner script. this variable is not really useful for the plugin, and its not used, BUT:
# its useful for usability, to be able to save the script path persistent.
self.__planner_script_path = planner_script_path
# the complete path of the domain file (e.g. /home/domain.pddl).
self.__domain_path = None
# a map containing pddl action names as keys, and rafcon states as values.
self.__action_state_map = None
# a map containing rafcon states as keys and pddl action names as values.
self.__state_action_map = None
# a map containing action names as keys and pddl action representations as values
self.__pddl_action_map = None
# a list, contining the names of all available actions.
self.__available_actions = None
# a PddlFactsRepresentation Object, containing the parsed facts file.
self.__pddl_facts_representation = None
# a typeTree containing all available types
self.__available_types = None
# a list of (String,[(String,integer)]) prediactes
self.__available_predicates = None
# the plan, as a list of plan steps.
self.__plan = None
# a list with the name of all files, generated during the pipeline execution.
self.__generated_files = []
# the complete path of the runtime data dict, which holds data required during the run of the generated sm.
self.__runtime_data_path = None
# if true the runtime_data is red during runtime, otherwhise its red when generating the sm.
self.__use_runtime_data_path_as_reference = False
# target state, the state to plan into.
self.__target_state = None
# true, if the state machine should be generated into the target state. false if independent sm should be used.
self.__generate_into_state = False
[docs] def validate_ds(self): # TODO validate everything!
"""
validate_ds runs some checks on the state_pools, the sm-save_dir, the facts_path, the type_db_path,
and the file_save_dir. if a check fails, it raises a ValueError.
:return: void
"""
# validate state_pools
for dir in self.__state_pools:
if not os.path.isdir(dir):
logger.error("state pool directory not found: " + str(dir))
raise ValueError('Is not a directory: ' + str(dir))
# validate sm_save_dir
if not os.path.isdir(self.__sm_save_dir):
logger.error("state machine save dir: directory not found! " + str(self.__sm_save_dir))
raise ValueError('Is not a directory: ' + str(self.__sm_save_dir))
# validate facts_file
if not os.path.isfile(self.__facts_path):
logger.error("No facts file : " + str(self.__facts_path))
raise ValueError('Is not a file: ' + str(self.__facts_path))
# validate type_db_path
if not os.path.isfile(self.__type_db_path):
logger.error("No type database : " + str(self.__type_db_path))
raise ValueError('Is not a file: ' + str(self.__type_db_path))
# validate file_save_dir
if self.__keep_related_files and (not os.path.isdir(self.__file_save_dir)):
logger.error("file save dir is not a directory: " + str(self.__file_save_dir))
raise ValueError('Is not a directory: ' + str(self.__file_save_dir))
[docs] def register_thread(self, interruptable_thread):
"""
gets a thread, addes it synchronized to a global map, and returns the map key (which is the register time.).
:param interruptable_thread: A InterruptableThread, the datastore should store.
:return: long: The key used to register the thread. (That's the register time as unixtimestamp.)
"""
with planning_threads_lock:
register_time = time.time() # unix timestamp
# set task name to sm name, or problem name, if no sm name is available, and to state name if the sm is planned
# into a state.
task_name = self.get_pddl_facts_representation().problem_name if len(
self.get_sm_name()) == 0 else self.get_sm_name()
if self.__target_state:
task_name = self.__target_state.name
planning_threads[register_time] = (interruptable_thread, task_name, self.get_planner())
return register_time
[docs] def remove_thread(self, key):
"""
Receives a timestamp as key, and removes the thread synchronized from the global map.
:param key: The time, the thread was registered
:return: Boolean: True, if removing was successful, false otherwise
"""
successful = False
with planning_threads_lock:
if key in planning_threads.keys():
del planning_threads[key]
successful = not (key in planning_threads.keys())
return successful
[docs] def get_state_pools(self):
return self.__state_pools
[docs] def add_state_pools(self, state_pools, set_pool):
"""
:param state_pools: the state pools to add
:param set_pool: if true, state pools are not added, but set, and old list gets lost.
"""
if not state_pools:
logger.error("state_pools can't be None")
raise ValueError("state_pools can't be None")
if set_pool:
self.__state_pools = []
if state_pools and isinstance(state_pools, str):
if state_pools not in self.__state_pools:
self.__state_pools.append(state_pools)
elif state_pools:
for state_pool in state_pools:
self.add_state_pools(state_pool, False)
[docs] def get_file_save_dir(self):
return self.__file_save_dir
[docs] def set_file_save_dir(self, file_save_dir):
if self.__keep_related_files and not os.path.isdir(file_save_dir):
logger.error('file_save_dir must be a directory')
raise ValueError('Is not a directory: ' + str(file_save_dir))
self.__file_save_dir = file_save_dir
[docs] def get_sm_name(self):
return self.__sm_name
[docs] def set_sm_name(self, name):
self.__sm_name = name
[docs] def get_sm_save_dir(self):
return self.__sm_save_dir
[docs] def set_sm_save_dir(self, sm_save_dir):
if not os.path.isdir(sm_save_dir):
logger.error('state machine save directory must be a directory!')
raise ValueError('Is not a direcotry: ' + str(sm_save_dir))
self.__sm_save_dir = sm_save_dir
[docs] def get_domain_path(self):
return self.__domain_path
[docs] def set_domain_path(self, domain_path):
if not os.path.isfile(domain_path):
logger.error("No domain file: " + str(domain_path))
raise ValueError('Is not a file: ' + str(domain_path))
self.__domain_path = domain_path
[docs] def get_facts_path(self):
return self.__facts_path
[docs] def set_facts_path(self, facts_path):
if not os.path.isfile(facts_path):
logger.error('No facts file: ' + str(facts_path))
raise ValueError('Is not a File: ' + str(facts_path))
self.__facts_path = facts_path
[docs] def get_type_db_path(self):
return self.__type_db_path
[docs] def set_type_db_path(self, type_db):
if not os.path.isfile(type_db):
logger.error('No type db found at: ' + str(type_db))
raise ValueError('Is not a File: ' + str(type_db))
self.__type_db_path = type_db
[docs] def get_planner_argv(self):
return self.__planner_argv
[docs] def set_planner_argv(self, planner_argv):
if planner_argv is None:
logger.error("can't set None value as planner_argv")
raise ValueError("can't set None value as planner_argv")
self.__planner_argv = planner_argv
[docs] def get_planner(self):
return self.__planner
[docs] def set_planner(self, planner):
self.__planner = planner
[docs] def get_action_state_map(self):
return self.__action_state_map
[docs] def set_action_state_map(self, action_state_map):
if action_state_map is None:
logger.error("can't set None value as action_state_map")
raise ValueError("can't set None value as action_state_map")
self.__action_state_map = action_state_map
[docs] def get_state_action_map(self):
return self.__state_action_map
[docs] def set_state_action_map(self, state_action_map):
if state_action_map is None:
logger.error("can't set None value as state_action_map.")
raise ValueError("can't set None value as state_action_map.")
self.__state_action_map = state_action_map
[docs] def get_available_actions(self):
return self.__available_actions
[docs] def set_available_actions(self, available_actions):
if available_actions is None:
logger.error("Can't set None value as available_actions.")
raise ValueError("Can't set None value as available_actions.")
self.__available_actions = available_actions
[docs] def set_pddl_facts_representation(self, facts_representation):
if facts_representation is None:
logger.error("Can't set None value as pddl_facts_representation.")
raise ValueError("Can't set None value as pddl_facts_representation.")
self.__pddl_facts_representation = facts_representation
[docs] def get_runtime_data_path(self):
return self.__runtime_data_path
[docs] def set_runtime_data_path(self, runtime_data_path):
if runtime_data_path and len(runtime_data_path) > 0 and not os.path.isfile(runtime_data_path):
logger.warning('Runtime Data Path is not a Path: {}'.format(runtime_data_path))
self.__runtime_data_path = runtime_data_path
[docs] def use_runtime_path_as_ref(self):
return self.__use_runtime_data_path_as_reference
[docs] def set_use_runtime_path_as_ref(self, use):
self.__use_runtime_data_path_as_reference = use
[docs] def get_pddl_facts_representation(self):
return self.__pddl_facts_representation
[docs] def get_plan(self):
return self.__plan
[docs] def set_plan(self, plan):
if plan is None:
logger.error("can't set None value as plan.")
raise ValueError("can't set None value as plan.")
if len(plan) > 0 and (not isinstance(plan[0], PlanStep)):
logger.error("plan hast to be of type [PlanStep].")
raise TypeError("plan hast to be of type [PlanStep].")
self.__plan = plan
[docs] def get_built_in_planners(self):
return built_in_planners
[docs] def add_generated_file(self, file_name):
if file_name and isinstance(file_name, str):
self.__generated_files.append(file_name)
elif file_name:
self.__generated_files.extend(file_name)
[docs] def get_generated_files(self):
return self.__generated_files
[docs] def get_planner_script_path(self):
return self.__planner_script_path
[docs] def set_planner_script_path(self, psp):
self.__planner_script_path = psp
[docs] def get_pddl_action_map(self):
return self.__pddl_action_map
[docs] def set_pddl_action_map(self, action_map):
if action_map and isinstance(action_map.values()[0], PddlActionRepresentation):
self.__pddl_action_map = action_map
else:
logger.error("pddl action map has to be of type Dict{str:PddlActionRepresentation}")
raise TypeError("pddl action map has to be of type Dict{str:PddlActionRepresentation}")
[docs] def get_available_types(self):
return self.__available_types
[docs] def set_available_types(self, availabe_types):
assert isinstance(availabe_types, TypeTree)
self.__available_types = availabe_types
[docs] def get_available_predicates(self):
return self.__available_predicates
[docs] def set_available_predicates(self, available_predicates):
self.__available_predicates = available_predicates
[docs] def get_target_state(self):
return self.__target_state
[docs] def set_target_state(self, target_state):
self.__target_state = target_state
[docs] def generate_into_state(self):
return self.__generate_into_state
[docs] def set_generate_into_state(self, plan_into_state):
self.__generate_into_state = plan_into_state
[docs] def save_datastore_parts_in_file(self, file_path):
""" save_datastore_parts_in_file
save_datastore_parts_in_file saves all plugin inputs, which are present in the datastore in a file.
:param file_path: The path of the configuration file
:return: void
"""
data_to_save = {
'state_pools': self.__state_pools,
'type_db_path': self.__type_db_path,
'planner': self.__planner,
'planner_script_path': self.__planner_script_path,
'planner_argv': self.__planner_argv,
'facts_path': self.__facts_path,
'plan_into_state': self.__generate_into_state,
'sm_name': self.__sm_name,
'sm_save_dir': self.__sm_save_dir,
'keep_related_files': self.__keep_related_files,
'file_save_dir': self.__file_save_dir,
'runtime_data_path': self.__runtime_data_path,
'runtime_as_ref': self.__use_runtime_data_path_as_reference
}
logger.debug('Writing Configuration to path: ' + file_path)
conf_file = open(file_path, "w")
conf_file.write(json.dumps(data_to_save, indent=2, sort_keys=True, separators=(',', ': ')))
conf_file.flush()
conf_file.close()
logger.info('Saved Configuration successfully!')