Source code for perfsim.cluster_scheduler
# Copyright (C) 2020 Michel Gokan Khan
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# This file is a part of the PerfSim project, which is now open source and available under the GPLv2.
# Written by Michel Gokan Khan, February 2020
from __future__ import annotations
from typing import TYPE_CHECKING, Dict, Set
import numpy as np
import pandas as pd
from perfsim import Host, MicroserviceReplica
if TYPE_CHECKING:
from perfsim import Cluster, ReplicaThread
[docs]
class ClusterScheduler:
"""
This class is responsible for scheduling the cluster.
"""
#: The set of zombie threads in the cluster that needs to be killed
zombie_threads: Set[ReplicaThread]
#: The set of active hosts in the cluster that are actually having a thread running on them.
active_hosts: Set[Host]
#: The set of hosts in the cluster that require CPU load balancing.
hosts_need_load_balancing: Set[Host]
#: The set of active threads in the cluster that are actually running on a host.
active_threads: Set[ReplicaThread]
#: The cluster to which this scheduler belongs
cluster: Cluster
#: The placement dataframe consisting of microservice name as index and host name as column
__placement_matrix: pd.DataFrame
#:
hosts_dict: Dict[str, Host]
replicas: Set[MicroserviceReplica]
# placement_algorithm: PlacementAlgorithm
def __init__(self, cluster: Cluster):
# TODO: concept of active_hosts should be defined here
self.active_hosts = set()
self.hosts_need_load_balancing = set()
self.zombie_threads = set()
self.active_threads = set()
self.cluster = cluster
self.reschedule(hosts_dict=self.cluster.topology.hosts_dict)
def __initialize_cluster(self, hosts_dict: Dict[str, Host] = None):
"""
Initialize the cluster.
:param hosts_dict: The dictionary of hosts.
:return: None
"""
self.hosts_dict = hosts_dict
self.replicas = set()
host_names = list(self.hosts_dict.keys())
microservices_names = list(self.cluster.microservices_dict.keys())
zero_array = np.zeros(shape=(len(self.cluster.microservices_dict), len(self.hosts_dict)))
self.__placement_matrix = pd.DataFrame(data=zero_array, index=microservices_names, columns=host_names)
@property
def placement_matrix(self) -> pd.DataFrame:
"""
Get the placement matrix.
:return:
"""
return self.__placement_matrix
@placement_matrix.setter
def placement_matrix(self, v):
"""
Set the placement matrix.
:param v:
:return:
"""
raise Exception(
"You can't directly change the placement matrix. Use the reschedule method or change microservices "
"affinity/anti-affinities instead.")
[docs]
def reschedule(self, hosts_dict: Dict[str, Host] = None, use_current_hosts: bool = False):
"""
Reschedule the cluster.
:param hosts_dict: The dictionary of hosts.
:param use_current_hosts: Whether to use the current hosts or not.
:return: None
"""
if use_current_hosts:
hosts_dict = self.hosts_dict
self.__initialize_cluster(hosts_dict=hosts_dict)
for microservice_index, microservice in enumerate(self.cluster.microservices_dict.values()):
self.replicas.update(microservice.replicas)
for replica_index, replica in enumerate(microservice.replicas):
replica.remove_host_without_eviction()
self.cluster.sim.placement_algorithm.place(placement_matrix=self.placement_matrix,
replicas=self.replicas,
hosts_dict=hosts_dict)