Newer
Older
from typing import Any, Dict, List, MutableSet, Tuple
from learner_base import LearnerBase
class DTreeLearner(LearnerBase):
def __init__(self, state_dim: int, perc_dim: int,
             timeout: int = 10000) -> None:
    """Initialise bookkeeping and the on-disk files used by the learner.

    Args:
        state_dim: Number of state components; exposed via ``state_dim``.
        perc_dim: Number of percept components; exposed via ``perc_dim``.
        timeout: Accepted for interface compatibility; this constructor
            does not read it.
    """
    super().__init__()
    # Negative examples seen so far, kept to detect duplicates:
    # raw samples and their feature vectors respectively.
    self.debug_neg_conc = set()  # type: MutableSet[Tuple[float,...]]
    self.debug_neg_perc = set()  # type: MutableSet[Tuple[float,...]]
    self._state_dim: int = state_dim
    self._perc_dim: int = perc_dim
    self.count_neg_dup = 0

    # Given a base or derived feature name,
    # returns a mapping from base feature names to coefficients
    self._var_coeff_map: Dict[str, Dict[str, int]] = {}
    # Given a base feature name,
    # this map returns the affine transformation provided in the grammar
    self._basevar_trans_map: Dict[str, Tuple[Any, int]] = {}

    # Make sure the output directory exists; create it when missing.
    self.dir_name = "out"
    os.makedirs(self.dir_name, exist_ok=True)

    # The file names are needed whether or not the directory already existed.
    path_prefix = self.dir_name+"/pre"
    self.data_file = path_prefix + ".data"
    self.names_file = path_prefix + ".names"
    self.tree_out = path_prefix + ".json"

    # Create empty data files or truncate existing data in files, so that
    # examples appended later do not mix with leftovers of an earlier run.
    for path in (self.data_file, self.names_file):
        with open(path, "w"):
            pass

    # Command line for the external decision-tree tool.
    self.exec = f'c50exact/c5.0dbg -I 1 -m 1 -f {path_prefix}'
@property
def state_dim(self) -> int:
    """Number of state components given at construction (read-only)."""
    dim = self._state_dim
    return dim
@property
def perc_dim(self) -> int:
    """Number of percept components given at construction (read-only)."""
    dim = self._perc_dim
    return dim
def set_grammar(self, grammar) -> None:
    """Register the transformations of the grammar and write the names file.

    For the i-th transformation, ``perc_dim`` base features named
    ``fvar{j}_A{i}`` are declared, together with the derived features
    produced by ``_generate_derived_features``. The feature-to-coefficient
    and feature-to-transformation maps are updated, a single
    sample-to-feature-vector function is composed, and the feature
    declarations are written to ``self.names_file``.

    Args:
        grammar: Iterable of transformations. Each one is unpacked as the
            positional arguments of ``construct_sample_to_feature_func``.
    """
    # Accumulators filled while walking the grammar.
    s2f_func_list = []
    base_features: List[str] = []
    derived_feature_map: Dict[str, Tuple[Dict, str]] = OrderedDict()

    for i, trans in enumerate(grammar):
        s2f_func_list.append(
            construct_sample_to_feature_func(*trans))

        ith_vars = [f"fvar{j}_A{i}" for j in range(self.perc_dim)]

        # Remember which transformation and which component each base
        # feature comes from.
        self._basevar_trans_map.update([(var, (trans, j)) for j, var in enumerate(ith_vars)])
        base_features.extend(ith_vars)
        derived_feature_map.update(
            self._generate_derived_features(ith_vars))

    # Store mapping from all feature names to coefficients of base features
    self._var_coeff_map.update([
        (var, {var: 1}) for var in base_features
    ])
    self._var_coeff_map.update([
        (var, coeff_map) for var, (coeff_map, _) in derived_feature_map.items()
    ])

    # One sample to feature vector function for many linear transformations
    self._s2f_func = self._compose_s2f_functions(s2f_func_list)

    # Write names file
    file_lines = ["precondition."] + \
        [f"{var}:  continuous." for var in base_features] + \
        [f"{var} := {expr}." for var, (_, expr) in derived_feature_map.items()] + \
        ["precondition:  true, false."]
    with open(self.names_file, "w") as f:
        f.write('\n'.join(file_lines))
