Skip to content
Snippets Groups Projects
Commit e1643d10 authored by chsieh16's avatar chsieh16
Browse files

Move DReal teachers to standalone files

parent 1472d360
No related branches found
No related tags found
No related merge requests found
import abc
import itertools
from typing import Dict, List, Sequence, Tuple
import dreal
import numpy as np
import z3
from teacher_base import TeacherBase
class DRealTeacherBase(TeacherBase):
@staticmethod
def affine_trans_exprs(state, coeff: np.ndarray, intercept=None):
if intercept is None:
intercept = np.zeros(coeff.shape[0])
assert (len(intercept), len(state)) == coeff.shape
return [
b_i + sum([col_j*var_j for col_j, var_j in zip(row_i, state)])
for row_i, b_i in zip(coeff, intercept)
]
def __init__(self, name: str,
state_dim: int, perc_dim: int, ctrl_dim: int, norm_ord,
delta: float = 0.001) -> None:
self._norm_ord = norm_ord
self._delta = delta
# Old state variables
self._old_state = [dreal.Variable(f"x[{i}]", dreal.Variable.Real) for i in range(state_dim)]
# New state variables
self._new_state = [dreal.Variable(f"x'[{i}]", dreal.Variable.Real) for i in range(state_dim)]
# Perception variables
self._percept = [dreal.Variable(f"z[{i}]", dreal.Variable.Real) for i in range(perc_dim)]
# Control variables
self._control = [dreal.Variable(f"u[{i}]", dreal.Variable.Real) for i in range(ctrl_dim)]
self._var_bounds = {
var: (-np.inf, np.inf)
for var in itertools.chain(self._old_state, self._percept, self._control)
} # type: Dict[dreal.Variable, Tuple[float, float]]
self._not_inv_cons = [] # type: List
self._models = [] # type: List
self._add_system()
self._add_unsafe()
@abc.abstractmethod
def _add_system(self) -> None:
raise NotImplementedError
@abc.abstractmethod
def _add_unsafe(self) -> None:
raise NotImplementedError
@property
def state_dim(self) -> int:
return len(self._old_state)
@property
def perc_dim(self) -> int:
return len(self._percept)
@property
def ctrl_dim(self) -> int:
return len(self._control)
def _gen_cand_pred(self, candidate):
raise NotImplementedError(f"TODO convert {candidate} to dReal formula")
def _set_var_bound(self, smt_vars: List[dreal.Variable], lb: Sequence[float], ub: Sequence[float]) -> None:
assert len(lb) == len(smt_vars)
assert len(ub) == len(smt_vars)
for var_i, lb_i, ub_i in zip(smt_vars, lb, ub):
self._var_bounds[var_i] = (lb_i, ub_i)
def set_old_state_bound(self, lb: Sequence[float], ub: Sequence[float]) -> None:
self._set_var_bound(self._old_state, lb, ub)
def check(self, candidate) -> z3.CheckSatResult:
bound_preds = [
dreal.And(lb_i <= var_i, var_i <= ub_i)
for var_i, (lb_i, ub_i) in self._var_bounds.items()
]
cand_pred = self._gen_cand_pred(candidate)
query = dreal.And(*bound_preds, cand_pred, *self._not_inv_cons)
res = dreal.CheckSatisfiability(query, self._delta)
self._models.clear()
if res:
self._models.append(
tuple(res[x_i] for x_i in self._old_state) + tuple(res[z_i] for z_i in self._percept)
)
return z3.sat
else:
return z3.unsat
def dump_system_encoding(self, basename: str = "") -> None:
print([
dreal.And(lb_i <= var_i, var_i <= ub_i)
for var_i, (lb_i, ub_i) in self._var_bounds.items()
])
print(self._not_inv_cons)
def model(self) -> Sequence[Tuple]:
return self._models
def reason_unknown(self) -> str:
raise NotImplementedError
import dreal
import numpy as np
from dreal_teacher_base import DRealTeacherBase
from gem_stanley_teacher import CTE_LIM, ANG_LIM, STEERING_LIM, K_P, FORWARD_VEL, CYCLE_TIME, WHEEL_BASE
class GEMStanleyDRealTeacher(DRealTeacherBase):
def __init__(self, name="gem_stanley", norm_ord=2, delta=0.001) -> None:
super().__init__(name=name,
state_dim=3, perc_dim=2, ctrl_dim=1, norm_ord=norm_ord, delta=delta)
def _add_system(self) -> None:
self._set_var_bound(self._old_state, lb=(-np.inf, -CTE_LIM, -ANG_LIM), ub=(np.inf, CTE_LIM, ANG_LIM))
self._set_var_bound(self._new_state, lb=(-np.inf, -CTE_LIM, -ANG_LIM), ub=(np.inf, CTE_LIM, ANG_LIM))
self._set_var_bound(self._percept, lb=(-CTE_LIM, -ANG_LIM), ub=(CTE_LIM, ANG_LIM))
self._set_var_bound(self._control, lb=(-STEERING_LIM,), ub=(STEERING_LIM,))
# Variable Aliases
old_x, old_y, old_yaw = self._old_state
new_x, new_y, new_yaw = self._new_state
cte, phi = self._percept
steering, = self._control
self._not_inv_cons.extend([
# Control
steering == dreal.Min(dreal.Max(phi + dreal.atan(cte*(K_P / FORWARD_VEL)), -STEERING_LIM), STEERING_LIM),
# Dynamics
new_x == old_x + FORWARD_VEL*CYCLE_TIME*dreal.cos(old_yaw + steering),
new_y == old_y + FORWARD_VEL*CYCLE_TIME*dreal.sin(old_yaw + steering),
new_yaw == old_yaw + dreal.sin(steering)*FORWARD_VEL*CYCLE_TIME/WHEEL_BASE,
])
def _add_unsafe(self) -> None:
# Variable Aliases
old_x, old_y, old_yaw = self._old_state
new_x, new_y, new_yaw = self._new_state
if self._norm_ord == 1:
old_err = abs(old_y) + abs(old_yaw)
new_err = abs(new_y) + abs(new_yaw)
elif self._norm_ord == 2:
old_err = dreal.sqrt(old_y**2 + old_yaw**2)
new_err = dreal.sqrt(new_y**2 + new_yaw**2)
else:
assert self._norm_ord == "inf"
old_err = dreal.Max(abs(old_y), abs(old_yaw))
new_err = dreal.Max(abs(new_y), abs(new_yaw))
self._not_inv_cons.extend([
old_err <= new_err
])
from typing import Optional, Tuple
import dreal
from typing import Tuple
import gurobipy as gp
import numpy as np
import sympy
import z3
from teacher_base import DRealTeacherBase, SymPyTeacherBase
from gem_stanley_dreal_teacher import GEMStanleyDRealTeacher
from teacher_base import SymPyTeacherBase
from dtree_teacher_base import DTreeGurobiTeacherBase
WHEEL_BASE = 1.75 # m
......@@ -32,54 +31,6 @@ NEW_ATAN_K_CTE_V_LIM = np.arctan(NEW_K_CTE_V_LIM)
NEW_RAW_ANG_ERR_LIM = ANG_LIM + FORWARD_VEL * CYCLE_TIME
class GEMStanleyDRealTeacher(DRealTeacherBase):
def __init__(self, name="gem_stanley", norm_ord=2, delta=0.001) -> None:
super().__init__(name=name,
state_dim=3, perc_dim=2, ctrl_dim=1, norm_ord=norm_ord, delta=delta)
def _add_system(self) -> None:
self._set_var_bound(self._old_state, lb=(-np.inf, -CTE_LIM, -ANG_LIM), ub=(np.inf, CTE_LIM, ANG_LIM))
self._set_var_bound(self._new_state, lb=(-np.inf, -CTE_LIM, -ANG_LIM), ub=(np.inf, CTE_LIM, ANG_LIM))
self._set_var_bound(self._percept, lb=(-CTE_LIM, -ANG_LIM), ub=(CTE_LIM, ANG_LIM))
self._set_var_bound(self._control, lb=(-STEERING_LIM,), ub=(STEERING_LIM,))
# Variable Aliases
old_x, old_y, old_yaw = self._old_state
new_x, new_y, new_yaw = self._new_state
cte, phi = self._percept
steering, = self._control
self._not_inv_cons.extend([
# Control
steering == dreal.Min(dreal.Max(phi + dreal.atan(cte*(K_P / FORWARD_VEL)), -STEERING_LIM), STEERING_LIM),
# Dynamics
new_x == old_x + FORWARD_VEL*CYCLE_TIME*dreal.cos(old_yaw + steering),
new_y == old_y + FORWARD_VEL*CYCLE_TIME*dreal.sin(old_yaw + steering),
new_yaw == old_yaw + dreal.sin(steering)*FORWARD_VEL*CYCLE_TIME/WHEEL_BASE,
])
def _add_unsafe(self) -> None:
# Variable Aliases
old_x, old_y, old_yaw = self._old_state
new_x, new_y, new_yaw = self._new_state
if self._norm_ord == 1:
old_err = abs(old_y) + abs(old_yaw)
new_err = abs(new_y) + abs(new_yaw)
elif self._norm_ord == 2:
old_err = dreal.sqrt(old_y**2 + old_yaw**2)
new_err = dreal.sqrt(new_y**2 + new_yaw**2)
else:
assert self._norm_ord == "inf"
old_err = dreal.Max(abs(old_y), abs(old_yaw))
new_err = dreal.Max(abs(new_y), abs(new_yaw))
self._not_inv_cons.extend([
old_err <= new_err
])
class DTreeGEMStanleyGurobiTeacher(DTreeGurobiTeacherBase):
# Ideal perception as a linear transform from state to ground truth percept
PERC_GT = np.array([[0., -1., 0.],
......
import abc
import itertools
#from typing import Dict, Hashable, List, Literal, Optional, Sequence, Tuple
from typing import Dict, Hashable, List, Optional, Sequence, Tuple
from typing import Dict, List, Optional, Sequence, Tuple
import gurobipy as gp
import numpy as np
import sympy
import z3
import dreal
def z3_float64_const_to_real(v: float) -> z3.RatNumRef:
......@@ -56,106 +54,6 @@ class TeacherBase(abc.ABC):
raise NotImplementedError
class DRealTeacherBase(TeacherBase):
@staticmethod
def affine_trans_exprs(state, coeff: np.ndarray, intercept=None):
if intercept is None:
intercept = np.zeros(coeff.shape[0])
assert (len(intercept), len(state)) == coeff.shape
return [
b_i + sum([col_j*var_j for col_j, var_j in zip(row_i, state)])
for row_i, b_i in zip(coeff, intercept)
]
def __init__(self, name: str,
state_dim: int, perc_dim: int, ctrl_dim: int, norm_ord,
delta: float = 0.001) -> None:
self._norm_ord = norm_ord
self._delta = delta
# Old state variables
self._old_state = [dreal.Variable(f"x[{i}]", dreal.Variable.Real) for i in range(state_dim)]
# New state variables
self._new_state = [dreal.Variable(f"x'[{i}]", dreal.Variable.Real) for i in range(state_dim)]
# Perception variables
self._percept = [dreal.Variable(f"z[{i}]", dreal.Variable.Real) for i in range(perc_dim)]
# Control variables
self._control = [dreal.Variable(f"u[{i}]", dreal.Variable.Real) for i in range(ctrl_dim)]
self._var_bounds = {
var: (-np.inf, np.inf)
for var in itertools.chain(self._old_state, self._percept, self._control)
} # type: Dict[dreal.Variable, Tuple[float, float]]
self._not_inv_cons = [] # type: List
self._models = [] # type: List
self._add_system()
self._add_unsafe()
@abc.abstractmethod
def _add_system(self) -> None:
raise NotImplementedError
@abc.abstractmethod
def _add_unsafe(self) -> None:
raise NotImplementedError
@property
def state_dim(self) -> int:
return len(self._old_state)
@property
def perc_dim(self) -> int:
return len(self._percept)
@property
def ctrl_dim(self) -> int:
return len(self._control)
def _gen_cand_pred(self, candidate):
raise NotImplementedError(f"TODO convert {candidate} to dReal formula")
def _set_var_bound(self, smt_vars: List[dreal.Variable], lb: Sequence[float], ub: Sequence[float]) -> None:
assert len(lb) == len(smt_vars)
assert len(ub) == len(smt_vars)
for var_i, lb_i, ub_i in zip(smt_vars, lb, ub):
self._var_bounds[var_i] = (lb_i, ub_i)
def set_old_state_bound(self, lb: Sequence[float], ub: Sequence[float]) -> None:
self._set_var_bound(self._old_state, lb, ub)
def check(self, candidate) -> z3.CheckSatResult:
bound_preds = [
dreal.And(lb_i <= var_i, var_i <= ub_i)
for var_i, (lb_i, ub_i) in self._var_bounds.items()
]
cand_pred = self._gen_cand_pred(candidate)
query = dreal.And(*bound_preds, cand_pred, *self._not_inv_cons)
res = dreal.CheckSatisfiability(query, self._delta)
self._models.clear()
if res:
self._models.append(
tuple(res[x_i] for x_i in self._old_state) + tuple(res[z_i] for z_i in self._percept)
)
return z3.sat
else:
return z3.unsat
def dump_system_encoding(self, basename: str = "") -> None:
print([
dreal.And(lb_i <= var_i, var_i <= ub_i)
for var_i, (lb_i, ub_i) in self._var_bounds.items()
])
print(self._not_inv_cons)
def model(self) -> Sequence[Tuple]:
return self._models
def reason_unknown(self) -> str:
raise NotImplementedError
class GurobiTeacherBase(TeacherBase):
FREEVAR = {"vtype": gp.GRB.CONTINUOUS, "lb": -np.inf, "ub": np.inf}
NNEGVAR = {"vtype": gp.GRB.CONTINUOUS, "lb": 0.0, "ub": np.inf}
......@@ -262,7 +160,7 @@ class SymPyTeacherBase(TeacherBase):
self._var_bounds = {
var: (-sympy.oo, sympy.oo)
for var in itertools.chain(self._old_state, self._new_state, self._percept, self._control)
} # type: Dict[dreal.Variable, Tuple[float, float]]
} # type: Dict[sympy.Symbol, Tuple[float, float]]
self._not_inv_cons = [] # type: List
......@@ -289,7 +187,7 @@ class SymPyTeacherBase(TeacherBase):
def ctrl_dim(self) -> int:
return len(self._control)
def _set_var_bound(self, sym_vars: List[dreal.Variable], lb: Sequence[float], ub: Sequence[float]) -> None:
def _set_var_bound(self, sym_vars: List[sympy.Symbol], lb: Sequence[float], ub: Sequence[float]) -> None:
assert len(lb) == len(sym_vars)
assert len(ub) == len(sym_vars)
for var_i, lb_i, ub_i in zip(sym_vars, lb, ub):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment