Newer
Older
import csv
import itertools
import json
import logging
import os
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

from learner_base import LearnerBase
class DTreeLearner(LearnerBase):
def __init__(self, state_dim: int, perc_dim: int,
             timeout: int = 10000) -> None:
    """Set up bookkeeping and the on-disk files used by the learner.

    Args:
        state_dim: Number of state dimensions; exposed via ``state_dim``.
        perc_dim: Number of perception dimensions; exposed via ``perc_dim``.
        timeout: Accepted for interface compatibility; this constructor
            does not read it.
    """
    super().__init__()
    # Debug bookkeeping consulted by add_negative_examples.
    self.debug_neg_conc = set()
    self.debug_neg_perc = set()
    # Must exist before add_negative_examples increments / prints it.
    self.count_neg_dup = 0
    self._state_dim: int = state_dim
    self._perc_dim: int = perc_dim
    # Given a base or derived feature name,
    # returns a mapping from base feature names to coefficients
    self._var_coeff_map: Dict[str, Dict[str, int]] = {}
    # Given a base feature name,
    # this map returns the affine transformation provided in the grammar
    self._basevar_trans_map: Dict[str, Tuple[Any, int]] = {}
    # check directory name exists, if not create it.
    self.dir_name = "out"
    os.makedirs(self.dir_name, exist_ok=True)
    path_prefix = self.dir_name+"/pre"
    self.data_file = path_prefix + ".data"
    self.names_file = path_prefix + ".names"
    self.tree_out = path_prefix + ".json"
    # create empty data files or truncate existing data in files
    # (the data file is later opened in append mode, so stale rows from a
    # previous run would otherwise leak into this one)
    with open(self.data_file, "w"):
        pass
    self.exec = f'c50exact/c5.0dbg -I 1 -m 1 -f {path_prefix}'
@property
def state_dim(self) -> int:
    """Number of state dimensions given to the constructor (read-only)."""
    return self._state_dim
@property
def perc_dim(self) -> int:
    """Number of perception dimensions given to the constructor (read-only)."""
    return self._perc_dim
def set_grammar(self, grammar) -> None:
    """Register the feature grammar and write the matching names file.

    For the i-th item of ``grammar`` this creates ``perc_dim`` base features
    named ``fvar{j}_A{i}``, the derived features produced by
    ``_generate_derived_features``, and one sample-to-feature function.

    Args:
        grammar: Iterable of transformations. Each item is unpacked as the
            positional arguments of ``construct_sample_to_feature_func``.
    """
    base_features: List[str] = []
    s2f_func_list = []
    derived_feature_map: Dict[str, Tuple[Dict, str]] = OrderedDict()
    for i, trans in enumerate(grammar):
        s2f_func_list.append(
            construct_sample_to_feature_func(*trans))
        ith_vars = [f"fvar{j}_A{i}" for j in range(self.perc_dim)]
        # Remember which transformation and which column each base
        # feature came from.
        self._basevar_trans_map.update(
            {var: (trans, j) for j, var in enumerate(ith_vars)})
        base_features.extend(ith_vars)
        derived_feature_map.update(
            self._generate_derived_features(ith_vars))

    # Store mapping from all feature names to coefficients of base features
    self._var_coeff_map.update({var: {var: 1} for var in base_features})
    self._var_coeff_map.update({
        var: coeff_map
        for var, (coeff_map, _) in derived_feature_map.items()
    })

    # One sample to feature vector function for many linear transformations
    self._s2f_func = self._compose_s2f_functions(s2f_func_list)

    # Write names file
    file_lines = ["precondition."] + \
        [f"{var}: continuous." for var in base_features] + \
        [f"{var} := {expr}." for var, (_, expr) in derived_feature_map.items()] + \
        ["precondition: true, false."]
    with open(self.names_file, "w") as f:
        f.write('\n'.join(file_lines))
