From 916243991d36d69049a2e4579a55cf1c5bf9ffda Mon Sep 17 00:00:00 2001
From: "Hsieh, Chiao" <chsieh16@illinois.edu>
Date: Tue, 30 Nov 2021 16:06:52 -0600
Subject: [PATCH] Linting and type checking

---
 firstpass_learner.py | 84 ++++++++++++++++++++++++++++----------------
 firstpass_teacher.py | 29 ++++++++-------
 learner_base.py      |  4 ++-
 teacher_base.py      | 29 +++++++++++----
 4 files changed, 96 insertions(+), 50 deletions(-)

diff --git a/firstpass_learner.py b/firstpass_learner.py
index 2d1e513..1c769ce 100644
--- a/firstpass_learner.py
+++ b/firstpass_learner.py
@@ -1,4 +1,4 @@
-from typing import Mapping, Sequence, Optional, Tuple
+from typing import MutableMapping, Sequence, Optional, Tuple
 
 import numpy as np
 import z3
@@ -6,40 +6,49 @@ import z3
 from learner_base import Z3LearnerBase
 
 
-def affine_transform(state_vars: Sequence[z3.ExprRef], perc_vars: Sequence[z3.ExprRef],
-        coeff: np.ndarray, intercept: np.ndarray) -> Sequence[z3.ExprRef]:
-    assert len(coeff.shape) == 2  # Matrix is a 2D array
-    assert coeff.shape == (intercept.shape[0], len(state_vars))
-    assert intercept.shape[0] == len(perc_vars)
+def affine_transform(
+        x_vec: Sequence[z3.ExprRef],
+        z_vec: Sequence[z3.ExprRef],
+        a_mat: np.ndarray, b_vec: np.ndarray) -> Sequence[z3.ExprRef]:
+    num_rows = len(z_vec)
+    num_cols = len(x_vec)
+    assert len(a_mat.shape) == 2 and a_mat.shape == (num_rows, num_cols)
+    assert b_vec.shape[0] == num_rows
+
+    return [z_vec[i] -
+            z3.Sum(*(a_mat[i][j]*x_vec[j] for j in range(num_cols)), b_vec[i])
+            for i in range(num_rows)]
 
-    return [perc_vars[i] - z3.Sum(*(coeff[i][j]*state_vars[j] for j in range(coeff.shape[1])), intercept[i])
-            for i in range(coeff.shape[0])]
 
 def abs_expr(expr: z3.ExprRef) -> z3.ExprRef:
     return z3.If(expr >= 0, expr, -expr)
 
+
 def l1_norm(*exprs: z3.ExprRef) -> z3.ExprRef:
     return z3.Sum(*(abs_expr(expr) for expr in exprs))
 
+
 def l2_norm(*exprs: z3.ExprRef) -> z3.ExprRef:
     return z3.Sum(*(expr*expr for expr in exprs))
 
+
 def max_expr(*exprs: z3.ExprRef) -> z3.ExprRef:
     m = exprs[0]
     for v in exprs[1:]:
         m = z3.If(m >= v, m, v)
     return m
 
+
 def loo_norm(*exprs) -> z3.ExprRef:
     return max_expr(*(abs_expr(expr) for expr in exprs))
 
 
 class FirstpassLearner(Z3LearnerBase):
-    def __init__(self) -> None:
-        super().__init__(state_dim=2, perc_dim=2)
+    def __init__(self, timeout=10000) -> None:
+        super().__init__(state_dim=2, perc_dim=2, timeout=timeout)
 
         self._in_shape_pred: z3.ExprRef = z3.BoolVal(True)
-        self._part_affine_map: Mapping[int, Tuple] = {}
+        self._part_affine_map: MutableMapping[int, Tuple] = {}
         self.set_grammar(None)
 
     @property
@@ -51,23 +60,27 @@ class FirstpassLearner(Z3LearnerBase):
         self._in_shape_pred = self._in_shape_pred and self._get_shape_def(0)
 
     def _get_shape_def(self, idx: int) -> z3.ExprRef:
-        shape_sel = z3.BoolVector("sel%d" % idx, self.num_shapes)
+        norm_sel = z3.BoolVector("sel%d" % idx, self.num_shapes)
 
-        coeff_list = z3.Reals(["A%d__%d_%d" % (idx, i, j) for i in range(self.perc_dim) for j in range(self.state_dim)])
-        coeff_arr = np.array(coeff_list).reshape(self.perc_dim, self.state_dim)
-        intercept_arr = np.array(z3.RealVector("b%d" % idx, self.perc_dim))
+        coeff_list = z3.Reals(["A%d__%d_%d" % (idx, i, j)
+                               for i in range(self.perc_dim)
+                               for j in range(self.state_dim)])
+        a_mat = np.array(coeff_list).reshape(self.perc_dim, self.state_dim)
+        b_vec = np.array(z3.RealVector("b%d" % idx, self.perc_dim))
         radius = z3.Real("r%d" % idx)
 
