Source code for perfsim.observers.load_generator_log_observer

#  Copyright (C) 2020 Michel Gokan Khan
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License along
#  with this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
#  This file is a part of the PerfSim project, which is now open source and available under the GPLv2.
#  Written by Michel Gokan Khan, February 2020

from typing import List, Tuple, Union, TYPE_CHECKING, Dict

from perfsim import Event, LogObserver

if TYPE_CHECKING:
    from perfsim import ReplicaThread, LoadGenerator, Request, ReplicaThread, MicroserviceReplica, TrafficPrototype


class LoadGeneratorLogObserver(LogObserver):
    """
    Observer that writes debug log lines for the events of a load generator.

    Every handler is a no-op unless ``self.subject.sim.debug`` is truthy.
    Messages are built with f-strings; values whose type is not known from
    this file are interpolated with ``!s`` so the emitted text is exactly
    ``str(value)``.
    """

    def __init__(self, load_generator: 'LoadGenerator'):
        """
        Attach the observer to a load generator.

        :param load_generator: The observed subject; its ``sim.logger`` is
            passed to the base class as the logger.
        """
        super().__init__(name="LoadGeneratorLogObserver",
                         subject=load_generator,
                         logger=load_generator.sim.logger)

    @Event
    def before_traffic_start(self) -> None:
        """
        Call ``print_all()`` on the logger before the traffic starts.

        :return: None
        """
        if not self.subject.sim.debug:
            return
        self.logger.print_all()

    @Event
    def before_requests_start(self) -> None:
        """
        Log the banner that marks the start of the requests phase.

        :return: None
        """
        if not self.subject.sim.debug:
            return
        self.logger.log("--- REQUEST ---")

    @Event
    def before_generate_threads(self) -> None:
        """
        Log the banner that marks the thread generation phase.

        :return: None
        """
        if not self.subject.sim.debug:
            return
        self.logger.log("--- THREAD GENERATION ---")

    @Event
    def before_exec_time_estimation(self) -> None:
        """
        Log the banner of the execution time estimation phase, followed by
        the next transmission completion time and the next batch arrival time.

        :return: None
        """
        if not self.subject.sim.debug:
            return
        subject = self.subject
        self.logger.log("--- EXEC TIME ESTIMATION ---")
        # NOTE(review): peekitem(0) presumably returns the earliest
        # (time, value) entry of a sorted mapping and raises when the mapping
        # is empty -- confirm the subject never leaves it empty here.
        next_completion = subject.next_trans_completion_times.peekitem(0)[0]
        self.logger.log(
            f" * Next network transmission is going to complete at {next_completion!s}", 1)
        self.logger.log(
            f" * Next batch of requests will arrive at {subject.next_batch_arrival_time!s}", 1)

    @Event
    def before_executing_threads(self) -> None:
        """
        Log the banner that marks the thread execution phase.

        :return: None
        """
        if not self.subject.sim.debug:
            return
        self.logger.log("--- RUN THREADS ---")

    @Event
    def after_completing_load_generation(self) -> None:
        """
        Log the completion banner and the simulation time (``sim.time``).

        :return: None
        """
        if not self.subject.sim.debug:
            return
        self.logger.log("--- ***** DONE ***** ---")
        self.logger.log(
            f"--- ***** Total execution time = {self.subject.sim.time!s} ***** ---", 1)

    @Event
    def after_next_batch_arrival_time_calculation(self,
                                                  next_scm_names: List[str],
                                                  next_batch_arrival_time: int) -> None:
        """
        Log the result of the next batch arrival time calculation.

        :param next_scm_names: Value reported in the message as the scm ID of
            the next batch.
        :param next_batch_arrival_time: Arrival time reported in the message.
        :return: None
        """
        if not self.subject.sim.debug:
            return
        self.logger.log(
            f" Next batch of requests belongs to scm ID {next_scm_names!s}"
            f" and will arrive at {next_batch_arrival_time!s}", 3)

    @Event
    def before_generate_request_threads(self, request: 'Request', subchain_id: int) -> None:
        """
        Log that threads are about to be generated for a request.

        :param request: The request; only its ``id`` is logged.
        :param subchain_id: The subchain number shown in the message.
        :return: None
        """
        if not self.subject.sim.debug:
            return
        self.logger.log(
            f" * Generating threads in request #{request.id!s} for subchain #{subchain_id!s}")

    @Event
    def after_generate_request_threads(self,
                                       request: 'Request',
                                       subchain_id: int,
                                       threads: Dict[str, 'ReplicaThread'],
                                       current_replicas: List[Tuple[int, 'MicroserviceReplica']]) -> None:
        """
        Log how many threads were generated and for which replica.

        :param request: Not used by this handler.
        :param subchain_id: Index into ``current_replicas`` selecting the pair
            that is reported.
        :param threads: The generated threads; their count and ``id`` values
            are logged.
        :param current_replicas: Pairs of (identifier, replica); the pair at
            ``subchain_id`` is logged.
        :return: None
        """
        if not self.subject.sim.debug:
            return
        thread_ids = [str(thread.id) for thread in threads.values()]
        replica_identifier_in_subchain, replica = current_replicas[subchain_id][:2]
        self.logger.log(
            f" * Generated {len(threads)} threads for replica node "
            f"({replica_identifier_in_subchain!s},{replica!s}) "
            f"(total threads count={len(self.subject.threads)}) - threads ids={thread_ids}", 1)

    @Event
    def after_estimating_time_of_next_event(self,
                                            next_trans_completion_time: Union[int, float],
                                            time_of_next_event: Union[int, float],
                                            next_batch_arrival_time: Union[int, float],
                                            estimated_next_event: str,
                                            is_thread_ending_sooner: bool) -> None:
        """
        Log the outcome of estimating the time of the next event.

        After the estimates, one more line is logged depending on the
        subject's ``duration_of_next_event``: when it is infinite, either a
        leftover-threads notice (previous event is not ``"THREAD GEN"``) or
        the number of active transmissions; otherwise the duration itself.

        :param next_trans_completion_time: Logged as the next transmission
            completion time.
        :param time_of_next_event: Logged as the time of the next event.
        :param next_batch_arrival_time: Logged as the next batch arrival time.
        :param estimated_next_event: Logged as the event estimated so far.
        :param is_thread_ending_sooner: Logged as "Thread completing next".
        :return: None
        """
        if not self.subject.sim.debug:
            return
        subject = self.subject
        log = self.logger.log

        log(f" * Next transmission will complete at {next_trans_completion_time!s}", 2)
        log(f" * Time of next event is {time_of_next_event!s}", 2)
        log(f" * Next batch arrival time is {next_batch_arrival_time!s}", 2)
        # Interpolated (not concatenated) so a non-str value cannot raise
        # TypeError from a debug log line.
        log(f" * Next estimated event (so far) is {estimated_next_event!s}", 2)
        log(f" * Thread completing next = {is_thread_ending_sooner!s}", 2)
        log(" * Next estimated event after executing threads is "
            f"{subject.prediction_for_the_next_event_after_running_threads!s}", 2)

        if subject.duration_of_next_event != float('inf'):
            log(f" ** Running threads on hosts for {subject.duration_of_next_event!s}ns!", 10)
        elif subject.previous_event != "THREAD GEN":
            log(" ** Duration of next event is Infinity! It means there are some leftover "
                "threads that were not getting generated in the previous step! "
                "Let's check for any left over threads...!", 1)
        else:
            active_transmissions = subject.sim.cluster.topology.active_transmissions
            log(f" ** Well, the number of active transmissions is {len(active_transmissions)}")

    @Event
    def before_transmit_requests_in_network(self) -> None:
        """
        Log the duration for which packets are about to be transmitted.

        :return: None
        """
        if not self.subject.sim.debug:
            return
        self.logger.log(
            f"- Transmitting packets for {self.subject.duration_of_next_event!s}ns!", 3)

    @Event
    def after_transmit_requests_in_network_and_load_balancing_threads(self,
                                                                      current_completed_threads: int) -> None:
        """
        Log the number of completed threads and whether requests remain.

        :param current_completed_threads: Number reported as completed threads.
        :return: None
        """
        if not self.subject.sim.debug:
            return
        subject = self.subject
        self.logger.log(f" * {current_completed_threads!s} threads completed", 1)

        if subject.completed_requests == subject.total_requests_count:
            self.logger.log("- No more request to run! DONE!", 1)
            return

        self.logger.log(
            f"- There are remaining threads, next event={subject.next_event!s}", 1)
        # NOTE(review): the counters line is assumed to belong to the
        # "remaining threads" branch -- confirm against the upstream file.
        self.logger.log(
            f" * completed requests == {subject.completed_requests!s}"
            f" | total requests count == {subject.total_requests_count!s}", 10)

    @Event
    def before_request_created(self,
                               request_number: int,
                               scm_name: str,
                               request_id: str,
                               traffic_prototype: 'TrafficPrototype') -> None:
        """
        Log that a request is about to be created.

        :param request_number: Ordinal of the request shown in the message.
        :param scm_name: Value shown in the message as the scm ID.
        :param request_id: Identifier of the request shown in the message.
        :param traffic_prototype: Only its ``requests_count`` is logged.
        :return: None
        """
        if not self.subject.sim.debug:
            return
        # request_id and scm_name are interpolated (not concatenated) so a
        # non-str value cannot raise TypeError from a debug log line.
        self.logger.log(
            f" * Starting request #{request_number!s}/{traffic_prototype.requests_count!s}"
            f" (id={request_id!s}) of scm ID {scm_name!s}", 1)

    @Event
    def after_requests_start(self) -> None:
        """
        Log that a batch of requests was initiated and when the next arrives.

        :return: None
        """
        if not self.subject.sim.debug:
            return
        self.logger.log(
            " * A batch of requests has been initiated. Next batch will arrive at "
            f"{self.subject.next_batch_arrival_time!s}", 1)