@staticmethod
def _compose_s2f_functions(s2f_func_list):
def composed_func(sample):
return sum((list(f(sample)) for f in s2f_func_list),[])
return composed_func
@staticmethod
def _generate_derived_features(
base_vars: List[str], k: int = 2) -> List[Tuple[str, Tuple[Any, str]]]:
res = []
for var in base_vars:
var_coeff_map = {var: -1}
expr = f"(-1*{var})"
name = expr
res.append((name, (var_coeff_map, expr)))
coeff_combinations = list(itertools.product([1, -1], repeat=k))
var_id_iter = range(len(base_vars))
for selected_var_ids in itertools.combinations(var_id_iter, k):
for coeff in coeff_combinations:
var_coeff_map = {base_vars[i]: c
for c, i in zip(coeff, selected_var_ids)}
expr = " + ".join(f"({c}*{base_vars[i]})"
for c, i in zip(coeff, selected_var_ids))
name = f"({expr})"
res.append((name, (var_coeff_map, expr)))
return res
def add_implication_examples(self, *args) -> None:
    """Forward implication examples unchanged to the base-class handler."""
    return super().add_implication_examples(*args)
def add_positive_examples(self, *args) -> None:
    """Convert each sample to a feature vector and record it as "true".

    Args:
        *args: Samples accepted by the composed sample-to-feature function.
    """
    positives = list(map(self._s2f_func, args))
    print("Positive feature vectors:", positives)
    self._append_to_data_file(positives, "true")
def add_negative_examples(self, *args) -> None:
    """Convert each sample to a feature vector and record it as "false".

    Also performs debug duplicate tracking: repeated concrete samples are
    counted in ``count_neg_dup``, and a repeated feature vector is treated
    as an error.

    Args:
        *args: Samples accepted by the composed sample-to-feature function.

    Raises:
        ValueError: If a sample maps to a feature vector that was already
            added as a negative example.
    """
    # Tolerate instances on which the counter was never initialised.
    if not hasattr(self, "count_neg_dup"):
        self.count_neg_dup = 0

    feature_vec_list = []
    for samp in args:
        # Use a tuple key so unhashable samples (lists, arrays) still work.
        conc_key = tuple(samp)
        if conc_key in self.debug_neg_conc:
            self.count_neg_dup+=1
            print("repeated negative example: "+ str(samp))
        # Record the concrete sample; without this the check above can
        # never fire.
        self.debug_neg_conc.add(conc_key)

        # Compute the feature vector once and reuse it below.
        feature_vec = self._s2f_func(samp)
        perc_samp = tuple(feature_vec)
        print(perc_samp)
        if perc_samp in self.debug_neg_perc:
            print("repeated negative example: "+ str(perc_samp))
            raise ValueError(
                f"repeated negative feature vector: {perc_samp}")
        self.debug_neg_perc.add(perc_samp)
        feature_vec_list.append(feature_vec)

    print(f"number of negative duplicate {self.count_neg_dup}")
    print("Negative feature vectors:", feature_vec_list)
    self._append_to_data_file(feature_vec_list, "false")
