From e1643d103256237e518703a286b45fa9e5ca5280 Mon Sep 17 00:00:00 2001 From: "Hsieh, Chiao" <chsieh16@illinois.edu> Date: Wed, 26 Oct 2022 19:22:37 -0500 Subject: [PATCH] Move DReal teachers to standalone files --- dreal_teacher_base.py | 110 +++++++++++++++++++++++++++++++++++ gem_stanley_dreal_teacher.py | 53 +++++++++++++++++ gem_stanley_teacher.py | 55 +----------------- teacher_base.py | 108 +--------------------------------- 4 files changed, 169 insertions(+), 157 deletions(-) create mode 100644 dreal_teacher_base.py create mode 100644 gem_stanley_dreal_teacher.py diff --git a/dreal_teacher_base.py b/dreal_teacher_base.py new file mode 100644 index 0000000..f5d9ca0 --- /dev/null +++ b/dreal_teacher_base.py @@ -0,0 +1,110 @@ +import abc +import itertools +from typing import Dict, List, Sequence, Tuple + +import dreal +import numpy as np +import z3 + +from teacher_base import TeacherBase + + +class DRealTeacherBase(TeacherBase): + @staticmethod + def affine_trans_exprs(state, coeff: np.ndarray, intercept=None): + if intercept is None: + intercept = np.zeros(coeff.shape[0]) + assert (len(intercept), len(state)) == coeff.shape + return [ + b_i + sum([col_j*var_j for col_j, var_j in zip(row_i, state)]) + for row_i, b_i in zip(coeff, intercept) + ] + + def __init__(self, name: str, + state_dim: int, perc_dim: int, ctrl_dim: int, norm_ord, + delta: float = 0.001) -> None: + self._norm_ord = norm_ord + self._delta = delta + + # Old state variables + self._old_state = [dreal.Variable(f"x[{i}]", dreal.Variable.Real) for i in range(state_dim)] + # New state variables + self._new_state = [dreal.Variable(f"x'[{i}]", dreal.Variable.Real) for i in range(state_dim)] + # Perception variables + self._percept = [dreal.Variable(f"z[{i}]", dreal.Variable.Real) for i in range(perc_dim)] + # Control variables + self._control = [dreal.Variable(f"u[{i}]", dreal.Variable.Real) for i in range(ctrl_dim)] + + self._var_bounds = { + var: (-np.inf, np.inf) + for var in itertools.chain(self._old_state, self._percept, self._control) + } # type: Dict[dreal.Variable, Tuple[float, float]] + self._not_inv_cons = [] # type: List + self._models = [] # type: List + + self._add_system() + self._add_unsafe() + + @abc.abstractmethod + def _add_system(self) -> None: + raise NotImplementedError + + @abc.abstractmethod + def _add_unsafe(self) -> None: + raise NotImplementedError + + @property + def state_dim(self) -> int: + return len(self._old_state) + + @property + def perc_dim(self) -> int: + return len(self._percept) + + @property + def ctrl_dim(self) -> int: + return len(self._control) + + def _gen_cand_pred(self, candidate): + raise NotImplementedError(f"TODO convert {candidate} to dReal formula") + + def _set_var_bound(self, smt_vars: List[dreal.Variable], lb: Sequence[float], ub: Sequence[float]) -> None: + assert len(lb) == len(smt_vars) + assert len(ub) == len(smt_vars) + for var_i, lb_i, ub_i in zip(smt_vars, lb, ub): + self._var_bounds[var_i] = (lb_i, ub_i) + + def set_old_state_bound(self, lb: Sequence[float], ub: Sequence[float]) -> None: + self._set_var_bound(self._old_state, lb, ub) + + def check(self, candidate) -> z3.CheckSatResult: + bound_preds = [ + dreal.And(lb_i <= var_i, var_i <= ub_i) + for var_i, (lb_i, ub_i) in self._var_bounds.items() + ] + cand_pred = self._gen_cand_pred(candidate) + query = dreal.And(*bound_preds, cand_pred, *self._not_inv_cons) + res = dreal.CheckSatisfiability(query, self._delta) + + self._models.clear() + if res: + self._models.append( + tuple(res[x_i] for x_i in self._old_state) + tuple(res[z_i] for z_i in self._percept) + ) + return z3.sat + else: + return z3.unsat + + def dump_system_encoding(self, basename: str = "") -> None: + print([ + dreal.And(lb_i <= var_i, var_i <= ub_i) + for var_i, (lb_i, ub_i) in self._var_bounds.items() + ]) + print(self._not_inv_cons) + + def model(self) -> Sequence[Tuple]: + return self._models + + def reason_unknown(self) -> str: + raise NotImplementedError + diff --git a/gem_stanley_dreal_teacher.py b/gem_stanley_dreal_teacher.py new file mode 100644 index 0000000..42fa3c3 --- /dev/null +++ b/gem_stanley_dreal_teacher.py @@ -0,0 +1,53 @@ +import dreal +import numpy as np + +from dreal_teacher_base import DRealTeacherBase +from gem_stanley_teacher import CTE_LIM, ANG_LIM, STEERING_LIM, K_P, FORWARD_VEL, CYCLE_TIME, WHEEL_BASE + + +class GEMStanleyDRealTeacher(DRealTeacherBase): + + def __init__(self, name="gem_stanley", norm_ord=2, delta=0.001) -> None: + super().__init__(name=name, + state_dim=3, perc_dim=2, ctrl_dim=1, norm_ord=norm_ord, delta=delta) + + def _add_system(self) -> None: + self._set_var_bound(self._old_state, lb=(-np.inf, -CTE_LIM, -ANG_LIM), ub=(np.inf, CTE_LIM, ANG_LIM)) + self._set_var_bound(self._new_state, lb=(-np.inf, -CTE_LIM, -ANG_LIM), ub=(np.inf, CTE_LIM, ANG_LIM)) + self._set_var_bound(self._percept, lb=(-CTE_LIM, -ANG_LIM), ub=(CTE_LIM, ANG_LIM)) + self._set_var_bound(self._control, lb=(-STEERING_LIM,), ub=(STEERING_LIM,)) + + # Variable Aliases + old_x, old_y, old_yaw = self._old_state + new_x, new_y, new_yaw = self._new_state + cte, phi = self._percept + steering, = self._control + + self._not_inv_cons.extend([ + # Control + steering == dreal.Min(dreal.Max(phi + dreal.atan(cte*(K_P / FORWARD_VEL)), -STEERING_LIM), STEERING_LIM), + # Dynamics + new_x == old_x + FORWARD_VEL*CYCLE_TIME*dreal.cos(old_yaw + steering), + new_y == old_y + FORWARD_VEL*CYCLE_TIME*dreal.sin(old_yaw + steering), + new_yaw == old_yaw + dreal.sin(steering)*FORWARD_VEL*CYCLE_TIME/WHEEL_BASE, + ]) + + def _add_unsafe(self) -> None: + # Variable Aliases + old_x, old_y, old_yaw = self._old_state + new_x, new_y, new_yaw = self._new_state + + if self._norm_ord == 1: + old_err = abs(old_y) + abs(old_yaw) + new_err = abs(new_y) + abs(new_yaw) + elif self._norm_ord == 2: + old_err = dreal.sqrt(old_y**2 + old_yaw**2) + new_err = dreal.sqrt(new_y**2 + new_yaw**2) + else: + assert self._norm_ord == "inf" + old_err = dreal.Max(abs(old_y), abs(old_yaw)) + new_err = dreal.Max(abs(new_y), abs(new_yaw)) + + self._not_inv_cons.extend([ + old_err <= new_err + ]) diff --git a/gem_stanley_teacher.py b/gem_stanley_teacher.py index 41de8b2..8127b74 100644 --- a/gem_stanley_teacher.py +++ b/gem_stanley_teacher.py @@ -1,12 +1,11 @@ - -from typing import Optional, Tuple -import dreal +from typing import Tuple import gurobipy as gp import numpy as np import sympy import z3 -from teacher_base import DRealTeacherBase, SymPyTeacherBase +from gem_stanley_dreal_teacher import GEMStanleyDRealTeacher +from teacher_base import SymPyTeacherBase from dtree_teacher_base import DTreeGurobiTeacherBase WHEEL_BASE = 1.75 # m @@ -32,54 +31,6 @@ NEW_ATAN_K_CTE_V_LIM = np.arctan(NEW_K_CTE_V_LIM) NEW_RAW_ANG_ERR_LIM = ANG_LIM + FORWARD_VEL * CYCLE_TIME -class GEMStanleyDRealTeacher(DRealTeacherBase): - - def __init__(self, name="gem_stanley", norm_ord=2, delta=0.001) -> None: - super().__init__(name=name, - state_dim=3, perc_dim=2, ctrl_dim=1, norm_ord=norm_ord, delta=delta) - - def _add_system(self) -> None: - self._set_var_bound(self._old_state, lb=(-np.inf, -CTE_LIM, -ANG_LIM), ub=(np.inf, CTE_LIM, ANG_LIM)) - self._set_var_bound(self._new_state, lb=(-np.inf, -CTE_LIM, -ANG_LIM), ub=(np.inf, CTE_LIM, ANG_LIM)) - self._set_var_bound(self._percept, lb=(-CTE_LIM, -ANG_LIM), ub=(CTE_LIM, ANG_LIM)) - self._set_var_bound(self._control, lb=(-STEERING_LIM,), ub=(STEERING_LIM,)) - - # Variable Aliases - old_x, old_y, old_yaw = self._old_state - new_x, new_y, new_yaw = self._new_state - cte, phi = self._percept - steering, = self._control - - self._not_inv_cons.extend([ - # Control - steering == dreal.Min(dreal.Max(phi + dreal.atan(cte*(K_P / FORWARD_VEL)), -STEERING_LIM), STEERING_LIM), - # Dynamics - new_x == old_x + FORWARD_VEL*CYCLE_TIME*dreal.cos(old_yaw + steering), - new_y == old_y + FORWARD_VEL*CYCLE_TIME*dreal.sin(old_yaw + steering), - new_yaw == old_yaw + dreal.sin(steering)*FORWARD_VEL*CYCLE_TIME/WHEEL_BASE, - ]) - - def _add_unsafe(self) -> None: - # Variable Aliases - old_x, old_y, old_yaw = self._old_state - new_x, new_y, new_yaw = self._new_state - - if self._norm_ord == 1: - old_err = abs(old_y) + abs(old_yaw) - new_err = abs(new_y) + abs(new_yaw) - elif self._norm_ord == 2: - old_err = dreal.sqrt(old_y**2 + old_yaw**2) - new_err = dreal.sqrt(new_y**2 + new_yaw**2) - else: - assert self._norm_ord == "inf" - old_err = dreal.Max(abs(old_y), abs(old_yaw)) - new_err = dreal.Max(abs(new_y), abs(new_yaw)) - - self._not_inv_cons.extend([ - old_err <= new_err - ]) - - class DTreeGEMStanleyGurobiTeacher(DTreeGurobiTeacherBase): # Ideal perception as a linear transform from state to ground truth percept PERC_GT = np.array([[0., -1., 0.], diff --git a/teacher_base.py b/teacher_base.py index 19cd943..e917fff 100644 --- a/teacher_base.py +++ b/teacher_base.py @@ -1,13 +1,11 @@ import abc import itertools -#from typing import Dict, Hashable, List, Literal, Optional, Sequence, Tuple -from typing import Dict, Hashable, List, Optional, Sequence, Tuple +from typing import Dict, List, Optional, Sequence, Tuple import gurobipy as gp import numpy as np import sympy import z3 -import dreal def z3_float64_const_to_real(v: float) -> z3.RatNumRef: @@ -56,106 +54,6 @@ class TeacherBase(abc.ABC): raise NotImplementedError -class DRealTeacherBase(TeacherBase): - @staticmethod - def affine_trans_exprs(state, coeff: np.ndarray, intercept=None): - if intercept is None: - intercept = np.zeros(coeff.shape[0]) - assert (len(intercept), len(state)) == coeff.shape - return [ - b_i + sum([col_j*var_j for col_j, var_j in zip(row_i, state)]) - for row_i, b_i in zip(coeff, intercept) - ] - - def __init__(self, name: str, - state_dim: int, perc_dim: int, ctrl_dim: int, norm_ord, - delta: float = 0.001) -> None: - self._norm_ord = norm_ord - self._delta = delta - - # Old state variables - self._old_state = [dreal.Variable(f"x[{i}]", dreal.Variable.Real) for i in range(state_dim)] - # New state variables - self._new_state = [dreal.Variable(f"x'[{i}]", dreal.Variable.Real) for i in range(state_dim)] - # Perception variables - self._percept = [dreal.Variable(f"z[{i}]", dreal.Variable.Real) for i in range(perc_dim)] - # Control variables - self._control = [dreal.Variable(f"u[{i}]", dreal.Variable.Real) for i in range(ctrl_dim)] - - self._var_bounds = { - var: (-np.inf, np.inf) - for var in itertools.chain(self._old_state, self._percept, self._control) - } # type: Dict[dreal.Variable, Tuple[float, float]] - self._not_inv_cons = [] # type: List - self._models = [] # type: List - - self._add_system() - self._add_unsafe() - - @abc.abstractmethod - def _add_system(self) -> None: - raise NotImplementedError - - @abc.abstractmethod - def _add_unsafe(self) -> None: - raise NotImplementedError - - @property - def state_dim(self) -> int: - return len(self._old_state) - - @property - def perc_dim(self) -> int: - return len(self._percept) - - @property - def ctrl_dim(self) -> int: - return len(self._control) - - def _gen_cand_pred(self, candidate): - raise NotImplementedError(f"TODO convert {candidate} to dReal formula") - - def _set_var_bound(self, smt_vars: List[dreal.Variable], lb: Sequence[float], ub: Sequence[float]) -> None: - assert len(lb) == len(smt_vars) - assert len(ub) == len(smt_vars) - for var_i, lb_i, ub_i in zip(smt_vars, lb, ub): - self._var_bounds[var_i] = (lb_i, ub_i) - - def set_old_state_bound(self, lb: Sequence[float], ub: Sequence[float]) -> None: - self._set_var_bound(self._old_state, lb, ub) - - def check(self, candidate) -> z3.CheckSatResult: - bound_preds = [ - dreal.And(lb_i <= var_i, var_i <= ub_i) - for var_i, (lb_i, ub_i) in self._var_bounds.items() - ] - cand_pred = self._gen_cand_pred(candidate) - query = dreal.And(*bound_preds, cand_pred, *self._not_inv_cons) - res = dreal.CheckSatisfiability(query, self._delta) - - self._models.clear() - if res: - self._models.append( - tuple(res[x_i] for x_i in self._old_state) + tuple(res[z_i] for z_i in self._percept) - ) - return z3.sat - else: - return z3.unsat - - def dump_system_encoding(self, basename: str = "") -> None: - print([ - dreal.And(lb_i <= var_i, var_i <= ub_i) - for var_i, (lb_i, ub_i) in self._var_bounds.items() - ]) - print(self._not_inv_cons) - - def model(self) -> Sequence[Tuple]: - return self._models - - def reason_unknown(self) -> str: - raise NotImplementedError - - class GurobiTeacherBase(TeacherBase): FREEVAR = {"vtype": gp.GRB.CONTINUOUS, "lb": -np.inf, "ub": np.inf} NNEGVAR = {"vtype": gp.GRB.CONTINUOUS, "lb": 0.0, "ub": np.inf} @@ -262,7 +160,7 @@ class SymPyTeacherBase(TeacherBase): self._var_bounds = { var: (-sympy.oo, sympy.oo) for var in itertools.chain(self._old_state, self._new_state, self._percept, self._control) - } # type: Dict[dreal.Variable, Tuple[float, float]] + } # type: Dict[sympy.Symbol, Tuple[float, float]] self._not_inv_cons = [] # type: List @@ -289,7 +187,7 @@ class SymPyTeacherBase(TeacherBase): def ctrl_dim(self) -> int: return len(self._control) - def _set_var_bound(self, sym_vars: List[dreal.Variable], lb: Sequence[float], ub: Sequence[float]) -> None: + def _set_var_bound(self, sym_vars: List[sympy.Symbol], lb: Sequence[float], ub: Sequence[float]) -> None: assert len(lb) == len(sym_vars) assert len(ub) == len(sym_vars) for var_i, lb_i, ub_i in zip(sym_vars, lb, ub): -- GitLab