Source code for rafcontpp.logic.domain_generator

# Copyright (C) 2018-2019 DLR
#
# All rights reserved. This program and the accompanying materials are made
# available under the terms of the 3-Clause BSD License which accompanies this
# distribution, and is available at
# https://opensource.org/licenses/BSD-3-Clause
#
# Contributors:
# Christoph Suerig <christoph.suerig@dlr.de>

# Don't connect with the Copyright comment above!
# Version 09.09.2019

import os
from datetime import datetime

from rafcon.utils import log

from rafcontpp.logic.predicate_merger import PredicateMerger
from rafcontpp.logic.type_merger import TypeMerger
from rafcontpp.model.datastore import PLUGIN_VERSION
logger = log.get_logger(__name__)


[docs]class DomainGenerator: """ The DomainGenerator uses the data provided by the datastore to generate a domain.pddl file for the planner. """ def __init__(self, datastore): """ :param datastore: A Datastore containing: pddl facts representation, pddl action map, file save directory. """ if datastore is None: logger.error("Datastore in DomainGenerator can not be None!") raise ValueError("Datastore in DomainGenerator can not be None!") self.__datastore = datastore
[docs] def generate_domain(self): """ generateDomain generates a domain and returns its path. :return: String: The path of the generated domain file. """ facts = self.__datastore.get_pddl_facts_representation() domain_name = facts.domain_name problem_name = facts.problem_name pddl_actions = self.__datastore.get_pddl_action_map().values() domain_path = os.path.abspath(os.path.join(self.__datastore.get_file_save_dir(), domain_name + "_domain.pddl")) type_merger = TypeMerger(self.__datastore) type_tree = type_merger.merge_types() self.__datastore.set_available_types(type_tree) preds = self.__merge_predicates(pddl_actions) self.__datastore.set_available_predicates(preds[1]) merged_preds = preds[0] merged_requirs = self.__merge_requirements(pddl_actions) logger.debug('writing domain file to: ' + str(domain_path)) domain_file = open(domain_path, "w") domain_file.write("{}\r\n".format(self.__get_comment_section())) domain_file.write(self.__get_head(domain_name) + "\r\n") domain_file.write(self.__get_requirements(merged_requirs) + "\r\n") domain_file.write(self.__get_types(type_tree) + "\r\n") domain_file.write(self.__get_predicates(merged_preds) + "\r\n") domain_file.write(self.__get_actions(pddl_actions) + "\r\n") domain_file.write(")") domain_file.flush() domain_file.close() self.__datastore.set_domain_path(domain_path) self.__datastore.add_generated_file(domain_name + "_domain.pddl") return domain_path
def __get_head(self, domain_name): """ Takes a domain name, and returns the head of a pddl domain, e.g. "(define (domain test_domain)". :param domain_name: The name of the domain. :return: String: The head of a pddl domain. """ return "(DEFINE (DOMAIN {})".format(domain_name) def __merge_requirements(self, pddl_actions): """ mergeRequirements takes the requirements of the used pddl-actions, and removes dublicates. :param pddl_actions: A list with PddlActionRepresentations :return: [String]: All pddl-requirements without dublicates. """ requirements = [] for action in pddl_actions: for requirement in action.requirements: if requirement not in requirements: requirements.append(requirement) return requirements def __get_requirements(self, requirements): """ Takes the requirements list and returns it as string in a pddl-format. :param requirements: A list with all requirments :return: String: Requirments in a pddl-conform format. """ requirements_section = "(:REQUIREMENTS " for requirement in requirements: requirements_section = requirements_section + requirement + " " return requirements_section + ")" def __get_types(self, merged_types): """ Takes a type tree and returns a string representation usable in a pddl-domain. :param merged_types: A type tree containing all (relevant) types. :return: String: A type section usable in pddl. """ types_in_pddl = "" if merged_types: types = "(:TYPES \r\n" types_in_pddl = types + merged_types.get_as_pddl_string() + ")" return types_in_pddl def __merge_predicates(self, pddl_actions): """ mergePredicates takes all predicates mentioned in the PddlActionRepresentations, and removes dublicates. :param pddl_actions: A list of PddlActionRepresentations. :return: ([String],[(String,[(String,int)])]): A tuple with a list of predicates and a list of predicates as tuple without dublicates. """ # pre merge predicates predicates = [] for action in pddl_actions: for predicate in action.predicates: if predicate not in predicates: predicates.append(predicate) merger = PredicateMerger(self.__datastore) predicates = merger.merge_predicates(predicates) return predicates def __get_predicates(self, predicates): """ Takes a list of predicates as string, and returns a pddl confrom predicates section. :param predicate_section: A list of predicates as strings. :return: String: A pddl conform predicates section. """ predicate_section = "(:PREDICATES\r\n" for predicate in predicates: predicate_section = predicate_section + predicate + "\r\n" return predicate_section + ")" def __get_actions(self, pddl_actions): """ Takes a list of PddlActionRepresentations, and returns a pddl conform action section. :param pddl_actions: A list of PddlActionRepresentations. :return: String: A pddl conform action section. """ actions = "" for action in pddl_actions: actions += action.action + "\r\n\r\n" return actions def __get_comment_section(self): """ This method generated a header comment for the domain, containing the version, the time, date and the 'author'. :return: String: A domain header comment, already containing the pddl comment indicator character ';'. """ comment_section = ";; This Domain was automatically generated by RAFCON Task Planner Plugin (RTPP)\r\n" comment_section = "{};; Version: RTPP {}\r\n".format(comment_section, PLUGIN_VERSION) comment_section = "{};; Date: {}\r\n".format(comment_section, datetime.now().strftime("%d.%m.%Y")) comment_section = "{};; Time: {}h\r\n".format(comment_section, datetime.now().strftime("%H:%M:%S")) # build borders. comment_section = ";;;;\r\n{};;;;\r\n".format(comment_section) return comment_section