Skip to content
Snippets Groups Projects
Commit 0102a733 authored by chsieh16's avatar chsieh16
Browse files

Rewrite AgBot case study

parent 70ecbedc
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,7 @@ import numpy as np
from dtree_teacher_base import DTreeGurobiTeacherBase
K_P = 0.1 #the controller gain is the strength of action a controller will take at a particular point
K_P = 0.1 # the controller gain is the strength of action a controller will take at a particular point
FORWARD_VEL = 1.0 # m/s car velocity
CYCLE_TIME = 0.05 # s
......@@ -27,51 +27,12 @@ NEW_ATAN_K_CTE_V_LIM = np.arctan(NEW_K_CTE_V_LIM)
NEW_RAW_ANG_ERR_LIM = ANG_LIM + FORWARD_VEL * CYCLE_TIME
def sensor(state):
""" Assuming the lane to track is aligned with x-axis (i.e., y==0 and yaw==0)
Hence, heading = 0-yaw = -yaw and distance = 0-y = -y."""
x, y, yaw = state
# TODO approximation instead of perfect perception
# TODO is this where we do AX+b
prcv_heading = -yaw
prcv_distance = -y
return prcv_distance,prcv_heading
def controller(prcv_ctd, prcv_heading):
error = prcv_heading + np.arctan(K_P*prcv_ctd/FORWARD_VEL)
# Calculate controller output
ang_vel = error / CYCLE_TIME
if ang_vel > ANG_VEL_LIM:
ang_vel = ANG_VEL_LIM
elif ang_vel < -ANG_VEL_LIM:
ang_vel = -ANG_VEL_LIM
# Return actuator values
return (ang_vel,)
def dynamics(old_state, ang_vel):
""" This dynamics for state variables x, y, yaw
x[n+1] = x[n] + v*cos(yaw)*CYCLE_TIME
y[n+1] = y[n] + v*sin(yaw)*CYCLE_TIME
yaw[n+1] = yaw[n] + ang_vel*CYCLE_TIME
"""
old_x, old_y, old_yaw = old_state
new_x = old_x + FORWARD_VEL * np.cos(old_yaw) * CYCLE_TIME
new_y = old_y + FORWARD_VEL * np.sin(old_yaw) * CYCLE_TIME
new_yaw = old_yaw + ang_vel*CYCLE_TIME
return new_x, new_y, new_yaw
class DTreeAgBotStanleyGurobiTeacher(DTreeGurobiTeacherBase):
# Ideal perception as a linear transform from state to ground truth percept
PERC_GT = np.array([[0., -1., 0.],
[0., 0., -1.]], float)
def __init__(self, name="agbot_stanley", norm_ord=2, ultimate_bound: float=0.0) -> None:
def __init__(self, name="agbot_stanley", norm_ord=2, ultimate_bound: float = 0.0) -> None:
assert ultimate_bound >= 0.0
self._ultimate_bound = ultimate_bound
......@@ -79,17 +40,47 @@ class DTreeAgBotStanleyGurobiTeacher(DTreeGurobiTeacherBase):
state_dim=3, perc_dim=2, ctrl_dim=1, norm_ord=norm_ord)
def is_safe_state(self, ex) -> bool:
def v(ctd, psi) -> float:
return abs(psi + np.arctan( (K_P*ctd) / FORWARD_VEL))
def spec(x, y, theta, ctd, psi) -> bool:
m_star = sensor
f = dynamics
g = controller
v_old = v(*m_star((x, y, theta)))
v_new = v(*m_star( f((x, y, theta),*g(ctd, psi)) ))
def f(old_state, ctrl):
""" This dynamics for state variables x, y, yaw
x[n+1] = x[n] + v*cos(yaw)*CYCLE_TIME
y[n+1] = y[n] + v*sin(yaw)*CYCLE_TIME
yaw[n+1] = yaw[n] + ang_vel*CYCLE_TIME
"""
old_x, old_y, old_yaw = old_state
ang_vel, = ctrl
new_x = old_x + FORWARD_VEL * np.cos(old_yaw) * CYCLE_TIME
new_y = old_y + FORWARD_VEL * np.sin(old_yaw) * CYCLE_TIME
new_yaw = old_yaw + ang_vel*CYCLE_TIME
return new_x, new_y, new_yaw
def g(perc):
prcv_ctd, prcv_heading = perc
error = prcv_heading + np.arctan(K_P*prcv_ctd/FORWARD_VEL)
# Calculate controller output
ang_vel = error / CYCLE_TIME
if ang_vel > ANG_VEL_LIM:
ang_vel = ANG_VEL_LIM
elif ang_vel < -ANG_VEL_LIM:
ang_vel = -ANG_VEL_LIM
# Return actuator values
return (ang_vel,)
def m_star(state):
""" Assuming the lane to track is aligned with x-axis (i.e., y==0 and yaw==0)
Hence, heading = 0-yaw = -yaw and distance = 0-y = -y."""
return self.PERC_GT @ state
def v(gt_perc) -> float:
ctd, psi = gt_perc
return abs(psi + np.arctan((K_P*ctd) / FORWARD_VEL))
def spec(state, perc) -> bool:
v_old = v(m_star(state))
v_new = v(m_star(f(state, g(perc))))
return v_new <= max(v_old, self._ultimate_bound)
return spec(*ex)
return spec(ex[0:self.state_dim],
ex[self.state_dim:self.state_dim+self.perc_dim])
def _add_system(self) -> None:
# Bounds on all domains
......@@ -136,7 +127,7 @@ class DTreeAgBotStanleyGurobiTeacher(DTreeGurobiTeacherBase):
new_yaw = m.addVar(name="θ'", **self.FREEVAR)
m.addConstr(new_yaw == old_yaw + ang_vel * CYCLE_TIME)
m.update()
def _add_unsafe(self) -> None:
assert self.PERC_GT.shape == (self.perc_dim, self.state_dim)
# Variable Aliases
......@@ -183,7 +174,6 @@ class DTreeAgBotStanleyGurobiTeacher(DTreeGurobiTeacherBase):
m.addConstr(new_lya_val >= bound_new_lya_val, name="Increasing Error") # Tracking error is increasing
m.update()
def _add_objective(self) -> None:
# Variable Aliases
m = self._gp_model
......
#!/usr/bin/env python3
from csv import Dialect
import itertools
import json
#import matplotlib.pyplot as plt
import pathlib
import pickle
import traceback
from typing import Dict, Hashable, List, Tuple
from typing import Dict, Hashable, Literal
import numpy as np
import z3
from dtree_learner import DTreeLearner as Learner
from agbot_stanley_teacher import DTreeAgBotStanleyGurobiTeacher as AgBotTeacher
def search_part(partition, state):
assert len(partition) == len(state)
bounds = []
for sorted_list, v in zip(partition, state):
i = np.searchsorted(sorted_list, v)
if i == 0 or i == len(sorted_list):
return None
bounds.append((sorted_list[i-1], sorted_list[i]))
return tuple(bounds)
def load_examples_from_npz(file_name: str, teacher:AgBotTeacher, partition):
print("Loading examples from .npz")
#with open(file_name, "rb") as npz_file_io:
npz_data = np.load(file_name)
files = npz_data._files
data_5d_numpy =npz_data['dps_arr']
data_5d = list(map(tuple, data_5d_numpy))
# x_arr[:-1], except the last one, x_arr[1:]except the first one
from dtree_synth import DataSet, search_part, synth_dtree_per_part
from dtree_learner import DTreeLearner as DTreeLearner
from agbot_stanley_teacher import DTreeAgBotStanleyGurobiTeacher
def load_examples_from_numpy_array(
data_5d_numpy: np.ndarray,
teacher: DTreeAgBotStanleyGurobiTeacher,
partition) -> Dict[Hashable, DataSet]:
# x_arr[:-1], except the last one, x_arr[1:] except the first one
bound_list = list(list(zip(x_arr[:-1], x_arr[1:])) for x_arr in partition)
ret = {part: [[], [], 0] for part in itertools.product(*bound_list)}
num_excl_samples =0
for dpoint in data_5d:
x,y,theta = dpoint[0], dpoint[1], dpoint[2]
#print(x,y,tetha)
#print(dpoint)
ret = {part: DataSet() for part in itertools.product(*bound_list)}
num_excl_samples = 0
for dpoint in data_5d_numpy:
vehicle_state = dpoint[0:teacher.state_dim]
part = search_part(partition, vehicle_state)
if part is None:
num_excl_samples += 1
continue
if np.any(np.isnan(dpoint)):
ret[part][2] += 1
elif teacher.is_safe_state(dpoint):
ret[part][0].append(dpoint)
ret[part].num_nan_dps += 1
elif teacher.is_safe_state(dpoint):
ret[part].safe_dps.append(tuple(dpoint))
else:
ret[part][1].append(dpoint)
#print(dpoint)
ret[part].unsafe_dps.append(tuple(dpoint))
print("# samples not in any selected parts:", num_excl_samples)
return ret
def synth_dtree(learner: Learner, teacher: AgBotTeacher, num_max_iterations: int = 10):
past_candidate_list = []
for k in range(num_max_iterations):
print("="*80)
print(f"Iteration {k}:", sep='')
print("learning ....")
candidate = learner.learn()
print("done learning")
print(f"candidate: {candidate}")
past_candidate_list.append(candidate)
# QUERYING TEACHER IF THERE ARE NEGATIVE EXAMPLES
result = teacher.check(candidate)
print(f"Satisfiability: {result}")
if result == z3.sat:
negative_examples = teacher.model()
assert len(negative_examples) > 0
print(f"negative examples: {negative_examples}")
# assert validate_cexs(teacher.state_dim, teacher.perc_dim, candidate, negative_examples)
learner.add_negative_examples(*negative_examples)
continue
elif result == z3.unsat:
print("we are done!")
return True, (k, z3.simplify(candidate, arith_lhs=True).sexpr())
else:
return False, f"Reason Unknown {teacher.reason_unknown()}"
return False, f"Reached max iteration {num_max_iterations}."
def main(dom: Literal["concat", "diff"], ult_bound: float):
NPZ_FILE_PATH = "data/perceptor-agbot-collect_images_2021-10-29-01-37-44-0.0-50.0.npz"
print("Loading examples from .npz")
with np.load(NPZ_FILE_PATH) as npz_data:
data_5d_numpy = npz_data['dps_arr']
if __name__ == "__main__":
NPZ_FILE_PATH="data/perceptor-agbot-collect_images_2021-10-29-01-37-44-0.0-50.0.npz"
# Partitions on prestate
X_LIM = np.inf
......@@ -102,23 +50,25 @@ if __name__ == "__main__":
PRE_Y_LIM = 0.228
NUM_Y_PARTS = 10
Y_ARR = np.linspace(-PRE_Y_LIM, PRE_Y_LIM, NUM_Y_PARTS + 1)
#temp = np.round(Y_ARR, decimals=4)
PRE_YAW_LIM = np.pi / 6
NUM_YAW_PARTS = 10
YAW_ARR = np.linspace(-PRE_YAW_LIM, PRE_YAW_LIM, NUM_YAW_PARTS + 1)
PARTITION = (X_ARR, Y_ARR, YAW_ARR)
NUM_MAX_ITER = 500
FEATURE_DOMAIN = "concat"
ULT_BOUND = 0.0
FEATURE_DOMAIN = dom
ULT_BOUND = ult_bound
NORM_ORD = 1
teacher= AgBotTeacher(norm_ord=NORM_ORD, ultimate_bound=ULT_BOUND)
part_to_examples = load_examples_from_npz(NPZ_FILE_PATH, teacher, PARTITION)
teacher = DTreeAgBotStanleyGurobiTeacher(norm_ord=NORM_ORD, ultimate_bound=ULT_BOUND)
part_to_examples = load_examples_from_numpy_array(data_5d_numpy, teacher, PARTITION)
# Print statistics about training data points
print("#"*80)
print("Parts with unsafe data points:")
for i, (part, (safe_dps, unsafe_dps, num_nan)) in enumerate(part_to_examples.items()):
for i, (part, dataset) in enumerate(part_to_examples.items()):
safe_dps, unsafe_dps, num_nan = dataset.safe_dps, dataset.unsafe_dps, dataset.num_nan_dps
lb, ub = np.asfarray(part).T
lb[2] = np.rad2deg(lb[2])
ub[2] = np.rad2deg(ub[2])
......@@ -126,71 +76,30 @@ if __name__ == "__main__":
if len(unsafe_dps) > 0:
print(f"Part Index {i}:", f"y in [{lb[1]:.03}, {ub[1]:.03}] (m);", f"θ in [{lb[2]:.03}, {ub[2]:.03}] (deg);",
f"# safe: {len(safe_dps)}", f"# unsafe: {len(unsafe_dps):03}", f"# NaN: {num_nan}")
result = []
for i, (part, (safe_dps, unsafe_dps, num_nan)) in enumerate(part_to_examples.items()):
#if not i == 1:
# continue
# if a partition has zero positive datapoints, then skip the partion
if len(safe_dps) == 0:
continue
if len(safe_dps) == 17:
print()
print("#"*80)
print(f"# safe: {len(safe_dps)}; "
f"# unsafe: {len(unsafe_dps)}; "
f"# NaN: {num_nan}")
lb, ub = np.asfarray(part).T
return
def teacher_builder() -> DTreeAgBotStanleyGurobiTeacher:
return DTreeAgBotStanleyGurobiTeacher(norm_ord=NORM_ORD, ultimate_bound=ULT_BOUND)
teacher = AgBotTeacher(norm_ord=NORM_ORD, ultimate_bound=ULT_BOUND)
teacher.set_old_state_bound(lb=lb, ub=ub)
learner = Learner(state_dim=teacher.state_dim,
perc_dim=teacher.perc_dim, timeout=20000)
learner.set_grammar([(AgBotTeacher.PERC_GT, np.zeros(2))], FEATURE_DOMAIN)
learner.add_positive_examples(*safe_dps)
learner.add_negative_examples(*unsafe_dps)
try:
found, ret = synth_dtree(learner, teacher,
num_max_iterations=NUM_MAX_ITER)
print(f"Found? {found}")
if found:
k, expr = ret
result.append({"part": part,
"feature_domain": FEATURE_DOMAIN,
"ultimate_bound": ULT_BOUND,
"status": "found",
"result": {"k": k, "formula": expr}})
else:
result.append({"part": part,
"feature_domain": FEATURE_DOMAIN,
"ultimate_bound": ULT_BOUND,
"status": "not found",
"result": ret})
except KeyboardInterrupt:
print("User pressed Ctrl+C. Skip all remaining partition.")
break # NOTE finally block is executed before break
except Exception as e:
result.append({"part": part,
"status": "exception",
"result": traceback.format_exc()})
print(e)
finally:
data_file = pathlib.Path("out/pre.data")
data_file.rename(f"out/part-{i:03}-pre.data")
del teacher
del learner
with open(f"out/dtree_synth.{NUM_Y_PARTS}x{NUM_YAW_PARTS}.out.json", "w") as f:
json.dump(result, f)
def learner_builder() -> DTreeLearner:
learner = DTreeLearner(state_dim=teacher.state_dim,
perc_dim=teacher.perc_dim, timeout=20000)
learner.set_grammar([(DTreeAgBotStanleyGurobiTeacher.PERC_GT, np.zeros(2))], FEATURE_DOMAIN)
return learner
result = synth_dtree_per_part(
part_to_examples,
teacher_builder,
learner_builder,
num_max_iter=NUM_MAX_ITER,
ult_bound=ULT_BOUND,
feature_domain=FEATURE_DOMAIN
)
with open(f"out/dtree_synth.{NUM_Y_PARTS}x{NUM_YAW_PARTS}.out.json", "w") as f:
json.dump(result, f)
if __name__ == "__main__":
main("concat", 0.0)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment