"""Decision-tree precondition learner driven by an external tree inducer.

Samples are turned into feature vectors, written to ``out/pre.data`` /
``out/pre.names``, handed to the ``c50exact/c5.0dbg`` executable, and the JSON
tree it produces is converted into a sympy if-then-else expression.
"""
from collections import OrderedDict
import csv
import itertools
import json
import logging
import os
from typing import Any, Dict, List, MutableSet, Tuple

import numpy as np
import sympy

from learner_base import LearnerBase


class DTreeLearner(LearnerBase):
    """Learn a Boolean precondition as a decision tree over affine features.

    A sample is a flat sequence of ``state_dim`` state values followed by
    ``perc_dim`` perception values.  For every affine transformation
    ``(a_mat, b_vec)`` in the grammar the base features are
    ``perc - (a_mat @ state + b_vec)``.
    """

    def __init__(self, state_dim: int, perc_dim: int,
                 timeout: int = 10000) -> None:
        """Create the output directory, truncate the data file, set up paths.

        Args:
            state_dim: Number of state components in a sample.
            perc_dim: Number of perception components in a sample.
            timeout: Accepted for interface compatibility; not used anywhere
                in this class.
        """
        super().__init__()
        # Already-seen negative examples, as raw samples and as feature
        # vectors; used only to reject duplicates.
        self.debug_neg_conc: MutableSet[Tuple[float, ...]] = set()
        self.debug_neg_perc: MutableSet[Tuple[float, ...]] = set()
        self._state_dim: int = state_dim
        self._perc_dim: int = perc_dim
        self.count_neg_dup = 0

        # Identity until set_grammar() installs the real mapping.
        self._s2f_func = lambda x: x

        # Given a base or derived feature name,
        # returns a mapping from base feature names to coefficients
        self._var_coeff_map: Dict[str, Dict[str, int]] = {}

        # Given a base feature name,
        # this map returns the affine transformation provided in the grammar
        self._basevar_trans_map: Dict[str, Tuple[Any, int]] = {}

        # Create the output directory if needed; exist_ok avoids the race
        # between checking for the directory and creating it.
        self.dir_name = "out"
        os.makedirs(self.dir_name, exist_ok=True)

        path_prefix = self.dir_name + "/pre"
        self.data_file = path_prefix + ".data"
        self.names_file = path_prefix + ".names"
        self.tree_out = path_prefix + ".json"

        # Create an empty data file or truncate existing data.
        with open(self.data_file, 'w'):
            pass

        self.exec = f'c50exact/c5.0dbg -I 1 -m 1 -f {path_prefix}'

    @property
    def state_dim(self) -> int:
        """Number of state components in a sample."""
        return self._state_dim

    @property
    def perc_dim(self) -> int:
        """Number of perception components in a sample."""
        return self._perc_dim

    def set_grammar(self, grammar) -> None:
        """Install the feature grammar and write the ``.names`` file.

        Args:
            grammar: Iterable of ``(a_mat, b_vec)`` pairs.  The ``i``-th pair
                contributes base features ``fvar{j}_A{i}`` for
                ``j in range(perc_dim)`` plus the derived features built by
                ``_generate_derived_features``.
        """
        base_features: List[str] = []
        derived_feature_map: Dict[str, Tuple[Dict, str]] = OrderedDict()
        s2f_func_list = []
        for i, trans in enumerate(grammar):
            s2f_func_list.append(construct_sample_to_feature_func(*trans))
            ith_vars = [f"fvar{j}_A{i}" for j in range(self.perc_dim)]

            # Remember which transformation and component each base
            # feature stands for, so learn() can expand it later.
            self._basevar_trans_map.update(
                {var: (trans, j) for j, var in enumerate(ith_vars)})
            base_features.extend(ith_vars)
            derived_feature_map.update(
                self._generate_derived_features(ith_vars))

        # Store mapping from all feature names to coefficients of base features
        self._var_coeff_map.update({var: {var: 1} for var in base_features})
        self._var_coeff_map.update({
            var: coeff_map
            for var, (coeff_map, _) in derived_feature_map.items()
        })

        # One sample to feature vector function for many linear transformations
        self._s2f_func = self._compose_s2f_functions(s2f_func_list)

        # Write names file
        file_lines = ["precondition."]
        file_lines.extend(f"{var}: continuous." for var in base_features)
        file_lines.extend(
            f"{var} := {expr}."
            for var, (_, expr) in derived_feature_map.items())
        file_lines.append("precondition: true, false.")
        with open(self.names_file, "w") as f:
            f.write('\n'.join(file_lines))

    @staticmethod
    def _compose_s2f_functions(s2f_func_list):
        """Return a function concatenating the outputs of all given functions.

        The composed function returns one flat list holding the feature
        values of every function in ``s2f_func_list``, in order.
        """
        def composed_func(sample):
            # chain.from_iterable is linear, unlike repeated list addition.
            return list(itertools.chain.from_iterable(
                f(sample) for f in s2f_func_list))
        return composed_func

    @staticmethod
    def _generate_derived_features(
            base_vars: List[str],
            k: int = 2) -> List[Tuple[str, Tuple[Any, str]]]:
        """Build derived features from the given base feature names.

        Produces the negation ``(-1*var)`` of every base variable and, when
        at least ``k`` variables are available, every ``+/-1`` combination of
        each ``k``-subset of them.

        Returns:
            A list of ``(name, (coeff_map, expr))`` where ``coeff_map`` maps
            base feature names to their coefficient and ``expr`` is the text
            written to the names file.
        """
        res = []
        for var in base_vars:
            expr = f"(-1*{var})"
            res.append((expr, ({var: -1}, expr)))

        if len(base_vars) < k:
            return res

        coeff_combinations = list(itertools.product([1, -1], repeat=k))
        for selected_vars in itertools.combinations(base_vars, k):
            for coeff in coeff_combinations:
                var_coeff_map = dict(zip(selected_vars, coeff))
                expr = " + ".join(
                    f"({c}*{var})" for c, var in zip(coeff, selected_vars))
                res.append((f"({expr})", (var_coeff_map, expr)))
        return res

    def add_implication_examples(self, *args) -> None:
        """Delegate implication examples to the base class unchanged."""
        return super().add_implication_examples(*args)

    def add_positive_examples(self, *args) -> None:
        """Append the feature vectors of ``args`` to the data file as ``true``."""
        feature_vec_list = [self._s2f_func(sample) for sample in args]
        print("Positive feature vectors:", feature_vec_list)
        self._append_to_data_file(feature_vec_list, "true")

    def add_negative_examples(self, *args) -> None:
        """Append the feature vectors of ``args`` to the data file as ``false``.

        Samples must be hashable (e.g. tuples).

        Raises:
            ValueError: If a sample, or its feature vector, was already added
                as a negative example or is repeated within ``args``.  In
                that case nothing from this call is recorded or written.
        """
        # Stage the bookkeeping and commit it only once the whole batch has
        # been validated; otherwise a duplicate in the middle of the batch
        # would leave earlier samples registered as seen although they never
        # reached the data file.
        staged_conc = set()
        staged_perc = set()
        feature_vec_list = []
        for samp in args:
            if samp in self.debug_neg_conc or samp in staged_conc:
                self.count_neg_dup += 1
                raise ValueError("repeated negative example: " + str(samp))
            feature_vec = self._s2f_func(samp)
            perc_samp = tuple(feature_vec)
            print(perc_samp)
            if perc_samp in self.debug_neg_perc or perc_samp in staged_perc:
                raise ValueError(
                    "repeated negative example: " + str(perc_samp))
            staged_conc.add(samp)
            staged_perc.add(perc_samp)
            feature_vec_list.append(feature_vec)
        self.debug_neg_perc.update(staged_perc)
        self.debug_neg_conc.update(staged_conc)
        print(f"number of negative duplicate {self.count_neg_dup}")

        print("Negative feature vectors:", feature_vec_list)
        self._append_to_data_file(feature_vec_list, "false")

    def _append_to_data_file(self, feature_vec_list, label: str):
        """Append one CSV row per feature vector, with ``label`` as last column."""
        # newline='' is what the csv module requires of files it writes to;
        # without it, platforms that translate newlines emit "\r\r\n".
        with open(self.data_file, 'a', newline='') as d_file:
            data_out = csv.writer(d_file)
            data_out.writerows(
                itertools.chain(f, [label]) for f in feature_vec_list)

    def learn(self) -> sympy.logic.boolalg.Boolean:
        """Run the external learner and return the learned precondition.

        Returns:
            The decision tree as a sympy expression over the symbols
            ``x[i]`` (state) and ``z[i]`` (perception).

        Raises:
            RuntimeError: If the tool did not produce the JSON tree file.
            ValueError: If the JSON tree file cannot be parsed.
        """
        with os.popen(self.exec) as proc:
            tool_output = proc.read()
        logging.debug("output of %r:\n%s", self.exec, tool_output)

        # Explicit check instead of `assert`, which is removed under -O.
        if not os.path.exists(self.tree_out):
            raise RuntimeError(
                "if learned successfully, "
                f"there should be a json file in {self.dir_name}")

        try:
            ite_expr = self.get_pre_from_json(self.tree_out)
        finally:
            # Remove the generated json to avoid reusing old trees, even
            # when parsing it failed.
            os.remove(self.tree_out)

        return self._subs_basevar_w_states(ite_expr)

    def _subs_basevar_w_states(self, ite_expr) -> sympy.logic.boolalg.Boolean:
        """Replace base feature symbols by expressions over state/perception.

        Each base feature ``fvar{j}_A{i}`` is substituted with component
        ``j`` of ``z - (a_mat @ x + b_vec)`` for its transformation.
        """
        state_vars = sympy.symbols([f"x[{i}]" for i in range(self.state_dim)])
        state_vec = sympy.Matrix(state_vars)
        perc_vars = sympy.symbols([f"z[{i}]" for i in range(self.perc_dim)])
        perc_vec = sympy.Matrix(perc_vars)
        subs_basevar = []
        for basevar, (trans, j) in self._basevar_trans_map.items():
            a_mat, b_vec = trans
            a_mat, b_vec = sympy.Matrix(a_mat), sympy.Matrix(b_vec)
            expanded_basevar = (perc_vec - (a_mat @ state_vec + b_vec))[j]
            subs_basevar.append((basevar, expanded_basevar))
        return ite_expr.subs(subs_basevar)

    @staticmethod
    def get_pre_from_json(path):
        """Load the JSON tree at ``path`` and convert it with ``parse_tree``.

        Raises:
            ValueError: If the file content is not valid JSON.
        """
        try:
            with open(path) as json_file:
                tree = json.load(json_file)
        except json.JSONDecodeError as err:
            raise ValueError(f"cannot parse {path} as a json file") from err
        return DTreeLearner.parse_tree(tree)

    @staticmethod
    def parse_tree(tree) -> sympy.logic.boolalg.Boolean:
        """Convert a JSON decision tree into a sympy ITE expression.

        Args:
            tree: Node dict with keys ``children``, ``classification``,
                ``attribute`` and ``cut``.  A leaf has ``children`` set to
                ``None``; an inner node has exactly two children.

        Raises:
            ValueError: If a node has children but not exactly two.
        """
        children = tree['children']
        if children is None:
            # At a leaf node, return the clause
            return sympy.true if tree['classification'] else sympy.false

        if len(children) != 2:
            raise ValueError(
                "error parsing the json object as a binary decision tree")

        # Post-order traversal
        left = DTreeLearner.parse_tree(children[0])
        right = DTreeLearner.parse_tree(children[1])
        # Create an ITE expression tree.
        # NOTE(review): sympify evaluates the string, so the tree file must
        # come from a trusted source.
        cond = sympy.sympify(f"{tree['attribute']} <= {tree['cut']}")
        return sympy.logic.boolalg.ITE(cond, left, right)


