# Copyright (C) 2020 Michel Gokan Khan
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# This file is a part of the PerfSim project, which is now open source and available under the GPLv2.
# Written by Michel Gokan Khan, February 2020
from __future__ import annotations
import heapq
from typing import TYPE_CHECKING, Union, Dict, List, Tuple
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sortedcontainers import SortedDict
from perfsim import Request, ReplicaThread, Utils, LoadGeneratorLogObserver, Observable, ServiceChainResultDict, \
SimulationScenarioResultDict
from perfsim.observers.results_observer import ResultsObserver
if TYPE_CHECKING:
from perfsim import Simulation
class LoadGenerator(Observable):
"""
This class is responsible for generating a load for each service chain manager in a `simulation`.
"""
#: Name of the LoadGenerator instance.
name: str
#: The ``Simulation`` object that this traffic belongs to.
sim: Simulation
#: All ``Request`` instances in the simulation
requests: list[Request]
#: All ``ReplicaThread`` instances in the simulation
threads: list[ReplicaThread]
#: All ``ReplicaThread`` instances in the simulation, as a dict keyed by name
threads_dict: Dict[str, ReplicaThread]
#: The next batch request arrival time.
_next_batch_arrival_time: Union[int, float]
#: Names of the next ``ServiceChainManager``s in the loop whose requests arrive at `_next_batch_arrival_time`
_next_scm_names: list[str]
_min_arrival_scm_names: list[str]
_scm_current_arrival_iteration_ids: Dict[str, int]
#: Requests that are ready for thread generation in the next "THREAD GEN" event. List of tuple(subchain_id, request)
_requests_ready_for_thread_generation: list[tuple[int, Request]]
#: A sorted dictionary of all future transmission completion times (absolute clock times, not durations)
_next_trans_completion_times: SortedDict
#: Total number of requests expected in the simulation
__total_requests_count: int
#: Stores whether it is the last request, keyed by scm_name
_last_request: Dict[str, bool]
#: Stores request latencies
latencies: pd.DataFrame
#: Stores request arrival times
arrivals: pd.DataFrame
#: The event that is triggered before the traffic starts.
before_traffic_start: str
#: The event that is triggered before threads are generated.
before_generate_threads: str
#: The event that is triggered before the next batch of requests is initiated.
before_requests_start: str
#: The event that is triggered at the end of _initiate_next_batch_of_requests.
after_requests_start: str
#: The event that is triggered before estimating thread execution times.
before_exec_time_estimation: str
#: The event that is triggered before threads start running.
before_executing_threads: str
#: The event that is triggered after executing all threads/requests of this load generator.
after_completing_load_generation: str
#: The event that is triggered after the next batch arrival time has been calculated.
after_next_batch_arrival_time_calculation: str
#: The event that is triggered before generating threads for a request (within a subchain).
before_generate_request_threads: str
#: The event that is triggered after generating threads for a request (within a subchain).
after_generate_request_threads: str
#: The event that is triggered after the transmission completion time has been estimated.
after_transmission_estimation: str
#: The event that is triggered after the time of the next event has been estimated.
after_estimating_time_of_next_event: str
#: The event that is triggered before transmitting packets in the network.
before_transmit_requests_in_network: str
#: The event that is triggered after transmitting requests and load balancing threads on all hosts.
after_transmit_requests_in_network_and_load_balancing_threads: str
#: The event that is triggered before a request is created.
before_request_created: str
#: Stores the merged arrival tables from the traffic prototypes defined in the simulation scenario.
__merged_arrival_table: List[Tuple[Union[int, float], Request]]
def __init__(self,
name: str,
simulation: Simulation,
notify_observers_on_event: bool = True):
"""
A *LoadGenerator* is an object responsible not only for generating a given list of traffic objects
(*traffic_prototypes*) on a given *simulation.cluster*, but also for controlling the state, events and
time of the entire simulation.
:param name: Name of the LoadGenerator instance.
:param simulation: The `Simulation` object that this traffic belongs to.
:param notify_observers_on_event: Whether observers should be notified when events are triggered.
"""
self.name = name
self.sim = simulation
self.previous_event = None
self.next_event = "REQUEST"
self.requests = []
self.threads = []
self.latencies = pd.DataFrame(columns=["scenario #",
"SFC",
"iteration_id",
"req_id_in_iteration",
"latency",
"arrival_time",
"completion_time",
"status",
"traffic_type"])
self.__total_requests_count = self.__calculate_total_requests_count()
self._next_scm_names = list(self.sim.cluster.scm_dict.keys())
self._scm_current_arrival_iteration_ids = {scm_name: 0 for scm_name in self._next_scm_names}
self._requests_ready_for_thread_generation = []
self._next_trans_completion_times = SortedDict({float('inf'): None})
self._next_thread_completion_exact_times = SortedDict({float('inf'): None})
self._last_request = {scm_name: False for scm_name in self.sim.traffic_prototypes_dict.keys()}
self._completed_threads = 0
self._completed_requests = 0
self._next_batch_arrival_time = 0
self._min_arrival_time = float('inf')
self._min_arrival_scm_names = []
self._last_transmission_id = 0
self.notify_observers_on_event = notify_observers_on_event
self.__merged_arrival_table = []
heapq.heapify(self.__merged_arrival_table)
self.merge_arrival_tables()
super().__init__()
if self.sim.debug_level > 0:
self.attach_observer(observer=LoadGeneratorLogObserver(load_generator=self))
self.attach_observer(observer=ResultsObserver(load_generator=self))
def __calculate_total_requests_count(self):
requests_count = 0
for sfc_name, traffic_type_object in self.sim.scenario["traffic_scenario"]["service_chains"].items():
requests_count += self.sim.traffic_prototypes_dict[traffic_type_object["traffic_type"]].requests_count
return requests_count
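# The scenario structure consumed above is assumed to look roughly like the
# following sketch (keys as referenced in the code; names are illustrative):
#
#     {"traffic_scenario": {"service_chains": {"sfc_a": {"traffic_type": "traffic_type_a"}}}}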
@property
def total_requests_count(self):
return self.__total_requests_count
@total_requests_count.setter
def total_requests_count(self, value):
raise AttributeError(
"The attribute total_requests_count is read-only and can only be set during initialization "
"based on give simulation scenario.")
@property
def merged_arrival_table(self):
return self.__merged_arrival_table
@merged_arrival_table.setter
def merged_arrival_table(self, value):
raise Exception("Cannot set merged_arrival_table!")
def register_events(self):
self.register_event("before_traffic_start")
self.register_event("before_requests_start")
self.register_event("after_requests_start")
self.register_event("before_generate_threads")
self.register_event("before_exec_time_estimation")
self.register_event("before_executing_threads")
self.register_event("after_completing_load_generation")
self.register_event("after_next_batch_arrival_time_calculation")
self.register_event("before_generate_request_threads")
self.register_event("after_generate_request_threads")
self.register_event("after_transmission_estimation")
self.register_event("after_estimating_time_of_next_event")
self.register_event("before_transmit_requests_in_network")
self.register_event("after_transmit_requests_in_network_and_load_balancing_threads")
self.register_event("before_request_created")
def execute_traffic(self) -> List[ReplicaThread]:
"""
This is the main event loop, responsible for starting and driving the traffic in the cluster.
:return: The list of all ``ReplicaThread`` instances generated during the simulation.
"""
self.notify_observers(event_name=self.before_traffic_start)
while self.next_event != "DONE":
if self.next_event == "REQUEST":
self._initiate_next_batch_of_requests()
elif self.next_event == "THREAD GEN":
self._initiate_next_endpoint_function_in_chain()
elif self.next_event == "EXEC TIME EST":
self._exec_time_estimation()
elif self.next_event == "RUN THREADS":
self._run_threads_on_hosts()
for host_name, host in self.sim.cluster.cluster_scheduler.hosts_dict.items():
if self.sim.time not in host.cpu.events:
host.cpu.events[self.sim.time] = {core_id: 0 for core_id in range(len(host.cpu.cores))}
self.notify_observers(event_name=self.after_completing_load_generation)
return self.threads
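# The loop above is a compact state machine. A typical cycle, sketched here for
# orientation (event names as used in the loop):
#
#     REQUEST -> THREAD GEN -> EXEC TIME EST -> RUN THREADS -> (REQUEST | THREAD GEN) -> ... -> DONE
#
# "EXEC TIME EST" decides whether the next wake-up is a new request batch or a
# transmission completion, and "RUN THREADS" advances the simulation clock accordingly.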
def _generate_threads(self, request: Request, subchain_id: int, load_balance: bool = True) -> None:
current_replicas = request.init_next_microservices(subchain_id)
self.notify_observers(event_name=self.before_generate_request_threads, request=request, subchain_id=subchain_id)
replica_pair = current_replicas[subchain_id]
replica_identifier_in_subchain = replica_pair[0]
replica = replica_pair[1]
current_node = request.current_nodes[subchain_id]
threads = replica.generate_threads(from_subchain_id=subchain_id,
node_in_subchain=current_node,
load_balance=load_balance,
parent_request=request,
replica_identifier_in_subchain=replica_identifier_in_subchain)
self.threads.extend(list(threads.values()))
self.notify_observers(event_name=self.after_generate_request_threads, request=request, subchain_id=subchain_id,
threads=threads, current_replicas=current_replicas)
def _initiate_next_endpoint_function_in_chain(self) -> None:
self.notify_observers(event_name=self.before_generate_threads)
_load_balance_on_id = len(self._requests_ready_for_thread_generation) - 1
for _id, (subchain_id, request) in enumerate(self._requests_ready_for_thread_generation):
    # Only the last request in the batch triggers load balancing.
    self._generate_threads(subchain_id=subchain_id,
                           request=request,
                           load_balance=(_load_balance_on_id == _id))
self._requests_ready_for_thread_generation = []
self.next_event = "EXEC TIME EST"
self.previous_event = "THREAD GEN"
def _exec_time_estimation(self) -> None:
self.notify_observers(event_name=self.before_exec_time_estimation)
self.sim.cluster.topology.recalculate_transmissions_bw_on_all_links()
next_trans_completion_time = self._next_trans_completion_times.peekitem(0)[0]
if next_trans_completion_time == float('inf') or self._next_batch_arrival_time < next_trans_completion_time:
time_of_next_event = self._next_batch_arrival_time
self.prediction_for_the_next_event_after_running_threads = "REQUEST"
else:
time_of_next_event = next_trans_completion_time
self.prediction_for_the_next_event_after_running_threads = "THREAD GEN"
self.time_of_next_event, self.duration_of_next_event, thread_ending_sooner = \
self.sim.cluster.is_there_a_thread_that_ends_sooner(time_of_next_event)
if self.duration_of_next_event == float('inf'):
if self.previous_event != "THREAD GEN":
self.next_event = "THREAD GEN"
else:
raise Exception("Next event can't be inf! Probably a bug! (number of active transmissions=" +
str(len(self.sim.cluster.topology.active_transmissions)) + ")")
else:
self.next_event = "RUN THREADS"
self.notify_observers(event_name=self.after_estimating_time_of_next_event,
next_trans_completion_time=next_trans_completion_time,
time_of_next_event=self.time_of_next_event,
next_batch_arrival_time=self._next_batch_arrival_time,
estimated_next_event=self.next_event,
is_thread_ending_sooner=thread_ending_sooner)
self.previous_event = "EXEC TIME EST"
def _run_threads_on_hosts(self) -> None:
self.notify_observers(event_name=self.before_executing_threads)
self.sim.time += self.duration_of_next_event
current_completed_threads = self.sim.cluster.run_active_threads(self.duration_of_next_event)
self.notify_observers(event_name=self.before_transmit_requests_in_network)
self.sim.cluster.transmit_requests_in_network(self.duration_of_next_event)
self.sim.cluster.load_balance_threads_in_all_hosts()
self._completed_threads += current_completed_threads
if self._completed_requests == self.total_requests_count:
self.next_event = "DONE"
else:
self.next_event = self.prediction_for_the_next_event_after_running_threads
self.notify_observers(event_name=self.after_transmit_requests_in_network_and_load_balancing_threads,
current_completed_threads=current_completed_threads)
self.previous_event = "RUN THREADS"
def _initiate_next_batch_of_requests(self) -> None:
self.notify_observers(event_name=self.before_requests_start)
try:
requests = [heapq.heappop(self.__merged_arrival_table)[1]]
request_subchain_id_pairs = [(0, requests[0])]
self.sim.time = requests[0].arrival_time
except IndexError:
self.next_event = "EXEC TIME EST"
return
while self.__merged_arrival_table and self.__merged_arrival_table[0][0] == requests[0].arrival_time:
requests.append(heapq.heappop(self.__merged_arrival_table)[1])
request_subchain_id_pairs.append((0, requests[-1]))
self.requests.extend(requests)
self._requests_ready_for_thread_generation.extend(request_subchain_id_pairs)
self.next_event = "THREAD GEN"
if self.__merged_arrival_table:
self._next_batch_arrival_time = self.__merged_arrival_table[0][1].arrival_time
else:
self._next_batch_arrival_time = float('inf')
self.previous_event = "REQUEST"
self.notify_observers(event_name=self.after_requests_start)
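# The batching above uses a standard heap pattern: pop the minimum entry, then
# keep popping while the heap root shares the same key. A self-contained sketch
# of the same pattern (illustrative only, plain strings instead of ``Request``s):
#
#     import heapq
#     heap = [(0, "a"), (0, "b"), (5, "c")]
#     heapq.heapify(heap)
#     t, batch = heap[0][0], []
#     while heap and heap[0][0] == t:
#         batch.append(heapq.heappop(heap)[1])
#     # batch == ["a", "b"]; the next batch arrival time is heap[0][0] == 5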
@property
def completed_requests(self) -> int:
"""
Return the number of completed requests.
"""
return self._completed_requests
@completed_requests.setter
def completed_requests(self, v):
self._completed_requests = v
def get_latencies_grouped_by_sfc(self) -> SimulationScenarioResultDict:
"""
Return the latencies of all requests in the current simulation scenario, grouped by service function chain (SFC).
"""
mask = self.latencies['scenario #'].values == self.sim.scenario['name']
sim_lats = pd.DataFrame(self.latencies.values[mask], self.latencies.index[mask], self.latencies.columns)
grouped_lats = sim_lats.groupby("SFC")
result: SimulationScenarioResultDict = {"service_chains": {}}
for sfc_name, sfc_latencies in grouped_lats:
ranges = pd.cut(sfc_latencies['completion_time'], np.arange(0, self.sim.time + 1e9, 1e9))
grouped_completion_times = ranges.groupby(ranges, observed=False)
grouped_throughput = grouped_completion_times.count().fillna(0).to_dict()
grouped_lats_by_iteration = sfc_latencies.groupby("iteration_id")
r = {"latency": {}, "completion_time": {}, "arrival_time": {}, "traffic_type": {}}
for iteration_id, sfc_latencies_by_iteration in grouped_lats_by_iteration:
for key in r.keys():
r[key][iteration_id] = {}
for _id, result_in_iteration in sfc_latencies_by_iteration.iterrows():
r[key][iteration_id][_id] = result_in_iteration[key]
# TODO: Requests timeout need to be implemented
sfc: ServiceChainResultDict = {
'simulation_name': self.sim.name,
'estimated_cost': 0,
'total_requests': len(sfc_latencies),
'successful_requests': len(sfc_latencies),
'timeout_requests': 0,
'avg_latency': sfc_latencies["latency"].mean(),
'throughput': {str(k): grouped_throughput[k] for k in grouped_throughput.keys()},
'arrival_times': {"iterations": r['arrival_time']},
'latencies': {"iterations": r['latency']},
'completion_times': {"iterations": r['completion_time']},
'traffic_types': {"iterations": r['traffic_type']},
}
result["service_chains"][str(sfc_name)] = sfc
return result
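# The per-second throughput above comes from binning completion times
# (nanoseconds) into 1e9-wide buckets with ``pd.cut``. A minimal standalone
# sketch of the same idea (illustrative values):
#
#     import numpy as np
#     import pandas as pd
#     completion_times = pd.Series([2e8, 9e8, 1.5e9, 2.2e9])
#     bins = pd.cut(completion_times, np.arange(0, 3e9 + 1e9, 1e9))
#     per_second = bins.groupby(bins, observed=False).count()
#     # -> 2 requests in (0.0, 1e9], 1 in (1e9, 2e9], 1 in (2e9, 3e9]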
def plot_latencies(self,
save_dir=None,
marker='o',
show_numbers=None,
moving_average=False,
save_values=False,
show: bool = True):
plt.ioff()
perfsim_lats = self.latencies["latency"].copy().reset_index(drop=True)
fig = plt.figure(figsize=(15, 5), facecolor='w')
plt.rcParams.update({'font.size': 10})
if moving_average:
perfsim_lats = perfsim_lats.rolling(window=3).mean()
if save_values:
perfsim_lats.to_csv(path_or_buf=save_dir + "/perfsim_lats.csv")
(perfsim_lats.values / 10 ** 9).tofile(save_dir + "/perfsim_lats_values.txt", sep="\n")
ax1 = perfsim_lats.plot(label="PerfSim (sfc-stress)", marker=marker)
max_latency = perfsim_lats.max()
if show_numbers:
for i, v in enumerate(perfsim_lats):
ax1.text(i, v + 25, "%0.2f" % (v / 10 ** 6), ha="center")
plt.ylim(0, max_latency + max_latency * 0.1)
ax1.set_xlabel("Request #")
ax1.set_ylabel("Response Time (ms)")
ax1.get_yaxis().set_major_formatter(plt.FuncFormatter(lambda x, p: format(int(x) / 1000000, ',')))
fig.tight_layout()
plt.legend()
if save_dir is not None:
Utils.mkdir_p(dir_path=save_dir)
plt.savefig(save_dir + "/latency_evaluation.pdf")
if show:
plt.show()
else:
plt.close()
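# A hedged usage sketch (hypothetical path; assumes traffic has already been
# executed so that ``self.latencies`` is populated):
#
#     lg.plot_latencies(save_dir="/tmp/perfsim_results", moving_average=True,
#                       save_values=True, show=False)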
@property
def last_transmission_id(self):
return self._last_transmission_id
@last_transmission_id.setter
def last_transmission_id(self, v: int):
self._last_transmission_id = v
@property
def next_trans_completion_times(self):
return self._next_trans_completion_times
@next_trans_completion_times.setter
def next_trans_completion_times(self, v):
raise AttributeError("next_trans_completion_times is read-only! It's going to be set automatically!")
@property
def requests_ready_for_thread_generation(self):
return self._requests_ready_for_thread_generation
@requests_ready_for_thread_generation.setter
def requests_ready_for_thread_generation(self, v):
raise AttributeError("requests_ready_for_thread_generation is read-only! It's going to be set automatically!")
@property
def next_batch_arrival_time(self):
return self._next_batch_arrival_time
@next_batch_arrival_time.setter
def next_batch_arrival_time(self, v):
raise AttributeError("next_batch_arrival_time is read-only! It's going to be set automatically!")
def merge_arrival_tables(self):
for scm_name, traffic_scenario_object in self.sim.scenario['traffic_scenario']['service_chains'].items():
traffic_proto = self.sim.traffic_prototypes_dict[traffic_scenario_object['traffic_type']]
for iteration_id, arrival_time in enumerate(traffic_proto.arrival_table):
for uid in range(traffic_proto.parallel_user):
rq_num = iteration_id * traffic_proto.parallel_user + uid
req_id = self.sim.scenario["name"] + "_" + traffic_proto.name + "_" + scm_name + "_" + str(rq_num)
arrival_time_request_tuple = (arrival_time, Request(request_id=req_id,
iteration_id=iteration_id,
id_in_iteration=uid,
load_generator=self,
traffic_prototype=traffic_proto,
scm=self.sim.cluster.scm_dict[scm_name],
arrival_time=arrival_time))
heapq.heappush(self.__merged_arrival_table, arrival_time_request_tuple)
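# Example of the numbering used above: with ``parallel_user == 2``, iteration 0
# yields request numbers 0 and 1 and iteration 1 yields 2 and 3, following
# rq_num = iteration_id * parallel_user + uid. Request IDs therefore look like
# "<scenario_name>_<traffic_prototype_name>_<scm_name>_<rq_num>", and the heap
# keeps the merged table ordered by arrival time across all service chains.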