def _append_to_data_file(self, feature_vec_list, label: str):
with open(self.data_file, 'a') as d_file:
data_out = csv.writer(d_file)
for f in feature_vec_list:
print(f)
data_out.writerow(itertools.chain(f, [label]))
# NOTE(review): the `def` line that owns the statements below is not visible
# in this view — confirm which method they belong to before editing.
# Run the command line stored in self.exec through a shell and echo whatever
# it printed on stdout.
res = os.popen(self.exec).read()
print(res)
# NOTE(review): the two adjacent string literals below are concatenated with
# no separating space, so the message reads "...successfullythere should...".
assert os.path.exists(self.tree_out), "if learned successfully" \
f"there should be a json file in {self.dir_name}"
# Parse the tree written to self.tree_out into a list of conjuncts.
dnf = self.get_pre_from_json(self.tree_out)
os.remove(self.tree_out) # Remove the generated json to avoid reusing old trees
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# NOTE(review): the `def` line that owns the statements below is not visible
# in this view; `dnf` is assumed to be the value produced by
# get_pre_from_json earlier in the same method — confirm.
# NOTE(review): parse_tree in this file can return None (no True leaf); if
# `dnf` is None the loop below raises TypeError — confirm whether that case
# is handled elsewhere.
# Convert every conjunct of predicates (var, op, cut) into a tuple of arrays:
# the conjunct's transformation followed by a coefficient matrix and a cut
# vector, one row / entry per predicate.
ret_dnf: List[Tuple] = []
for conjunct in dnf:
ret_trans = ()
ret_coeffs_list = []
ret_cut_list = []
if len(conjunct) == 0:
# Conjunction of zero clauses is defined as True
# return 0*(z-(0*x-0)) <= inf which is equivalent to True
a_mat = np.zeros(shape=(self.perc_dim, self.state_dim))
b_vec = np.zeros(self.perc_dim)
coeffs_mat = np.zeros(shape=(1, self.perc_dim))
cut_vec = np.array([np.inf])
ret_dnf.append((a_mat, b_vec, coeffs_mat, cut_vec))
continue
# else:
for pred in conjunct:
var, op, cut = pred
coeff_arr = np.zeros(self.perc_dim)
# Convert from dictionary to coefficients
# XXX May use sparse matrix from scipy
for basevar, coeff in self._var_coeff_map[var].items():
trans, j = self._basevar_trans_map[basevar]
coeff_arr[j] = coeff
# All predicates of one conjunct must share a single transformation.
# NOTE(review): if `trans` is a tuple of numpy arrays (as the grammar built
# in test_dtree_learner is), `!=` on two distinct tuples may raise
# "truth value of an array is ambiguous" instead of reaching the
# NotImplementedError below — confirm.
if not ret_trans:
ret_trans = trans
elif ret_trans != trans:
raise NotImplementedError(
"Not supporting mixing affine transformations in one conjunct.")
# Normalise every predicate to the form coeffs . features <= cut;
# a ">" predicate is negated on both sides.
if op == "<=":
pass
elif op == ">": # TODO deal with strict inequality
coeff_arr = -coeff_arr
cut = -cut
else:
raise ValueError(f"Unknown operator '{op}'")
ret_coeffs_list.append(coeff_arr)
ret_cut_list.append(cut)
# Stack the per-predicate rows into one matrix / vector for the conjunct.
ret_coeffs_mat = np.stack(ret_coeffs_list)
ret_cut_vec = np.array(ret_cut_list)
assert ret_trans
# ret_trans is a tuple, so this concatenates it with the two new arrays.
ret_dnf.append(ret_trans + (ret_coeffs_mat, ret_cut_vec))
return ret_dnf
def get_pre_from_json(self, path):
    """Load a decision tree from a JSON file and convert it with parse_tree.

    Args:
        path: Path of the JSON file to read.

    Returns:
        Whatever ``parse_tree`` returns for the decoded tree.

    Raises:
        ValueError: If the file content is not valid JSON; the original
            ``json.JSONDecodeError`` is attached as the cause.
    """
    # Keep the try body limited to decoding so that errors raised by
    # parse_tree are never mistaken for JSON syntax errors.
    try:
        with open(path) as json_file:
            tree = json.load(json_file)
    except json.JSONDecodeError as err:
        raise ValueError(f"cannot parse {path} as a json file") from err
    return self.parse_tree(tree)
