Skip to content
Snippets Groups Projects
Commit dbf2695e authored by aastorg2's avatar aastorg2
Browse files

Merge branch 'main' of gitlab.engr.illinois.edu:chsieh16/cs598mp-fall2021-proj into main

parents e2c4d536 9b7f23ff
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,7 @@ import itertools
import json
import logging
import os
from typing import Any, Dict, List, MutableSet, Tuple
from typing import Any, Callable, Dict, List, Literal, MutableSet, Tuple
import numpy as np
import z3
......@@ -22,6 +22,7 @@ class DTreeLearner(LearnerBase):
self._perc_dim: int = perc_dim
self.count_neg_dup = 0
self._s2f_func = lambda x: x
self._cons_s2f_method = None
# Given a base or derived feature name,
# returns a mapping from base feature names to coefficients
......@@ -53,15 +54,24 @@ class DTreeLearner(LearnerBase):
def perc_dim(self) -> int:
return self._perc_dim
def set_grammar(self, grammar) -> None:
def set_grammar(self, grammar, s2f_method: Literal["concat", "diff"] = "diff") -> None:
base_features: List[str] = []
derived_feature_map: Dict[str, Tuple[Dict, str]] = OrderedDict()
self._cons_s2f_method = s2f_method
if s2f_method == "concat":
construct_s2f = construct_sample_to_feature_func_by_concatenate
elif s2f_method == "diff":
construct_s2f = construct_sample_to_feature_func_by_diff
else:
raise ValueError(f"Unknown method '{s2f_method}'"
"to construct sample to feature func")
s2f_func_list = []
for i, trans in enumerate(grammar):
s2f_func_list.append(
construct_sample_to_feature_func(*trans))
ith_vars = [f"fvar{j}_A{i}" for j in range(self.perc_dim)]
feature_dim, s2f_func = construct_s2f(*trans)
s2f_func_list.append(s2f_func)
ith_vars = [f"fvar{j}_A{i}" for j in range(feature_dim)]
self._basevar_trans_map.update([(var, (trans, j)) for j, var in enumerate(ith_vars)])
......@@ -169,10 +179,31 @@ class DTreeLearner(LearnerBase):
ite_expr = self.get_pre_from_json(self.tree_out)
os.remove(self.tree_out) # Remove the generated json to avoid reusing old trees
ite_expr = self._subs_basevar_w_states(ite_expr)
# FIXME need to ensure the substitution matches the construction used in set_grammar
if self._cons_s2f_method == "concat":
ite_expr = self._subs_basevar_w_states_by_concatenate(ite_expr)
elif self._cons_s2f_method == "diff":
ite_expr = self._subs_basevar_w_states_by_diff(ite_expr)
else:
raise RuntimeError(f"Unknown method '{self._cons_s2f_method}' to reconstruct the candidate from the decision tree")
return ite_expr
def _subs_basevar_w_states(self, ite_expr) -> z3.BoolRef:
def _subs_basevar_w_states_by_concatenate(self, ite_expr) -> z3.BoolRef:
state_vars = z3.Reals([f"x_{i}" for i in range(self.state_dim)])
perc_vars = z3.Reals([f"z_{i}" for i in range(self.perc_dim)])
subs_basevar = []
for basevar, (trans, j) in self._basevar_trans_map.items():
if j < self.perc_dim:
a_mat, b_vec = trans
expanded_basevar = ((a_mat @ state_vars)[j] + b_vec[j])
else:
assert j < 2*self.perc_dim
expanded_basevar = perc_vars[j - self.perc_dim]
expanded_basevar = z3.simplify(expanded_basevar)
subs_basevar.append((z3.Real(basevar), expanded_basevar))
return z3.substitute(ite_expr, *subs_basevar)
def _subs_basevar_w_states_by_diff(self, ite_expr) -> z3.BoolRef:
state_vars = z3.Reals([f"x_{i}" for i in range(self.state_dim)])
perc_vars = z3.Reals([f"z_{i}" for i in range(self.perc_dim)])
subs_basevar = []
......@@ -222,7 +253,21 @@ class DTreeLearner(LearnerBase):
raise ValueError("error parsing the json object as a binary decision tree)")
def construct_sample_to_feature_func(a_mat: np.ndarray, b_vec: np.ndarray):
def construct_sample_to_feature_func_by_concatenate(a_mat: np.ndarray, b_vec: np.ndarray) \
-> Tuple[int, Callable[[np.ndarray], np.ndarray]]:
perc_dim, state_dim = a_mat.shape
def sample_to_feature_vec(sample):
assert len(sample) == state_dim + perc_dim
state = np.array(sample[0: state_dim])
perc = np.array(sample[state_dim: state_dim+perc_dim])
return np.concatenate(((a_mat @ state + b_vec), perc), axis=0)
# NOTE: Ensure the dimension of output matches.
# In this case, the output dimension is two times of perception dimension
return 2*perc_dim, sample_to_feature_vec
def construct_sample_to_feature_func_by_diff(a_mat: np.ndarray, b_vec: np.ndarray) \
-> Tuple[int, Callable[[np.ndarray], np.ndarray]]:
perc_dim, state_dim = a_mat.shape
def sample_to_feature_vec(sample):
......@@ -230,7 +275,9 @@ def construct_sample_to_feature_func(a_mat: np.ndarray, b_vec: np.ndarray):
state = np.array(sample[0: state_dim])
perc = np.array(sample[state_dim: state_dim+perc_dim])
return perc - (a_mat @ state + b_vec)
return sample_to_feature_vec
# NOTE: Ensure the dimension of output matches
# In this case, the output dimension is exactly the perception dimension
return perc_dim, sample_to_feature_vec
def test_dtree_learner():
......@@ -272,12 +319,14 @@ def test_sample_to_feature():
# construct_sample_to_feature_func: returns a function
# map: lin_trans(a_mat and b_vec pair) -> func
sample_to_feature_func = construct_sample_to_feature_func(a_mat, b_vec)
feature_dim, sample_to_feature_func = \
construct_sample_to_feature_func_by_diff(a_mat, b_vec)
# map = {name1:sample_to_feature_func}
sample = np.array([1., 2., 3., -2., -3.])
# sample_to_feature_func will compute dBar and psiBar
feature_vec = sample_to_feature_func(sample)
assert len(feature_vec ) == feature_dim, "The dimension of feature vector should match."
print("sample: " + str(feature_vec))
assert np.array_equal(feature_vec, np.array([0., 0.]))
......@@ -287,15 +336,43 @@ def test_sample_to_feature():
assert np.array_equal(feature_vec, np.array([1., 1.]))
def test_parse_json():
def test_parse_json_with_s2f_by_concat():
json_obj = json.loads("""
{"attribute":"((1*fvar0_A0) + (1*fvar1_A0))","cut":-0.125,"classification":0,
"children":[
{"attribute":"fvar2_A0","cut":0.375,"classification":0,
"children":[{"attribute":"","cut":0,"classification":true,"children":null},
{"attribute":"","cut":0,"classification":false,"children":null}]
},
{"attribute":"fvar3_A1","cut":-0.5,"classification":0,
"children":[{"attribute":"","cut":0,"classification":true,"children":null},
{"attribute":"","cut":0,"classification":false,"children":null}]
}
]
}""")
a_mat_0 = np.array([[0., -1., 0.],
[0., 0., -1.]])
b_vec_0 = np.zeros(2)
a_mat_1 = np.array([[0., -0.75, 0.],
[0., 0., -1.25]])
b_vec_1 = np.zeros(2)
learner = DTreeLearner(state_dim=3, perc_dim=2)
learner.set_grammar([(a_mat_0, b_vec_0), (a_mat_1, b_vec_1)], s2f_method="concat")
tree = learner.parse_tree(json_obj)
print(learner._subs_basevar_w_states_by_concatenate(tree))
def test_parse_json_with_s2f_by_diff():
json_obj = json.loads("""
{"attribute":"((1*fvar0_A0) + (1*fvar1_A0))","cut":-0.01,"classification":0,
{"attribute":"((1*fvar0_A0) + (1*fvar1_A0))","cut":-0.125,"classification":0,
"children":[
{"attribute":"fvar1_A0","cut":0.625,"classification":0,
"children":[{"attribute":"","cut":0,"classification":true,"children":null},
{"attribute":"","cut":0,"classification":false,"children":null}]
},
{"attribute":"fvar1_A1","cut":-0.15,"classification":0,
{"attribute":"fvar1_A1","cut":-1.5,"classification":0,
"children":[{"attribute":"","cut":0,"classification":true,"children":null},
{"attribute":"","cut":0,"classification":false,"children":null}]
}
......@@ -310,12 +387,13 @@ def test_parse_json():
b_vec_1 = np.zeros(2)
learner = DTreeLearner(state_dim=3, perc_dim=2)
learner.set_grammar([(a_mat_0, b_vec_0), (a_mat_1, b_vec_1)])
learner.set_grammar([(a_mat_0, b_vec_0), (a_mat_1, b_vec_1)], s2f_method="diff")
tree = learner.parse_tree(json_obj)
print(learner._subs_basevar_w_states(tree))
print(learner._subs_basevar_w_states_by_diff(tree))
if __name__ == "__main__":
# test_sample_to_feature()
test_sample_to_feature()
test_dtree_learner()
test_parse_json()
test_parse_json_with_s2f_by_concat()
test_parse_json_with_s2f_by_diff()
%% Cell type:code id: tags:
``` python
import json
import numpy as np
import pickle
import z3
STATE_DIM = 3
PERC_DIM = 2
filename = "dtree_synth.4x10.out.json"
with open(filename, "r") as f:
data = json.load(f)
found_dtree = [dict(part=entry["part"], **entry["result"]) for entry in data if entry["status"] == "found"]
print(len(found_dtree))
found, not_found, spur = 0, 0, 0
repeated_neg = 0
other = 0
for entry in data:
if entry["status"] == "found":
found += 1
elif entry["status"] == "not found":
not_found +=1
elif entry["status"] == "exception":
if "spurious cexs" in entry["result"]:
spur += 1
elif "repeated" in entry["result"]:
repeated_neg += 1
else:
other += 1
print(found, not_found, spur, repeated_neg, other)
```
%% Cell type:code id: tags:
``` python
def z3_float64_const_to_real(v: float) -> z3.RatNumRef:
return z3.simplify(
z3.fpToReal(z3.FPVal(v, z3.Float64()))
)
def in_part(state_arr, part_arr):
assert part_arr.shape == (len(state_arr), 2)
lb_arr, ub_arr = part_arr.T
return np.all(lb_arr <= state_arr) and np.all(state_arr <= ub_arr)
def calc_precision(part, z3_expr) -> float:
def in_z3_expr(sample, z3_expr) -> bool:
assert len(sample) == STATE_DIM + PERC_DIM
state_subs_map = [(z3.Real(f"x_{i}"), z3_float64_const_to_real(sample[i])) for i in range(STATE_DIM)]
perc_subs_map = [(z3.Real(f"z_{i}"), z3_float64_const_to_real(sample[i+STATE_DIM])) for i in range(PERC_DIM)]
sub_map = state_subs_map + perc_subs_map
val = z3.simplify(z3.substitute(z3_expr, *sub_map))
assert z3.is_bool(val)
if z3.is_false(val):
return False
elif z3.is_true(val):
return True
else:
raise RuntimeError(f"Cannot validate negative example {sample} by substitution")
pkl_name = "../data/800_truths-uniform_partition_4x20-1.2m-pi_12-one_straight_road-2021-10-27-08-49-17.bag.pickle"
with open(pkl_name, "rb") as f:
pkl_data = pickle.load(f)
truth_samples_seq = pkl_data["truth_samples"]
part_arr = np.asfarray(part)
num_pos, num_neg, num_nan = 0, 0, 0
for _, ss in truth_samples_seq:
for s in ss:
state_arr = np.asfarray(s[0:3])
if not in_part(state_arr, part_arr):
continue
# else:
if np.any(np.isnan(s)):
num_nan += 1
elif in_z3_expr(s, z3_expr):
num_pos += 1
else:
num_neg += 1
return num_pos, num_neg, num_nan
```
%% Cell type:code id: tags:
``` python
def visitor(e, seen):
if e in seen:
return
seen[e] = True
yield e
if z3.is_app(e):
for ch in e.children():
for e in visitor(ch, seen):
yield e
return
if z3.is_quantifier(e):
for e in visitor(e.body(), seen):
yield e
return
```
%% Cell type:code id: tags:
``` python
for result in found_dtree:
print(result['part'])
decls = {vname: z3.Real(vname) for vname in ["x_0", "x_1", "x_2", "z_0", "z_1"]}
smt2_str = f"(assert {result['formula']})"
z3_assertions = z3.parse_smt2_string(smt2_str, decls=decls)
z3_expr:z3.ExprRef = z3_assertions[0]
# print("#Atomic Predicates:", sum(z3.is_le(e) or z3.is_ge(e) for e in visitor(z3_expr, {})))
# print(z3_expr)
# Calculate the number of paths on a binary tree by adding one more path
# when there is an ite or a disjunction (due to simplification on ite).
# FIXME does not work if an ite expression is a common sub-expression of two paths.
num_paths = 1
for e in visitor(z3_expr, {}):
if z3.is_or(e) or z3.is_app_of(e, z3.Z3_OP_ITE):
num_paths += 1
print("#Paths:", num_paths)
num_pos, num_neg, num_nan = calc_precision(result['part'], z3_expr)
print(f"pos: {num_pos}; neg: {num_neg}; nan: {num_nan}")
print("precision (pos/(pos+neg)):", num_pos / (num_pos + num_neg) )
```
%% Cell type:code id: tags:
``` python
import ast
import csv
import itertools
import matplotlib.pyplot as plt
import numpy as np
import pathlib
import re
X_LIM = np.inf
X_ARR = np.array([-X_LIM, X_LIM])
Y_LIM = 1.2
NUM_Y_PARTS = 4
Y_ARR = np.linspace(-Y_LIM, Y_LIM, NUM_Y_PARTS + 1)
YAW_LIM = np.pi / 12
NUM_YAW_PARTS = 10
YAW_ARR = np.linspace(-YAW_LIM, YAW_LIM, NUM_YAW_PARTS + 1)
PARTITION = (X_ARR, Y_ARR, YAW_ARR)
BOUND_LIST = list(list(zip(x_arr[:-1], x_arr[1:])) for x_arr in PARTITION)
PART_LIST = list(itertools.product(*BOUND_LIST))
```
%% Cell type:code id: tags:
``` python
from numpy import arange
from sympy import continued_fraction
csv_path_list = list(pathlib.Path(".").glob("part-???-pre.data"))
csv_path_list.sort()
for csv_path in csv_path_list:
m = re.match("^part-(?P<partid>\d+)-pre.data$", csv_path.name)
partid = int(m["partid"])
part_bnd = np.asfarray(PART_LIST[partid])
# Reset the plot
plt.gca().clear()
plt.gca().set_title(
f"x bound: {part_bnd[0]} (m)\n"
f"y bound: {part_bnd[1]} (m)\n"
f"θ bound: {np.rad2deg(part_bnd[2])} (deg)"
)
pos_fvs, neg_fvs = [], []
with open(csv_path, 'r') as csvfile:
csvreader = csv.reader(csvfile)
for entry in csvreader:
if entry[2].lower() == "true":
pos_fvs.append(tuple(ast.literal_eval(e) for e in entry[0:2]))
else:
assert entry[2].lower() == "false"
neg_fvs.append(tuple(ast.literal_eval(e) for e in entry[0:2]))
print(f"File: {csv_path.name}; #pos: {len(pos_fvs)}; #neg: {len(neg_fvs)}")
pos_fv_arr = np.asfarray(pos_fvs)
plt.scatter(pos_fv_arr[:, 0], pos_fv_arr[:, 1], c="g", marker="o", s=1)
neg_fv_arr = np.asfarray(neg_fvs)
plt.scatter(neg_fv_arr[:, 0], neg_fv_arr[:, 1], c="r", marker="x", s=1)
# dx = np.linspace(-1, 0.5,500)
# dy = np.linspace(-0.3, 0.5, 500)
# x,y = np.meshgrid(dx,dy)
# if partid == 16:
# region = (-3486699801877165/72057594037927936 < 1*y) & \
# ((2774949835903621/36028797018963968 >= (1*x + -1*y))
# &(-649018808849943/1125899906842624 < (1*x + -1*y))
# &( (4806281874582455/288230376151711744 >= (1*x + 1*y))
# | (2006731648131879/288230376151711744 >= 1*y)
# )
# )
# if partid == 0:
# region = ((y>-0.008467263) & (x+y>-0.7293049)).astype(int)
# plt.gca().set_xlim(-1, 1)
# plt.gca().set_xlim(-2.2, 0.2)
# elif partid == 9:
# region = (((x+y<=-1.141609) & (y>0.09126037)) | ((x+y>-1.141609) & (y>-0.1772965))).astype(int)
# plt.gca().set_xlim(-2.5, 2.5)
# plt.gca().set_ylim(-0.42, 0.25)
# elif partid == 17:
# region = ((y>-0.0121161) & (y<=0.02008105) & (x+y>-0.6796197) & (x+y<=0.05748017)).astype(int)
# plt.gca().set_xlim(-1, 1)
# plt.gca().set_ylim(-0.25, 0.25)
# else:
# pass
# plt.imshow( region,
# extent=(x.min(),x.max(),y.min(),y.max()),origin="lower", cmap="Greys", alpha = 0.3)
plt.savefig(csv_path.name + ".png")
```
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment