# Copyright (C) 2020 Michel Gokan Khan
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# This file is a part of the PerfSim project, which is now open source and available under the GPLv2.
# Written by Michel Gokan Khan, February 2020
from __future__ import annotations
from typing import TYPE_CHECKING, Tuple, Dict, Union
import matplotlib.pyplot as plt
import networkx as nx
from matplotlib.lines import Line2D
from perfsim import ClusterScheduler, Topology, ServiceChainManager, Utils, \
MicroserviceEndpointFunction, ServiceChain, Observable, ClusterLogObserver
if TYPE_CHECKING:
from perfsim import Microservice, Simulation
class Cluster(Observable):
"""
A *Cluster* imitates a real heterogeneous cluster with a configurable ``NetworkTopology``,
``PlacementScenario`` and a set of ``ServiceChainManager`` instances.
"""
#: A reference to the parent LoadGenerator instance
# load_generator: LoadGenerator
#: Name of the cluster
name: str
#: A dictionary of all service chain managers running on the cluster (key=name, value=service chain manager)
__scm_dict: Dict[str, ServiceChainManager]
#: The hosts and routers placement scenario on the cluster
# placement_scenario: PlacementScenario
#: The network topology of the cluster
topology: Topology
#: A dictionary of all `Microservice` references running on this cluster (key=name, value=microservice)
__microservices_dict: Dict[str, Microservice]
#: The `ClusterScheduler` of this cluster responsible for scheduling replicas on the given topology
cluster_scheduler: ClusterScheduler
#: The timeout threshold for network transmissions
network_timeout: float
#: The parent `Simulation` instance
sim: Simulation
def __init__(self,
name: str,
simulation: Simulation,
# placement_scenario: PlacementScenario,
topology: Topology,
service_chains_dict: Dict[str, ServiceChain] = None,
scm_dict: Dict[str, ServiceChainManager] = None,
network_timeout: float = float('inf')):
"""
A *Cluster* imitates a real heterogeneous cluster with a configurable `NetworkTopology`,
`PlacementScenario` and a set of `ServiceChainManager` instances.
:param name: Name of the cluster
:param simulation: The parent `Simulation` instance
:param topology: The network topology of the cluster
:param service_chains_dict: A dict of all service chains running on the cluster (mutually exclusive with ``scm_dict``)
:param scm_dict: A dict of all service chain managers running on the cluster (mutually exclusive with ``service_chains_dict``)
:param network_timeout: The network timeout threshold (-1 is treated as ``float('inf')``)
"""
if service_chains_dict is None and scm_dict is None:
raise Exception("Either service_chains_dict or scm_dict must be provided")
elif service_chains_dict is not None and scm_dict is not None:
raise Exception("Only one of service_chains or scm_dict must be provided")
self.name = name
self.sim = simulation
if service_chains_dict is not None:
self.set_service_chains_dict(service_chains_dict=service_chains_dict)
else:
self.set_scm_dict(scm_dict=scm_dict)
self.topology = topology
self.network_timeout = network_timeout if network_timeout != -1 else float('inf')
self.__set_hosts_cluster()
self.__set_routers_cluster()
self.cluster_scheduler = ClusterScheduler(cluster=self)
self.topology.reinitiate_topology()
super().__init__()
if self.sim.debug_level > 0:
self.attach_observer(observer=ClusterLogObserver(cluster=self))
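# Illustrative sketch (not executed): constructing a Cluster from a dict of service chains.
# The names "sim", "topo" and "sfc" are hypothetical placeholders for objects built elsewhere
# in a simulation setup; exactly one of ``service_chains_dict`` or ``scm_dict`` may be given.
#
#   cluster = Cluster(name="cluster-1",
#                     simulation=sim,
#                     topology=topo,
#                     service_chains_dict={"sfc-1": sfc},
#                     network_timeout=float('inf'))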
def reinit(self):
"""
Reinitialize the cluster
:return: None
"""
self.__init__(name=self.name,
simulation=self.sim,
topology=self.topology,
scm_dict=self.__scm_dict,
network_timeout=self.network_timeout)
def count_total_service_edges(self):
"""
Counts the total number of edges in the service topology that is being deployed in this cluster.
:return: The total number of edges in the service topology
"""
total_edges_count = 0
for service_chain_manager in self.__scm_dict.values():
total_edges_count += len(service_chain_manager.service_chain.edges)
return total_edges_count
def register_events(self):
"""
Register all events of the cluster
:return: None
"""
self.register_event("after_finish_running_threads_on_a_host")
self.register_event("after_finish_running_a_thread")
self.register_event("before_transmitting_requests_in_network")
self.register_event("in_transmitting_an_active_transmission")
self.register_event("after_transmitting_an_active_transmission")
self.register_event("before_calling_is_there_a_thread_that_ends_sooner_function")
self.register_event("before_checking_a_thread_ends_sooner")
self.register_event("after_calling_is_there_a_thread_that_ends_sooner_function")
self.register_event("before_load_balancing_a_host")
def __set_hosts_cluster(self):
"""
Set the cluster of all hosts in the topology to this cluster
:return: None
"""
for _host in self.topology.hosts_dict.values():
_host.cluster = self
def __set_routers_cluster(self):
"""
Set the cluster of all routers in the topology to this cluster
:return: None
"""
for _router in self.topology.routers_dict.values():
_router.cluster = self
@property
def scm_dict(self) -> Dict[str, ServiceChainManager]:
"""
Dictionary of all `ServiceChainManager` references running on this cluster
:return: The dictionary of all `ServiceChainManager` references running on this cluster (key=name)
"""
return self.__scm_dict
def __add_scm(self, scm: ServiceChainManager):
"""
Add a service chain manager to the cluster
:param scm: The service chain manager to add to the cluster
:return: None
"""
if scm.name not in self.scm_dict:
self.__scm_dict[scm.name] = scm
else:
raise ValueError(f'ServiceChainManager with name {scm.name} already exists')
def __add_microservices(self, microservices_dict: Dict[str, Microservice]):
"""
Add a set of microservices to the cluster
:param microservices_dict: The set of microservices to add to the cluster
:return: None
"""
for ms in microservices_dict.values():
self.__add_microservice(microservice=ms)
def set_service_chains_dict(self, service_chains_dict: Dict[str, ServiceChain]):
"""
Set the service chains of the cluster
:param service_chains_dict: The service chains to set
:return: None
"""
self.__microservices_dict = {}
self.__scm_dict = {}
for sfc in service_chains_dict.values():
self.__add_scm(scm=ServiceChainManager(name=sfc.name, service_chain=sfc))
self.__add_microservices(microservices_dict=sfc.microservices_dict)
def set_scm_dict(self, scm_dict: Dict[str, ServiceChainManager]):
"""
Set the service chain managers of the cluster. This method is used when the cluster is created
from a dictionary of service chain managers.
:param scm_dict: The service chain managers to set (key=name, value=service chain manager)
:return: None
"""
self.__microservices_dict = {}
self.__scm_dict = {}
for scm in scm_dict.values():
self.__add_scm(scm=scm)
self.__add_microservices(microservices_dict=scm.service_chain.microservices_dict)
@scm_dict.setter
def scm_dict(self, scm_dict: Dict[str, ServiceChainManager]):
"""
The ``scm_dict`` property is read-only; use ``set_scm_dict`` to replace the service chain
managers of the cluster.
:param scm_dict: Ignored
:raise Exception: Always
"""
raise Exception('ServiceChainManagersDict is read only! Use set_scm_dict instead')
def is_there_a_thread_that_ends_sooner(self, time_of_next_event: int) -> Tuple[float, float, bool]:
"""
Check all active threads to see if there is one that ends sooner than ``time_of_next_event``
:param time_of_next_event: The proposed time of the next event
:return: The (possibly earlier) time of the next event, its duration from the current simulation
time, and a flag that is True only when there is at least one active thread and none of them
finishes before ``time_of_next_event``
"""
self.notify_observers(event_name="before_calling_is_there_a_thread_that_ends_sooner_function",
time_of_next_event=time_of_next_event)
if len(self.cluster_scheduler.active_threads) == 0:
# If there are no active threads, the proposed next event, whatever it is, is by default the next thing to happen
it_takes_more_time_to_finish_at_least_one_thread_before_next_event = False
else:
it_takes_more_time_to_finish_at_least_one_thread_before_next_event = True
duration_of_next_event = time_of_next_event - self.sim.time
# run_threads = True
# hosts_to_consider = set()
# TODO: a possible optimization: instead of checking all hosts/cores, only check hosts that
#  have threads (is that possible?), or iterate over threads instead of iterating over hosts
for thread in self.cluster_scheduler.active_threads:
if thread.core.runqueue is not None and thread.on_rq:
# hosts_to_consider.add(host)
if self.sim.time > time_of_next_event:
raise Exception("What the hell!? Did we miss a request somewhere in the chain...!?")
else:
duration_to_finish = thread.get_exec_time_on_rq()
time_to_finish = duration_to_finish + self.sim.time
self.notify_observers(event_name="before_checking_a_thread_ends_sooner",
thread=thread,
duration_to_finish=duration_to_finish,
time_to_finish=time_to_finish,
time_of_next_event=time_of_next_event)
if time_to_finish < time_of_next_event:
it_takes_more_time_to_finish_at_least_one_thread_before_next_event = False
time_of_next_event = time_to_finish
duration_of_next_event = duration_to_finish
if it_takes_more_time_to_finish_at_least_one_thread_before_next_event:
duration_of_next_event = time_of_next_event - self.sim.time
self.notify_observers(event_name="after_calling_is_there_a_thread_that_ends_sooner_function",
result=it_takes_more_time_to_finish_at_least_one_thread_before_next_event,
time_of_next_event=time_of_next_event,
duration_of_next_event=duration_of_next_event)
return time_of_next_event, duration_of_next_event, it_takes_more_time_to_finish_at_least_one_thread_before_next_event
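# Illustrative sketch (not executed): unpacking the triple returned above. The names below are
# hypothetical; when an active thread finishes before the proposed event, the returned time and
# duration are shortened to that thread's completion.
#
#   next_time, next_duration, no_thread_ends_sooner = \
#       cluster.is_there_a_thread_that_ends_sooner(time_of_next_event=proposed_time)
#   cluster.run_threads_on_hosts(duration=next_duration)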
def run_threads_on_hosts(self, duration: float) -> int:
"""
Run all threads on all active hosts for ``duration`` nanoseconds
:param duration: Duration of threads execution in nanoseconds
:return: The total number of completed threads
"""
completed_threads = 0
for host in self.cluster_scheduler.active_hosts:
host_completed_threads = 0
for core in host.cpu.cores:
current_completed_threads = core.exec_threads(duration)
host_completed_threads += current_completed_threads
completed_threads += current_completed_threads
self.notify_observers(event_name="finish_running_threads_on_a_host",
host=host,
completed_threads_on_host=host_completed_threads,
completed_threads_on_all_hosts=completed_threads)
return completed_threads
def run_active_threads(self, duration: Union[int, float]) -> int:
"""
Run all active threads for ``duration`` nanoseconds
:param duration: Duration of threads execution in nanoseconds
:return: The total number of completed threads
"""
completed_threads = 0
for thread in self.cluster_scheduler.active_threads:
current_completed_threads = thread.exec(duration=duration)
completed_threads += current_completed_threads
self.notify_observers(event_name="finish_running_a_thread",
current_completed_threads=current_completed_threads,
completed_threads=completed_threads)
return completed_threads
def transmit_requests_in_network(self, duration: Union[int, float]):
"""
Transmit all requests in the network for ``duration`` nanoseconds
:param duration: Duration of network transmissions in nanoseconds
:return: None
"""
if duration == float('inf'):
return
is_there_any_finished_transmissions = False
self.notify_observers(event_name="before_transmitting_requests_in_network")
for trans in list(self.topology.active_transmissions):
request_subchainid_pair = trans.subchain_id_request_pair
req = request_subchainid_pair[0]
subchain_id = request_subchainid_pair[1]
host = trans.src_replica.host
# transmission = host.nic["egress"].transmissions[(_active_subchain_id, request)]
self.notify_observers(event_name="in_transmitting_an_active_transmission",
request=req,
active_subchain_id=subchain_id,
duration=duration)
if req.subchains_status[subchain_id] == "IN TRANSMISSION":
remaining_transmission_time = trans.transmit(duration)
if -0.001 < remaining_transmission_time < 0.001:
duration += remaining_transmission_time
req.trans_times[subchain_id] = remaining_transmission_time
if req.trans_exact_times[subchain_id] != trans.transmission_exact_time:
req_trans_from_sorted_dict = \
self.sim.load_generator.next_trans_completion_times.get(req.trans_exact_times[subchain_id])
trans_from_sorted_dict = \
self.sim.load_generator.next_trans_completion_times.get(trans.transmission_exact_time)
# We keep track of the number of transmissions at each timestamp using the "counter" key;
# this counter prevents popping multiple dying transmissions from the SortedDict (see the
# illustrative sketch after this method).
if req_trans_from_sorted_dict is not None and req_trans_from_sorted_dict["counter"] > 1:
req_trans_from_sorted_dict["counter"] -= 1
else:
self.sim.load_generator.next_trans_completion_times.pop(req.trans_exact_times[subchain_id], 0)
self.sim.load_generator.next_trans_completion_times.update({trans.transmission_exact_time: {
"counter": trans_from_sorted_dict["counter"] + 1 if trans_from_sorted_dict is not None else 1
}})
req.trans_exact_times[subchain_id] = trans.transmission_exact_time
if req.trans_times[subchain_id] < 0:
raise Exception("Error (time = " + str(self.sim.time) + " ): Remaining transmission " +
"time (" + str(req._trans_times[subchain_id]) +
") is less than zero! Something went really wrong here!")
self.notify_observers(event_name="after_transmitting_an_active_transmission",
request=req,
active_subchain_id=subchain_id,
duration=duration)
if req.trans_times[subchain_id] <= 0:
req.status = "MICROSERVICE"
is_there_any_finished_transmissions = True
# request.load_generator.requests_in_transmission.remove(request)
req.load_generator.requests_ready_for_thread_generation.append((subchain_id, req))
host.nic["egress"].release_transmission_for_request(req, subchain_id)
req.finish_transmission_by_subchain_id(subchain_id)
transmission_in_sorted_dict = req.load_generator.next_trans_completion_times.peekitem(0)
if transmission_in_sorted_dict[0] == self.sim.time:
if transmission_in_sorted_dict is not None and transmission_in_sorted_dict[1]["counter"] > 1:
transmission_in_sorted_dict[1]["counter"] -= 1
else:
req.load_generator.next_trans_completion_times.popitem(0)
# TODO: This can be optimized - instead of recalculating for all links, we can recalculate for just
# a portion of links
if is_there_any_finished_transmissions:
self.topology.recalculate_transmissions_bw_on_all_links()
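# Illustrative sketch (not executed) of the "counter" bookkeeping referenced in the loop above.
# ``next_trans_completion_times`` is keyed by completion timestamp, and the per-timestamp counter
# records how many transmissions finish at that instant, so moving one transmission to a new
# completion time only drops the old timestamp when it was the last transmission ending there.
# ``old_time`` and ``new_time`` are hypothetical values.
#
#   entry = next_trans_completion_times.get(old_time)
#   if entry is not None and entry["counter"] > 1:
#       entry["counter"] -= 1                                # others still end at old_time
#   else:
#       next_trans_completion_times.pop(old_time, 0)         # last one, drop the timestamp
#   new_entry = next_trans_completion_times.get(new_time)
#   next_trans_completion_times[new_time] = \
#       {"counter": new_entry["counter"] + 1 if new_entry is not None else 1}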
def load_balance_threads_in_all_hosts(self):
"""
Perform CPU load-balancing on all hosts
:return: None
"""
# for host in self.cluster_scheduler.hosts.values():
for host in list(self.cluster_scheduler.active_hosts):
self.notify_observers(event_name="before_load_balancing_a_host", host=host)
host.cpu.load_balance()
def run_idle(self, until: int) -> None:
"""
Run threads idle until ``until``, i.e. for ``until`` minus the current simulation time, in nanoseconds
:param until: The time at which running idle stops
"""
for host_name, host in self.cluster_scheduler.hosts_dict.items():
for core in host.cpu.cores:
core.runqueue.run_idle(until - core.cpu.host.cluster.sim.time)
@property
def microservices_dict(self):
"""
Dictionary of all `Microservice` references running on this cluster
:return: The dictionary of all `Microservice` references running on this cluster (key=name)
"""
return self.__microservices_dict
@microservices_dict.setter
def microservices_dict(self, value):
raise Exception("This property is read-only! Alter microservices via service_chain_managers!")
def __add_microservice(self, microservice: Microservice):
"""
Add a microservice to the cluster.
:param microservice: The microservice to add to the cluster
:return: None
"""
if microservice.name not in self.microservices_dict:
# raise Exception("Microservice with name '" + microservice.name + "' already exists in the cluster!")
# A microservice with the same name may already exist in the cluster when it serves multiple
# service chains, so we do not raise an exception here.
self.__microservices_dict[microservice.name] = microservice
def __remove_microservice(self, microservice: Microservice):
"""
Remove a microservice from the cluster.
:param microservice: The microservice to remove from the cluster
:return: None
"""
if microservice.name not in self.microservices_dict:
raise Exception("Microservice with name '" + microservice.name + "' does not exist in the cluster!")
del self.__microservices_dict[microservice.name]
def draw_all_service_chains(self, save_dir: str = None, show: bool = True):
"""
Draw all service chains of the cluster
:param save_dir: The directory in which to save the figure as ``service_chains.pdf`` (None skips saving)
:param show: Whether to show the figure; when False, the figure is closed instead
:return: None
"""
function_color = '#FFDB58'
scm_color = "#808080"
_G = nx.MultiDiGraph()
color_map = []
edge_labels = {}
node_labels = {}
for _scm in self.scm_dict.values():
_G.add_edges_from(_scm.service_chain.edges)
_G.add_edge(_scm, list(_scm.service_chain.nodes)[0])
for u, v, c in _scm.service_chain.edges:
edge_data = _scm.service_chain.get_edge_data(u, v, c)
edge_labels[u, v] = edge_data["name"] + ":" + str(edge_data["payload"]) + "B"
for _node in _G:
node_labels[_node] = str(_node)
if isinstance(_node, MicroserviceEndpointFunction):
color_map.append(function_color)
else: # elif isinstance(ServiceChainManager)
color_map.append(scm_color)
pos = nx.spring_layout(_G)
# fig = plt.figure(figsize=(15, 5), facecolor='w')
fig = plt.figure(1, figsize=(12, 12))
plt.rcParams.update({'font.size': 10})
ax = plt.gca()
nx.draw_networkx_nodes(_G, pos=pos, node_color=color_map, ax=ax, node_size=900)
nx.draw_networkx_labels(_G, pos=pos, labels=node_labels, font_size=12, font_color='0')
# connectionstyle='arc3, rad = 0.3')
for e in _G.edges:
# _reverse_edges = _G.number_of_edges(e[1], e[0])
ax.annotate("",
xy=pos[e[0]],
xycoords='data',
xytext=pos[e[1]],
textcoords='data',
alpha=0.5,
arrowprops=dict(arrowstyle="<-",
color="0",
alpha=0.5,
shrinkA=20,
shrinkB=20,
patchA=None,
patchB=None,
connectionstyle=
"arc3, rad = rrr".replace('rrr', str(0.3 * e[2])), ), )
plt.axis('off')
nx.draw_networkx_edge_labels(_G, pos=pos, edge_labels=edge_labels, label_pos=0.2, font_size=8)
plt.title("Service chains of cluster " + self.name)
plt.legend(handles=[Line2D([0], [0], marker='o', label='Microservice endpoint function', color='w',
markerfacecolor=function_color),
Line2D([0], [0], marker='o', label='Load generator (SFC name)', color='w',
markerfacecolor=scm_color)])
if save_dir is not None:
Utils.mkdir_p(save_dir)
plt.savefig(save_dir + "/service_chains.pdf")
if show:
plt.show()
else:
plt.close()
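# Illustrative sketch (not executed): rendering the service chains to disk without opening a
# window. The directory path is hypothetical; a "service_chains.pdf" file is written into it.
#
#   cluster.draw_all_service_chains(save_dir="results/example", show=False)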
def calculate_average_latency_in_seconds(self):
"""
Calculate the average latency in seconds
:return: The average latency in seconds
"""
all_latencies = self.sim.load_generator.latencies["latency"]
return all_latencies.mean() / 10 ** 9
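# Illustrative sketch (not executed): the recorded latencies are in nanoseconds, so the mean is
# divided by 10 ** 9 to report seconds (e.g., a 1_500_000 ns mean becomes 0.0015 s).
#
#   avg_latency_s = cluster.calculate_average_latency_in_seconds()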
def print_and_plot_scenarios(self, printing: bool = True, plotting: bool = True):
"""
Print and plot the scenarios
:param printing: Whether to print the average latency
:param plotting: Whether to save the latency, topology, service chain and CPU heatmap plots
:return: None
"""
host_count = str(len(self.topology.hosts_dict))
avg_latencies = self.calculate_average_latency_in_seconds()
if printing:
print("Average latency with " + host_count + " hosts: " + str(avg_latencies) + "s")
if plotting:
folder_name = "results/" + host_count + "_hosts"
self.sim.load_generator.plot_latencies(save_dir=folder_name, marker='o', moving_average=True,
save_values=False, show=False)
self.topology.draw(save_dir=folder_name, show=False)
self.draw_all_service_chains(save_dir=folder_name, show=False)
self.plot_hosts_cpu_heatmaps(save_dir=folder_name + "/cpu", show=False)
def plot_hosts_cpu_heatmaps(self, save_dir: str = None, show: bool = True):
"""
Plot the hosts' CPU heatmaps
:param save_dir: The directory to save the heatmaps
:param show: Whether to show the heatmaps or not
:return: None
"""
for host in self.cluster_scheduler.hosts_dict.values():
host.cpu.plot(save_dir=save_dir, show=show)