def parse_tree(self, tree) -> Optional[List[List]]:
    """Collect the root-to-leaf paths of a binary tree that end in a True leaf.

    Args:
        tree: Mapping with a 'children' entry. Leaves have ``children`` set
            to None and carry 'classification'; inner nodes have exactly two
            children plus 'attribute' and 'cut'.

    Returns:
        A list of conjuncts, each a list of ``(attribute, op, cut)`` tuples
        with ``op`` being "<=" for the first child and ">" for the second;
        ``[[]]`` for a truthy leaf; None when no truthy leaf is reachable.

    Raises:
        ValueError: If an inner node does not have exactly two children.
    """
    children = tree['children']
    if children is None:
        # Leaf: an empty conjunct stands for True, None for "no path".
        return [[]] if tree['classification'] else None
    if len(children) != 2:
        raise ValueError("error parsing the json object as a binary decision tree)")

    # Post-order traversal: resolve both subtrees before this node.
    below, above = (self.parse_tree(child) for child in children)
    if below is None and above is None:
        return None

    conjuncts = []
    for op, branch in (("<=", below), (">", above)):
        if branch is None:
            continue
        pred = (tree['attribute'], op, tree['cut'])
        conjuncts.extend([pred] + conjunct for conjunct in branch)
    assert conjuncts
    return conjuncts
def construct_sample_to_feature_func(a_mat: np.ndarray, b_vec: np.ndarray):
    """Create a function mapping a sample to its perception residual.

    Args:
        a_mat: Matrix whose shape is read as ``(perc_dim, state_dim)``.
        b_vec: Offset added to ``state . a_mat.T``.

    Returns:
        A function taking a sample of length ``state_dim + perc_dim`` (state
        entries first, perception entries after) and returning
        ``perc - (state . a_mat.T + b_vec)``.
    """
    n_perc, n_state = a_mat.shape
    sample_len = n_state + n_perc

    def sample_to_feature_vec(sample):
        assert len(sample) == n_state + n_perc
        state_part = sample[0:n_state]
        perc_part = sample[n_state:sample_len]
        # Value obtained by applying the affine transformation to the state.
        transformed_state = np.dot(state_part, a_mat.T) + b_vec
        return perc_part - transformed_state

    return sample_to_feature_vec
def test_dtree_learner():
    """Smoke-test DTreeLearner with two transformations and a few examples."""
    a_mat_0 = np.array([[0., -1., 0.],
                        [0., 0., -1.]])
    b_vec_0 = np.zeros(2)
    a_mat_1 = np.array([[0., -0.75, 0.],
                        [0., 0., -1.25]])
    b_vec_1 = np.zeros(2)

    learner = DTreeLearner(state_dim=3, perc_dim=2)
    learner.set_grammar([(a_mat_0, b_vec_0), (a_mat_1, b_vec_1)])

    # logging.debug takes a format string plus %-args and has no `sep`
    # keyword, so join the entries into one message explicitly.
    logging.debug(
        "%s", "\n".join(map(str, learner._basevar_trans_map.items())))
    logging.debug(
        "%s", "\n".join(map(str, learner._var_coeff_map.items())))

    pos_examples = [
        (1., 2., 3., -2., -3.),
        (1., 2., 3., -1., -2.),
    ]
    learner.add_positive_examples(*pos_examples)

    neg_examples = [
        (10., 1.0, 1.0, 0.5, 0.5),
        (10., 1.0, 1.0, 1.5, 1.5),
        (10., 9.0, 9.0, 5.0, 5.0),
    ]
    learner.add_negative_examples(*neg_examples)
def test_sample_to_feature():
    """Check the residual computed by construct_sample_to_feature_func."""
    # One transformation, given as an (a_mat, b_vec) pair.
    a_mat = np.array([[0., -1., 0.],
                      [0., 0., -1]])
    b_vec = np.zeros(2)
    to_feature_vec = construct_sample_to_feature_func(a_mat, b_vec)

    # Each case pairs a sample with the feature vector expected for it.
    cases = [
        (np.array([1., 2., 3., -2., -3.]), np.array([0., 0.])),
        (np.array([1., 2., 3., -1., -2.]), np.array([1., 1.])),
    ]
    for sample, expected in cases:
        feature_vec = to_feature_vec(sample)
        assert np.array_equal(feature_vec, expected)
if __name__ == "__main__":