@staticmethod
def _compose_s2f_functions(s2f_func_list):
    """Combine several sample-to-feature functions into one.

    The returned callable applies every function in ``s2f_func_list`` to
    the same sample, in order, and returns all produced values
    concatenated in a single list.
    """
    def composed_func(sample):
        features = []
        for func in s2f_func_list:
            features.extend(func(sample))
        return features

    return composed_func
@staticmethod
def _generate_derived_features(
        base_vars: List[str], k: int = 2) -> List[Tuple[str, Tuple[Any, str]]]:
    """Build derived features from the given base feature names.

    Two families are produced, in this order:
      * the negation ``(-1*v)`` of every base variable ``v``;
      * for every combination of ``k`` base variables and every choice of
        coefficients in ``{1, -1}``, the signed sum of those variables.

    Returns:
        A list of ``(name, (coefficient map, expression))`` entries, where
        the coefficient map goes from base variable name to coefficient.
    """
    derived = []

    # Negated single variables: the name and the expression coincide.
    for base in base_vars:
        negated = f"(-1*{base})"
        derived.append((negated, ({base: -1}, negated)))

    # Signed sums over every k-subset of the base variables.
    sign_choices = list(itertools.product([1, -1], repeat=k))
    for chosen in itertools.combinations(base_vars, k):
        for signs in sign_choices:
            terms = list(zip(signs, chosen))
            coeff_by_var = {var: sign for sign, var in terms}
            summed = " + ".join(f"({sign}*{var})" for sign, var in terms)
            derived.append((f"({summed})", (coeff_by_var, summed)))
    return derived
def add_implication_examples(self, *args) -> None:
    """Forward implication examples unchanged to the base-class method."""
    outcome = super().add_implication_examples(*args)
    return outcome
def add_positive_examples(self, *args) -> None:
    """Convert each sample to a feature vector and record it as "true".

    The feature vectors are printed and then appended to the data file
    through ``_append_to_data_file``.
    """
    feature_vec_list = list(map(self._s2f_func, args))
    print("Positive feature vectors:", feature_vec_list)
    self._append_to_data_file(feature_vec_list, "true")
def add_negative_examples(self, *args) -> None:
    """Record samples as "false" examples, rejecting duplicates.

    Every sample is converted to a feature vector once. A sample whose raw
    value or whose feature vector was already added as a negative example
    is rejected. Nothing is written to the data file when a duplicate is
    found.

    Raises:
        ValueError: If a raw sample or its feature vector repeats an
            earlier negative example.
    """
    feature_vec_list = []
    for samp in args:
        # Normalise to a tuple so list/array samples can be stored in a set.
        conc_key = tuple(samp)
        if conc_key in self.debug_neg_conc:
            self.count_neg_dup += 1
            raise ValueError("repeated negative example: " + str(samp))
        # Remember the raw sample; without this the check above never fires.
        self.debug_neg_conc.add(conc_key)

        feature_vec = self._s2f_func(samp)
        perc_samp = tuple(feature_vec)
        print(perc_samp)
        if perc_samp in self.debug_neg_perc:
            raise ValueError("repeated negative example: " + str(perc_samp))
        self.debug_neg_perc.add(perc_samp)
        feature_vec_list.append(feature_vec)

    print(f"number of negative duplicate {self.count_neg_dup}")
    print("Negative feature vectors:", feature_vec_list)
    self._append_to_data_file(feature_vec_list, "false")
def _append_to_data_file(self, feature_vec_list, label: str):
    """Append one CSV row per feature vector to ``self.data_file``.

    Args:
        feature_vec_list: Iterable of feature vectors (each an iterable of
            values).
        label: Class label written as the last column of every row.
    """
    # newline='' lets the csv writer control line endings itself; without it
    # rows get an extra carriage return on platforms that translate '\n'.
    with open(self.data_file, 'a', newline='') as d_file:
        data_out = csv.writer(d_file)
        # append label at the end of each row
        data_out.writerows(
            itertools.chain(f, [label]) for f in feature_vec_list)
