Source code for perfsim.observers.cluster_log_observer

#  Copyright (C) 2020 Michel Gokan Khan
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License along
#  with this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
#  This file is a part of the PerfSim project, which is now open source and available under the GPLv2.
#  Written by Michel Gokan Khan, February 2020

from typing import Union, TYPE_CHECKING

from perfsim import LogObserver, Event

if TYPE_CHECKING:
    from perfsim import Cluster, Host, ReplicaThread, Request


[docs] class ClusterLogObserver(LogObserver): """ This class is responsible for logging the events of the cluster. """ def __init__(self, cluster: 'Cluster'): super().__init__(name="ClusterLogObserver", subject=cluster, logger=cluster.sim.logger) @Event def after_finish_running_threads_on_a_host(self, host: 'Host', completed_threads_on_host: int, completed_threads_on_all_hosts: int): """ Log the completion of threads on a host. :param host: :param completed_threads_on_host: :param completed_threads_on_all_hosts: :return: """ if self.subject.sim.debug: self.logger.log(" ** Completed execution of " + str(completed_threads_on_host) + " threads on host " + str(host) + " - Total completed" + " threads = " + str(completed_threads_on_all_hosts), 3) @Event def after_finish_running_a_thread(self, thread: 'ReplicaThread', current_completed_threads: int, completed_threads: int): """ Log the completion of a thread. :param thread: :param current_completed_threads: :param completed_threads: :return: """ if self.subject.sim.debug: if current_completed_threads == 1: self.logger.log( " ** Completed execution of thread #" + str(thread.id) + " - Total completed threads = " + str(completed_threads) + "/" + str(len(self.subject.cluster_scheduler.active_threads)), 3) @Event def before_transmitting_requests_in_network(self): if self.subject.sim.debug: self.logger.log(" *** Number of active transmissions: " + str(len(self.subject.topology.active_transmissions))) @Event def in_transmitting_an_active_transmission(self, request: 'Request', active_subchain_id: int, duration: Union[int, float]): """ Log the status of the request while transmitting an active transmission. :param request: :param active_subchain_id: :param duration: :return: """ if self.subject.sim.debug: log = self.logger.log log(" *** Checking status of request #" + str(request)) log(" **** transmission_times=" + str(request.trans_times), 3) log(" **** transmission_exact_times=" + str(request.trans_exact_times), 3) if request.subchains_status[active_subchain_id] == "IN TRANSMISSION": log(" **** Subchain ID " + str(active_subchain_id) + " in this request, has a \"IN TRANSMISSION\" node, let's reduce" + " its transmission time by " + str(duration), 3) @Event def after_transmitting_an_active_transmission(self, request: 'Request', active_subchain_id: int, duration: Union[int, float]): """ Log the remaining transmission time of the request after transmitting an active transmission. :param request: :param active_subchain_id: :param duration: :return: """ if self.subject.sim.debug: self.logger.log(" ***** Remaining transmission time=" + str(request.trans_times[active_subchain_id]), 3) if request.trans_times[active_subchain_id] <= 0: self.subject.sim.logger.log(" **** Transmission time <= 0 -> finish transmission", 3) @Event def before_load_balancing_a_host(self, host: 'Host'): """ Log the host that is going to be load balanced. :param host: :return: """ if self.subject.sim.debug: self.logger.log(" ** Load balancing threads in host " + str(host), 3) @Event def before_calling_is_there_a_thread_that_ends_sooner_function(self, time_of_next_event: Union[int, float]): """ Log the time of the next event before calling the function is_there_a_thread_that_ends_sooner. :param time_of_next_event: :return: """ if self.subject.sim.debug: self.logger.log("- Checking if a thread's execution is going to end sooner " + "than next event (which is " + str(time_of_next_event) + ") or" + "not. If so, then we need to change the execution time...", 3) @Event def before_checking_a_thread_ends_sooner(self, thread: 'ReplicaThread', duration_to_finish: Union[int, float], time_to_finish: Union[int, float], time_of_next_event: Union[int, float]): """ Log the information of the thread that is going to be checked if it ends sooner than the next event. :param thread: :param duration_to_finish: :param time_to_finish: :param time_of_next_event: :return: """ if self.subject.sim.debug: log = self.logger.log log(" ** Initiating to run thread #" + str(thread.id) + " belongs to replica " + str(thread.replica) + " on host " + str(thread.core.cpu.host), 3) log(" *** It will take " + str(duration_to_finish) + " to execute this thread, therefore, it will " + "end at " + str(time_to_finish), 3) if time_to_finish < time_of_next_event: log(" **** Because it takes less time to complete this thread (end time=" + str(time_to_finish) + ") than starting next planned event (" + str(time_of_next_event) + "), we will replace " + "time_of_next_event with " + str(time_to_finish) + "!", 3) @Event def after_calling_is_there_a_thread_that_ends_sooner_function(self, result: bool, time_of_next_event: Union[int, float], duration_of_next_event: Union[int, float]): """ Log the result of the function is_there_a_thread_that_ends_sooner. :param result: :param time_of_next_event: :param duration_of_next_event: :return: """ if self.subject.sim.debug: if result: self.logger.log(" ** Next event occurs sooner than finishing at least one thread. Therefore," " let's run threads only until next event's occurrence. Duration between now and" " next event = " + str(duration_of_next_event), 3) self.logger.log(" *** Next event will occur at " + str(time_of_next_event), 3) else: if self.subject.sim.debug: self.logger.log(" * Going to execute all threads on all hosts for " + str(duration_of_next_event) + "ns! (until next event)", 3)