Source code for sbmlsim.comparison.diff

"""Helpers for numerical comparison of data.

Used in the context of model comparison and simulations between different simulators.
Allows to tests semi-automatically for problems with the various models.
Used to benchmark the simulation results.

from pathlib import Path
from typing import Dict

import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt
from sbmlutils import log
from sbmlutils.console import console

from sbmlsim.utils import timeit

[docs]logger = log.get_logger(__name__)
[docs]def get_files_by_extension(base_path: Path, extension: str = ".json") -> Dict[str, str]: """Get all files by given extension. Simulation definitions are json files. """ # get all files with extension in given path files = [f for f in base_path.glob("**/*") if f.is_file() and f.suffix == extension] offset = len(extension) keys = [[:-offset] for f in files] return dict(zip(keys, files)) # type: ignore
[docs]class DataSetsComparison: """Comparing multiple simulation results. Only the subset of identical columns are compared. In the beginning a matching of column names is performed to find the subset of columns which can be compared. The simulations must contain a "time" column with identical time points. """
[docs] tol_abs = 1e-4 # absolute tolerance for comparison
[docs] tol_rel = 1e-4 # relative tolerance for comparison
[docs] eps_plot = 1e-5 * tol_abs # tolerance for plotting
@timeit def __init__( self, dfs_dict: Dict[str, pd.DataFrame], columns_filter=None, time_column: bool = True, title: str = None, selections: Dict[str, str] = None, factors: Dict[str, float] = None, ): """Initialize the comparison. :param dfs_dict: data dictionary d[simulator_key] = df_result :param columns_filter: function which returns True if in Set or False if should be filtered. :param time_column: flag to check for time column """ self.columns_filter = columns_filter # check that identical number of rows (mostly timepoints) nrow = 0 for label, df in dfs_dict.items(): if nrow == 0: nrow = len(df) if len(df) != nrow: raise ValueError( f"DataFrame have different length (number of rows): " f"{len(df)} != {nrow} ({label})" ) # check that time column exist in data frames if time_column: for label, df in dfs_dict.items(): if "time" not in df.columns: raise ValueError(f"'time' column must exist in data ({label})") # handle selections replacements pd.set_option("display.max_columns", None) if selections: if factors is None: factors = {} # use the first keys for comparison colnames = list(selections.values())[1] for key, sel_keys in selections.items(): console.log("***", key, "***") df = dfs_dict[key] # get subset df_new = df[sel_keys] # apply factors fs = factors.get(key, [1.0] * len(sel_keys)) for k, sel in enumerate(sel_keys): console.log(f"scaling: '{sel}' * {fs[k]}") # type: ignore # df_new[sel] = fs[k] * df_new[sel] df_new.loc[:, sel] *= fs[k] # type: ignore # do renaming df_new = df_new.rename(columns=dict(zip(sel_keys, colnames))) # store updated df console.log(df_new.head()) dfs_dict[key] = df_new # get the subset of columns to compare columns, self.col_intersection, self.col_union = self._process_columns(dfs_dict) # filtered columns if columns_filter: columns = [col for col in columns if columns_filter(col)] self.columns = columns"Comparing: {self.columns}") # get common subset of data self.dfs, self.labels = self._filter_dfs(dfs_dict, self.columns) # set title self.title = title if title else " | ".join(self.labels) # calculate difference ( self.diff, self.diff_abs, self.diff_rel, self.diff_tol, self.diff_tol_bool, ) = self.df_diff() @classmethod
[docs] def _process_columns(cls, dataframes): """Get the intersection and union of columns. :param dataframes: :return: """ numerics = ["int16", "int32", "int64", "float16", "float32", "float64"] # set of columns from the individual dataframes col_union = None col_intersection = None for _path, df in dataframes.items(): # get all numeric columns num_df = df.select_dtypes(include=numerics) if len(num_df.columns) < len(df.columns): logger.warning( f"Non-numeric columns in DataFrame: {set(df.columns)-set(num_df.columns)}" ) cols = set(num_df.columns) if not col_union or not col_intersection: col_union = cols col_intersection = cols else: col_union = col_union.union(cols) col_intersection = col_intersection.intersection(cols)"Column Union #: {len(col_union)}")"Column Intersection #: {len(col_intersection)}") columns = list(col_intersection.copy()) columns.remove("time") columns = ["time"] + sorted(columns) return columns, col_intersection, col_union
[docs] def _filter_dfs(cls, dataframes, columns): """Filter the dataframes using the column ids occurring in all datasets. The common set of columns is used for comparison. :param dataframes: :param columns: :return: List[pd.DataFrame], List[str], list of dataframes and simulator labels. """ dfs = [] labels = [] for label, df in dataframes.items(): try: df_filtered = df[columns] except KeyError: logger.error( f"Some keys from '{columns}' do not exist in DataFrame columns " f"'{df.columns}'" ) raise ValueError dfs.append(df_filtered) labels.append(label) return dfs, labels
[docs] def df_diff(self): """Dataframe of all differences between the files. Let the following variables be defined: * `abs_tol` stand for the absolute tolerance for a tests case, * `rel_tol` stand for the relative tolerance for a tests case, * `c_ij` stand for the expected correct value for row `i`, column `j`, of the result data set for the tests case * `u_ij` stand for the corresponding value produced by a given software simulation system run by the user These absolute and relative tolerances are used in the following way: a data point `u_ij` is considered to be within tolerances if and only if the following expression is true: |c_ij - u_ij| <= (abs_tol + rel_tol * |c_ij|) """ c = self.dfs[0] u = self.dfs[1] # difference diff = c - u # absolute differences between all data frames diff_abs = diff.abs() # relative differences between data frames diff_rel = 2 * diff_abs / (self.dfs[0].abs() + self.dfs[1].abs()) diff_rel[diff_rel.isnull()] = 0.0 # difference based on tolerance # |c_ij - u_ij| <= (abs_tol + rel_tol * |c_ij|) # > 0 if difference diff_tol = (c - u).abs() - (self.tol_abs + self.tol_rel * c.abs()) # boolean matrix: True if difference, False if identical diff_tol_bool = diff_tol > 0 return diff, diff_abs, diff_rel, diff_tol, diff_tol_bool
[docs] def is_equal(self): """Check if DataFrames are identical within numerical tolerance.""" return not self.diff_tol_bool.any(axis=None)
[docs] def __str__(self) -> str: """Get string.""" return f"{self.__class__.__name__} ({self.labels})"
[docs] def __repr__(self): """Get representation.""" return f"{self.__class__.__name__} [{}] ({self.labels})"
[docs] def report_str(self) -> str: """Get report as string.""" lines = [ "-" * 80, str(self), str(self.title), "-" * 80, "# Elements (Nt, Nx)", str(self.diff.shape), "# Maximum column difference (above eps)", ] diff_max = self.diff_abs.max() diff_0 = self.diff_abs.iloc[0] diff_rel_max = self.diff_rel.max() diff_rel_0 = self.diff_rel.iloc[0] diff_info = pd.concat([diff_0, diff_rel_0, diff_max, diff_rel_max], axis=1) diff_info.columns = ["Delta_abs_0", "Delta_rel_0", "Delta_max", "Delta_rel_max"] diff_info = diff_info[diff_max >= DataSetsComparison.tol_abs] with pd.option_context("display.max_rows", None, "display.max_columns", None): lines.append( str(diff_info.sort_values(by=["Delta_rel_max"], ascending=False)) ) lines.append("# Maximum initial column difference") lines.append(str(self.diff.iloc[0].abs().max())) lines.append("# Maximum element difference") lines.append(str(self.diff.abs().max().max())) lines.append( "# Datasets are equal (|c_ij - u_ij| <= (tol_abs + tol_rel * |c_ij|))" ) lines.append(str(self.is_equal()).upper()) lines.append("-" * 80) if not self.is_equal(): logger.warning("Datasets are not equal !") return "\n".join([str(item) for item in lines])
[docs] def report(self): """Report.""" console.log(self.report_str()) # plot figure f = self.plot_diff() return f
[docs] def plot_diff(self): """Plot lines for entries which are above epsilon tre(object)shold.""" # filter data diff_abs = self.diff_abs.copy() diff_rel = self.diff_rel.copy() diff_tol = self.diff_tol.copy() diff_max = diff_abs.max() column_index = diff_max >= DataSetsComparison.eps_plot # console.log(column_index) diff_abs = diff_abs.transpose() diff_abs = diff_abs[column_index] diff_abs = diff_abs.transpose() # plot all overview f1, ((ax1, ax2, ax3, ax4)) = plt.subplots(1, 4, figsize=(20, 4.5)) f1.subplots_adjust(wspace=0.35) f1.suptitle(self.title, fontsize=14, fontweight="bold") sns.heatmap(data=self.diff_tol_bool, cmap="Blues", vmin=0, vmax=1, ax=ax1) ax1.set_title(f"equal = {str(self.is_equal()).upper()}", fontweight="bold") ax1.set_ylabel("Tolerance difference", fontweight="bold") for cid in diff_abs.columns: ax2.plot(diff_tol[cid], label=cid) ax3.plot(diff_abs[cid], label=cid) ax4.plot(diff_rel[cid], label=cid) ax2.set_ylabel("Tolerance difference", fontweight="bold") ax3.set_ylabel("Absolute difference", fontweight="bold") ax4.set_ylabel("Relative difference", fontweight="bold") for ax in (ax3, ax4): ax.set_xlabel("time index", fontweight="bold") ax.set_yscale("log") ax.set_ylim(bottom=1e-10) if ax.get_ylim()[1] < 10 * DataSetsComparison.tol_abs: ax.set_ylim(top=10 * DataSetsComparison.tol_abs) ax2.axhline(0.0, color="black", linestyle="--") for ax in (ax3, ax4): ax.axhline(DataSetsComparison.tol_abs, color="black", linestyle="--") return f1