Source code for perfsim.equipments.host

#  Copyright (C) 2020 Michel Gokan Khan
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License along
#  with this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
#  This file is a part of the PerfSim project, which is now open source and available under the GPLv2.
#  Written by Michel Gokan Khan, February 2020


from __future__ import annotations

from typing import TYPE_CHECKING, List, Union, Set, Dict

import numpy as np

from perfsim import Nic, CPU, Storage, Settings, HostPrototype, RamSet, Router, ReplicaThread, CoreLogObserver, \
    CPULogObserver, Equipment, CostDict, CostEventsDict

if TYPE_CHECKING:
    from perfsim import MicroserviceReplica, Cluster


[docs] class Host(HostPrototype, Equipment): """ A Host object simulates a single host in a cluster. It has a single CPU and a single NIC. The Number of cores in its CPU can be specified using the *cores_count* property, and its maximum network bandwidth can be specified with the *max_bandwidth* property. """ #: The cluster this host belongs to __cluster: Cluster #: The CPU of this host cpu: CPU #: The cost events of this host cost_events: CostEventsDict def __init__(self, name: str, cpu_core_count: int, cpu_clock_rate: int, memory_capacity: int, ram_speed: int, storage_capacity: int, storage_speed: int, network_bandwidth: int, router: Router = None, cluster: Cluster = None, sched_latency_ns: int = Settings.args.loc['sched_latency_ns']['min'], sched_min_granularity_ns: int = Settings.args.loc['sched_min_granularity_ns']['min'], cfs_period_ns: int = Settings.args.loc['cfs_period_ns']['min'], cost_dict: CostDict = None): super().__init__(name=name, cpu_core_count=cpu_core_count, cpu_clock_rate=cpu_clock_rate, memory_capacity=memory_capacity, ram_speed=ram_speed, storage_capacity=storage_capacity, storage_speed=storage_speed, network_bandwidth=network_bandwidth, sched_latency_ns=sched_latency_ns, sched_min_granularity_ns=sched_min_granularity_ns, cfs_period_ns=cfs_period_ns, cost_dict=cost_dict) self.cluster = cluster self.cost_events: CostEventsDict = { "power_on_periods": [], "best_effort_periods": [], "storage_reserved_periods": [], "core_reserved_periods": [] } self.cpu = CPU(name=name + "_cpu0", cores_count=self.cpu_core_count, clock_rate=self.cpu_clock_rate, host=self) self.nic = { "egress": Nic(name + "_nic0_egress", self.network_bandwidth, self), "ingress": Nic(name + "_nic0_ingress", self.network_bandwidth, self), } self.ram = RamSet(ram_set_id=name + "_ram0", capacity=self.memory_capacity, speed=self.ram_speed, host=self) self.blkio = Storage(storage_id=name + "_storage0", capacity=self.storage_capacity, speed=self.storage_speed, host=self) self._id_in_cluster = -1 self.microservices = [] self.replicas = set() self.__threads = set() self.timeline_event = [] self.timeline_time = [] self.name = name self.__router = router self.load_balancing_needed = False
[docs] def reinit(self): """ Reinitialize the host object :return: """ self.__init__(name=self.name, cpu_core_count=self.cpu_core_count, cpu_clock_rate=self.cpu_clock_rate, memory_capacity=self.memory_capacity, ram_speed=self.ram_speed, storage_capacity=self.storage_capacity, storage_speed=self.storage_speed, network_bandwidth=self.network_bandwidth, router=self.router, cluster=self.cluster, sched_latency_ns=self.sched_latency_ns, sched_min_granularity_ns=self.sched_min_granularity_ns, cfs_period_ns=self.cfs_period_ns, cost_dict=self.cost_dict)
@property def cluster(self) -> Cluster: """ Returns the cluster this host belongs to :return: """ return self.__cluster @cluster.setter def cluster(self, cluster: Cluster): """ Sets the cluster this host belongs to and attaches the necessary observers to the host's CPU and cores :param cluster: :return: """ self.__cluster = cluster if cluster is not None: for core in self.cpu.cores: core.attach_observer(observer=CoreLogObserver(core=core)) self.cpu.attach_observer(CPULogObserver(cpu=self.cpu)) @property def router(self) -> Router: """ Returns the router this host is connected to (if any) :return: """ return self.__router @router.setter def router(self, v: Union[Router, None]): """ Sets the router this host is connected to (if any) :param v: :return: """ if not isinstance(v, Router) and v is not None: raise Exception(f"Can't connect host {self} to object of type {type(v).__name__}") self.__router = v
[docs] @classmethod def from_host_prototype(cls, name: str, host_prototype: HostPrototype, cluster: Cluster = None, router: Router = None): """ Create a host from a host prototype object and assign it to a cluster and a router :param name: :param host_prototype: :param cluster: :param router: :return: """ return cls(name=name, cpu_core_count=host_prototype.cpu_core_count, cpu_clock_rate=host_prototype.cpu_clock_rate, memory_capacity=host_prototype.memory_capacity, ram_speed=host_prototype.ram_speed, storage_capacity=host_prototype.storage_capacity, storage_speed=host_prototype.storage_speed, network_bandwidth=host_prototype.network_bandwidth, router=router, cluster=cluster, sched_latency_ns=host_prototype.sched_latency_ns, sched_min_granularity_ns=host_prototype.sched_min_granularity_ns, cfs_period_ns=host_prototype.cfs_period_ns)
[docs] def is_replica_placeable_on_host_from_resource_perspective(self, replica: MicroserviceReplica) -> bool: """ Check if a replica can be placed on this host from a resource perspective (CPU, RAM, BLKIO, NIC) :param replica: :return: """ c_a = self.cpu.is_there_enough_resources_to_reserve(amount=replica.microservice.cpu_requests) or \ replica.microservice.cpu_requests == replica.microservice.cpu_limits == -1 r_a = self.ram.is_there_enough_resources_to_reserve(amount=replica.microservice.memory_requests) b_a = self.blkio.is_there_enough_resources_to_reserve(amount=replica.microservice.blkio_capacity) if c_a and r_a and b_a: # and e_a and i_a: return True else: return False
[docs] def place_replica(self, replica: MicroserviceReplica) -> None: """ Place a replica on this host and reserve the necessary resources (CPU, RAM, BLKIO, NIC) :param replica: :return: """ if replica.microservice.is_guaranteed() or replica.microservice.is_unlimited_burstable(): self.cpu.reserve(amount=replica.microservice.cpu_requests) elif replica.microservice.is_limited_burstable(): self.cpu.reserve(amount=replica.microservice.cpu_limits) elif replica.microservice.is_burstable(): self.cpu.reserve(amount=replica.microservice.cpu_limits) else: replica.process.cpu_requests_share = len(self.cpu.cores) * self.cpu.max_cpu_requests self.ram.reserve(amount=replica.microservice.memory_requests) self.blkio.reserve(amount=replica.microservice.blkio_capacity) self.nic["egress"].request_bw(bandwidth_request=replica.microservice.egress_bw) self.nic["ingress"].request_bw(bandwidth_request=replica.microservice.ingress_bw) self.replicas.add(replica) if len(self.replicas) == 1: self.cost_events["power_on_periods"].append((self.cluster.sim.time, float("inf")))
[docs] def evict_replica(self, replica: MicroserviceReplica) -> None: """ Evict a replica from this host and release the reserved resources (CPU, RAM, BLKIO, NIC) :param replica: :return: """ self.replicas.remove(replica) if replica.microservice.cpu_requests != -1 and replica.microservice.cpu_limits != -1: self.cpu.release(amount=replica.microservice.cpu_requests) self.ram.release(amount=replica.microservice.memory_requests) self.blkio.release(amount=replica.microservice.blkio_capacity) self.nic["egress"].dismiss_bw(bandwidth_request=replica.microservice.egress_bw) self.nic["ingress"].dismiss_bw(bandwidth_request=replica.microservice.ingress_bw) if len(self.replicas) == 0: last_power_on_period = self.cost_events["power_on_periods"][-1] if last_power_on_period[1] != float("inf"): raise Exception("Last power on period is not infinite! Probably something went wrong (i.e., a bug).") else: last_power_on_period = (last_power_on_period[0], self.cluster.sim.time)
[docs] def is_active(self) -> bool: """ Check if the host is active (i.e., has at least one thread running) :return: """ return len(self.threads) > 0
[docs] @staticmethod def generate_random_instances(cluster: Cluster, host_count: int, core_count: int = np.random.choice(Settings.chunks["host_cpu_core_count"]), cpu_clock_rate: int = np.random.choice(Settings.chunks["host_cpu_clock_rate"]), memory_capacity: int = 16 * 1024 * 1024 * 1024, ram_speed: int = 2675787694, # sysbench storage_capacity: int = 1000, storage_speed: int = 1.0695 * 10 ** 7, # sysbench network_bandwidth: int = np.random.choice(Settings.chunks["host_network_bandwidth"]), name_index_starts_from: int = 0) -> List[Host]: """ Generate random instances of hosts :param cluster: :param host_count: :param core_count: :param cpu_clock_rate: :param memory_capacity: :param ram_speed: :param storage_capacity: :param storage_speed: :param network_bandwidth: :param name_index_starts_from: :return: """ host_count = np.random.choice(Settings.chunks["host_count"]) \ if host_count is None else host_count hosts = [] for i in np.arange(0, host_count): hosts.append( Host(name="host" + str(i + name_index_starts_from), cpu_core_count=core_count, cpu_clock_rate=cpu_clock_rate, memory_capacity=memory_capacity, ram_speed=ram_speed, storage_capacity=storage_capacity, storage_speed=storage_speed, network_bandwidth=network_bandwidth, cluster=cluster) ) return hosts
@property def threads(self) -> Set[ReplicaThread]: """ Get the threads running on the host (if any) :return: """ return self.__threads @threads.setter def threads(self, v: Set[ReplicaThread]): """ Set the threads running on the host (if any) :param v: :return: """ self.__threads = v
[docs] @staticmethod def from_config(conf: Dict = None, host_prototypes_dict: dict[str, 'HostPrototype'] = None) -> dict[str, 'Host']: """ Create hosts from a configuration dictionary and a dictionary of host prototypes :param conf: :param host_prototypes_dict: :return: """ hosts_dict = {} for _host_id, _host_name in enumerate(conf): _host_type = conf[_host_name] _h = Host.from_host_prototype(name=_host_name, host_prototype=host_prototypes_dict[_host_type]) hosts_dict[_host_name] = _h return hosts_dict
[docs] @staticmethod def to_dict(hosts_list: list[Host]) -> dict[str, Host]: """ Convert a list of hosts to a dictionary of hosts :param hosts_list: :return: """ hosts_dict = {} for node in hosts_list: hosts_dict[node.name] = node return hosts_dict