#
# File: dynamics.py
#
import abc
from collections import OrderedDict
import torch
from .odesolver import odestep
# DynamicalSystems
[docs]class DiscreteDynamicalSystem(metaclass=abc.ABCMeta):
"""
"""
[docs] def step(self, t, x, u=None):
"""Returns next x_{t+1}
Args:
t (torch.IntTensor): (B, T,) shaped time
x (torch.tensor): (B, T, n) shaped system states
u (torch.tensor): (B, T, m) shaped control inputs
Returns:
x_next (torch.tensor): (B, T, n) shaped next system states
"""
raise NotImplementedError
[docs] def observe(self, t, x, u=None):
"""Returns y_t"""
raise NotImplementedError
@property
def xdim(self):
"""system state dimensions
"""
return self._xdim
@property
def udim(self):
"""system input dimension
returns None if unactuated
"""
return self._udim
@property
def ydim(self):
"""system observation dimension
"""
return self._ydim
[docs]class ContinuousDynamicalSystem:
"""
"""
[docs] def step_derivs(self, t, x, u=None):
"""Returns xdot_t
Args:
t (torch.tensor): (B, T,) shaped time indices
x (torch.tensor): (B, T, n) shaped system states
u (torch.tensor): (B, T, m) shaped control inputs
Returns:
xdot (torch.tensor): (B, T, n) system state derivatives
"""
raise NotImplementedError
[docs] def observe(self, t, x, u=None):
"""Returns y_t"""
raise NotImplementedError
@property
def xdim(self):
"""system state dimensions
"""
return self._xdim
@property
def udim(self):
"""system input dimension
returns None if unactuated
"""
return self._udim
@property
def ydim(self):
"""system observation dimension
"""
return self._ydim
[docs]class KinoDynamicalSystem(ContinuousDynamicalSystem):
"""
"""
[docs] def step_derivs(self, t, x, u=None):
"""Returns xdot_t
Args:
t (torch.tensor): (B, T,) shaped time indices
x (torch.tensor): (B, T, qn+vn) shaped system states
u (torch.tensor): (B, T, m) shaped control inputs
Returns:
xdot (torch.tensor): (B, T, n) system state derivatives
"""
q = x[:, :, :self.qdim]
v = x[:, :, self.qdim:]
qdot = self.kinematics(t, q, v, u)
vdot = self.dynamics(t, q, v, u)
xdot = torch.cat([qdot, vdot], dim=-1)
return xdot
[docs] def kinematics(self, t, q, v, u):
"""Returns qdot_t
Args:
t (torch.tensor): (B, T,) shaped time indices
q (torch.tensor): (B, T, qn) shaped system generalized coordinates
v (torch.tensor): (B, T, vn) shaped system generalized velocities
u (torch.tensor): (B, T, m) shaped control inputs
Returns:
qdot (torch.tensor): (B, T, n) system gen. coordinate derivatives
"""
raise NotImplementedError
[docs] def dynamics(self, t, q, v, u):
"""Returns qdot_t
Args:
t (torch.tensor): (B, T,) shaped time indices
q (torch.tensor): (B, T, qn) shaped system generalized coordinates
v (torch.tensor): (B, T, vn) shaped system generalized velocities
u (torch.tensor): (B, T, m) shaped control inputs
Returns:
vdot (torch.tensor): (B, T, n) system gen. velocity derivatives
"""
raise NotImplementedError
@property
def qdim(self):
return self._qdim
@property
def vdim(self):
return self._vdim
@property
def xdim(self):
return self.qdim + self.vdim
[docs]class C2DSystem(DiscreteDynamicalSystem, ContinuousDynamicalSystem):
"""
Continuous to Discrete System
"""
def __init__(self, dt, method, transforms=None):
"""
Args:
csys (ContinuousTimeSystem): continuous-time system
dt (float): time-step
method (str): see ceem/odesolver.py SOLVERS for options
transforms (tuple)
"""
self._dt = dt
self._method = method
self._transforms = transforms
[docs] def step(self, t, x, u=None):
nx = odestep(self.step_derivs, t, self._dt, x, u=u, method=self._method,
transforms=self._transforms)
return nx
[docs]class ObsJacMixin:
"""
Mixin for computing jacobians of observations.
Defaults to using autograd.
"""
[docs] def jac_obs_x(self, t, x, u=None):
"""Returns the Jacobian of observation wrt x
Args:
t (torch.tensor): (B, T,) shaped time indices
x (torch.tensor): (B, T, n) shaped system states
u (torch.tensor): (B, T, m) shaped control inputs
Returns
jac_y_t (torch.tensor): (B, T, ydim, n) shaped jacobian of the observation
"""
p = self._ydim
x = x.detach()
x.requires_grad_(True)
y = self.observe(t, x, u)
jac_x = torch.stack(
[torch.autograd.grad(y[:, :, i].sum(), x, retain_graph=True)[0] for i in range(p)],
dim=2)
return jac_x
[docs] def jac_obs_u(self, t, x, u):
"""Returns the Jacobian of observation wrt u
Args:
t (torch.tensor): (B, T,) shaped time indices
x (torch.tensor): (B, T, n) shaped system states
u (torch.tensor): (B, T, m) shaped control inputs
Returns
jac_y_t (torch.tensor): (B, T, ydim, m) shaped jacobian of the observation
"""
p = self._ydim
u = u.detach()
u.requires_grad_(True)
y = self.observe(t, x, u)
jac_u = torch.stack(
[torch.autograd.grad(y[:, :, i].sum(), u, retain_graph=True)[0] for i in range(p)],
dim=2)
return jac_u
[docs] def jac_obs_theta(self, t, x, u=None):
"""Returns the Jacobian of observation wrt theta
Args:
t (torch.tensor): (B, T,) shaped time indices
x (torch.tensor): (B, T, n) shaped system states
u (torch.tensor): (B, T, m) shaped control inputs
Returns
jac_y_t (seq of torch.tensor): jacobian of the observation wrt each nn.Parameter
"""
# TODO - hopefully we bypass ever explicitly calling this
raise NotImplementedError
[docs]class DynJacMixin:
"""
Mixin for computing jacobians of dynamics.
Defaults to using autograd.
"""
[docs] def jac_step_x(self, t, x, u=None):
"""Returns the Jacobian of step at time t
Args:
t (torch.tensor): (B, T,) shaped time indices
x (torch.tensor): (B, T, n) shaped system states
u (torch.tensor): (B, T, m) shaped control inputs
Returns
jac_x_t (torch.tensor): (B, T, n, n) shaped jacobian of the next state
"""
n = self.xdim
x = x.detach()
x.requires_grad_(True)
nx = self.step(t, x, u)
jac_x = torch.stack(
[torch.autograd.grad(nx[:, :, i].sum(), x, retain_graph=True)[0] for i in range(n)],
dim=2)
return jac_x
[docs] def jac_step_u(self, t, x, u):
"""Returns the Jacobian of step at time t
Args:
t (torch.tensor): (B, T,) shaped time indices
x (torch.tensor): (B, T, n) shaped system states
u (torch.tensor): (B, T, m) shaped control inputs
Returns
jac_u (torch.tensor): (B, T, n, m) shaped jacobian of the next state
"""
n = self.xdim
u = u.detach()
u.requires_grad_(True)
nx = self.step(t, x, u)
jac_u = torch.stack(
[torch.autograd.grad(nx[:, :, i].sum(), u, retain_graph=True)[0] for i in range(n)],
dim=2)
return jac_u
[docs] def jac_step_theta(self, t, x, u=None):
"""Returns the Jacobian of step at time t
Args:
t (torch.tensor): (B, T,) shaped time indices
x (torch.tensor): (B, T, n) shaped system states
u (torch.tensor): (B, T, m) shaped control inputs
Returns
jac_x_t (seq of torch.tensor): jacobian of the observation wrt each nn.Parameter
"""
nx = self.step(t, x, u)
B, T, N = x.shape
jacobians = OrderedDict([(name, None) for name, p in self.named_parameters()])
for iB in range(B):
for iT in range(T):
for iN in range(1, N):
self.zero_grad()
nx[iB, iT, iN].backward(retain_graph=True)
for name, param in self.named_parameters():
if param.grad is not None:
if jacobians[name] is None:
jacobians[name] = torch.zeros(B, T, N, *param.shape)
jacobians[name][iB, iT, iN] = param.grad
return jacobians
[docs]class AnalyticObsJacMixin(ObsJacMixin):
"""
Mixin for analytically computing observation jacobains
"""
[docs] def jac_obs_x(self, t, x, u=None):
"""Returns the Jacobian of observation wrt x
Args:
t (torch.tensor): (B, T,) shaped time indices
x (torch.tensor): (B, T, n) shaped system states
u (torch.tensor): (B, T, m) shaped control inputs
Returns
jac_y_t (torch.tensor): (B, T, m, n) shaped jacobian of the observation
"""
raise NotImplementedError
[docs] def jac_obs_theta(self, t, x, u=None):
"""Returns the Jacobian of observation wrt theta
Args:
t (torch.tensor): (B, T,) shaped time indices
x (torch.tensor): (B, T, n) shaped system states
u (torch.tensor): (B, T, m) shaped control inputs
Returns
jac_y_t (seq of torch.tensor): jacobian of the observation wrt each nn.Parameter
"""
raise NotImplementedError
[docs]class AnalyticDynJacMixin(DynJacMixin):
"""
Mixin for analytically computing jacobians of dynamics.
"""
[docs] def jac_step_x(self, t, x, u=None):
"""Returns the Jacobian of step at time t
Args:
t (torch.tensor): (B, T,) shaped time indices
x (torch.tensor): (B, T, n) shaped system states
u (torch.tensor): (B, T, m) shaped control inputs
Returns
jac_x_t (torch.tensor): (B, T, n, n) shaped jacobian of the next state
"""
raise NotImplementedError
[docs] def jac_step_theta(self, t, x, u=None):
"""Returns the Jacobian of step at time t
Args:
t (torch.tensor): (B, T,) shaped time indices
x (torch.tensor): (B, T, n) shaped system states
u (torch.tensor): (B, T, m) shaped control inputs
Returns
jac_x_t (seq of torch.tensor): jacobian of the observation wrt each nn.Parameter
"""
# TODO - hopefully we bypass ever explicitly calling this
raise NotImplementedError