-        self._part_affine_map[idx] = (shape_sel, coeff_arr, intercept_arr, radius)
+        self._part_affine_map[idx] = (norm_sel, a_mat, b_vec, radius)
 
-        transformed_perc_seq = affine_transform(self._state_vars, self._percept_vars, coeff_arr, intercept_arr)
+        transformed_perc_seq = affine_transform(self._state_vars,
+                                                self._percept_vars,
+                                                a_mat, b_vec)
         return z3.And(
-            z3.AtLeast(*shape_sel, 1),
-            z3.Implies(shape_sel[0], l1_norm(*transformed_perc_seq)  <= radius),
-            z3.Implies(shape_sel[1], l2_norm(*transformed_perc_seq)  <= radius),
-            z3.Implies(shape_sel[2], loo_norm(*transformed_perc_seq) <= radius),
+            z3.AtLeast(*norm_sel, 1),
+            z3.Implies(norm_sel[0], l1_norm(*transformed_perc_seq) <= radius),
+            z3.Implies(norm_sel[1], l2_norm(*transformed_perc_seq) <= radius),
+            z3.Implies(norm_sel[2], loo_norm(*transformed_perc_seq) <= radius),
         )
-        
+
     def add_positive_examples(self, *vals) -> None:
         assert all(len(val) == self.state_dim+self.perc_dim for val in vals)
         self._solver.add(*(
@@ -79,7 +92,8 @@ class FirstpassLearner(Z3LearnerBase):
         assert all(len(val) == self.state_dim+self.perc_dim for val in vals)
         self._solver.add(*(
             z3.Not(z3.substitute_vars(self._in_shape_pred,
-                                      *(z3.RealVal(v) for v in val))) for val in vals
+                                      *(z3.RealVal(v) for v in val)))
+            for val in vals
         ))
 
     def add_implication_examples(self, *args) -> None:
@@ -88,13 +102,15 @@ class FirstpassLearner(Z3LearnerBase):
     def learn(self) -> Optional[Tuple]:
         res = self._solver.check()
         if res == z3.sat:
-            z3_shape_sel, z3_coeff_arr, z3_intercept_arr, z3_radius = self._part_affine_map[0]  # FIXME
+            z3_shape_sel, z3_a_mat, z3_b_vec, z3_radius = \
+                self._part_affine_map[0]  # FIXME
             m = self._solver.model()
-            shape_sel = extract_shape_from_z3_model(m, z3_shape_sel)  # TODO: return integer is not that good
-            coeff_arr = np.vectorize(extract_real_from_z3_model(m))(z3_coeff_arr)
-            intercept_arr = np.vectorize(extract_real_from_z3_model(m))(z3_intercept_arr)
+            # TODO: return integer is not that good
+            shape_sel = extract_shape_from_z3_model(m, z3_shape_sel)
+            a_mat = np.vectorize(extract_real_from_z3_model(m))(z3_a_mat)
+            b_vec = np.vectorize(extract_real_from_z3_model(m))(z3_b_vec)
             radius = extract_real_from_z3_model(m)(z3_radius)
-            return shape_sel, coeff_arr, intercept_arr, radius
+            return shape_sel, a_mat, b_vec, radius
         elif res == z3.unsat:
             print("(unsat)")
             return None
@@ -103,10 +119,14 @@ class FirstpassLearner(Z3LearnerBase):
             return None
 
 
-def extract_shape_from_z3_model(m: z3.ModelRef, vars: Sequence[z3.ExprRef]) -> int:
+def extract_shape_from_z3_model(
+        m: z3.ModelRef,
+        vars: Sequence[z3.ExprRef]) -> int:
     for i, var in enumerate(vars):
         if m[var]:
             return i
+    raise RuntimeError("Z3 did not select a shape. "
+                       "Check SMT encoding.")
 
 
 def extract_real_from_z3_model(m: z3.ModelRef):
@@ -119,7 +139,9 @@ def extract_real_from_z3_model(m: z3.ModelRef):
 def test_affine_transform():
     state_vars = z3.RealVector("x", 3)
     percept_vars = z3.RealVector("z", 2)
-    coeff_list = z3.Reals(["A__%d_%d" % (i, j) for i in range(len(percept_vars)) for j in range(len(state_vars))])
+    coeff_list = z3.Reals(["A__%d_%d" % (i, j)
+                           for i in range(len(percept_vars))
+                           for j in range(len(state_vars))])
     coeff = np.array(coeff_list).reshape((len(percept_vars), len(state_vars)))
     print(coeff)
     intercept = np.array([7.0, 8.0])
@@ -151,9 +173,9 @@ def test_firstpass_learner():
         (9.0, 9.0, 5.0, 5.0),
     ]
     learner.add_negative_examples(*neg_examples)
-    # print(learner._solver.assertions())
     print(learner.learn())
 
+
 if __name__ == "__main__":
     # test_affine_transform()
     # test_norms()
diff --git a/firstpass_teacher.py b/firstpass_teacher.py
index 45c3ea7..4097faa 100755
--- a/firstpass_teacher.py
+++ b/firstpass_teacher.py
@@ -16,39 +16,43 @@ class FirstpassTeacher(GurobiTeacherBase):
     def _add_system(self) -> None:
         # Controller
         for zi, ui in zip(self._percept.tolist(), self._control.tolist()):
-            self._gp_model.addGenConstrPWL(zi, ui, [-10, 0, 0, 0, 10], [1, 1, 0, -1, -1])
+            self._gp_model.addGenConstrPWL(
+                zi, ui, [-10, 0, 0, 0, 10], [1, 1, 0, -1, -1])
         # Dynamics
-        self._gp_model.addConstr(self._new_state == self._old_state + self._control)
+        self._gp_model.addConstr(
+            self._new_state == self._old_state + self._control)
 
     def _add_unsafe(self) -> None:
-        old_state, new_state = self._old_state.tolist(), self._new_state.tolist()
+        old_state = self._old_state.tolist()
+        new_state = self._new_state.tolist()
         assert len(old_state) == len(new_state)
         m = self._gp_model
         m.update()
         old_V, new_V = 0.0, 0.0
         for old_xi, new_xi in zip(old_state, new_state):
             old_var_name = "|%s|" % old_xi.VarName
-            old_abs_xi = m.addVar(name=old_var_name,vtype=GRB.CONTINUOUS, lb=0, ub=np.inf)
+            old_abs_xi = m.addVar(name=old_var_name, **self.NNEGVAR)
             m.addConstr(old_abs_xi == gp.abs_(old_xi))
             old_V += old_abs_xi
 
             new_var_name = "|%s|" % new_xi.VarName
-            new_abs_xi = m.addVar(name=new_var_name, vtype=GRB.CONTINUOUS, lb=0, ub=np.inf)
+            new_abs_xi = m.addVar(name=new_var_name, **self.NNEGVAR)
             m.addConstr(new_abs_xi == gp.abs_(new_xi))
             new_V += new_abs_xi
-        m.addConstr(new_V >= old_V, name="Unstable")  # Tracking error is non-decreasing (UNSAFE)
+        # Tracking error is non-decreasing (UNSAFE)
+        m.addConstr(new_V >= old_V, name="Unstable")
 
     def _add_candidate(self, candidate) -> None:
         # TODO parse candidate
         shape_sel, coeff, intercept, radius = candidate
-        assert coeff.shape == (self._percept.shape[0], self._old_state.shape[0])
+        assert coeff.shape == (self.perc_dim, self.state_dim)
         assert intercept.shape == self._percept.shape
 
         m = self._gp_model
         x = self._old_state
         z = self._percept
         # Constraints on values of xP and yP
-        z_diff = m.addMVar(name="z-(Ax+b)", shape=len(z.tolist()), vtype=GRB.CONTINUOUS, lb=-np.inf, ub=np.inf)
+        z_diff = m.addMVar(name="z-(Ax+b)", shape=z.shape[0], **self.FREEVAR)
         m.addConstr(z_diff == z - (coeff @ x + intercept))
         m.update()
 
@@ -56,24 +60,25 @@ class FirstpassTeacher(GurobiTeacherBase):
             z_dist = 0.0
             for zi in z_diff.tolist():
                 abs_zi_name = "|%s|" % zi.VarName
-                abs_zi = m.addVar(name=abs_zi_name, vtype=GRB.CONTINUOUS, lb=0, ub=np.inf)
+                abs_zi = m.addVar(name=abs_zi_name, **self.NNEGVAR)
                 m.addConstr(abs_zi == gp.abs_(zi))
                 z_dist += abs_zi
             m.addConstr(z_dist <= radius, "||z-(Ax+b)||_1 <= r")
             # L1-norm objective
             m.setObjective(z_dist, GRB.MINIMIZE)
         elif shape_sel == 1:
-            m.addConstr(z_diff @ z_diff <= radius*radius, "||z-(Ax+b)||^2 <= r^2")
+            m.addConstr(z_diff @ z_diff <= radius*radius,
+                        name="||z-(Ax+b)||^2 <= r^2")
             # L2-norm objective
             m.setObjective(z_diff @ z_diff, GRB.MINIMIZE)
         elif shape_sel == 2:
             abs_zi_list = []
             for zi in z_diff.tolist():
                 abs_zi_name = "|%s|" % zi.VarName
