Source code for perfsim.service_chain.microservice_replica

#  Copyright (C) 2020 Michel Gokan Khan
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License along
#  with this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
#  This file is a part of the PerfSim project, which is now open source and available under the GPLv2.
#  Written by Michel Gokan Khan, February 2020


from __future__ import annotations

from typing import TYPE_CHECKING, Tuple, Dict

import numpy as np

from perfsim import ReplicaThread, Process, MicroserviceEndpointFunction

if TYPE_CHECKING:
    from perfsim import Host, Microservice, Request


class MicroserviceReplica:
    """
    A single replica of a Microservice. Each replica owns a Process configured
    from the microservice's resource requests/limits and bandwidth settings.
    """

    microservice: Microservice

    def __init__(self, name: str, microservice: Microservice):
        self.cpu_limits_ns = None
        self.name = name
        self.__host = None
        self.microservice = microservice
        self.process = Process(pname=name,
                               cpu_requests_share=microservice.cpu_requests,
                               cpu_limits=microservice.cpu_limits,
                               memory_capacity=microservice.memory_requests,
                               ingress_bw=microservice.ingress_bw,
                               egress_bw=microservice.egress_bw,
                               ingress_latency=microservice.ingress_latency,
                               egress_latency=microservice.egress_latency,
                               blkio_capacity=microservice.blkio_capacity,
                               endpoint_functions=microservice.endpoint_functions,
                               ms_replica=self)  # self, self.cgroup)
        self.last_thread_id = 0

    # def set_reserved_cpu_limits_ns(self) -> None:
    #     if self.host is not None:
    #         self.cpu_limits_ns = (self.host.cfs_period_ns *
    #                               self.microservice.cpu_limits) / self.host.cpu.max_cpu_requests
    def reinit(self):
        self.__init__(self.name, self.microservice)
    @property
    def host(self) -> Host:
        return self.__host

    @host.setter
    def host(self, host: Host):
        if host != self.host:
            if self.host is not None:
                self.host.evict_replica(self)
            if host is not None:
                host.place_replica(self)
            self.__host = host
            self.microservice.hosts.append(host)
            # self.set_reserved_cpu_limits_ns()

    @property
    def microservice(self) -> Microservice:
        return self.__microservice

    @microservice.setter
    def microservice(self, microservice: Microservice):
        self.__microservice = microservice
        # self.set_reserved_cpu_limits_ns()

    def __str__(self):
        return self.name

    def remove_host_without_eviction(self) -> None:
        if self.host is not None:
            self.host.evict_replica(self)
        self.__host = None
    def reserve_egress_bw(self, bw: float):
        self.process.egress_bw -= bw if bw > self.process.egress_bw else 0

    def release_egress_bw(self, bw: float):
        self.process.egress_bw += bw
        if self.process.egress_bw > self.process.original_egress_bw:
            self.process.egress_bw = self.process.original_egress_bw

    def reserve_ingress_bw(self, bw: float):
        self.process.ingress_bw -= bw if bw > self.process.ingress_bw else 0

    def release_ingress_bw(self, bw: float):
        self.process.ingress_bw += bw
        if self.process.ingress_bw > self.process.original_ingress_bw:
            self.process.ingress_bw = self.process.original_ingress_bw
    def generate_threads(self,
                         from_subchain_id: int,
                         node_in_subchain: Tuple[int, MicroserviceEndpointFunction],
                         replica_identifier_in_subchain: int,
                         load_balance: bool = False,
                         parent_request: Request = None) -> Dict[str, ReplicaThread]:
        """
        Create one ReplicaThread per thread of the endpoint function in
        ``node_in_subchain`` and enqueue them on the runqueue of the first core
        of this replica's host. Returns the created threads keyed by thread id.
        """

        threads_dict: Dict[str, ReplicaThread] = {}

        for _ in np.arange(node_in_subchain[1].threads_count):
            #: node_in_subchain[1] == the endpoint function
            _thread = ReplicaThread(process=self.process,
                                    replica=self,
                                    replica_identifier_in_subchain=replica_identifier_in_subchain,
                                    node_in_alt_graph=node_in_subchain,
                                    thread_id_in_node=_,
                                    subchain_id=from_subchain_id,
                                    average_load=node_in_subchain[1].threads_avg_cpu_usages[_],
                                    parent_request=parent_request)
            threads_dict[str(_thread.id)] = _thread

        # TODO: To optimize execution we should have active_hosts concept so that it only iterates on active hosts
        # parent_request.cluster.cluster_scheduler.active_hosts.add(self.host)
        self.host.cpu.cores[0].runqueue.enqueue_tasks(threads=list(threads_dict.values()),
                                                      load_balance=load_balance)

        return threads_dict
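
# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the PerfSim source). It only relies on
# the public surface defined above; the `microservice` and `host` arguments are
# assumed to be already-constructed PerfSim objects, and the helper name and
# bandwidth value are hypothetical.
def _example_replica_lifecycle(microservice: Microservice, host: Host) -> None:
    # Creating a replica builds the backing Process from the microservice's
    # resource requests/limits and bandwidth settings.
    replica = MicroserviceReplica(name="example-replica-0", microservice=microservice)

    # Assigning the host places the replica on it (and would first evict it
    # from a previously assigned host).
    replica.host = host

    # Bandwidth accounting is delegated to the underlying Process; releases
    # are capped at the Process's original bandwidth.
    replica.reserve_egress_bw(1000.0)
    replica.release_egress_bw(1000.0)

    # reinit() re-runs __init__ with the same name and microservice, resetting
    # the replica (including its host reference) to a fresh state.
    replica.reinit()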