def learn(self) -> sympy.logic.boolalg.Boolean:
    """Read the learned decision tree and return it over state variables.

    The JSON tree at ``self.tree_out`` is parsed into a Boolean expression,
    the file is deleted, and the base features in the expression are
    replaced through ``_subs_basevar_w_states``.

    Raises:
        AssertionError: If the JSON tree file does not exist.
    """
    # NOTE(review): nothing in this method runs ``self.exec``; presumably the
    # external learner has to be invoked before this point -- confirm.
    if not os.path.exists(self.tree_out):
        # Explicit raise (same exception type as before) so the check is not
        # stripped when Python runs with -O.
        raise AssertionError(
            "if learned successfully, "
            f"there should be a json file in {self.dir_name}")

    try:
        ite_expr = self.get_pre_from_json(self.tree_out)
    finally:
        # Remove the generated json to avoid reusing old trees, even when
        # parsing failed.
        os.remove(self.tree_out)

    return self._subs_basevar_w_states(ite_expr)
def _subs_basevar_w_states(self, ite_expr) -> sympy.logic.boolalg.Boolean:
    """Rewrite base features in ``ite_expr`` over state and percept symbols.

    With state symbols ``x_i`` and percept symbols ``z_i``, every base
    feature registered with transformation ``(a_mat, b_vec)`` and component
    index ``j`` is substituted by component ``j`` of
    ``z - (a_mat @ x + b_vec)``.
    """
    x_vec = sympy.Matrix(
        sympy.symbols([f"x_{i}" for i in range(self.state_dim)]))
    z_vec = sympy.Matrix(
        sympy.symbols([f"z_{i}" for i in range(self.perc_dim)]))

    replacements = []
    for name, ((a_mat, b_vec), row) in self._basevar_trans_map.items():
        residual = z_vec - (sympy.Matrix(a_mat) @ x_vec + sympy.Matrix(b_vec))
        replacements.append((name, residual[row]))

    return ite_expr.subs(replacements)
@staticmethod
def get_pre_from_json(path):
    """Load a decision tree from a JSON file and convert it to an expression.

    Args:
        path: Location of the JSON file.

    Returns:
        The result of ``DTreeLearner.parse_tree`` on the decoded JSON.

    Raises:
        ValueError: If the file content is not valid JSON; the decoder
            error is attached as the cause.
    """
    with open(path) as json_file:
        # Only the decoding step can raise JSONDecodeError.
        try:
            tree = json.load(json_file)
        except json.JSONDecodeError as err:
            raise ValueError(f"cannot parse {path} as a json file") from err
    return DTreeLearner.parse_tree(tree)
@staticmethod
def parse_tree(tree) -> sympy.logic.boolalg.Boolean:
    """Convert a decoded JSON decision tree into an if-then-else expression.

    A node whose ``children`` entry is ``None`` is a leaf and maps to
    ``sympy.true`` or ``sympy.false`` according to its ``classification``.
    A node with exactly two children maps to
    ``ITE(attribute <= cut, left, right)``.

    Raises:
        ValueError: If a node has children but not exactly two of them.
    """
    children = tree['children']
    if children is None:
        # At a leaf node, return the clause
        return sympy.true if tree['classification'] else sympy.false

    if len(children) != 2:
        raise ValueError("error parsing the json object as a binary decision tree")

    # Post-order traversal
    left = DTreeLearner.parse_tree(children[0])
    right = DTreeLearner.parse_tree(children[1])
    # Create an ITE expression tree
    cond = sympy.sympify(f"{tree['attribute']} <= {tree['cut']}")
    return sympy.logic.boolalg.ITE(cond, left, right)
