Source code for perfsim.equipments.run_queue

#  Copyright (C) 2020 Michel Gokan Khan
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License along
#  with this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
#  This file is a part of the PerfSim project, which is now open source and available under the GPLv2.
#  Written by Michel Gokan Khan, February 2020


from typing import TYPE_CHECKING, List, Dict

from sortedcontainers import SortedDict

from perfsim import ReplicaThread, ThreadSet

if TYPE_CHECKING:
    from perfsim import Core


[docs] class RunQueue: """ RunQueue is a queue of ReplicaThreads. """ #: List of ReplicaThreads rq: list[ReplicaThread] #: The list of active ReplicaThreads active_threads: set[ReplicaThread] #: All active best effort threads best_effort_active_threads: ThreadSet[ReplicaThread] #: All active guaranteed threads guaranteed_active_threads: ThreadSet[ReplicaThread] #: All active burstable threads burstable_active_threads: ThreadSet[ReplicaThread] #: All active burstable threads that do not have a limit burstable_unlimited_active_threads: ThreadSet[ReplicaThread] #: All active burstable threads that have a limit burstable_limited_active_threads: ThreadSet[ReplicaThread] #: The core that this RunQueue belongs to core: 'Core' #: The total load of the threads in the run queue _load: int """ Storing the lightest threads in the runqueue with their load (as the key) and the SortedDict of threads (as the value). In the value part (which is a SortedDoct), the key is the vruntime of the thread, and the value is the set of threads having that vruntime. Because at the end of the day we are using a set, we may still end up having random order of threads with the same vruntime. Even though we are only interested in the lightest threads in the run queue, this might affect the consistency of the result, as threads with the same load and vruntime might have different number of remaining instructions, so the order of the threads might affect the result. 
""" lightest_threads_in_rq: SortedDict[int, SortedDict[float, set[ReplicaThread]]] #: The total number of the best effort threads in the run queue _total_best_effort_threads: int #: The remaining millicores that can be allocated to threads in the run queue _remaining_millicores: int #: The total number of the threads in the run queue thread_set_dict: Dict[str, set[ThreadSet]] def __init__(self, core: 'Core') -> None: self.rq = [] self.lightest_threads_in_rq = SortedDict() self._load = 0 # self.__time = 0 self.nr = 0 # self.threads_df = pd.DataFrame(columns=[str(_thread.pid) + str(_thread.id) for _thread in threads]) self.threads_total_time = {"idle": {}} self.core = core self.active_threads = set() self.best_effort_active_threads = ThreadSet(type_of_set=0) self.guaranteed_active_threads = ThreadSet(type_of_set=1) self.burstable_active_threads = ThreadSet(type_of_set=2) self.burstable_unlimited_active_threads = ThreadSet(type_of_set=3) self.burstable_limited_active_threads = ThreadSet(type_of_set=4) self._remaining_millicores = self.core.cpu.max_cpu_requests self.thread_set_dict = {}
[docs] def reinit(self): """ Reinitialize the run queue. :return: """ self.__init__(self.core)
[docs] def requeue_task(self, thread: ReplicaThread) -> None: """ Requeue a thread in the run queue. :param thread: :return: """ self.dequeue_task_by_thread(thread=thread, load_balance=False) self.enqueue_task(thread=thread, load_balance=True)
@staticmethod def __calc_buckets_limits(bucket_limits, fair_share): """ Calculate the bucket limits. :param bucket_limits: :param fair_share: :return: """ _limited_buckets = [b for b in bucket_limits if b is not None and b <= fair_share] limited_bucket_count = len(_limited_buckets) unlimited_bucket_count = len(bucket_limits) - limited_bucket_count return _limited_buckets, limited_bucket_count, unlimited_bucket_count @staticmethod def __validate_thread_on_rq(thread: ReplicaThread) -> None: """ Validate if the thread is on the run queue. :param thread: :return: """ if not thread.on_rq: raise Exception("How come thread is not in the rq?!") def _reset_sum_of_shares_in_sets(self): """ Reset the sum of shares in the sets, i.e., best effort, guaranteed, burstable, burstable unlimited, and burstable limited. :return: """ self.best_effort_active_threads.sum_cpu_requests = 0 self.guaranteed_active_threads.sum_cpu_requests = 0 self.burstable_active_threads.sum_cpu_requests = 0 self.burstable_unlimited_active_threads.sum_cpu_requests = 0 self.burstable_limited_active_threads.sum_cpu_requests = 0
    def recalculate_cpu_requests_shares(self) -> None:
        """
        Recalculate the CPU requests shares of all active threads.

        Shares are handed out in QoS order: guaranteed and burstable threads
        first receive their per-thread request; the leftover millicores are
        then split among unlimited-burstable threads proportionally to their
        request; whatever still remains is divided evenly among best-effort
        threads. The order of the passes and subtractions below is
        significant — each pass consumes from ``_remaining_millicores``.

        :return:
        """
        # Nothing to distribute when no thread is active.
        if len(self.active_threads) <= 0:
            return

        # Start each recalculation from the CPU's full millicore budget.
        self._remaining_millicores = self.core.cpu.max_cpu_requests
        # self._reset_sum_of_shares_in_sets()

        # Even if the thread load goes above a thousand here, it won't be alongside best efforts.
        for thread in self.guaranteed_active_threads:
            self.assign_cpu_requests_share(thread=thread, cpu_requests=thread.process.get_cpu_request_per_thread())

        for thread in self.burstable_active_threads:
            self.assign_cpu_requests_share(thread=thread, cpu_requests=thread.process.get_cpu_request_per_thread())

        # Budget left after guaranteed + burstable base allocations.
        self._remaining_millicores -= self.guaranteed_active_threads.sum_cpu_requests + \
                                      self.burstable_active_threads.sum_cpu_requests

        # Unlimited-burstable threads additionally receive a slice of the
        # remaining budget, proportional to their share of all burstable requests.
        for thread in self.burstable_unlimited_active_threads:
            cpu_req_ratio = thread.process.get_cpu_request_per_thread() / self.burstable_active_threads.sum_cpu_requests
            share = self._remaining_millicores * cpu_req_ratio
            self.assign_cpu_requests_share(thread, thread.cpu_requests_share + share)

        self._remaining_millicores -= self.burstable_unlimited_active_threads.sum_cpu_requests

        # Best-effort threads split whatever is still left, evenly.
        if self._remaining_millicores > 0 and len(self.best_effort_active_threads) > 0:
            _fair_share_be = self._remaining_millicores / len(self.best_effort_active_threads)
            for thread in self.best_effort_active_threads:
                self.assign_cpu_requests_share(thread, _fair_share_be)
            self._remaining_millicores -= _fair_share_be * len(self.best_effort_active_threads)
[docs] def run_idle(self, duration: int) -> None: """ Run the idle threads for the given duration. :param duration: :return: """ try: self.threads_total_time["idle"][int(self.core.cpu.host.cluster.sim.time)] = int(duration) except OverflowError: try: self.threads_total_time["idle"][float('inf')] = int(duration) except OverflowError: self.threads_total_time["idle"][float('inf')] = float('inf')
# from fair.c # /* # * The idea is to set a period in which each task runs once. # * # * When there are too many tasks (sched_nr_latency) we have to stretch # * this period because otherwise the slices get too small. # * # * p = (nr <= nl) ? l : l*nr/nl # */ # def sched_period(self, nr_running: int) -> int: # nr_latency = self.core.cpu.host.sched_latency_ns / self.core.cpu.host.sched_min_granularity_ns # # if nr_running > nr_latency: # return nr_running * self.core.cpu.host.sched_min_granularity_ns # else: # return self.core.cpu.host.sched_latency_ns
[docs] def assign_cpu_requests_share(self, thread: ReplicaThread, cpu_requests: float) -> None: """ Assign the CPU requests share to the thread. :param thread: :param cpu_requests: :return: """ if cpu_requests is None: raise Exception("CPU requests cannot be None") self.__validate_thread_on_rq(thread) if cpu_requests <= self.core.cpu.max_cpu_requests: thread.cpu_requests_share = cpu_requests else: thread.cpu_requests_share = self.core.cpu.max_cpu_requests
[docs] def categorize_thread_into_sets(self, thread: ReplicaThread) -> None: """ Categorize the thread into the sets, i.e., best effort, guaranteed, burstable, burstable unlimited, and burstable limited. :param thread: :return: """ if thread.process.ms_replica.microservice.is_best_effort(): self.best_effort_active_threads.add(thread) elif thread.process.ms_replica.microservice.is_guaranteed(): self.guaranteed_active_threads.add(thread) elif thread.process.ms_replica.microservice.is_burstable(): self.burstable_active_threads.add(thread) if thread.process.ms_replica.microservice.is_unlimited_burstable(): self.burstable_unlimited_active_threads.add(thread) else: self.burstable_limited_active_threads.add(thread) else: raise Exception("Unknown thread type")
[docs] def decategorize_thread_from_sets(self, thread: ReplicaThread) -> None: """ Decategorize the thread from the sets, i.e., best effort, guaranteed, burstable, burstable unlimited, and burstable limited. :param thread: :return: """ if thread.process.ms_replica.microservice.is_best_effort(): self.best_effort_active_threads.remove(thread) elif thread.process.ms_replica.microservice.is_guaranteed(): self.guaranteed_active_threads.remove(thread) elif thread.process.ms_replica.microservice.is_burstable(): self.burstable_active_threads.remove(thread) if thread.process.ms_replica.microservice.is_unlimited_burstable(): self.burstable_unlimited_active_threads.remove(thread) else: self.burstable_limited_active_threads.remove(thread) else: raise Exception("Unknown thread type")
    def enqueue_task(self, thread: ReplicaThread, load_balance: bool = False) -> None:
        """
        Enqueue a thread in the run queue.

        Besides appending the thread to ``rq``, this updates the cluster
        scheduler's active-thread set, the QoS thread sets, the queue load,
        the host's thread set and activation state, the CPU's sorted thread
        structure and idle-pair bookkeeping, and optionally triggers a
        load-balancing pass. The statement order below is significant.

        :param thread:
        :param load_balance: when True, run a CPU load-balancing pass at the end.
        :raises Exception: when the thread already belongs to a core, has no
            remaining instructions, or is not flagged as on a run queue.
        :return:
        """
        if thread.core is not None:
            raise Exception("Error: ReplicaThread already belongs to a core/runqueue")
        # thread.vruntime = self.rq[0].vruntime if len(self.rq) != 0 else 0
        if thread.instructions <= 0:
            raise Exception("Why on earth a zombie thread is enqueuing on a rq?")
        # Attach the thread to this core and register it everywhere.
        thread.core = self.core
        self.rq.append(thread)
        self.active_threads.add(thread)
        self.nr += 1
        self.core.cpu.host.cluster.cluster_scheduler.active_threads.add(thread)
        self.categorize_thread_into_sets(thread)
        if thread.on_rq:
            # Queue load grows by the thread's weighted load; the property
            # setter propagates the change into the CPU pair bookkeeping.
            self.load += thread.average_load * thread.load
            self.assign_cpu_requests_share(thread, thread.process.get_cpu_request_per_thread())
        else:
            raise Exception("How come a thread with on_rq==False attempts to get enqueued?")
        df_column_name = str(thread.id)
        # First time we see this thread: start its accumulated-time record.
        if df_column_name not in self.threads_total_time:
            self.threads_total_time[df_column_name] = {0: int(0)}
        if self.core.cpu.host.cluster.sim.log_timeline:
            self.core.cpu.host.timeline_event.append("enq " + df_column_name + " on " + str(self.core.cpu.name))
            self.core.cpu.host.timeline_time.append(str(round(float(self.core.cpu.host.cluster.sim.time), 5)))
        self.core.cpu.host.threads.add(thread)
        if len(self.core.cpu.host.threads) < 2:
            # Activating host " + str(self.core.cpu.host) + " while adding thread " + str(_thread.id)
            self.core.cpu.host.cluster.cluster_scheduler.active_hosts.add(self.core.cpu.host)
        if not self.core.cpu.host.load_balancing_needed:
            # Forcing host " + str(self.core.cpu.host) + " to load balance while adding thread " + str(_thread.id)
            self.core.cpu.host.cluster.cluster_scheduler.hosts_need_load_balancing.add(self.core.cpu.host)
            self.core.cpu.host.load_balancing_needed = True
        # Negated load: the CPU's sorted structure keeps heaviest threads first.
        self.core.cpu.add_to_threads_sorted(thread=thread, inverted_thread_load=thread.load * -1)
        self.core.cpu.update_idle_pairs(core=self.core)
        if load_balance:
            self.core.cpu.load_balance()
[docs] def enqueue_tasks(self, threads: List[ReplicaThread], load_balance: bool = False) -> None: """ Enqueue a list of threads in the run queue at once. :param threads: :param load_balance: :return: """ for _thread in threads: self.enqueue_task(_thread, load_balance=False) if load_balance: self.core.cpu.load_balance()
[docs] def remove_from_lightest_threads_in_rq(self, thread): """ Remove the thread from the lightest threads in the run queue. :param thread: :return: """ if thread.load in self.lightest_threads_in_rq: if thread.vruntime in self.lightest_threads_in_rq[thread.load]: self.lightest_threads_in_rq[thread.load][thread.vruntime].discard(thread) if len(self.lightest_threads_in_rq[thread.load][thread.vruntime]) == 0: del self.lightest_threads_in_rq[thread.load][thread.vruntime] if len(self.lightest_threads_in_rq[thread.load]) == 0: del self.lightest_threads_in_rq[thread.load]
[docs] def add_to_lightest_threads_in_rq(self, thread: ReplicaThread): """ Add the thread to the lightest threads in the run queue. :param thread: :return: """ if thread.load in self.lightest_threads_in_rq: if thread.vruntime in self.lightest_threads_in_rq[thread.load]: self.lightest_threads_in_rq[thread.load][thread.vruntime].add(thread) else: self.lightest_threads_in_rq[thread.load][thread.vruntime] = {thread} else: self.lightest_threads_in_rq[thread.load] = SortedDict({thread.vruntime: {thread}})
[docs] def dequeue_task_by_thread(self, thread: ReplicaThread, load_balance: bool = False) -> None: """ Dequeue a thread from the run queue by the thread. :param thread: :param load_balance: :return: """ if not thread.on_rq: raise Exception("How come a thread with on_rq==False attempts to get dequeued?") _index = self.rq.index(thread) del self.rq[_index] self.active_threads.remove(thread) self.remove_from_lightest_threads_in_rq(thread=thread) self.nr -= 1 self.decategorize_thread_from_sets(thread) thread.core = None self.load -= thread.average_load * thread.load df_col_name = str(thread.process.pname) + "_" + str(thread.id) + "_" + str( int(thread.process.original_cpu_requests_share)) if self.core.cpu.host.cluster.sim.time != 0: if self.core.cpu.host.cluster.sim.log_timeline: self.core.cpu.host.timeline_event.append("deq " + df_col_name + " from " + str(self.core.name)) self.core.cpu.host.timeline_time.append(str(round(float(self.core.cpu.host.cluster.sim.time), 5))) if load_balance: self.core.cpu.load_balance() self.core.cpu.update_idle_pairs(core=self.core)
# @overload # def dequeue_task(self, _thread: int) -> ReplicaThread:
[docs] def dequeue_task_by_thread_index(self, thread: int, load_balance: bool = False) -> ReplicaThread: """ Dequeue a thread from the run queue by the thread index in the run queue. :param thread: :param load_balance: :return: """ t = self.rq[thread] self.dequeue_task_by_thread(t, load_balance) return t
@property def load(self) -> int: """ Get the load of the run queue. :return: """ return self._load @load.setter def load(self, v: int): """ Set the load of the run queue. :param v: :return: """ if self._load != v: self.core.cpu.remove_from_pairs_sorted(pair_id=self.core.pair_id, inverted_pair_load=self.core.cpu.pairs_load[self.core.pair_id] * -1) self._load = v other_id = self.core.cpu.get_the_other_core_in_pair(core_id=self.core.id_in_cpu) if other_id is not None: other = self.core.cpu.cores[other_id] self.core.cpu.pairs_load[self.core.pair_id] = other.runqueue.load + v else: self.core.cpu.pairs_load[self.core.pair_id] = v self.core.cpu.add_to_pairs_sorted(pair_id=self.core.pair_id, inverted_pair_load=self.core.cpu.pairs_load[self.core.pair_id] * -1) def __lt__(self, other): """ Compare the run queue with another run queue (less than). :param other: :return: """ if isinstance(other, RunQueue): return self.load < other.load def __gt__(self, other): """ Compare the run queue with another run queue (greater than). :param other: :return: """ if isinstance(other, RunQueue): return self.load > other.load def __le__(self, other): """ Compare the run queue with another run queue (less than or equal to). :param other: :return: """ if isinstance(other, RunQueue): return self.load == other.load or self < other def __ge__(self, other): """ Compare the run queue with another run queue (greater than or equal to). :param other: :return: """ if isinstance(other, RunQueue): return self.load == other.load or self > other