def construct_sample_to_feature_func(a_mat: np.ndarray, b_vec: np.ndarray):
    """Build the sample-to-feature function for one affine transformation.

    Args:
        a_mat: Matrix of shape ``(perc_dim, state_dim)``.
        b_vec: Offset added to ``a_mat @ state``.

    Returns:
        A function mapping a sample (state values followed by perception
        values) to ``perc - (a_mat @ state + b_vec)``.  It raises
        ``ValueError`` when the sample length is not
        ``state_dim + perc_dim``.
    """
    a_mat = np.asarray(a_mat)
    perc_dim, state_dim = a_mat.shape

    def sample_to_feature_vec(sample):
        # Explicit check instead of `assert`, which is removed under -O.
        if len(sample) != state_dim + perc_dim:
            raise ValueError(
                f"expected a sample of length {state_dim + perc_dim}, "
                f"got {len(sample)}")
        state = np.array(sample[0: state_dim])
        perc = np.array(sample[state_dim: state_dim + perc_dim])
        return perc - (a_mat @ state + b_vec)
    return sample_to_feature_vec


def test_dtree_learner():
    """End-to-end run; requires the external learner executable."""
    a_mat_0 = np.array([[0., -1., 0.],
                        [0., 0., -1.]])
    b_vec_0 = np.zeros(2)
    a_mat_1 = np.array([[0., -0.75, 0.],
                        [0., 0., -1.25]])
    b_vec_1 = np.zeros(2)

    learner = DTreeLearner(state_dim=3, perc_dim=2)
    learner.set_grammar([(a_mat_0, b_vec_0), (a_mat_1, b_vec_1)])

    # logging.debug takes a format string plus arguments; it has no `sep`.
    logging.debug(
        "%s", '\n'.join(map(str, learner._basevar_trans_map.items())))
    logging.debug(
        "%s", '\n'.join(map(str, learner._var_coeff_map.items())))

    pos_examples = [
        (1., 2., 3., -2., -3.),
        (1., 2., 3., -1., -2.),
    ]
    learner.add_positive_examples(*pos_examples)

    neg_examples = [
        (10., 1.0, 1.0, 0.5, 0.5),
        (10., 1.0, 1.0, 1.5, 1.5),
        (10., 9.0, 9.0, 5.0, 5.0),
    ]
    learner.add_negative_examples(*neg_examples)

    print(learner.learn())