def construct_sample_to_feature_func(a_mat: np.ndarray, b_vec: np.ndarray):
    """Create a function mapping a sample to its feature vector.

    A sample is the concatenation of a state (first ``a_mat.shape[1]``
    entries) and a percept (next ``a_mat.shape[0]`` entries). The feature
    vector is ``percept - (a_mat @ state + b_vec)``.
    """
    n_perc, n_state = a_mat.shape
    sample_len = n_state + n_perc

    def sample_to_feature_vec(sample):
        assert len(sample) == sample_len
        state_part = np.array(sample[:n_state])
        perc_part = np.array(sample[n_state:sample_len])
        predicted = a_mat @ state_part + b_vec
        return perc_part - predicted

    return sample_to_feature_vec
def test_dtree_learner():
    """Exercise a learner with two transformations and a few examples."""
    a_mat_0 = np.array([[0., -1., 0.],
                        [0., 0., -1.]])
    b_vec_0 = np.zeros(2)

    a_mat_1 = np.array([[0., -0.75, 0.],
                        [0., 0., -1.25]])
    b_vec_1 = np.zeros(2)

    learner = DTreeLearner(state_dim=3, perc_dim=2)
    learner.set_grammar([(a_mat_0, b_vec_0), (a_mat_1, b_vec_1)])

    # logging.debug takes a format string plus arguments and has no ``sep``
    # keyword; join the entries explicitly, one per line.
    logging.debug(
        "%s", "\n".join(map(str, learner._basevar_trans_map.items())))
    logging.debug(
        "%s", "\n".join(map(str, learner._var_coeff_map.items())))

    pos_examples = [
        (1., 2., 3., -2., -3.),
        (1., 2., 3., -1., -2.)
    ]
    learner.add_positive_examples(*pos_examples)

    neg_examples = [
        (10., 1.0, 1.0, 0.5, 0.5),
        (10., 1.0, 1.0, 1.5, 1.5),
        (10., 9.0, 9.0, 5.0, 5.0),
    ]
    learner.add_negative_examples(*neg_examples)
def test_sample_to_feature():
    """Check the feature vectors produced for two hand-computed samples."""
    # One transformation, given as an (a_mat, b_vec) pair.
    transform = (
        np.array([[0., -1., 0.],
                  [0., 0., -1]]),
        np.zeros(2),
    )
    # construct_sample_to_feature_func returns a function for that pair.
    to_features = construct_sample_to_feature_func(*transform)

    # (sample, expected feature vector); the original notes call the two
    # computed components dBar and psiBar.
    expectations = [
        (np.array([1., 2., 3., -2., -3.]), np.array([0., 0.])),
        (np.array([1., 2., 3., -1., -2.]), np.array([1., 1.])),
    ]
    for sample, expected in expectations:
        feature_vec = to_features(sample)
        assert np.array_equal(feature_vec, expected)
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def test_parse_json():
    """Parse a sample JSON tree and print it rewritten over state variables."""
    raw_tree = """
{"attribute":"((1*fvar0_A0) + (1*fvar1_A0))","cut":-0.01318414,"classification":0,
"children":[
{"attribute":"fvar1_A0","cut":0.01403625,"classification":0,
"children":[{"attribute":"","cut":0,"classification":true,"children":null},
{"attribute":"","cut":0,"classification":false,"children":null}]
},
{"attribute":"fvar1_A1","cut":-0.003193465,"classification":0,
"children":[{"attribute":"","cut":0,"classification":true,"children":null},
{"attribute":"","cut":0,"classification":false,"children":null}]
}
]
}"""
    parsed = DTreeLearner.parse_tree(json.loads(raw_tree))

    # Two transformations, each an (a_mat, b_vec) pair.
    grammar = [
        (np.array([[0., -1., 0.],
                   [0., 0., -1.]]), np.zeros(2)),
        (np.array([[0., -0.75, 0.],
                   [0., 0., -1.25]]), np.zeros(2)),
    ]
    learner = DTreeLearner(state_dim=3, perc_dim=2)
    learner.set_grammar(grammar)

    rewritten = learner._subs_basevar_w_states(parsed)
    print(rewritten)
if __name__ == "__main__":