Source code for rafcontpp.planner.fast_forward_integration

# Copyright (C) 2019 DLR
#
# All rights reserved. This program and the accompanying materials are made
# available under the terms of the 3-Clause BSD License which accompanies this
# distribution, and is available at
# https://opensource.org/licenses/BSD-3-Clause
#
# Contributors:
# Christoph Suerig <christoph.suerig@dlr.de>

# Don't connect with the Copyright comment above!
# Version 31.05.2019
import os
import subprocess
import time

from rafcontpp.model.plan_step import PlanStep
from rafcontpp.model.planner_interface import PlannerInterface
from rafcontpp.model.planning_report import PlanningReport


[docs]class FfIntegration(PlannerInterface): """ This is the integration script for the Fast-Forward Planning System version 2.3 by Hoffmann (https://fai.cs.uni-saarland.de/hoffmann/ff.html) """
[docs] def plan_scenario(self, domain_path, facts_path, planner_argv, storage_path): # get own directory for creating files in and so on... old_cwd = os.getcwd() new_cwd = os.path.join(old_cwd, 'ff_planning_tmp {0:.8f}'.format(time.time())) os.mkdir(new_cwd) os.chdir(new_cwd) plan_path = os.path.abspath(os.path.join(storage_path, "ff_plan")) command = 'ff -o ' + domain_path + ' -f ' + facts_path + ' ' for arg in planner_argv: command += arg + ' ' command = command.rstrip() plan = [] # run Fast Forward ff_process = subprocess.Popen([command], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) (stdout, stderr) = ff_process.communicate() ff_exit = ff_process.returncode # read plan, if possible files_to_delete = [] if ff_exit == 0: parsed_console = self.__parse_console_output(stdout) plan = parsed_console[1] plan_file = open(plan_path, "w") for line in parsed_console[0]: plan_file.write(line + '\r\n') plan_file.flush() plan_file.close() files_to_delete = ['ff_plan'] else: stderr = stdout # reset to old cwd os.chdir(old_cwd) os.rmdir(new_cwd) return PlanningReport(ff_exit == 0, plan, files_to_delete, str(ff_exit) + ': ' + str(stderr))
[docs] def is_available(self): """ :return: Boolean: True, if the planner is available in the system, false otherwhise. """ devnull = open(os.devnull, "wb") process = subprocess.Popen('ff', stdout=devnull, stderr=devnull, shell=True) process.wait() status = process.returncode devnull.close() return status != 127 # 127 is the code for command not found.
def __parse_console_output(self, to_parse): """ Receives the console output, and parses the plan out of it. :param to_parse: The console output. :return: [PlanStep]: A parsed plan. """ console_output = to_parse.splitlines() raw_plan = [] parsed_plan = [] if 'ff: found legal plan as follows' in console_output: start_index = console_output.index('ff: found legal plan as follows') + 2 while start_index < len(console_output): c_line = console_output[start_index] if ':' not in c_line: break action_index = c_line.index(':') + 2 action = c_line[action_index:] raw_plan.append(action) parts = action.split(" ") parsed_plan.append(PlanStep(parts[0], parts[1:])) start_index += 1 return (raw_plan, parsed_plan)