from __future__ import annotations from .structures import * import numpy as np import matplotlib.pyplot as pp from .plotdefs import Plotter import copy from collections import defaultdict import pathlib import toml import shutil import os """ Builds on abstractions defined in lib.structures. """ def _titlecase(s: str) -> str: return " ".join(_s[:1].upper() + _s[1:] for _s in s.replace(" ", "_").split("_")) def _fresh_filename(path: pathlib.Path, overwrite: bool=False) -> pathlib.Path: _path = path while _path.exists(): print(f"WARNING: {_path} exists in filesystem") if overwrite: print(f":: rm -rf {_path}") if _path.is_dir(): shutil.rmtree(str(_path)) else: os.remove(str(_path)) else: _path = _path.parent.joinpath(_path.stem + "_" + _path.suffix) print(f"Write instead to {_path}") return _path def min_n(generator, default=None): x0 = None for x in generator: if x is None: continue elif x0 is None or x < x0: x0 = x return default if x0 is None else x0 def max_n(generator, default=None): x0 = None for x in generator: if x is None: continue elif x0 is None or x > x0: x0 = x return default if x0 is None else x0 def _dict_to_defaultdict(D: dict, factory) -> defaultdict: DD = defaultdict(factory) for k, v in D.items(): DD[k] = v return DD class SuperSequence: """ Sequence of labeled Sequences. Mostly for visualization purposes. Fields ------ outdir : pathlib.Path name : str sequences : connections.defaultdict[str, Sequence] defaults : ConnectionLayout """ def __init__(self, outdir: pathlib.Path, name: str, sequences: dict[str, Sequence]=None, params: dict[str, ...]=None, defaults: ConnectionLayout=None): """ Constructor. Parameters ---------- outdir : pathlib.Path name : str sequences : dict[str, Sequence] (optional) defaults : ConnectionLayout (optional) """ self.outdir = outdir self.name = name self.sequences = ( defaultdict(Sequence) if sequences is None else _dict_to_defaultdict(sequences, Sequence) ) self.params = dict() if params is None else params self.defaults = defaults def __getitem__(self, key: str) -> Sequence: assert isinstance(key, str) return self.sequences[key] def __setitem__(self, key: str, seq: Sequence): assert isinstance(key, str) assert isinstance(seq, Sequence) self.sequences[key] = seq def get(self, key, default=None) -> Sequence: return self.sequences.get(key, default) def keys(self): return self.sequences.keys() def values(self): return self.sequences.values() def items(self): return self.sequences.items() def update(self, other): self.sequences.update(other) return self def __add__(self, other): assert isinstance(other, SuperSequence) sequences = copy.deepcopy(self.sequences) return SuperSequence(self.outdir, self.name, sequences).update(other) def __iadd__(self, other): return self.update(other) def by_times(self) -> list[Sequence]: """ Return Sequences in a list ordered by earliest Event. """ return sorted( self.sequences.values(), key=lambda e: e.min_time() ) def by_times_named(self) -> list[(str, Sequence)]: """ Return Sequences with their names in a list ordered by earliest Event. """ return sorted( self.sequences.items(), key=lambda name_seq: name_seq[1].min_time() ) def min_time(self) -> float: return min(map(lambda seq: seq.min_time(), self.sequences.values())) def max_time(self) -> float: return max(map(lambda seq: seq.max_time(), self.sequences.values())) def to_sequence(self) -> Sequence: """ Condense to a single Sequence to pass to the computer. """ seq = Sequence() for S in self.by_times(): seq = seq + S return seq def to_primitives(self) -> dict[str, dict]: return { label: seq.to_primitives() for label, seq in self.sequences.items() } @staticmethod def from_primitives(outdir: pathlib.Path, name: str, alldata: dict[str, dict]): return SuperSequence(outdir, name, { label: Sequence.from_primitives(seq_dict) for label, seq_dict in alldata.items() }) def save(self, target: pathlib.Path=None, overwrite: bool=False, printflag: bool=True): T = self.outdir.joinpath(self.name) if target is None else target if printflag: print(f"[sequencing] Saving sequence data to '{T}':") T = _fresh_filename(T, overwrite) T_seq = T.joinpath("sequence.toml") T_par = T.joinpath("parameters.toml") if not T.is_dir(): if printflag: print(f":: mkdir -p '{T}'") T.mkdir(parents=True, exist_ok=True) with T_seq.open('w') as outfile: toml.dump(self.to_primitives(), outfile) with T_par.open('w') as outfile: toml.dump( { k: float(v) if isinstance(v, (float, np.float32, np.float64)) else v for k, v in self.params.items() }, outfile ) return self @staticmethod def load(target: pathlib.Path, outdir: pathlib.Path=None, name: str=None, printflag: bool=True): outdir = target.parent if outdir is None else outdir name = target.stem if name is None else name if printflag: print(f"[sequencing] Loading data from target '{target}':") if not target.is_dir(): raise Exception(f"Target '{target}' does not exist") sequence_file = target if printflag: print("[sequencing] load sequence") if not sequence_file.is_file(): raise Exception(f"Sequence file '{sequence_file}' does not exist") seq_dict = toml.load(sequence_file) return SuperSequence.from_primitives(outdir, name, seq_dict) @staticmethod def _gen_timeline(detailed=False, layout=(1, 4, 32, 1, 32, -10, 10)) \ -> Plotter: P = Plotter().ggrid(True) P.set_yticks([]).set_yticklabels([]) return P def draw_simple(self) -> Plotter: P = SuperSequence._gen_timeline(detailed=False) H = 0.2 c = 0 tl_level = 0 tl_max = 0 t1_last = dict() Tmin = 0 Tmax = 0 for i, (label, seq) in enumerate(self.items()): tmin, tmax = seq.when() Tmin = min(Tmin, tmin) Tmax = max(Tmax, tmax) k = 0 while tmin < t1_last.get(k, tmin): k += 1 tl_level = k if seq.stack_idx is None else seq.stack_idx tl_max = max(tl_max, tl_level) P.ax.add_patch( pp.Rectangle( (tmin, tl_level * H), tmax - tmin, H / 2, edgecolor="k", facecolor=f"C{c}" if seq.color is None else seq.color, zorder=100 ) ) P.ax.text( tmin, (tl_level * H - H / 15), label, #_titlecase(label), horizontalalignment="left", verticalalignment="top" ) if seq.color is None: c = (c + 1) % 10 t1_last[tl_level] = max(tmax, t1_last.get(tl_level, tmax)) P.set_xlim(Tmin - (Tmax - Tmin) / 20, Tmax + (Tmax - Tmin) / 20) P.set_ylim(-3* H / 2, (tl_max + 2) * H) P.set_xlabel("Time [s]") return P def draw_detailed( self, ghost_delays: bool=True, connections: dict[str, Connection]=None, mogtables: list[(..., dict[str, ...] | None)]=None ) -> Plotter: assert (connections is not None) ^ (self.defaults is not None) _connections = list( (connections if connections is not None else self.defaults.connections).items() ) P = SuperSequence._gen_timeline(detailed=True) T0 = min([seq.min_time() for seq in self.values()]) T1 = max([seq.max_time() for seq in self.values()]) color_C = 0 hk = 3 linewidth=0.7 def ycoord(x, conn, k): y = x if isinstance(conn, DigitalConnection) else (x + 10) / 20 return hk * k + 1 + y # set up background lines and histories on each connection # each history is a list of matplotlib line objects for each sequence conn_history = dict() for k, (conn_label, conn) in enumerate(reversed(_connections)): if isinstance(conn, DigitalConnection): P.ax.axhline( ycoord(0, conn, k), color="0.85", linestyle="-", linewidth=linewidth / 2, zorder=100) P.ax.axhline( ycoord(1, conn, k), color="0.85", linestyle="--", linewidth=linewidth / 2, zorder=100) elif isinstance(conn, AnalogConnection): P.ax.axhline( ycoord(-10.0, conn, k), color="0.85", linestyle="--", linewidth=linewidth / 2, zorder=100) P.ax.axhline( ycoord(0.0, conn, k), color="0.85", linestyle="-", linewidth=linewidth / 2, zorder=100) P.ax.axhline( ycoord(+10.0, conn, k), color="0.85", linestyle="--", linewidth=linewidth / 2, zorder=100) else: raise Exception("Invalid connection type") P.ax.text( T0, ycoord(0.0, AnalogConnection, k), conn_label + " ", #_titlecase(conn_label) + " ", horizontalalignment="right", verticalalignment="center", zorder=101) P.ax.text( T1, ycoord(0.0, AnalogConnection, k), " " + conn_label, #" " + _titlecase(conn_label), horizontalalignment="left", verticalalignment="center", zorder=101) conn_history[conn] = [ P.ax.plot( [T0], [ycoord(conn.default, conn, k)], color="0.65", linewidth=linewidth, zorder=103)[0] ] # add line objects to each history for each sequence they're a part of for seq_label, seq in self.by_times_named(): if seq.color is None: color = f"C{color_C}" color_C = (color_C + 1) % 10 else: color = seq.color t0 = seq.min_time() P.ax.axvline( t0, color="k", linestyle="--", linewidth=linewidth / 2, zorder=102) for k, (conn_label, conn) in enumerate(reversed(_connections)): ts = seq._get_states(**conn) if len(ts) > 0: t_prev = conn_history[conn][-1].get_xdata() s_prev = conn_history[conn][-1].get_ydata() conn_history[conn][-1].set_xdata( list(t_prev) + [(t_prev[-1] + t0) / 2]) conn_history[conn][-1].set_ydata( list(s_prev) + [s_prev[-1]]) _t = [(t_prev[-1] + t0) / 2] _s = [s_prev[-1]] # _t = [conn_history[conn][-1].get_xdata()[-1]] # _s = [conn_history[conn][-1].get_ydata()[-1]] for t, s in ts: if ghost_delays: if isinstance(conn, DigitalConnection): dt = conn.delay_up if s == 1 else conn.delay_down elif isinstance(conn, AnalogConnection): dt = conn.delay if dt > 0: P.ax.plot( _t[-1:] + [t + dt, t + dt], _s[-1:] + [_s[-1], ycoord(s, conn, k)], color=color, linewidth=linewidth, linestyle="--", zorder=103 ) _t += [t, t] _s += [_s[-1], ycoord(s, conn, k)] conn_history[conn].append( P.ax.plot(_t, _s, color=color, linewidth=linewidth, zorder=103)[0]) P.ax.text(t0, hk * k + 1, seq_label, #_titlecase(seq_label), horizontalalignment="left", verticalalignment="top", zorder=101) # add tails to the final history items going until T1 for history in conn_history.values(): i0 = history.index(max(history, key=lambda h: max(h.get_xdata()))) x = history[i0].get_xdata() y = history[i0].get_ydata() history[i0].set_xdata(list(x) + [T1]) history[i0].set_ydata(list(y) + [y[-1]]) # process any MOGTables moghs = 1 moghk = 4 + 1 + 3 * moghs mT0 = (T0 + T1) / 2 mT1 = (T0 + T1) / 2 def mogycoord(x, xlim, k, j): xmin, xmax = xlim return moghk * k + j * (1 + moghs) + 1 + (x - xmin) / (xmax - xmin) mogtables = list() if mogtables is None else mogtables for k, (mogtable, opt) \ in zip(range(-1, -len(mogtables) - 1, -1), mogtables): opt = dict() if opt is None else opt t0 = opt.get("offset", 0.0) name = opt.get("name", f"mogtable {k}") color = "C7" if mogtable.color is None else mogtable.color mT0 = min(T0, t0 + mogtable.min_time()) mT1 = max(T1, t0 + mogtable.max_time()) # default min/max taken from the MOGRF manual fmin = min_n((e.frequency for e in mogtable), 10.0) fmax = max_n((e.frequency for e in mogtable), 200.0) pmin = min_n((e.power for e in mogtable), -70.0) pmax = max_n((e.power for e in mogtable), 33.0) phmin = min_n((e.phase for e in mogtable), 0.0) phmax = max_n((e.phase for e in mogtable), 360.0) # set up lines in the analog style for frequency, power, and phase for j, (param, lim) in enumerate([ ("phase", (phmin, phmax)), ("power", (pmin, pmax)), ("frequency", (fmin, fmax)) ]): P.ax.axhline( mogycoord(0.0, [0.0, 1.0], k, j), color="0.85", linestyle="--", linewidth=linewidth / 2, zorder=100) P.ax.axhline( mogycoord(1.0, [0.0, 1.0], k, j), color="0.85", linestyle="--", linewidth=linewidth / 2, zorder=100) P.ax.text( mT0, mogycoord(0.5, [0.0, 1.0], k, j), param + " ", #_titlecase(param) + " ", horizontalalignment="right", verticalalignment="center", zorder=101) P.ax.text( mT1, mogycoord(0.5, [0.0, 1.0], k, j), " " + param, #" " + _titlecase(param), horizontalalignment="left", verticalalignment="center", zorder=101) P.axvline( t0, color="0.25", linestyle=":", linewidth=linewidth / 2, zorder=102) P.ax.text( t0, moghk * k + 1, name, #_titlecase(name), horizontalalignment="left", verticalalignment="top", zorder=101) t = [(mT0 + t0 + mogtable.min_time()) / 2] s = [mogycoord(lim[0], lim, k, j)] for i, mogevent in enumerate(mogtable): t += [t0 + mogevent.time, t0 + mogevent.time] s += [ s[-1], mogycoord(getattr(mogevent, param), lim, k, j) if getattr(mogevent, param) is not None else s[-1] ] t += [(t[-1] + mT1) / 2] s += [s[-1]] P.plot( [mT0, t[0]], 2 * [mogycoord(lim[0], lim, k, j)], color="0.65", linewidth=linewidth, zorder=103 ) P.plot( [t[-1], mT1], 2 * [s[-1]], color="0.65", linewidth=linewidth, zorder=103 ) P.plot(t, s, color=color, linewidth=linewidth, zorder=103) P.set_xlim(min(mT0, T0), max(mT1, T1)) P.set_ylim( -moghk * len(mogtables) - 2 - hk / 2, hk * len(_connections) + 2 + hk / 2 ) return P