Source code for perfsim.simulation

#  Copyright (C) 2020 Michel Gokan Khan
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License along
#  with this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
#  This file is a part of the PerfSim project, which is now open source and available under the GPLv2.
#  Written by Michel Gokan Khan, February 2020

from copy import deepcopy
from typing import Union, Dict, List, Any, TYPE_CHECKING

from perfsim import SimulationScenario, ServiceChain, PlacementAlgorithm, ResourceAllocationScenario, \
    AffinityPrototype, Microservice, TrafficPrototype, LoadGenerator, Cluster, Logger, TopologyPrototype, Topology, \
    ResultsStorageDriver

if TYPE_CHECKING:
    from perfsim import SimulationScenarioManager


[docs] class Simulation: """ This class represents a simulation. It contains all the necessary information to run a simulation. """ #: microservices_dict is a dictionary of microservices microservices_dict: Dict[str, Microservice] #: service_chains_dict is a dictionary of service chains service_chains_dict: Dict[str, ServiceChain] #: placement_algorithm is the algorithm used to place the microservices affinity_prototypes_dict: Dict[str, AffinityPrototype] #: resource_allocation_scenarios_dict is a dictionary of resource allocation scenarios resource_allocation_scenarios_dict: Dict[str, ResourceAllocationScenario] #: topology is the topology of the simulation topology: Topology #: traffic_prototypes_dict is a dictionary of traffic prototypes traffic_prototypes_dict: Dict[str, TrafficPrototype] #: scenario is the simulation scenario scenario: SimulationScenario #: cluster is the cluster of the simulation cluster: Cluster #: load_generator is the load generator of the simulation load_generator: LoadGenerator #: storage_driver is the storage driver of the simulation storage_driver: ResultsStorageDriver #: The name of the simulation name: str #: Status of debug mode (automatically configured based on _debug_level) _debug: bool #: Level of debug verbosity (1-5) _debug_level: bool #: Collects various logs during simulation logger: Logger #: Stores the location of the log file (if set to None, logs are not saved) _debug_file_location: str #: If set yes, we collect CPU events (task load balancing) for each core in all hosts. Use for debug purposes only! _log_cpu_events: bool #: If set yes, we collect all events as a timeline. Use for debug purposes only! _log_timeline: bool #: The simulation clock (in ns). Other classes use this parameter to know current simulation time. _time: int DEFAULT_DEBUG_LEVEL = 0 DEFAULT_DEBUG_FILE_LOCATION = False DEFAULT_LOG_CPU_EVENTS = False DEFAULT_LOG_TIMELINE = False def __init__(self, name: str, simulation_scenario: SimulationScenario, service_chains_dict: Dict[str, ServiceChain], topology_prototype: TopologyPrototype, placement_algorithm: PlacementAlgorithm, resource_allocation_scenarios_dict: Dict[str, ResourceAllocationScenario], affinity_prototypes_dict: Dict[str, AffinityPrototype], traffic_prototypes_dict: Dict[str, TrafficPrototype], storage_driver: ResultsStorageDriver, validate: bool = False, copy: bool = True): self.name = name self.time = 0 self.scenario = self.set_object(simulation_scenario, copy) self.set_debug_properties( level=self.__get_debug_param(name="debug_level", default=self.DEFAULT_DEBUG_LEVEL), file_path=self.__get_debug_param(name="debug_file_location", default=self.DEFAULT_DEBUG_FILE_LOCATION), log_cpu_events=self.__get_debug_param(name="log_cpu_events", default=self.DEFAULT_LOG_CPU_EVENTS), log_timeline=self.__get_debug_param(name="log_timeline", default=self.DEFAULT_LOG_TIMELINE)) self.logger = Logger(simulation=self) # self.simulation_scenario_manager = simulation_scenario_manager self.service_chains_dict = self.set_object(service_chains_dict, copy) self.microservices_dict = ServiceChain.microservices_to_dict_from_dict(service_chains=self.service_chains_dict) if validate: self.validate_simulation_scenario(sim_scenario=simulation_scenario) self.topology = Topology.from_prototype(prototype=topology_prototype, simulation=self, copy=copy) # self.topology.sim = self # in topology to man daram copy mikonam ke baes mishe simulatione toosham copy beshe, deghat kon ke maa hamin # alan too simulation hastim, vase hamin baes mishe 2 ta simulation ijad beshe....in bayad dorost she self.placement_algorithm = self.set_object(placement_algorithm, copy) self.resource_allocation_scenarios_dict = self.set_object(resource_allocation_scenarios_dict, copy) self.affinity_prototypes_dict = self.set_object(affinity_prototypes_dict, copy) self.traffic_prototypes_dict = self.set_object(traffic_prototypes_dict, copy) self.storage_driver = storage_driver self.setting_scaling_scenario() self.setting_affinity_scenarios() self.cluster = Cluster(name=self.name, simulation=self, topology=self.topology, service_chains_dict=self.service_chains_dict) self.load_generator = LoadGenerator(name=self.scenario["traffic_scenario"]["name"], simulation=self)
[docs] @staticmethod def set_object(obj: Any, copy: bool): """ Set an object with or without copy :param obj: :param copy: :return: """ if copy: return deepcopy(obj) else: return obj
[docs] def setting_scaling_scenario(self): """ Setting the scaling scenario :return: None """ for scaling_scenario in self.scenario["scaling_scenarios"]: for microservice_name in list(scaling_scenario["microservice"].keys()): ras_name = scaling_scenario["microservice"][microservice_name]["resource_allocation_scenario"] replica_count = scaling_scenario["microservice"][microservice_name]["replica_count"] ms = self.microservices_dict[microservice_name] ras = self.resource_allocation_scenarios_dict[ras_name] ms.cpu_requests = ras.cpu_requests ms.cpu_limits = ras.cpu_limits ms.memory_requests = ras.memory_requests ms.ingress_bw = ras.ingress_bw ms.egress_bw = ras.egress_bw ms.ingress_latency = ras.ingress_latency ms.egress_latency = ras.egress_latency ms.blkio_capacity = ras.blkio_capacity ms.resource_allocation_scenario = ras ms.replica_count = replica_count
[docs] def setting_affinity_scenarios(self): """ Setting the affinity scenarios :return: """ for affinity_ruleset in self.scenario["affinity_scenarios"]: microservice_name = list(affinity_ruleset["microservice"].keys())[0] ms = self.microservices_dict[microservice_name] affinity_ruleset_name = affinity_ruleset["microservice"][microservice_name]["affinity_ruleset"] if affinity_ruleset_name is not None: ruleset = self.affinity_prototypes_dict[affinity_ruleset_name] for affinity_ms_name in ruleset.affinity_microservices: ms.add_microservice_affinity_with(self.microservices_dict[affinity_ms_name]) for antiaffinity_ms_name in ruleset.antiaffinity_microservices: ms.add_microservice_anti_affinity_with(self.microservices_dict[antiaffinity_ms_name]) for affinity_host_name in ruleset.affinity_hosts: ms.add_host_affinity_with(self.topology.hosts_dict[affinity_host_name]) for antiaffinity_host_name in ruleset.antiaffinity_hosts: ms.add_host_anti_affinity_with(self.topology.hosts_dict[antiaffinity_host_name])
[docs] def validate_simulation_scenario(self, sim_scenario: dict): """ Validate the simulation scenario to make sure all the microservices are in the simulation scenario It raises an exception if a microservice is not found in the simulation scenario :param sim_scenario: The simulation scenario :return: None """ for microservice in sim_scenario["microservices"]: if microservice.name not in self.microservices_dict: raise Exception("Microservice {} not found in microservices".format(microservice.name))
[docs] @staticmethod def copy_sim_scenarios_to_dict(sim_scenarios: Union[List[SimulationScenario], Dict[str, SimulationScenario]]) \ -> Dict[str, SimulationScenario]: """ Copy simulation scenarios to a dictionary :param sim_scenarios: :return: """ if isinstance(sim_scenarios, dict): return deepcopy(sim_scenarios) else: simulation_scenarios_dict = {} for scenario in sim_scenarios: simulation_scenarios_dict[scenario['name']] = deepcopy(scenario) return simulation_scenarios_dict
def __get_debug_param(self, name: str, value: Any = None, default: Any = None): """ Get a debug parameter :param name: :param value: :param default: :return: """ scenario_debug_config = self.scenario["debug"] if "debug" in self.scenario else {} if value is None: if name in scenario_debug_config: return scenario_debug_config[name] elif default is not None: return default else: raise Exception("Debug parameter {} not found in scenario debug config".format(name)) else: return value
[docs] def set_debug_properties(self, level: Union[bool, int] = None, file_path: str = None, log_cpu_events: bool = None, log_timeline: bool = None): """ Set debug properties :param level: :param file_path: :param log_cpu_events: :param log_timeline: :return: """ self.debug_level = self.__get_debug_param("debug_level", level) self.debug_file_location = self.__get_debug_param("debug_file_location", file_path) self._log_cpu_events = self.__get_debug_param("log_cpu_events", log_cpu_events) self._log_timeline = self.__get_debug_param("log_timeline", log_timeline)
@property def log_timeline(self): """ Log timeline :return: """ return self._log_timeline @property def debug(self): """ Status of debug mode (automatically configured based on _debug_level) :return: """ return self._debug @property def debug_level(self): """ Level of debug verbosity (1-5) :return: Return debug level """ return self._debug_level @debug_level.setter def debug_level(self, debug_level: Union[int, bool]): """ Set debug level (1-5) :param debug_level: Debug level (1-5) :return: None """ self._debug = False if debug_level == 0 or debug_level is False else True self._debug_level = 0 if self._debug is False or debug_level < 0 else debug_level @property def debug_file_location(self): """ Return log file location :return: Log file path - None if empty (then log will print out in console) :rtype: str """ return self._debug_file_location @debug_file_location.setter def debug_file_location(self, debug_file_location: str): self._debug_file_location = debug_file_location @property def log_cpu_events(self): """ Status of logging CPU events (used for drawing task load balancing heatmaps) :return: bool """ return self._log_cpu_events @property def time(self) -> int: """ Simulation time :return: Simulation time :rtype: float """ return self._time @time.setter def time(self, v: int): """ Set simulation time :param v: :return: """ self._time = v
[docs] @staticmethod def from_scenarios_manager(sm: 'SimulationScenarioManager') -> dict[str, 'Simulation']: """ Create simulations from scenarios manager :param sm: :return: """ simulations_dict = {} for scenario in sm.simulation_scenarios_dict.values(): simulations_dict[scenario["name"]] = \ Simulation(name=scenario["name"], simulation_scenario=scenario, service_chains_dict=sm.service_chains_dict, topology_prototype=sm.topologies_prototype_dict[scenario["topology"]], placement_algorithm=sm.placement_algorithms_dict[scenario["placement_algorithm"]], resource_allocation_scenarios_dict=sm.res_alloc_scenarios_dict, affinity_prototypes_dict=sm.affinity_prototypes_dict, traffic_prototypes_dict=sm.traffic_prototypes_dict, storage_driver=sm.results_storage_driver) return simulations_dict