Skip to content
Snippets Groups Projects
Commit 7514fdfb authored by navidmokh's avatar navidmokh
Browse files

initial push for UTC colab.

parent 68db7ef1
No related branches found
No related tags found
No related merge requests found
import GeometryUtils as GeoUtils
class DiscrepancyLearner:
def __init__(self):
def _determineInitialSamplePoints(self, initial_set: GeoUtils.SSSlice):
initial_set.wrap("Axis-Aligned Ellipsoid")
from typing import Iterable
class SSSlice():
def __init__(self):
main.py 0 → 100644
import json
import sys
from src.core.multvrmain import verify
from src.frontend.input import read_scenario_data
from src.plotter.parser import parse
import random
import numpy as np
def main(scenario_file_name):
cur_scenario = read_scenario_file(scenario_file_name)
verify(cur_scenario)
simFunction = importSimFunction(data["directory"])
safey, reach = verify(data, simFunction)
lines = reach.raw.split("\n")
print type(lines[0]), lines[0]
initNode, y_min, y_max= parse(lines)
if __name__=='__main__':
# take json file name from parameter
main(sys.argv[-1])
from typing import Tuple, Optional, Dict, List
from collections import deque
def build_safe_reachtube(agent, cur_scenario) -> Tuple[str, 'Scenario']:
mode_sequence_stack: deque = deque(((cur_scenario.initialModeInd,),))
cur_scenario.remaining_initial_sets: Dict[Tuple[int], deque] = {(cur_scenario.initialModeInd,): deque((cur_scenario.initial_set,))}
while len(mode_sequence_stack) > 0:
cur_mode_sequence: Tuple[int] = mode_sequence_stack.pop()
cur_mode_stack: deque = cur_scenario.remaining_initial_sets[cur_mode_sequence]
for init_set in get_unsafe_and_unknown_initial_sets(cur_mode_sequence): # checks if formerly safe Reachtubes in
# this mode are still safe and returns initial set partitions of the unsafe
cur_mode_stack.extend(get_refinements(init_set)) # supports arbitrary refinement strategy & extend is repeated appends
while len(cur_mode_stack) > 0:
cur_initial_set = cur_mode_stack.pop()
traces = sample_traces(cur_initial_set)
unsafe_or_unknown_traces = determine_unsafe_and_unknown_traces(traces)
if len(unsafe_or_unknown_traces) > 0:
if len(cur_mode_sequence) == 1 and any(is_unsafe(unsafe_traces)):
return False, unsafe_traces # Unsafe Trace found
elif len(cur_mode_sequence) == 1:
return None, cur_initial_set # Refinement limit reached at first mode, return Unknown!
elif at_refinement_limit():
cur_scenario.add_to_dynamic_unknown_set(cur_initial_set)
mode_sequence_stack.append(cur_mode_sequence[:-1])
break
else:
cur_scenario.add_to_dynamic_unsafe_set(cur_initial_set)
cur_mode_stack.extend(get_refinements(cur_initial_set))
continue
cur_RT_segment = compute_RT_segment(cur_initial_set, traces)
next_initsets = compute_guard_intersections(cur_RT_segment)
if not is_all_safe(cur_RT_segment, next_initsets): # check if current Reachtube and next initial sets are safe
if at_refinement_limit(): # need to go to parent
cur_scenario.add_to_dynamic_unknown_set(cur_initial_set)
mode_sequence_stack.append(cur_mode_sequence[:-1])
break
else:
cur_scenario.add_to_dynamic_unsafe_set(cur_initial_set)
cur_mode_stack.extend(get_refinements(cur_initial_set))
else:
# reachtube is safe all is well
def verify(cur_scenario: 'Scenario') -> Tuple[Optional[bool], 'Result']:
refinement_limit = 10 # TODO make this a configuration parameter
for agent in Scenario.agents_list:
result = build_safe_reachtube(agent, Scenario)
if result[0] == 'Unknown':
print(f"refinement limit reached: {refinement_limit}")
return (False, result[1])
elif result[0] == 'False':
print(f"Unsafe behavior found!")
return (None, result[1])
elif result[0] != 'True':
raise ValueError("invalid string returned by build_safe_reachtube")
return (True, Scenario.certificate)
from typing import Callable, Tuple, List, Dict
def parseAutomata(automata: 'Automata'):
agentDefinition = None
return agentDefinition
def parseC2E2Model(folder_path: str):
return None
def parseUnsafeSet(unsafe_string: str) -> Dict[str, 'StateSet']:
def importSimFunctions(path: str) -> Tuple[Callable, Callable]:
"""
Load simulation function from given file path
Note the folder in the examples directory must have __init__.py
And the simulation function must be named TC_Simulate
The function should looks like following:
TC_Simulate(Mode, initialCondition, time_bound)
Args:
path (str): Simulator directory.
Returns:
simulation function
"""
try:
import os
import sys
import importlib
sys.path.append(os.path.abspath(path))
mod_name = path.replace('/', '.')
module = importlib.import_module(mod_name)
sys.path.pop()
return (module.TC_Simulate_Batch, module.TC_Simulate)
class Model:
def __init__(self, model_directory_path: str):
pass
class Scenario:
def __init__(self, scenario_file_name: str):
import json
assert ".json" in scenario_file_name, "Please provide json input file"
with open(scenario_file_name, 'r') as f:
data = json.load(f)
self.models_dynamics: List[Model] = []
for ind, model in enumerate(data["models"]):
if data[model]["type"] == "black_box":
self.models_dynamics.append(importSimFunctions(model["folder"]))
elif data[model]["type"] == "c2e2":
self.models_dynamics.append(parseC2E2Model(model["folder"]))
else:
raise ValueError(f"invalid model definition for Model #{ind}")
agent_automatas = list(map(lambda x: parseAutomata(data[x]), data["agents"]))
agent_unsafe_sets: List[Dict[str, 'StateSet']] = [parseUnsafeSet(data[agent]) for agent in data["agents"]]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment