import libsedml
[docs]class TaskNode(object):
"""Tree implementation of task tree."""
def __init__(self, task: libsedml.SedAbstractTask, depth: int):
self.task = task
self.depth = depth
self.children = []
self.parent = None
[docs] def add_child(self, obj):
obj.parent = self
self.children.append(obj)
[docs] def is_leaf(self):
return len(self.children) == 0
[docs] def __str__(self) -> str:
lines = [f"<[{self.depth}] {self.task.getId()} ({self.task.getElementName()})>"]
for child in self.children:
child_str = child.__str__()
lines.extend([f"\t{line}" for line in child_str.split("\n")])
return "\n".join(lines)
[docs] def info(self) -> str:
return f"<[{self.depth}] {self.task.getId()} ({self.task.getElementName()})>"
[docs] def __iter__(self):
"""Depth-first iterator which yields TaskNodes."""
yield self
for child in self.children:
for node in child:
yield node
[docs] def __repr__(self) -> str:
return self.info()
[docs]class Stack(object):
"""Stack implementation for nodes."""
def __init__(self):
self.items = []
[docs] def isEmpty(self):
return self.items == []
[docs] def push(self, item):
self.items.append(item)
[docs] def pop(self):
return self.items.pop()
[docs] def peek(self):
return self.items[len(self.items) - 1]
[docs] def size(self):
return len(self.items)
[docs] def __str__(self):
return "stack: " + str([item.info() for item in self.items])
[docs]class TaskTree(object):
@staticmethod
[docs] def from_sedml_task(
sed_task: libsedml.SedDocument, root_task: libsedml.SedAbstractTask
) -> TaskNode:
"""Creates task tree for given SedTask
The task tree is used to resolve the order of all simulations.
"""
def add_children(node):
"""Adds task children to given node"""
typeCode = node.task.getTypeCode()
if typeCode == libsedml.SEDML_TASK:
return # no children
elif typeCode == libsedml.SEDML_TASK_REPEATEDTASK:
# add the ordered list of subtasks as children
subtasks = TaskTree.get_ordered_subtasks(node.task)
for st in subtasks:
# get real task for subtask
t = sed_task.getTask(st.getTask())
child = TaskNode(t, depth=node.depth + 1)
node.add_child(child)
# recursive adding of children
add_children(child)
else:
raise IOError("Unsupported task type: {node.task_id.getElementName()}")
# create root
root = TaskNode(root_task, depth=0)
# recursive adding of children
add_children(root)
return root
@staticmethod
[docs] def parse_task_tree(doc: libsedml.SedDocument, tree: TaskNode):
"""Python code generation from task tree."""
[docs]class Test(object):
@staticmethod
[docs] def simpleTaskToPython(doc, node: TaskNode):
"""Creates the simulation python code for a given taskNode.
The taskNodes are required to handle the relationships between
RepeatedTasks, SubTasks and SimpleTasks (Task).
:param doc: sedml document
:type doc: SEDDocument
:param node: taskNode of the current task
:type node: TaskNode
:return:
:rtype:
"""
lines = []
task = node.task
lines.append("# Task: <{}>".format(task.getId()))
lines.append("{} = [None]".format(task.getId()))
mid = task.getModelReference()
sid = task.getSimulationReference()
simulation = doc.getSimulation(sid)
simType = simulation.getTypeCode()
algorithm = simulation.getAlgorithm()
if algorithm is None:
warnings.warn(
"Algorithm missing on simulation, defaulting to 'cvode: KISAO:0000019'"
)
algorithm = simulation.createAlgorithm()
algorithm.setKisaoID("KISAO:0000019")
kisao = algorithm.getKisaoID()
# is supported algorithm
if not SEDMLCodeFactory.is_supported_algorithm_for_simulation_type(
kisao=kisao, sim_type=simType
):
warnings.warn(
"Algorithm {} unsupported for simulation {} type {} in task {}".format(
kisao, simulation.getId(), simType, task.getId()
)
)
lines.append(
"# Unsupported Algorithm {} for SimulationType {}".format(
kisao, simulation.getElementName()
)
)
return lines
# set integrator/solver
integratorName = SEDMLCodeFactory.integrator_from_kisao(kisao)
if not integratorName:
warnings.warn("No integrator exists for {} in roadrunner".format(kisao))
return lines
if simType is libsedml.SEDML_SIMULATION_STEADYSTATE:
lines.append("{}.setSteadyStateSolver('{}')".format(mid, integratorName))
else:
lines.append("{}.setIntegrator('{}')".format(mid, integratorName))
# use fixed step by default for stochastic sims
if integratorName == "gillespie":
lines.append(
"{}.integrator.setValue('{}', {})".format(
mid, "variable_step_size", False
)
)
if kisao == "KISAO:0000288": # BDF
lines.append("{}.integrator.setValue('{}', {})".format(mid, "stiff", True))
elif kisao == "KISAO:0000280": # Adams-Moulton
lines.append("{}.integrator.setValue('{}', {})".format(mid, "stiff", False))
# integrator/solver settings (AlgorithmParameters)
for par in algorithm.getListOfAlgorithmParameters():
pkey = SEDMLCodeFactory.algorithm_parameter_to_parameter_key(par)
# only set supported algorithm paramters
if pkey:
if pkey.dtype is str:
value = "'{}'".format(pkey.value)
else:
value = pkey.value
if value == str("inf") or pkey.value == float("inf"):
value = "float('inf')"
else:
pass
if simType is libsedml.SEDML_SIMULATION_STEADYSTATE:
lines.append(
"{}.steadyStateSolver.setValue('{}', {})".format(
mid, pkey.key, value
)
)
else:
lines.append(
"{}.integrator.setValue('{}', {})".format(mid, pkey.key, value)
)
if simType is libsedml.SEDML_SIMULATION_STEADYSTATE:
lines.append(
"if {model}.conservedMoietyAnalysis == False: {model}.conservedMoietyAnalysis = True".format(
model=mid
)
)
else:
lines.append(
"if {model}.conservedMoietyAnalysis == True: {model}.conservedMoietyAnalysis = False".format(
model=mid
)
)
# get parents
parents = []
parent = node.parent
while parent is not None:
parents.append(parent)
parent = parent.parent
# <selections> of all parents
# ---------------------------
selections = SEDMLCodeFactory.selections_for_task(doc=doc, sed_task=node.task)
for p in parents:
selections.update(
SEDMLCodeFactory.selections_for_task(doc=doc, sed_task=p.task_id)
)
# <setValues> of all parents
# ---------------------------
# apply changes based on current variables, parameters and range variables
for parent in reversed(parents):
rangeId = parent.task_id.getRangeId()
helperRanges = {}
for r in parent.task_id.getListOfRanges():
if r.getId() != rangeId:
helperRanges[r.getId()] = r
for setValue in parent.task_id.getListOfTaskChanges():
variables = {}
# range variables
variables[rangeId] = "__value__{}".format(rangeId)
for key in helperRanges.keys():
variables[key] = "__value__{}".format(key)
# parameters
for par in setValue.getListOfParameters():
variables[par.getId()] = par.getValue()
for var in setValue.getListOfVariables():
vid = var.getId()
mid = var.getModelReference()
selection = SEDMLCodeFactory.selectionFromVariable(var, mid)
expr = selection.id
if selection.type == "concentration":
expr = "init([{}])".format(selection.id)
elif selection.type == "amount":
expr = "init({})".format(selection.id)
# create variable
lines.append("__value__{} = {}['{}']".format(vid, mid, expr))
# variable for replacement
variables[vid] = "__value__{}".format(vid)
# value is calculated with the current state of model
lines.append(
SEDMLCodeFactory.targetToPython(
xpath=setValue.getTarget(),
value=evaluableMathML(setValue.getMath(), variables=variables),
modelId=setValue.getModelReference(),
)
)
# handle result variable
resultVariable = "{}[0]".format(task.getId())
# -------------------------------------------------------------------------
# <UNIFORM TIMECOURSE>
# -------------------------------------------------------------------------
if simType == libsedml.SEDML_SIMULATION_UNIFORMTIMECOURSE:
lines.append("{}.timeCourseSelections = {}".format(mid, list(selections)))
initialTime = simulation.getInitialTime()
outputStartTime = simulation.getOutputStartTime()
outputEndTime = simulation.getOutputEndTime()
numberOfPoints = simulation.getNumberOfPoints()
# reset before simulation (see https://github.com/sys-bio/tellurium/issues/193)
lines.append("{}.reset()".format(mid))
# throw some points away
if abs(outputStartTime - initialTime) > 1e-6:
lines.append(
"{}.simulate(start={}, end={}, points=2)".format(
mid, initialTime, outputStartTime
)
)
# real simulation
lines.append(
"{} = {}.simulate(start={}, end={}, steps={})".format(
resultVariable, mid, outputStartTime, outputEndTime, numberOfPoints
)
)
# -------------------------------------------------------------------------
# <ONESTEP>
# -------------------------------------------------------------------------
elif simType == libsedml.SEDML_SIMULATION_ONESTEP:
lines.append("{}.timeCourseSelections = {}".format(mid, list(selections)))
step = simulation.getStep()
lines.append(
"{} = {}.simulate(start={}, end={}, points=2)".format(
resultVariable, mid, 0.0, step
)
)
# -------------------------------------------------------------------------
# <STEADY STATE>
# -------------------------------------------------------------------------
elif simType == libsedml.SEDML_SIMULATION_STEADYSTATE:
lines.append(
"{}.steadyStateSolver.setValue('{}', {})".format(
mid, "allow_presimulation", False
)
)
lines.append("{}.steadyStateSelections = {}".format(mid, list(selections)))
lines.append(
"{}.simulate()".format(mid)
) # for stability of the steady state solver
lines.append("{} = {}.steadyStateNamedArray()".format(resultVariable, mid))
# no need to turn this off because it will be checked before the next simulation
# lines.append("{}.conservedMoietyAnalysis = False".format(mid))
# -------------------------------------------------------------------------
# <OTHER>
# -------------------------------------------------------------------------
else:
lines.append("# Unsupported simulation: {}".format(simType))
return lines
@staticmethod
[docs] def repeatedTaskToPython(doc, node):
"""Create python for RepeatedTask.
Must create
- the ranges (Ranges)
- apply all changes (SetValues)
"""
# storage of results
task = node.task_id
lines = ["", "{} = []".format(task.getId())]
# <Range Definition>
# master range
rangeId = task.getRangeId()
masterRange = task.getRange(rangeId)
if masterRange.getTypeCode() == libsedml.SEDML_RANGE_UNIFORMRANGE:
lines.extend(SEDMLCodeFactory.uniformRangeToPython(masterRange))
elif masterRange.getTypeCode() == libsedml.SEDML_RANGE_VECTORRANGE:
lines.extend(SEDMLCodeFactory.vectorRangeToPython(masterRange))
elif masterRange.getTypeCode() == libsedml.SEDML_RANGE_FUNCTIONALRANGE:
warnings.warn("FunctionalRange for master range not supported in task.")
# lock-in ranges
for r in task.getListOfRanges():
if r.getId() != rangeId:
if r.getTypeCode() == libsedml.SEDML_RANGE_UNIFORMRANGE:
lines.extend(SEDMLCodeFactory.uniformRangeToPython(r))
elif r.getTypeCode() == libsedml.SEDML_RANGE_VECTORRANGE:
lines.extend(SEDMLCodeFactory.vectorRangeToPython(r))
# <Range Iteration>
# iterate master range
lines.append(
"for __k__{}, __value__{} in enumerate(__range__{}):".format(
rangeId, rangeId, rangeId
)
)
# Everything from now on is done in every iteration of the range
# We have to collect & intent all lines in the loop)
forLines = []
# definition of lock-in ranges
helperRanges = {}
for r in task.getListOfRanges():
if r.getId() != rangeId:
helperRanges[r.getId()] = r
if r.getTypeCode() in [
libsedml.SEDML_RANGE_UNIFORMRANGE,
libsedml.SEDML_RANGE_VECTORRANGE,
]:
forLines.append(
"__value__{} = __range__{}[__k__{}]".format(
r.getId(), r.getId(), rangeId
)
)
# <functional range>
if r.getTypeCode() == libsedml.SEDML_RANGE_FUNCTIONALRANGE:
variables = {}
# range variables
variables[rangeId] = "__value__{}".format(rangeId)
for key in helperRanges.keys():
variables[key] = "__value__{}".format(key)
# parameters
for par in r.getListOfParameters():
variables[par.getId()] = par.getValue()
for var in r.getListOfVariables():
vid = var.getId()
mid = var.getModelReference()
selection = SEDMLCodeFactory.selectionFromVariable(var, mid)
expr = selection.id
if selection.type == "concentration":
expr = "[{}]".format(selection.id)
lines.append("__value__{} = {}['{}']".format(vid, mid, expr))
variables[vid] = "__value__{}".format(vid)
# value is calculated with the current state of model
value = evaluableMathML(r.getMath(), variables=variables)
forLines.append("__value__{} = {}".format(r.getId(), value))
# <resetModels>
# models to reset via task tree below node
mids = set([])
for child in node:
t = child.task_id
if t.getTypeCode() == libsedml.SEDML_TASK:
mids.add(t.getModelReference())
# reset models referenced in tree below task
for mid in mids:
if task.getResetModel():
# reset before every iteration
forLines.append("{}.reset()".format(mid))
else:
# reset before first iteration
forLines.append("if __k__{} == 0:".format(rangeId))
forLines.append(" {}.reset()".format(mid))
# add lines
lines.extend(" " + line for line in forLines)
return lines
@staticmethod
@staticmethod
[docs] def vectorRangeToPython(r):
lines = []
__range = np.zeros(shape=[r.getNumValues()])
for k, v in enumerate(r.getValues()):
__range[k] = v
lines.append("__range__{} = {}".format(r.getId(), list(__range)))
return lines