# Copyright (C) 2020 Michel Gokan Khan
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# This file is a part of the PerfSim project, which is now open source and available under the GPLv2.
# Written by Michel Gokan Khan, February 2020
from __future__ import annotations
from copy import deepcopy
from typing import Union, List, Dict, Set
import networkx as nx
from perfsim import Host, Router, MicroserviceReplica, Transmission, TopologyLink, Plotter
class TopologyPrototype(nx.MultiDiGraph):
    """
    Directed multigraph whose nodes are hosts and routers and whose edges are topology links.

    Besides the underlying networkx structure, the equipments and links are also kept in the
    ``hosts_dict``, ``routers_dict`` and ``topology_links_dict`` dictionaries.
    """

    #: Hosts of this topology, stored exactly as passed to :meth:`add_equipments`.
    hosts_dict: Dict[str, Host]

    #: Routers of this topology, stored exactly as passed to :meth:`add_equipments`.
    routers_dict: Dict[str, Router]

    #: Links registered through :meth:`add_edges_from`, keyed by link name.
    topology_links_dict: Dict[str, TopologyLink]

    #: In Kubernetes, we noticed a slight error between desired and actual egress_bandwidths. For example,
    #: if we set egress_bandwidth of a pod to 100Mbps, it gets slightly lower bandwidth
    #: (~95Mbps - error = 0.05 = 5%). We call this slight error egress_err.
    #: Use 0.05 if you want to indicate an error of 5%.
    egress_err: float

    #: In Kubernetes, we noticed a slight error between desired and actual ingress_bandwidths. For example,
    #: if we set ingress_bandwidth of a pod to 100Mbps, it gets slightly lower bandwidth
    #: (~95Mbps - error = 0.05 = 5%). We call this slight error ingress_err.
    #: Use 0.05 if you want to indicate an error of 5%.
    ingress_err: float

    #: Set of transmissions tracked by this topology; starts empty.
    active_transmissions: Set[Transmission]

    # Declared for type checkers only; nothing in this class assigns it.
    before_recalculate_transmissions_bw_on_all_links: str

    def __init__(self,
                 name: str,
                 egress_err: float,
                 ingress_err: float,
                 incoming_graph_data=None,
                 hosts: Union[Dict[str, Host], None] = None,
                 routers: Union[Dict[str, Router], None] = None,
                 links: Union[Dict[str, TopologyLink], None] = None,
                 **attr):
        """
        Create a topology and optionally populate it with equipments and links.

        :param name: Name of the topology.
        :param egress_err: Relative egress bandwidth error (e.g. 0.05 for 5%).
        :param ingress_err: Relative ingress bandwidth error (e.g. 0.05 for 5%).
        :param incoming_graph_data: Forwarded unchanged to ``nx.MultiDiGraph.__init__``.
        :param hosts: Hosts to add as nodes. May be given without ``routers``.
        :param routers: Routers to add as nodes. May be given without ``hosts``.
        :param links: Links to add as edges once the nodes are in place.
        :param attr: Graph attributes forwarded to ``nx.MultiDiGraph.__init__``.
        """
        super().__init__(incoming_graph_data, **attr)
        self.incoming_graph_data = incoming_graph_data
        self.attr = attr
        self.name = name
        self.egress_err = egress_err
        self.ingress_err = ingress_err
        self.hosts_dict = {}
        self.routers_dict = {}
        self.topology_links_dict = {}
        self.active_transmissions = set()
        self.active_edges = set()
        self.zombie_edges = set()

        # Providing only one kind of equipment used to be silently ignored; the missing
        # kind is now treated as empty so the provided nodes are still added.
        if hosts is not None or routers is not None:
            self.add_equipments(hosts=hosts if hosts is not None else {},
                                routers=routers if routers is not None else {})
        if links is not None:
            self.add_edges_from(links)

    def add_equipments(self, hosts: Dict[str, Host], routers: Dict[str, Router], **attr):
        """
        Replace the stored hosts/routers dictionaries and add their values as graph nodes.

        :param hosts: Hosts to register; the dictionary itself is kept as ``hosts_dict``.
        :param routers: Routers to register; the dictionary itself is kept as ``routers_dict``.
        :param attr: Node attributes applied to every added node.
        """
        self.hosts_dict = hosts
        self.routers_dict = routers
        equipments = [*hosts.values(), *routers.values()]
        super().add_nodes_from(nodes_for_adding=equipments, **attr)

    def add_edges_from(self, ebunch_to_add: Dict[str, TopologyLink], **attr):
        """
        Add every link of ``ebunch_to_add`` as an edge and register it in ``topology_links_dict``.

        Each edge is created with the attributes ``name``, ``latency``,
        ``transmissions_portion_of_bandwidth`` (filled in by :meth:`reinitiate_topology`)
        and ``transmissions`` (an empty set).

        :param ebunch_to_add: Links to add, keyed by name; only the values are used.
        :param attr: Edge attributes applied to every added edge.
        :raises Exception: If a link's source or destination is not a node of this graph.
        """
        new_edges = []
        for link in ebunch_to_add.values():
            if link.source not in self.nodes or link.destination not in self.nodes:
                raise Exception(f"Source or Destination of an edge in this topology ({self.name!s}"
                                f") is not in the graph!")
            link_attributes = {"name": link.name,
                               "latency": link.latency,
                               "transmissions_portion_of_bandwidth": None,
                               "transmissions": set()}
            new_edges.append((link.source, link.destination, link_attributes))
            self.topology_links_dict[link.name] = link
        super().add_edges_from(new_edges, **attr)
        self.reinitiate_topology()

    def add_edge(self, u_for_edge, v_for_edge, key=None, **attr):
        """
        Add a single edge, wiring up the equipments on host->router and router->router edges.

        :param u_for_edge: Source node.
        :param v_for_edge: Destination node.
        :param key: Optional edge key, forwarded to networkx.
        :param attr: Edge attributes, forwarded to networkx.
        :return: Whatever ``nx.MultiDiGraph.add_edge`` returns for the new edge.
        :raises Exception: If the source is a ``Host`` and the destination is not a ``Router``.
        """
        if isinstance(u_for_edge, Host):
            if not isinstance(v_for_edge, Router):
                raise Exception(f"Can't connect host {u_for_edge} to an object of type "
                                f"{type(v_for_edge).__name__}. A host can only be connected to a Router "
                                f"object!")
            v_for_edge.connect_host(u_for_edge)
        elif isinstance(u_for_edge, Router) and isinstance(v_for_edge, Router):
            v_for_edge.connect_router(router=u_for_edge)
        # Any other combination (e.g. router -> host) is added without extra bookkeeping.
        return super().add_edge(u_for_edge=u_for_edge, v_for_edge=v_for_edge, key=key, **attr)

    def reinitiate_topology(self):
        """
        Fill in ``transmissions_portion_of_bandwidth`` on every edge that does not have it yet.

        Every parallel edge is visited through its own attribute dictionary; previously only
        the edge with key ``0`` between two nodes was ever inspected.
        """
        for source, destination, edge_data in self.edges(data=True):
            if edge_data.get("transmissions_portion_of_bandwidth") is None:
                edge_data["transmissions_portion_of_bandwidth"] = \
                    Transmission.get_bandwidth_on_link(source, destination)

    def draw(self, show_microservices: bool = True, save_dir: str = None, show: bool = False, type: str = "html"):
        """
        Plot the topology through ``Plotter.draw_graph``.

        :param show_microservices: When true, draw a separate graph that also contains one
            node per replica of every host, linked to its host in both directions.
            When false, this topology itself is drawn.
        :param save_dir: Forwarded to ``Plotter.draw_graph``.
        :param show: Forwarded to ``Plotter.draw_graph``.
        :param type: Currently unused by this method.
        :return: The result of ``Plotter.draw_graph``.
        """
        if not show_microservices:
            graph = self
        else:
            graph = nx.MultiDiGraph()
            # data=True yields each parallel edge with its own attributes.
            graph.add_edges_from(ebunch_to_add=list(self.edges(data=True)))
            for node in self.nodes:
                if not isinstance(node, Host):
                    continue
                for replica in node.replicas:
                    # The usable bandwidth in each direction is capped by the slower endpoint.
                    host_to_replica_bw = min(node.nic["egress"].bandwidth, replica.process.ingress_bw)
                    replica_to_host_bw = min(replica.process.egress_bw, node.nic["ingress"].bandwidth)
                    graph.add_edge(node, replica, name="", bandwidth=str(host_to_replica_bw))
                    graph.add_edge(replica, node, name="", bandwidth=str(replica_to_host_bw))
        return Plotter.draw_graph(graph, name=self.name, save_dir=save_dir, show=show)

    @staticmethod
    def recalculate_transmissions_times(transmissions: Set[Transmission]):
        """
        Recompute the transmission time of each transmission and propagate any change.

        When a transmission's exact time changes, the owning request's ``trans_times`` and
        ``trans_exact_times`` are updated and the load generator's
        ``next_trans_completion_times`` counters are moved from the old time to the new one.

        :param transmissions: Transmissions to recalculate.
        """
        for trans in transmissions:
            previous_exact_time = trans.transmission_exact_time
            trans.calculate_transmission_time()
            new_exact_time = trans.transmission_exact_time
            if new_exact_time == previous_exact_time:
                continue

            # TODO: Is there any way to merge this with request's recalculate_transmission_times ?
            # NOTE(review): the attribute name suggests (subchain_id, request), yet index 0 is
            # used as the request and index 1 as the subchain id - confirm against Transmission.
            request = trans.subchain_id_request_pair[0]
            subchain_id = trans.subchain_id_request_pair[1]
            load_generator = trans.src_replica.host.cluster.sim.load_generator

            request.trans_times[subchain_id] = trans.transmission_time
            request.trans_exact_times[subchain_id] = new_exact_time

            completion_times = load_generator.next_trans_completion_times
            previous_entry = completion_times.get(previous_exact_time)
            new_entry = completion_times.get(new_exact_time)

            # Release this transmission's slot at the old completion time.
            if previous_entry is not None and previous_entry["counter"] > 1:
                previous_entry["counter"] -= 1
            else:
                completion_times.pop(previous_exact_time, 0)

            # Claim a slot at the new completion time.
            completion_times.update(
                {new_exact_time: {"counter": new_entry["counter"] + 1 if new_entry is not None else 1}})

    @staticmethod
    def copy_to_dict(topology_prototypes: Union[List[TopologyPrototype], Dict[str, TopologyPrototype]]) \
            -> Dict[str, TopologyPrototype]:
        """
        Return deep copies of the given prototypes as a dictionary keyed by topology name.

        :param topology_prototypes: Either a dictionary (deep-copied as a whole) or an iterable
            of prototypes (each deep-copied and keyed by its ``name``).
        :return: A new dictionary of copied prototypes.
        """
        if isinstance(topology_prototypes, dict):
            return deepcopy(topology_prototypes)
        return {prototype.name: deepcopy(prototype) for prototype in topology_prototypes}

    @staticmethod
    def from_config(conf: dict, topology_equipments_dict, link_prototypes_dict) -> Dict[str, TopologyPrototype]:
        """
        Build one topology prototype per entry of ``conf``.

        For every topology name, ``conf[name]`` is read for ``"nodes"`` (entries with ``"name"``
        and ``"type"`` of ``"router"`` or ``"host"``), ``"edges"`` (entries with ``"link_type"``
        and a two-element ``"connection"`` of node indices), ``"egress_err"`` and
        ``"ingress_err"``.

        :param conf: Topology configuration keyed by topology name.
        :param topology_equipments_dict: Per topology name, a mapping with ``"routers"`` and
            ``"hosts"`` entries that map equipment names to equipment objects.
        :param link_prototypes_dict: Link prototypes keyed by link type.
        :return: The created prototypes keyed by topology name.
        :raises Exception: If a node's type is neither ``"router"`` nor ``"host"``.
        """
        topology_prototypes_dict = {}
        for topology_name in conf:
            topology_conf = conf[topology_name]
            nodes_by_index = {}
            hosts_by_index = {}
            routers_by_index = {}

            for node_index, node_data in topology_conf["nodes"].items():
                node_name = node_data["name"]
                node_type = node_data["type"]
                if node_type == "router":
                    equipment = topology_equipments_dict[topology_name]["routers"][node_name]
                    routers_by_index[node_index] = equipment
                elif node_type == "host":
                    equipment = topology_equipments_dict[topology_name]["hosts"][node_name]
                    hosts_by_index[node_index] = equipment
                else:
                    raise Exception("Node type is not defined in topology " + str(topology_name))
                nodes_by_index[node_index] = equipment

            links_by_name = {}
            for edge_name, link_data in topology_conf["edges"].items():
                link_prototype = link_prototypes_dict[link_data["link_type"]]
                source_index = link_data["connection"][0]
                destination_index = link_data["connection"][1]
                link = TopologyLink.from_prototype(name=edge_name,
                                                   prototype=link_prototype,
                                                   src=nodes_by_index[source_index],
                                                   dest=nodes_by_index[destination_index])
                links_by_name[link.name] = link

            topology = TopologyPrototype(name=topology_name,
                                         egress_err=float(topology_conf["egress_err"]),
                                         ingress_err=float(topology_conf["ingress_err"]))
            # Note: equipments are registered under their node index, not their name.
            topology.add_equipments(hosts=hosts_by_index, routers=routers_by_index)
            topology.add_edges_from(ebunch_to_add=links_by_name)
            topology_prototypes_dict[topology_name] = topology
        return topology_prototypes_dict