def test_sample_to_feature():
    """Check the feature vectors of two hand-computed samples."""
    a_mat = np.array([[0., -1., 0.],
                      [0., 0., -1]])
    b_vec = np.zeros(2)

    # construct_sample_to_feature_func returns the function for this
    # (a_mat, b_vec) pair.
    sample_to_feature_func = construct_sample_to_feature_func(a_mat, b_vec)

    sample = np.array([1., 2., 3., -2., -3.])
    feature_vec = sample_to_feature_func(sample)
    print("sample: " + str(feature_vec))
    assert np.array_equal(feature_vec, np.array([0., 0.]))

    sample = np.array([1., 2., 3., -1., -2.])
    feature_vec = sample_to_feature_func(sample)
    print("sample: " + str(feature_vec))
    assert np.array_equal(feature_vec, np.array([1., 1.]))


def test_parse_json():
    """Parse a fixed JSON tree and expand it over state/perception symbols."""
    json_obj = json.loads("""
        {"attribute":"((1*fvar0_A0) + (1*fvar1_A0))","cut":-0.01318414,"classification":0,
         "children":[
            {"attribute":"fvar1_A0","cut":0.01403625,"classification":0,
             "children":[{"attribute":"","cut":0,"classification":true,"children":null},
                         {"attribute":"","cut":0,"classification":false,"children":null}]
            },
            {"attribute":"fvar1_A1","cut":-0.003193465,"classification":0,
             "children":[{"attribute":"","cut":0,"classification":true,"children":null},
                         {"attribute":"","cut":0,"classification":false,"children":null}]
            }
         ]
        }""")
    tree = DTreeLearner.parse_tree(json_obj)

    a_mat_0 = np.array([[0., -1., 0.],
                        [0., 0., -1.]])
    b_vec_0 = np.zeros(2)
    a_mat_1 = np.array([[0., -0.75, 0.],
                        [0., 0., -1.25]])
    b_vec_1 = np.zeros(2)

    learner = DTreeLearner(state_dim=3, perc_dim=2)
    learner.set_grammar([(a_mat_0, b_vec_0), (a_mat_1, b_vec_1)])
    print(learner._subs_basevar_w_states(tree))


if __name__ == "__main__":
    # test_sample_to_feature()
    test_dtree_learner()
    test_parse_json()