Skip to content
Snippets Groups Projects
Commit 0f06ebb9 authored by chsieh16's avatar chsieh16
Browse files

Refactor to parse dtree as one ITE expression

parent 04dd969b
No related branches found
No related tags found
No related merge requests found
......@@ -165,9 +165,9 @@ class DTreeLearner(LearnerBase):
return ite_expr
def _subs_basevar_w_states(self, ite_expr) -> sympy.logic.boolalg.Boolean:
state_vars = sympy.symbols([f"x[{i}]" for i in range(self.state_dim)])
state_vars = sympy.symbols([f"x_{i}" for i in range(self.state_dim)])
state_vec = sympy.Matrix(state_vars)
perc_vars = sympy.symbols([f"z[{i}]" for i in range(self.perc_dim)])
perc_vars = sympy.symbols([f"z_{i}" for i in range(self.perc_dim)])
perc_vec = sympy.Matrix(perc_vars)
subs_basevar = []
for basevar, (trans, j) in self._basevar_trans_map.items():
......
import itertools
import pickle
from typing import List, Tuple
import numpy as np
import z3
import csv
import sympy
from dtree_learner import DTreeLearner as Learner
from dtree_teacher_gem_stanley import DTreeTeacherGEMStanley as Teacher
from dtree_teacher_gem_stanley import DTreeGEMStanleyGurobiTeacher as Teacher
def load_positive_examples(file_name: str) -> List[Tuple[float, ...]]:
......@@ -22,7 +21,7 @@ def load_positive_examples(file_name: str) -> List[Tuple[float, ...]]:
truth_samples_seq = [(t, [s for s in raw_samples if not any(np.isnan(s))])
for t, raw_samples in truth_samples_seq]
# Convert from sampled states and percepts to positive examples for learning
return [
s for _, samples in truth_samples_seq for s in samples
]
......@@ -57,13 +56,6 @@ def synth_dtree(positive_examples, teacher, num_max_iterations: int = 10):
[0., 0., -1.]])
b_vec_0 = np.zeros(2)
learner.set_grammar([(a_mat_0, b_vec_0)])
# writing positive examples in csv file
f = open("positive_sample.csv", 'w')
data_out = csv.writer(f)
for ps in positive_examples:
data_out.writerow(itertools.chain(ps, ["true"]))
f.close()
learner.add_positive_examples(*positive_examples)
past_candidate_list = []
......@@ -71,66 +63,53 @@ def synth_dtree(positive_examples, teacher, num_max_iterations: int = 10):
print(f"Iteration {k}:", sep='')
print("learning ....")
candidate_dnf = learner.learn()
candidate = learner.learn()
print("done learning")
print(f"candidate: {candidate}")
if not candidate_dnf: # learning FAILED
print("Learning Failed.")
return
print(f"candidate DNF: {candidate_dnf}")
past_candidate_list.append(candidate)
past_candidate_list.append(candidate_dnf)
# QUERYING TEACHER IF THERE ARE NEGATIVE EXAMPLES
print(f"number of paths: {len(candidate_dnf)}")
negative_examples = []
for candidate in candidate_dnf:
result = teacher.check(candidate)
print(result)
if result == z3.sat:
m = teacher.model()
assert len(m) > 0
assert validate_negative_examples(candidate, m)
negative_examples.extend(m)
# TODO check if negative example state is spurious or true courterexample
elif result == z3.unsat:
continue
else:
print("Reason Unknown", teacher.reason_unknown())
return past_candidate_list
fneg = open("negative_sample.csv", 'w')
data_out = csv.writer(fneg)
for pn in negative_examples:
data_out.writerow(itertools.chain(pn, ["true"]))
fneg.close()
print(f"negative examples: {negative_examples}")
if len(negative_examples) > 0:
result = teacher.check(candidate)
print(result)
if result == z3.sat:
negative_examples = teacher.model()
assert len(negative_examples) > 0
print(f"negative examples: {negative_examples}")
assert validate_cexs(teacher.state_dim, teacher.perc_dim, candidate, negative_examples)
learner.add_negative_examples(*negative_examples)
else:
file = open('winnerDnf', 'wb')
pickle.dump(candidate_dnf, file)
continue
elif result == z3.unsat:
print("we are done!")
return past_candidate_list
else:
print("Reason Unknown", teacher.reason_unknown())
return past_candidate_list
#if k == 2:
# exit(0)
print("Reached max iteration %d." % num_max_iterations)
return []
return []
def validate_negative_examples(candidate, neg_exs, state_dim: int = 3, perc_dim: int = 2) -> bool:
    """Check that every teacher-provided negative example is consistent with the candidate.

    A *candidate* is the tuple ``(a_mat, b_vec, coeff_mat, cut_vec)`` describing the
    region ``coeff_mat @ (z - (a_mat @ x + b_vec)) <= cut_vec`` over flat example
    vectors ``(x_0..x_{state_dim-1}, z_0..z_{perc_dim-1})``.

    Args:
        candidate: tuple ``(a_mat, b_vec, coeff_mat, cut_vec)`` of numpy arrays.
        neg_exs: iterable of flat example vectors (sequences of floats).
        state_dim: number of leading state components in each example (default 3,
            matching the previously hard-coded slice ``[0:3]``).
        perc_dim: number of percept components following the state (default 2,
            matching the previously hard-coded slice ``[3:5]``).

    Returns:
        True iff every example satisfies all candidate constraints; on the first
        example that falls outside the region it is printed and False is returned.
    """
    a_mat, b_vec, coeff_mat, cut_vec = candidate
    for ex in neg_exs:
        x_part = np.asarray(ex[:state_dim])
        z_part = np.asarray(ex[state_dim:state_dim + perc_dim])
        z_diff = z_part - (a_mat @ x_part + b_vec)
        if not np.all(coeff_mat @ z_diff <= cut_vec):
            # This example does not satisfy the candidate region, so the
            # teacher's model disagrees with the candidate -- report it.
            print(ex)
            return False
    return True
