Skip to content
Snippets Groups Projects
Commit cb1dc02d authored by chsieh16's avatar chsieh16
Browse files

Add DTree learner, teacher, and integration

parent eb62c8a4
No related branches found
No related tags found
No related merge requests found
......@@ -2,6 +2,7 @@ from collections import OrderedDict
import csv
import itertools
import json
import logging
import os
from typing import Any, Dict, List, Optional, Tuple
......@@ -20,9 +21,12 @@ class DTreeLearner(LearnerBase):
self._s2f_func = lambda x: x
# Given a (derived) feature name,
# this map returns a mapping from base feature names to coefficients
# Given a base or derived feature name,
# returns a mapping from base feature names to coefficients
self._var_coeff_map: Dict[str, Dict[str, int]] = {}
# Given a base feature name,
# this map returns the affine transformation provided in the grammar
self._basevar_trans_map: Dict[str, Tuple[Any, int]] = {}
# check directory name exists, if not create it.
self.dir_name = "out"
......@@ -52,21 +56,24 @@ class DTreeLearner(LearnerBase):
derived_feature_map: Dict[str, Tuple[Dict, str]] = OrderedDict()
s2f_func_list = []
for i, (a_mat, b_vec) in enumerate(grammar):
for i, trans in enumerate(grammar):
s2f_func_list.append(
construct_sample_to_feature_func(a_mat, b_vec))
construct_sample_to_feature_func(*trans))
ith_vars = [f"fvar{j}_A{i}" for j in range(self.perc_dim)]
self._basevar_trans_map.update([(var, (trans, j)) for j, var in enumerate(ith_vars)])
base_features.extend(ith_vars)
derived_feature_map.update(
self._generate_derived_features(ith_vars))
# Store mapping from all feature names to coefficients of base features
self._var_coeff_map.update({
var: {var: 1} for var in base_features
})
self._var_coeff_map.update({
var: coeff_map for var, (coeff_map, _) in derived_feature_map.items()
})
self._var_coeff_map.update([
(var, {var: 1}) for var in base_features
])
self._var_coeff_map.update([
(var, coeff_map) for var, (coeff_map, _) in derived_feature_map.items()
])
# One sample to feature vector function for many linear transformations
self._s2f_func = self._compose_s2f_functions(s2f_func_list)
......@@ -131,21 +138,58 @@ class DTreeLearner(LearnerBase):
data_out = csv.writer(d_file)
for f in feature_vec_list:
print(f)
data_out.writerow(f+[label])
data_out.writerow(itertools.chain(f, [label]))
def learn(self) -> Tuple:
def learn(self) -> List[Tuple]:
res = os.popen(self.exec).read()
print(res)
assert os.path.exists(self.tree_out), "if learned successfully" \
f"there should be a json file in {self.dir_name}"
result = self.get_pre_from_json(self.tree_out)
for conjunct in result:
for var, op, cut in conjunct:
pass
print(result)
return ()
dnf = self.get_pre_from_json(self.tree_out)
ret_dnf: List[Tuple] = []
for conjunct in dnf:
ret_trans = ()
ret_coeffs_list = []
ret_cut_list = []
if len(conjunct) == 0:
# Conjunction of zero clauses is defined as True
# return 0*(z-(0*x-0)) <= inf which is equivalent to True
a_mat = np.zeros(shape=(self.perc_dim, self.state_dim))
b_vec = np.zeros(self.perc_dim)
coeffs_mat = np.zeros(shape=(1, self.perc_dim))
cut_vec = np.array([np.inf])
ret_dnf.append((a_mat, b_vec, coeffs_mat, cut_vec))
continue
# else:
for pred in conjunct:
var, op, cut = pred
coeff_arr = np.zeros(self.perc_dim)
# Convert from dictionary to coefficients
# XXX May use sparse matrix from scipy
for basevar, coeff in self._var_coeff_map[var].items():
trans, j = self._basevar_trans_map[basevar]
coeff_arr[j] = coeff
if not ret_trans:
ret_trans = trans
elif ret_trans != trans:
raise NotImplementedError(
"Not supporting mixing affine transformations in one conjunct.")
if op == "<=":
pass
elif op == ">": # TODO deal with strict unequality
coeff_arr = -coeff_arr
cut = -cut
else:
raise ValueError(f"Unknown operator '{op}'")
ret_coeffs_list.append(coeff_arr)
ret_cut_list.append(cut)
ret_coeffs_mat = np.stack(ret_coeffs_list)
ret_cut_vec = np.array(ret_cut_list)
assert ret_trans
ret_dnf.append(ret_trans + (ret_coeffs_mat, ret_cut_vec))
return ret_dnf
def get_pre_from_json(self, path):
try:
......@@ -196,15 +240,19 @@ def construct_sample_to_feature_func(a_mat: np.ndarray, b_vec: np.ndarray):
def test_dtree_learner():
a_mat = np.array([[0., -1., 0.],
[0., 0., -1]])
b_vec = np.zeros(2)
a_mat_0 = np.array([[0., -1., 0.],
[0., 0., -1.]])
b_vec_0 = np.zeros(2)
a_mat_1 = np.array([[0., -0.75, 0.],
[0., 0., -1.25]])
b_vec_1 = np.zeros(2)
learner = DTreeLearner(state_dim=3, perc_dim=2)
learner.set_grammar([(a_mat, b_vec), (a_mat, b_vec)])
learner.set_grammar([(a_mat_0, b_vec_0), (a_mat_1, b_vec_1)])
print(*learner._var_coeff_map.items(), sep='\n')
return
logging.debug(*learner._basevar_trans_map.items(), sep='\n')
logging.debug(*learner._var_coeff_map.items(), sep='\n')
pos_examples = [
(1., 2., 3., -2., -3.),
......@@ -219,7 +267,7 @@ def test_dtree_learner():
]
learner.add_negative_examples(*neg_examples)
learner.learn()
print(learner.learn())
def test_sample_to_feature():
......
import pickle
from typing import List, Tuple
import numpy as np
import z3
from dtree_learner import DTreeLearner as Learner
from dtree_teacher_gem_stanley import DTreeTeacherGEMStanley as Teacher
def load_positive_examples(file_name: str) -> List[Tuple[float, ...]]:
    """Load sampled (state, percept) tuples from a pickle file as positive examples.

    Only the first partition of the pickled ``truth_samples`` sequence is kept,
    and any sample containing a NaN entry is discarded.
    """
    with open(file_name, "rb") as f_in:
        truth_samples_seq = pickle.load(f_in)["truth_samples"]

    part_idx = 0  # select only the i-th partition
    selected = truth_samples_seq[part_idx:part_idx+1]
    print("Representative point in partition:", selected[0][0])

    # Flatten the partition's samples, dropping those with NaN values
    examples: List[Tuple[float, ...]] = []
    for _, raw_samples in selected:
        examples.extend(s for s in raw_samples if not any(np.isnan(s)))
    return examples
