Source code for smart_cruise.jit

import numpy as np
from numba import njit
from numba.core import config as numba_config
from numba.typed import List


[docs] @njit def new_state_dict(): """ Creates empty state dict Returns ------- :class:`~numba.typed.Dict` A Numba typed dict (h, s, b) -> [(w, t, p)]. """ return {(1, 2, 3): List([(1.0, 2.0, 3)]) for _ in range(0)}
[docs] @njit def insert_state(states, h, s, b, w, t, p): """ Defaultdict-like state insertion. Parameters ---------- states: :class:`~numba.typed.Dict` A Numba typed dict (h, s, b) -> [(w, t, p)]. h: :class:`int` Height index. s: :class:`int` Speed index. b: :class:`int` Backoff. w: :class:`float` Weight. t: :class:`float` Time. p: :class:`int` Index of origin state. Returns ------- None """ if w > 0 and t > 0: if (h, s, b) in states: states[h, s, b].append((w, t, p)) else: states[h, s, b] = List([(w, t, p)])
np_rec = np.dtype([("h", "i8"), ("s", "i8"), ("w", "f8"), ("t", "f8")]) if numba_config.DISABLE_JIT: # Pure Python mode: use numpy dtypes directly wt_type = np.dtype([("f0", "f8"), ("f1", "f8")]) state_type = np.dtype( [("h", "i8"), ("s", "i8"), ("b", "i8"), ("w", "f8"), ("t", "f8"), ("p", "i8")] ) nb_rec = np_rec else: # JIT mode: use Numba types from numba import from_dtype, typeof wt_type = typeof((1.0, 2.0)) state_type = typeof((1, 2, 3, 4.0, 5.0, 6)) nb_rec = from_dtype(np_rec)
[docs] @njit def pareto_collapse(states, pareto_max): """ Parameters ---------- states: :class:`~numba.typed.Dict` A Numba typed dict (h, s, b) -> [(w, t, p)]. pareto_max: :class:`int` Maximum number of pareto points. Returns ------- None """ for state, wtp in states.items(): keys = np.empty(len(wtp), dtype=wt_type) for i, x in enumerate(wtp): keys[i] = (-x[0], -x[1]) indices = np.argsort(keys) wtp0 = wtp[indices[0]] res = List([wtp0]) tm = wtp0[1] for i in indices[1:]: t = wtp[i][1] if t > tm: res.append(wtp[i]) tm = t # Sample if needed if len(res) > pareto_max: res = List([res[round(y)] for y in np.linspace(0, len(res) - 1, pareto_max)]) states[state] = res
[docs] @njit def compute( start, timings, speed_array, cruise_matrix, up_matrix, down_matrix, height_gain, weight_cost, backoff, pareto_max, ): """ Core computation of optimal trajectories. Parameters ---------- start: :class:`list` Initial (weight, time) budgets. timings: :class:`~numpy.ndarray` Time cost per speed level. speed_array: :class:`~numpy.ndarray` Speed cost adjustments. cruise_matrix: :class:`~numpy.ndarray` Base energy cost matrix (track point x height). up_matrix: :class:`~numpy.ndarray` Energy cost for moving upward by one height level. down_matrix: :class:`~numpy.ndarray` Energy gain for moving downward by one height level. height_gain: :class:`float` weight_cost: :class:`float` backoff: :class:`int` pareto_max: :class:`int` """ track_point = np.empty(len(start), dtype=state_type) for i, wt in enumerate(start): track_point[i] = (0, 0, 0, *wt, 0) track_points = [track_point] n_d, n_h = cruise_matrix.shape n_s = len(speed_array) # One track point after the other for i in range(n_d): # new state dict states = new_state_dict() # Browse states of current track point and processes their successors for p, state in enumerate(track_points[i]): h, s, b, w, t, _ = state base_cost = cruise_matrix[i, h] - h * height_gain + w * weight_cost c = up_matrix[i, h] d = down_matrix[i, h] bb = max(0, b - 1) ww = w - max(0.0, base_cost + speed_array[s]) tt = t - timings[s] insert_state(states, h, s, bb, ww, tt, p) if h < n_h - 1: insert_state(states, h + 1, s, bb, ww - c, tt, p) if h > 0: insert_state(states, h - 1, s, bb, ww + d, tt, p) if b == 0: for ss in range(n_s): if ss != s: ww = w - max(0.0, base_cost + speed_array[ss]) tt = t - timings[ss] insert_state(states, h, ss, backoff, ww, tt, p) if h < n_h - 1: insert_state(states, h + 1, ss, backoff, ww - c, tt, p) if h > 0: insert_state(states, h - 1, ss, backoff, ww + d, tt, p) pareto_collapse(states, pareto_max=pareto_max) n_states = sum([len(v) for v in states.values()]) track_point = np.empty(n_states, dtype=state_type) index = 0 for k, vv in states.items(): for v in vv: track_point[index] = (*k, *v) index += 1 track_points.append(track_point) # Global Pareto-reduction of last track point last = track_points[-1] if len(last) == 0: return None keys = np.empty(len(last), dtype=wt_type) for i, x in enumerate(last): keys[i] = (-x[3], -x[4]) indices = np.argsort(keys) ends = [last[indices[0]]] tm = last[indices[0]][4] for i in indices[1:]: state = last[i] if state[4] > tm: ends.append(state) tm = state[4] # Convert to trajectory array n_traj = len(ends) trajectories = np.empty((n_traj, n_d + 1), dtype=nb_rec) for i, end in enumerate(ends): trajectories[i, -1]["h"] = end[0] trajectories[i, -1]["s"] = end[1] trajectories[i, -1]["w"] = end[3] trajectories[i, -1]["t"] = end[4] pos = end[5] for j in range(n_d - 1, -1, -1): state = track_points[j][pos] trajectories[i, j]["h"] = state[0] trajectories[i, j]["s"] = state[1] trajectories[i, j]["w"] = state[3] trajectories[i, j]["t"] = state[4] pos = state[5] return trajectories