def validate_cexs(state_dim: int, perc_dim: int,
                  candidate: sympy.logic.boolalg.Boolean,
                  cexs: List[Tuple[float]]) -> bool:
    """Return True iff every counterexample satisfies the candidate formula.

    Each counterexample is a flat vector of ``state_dim`` state values followed
    by ``perc_dim`` percept values; the substitution binds the ``x_i`` and
    ``z_j`` symbols accordingly.  Counterexamples on which the candidate
    evaluates to false are spurious; they are printed and False is returned.
    """
    spurious = []
    for cex in cexs:
        bindings = [(f"x_{i}", cex[i]) for i in range(state_dim)]
        bindings += [(f"z_{j}", cex[state_dim + j]) for j in range(perc_dim)]
        truth = candidate.subs(bindings)
        # Substituting every free symbol must reduce the formula to a constant.
        assert isinstance(truth, sympy.logic.boolalg.BooleanAtom)
        if truth == sympy.false:
            spurious.append(cex)
    if spurious:
        print("Spurious CEXs:", *spurious, sep='\n')
        return False
    return True
if __name__ == "__main__":
......
import pickle
import re
import gurobipy as gp
import numpy as np
import sympy
import z3
from gem_stanley_teacher import GEMStanleyGurobiTeacher
class DTreeTeacherGEMStanley(GEMStanleyGurobiTeacher):
def __init__(self, name="dtree_gem_stanley") -> None:
super().__init__(name=name)
def _set_candidate(self, candidate) -> None:
a_mat, b_vec, coeff_mat, cut_vec = candidate # TODO parse candidate
class DTreeGEMStanleyGurobiTeacher(GEMStanleyGurobiTeacher):
PRECISION = 10**-3
SYMPY_VAR_RE = re.compile(r"(?P<var>\w+)_(?P<idx>\d+)")
def _build_affine_expr(self, sympy_expr: sympy.Expr):
    """Recursively translate an affine sympy expression into a Gurobi expression.

    Symbols named like ``x_3`` are mapped via ``SYMPY_VAR_RE`` to the Gurobi
    variable ``x[3]`` looked up on ``self._gp_model``; numbers become plain
    floats; ``Add``/``Mul`` nodes are rebuilt from their translated operands.

    Raises:
        NotImplementedError: for products with three or more operands.
        RuntimeError: for any other node kind (the expression is not affine).
    """
    if isinstance(sympy_expr, sympy.Symbol):
        # Split e.g. "z_1" into var="z", idx="1" to locate Gurobi var "z[1]".
        result = self.SYMPY_VAR_RE.search(sympy_expr.name)
        assert result is not None
        var_name, idx = result.group("var"), result.group("idx")
        gp_var = self._gp_model.getVarByName(f"{var_name}[{idx}]")
        # NOTE(review): assumes the model already defines a variable with this
        # name -- getVarByName returns None otherwise.
        assert gp_var is not None
        return gp_var
    elif isinstance(sympy_expr, sympy.Number):
        return float(sympy_expr)
    elif isinstance(sympy_expr, sympy.Add):
        # sum() starts from 0, which combines fine with Gurobi expressions.
        return sum(self._build_affine_expr(arg) for arg in sympy_expr.args)
    elif isinstance(sympy_expr, sympy.Mul):
        if len(sympy_expr.args) > 2:
            raise NotImplementedError("TODO: multiplication of three or more operands")
        lhs = self._build_affine_expr(sympy_expr.args[0])
        rhs = self._build_affine_expr(sympy_expr.args[1])
        return lhs * rhs
    else:
        raise RuntimeError("Only support affine expressions.")
def _set_candidate(self, conjunct: sympy.logic.boolalg.Boolean) -> None:
print(conjunct)
# Variable Aliases
m = self._gp_model
x = self._old_state
z = self._percept
z_diff = self._percept_diff
# Remove contraints from previous candidate first
m.remove(self._prev_candidate_constr)
self._prev_candidate_constr.clear()
m.update()
# Constraints on values of z
cons = m.addConstr(z_diff == z - (a_mat @ x + b_vec))
self._prev_candidate_constr.append(cons)
# Constraints on values of z
# cons = m.addMConstr(coeff_mat, z_diff, '<', cut_vec - 10**-3)
cons = m.addMConstr(coeff_mat, z_diff, '<', cut_vec)
self._prev_candidate_constr.append(cons)
# L2-norm objective
m.setObjective(z_diff @ z_diff, gp.GRB.MINIMIZE)
def test_dtree_gem_stanley_teacher():
if conjunct is sympy.true:
return
elif isinstance(conjunct, sympy.And):
pred_list = conjunct.args
elif isinstance(conjunct, sympy.core.relational.Relational):
pred_list = [conjunct]
else:
raise RuntimeError(f"{conjunct} should be a conjunction.")
for pred in pred_list:
assert isinstance(pred, sympy.core.relational.Relational)
lhs = self._build_affine_expr(pred.lhs)
rhs = self._build_affine_expr(pred.rhs)
if isinstance(pred, sympy.Equality):
cons = (lhs == rhs)
elif isinstance(pred, sympy.GreaterThan):
cons = (lhs >= rhs)
elif isinstance(pred, sympy.StrictGreaterThan):
cons = (lhs >= rhs + self.PRECISION)
elif isinstance(pred, sympy.LessThan):
cons = (lhs <= rhs)
elif isinstance(pred, sympy.StrictLessThan):
cons = (lhs <= rhs - self.PRECISION)
else:
raise RuntimeError(f"Unsupprted relational expression {pred}")
gp_cons = self._gp_model.addConstr(cons)
self._prev_candidate_constr.append(gp_cons)
def check(self, candidate: sympy.logic.boolalg.Boolean) -> z3.CheckSatResult:
    """Search for counterexamples to *candidate*, one DNF conjunct at a time.

    The candidate formula is first normalized to disjunctive normal form;
    each conjunct is then encoded as Gurobi constraints (via
    ``_set_candidate``) and optimized.  Every feasible conjunct contributes
    one counterexample (old state + percept values) to ``self._cexs``.

    Returns:
        ``z3.sat`` if at least one counterexample was found, ``z3.unsat`` if
        every conjunct is infeasible (or the formula is false), and
        ``z3.unknown`` if the solver terminated with an unexpected status.
    """
    dnf = sympy.logic.boolalg.to_dnf(candidate, simplify=True)
    if dnf is sympy.false:
        return z3.unsat
    # else:
    # Normalize to a list of conjuncts: a top-level Or yields its arguments;
    # any other supported form (true / negation / conjunction / single
    # relational atom) is treated as a one-element disjunction.
    if isinstance(dnf, sympy.Or):
        conjunct_list = dnf.args
    elif dnf is sympy.true or isinstance(dnf, sympy.Not) or isinstance(dnf, sympy.And) \
            or isinstance(dnf, sympy.core.relational.Relational):
        conjunct_list = [dnf]
    else:
        raise RuntimeError(f"Candiate formula {dnf} should have been converted to DNF.")
    self._cexs.clear()
    for conjunct in conjunct_list:
        self._set_candidate(conjunct)
        self._gp_model.optimize()
        if self._gp_model.status == gp.GRB.INF_OR_UNBD:
            # Disambiguate infeasible vs. unbounded by disabling dual
            # reductions and solving again.
            self._gp_model.setParam("DualReductions", 0)
            self._gp_model.optimize()
        if self._gp_model.status in [gp.GRB.OPTIMAL, gp.GRB.SUBOPTIMAL]:
            print(f"ObjValue: {self._gp_model.getObjective().getValue()}")
            # Counterexample = optimal values of old state and percept vars.
            cex = tuple(self._old_state.x) + tuple(self._percept.x)
            self._cexs.append(cex)
        elif self._gp_model.status == gp.GRB.INFEASIBLE:
            continue
        else:
            return z3.unknown
    if len(self._cexs) > 0:
        return z3.sat
    else:
        return z3.unsat