def test_synth_dtree():
    """End-to-end run: load recorded examples and synthesize a DTree precondition."""
    pos_examples = load_positive_examples(
        "data/collect_images_2021-11-22-17-59-46.cs598.filtered.pickle")
    pos_examples = pos_examples[:20:]  # Select only first few examples
    ex_dim = len(pos_examples[0])
    print("#examples: %d" % len(pos_examples))
    print("Dimension of each example: %d" % ex_dim)
    # Sanity-check the loaded data: uniform dimension, no NaN entries
    assert all(len(ex) == ex_dim and not any(np.isnan(ex))
               for ex in pos_examples)

    teacher = Teacher()
    assert teacher.state_dim + teacher.perc_dim == ex_dim
    # Old-state bounds: 0.0 <= x <= 30.0, -1.0 <= y <= -0.9, 0.2 <= theta <= 0.22
    teacher.set_old_state_bound(lb=[0.0, -1.0, 0.2], ub=[30.0, -0.9, 0.22])
    synth_dtree(pos_examples, teacher, num_max_iterations=5)
def synth_dtree(positive_examples, teacher, num_max_iterations: int = 10):
    """CEGIS-style loop alternating DTree learning with teacher queries.

    Returns the list of candidate DNFs accumulated so far when a candidate is
    accepted (or the solver result is unknown), None when learning fails, and
    [] when the iteration budget is exhausted.
    """
    learner = Learner(state_dim=teacher.state_dim,
                      perc_dim=teacher.perc_dim, timeout=20000)
    # Single affine transformation in the grammar: z - (A x + b)
    a_mat_0 = np.array([[0., -1., 0.],
                        [0., 0., -1.]])
    b_vec_0 = np.zeros(2)
    learner.set_grammar([(a_mat_0, b_vec_0)])
    learner.add_positive_examples(*positive_examples)

    past_candidate_list = []
    for it in range(num_max_iterations):
        print(f"Iteration {it}:", sep='')
        print("learning ....")
        candidate_dnf = learner.learn()
        print("done learning")
        if not candidate_dnf:  # empty DNF means learning FAILED
            print("Learning Failed.")
            return
        print(f"candidate DNF: {candidate_dnf}")
        past_candidate_list.append(candidate_dnf)

        # Query the teacher for counterexamples to each conjunct
        negative_examples = []
        for candidate in candidate_dnf:
            result = teacher.check(candidate)
            print(result)
            if result == z3.unsat:
                continue
            if result == z3.sat:
                model_vals = teacher.model()
                assert len(model_vals) > 0
                negative_examples.extend(model_vals)
                # TODO check if negative example state is spurious or a true counterexample
            else:
                print("Reason Unknown", teacher.reason_unknown())
                return past_candidate_list

        print(f"negative examples: {negative_examples}")
        if not negative_examples:
            print("we are done!")
            return past_candidate_list
        learner.add_negative_examples(*negative_examples)

    print("Reached max iteration %d." % num_max_iterations)
    return []
# Script entry point: run the end-to-end DTree synthesis test.
if __name__ == "__main__":
    test_synth_dtree()
import numpy as np
from gem_stanley_teacher import GEMStanleyTeacher
class DTreeTeacherGEMStanley(GEMStanleyTeacher):
    """Teacher that checks DTree candidates against the GEM Stanley model."""

    def __init__(self, name="dtree_gem_stanley") -> None:
        super().__init__(name=name)

    def _set_candidate(self, candidate) -> None:
        """Replace the previous candidate's constraints with this candidate's.

        `candidate` is a tuple (a_mat, b_vec, coeff_mat, cut_vec) encoding
        coeff_mat @ (z - (a_mat @ x + b_vec)) < cut_vec.
        """
        a_mat, b_vec, coeff_mat, cut_vec = candidate  # TODO parse candidate
        # Local aliases for the optimization model and its variables
        model = self._gp_model
        old_state = self._old_state
        percept = self._percept
        percept_diff = self._percept_diff

        # Drop the constraints installed for the previous candidate
        model.remove(self._prev_candidate_constr)
        self._prev_candidate_constr.clear()
        model.update()

        # percept_diff is the deviation of z from the affine prediction A x + b
        new_cons = model.addConstr(percept_diff == percept - (a_mat @ old_state + b_vec))
        self._prev_candidate_constr.append(new_cons)
        # Half-space constraints bounding the deviation
        new_cons = model.addMConstr(coeff_mat, percept_diff, '<', cut_vec)
        self._prev_candidate_constr.append(new_cons)
def test_dtree_gem_stanley_teacher():
    """Smoke test: install one candidate into the teacher and dump its model."""
    a_mat = np.array([[0., -1., 0.],
                      [0., 0., -1.]])
    b_vec = np.zeros(2)
    # Four half-space constraints over the two percept-difference dimensions
    coeff_mat = np.array([
        [-1, -1],
        [1, -1],
        [-1, 0],
        [0, 1],
    ])
    cut_vec = np.array([1, 2, 3, 4])

    teacher = DTreeTeacherGEMStanley()
    teacher._set_candidate((a_mat, b_vec, coeff_mat, cut_vec))
    teacher.dump_model()
# Script entry point: run the teacher smoke test.
if __name__ == "__main__":
    test_dtree_gem_stanley_teacher()
import itertools
from typing import List, Tuple
def generateInputLanguageFile(nameOfFile: str, intVariables: List[str], boolVariables: List[str]):
    """Write the DTree input-language (.names) file for the given variables.

    Args:
        nameOfFile: Destination path of the generated file.
        intVariables: Names of continuous/integer variables.
        boolVariables: Names of boolean observer variables.

    Raises:
        Exception: when the file cannot be written; the underlying OS error
            is chained as the cause instead of being flattened into a string.
    """
    fileContents = generateExpressionsBools(intVariables, boolVariables)
    try:
        with open(nameOfFile, 'wt') as outfile:
            outfile.write(fileContents)
    except OSError as e:
        # Chain the original error rather than embedding str(e) and a
        # hard-coded (and stale) source line number in the message.
        raise Exception("error writing input language file: " + nameOfFile) from e
def generateExpressionsBools(intVariables: List[str], boolVariables: List[str]) -> str:
    """Build the contents of the .names file describing features over the variables."""
    lines = ['precondition.']
    # Continuous attributes, one per integer variable
    for var in intVariables:
        lines.append(var + ': continuous.')
    # adding boolean observer method features
    for var in boolVariables:
        lines.append(var + ': true, false.')
    # check equality of integer variables (every unordered pair)
    if len(intVariables) >= 2:
        for var1, var2 in itertools.combinations(intVariables, 2):
            expr = "(" + var1 + " = " + var2 + ")"
            name_expr = "( = " + var1 + " " + var2 + " )"
            lines.append(name_expr + ' := ' + expr + ' .')  # needs change
    # Linear combinations of the integer variables with coefficients in [-1, 1]
    for coeff in make_coeff_linear_combination(len(intVariables), -1, 1):
        expr = ' + '.join("(" + str(c) + "*" + v + ")"
                          for c, v in zip(coeff, intVariables))
        name_expr = ' ( + ' + ' '.join("( * " + str(c) + " " + v + " )"
                                       for c, v in zip(coeff, intVariables)) + ' )'
        lines.append(name_expr + ' := ' + expr + ' .')
    lines.append('precondition: true,false.')
    return '\n'.join(lines)
def make_coeff_linear_combination(number_of_variables, low, high):
    """Return all coefficient vectors of the given length with entries in [low, high].

    Vectors are lists ordered lexicographically by position (first entry
    varies slowest).
    """
    single = [[value] for value in range(low, high + 1)]
    result = single
    # Extend each existing vector by every possible next coefficient
    for _ in range(number_of_variables - 1):
        result = [prefix + tail for prefix in result for tail in single]
    return result
# Script entry point: generate a .names file for the GEM Stanley variables.
if __name__ == '__main__':
    intVariables = ["dbar", "psibar"]
    boolVariables = []
    fullPathLocation = "/home/aastorg2/intelligibleAbs/cs598mp-fall2021-proj/tempLocation/pre.names"
    # Fix: generateInputLanguageFile requires three arguments; the original
    # call omitted boolVariables and raised a TypeError at runtime.
    generateInputLanguageFile(fullPathLocation, intVariables, boolVariables)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment