Source code for omnetpypy.simulation

r"""
This module contains the :class:`Simulation` and :class:`Experiment` classes, used to configure
and run simulations and to collect and store the recorded metrics.
"""

import csv

import pandas as pd
import pkg_resources

from omnetpypy import utilities, sim_log, parser
from omnetpypy.backends.simpy_connector import SimPyConnector


class Simulation:
    r"""
    This class is used to run a single simulation using the specified configuration.
    It takes care of reading the configuration, running the simulation, and returning
    the recorded metrics.

    Users should not instantiate this class directly, but use the
    :class:`~omnetpypy.simulation.Experiment` class to manage multiple simulations.

    All parameters are also stored as attributes.

    Parameters
    ----------
    engine : str
        The simulation engine to use. Only "simpy" is implemented at the moment.
    seed_set : list of int
        The seed set for the repetition.
    repetition : int
        The repetition index.
    metrics : list of :class:`~omnetpypy.utilities.FutureMetric`
        The list of metrics to collect.
    yaml_directory : str
        The path of the directory that contains all the yaml files.
    until : int or float
        The time until which the simulation will run.
    log_level : str
        The log level for the simulation. See :func:`~omnetpypy.sim_log.log_to_console`.
    time_unit : str
        The time unit for the simulation. See :func:`~omnetpypy.utilities.time_unit_factor`.
    output_dir : str
        The output directory for the simulation.
    global_params : dict
        A dictionary of global parameters. These parameters are available to all the
        entities in the simulation.

    Attributes
    ----------
    rng : :class:`~omnetpypy.utilities.MultiRandom`
        The random number generator for this simulation, already initialized.
        It is made available to all the entities in the simulation through the
        :class:`~omnetpypy.backends.connector.Connector`.
    connector : :class:`~omnetpypy.backends.Connector`
        The connector to the simulation engine. Its subclass depends on the chosen engine.
    network : :class:`~omnetpypy.front_end.compound_module.CompoundModule`
        The network to simulate, parsed from the YAML file "network.yaml".

    Raises
    ------
    NotImplementedError
        If the passed ``engine`` is not implemented.
    """

    def __init__(self, engine, seed_set, repetition, metrics, yaml_directory, until,
                 log_level, time_unit, output_dir, global_params):
        self.engine = engine
        self.seed_set = seed_set
        self.repetition_idx = repetition
        self.until = until
        self.log_level = log_level
        self.time_unit = time_unit
        self.output_dir = output_dir
        self.global_params = global_params.copy()  # dictionary of global parameters

        self.rng = utilities.MultiRandom(seeds=seed_set)
        self.connector = None

        # set log level
        sim_log.log_to_console(level=self.log_level)

        self.metrics = metrics  # list of Metrics

        if engine == "simpy":
            self.connector = SimPyConnector(simulation=self, metrics=metrics,
                                            output_dir=output_dir, repetition=repetition)
        else:
            raise NotImplementedError("Engine not implemented")

        # parse yaml files
        self.network = parser.parse_yaml_directory(yaml_directory, self)
    def start(self):
        r"""
        Start the simulation and collect the metrics.

        Returns
        -------
        dict
            A dictionary of dictionaries with the collected metrics.
        """
        self.connector.start_simulation(until=self.until)

        collected_data = {}
        # collect metrics
        for metric in self.metrics:
            if self.output_dir is not None:
                # dump the remaining samples in memory to the temporary file
                self.connector.dump_metric(metric.name)
                # compute the statistics we need
                filename = f"{self.output_dir}/.{metric.name}_vector_rep{self.repetition_idx}.csv"
                # check metric type
                if metric.type == "number":
                    collected_data[metric.name] = utilities.get_metrics_from_csv(metric, filename)
                else:
                    collected_data[metric.name] = {}

        return collected_data
    def time(self):
        """
        Return the current simulation time.

        Returns
        -------
        int or float
            The current simulation time.
        """
        return self.connector.get_time()
    @property
    def time_unit_factor(self):
        r"""
        The factor to convert the simulation time unit to seconds.

        See Also
        --------
        :func:`~omnetpypy.utilities.time_unit_factor`

        Returns
        -------
        int or float
            The factor to convert the time unit to seconds.
        """
        return utilities.time_unit_factor(self.time_unit)
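
# A minimal sketch of the dictionary returned by Simulation.start(). The metric name
# "fidelity" and the exact statistic keys are assumptions here: they depend on the
# metric's "collect" flags and on what utilities.get_metrics_from_csv computes.
# Non-"number" metrics map to an empty dictionary, since only their raw vector files
# are kept on disk:
#
#     {
#         "fidelity": {"mean": 0.93, "std": 0.02, "count": 10000},
#     }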
class Experiment:
    r"""
    This class is used to run a set of independent simulations using the same YAML
    configuration files. An instance of this class takes care of reading the
    configuration file, parsing it, running the simulations, and dumping the recorded
    metrics in the output files.

    Parameters
    ----------
    config_file : str
        The path to the configuration file, absolute or relative to the working directory.
        The configuration file is a YAML file with a rigid structure (see the ``config``
        attribute).

    Attributes
    ----------
    seed_sets : list of lists of int
        The seed sets for each repetition. Each repetition has a seed set whose length
        is given by the "num_rngs" key of the configuration file.
    config : dict
        The configuration dictionary, directly parsed from the YAML configuration file.
        The dictionary has the following keys:

        - engine (``str``), optional: The simulation engine to use. Only "simpy" is
          supported at the moment. Defaults to "simpy".
        - repetitions (``int``), optional: The number of independent repetitions to run.
          Defaults to 1.
        - metrics (list of :class:`~omnetpypy.utilities.FutureMetric`), optional:
          The list of metrics to collect. Defaults to an empty list.
        - yaml_directory (``str``), optional: The path to the directory containing the
          yaml files to set the simulation. Relative to the configuration file directory.
          Defaults to "./".
        - simulate_until (``float`` or `None`), optional: The time until which the
          simulation will run. If `None`, the simulation runs as long as there are events.
          Defaults to `None`.
        - time_unit (``str``), optional: The time unit for the simulation.
          Defaults to "us" (microseconds).
        - log_level (``int`` or ``str``), optional: The log level for the simulation.
          See :func:`~omnetpypy.sim_log.set_log_level`. Defaults to ``logging.INFO``.
        - output_dir (``str`` or `None`), optional: The output directory for the
          simulation, relative to the configuration file directory. Defaults to `None`,
          in which case no data will be stored.
        - num_rngs (``int``), optional: The number of independent random number
          generators (and therefore seeds) available to each repetition. Defaults to 1.
        - global_params (``dict``), optional: A dictionary of global parameters made
          available to all the entities in the simulation. Defaults to an empty dictionary.
        - max_processes (``int``), optional: The maximum number of worker processes used
          by :meth:`run_simulations` to run repetitions in parallel. Defaults to 1.
    simulations_params : list of tuples
        The list of simulation parameters for each repetition. Each element of the list
        is a tuple with the following parameters:

        - engine (``str``): The simulation engine to use.
        - seed_set (list of int): The seed set for the repetition.
        - repetition (``int``): The repetition index.
        - metrics (list of :class:`~omnetpypy.utilities.FutureMetric`): The list of
          metrics to collect.
        - yaml_directory (``str``): The path to the directory containing the yaml files
          to set the simulation.
        - until (``float`` or `None`): The time until which the simulation will run.
          If `None`, the simulation runs as long as there are events.
        - log_level (``int`` or ``str``): The log level for the simulation.
          See :func:`~omnetpypy.sim_log.set_log_level`.
        - time_unit (``str``): The time unit for the simulation.
        - output_dir (``str`` or `None`): The output directory for the simulation.
        - global_params (dict): A dictionary of global parameters. These parameters are
          available to all the entities in the simulation.
    output_dir : str or None
        The output directory for the simulation. If `None`, no data is stored.

    Examples
    --------
    >>> exp = Experiment(config_file="config.yaml")
    >>> exp.run_simulations()
    """

    def __init__(self, config_file):
        self.seed_sets = []
        # every repetition has its own seed set of length rngs_per_rep.
        # Seeds are taken from the seeds.csv file with a single "seeds" column.
        # We open and read the file here, but not all the seeds are used,
        # only the first rngs_per_rep * repetitions.

        self.config = parser.parse_yaml_file(config_file)

        # derive the configuration file directory from its path
        # by removing everything after the last "/"
        config_dir = config_file[:config_file.rfind("/") + 1]

        engine = self.config.get("engine", "simpy")
        repetitions = self.config.get("repetitions", 1)
        yaml_path = config_dir + self.config.get("yaml_directory", "./")
        rngs_per_rep = self.config.get("num_rngs", 1)
        until = self.config.get("simulate_until", None)
        log_level = self.config.get("log_level", "info")
        time_unit = self.config.get("time_unit", "us")
        global_params = self.config.get("global_params", {})

        # set log level
        sim_log.log_to_console(level=log_level)

        # iterate over the first rngs_per_rep * repetitions seeds
        # and store them in a list of lists, where each list of seeds
        # is the seed set for a repetition
        seeds_path = pkg_resources.resource_filename('omnetpypy', 'data/seeds.csv')
        with open(seeds_path, "r") as f:
            reader = csv.reader(f)
            # use the reader sequentially on the first rngs_per_rep * repetitions rows
            for _ in range(repetitions):
                self.seed_sets.append([])
                for _ in range(rngs_per_rep):
                    self.seed_sets[-1].append(int(next(reader)[0]))

        metrics_raw = self.config.get("metrics", [])
        metrics = []
        for metric in metrics_raw:
            # set up metric collection
            name = metric.get("name")
            metric_type = metric.get("type")
            collect = metric.get("collect", "number")

            all_collects = ["vector", "mean", "std", "min", "max", "median",
                            "percentiles", "count", "var"]
            all_types = ["number", "str", "dict"]

            assert all([collect_type in all_collects for collect_type in collect]), \
                f"collect must be one of {all_collects}"
            assert metric_type in all_types, f"type must be one of {all_types}"

            params_collect = {k: False for k in all_collects}
            true_params = {k: True for k in collect}
            params_collect.update(true_params)

            params = {"name": name, "type": metric_type}
            params.update(params_collect)

            if metric_type != "number":
                assert len([k for k, v in params_collect.items() if v]) == 1 and params_collect["vector"], \
                    "only the vector collect type can be set for non-number typed metrics"

            columns = None
            if metric_type == "dict":
                columns = metric.get("columns")
                assert columns is not None, "columns must be provided for dict type metrics"
                assert isinstance(columns, list), "columns must be a list"
                assert all(isinstance(c, str) for c in columns), "columns must be a list of strings"
                assert len([k for k, v in params_collect.items() if v]) == 1 and params_collect["vector"], \
                    "only the vector collect type can be True for dict type metrics"
                assert "timestamp" not in columns, \
                    "timestamp cannot be a column name, it is added automatically"

            params["columns"] = columns + ["timestamp"] if columns is not None else None

            metrics.append(utilities.FutureMetric(**params))

        if "output_dir" in self.config:
            output_dir = config_dir + self.config["output_dir"]
        else:
            output_dir = None
        self.output_dir = output_dir

        self.simulations_params = [(engine, self.seed_sets[i], i, metrics, yaml_path,
                                    until, log_level, time_unit, output_dir, global_params)
                                   for i in range(repetitions)]
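
    # A minimal sketch of how a "metrics" entry of the configuration file is turned into
    # a FutureMetric by the parsing loop in __init__ above. The metric name "fidelity" is
    # an illustrative assumption; the keyword names follow params_collect built from
    # all_collects:
    #
    #     - name: fidelity                 # YAML entry in the configuration file
    #       type: number
    #       collect: [mean, std, vector]
    #
    # becomes, roughly:
    #
    #     utilities.FutureMetric(name="fidelity", type="number", vector=True, mean=True,
    #                            std=True, min=False, max=False, median=False,
    #                            percentiles=False, count=False, var=False, columns=None)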
    def run_simulations(self):
        r"""
        Run the simulations.
        """
        import multiprocessing

        max_processes = self.config.get("max_processes", 1)
        # cap max_processes at the number of cpu cores
        if max_processes > 1:
            max_processes = min(multiprocessing.cpu_count(), max_processes)

        # if max_processes is 1, we just run the simulations sequentially
        if max_processes == 1:
            collected = []
            for sim_params in self.simulations_params:
                collected.append(_start_sim(sim_params))
        # otherwise run the simulations in parallel with a multiprocessing Pool
        else:
            with multiprocessing.Pool(max_processes) as pool:
                collected = pool.map(_start_sim, self.simulations_params)

        # every element of collected is a dictionary of dictionaries with the same keys.
        # we want to merge them into a single dictionary, where the values are lists,
        # except for the "vector" key, because the vector samples are merged from the
        # temporary per-repetition CSV files below
        merged = {}
        for metric in collected[0].keys():
            merged[metric] = {}

        # now we iterate over the collected data
        for data in collected:
            for metric in data.keys():
                for key, value in data[metric].items():
                    if key != "vector":
                        if key not in merged[metric]:
                            merged[metric][key] = []
                        merged[metric][key].append(value)

        out_dir = self.output_dir

        # write the data to csv, two files for each metric:
        # "metric_name.csv" and "metric_name_vector.csv"
        if out_dir is not None:
            for metric in merged.keys():
                # we write the statistics to a file if merged[metric] is not empty
                if merged[metric]:
                    df = pd.DataFrame(merged[metric])
                    df.to_csv(f"{out_dir}/{metric}.csv", index=False)

            for metric in merged.keys():
                # we open the temporary files and merge them into a single file.
                # We add a "repetition" column
                max_reps = self.config.get("repetitions", 1)
                for i in range(max_reps):
                    if i == 0:
                        with open(f"{out_dir}/.{metric}_vector_rep{i}.csv", "r") as f:
                            # add the repetition column
                            df = pd.read_csv(f)
                            if max_reps > 1:
                                df["repetition"] = i
                            df.to_csv(f"{out_dir}/{metric}_vector.csv", index=False)
                    else:
                        with open(f"{out_dir}/.{metric}_vector_rep{i}.csv", "r") as f:
                            # add the repetition column
                            df = pd.read_csv(f)
                            if max_reps > 1:
                                df["repetition"] = i
                            df.to_csv(f"{out_dir}/{metric}_vector.csv", mode="a",
                                      header=False, index=False)

            # remove the temporary files
            import os
            for metric in merged.keys():
                for i in range(self.config.get("repetitions", 1)):
                    os.remove(f"{out_dir}/.{metric}_vector_rep{i}.csv")
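
# A minimal configuration file sketch for Experiment. The key names follow the class
# docstring and the parsing code above; the concrete values, paths, and the metric and
# parameter names are purely illustrative assumptions:
#
#     engine: simpy
#     repetitions: 2
#     num_rngs: 4
#     max_processes: 2
#     yaml_directory: ./yaml
#     output_dir: ./results
#     simulate_until: 1000000
#     time_unit: us
#     log_level: info
#     global_params:
#       attempt_rate: 10
#     metrics:
#       - name: fidelity
#         type: number
#         collect: [mean, std, vector]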
def _start_sim(sim_params):
    # defined here, at module level, to be picklable
    engine, seed_set, repetition, metrics, yaml_path, until, log_level, time_unit, \
        output_dir, global_params = sim_params
    sim = Simulation(engine, seed_set, repetition, metrics, yaml_path, until,
                     log_level, time_unit, output_dir, global_params)
    return sim.start()
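
# After run_simulations(), the output directory (if configured) contains, for each metric,
# a "<metric>.csv" file with the per-repetition statistics and a "<metric>_vector.csv" file
# with the merged raw samples (plus a "repetition" column when repetitions > 1).
# A minimal post-processing sketch, assuming a metric named "fidelity" and an output
# directory "./results" (both hypothetical):
#
#     import pandas as pd
#
#     stats = pd.read_csv("./results/fidelity.csv")
#     samples = pd.read_csv("./results/fidelity_vector.csv")
#     print(stats.describe())
#     print(samples.head())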