Source code for rafcontpp.planner.fast_downward_integration

# Copyright (C) 2018-2019 DLR
#
# All rights reserved. This program and the accompanying materials are made
# available under the terms of the 3-Clause BSD License which accompanies this
# distribution, and is available at
# https://opensource.org/licenses/BSD-3-Clause
#
# Contributors:
# Christoph Suerig <christoph.suerig@dlr.de>

# Don't connect with the Copyright comment above!
# Version 31.05.2019
import os
import shutil
import subprocess
import time

from rafcontpp.model.plan_step import PlanStep
from rafcontpp.model.planner_interface import PlannerInterface
from rafcontpp.model.planning_report import PlanningReport


[docs]class FdIntegration(PlannerInterface): """ This is the integration Script for the Fast downward Planning System by Malte Helmert Et al. (fast-downward.org) """
[docs] def plan_scenario(self, domain_path, facts_path, planner_argv, storage_path): # get own directory for creating files in and so on... old_cwd = os.getcwd() new_cwd = os.path.join(old_cwd, 'fd_planning_tmp {0:.8f}'.format(time.time())) os.mkdir(new_cwd) os.chdir(new_cwd) command = 'fast-downward ' + domain_path + ' ' + facts_path + ' ' plan_path = os.path.abspath(os.path.join(os.curdir, "sas_plan")) outsas = os.path.abspath(os.path.join(os.curdir, "output.sas")) plan = [] if len(planner_argv) == 0: command += '--search \"astar(blind())\"' else: for arg in planner_argv: command += arg + ' ' command = command.rstrip() # run Fast-downward fd_process = subprocess.Popen([command], stdout=subprocess.PIPE, shell=True) (out, err) = fd_process.communicate() fd_exit = fd_process.returncode # read plan, if possible if fd_exit == 0: plan = self.__parse_raw_plan(plan_path) copied_files = self.__copy_and_clean(plan_path, outsas, storage_path) # reset to old cwd os.chdir(old_cwd) os.rmdir(new_cwd) return PlanningReport(fd_exit < 4, plan, copied_files, str(fd_exit) + ': ' + self.__translate_fd_exit_code(fd_exit) + " used command was: " + command)
[docs] def is_available(self): """ :return: Boolean: True, if the planner is available in the system, false otherwise. """ devnull = open(os.devnull, "wb") process = subprocess.Popen('fast-downward', stdout=devnull, stderr=devnull, shell=True) process.wait() status = process.returncode devnull.close() return status != 127 # 127 is the code for command not found.
def __parse_raw_plan(self, plan_path): """ :param plan_path: The path of the plan file. :return: [PlanStep]: A parsed plan. """ parsed_plan = [] if os.path.isfile(plan_path): plan_file = open(plan_path, "r") raw_plan = plan_file.readlines() raw_plan.pop() # parsing the action out of the raw file. for action in raw_plan: # remove useless chars action = action[:-2] action = action[1:] parts = action.split(" ") parsed_plan.append(PlanStep(parts[0], parts[1:])) return parsed_plan def __copy_and_clean(self, plan_path, outsas_path, storage_path): """ Copy and clean moves all generated files to the storage_path, and writes their names into a list. :param plan_path: The path of the plan. :param outsas_path: The path of the outsas file. :param storage_path: The storage path. :return: [String]: A list with the names of the moved files. """ copied_files = [] if os.path.isfile(plan_path): shutil.move(plan_path, os.path.join(storage_path, 'sas_plan')) copied_files.append('sas_plan') if os.path.isfile(outsas_path): shutil.move(outsas_path, os.path.join(storage_path, 'output.sas')) copied_files.append('output.sas') return copied_files def __translate_fd_exit_code(self, fd_exit): """ receives an error code and returns the corresponding error message. :param fd_exit: An exit code :return: String: The corresponding error message. """ translated_exit_code = str(fd_exit) codes = { 0: "No Error!", 1: "at least one plan was found and another component ran out of memory.", 2: "at least one plan was found and another component ran out of time.", 3: "at least one plan was found, another component ran out of memory, and yet another one ran out of time. ", 10: "Translator proved task to be unsolvable.", 11: "Task is provably unsolvable with current bound.", 12: "Search ended without finding a solution. ", 20: "Translate, Memory exhausted.", 21: "Translate, Time exhausted. Not supported on Windows because we use SIGXCPU to kill the planner.", 22: "Search, Memory exhausted.", 23: "Search, Timeout occurred. Not supported on Windows because we use SIGXCPU to kill the planner. ", 24: "Search, one component ran out of memory and another one out of time. ", 30: "Critical error: something went wrong (e.g. translator bug, but also malformed PDDL input).", 31: "Usage error: wrong command line options", 32: "Something went wrong that should not have gone wrong (e.g. planner bug).", 33: "Wrong command line options or SAS+ file.", 34: "Requested unsupported feature.", 35: "Something went wrong in the driver (e.g. failed setting resource limits, ill-defined portfolio, complete plan generated after an incomplete one).", 36: "Usage error: wrong or missing command line options, including (implicitly) specifying non-existing paths (e.g. for input files or build directory).", 37: "Requested unsupported feature (e.g. limiting memory on macOS).", 127: "Couldn't start FastDownward Planner: command \"fast-downward.py\" not found!" } if fd_exit in codes: translated_exit_code = str(fd_exit) + " => " + codes.get(fd_exit, "Unknown") return translated_exit_code