def test_dtree_gem_stanley_gurobi_teacher():
a_mat = np.array([[0., -1., 0.],
[0., 0., -1.]])
b_vec = np.zeros(2)
......@@ -48,24 +120,12 @@ def test_dtree_gem_stanley_teacher():
])
cut_vec = np.array([1, 2, 3, 4])
candidate = (a_mat, b_vec, coeff_mat, cut_vec)
candidate = sympy.sympify("ITE(x_0 >= 0.2, x_1 <= 1, 0.5*x_0 + x_1 > 3)")
teacher = DTreeTeacherGEMStanley()
teacher._set_candidate(candidate)
teacher.dump_model()
def test_verified_candidate():
    """Re-check a previously learned winning DNF candidate loaded from disk.

    Loads the pickled DNF (a list of candidates) from ``winnerDnf.pickle`` in
    the working directory, restricts the teacher's old-state box, and prints
    the check result for each candidate.
    """
    # NOTE(review): pickle.load on a file is unsafe for untrusted input --
    # acceptable here only because the file is produced by this project.
    with open("winnerDnf.pickle", "rb") as dnf_pkl:
        winner_dnf = pickle.load(dnf_pkl)
    print(winner_dnf)
    teacher = DTreeTeacherGEMStanley()
    # Verify only within this old-state region (lb/ub per state dimension).
    teacher.set_old_state_bound(lb=[0.0, -1.0, 0.2], ub=[30.0, -0.9, 0.22])
    for cand in winner_dnf:
        print(teacher.check(cand))
        print(teacher.reason_unknown())
teacher = DTreeGEMStanleyGurobiTeacher(norm_ord=2)
print(teacher.check(candidate))
print(teacher.model())
if __name__ == "__main__":
# test_dtree_gem_stanley_teacher()
test_verified_candidate()
test_dtree_gem_stanley_gurobi_teacher()
......@@ -29,6 +29,10 @@ NEW_K_CTE_V_LIM = K_CTE_V_LIM + FORWARD_VEL * CYCLE_TIME * 1.0
NEW_ATAN_K_CTE_V_LIM = np.arctan(NEW_K_CTE_V_LIM)
NEW_RAW_ANG_ERR_LIM = ANG_LIM + FORWARD_VEL * CYCLE_TIME
# Ideal perception as a linear transform from state to ground truth percept
PERC_GT = np.array([[0., -1., 0.],
[0., 0., -1.]], float)
class GEMStanleyDRealTeacher(DRealTeacherBase):
......@@ -79,9 +83,9 @@ class GEMStanleyDRealTeacher(DRealTeacherBase):
class GEMStanleyGurobiTeacher(GurobiTeacherBase):
def __init__(self, name="gem_stanley") -> None:
def __init__(self, name="gem_stanley", norm_ord=2) -> None:
super().__init__(name=name,
state_dim=3, perc_dim=2, ctrl_dim=1)
state_dim=3, perc_dim=2, ctrl_dim=1, norm_ord=norm_ord)
def _add_system(self) -> None:
# Bounds on all domains
......@@ -106,7 +110,7 @@ class GEMStanleyGurobiTeacher(GurobiTeacherBase):
lb=-ATAN_K_CTE_V_LIM, ub=ATAN_K_CTE_V_LIM)
m.addGenConstrTan(xvar=atan_K_cte_V, yvar=K_cte_V)
# Clip steering angle
error = m.addVar(name="(φ+atan(K*d/Vf))/T",
error = m.addVar(name="(φ+atan(K*d/Vf))",
lb=-RAW_ANG_ERR_LIM, ub=RAW_ANG_ERR_LIM)
m.addConstr(error == phi + atan_K_cte_V)
......@@ -134,41 +138,38 @@ class GEMStanleyGurobiTeacher(GurobiTeacherBase):
old_yaw + sin_steer*FORWARD_VEL*CYCLE_TIME/WHEEL_BASE)
def _add_unsafe(self) -> None:
assert PERC_GT.shape == (self.perc_dim, self.state_dim)
# Variable Aliases
m = self._gp_model
old_x, old_y, old_yaw = self._old_state.tolist()
new_x, new_y, new_yaw = self._new_state.tolist()
K_old_y_V = m.addVar(name="K*-y/Vf", vtype=GRB.CONTINUOUS,
lb=-K_CTE_V_LIM, ub=K_CTE_V_LIM)
m.addConstr(K_old_y_V == K_P * -old_y / FORWARD_VEL)
atan_K_old_y_V = m.addVar(name="atan(K*-y/Vf)", vtype=GRB.CONTINUOUS,
lb=-ATAN_K_CTE_V_LIM, ub=ATAN_K_CTE_V_LIM)
m.addGenConstrTan(xvar=atan_K_old_y_V, yvar=K_old_y_V)
K_new_y_V = m.addVar(name="K*-y'/Vf", vtype=GRB.CONTINUOUS,
lb=-NEW_K_CTE_V_LIM, ub=NEW_K_CTE_V_LIM)
m.addConstr(K_new_y_V == K_P * -new_y / FORWARD_VEL)
atan_K_new_y_V = m.addVar(name="atan(K*-y'/Vf)", vtype=GRB.CONTINUOUS,
lb=-NEW_ATAN_K_CTE_V_LIM,
ub=NEW_ATAN_K_CTE_V_LIM)
m.addGenConstrTan(xvar=atan_K_new_y_V, yvar=K_new_y_V)
old_err = m.addVar(name="-θ+atan(K*-y/Vf)", vtype=GRB.CONTINUOUS,
lb=-RAW_ANG_ERR_LIM, ub=RAW_ANG_ERR_LIM)
m.addConstr(old_err == -old_yaw + atan_K_old_y_V)
old_V = m.addVar(name="V(x,y,θ)", vtype=GRB.CONTINUOUS,
lb=0, ub=RAW_ANG_ERR_LIM)
m.addConstr(old_V == gp.abs_(old_err))
new_err = m.addVar(name="'+atan(K*-y'/Vf)", vtype=GRB.CONTINUOUS,
lb=-NEW_RAW_ANG_ERR_LIM, ub=NEW_RAW_ANG_ERR_LIM)
m.addConstr(new_err == -new_yaw + atan_K_new_y_V)
new_V = m.addVar(name="V(x',y'')", vtype=GRB.CONTINUOUS,
lb=0, ub=NEW_RAW_ANG_ERR_LIM)
m.addConstr(new_V == gp.abs_(new_err))
m.addConstr(new_V >= old_V) # Tracking error is increasing (UNSAFE)
# Add nonincreasing constraints
old_truth = m.addMVar(shape=(self.perc_dim,), name="m(x)", **self.FREEVAR)
m.addConstr(old_truth == PERC_GT @ self._old_state)
old_lya_val = m.addVar(name="|m(x)|", vtype=gp.GRB.CONTINUOUS,
lb=0.0, ub=RAW_ANG_ERR_LIM)
m.addConstr(old_lya_val == gp.norm(old_truth, float(self._norm_ord)))
new_truth = m.addMVar(shape=(self.perc_dim,), name="m(x')", **self.FREEVAR)
m.addConstr(new_truth == PERC_GT @ self._new_state)
new_lya_val = m.addVar(name="|m(x')|", vtype=gp.GRB.CONTINUOUS,
lb=0.0, ub=NEW_RAW_ANG_ERR_LIM)
m.addConstr(new_lya_val == gp.norm(new_truth, float(self._norm_ord)))
m.addConstr(new_lya_val >= old_lya_val, name="Non-decreasing Error") # Tracking error is non-decreasing
def _add_objective(self) -> None:
    """Minimize the chosen norm of the perception error ``z - m(x)``."""
    model = self._gp_model
    old_state = self._old_state
    percept = self._percept
    percept_diff = self._percept_diff
    # Tie the difference variable to the ground-truth perception map, then
    # minimize its norm (the order was configured at construction time).
    dist = model.addVar(name="|z-m(x)|", **self.NNEGVAR)
    model.addConstr(percept_diff == percept - PERC_GT @ old_state)
    model.addConstr(dist == gp.norm(percept_diff, float(self._norm_ord)))
    model.setObjective(dist, gp.GRB.MINIMIZE)
