"""Module for encoding simulation results and processed data."""
from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Optional
import numpy as np
import pandas as pd
import xarray as xr
from sbmlutils import log
from sbmlutils.console import console
from sbmlsim.simulation import Dimension, ScanSim
# Module-level logger; `[docs]` Sphinx artifact removed so the statement parses.
logger = log.get_logger(__name__)
class XResult:
    """Data structure for storing simulation results.

    Results are always structured data: a thin wrapper around an
    ``xr.Dataset`` (xarray) which adds per-variable unit information.
    Dimensions are ``_time`` plus one dimension per scan dimension
    (or a ``_dfs`` dimension when no scan was performed).
    """

    def __init__(self, xdataset: xr.Dataset):
        """Initialize XResult.

        :param xdataset: dataset holding the result arrays.
        """
        self.xds = xdataset
        # units per variable key; filled via `set_units`
        self.units: Dict[str, str] = {}

    def __getitem__(self, key: str) -> xr.DataArray:
        """Get DataArray for the given key.

        :raises KeyError: if the key is not a variable of the dataset.
        """
        try:
            return self.xds[key]
        except KeyError as err:
            logger.error(f"Key '{key}' not in {self.xds}" f"\n{err}")
            raise err

    def __getattr__(self, attr: str):
        """Provide dot access to dataset attributes and variables.

        ``__getattr__`` is only invoked after normal attribute lookup has
        already failed, so local fields must be resolved via ``__dict__``
        directly; calling ``getattr(self, attr)`` here (as the previous
        implementation did) recurses infinitely when the attribute is
        genuinely missing (e.g. before ``__init__`` has run, or during
        copy/pickle protocols).
        """
        if attr in {"xds", "scan"}:
            try:
                return self.__dict__[attr]
            except KeyError as err:
                raise AttributeError(attr) from err
        # forward lookup to the wrapped dataset
        return getattr(self.xds, attr)

    def __str__(self) -> str:
        """Get string."""
        return f"{self.xds.__str__()} + \nunits={self.units}"

    def __repr__(self) -> str:
        """Get string representation."""
        return f"{self.xds.__repr__()} + \nunits={self.units}"

    def set_units(self, udict: Optional[Dict[str, str]] = None) -> None:
        """Set units on the dataset variables and the local units mapping.

        :param udict: mapping from variable key to unit string; keys not
            present in the dataset are silently ignored.
        """
        if udict:
            for key in self.xds.keys():
                if key in udict:
                    self.xds[key].attrs["units"] = udict[key]
                    self.units[key] = udict[key]

    @staticmethod
    def from_dfs(
        dfs: List[pd.DataFrame],
        scan: Optional[ScanSim] = None,
        udict: Optional[Dict[str, str]] = None,
    ) -> XResult:
        """Create XResult from a list of DataFrames.

        Structure is based on the underlying scans which were performed.
        An optional unit dictionary can be provided.

        :param dfs: one DataFrame per simulation (a single DataFrame is
            also accepted); all must share the columns and time points
            of the first.
        :param scan: scan whose dimensions structure the data; if None a
            single `_dfs` dimension indexes the DataFrames.
        :param udict: optional mapping from column key to unit string.
        """
        if isinstance(dfs, pd.DataFrame):
            dfs = [dfs]

        df = dfs[0]
        num_dfs = len(dfs)

        # time is always the first (shared) dimension
        shape = [len(df)]
        dims = ["_time"]
        coords = {"_time": df.time.values}
        columns = df.columns
        del df

        # additional dimensions from the scan (or a flat `_dfs` dimension)
        if scan is None:
            dimensions = [Dimension("_dfs", index=np.arange(num_dfs))]
        else:
            dimensions = scan.dimensions  # type: ignore

        dimension: Dimension
        for dimension in dimensions:
            shape.append(len(dimension))
            dim_id = dimension.dimension
            coords[dim_id] = dimension.index
            dims.append(dim_id)

        indices = Dimension.indices_from_dimensions(dimensions)
        data_dict = {col: np.empty(shape=shape) for col in columns}
        for k_df, df in enumerate(dfs):
            # Ellipsis acts as ':' in the leading time dimension; the index
            # is column-independent, so it is computed once per DataFrame.
            index = (..., *indices[k_df])
            for column in columns:
                data_dict[column][index] = df[column].values

        # create DataSet
        ds = xr.Dataset(
            {
                key: xr.DataArray(data=data, dims=dims, coords=coords)
                for key, data in data_dict.items()
            }
        )
        xres = XResult(xdataset=ds)
        xres.set_units(udict)
        return xres

    def to_netcdf(self, path: Path) -> None:
        """Store results as netCDF4/HDF5 at the given path."""
        self.xds.to_netcdf(path, format="NETCDF4", engine="h5netcdf")

    @staticmethod
    def from_netcdf(path: Path) -> XResult:
        """Read an XResult from a netCDF file."""
        ds: xr.Dataset = xr.open_dataset(path)
        return XResult(xdataset=ds)

    def to_mean_dataframe(self) -> pd.DataFrame:
        """Convert to DataFrame with mean over all non-time dimensions."""
        res = {}
        for col in self.xds:
            res[col] = self.mean_all_dims(key=col)
        return pd.DataFrame(res)

    def to_dataframe(self) -> pd.DataFrame:
        """Convert to DataFrame.

        Only timecourse simulations without scan can be converted uniquely;
        higher dimensional data is flattened.
        """
        if self.is_scan():
            # Only timecourse data can be uniquely converted to DataFrame;
            # higher dimensional data will be flattened.
            # (Previously this called the nonexistent `is_no_scan()`,
            # which raised at runtime.)
            logger.warning("Higher dimensional data, data will be mean.")

        data = {v: self.xds[v].values.flatten() for v in self.xds.keys()}
        df = pd.DataFrame(data)
        return df

    def is_scan(self) -> bool:
        """Check if the result is a scan.

        A result is a scan if any dimension besides `_time` holds more
        than one entry. A plain timecourse created via `from_dfs` carries
        only a length-1 `_dfs` dimension next to `_time` and is therefore
        not a scan.
        """
        for dim, size in self.xds.sizes.items():
            if dim != "_time" and size > 1:
                return True
        return False

    def to_tsv(self, path_tsv) -> None:
        """Write data as tab-separated values to the given path."""
        df = self.to_dataframe()
        if df is not None:
            df.to_csv(path_tsv, sep="\t", index=False)
        else:
            logger.warning("Could not write TSV")

    def mean_all_dims(self, key: str) -> xr.DataArray:
        """Get mean over all dimensions besides time.

        Skips NA.
        """
        return self.xds[key].mean(dim=self._redop_dims(), skipna=True)

    def std_all_dims(self, key: str) -> xr.DataArray:
        """Get standard deviation over all dimensions besides time.

        Skips NA.
        """
        return self.xds[key].std(dim=self._redop_dims(), skipna=True)

    def min_all_dims(self, key: str) -> xr.DataArray:
        """Get minimum over all dimensions besides time.

        Skips NA.
        """
        return self.xds[key].min(dim=self._redop_dims(), skipna=True)

    def max_all_dims(self, key: str) -> xr.DataArray:
        """Get maximum over all dimensions besides time.

        Skips NA.
        """
        return self.xds[key].max(dim=self._redop_dims(), skipna=True)

    def _redop_dims(self) -> List[str]:
        """Dimensions for reducing operations.

        All dimensions besides time.
        """
        return [dim_id for dim_id in self.xds.dims if dim_id != "_time"]
if __name__ == "__main__":
    # Demo: build an XResult from repeated repressilator timecourses.
    from sbmlsim.model import RoadrunnerSBMLModel
    from sbmlsim.resources import REPRESSILATOR_SBML
    from sbmlsim.units import UnitsInformation

    uinfo = UnitsInformation.from_sbml(REPRESSILATOR_SBML)
    r = RoadrunnerSBMLModel(source=REPRESSILATOR_SBML).model

    # collect one DataFrame per simulation repeat
    dfs = []
    for _ in range(10):
        s = r.simulate(0, 10, steps=10)
        dfs.append(pd.DataFrame(s, columns=s.colnames))

    xres = XResult.from_dfs(dfs, udict=uinfo.udict)
    console.print(xres)