-                abs_zi = m.addVar(name=abs_zi_name, vtype=GRB.CONTINUOUS, lb=0, ub=np.inf)
+                abs_zi = m.addVar(name=abs_zi_name, **self.NNEGVAR)
                 m.addConstr(abs_zi == gp.abs_(zi))
                 abs_zi_list.append(abs_zi)
-            z_dist = m.addVar(name="||z-(Ax+b)||_oo", vtype=GRB.CONTINUOUS, lb=0, ub=np.inf)
+            z_dist = m.addVar(name="||z-(Ax+b)||_oo", **self.NNEGVAR)
             m.addConstr(z_dist == gp.max_(abs_zi_list))
             m.addConstr(z_dist <= radius, "||z-(Ax+b)||_oo <= r")
 
diff --git a/learner_base.py b/learner_base.py
index 383a8de..cf624d5 100644
--- a/learner_base.py
+++ b/learner_base.py
@@ -2,6 +2,7 @@ import abc
 
 import z3
 
+
 class LearnerBase(abc.ABC):
     def __init__(self) -> None:
         pass
@@ -28,7 +29,8 @@ class LearnerBase(abc.ABC):
 
 
 class Z3LearnerBase(LearnerBase):
-    def __init__(self, state_dim:int, perc_dim:int, timeout:int=10000) -> None:
+    def __init__(self, state_dim: int, perc_dim: int,
+                 timeout: int = 10000) -> None:
         super().__init__()
         z3.set_param("timeout", timeout,
                      "solver.timeout", timeout,
diff --git a/teacher_base.py b/teacher_base.py
index 336c3a2..3f23d52 100644
--- a/teacher_base.py
+++ b/teacher_base.py
@@ -24,24 +24,41 @@ class TeacherBase(abc.ABC):
 
 
 class GurobiTeacherBase(TeacherBase):
+    FREEVAR = {"vtype": gp.GRB.CONTINUOUS, "lb": -np.inf, "ub": np.inf}
+    NNEGVAR = {"vtype": gp.GRB.CONTINUOUS, "lb": 0.0, "ub": np.inf}
+
     def __init__(self, name: str,
                  state_dim: int, perc_dim: int, ctrl_dim: int) -> None:
         super().__init__()
 
         self._gp_model = gp.Model(name)
+        m = self._gp_model
+
         # Old state variables
-        self._old_state = self._gp_model.addMVar(shape=state_dim, name='x', vtype=gp.GRB.CONTINUOUS, lb=-np.inf, ub=np.inf)
+        self._old_state = m.addMVar(shape=state_dim, name='x', **self.FREEVAR)
         # New state variables
-        self._new_state = self._gp_model.addMVar(shape=state_dim, name="x'", vtype=gp.GRB.CONTINUOUS, lb=-np.inf, ub=np.inf)
+        self._new_state = m.addMVar(shape=state_dim, name="x'", **self.FREEVAR)
         # Perception variables
-        self._percept = self._gp_model.addMVar(shape=perc_dim, name='z', vtype=gp.GRB.CONTINUOUS, lb=-np.inf, ub=np.inf)
+        self._percept = m.addMVar(shape=perc_dim, name='z', **self.FREEVAR)
         # Control variables
-        self._control = self._gp_model.addMVar(shape=ctrl_dim, name='u', vtype=gp.GRB.CONTINUOUS, lb=-np.inf, ub=np.inf)
+        self._control = m.addMVar(shape=ctrl_dim, name='u', **self.FREEVAR)
 
         # FIXME Replace hardcoded contraints for this particular example
         self._add_system()
         self._add_unsafe()
 
+    @property
+    def state_dim(self) -> int:
+        return self._old_state.shape[0]
+
+    @property
+    def perc_dim(self) -> int:
+        return self._percept.shape[0]
+
+    @property
+    def ctrl_dim(self) -> int:
+        return self._control.shape[0]
+
     @abc.abstractmethod
     def _add_system(self) -> None:
         raise NotImplementedError
@@ -51,7 +68,7 @@ class GurobiTeacherBase(TeacherBase):
         raise NotImplementedError
 
     @abc.abstractmethod
-    def _add_candidate(candidate) -> None:
+    def _add_candidate(self, candidate) -> None:
         raise NotImplementedError
 
     def set_old_state_bound(self, lb, ub) -> None:
@@ -68,7 +85,7 @@ class GurobiTeacherBase(TeacherBase):
         else:
             return z3.unknown
 
-    def dump_model(self, basename:str = "") -> None:
+    def dump_model(self, basename: str = "") -> None:
         """ Dump optimization problem in LP format """
         if not basename:
             basename = self._gp_model.ModelName
-- 
GitLab