class GEMStanleySymPyTeacher(SymPyTeacherBase):
......@@ -271,9 +272,8 @@ def test_gem_stanley_gurobi_teacher():
ub=(np.inf, 1.0, 0.25)
)
teacher.dump_system_encoding()
print(teacher.check(None))
print(teacher.model())
if __name__ == "__main__":
test_gem_stanley_sympy_teacher()
test_gem_stanley_gurobi_teacher()
# test_gem_stanley_sympy_teacher()
......@@ -149,12 +149,18 @@ class GurobiTeacherBase(TeacherBase):
TRIGVAR = {"vtype": gp.GRB.CONTINUOUS, "lb": -1.0, "ub": 1.0}
def __init__(self, name: str,
state_dim: int, perc_dim: int, ctrl_dim: int) -> None:
state_dim: int, perc_dim: int, ctrl_dim: int, norm_ord: Literal[1, 2, "inf"] = 2) -> None:
super().__init__()
self._gp_model = gp.Model(name)
m = self._gp_model
if norm_ord not in [1, 2, 'inf']:
raise ValueError("Norm order %s is not supported." % str(norm_ord))
if norm_ord == 2:
self._gp_model.setParam("NonConvex", 2)
self._norm_ord = norm_ord
# Old state variables
self._old_state = m.addMVar(shape=state_dim, name='x', **self.FREEVAR)
# New state variables
......@@ -169,14 +175,12 @@ class GurobiTeacherBase(TeacherBase):
self._add_unsafe()
# Add intermediate variables with constraints for candidates
z_diff = m.addMVar(name="z-(Ax+b)", shape=perc_dim, **self.FREEVAR)
z_dist = m.addMVar(name="|z-(Ax+b)|", shape=perc_dim, **self.NNEGVAR)
for zi, abs_zi in zip(z_diff.tolist(), z_dist.tolist()):
m.addConstr(abs_zi == gp.abs_(zi))
self._percept_diff = z_diff
self._percept_dist = z_dist
self._percept_diff = m.addMVar(name="z-m(x)", shape=perc_dim, **self.FREEVAR)
self._add_objective()
self._prev_candidate_constr: List = []
self._cexs: List = []
@property
def state_dim(self) -> int:
......@@ -198,66 +202,16 @@ class GurobiTeacherBase(TeacherBase):
def _add_unsafe(self) -> None:
raise NotImplementedError
def _set_candidate(self, candidate: sympy.logic.boolalg.Boolean) -> None:
raise NotImplementedError("TODO convert from sympy")
shape_sel, coeff, intercept, radius = candidate # TODO parse candidate
# Variable Aliases
m = self._gp_model
x = self._old_state
z = self._percept
z_diff = self._percept_diff
z_dist = self._percept_dist
# TODO Modify coefficients instead of remove and add back contraints
# Remove contraints from previous candidate first
m.remove(self._prev_candidate_constr)
self._prev_candidate_constr.clear()
m.update()
# Constraints on values of z
cons = m.addConstr(z_diff == z - (coeff @ x + intercept))
self._prev_candidate_constr.append(cons)
if shape_sel == 0:
cons_r = m.addConstr(z_dist.sum() <= radius, "||z-(Ax+b)||_1 <= r")
self._prev_candidate_constr.append(cons_r)
# L1-norm objective
m.setObjective(z_dist.sum(), gp.GRB.MINIMIZE)
elif shape_sel == 1:
cons_r = m.addConstr(z_diff @ z_diff <= radius*radius,
name="||z-(Ax+b)||^2 <= r^2")
self._prev_candidate_constr.append(cons_r)
# L2-norm objective
m.setObjective(z_diff @ z_diff, gp.GRB.MINIMIZE)
elif shape_sel == 2:
dist_oo = m.addVar(name="||z-(Ax+b)||_oo", **self.NNEGVAR)
cons_max = m.addConstr(dist_oo == gp.max_(*z_dist.tolist()))
cons_r = m.addConstr(dist_oo <= radius, "||z-(Ax+b)||_oo <= r")
self._prev_candidate_constr.extend([dist_oo, cons_max, cons_r])
# Loo-norm objective
m.setObjective(dist_oo, gp.GRB.MINIMIZE)
else:
raise ValueError("Unknown shape. shape_sel=%d" % shape_sel)
@abc.abstractmethod
def _add_objective(self) -> None:
raise NotImplementedError
def set_old_state_bound(self, lb, ub) -> None:
self._old_state.lb = lb
self._old_state.ub = ub
def check(self, candidate: sympy.logic.boolalg.Boolean) -> z3.CheckSatResult:
raise NotImplementedError("TODO convert from sympy")
self._set_candidate(candidate)
self._gp_model.optimize()
if self._gp_model.status == gp.GRB.INF_OR_UNBD:
self._gp_model.setParam("DualReductions", 0)
self._gp_model.optimize()
if self._gp_model.status in [gp.GRB.OPTIMAL, gp.GRB.SUBOPTIMAL]:
return z3.sat
elif self._gp_model.status == gp.GRB.INFEASIBLE:
return z3.unsat
else:
return z3.unknown
raise NotImplementedError
def dump_system_encoding(self, basename: str = "") -> None:
""" Dump optimization problem in LP format """
......@@ -266,12 +220,7 @@ class GurobiTeacherBase(TeacherBase):
self._gp_model.write(basename + ".lp")
def model(self) -> Sequence[Tuple]:
if self._gp_model.status in [gp.GRB.OPTIMAL, gp.GRB.SUBOPTIMAL]:
x = self._old_state
z = self._percept
return [tuple(x.x) + tuple(z.x)]
else:
return []
return self._cexs
def reason_unknown(self) -> str:
if self._gp_model.status == gp.GRB.INF_OR_UNBD:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment