# Copyright (C) 2020 Michel Gokan Khan
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# This file is a part of the PerfSim project, which is now open source and available under the GPLv2.
# Written by Michel Gokan Khan, February 2020
from __future__ import annotations
import math
import os
import time as tt
from copy import deepcopy
from typing import TYPE_CHECKING, List, Dict, Union, Set
import numpy as np
import pandas as pd
import plotly.express as px
from sortedcontainers import SortedDict, SortedSet
from perfsim import Logger, Core, RunQueue, Observable, CPULogObserver, Utils, Plotter, ReplicaThread
if TYPE_CHECKING:
from perfsim import Host
"""
A *CPU* object represents a single NUMA node with a given *clock_rate* and *name*
in the system. Each NUMA node contains a number of cores given by *cores_count*.

Here are the possible initialization parameters:

`name`
    Name of the CPU, e.g., *CPU1*.

`cores_count`
    Number of cores that this CPU contains.

`clock_rate`
    The clock rate of this CPU (in hertz).

Other attributes are available after initialization:

`cores`
    The list of *Core* objects in this *CPU*.
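
Example (a minimal sketch; assumes a configured *Host* instance named *host*)::

    cpu = CPU(name="CPU1", cores_count=4, clock_rate=2_400_000_000, host=host)
    len(cpu.cores)  # -> 4
    cpu.capacity    # -> 4000 millicores, assuming the default 1000 per core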
"""
class CPU(Observable):
"""
The CPU class represents a single NUMA node in the system. It contains a number
of cores and has a clock rate. It is responsible for load balancing among the
cores and the threads that are assigned to them.
"""
#: The list of cores in this CPU
cores: List[Core]
    #: The sched domain hierarchy, similar to the Linux kernel's (narrowest first)
sched_domain_hierarchy = ["core pairs", "node"]
#: The clock rate of the CPU in Hertz
_clock_rate: int
    #: The clock rate of the CPU in cycles per nanosecond (clock_rate / 10**9; called "nanohertz" in this codebase)
    _clock_rate_in_nanohertz: float
#: The host that this CPU belongs to
host: Host
    #: Sorted dictionary of all threads belonging to this CPU, keyed by inverted (negated) load so that
    #: iteration yields the heaviest threads first (not reliable; only used for emergency load balancing)
threads_sorted: SortedDict[int, SortedDict[float, set[ReplicaThread]]]
    #: Sorted dictionary of all core pairs belonging to this CPU, keyed by inverted (negated) load
    #: (not reliable; only used for load balancing)
pairs_sorted: SortedDict[int, Set[int]]
    #: Stores the ids of idle cores per pair (keys are pair ids, values are the idle core ids in that pair)
idle_core_pair_ids: Dict[int, SortedSet[int]]
#: Stores the id of idle pairs (sorted by the pair id)
idle_pair_ids: SortedSet[int]
#: Stores the id of idle cores (sorted by the core id)
idle_core_ids: SortedSet[int]
    #: Stores the load of each pair, indexed by pair id
pairs_load: List[int]
def __init__(self, name: str, cores_count: int, clock_rate: int, host: Host):
self.name = name
self.clock_rate = clock_rate # in hertz
self.cores = []
self.host = host
self.max_cpu_requests = 1000
        self.events = {0: {core_id: 0 for core_id in range(cores_count)}}
        self.thread_events = {0: {core_id: 0 for core_id in range(cores_count)}}
self.threads_sorted = SortedDict()
self.pairs_sorted = SortedDict()
self.idle_core_pair_ids = {}
self.idle_pair_ids = SortedSet()
self.idle_core_ids = SortedSet()
self.pairs_load = []
        for core_id in range(cores_count):
self.cores.append(Core(cpu=self, core_id=self.name + "_core" + str(core_id), core_id_in_cpu=core_id))
self.idle_pair_ids.add(self.cores[core_id].pair_id)
self.idle_core_ids.add(core_id)
if self.cores[core_id].pair_id not in self.idle_core_pair_ids:
self.idle_core_pair_ids[self.cores[core_id].pair_id] = SortedSet([core_id])
self.pairs_load.append(0)
else:
self.idle_core_pair_ids[self.cores[core_id].pair_id].add(core_id)
self.sched_domains = {}
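        # get_sched_domains() reads and populates self.sched_domains in place, then returns it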
self.sched_domains = self.get_sched_domains()
self.sched_groups = []
super().__init__()
if host.cluster is not None and host.cluster.sim.debug:
self.attach_observer(CPULogObserver(cpu=self))
def reinit(self):
"""
Reinitialize the CPU
:return:
"""
self.__init__(name=self.name, cores_count=len(self.cores), clock_rate=self.clock_rate, host=self.host)
def register_events(self):
pass
def get_available(self) -> int:
"""
        Returns the total available CPU-request capacity of the *CPU*, summed over all cores (in millicores)

        :return: Total available CPU requests in the *CPU*
:rtype: int
"""
available = 0
for core in self.cores:
available += core.get_available()
return available
@property
def capacity(self):
"""
        Returns the total capacity of the *CPU* in terms of CPU requests (each core provides 1000 millicores).
:return:
"""
capacity = 0
for core in self.cores:
capacity += core.capacity
return capacity
def is_there_enough_resources_to_reserve(self, amount: int) -> bool:
"""
Check if there are enough resources to reserve *amount* of CPU in the *CPU*.
:param amount:
:return:
"""
        return self.get_available() >= amount
def reserve(self, amount: int) -> None:
"""
Uniformly reserve a given *amount* of CPU within all the *cores* in the
*CPU*.
:param amount:
"""
cpu_requests_per_core = round(amount / len(self.cores))
for core in self.cores:
core.reserve(amount=cpu_requests_per_core)
def release(self, amount: int) -> None:
"""
Uniformly release a given *amount* of CPU within all the *cores* in the
*CPU*.
:param amount:
:return:
"""
cpu_requests_per_core = round(amount / len(self.cores))
for core in self.cores:
core.release(amount=cpu_requests_per_core)
def _get_clock_rate_in_nanohertz(self) -> float:
"""
        Returns the clock rate of the CPU in cycles per nanosecond (clock_rate / 10**9), which this
        codebase refers to as "nanohertz"
:return:
"""
return self.clock_rate / 10 ** 9
def get_idle_core_in_sd(self, sd_name: str, sd: List, numa_node_id: int, current_core_in_sd: Core) -> int:
"""
Get the idle core in the given sched domain
:param sd_name:
:param sd:
:param numa_node_id:
:param current_core_in_sd:
:return:
"""
if len(sd) != 0 and len(sd[numa_node_id]) != 0:
if sd_name == CPU.sched_domain_hierarchy[0]:
the_other_core_id_in_pair = self.get_the_other_core_in_pair(core_id=current_core_in_sd.id_in_cpu,
return_same_if_not_exists=True)
if the_other_core_id_in_pair in self.idle_core_pair_ids[current_core_in_sd.pair_id]:
return the_other_core_id_in_pair
elif current_core_in_sd.id_in_cpu in self.idle_core_ids:
return current_core_in_sd.id_in_cpu
elif sd_name == CPU.sched_domain_hierarchy[1]:
if len(self.idle_pair_ids) != 0:
_next_idle_pair_id = next(iter(self.idle_pair_ids))
next_idle_core_id = self.idle_core_pair_ids[_next_idle_pair_id][0]
return next_idle_core_id
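            # No idle core was found in this sched domain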
return -1
else:
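            # Empty sched domain: fall back to core 0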
return 0
def get_the_other_core_in_pair(self, core_id: int, return_same_if_not_exists: bool = False) -> Union[None, int]:
"""
Get the other core in the pair
:param core_id:
:param return_same_if_not_exists:
:return:
"""
if core_id % 2 == 0:
# TODO: If in the future, we want to support more than one NUMA node, we need to change this
if len(self.sched_domains['core pairs'][0][int(core_id / 2)]) == 1:
# There is only one core in the pair
return core_id if return_same_if_not_exists else None
else:
return core_id + 1
else:
return core_id - 1
def get_core_pairs(self) -> List[List[int]]:
"""
Get the core pairs
:return:
"""
pairs = []
for core_id in range(len(self.cores)):
if core_id % 2 == 0:
pairs.append([core_id])
else:
pairs[math.ceil(core_id / 2) - 1].append(core_id)
return pairs
def get_sched_domains(self) -> Dict:
"""
Get the sched domains
:return:
"""
numa_node_id = 0
self.sched_domains[self.sched_domain_hierarchy[0]] = []
self.sched_domains[self.sched_domain_hierarchy[0]].append(numa_node_id) # assume we have 1 CPU socket per host
self.sched_domains[self.sched_domain_hierarchy[0]][numa_node_id] = self.get_core_pairs()
self.sched_domains[self.sched_domain_hierarchy[1]] = []
self.sched_domains[self.sched_domain_hierarchy[1]].append(
self.sched_domains[self.sched_domain_hierarchy[0]][numa_node_id])
return self.sched_domains
def get_busiest_core_in_pair_by_core_id(self, core_id) -> Union['Core', None]:
"""
Get the busiest core in the pair
:param core_id:
:return:
"""
the_other_core_id_in_cur_pair = self.get_the_other_core_in_pair(core_id=core_id)
if the_other_core_id_in_cur_pair is None:
return self.cores[core_id]
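        # The heavier core wins; on equal load the core with more active threads wins, with full ties
        # favoring *core_id*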
busiest_core = None
if self.cores[core_id].runqueue.load > self.cores[the_other_core_id_in_cur_pair].runqueue.load:
busiest_core = self.cores[core_id]
elif self.cores[core_id].runqueue.load < self.cores[the_other_core_id_in_cur_pair].runqueue.load:
busiest_core = self.cores[the_other_core_id_in_cur_pair]
elif len(self.cores[core_id].runqueue.active_threads) >= \
len(self.cores[the_other_core_id_in_cur_pair].runqueue.active_threads):
busiest_core = self.cores[core_id]
else:
busiest_core = self.cores[the_other_core_id_in_cur_pair]
return busiest_core
def get_busiest_core_in_pair(self, pair_id) -> Union['Core', None]:
"""
Get the busiest core in the pair
:param pair_id:
:return:
"""
return self.get_busiest_core_in_pair_by_core_id(core_id=pair_id * 2)
def get_busiest_core_in_busiest_pair(self, current_pair_id, numa_node_id: int = 0) -> Union['Core', None]:
"""
Get the busiest core in the busiest pair
:param current_pair_id:
:param numa_node_id:
:return:
"""
if len(self.pairs_sorted) > 0:
_busiest_pair_inverted_load = next(iter(self.pairs_sorted))
_busiest_pair_load = _busiest_pair_inverted_load * -1
if self.pairs_load[current_pair_id] > _busiest_pair_load:
raise Exception("How come the busiest pair (load = " + str(self.pairs_load[current_pair_id]) +
") is lighter than the current pair (load = " + str(_busiest_pair_inverted_load) + ")?")
elif self.pairs_load[current_pair_id] == _busiest_pair_load:
return None
_busiest_pairs_iter = iter(self.pairs_sorted[_busiest_pair_inverted_load])
_next_busiest_pair = next(_busiest_pairs_iter)
if _next_busiest_pair == current_pair_id and len(self.pairs_sorted[_busiest_pair_inverted_load]) > 1:
_next_busiest_pair = next(_busiest_pairs_iter)
busiest_core_in_pair = self.get_busiest_core_in_pair(_next_busiest_pair)
return busiest_core_in_pair if busiest_core_in_pair is not None else self.cores[_next_busiest_pair * 2]
return None
    def load_balance_threads_among_runqueues(self) -> None:
"""
        Load balance threads among run queues by walking the sched domain hierarchy
        (core pairs first, then the whole node)
:return:
"""
time1 = tt.time()
numa_node_id = 0
break_flag = 0
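        # For each core, walk the sched domains from the narrowest (its core pair) to the widest (the node).
        # If this core is the designated idle core of the domain, repeatedly pull the lightest thread from
        # the busiest core (or busiest pair) as long as the move does not invert the load imbalance.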
for core_id in range(len(self.cores)):
for sd_name in self.sched_domains:
idle_core = self.get_idle_core_in_sd(sd_name=sd_name,
sd=self.sched_domains[sd_name],
numa_node_id=numa_node_id,
current_core_in_sd=self.cores[core_id])
if idle_core != -1:
if core_id != idle_core:
continue
break_flag = 0
while break_flag == 0:
busiest_core = None
if sd_name == CPU.sched_domain_hierarchy[0]:
busiest_core = self.get_busiest_core_in_pair_by_core_id(core_id=core_id)
elif sd_name == CPU.sched_domain_hierarchy[1]:
busiest_core = \
self.get_busiest_core_in_busiest_pair(current_pair_id=self.cores[core_id].pair_id,
numa_node_id=numa_node_id)
if busiest_core is None:
break_flag = 1
break
if sd_name == CPU.sched_domain_hierarchy[0]:
if len(busiest_core.runqueue.active_threads) <= 1:
break_flag = 1
break
lightest_thread_load_in_busiest_core = next(iter(busiest_core.runqueue.lightest_threads_in_rq))
local_new_load = self.cores[core_id].runqueue.load + lightest_thread_load_in_busiest_core
busiest_new_load = busiest_core.runqueue.load - lightest_thread_load_in_busiest_core
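                            # Round to absorb floating-point noise before comparing the prospective loads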
if round(busiest_new_load, 5) >= round(local_new_load, 5) and \
len(busiest_core.runqueue.active_threads) > 1:
lightest_threads_set = \
busiest_core.runqueue.lightest_threads_in_rq[lightest_thread_load_in_busiest_core]
lightest_thread_vruntime_in_busiest_core = next(iter(lightest_threads_set))
lightest_thread_in_busiest_core = next(iter(
lightest_threads_set[lightest_thread_vruntime_in_busiest_core]))
busiest_core.runqueue.dequeue_task_by_thread(thread=lightest_thread_in_busiest_core)
self.cores[core_id].runqueue.enqueue_task(thread=lightest_thread_in_busiest_core)
else:
break
elif sd_name == CPU.sched_domain_hierarchy[1]:
the_other_core_id_in_busiest_pair = self.get_the_other_core_in_pair(
core_id=busiest_core.id_in_cpu)
if the_other_core_id_in_busiest_pair is not None:
number_of_cores_in_busiest_pair = 2
the_other_core_in_busiest_pair = self.cores[the_other_core_id_in_busiest_pair]
total_number_of_threads_in_busiest_pair = \
len(self.cores[busiest_core.id_in_cpu].runqueue.active_threads) + \
len(the_other_core_in_busiest_pair.runqueue.active_threads)
else:
number_of_cores_in_busiest_pair = 1
total_number_of_threads_in_busiest_pair = \
len(self.cores[busiest_core.id_in_cpu].runqueue.active_threads)
if total_number_of_threads_in_busiest_pair <= 1:
break_flag = 1
break
for counter in np.arange(number_of_cores_in_busiest_pair):
if counter != 0:
busiest_core_id = the_other_core_id_in_busiest_pair
if len(self.cores[busiest_core_id].runqueue.active_threads) == 0:
break_flag = 1
                                        break  # break: nothing can be pulled from the busiest pair
else:
busiest_core_id = busiest_core.id_in_cpu
lightest_thread_load_in_busiest_core = \
next(iter(self.cores[busiest_core_id].runqueue.lightest_threads_in_rq))
local_new_load = \
self.pairs_load[self.cores[core_id].pair_id] + lightest_thread_load_in_busiest_core
busiest_new_load = \
self.pairs_load[self.cores[busiest_core_id].pair_id] - \
lightest_thread_load_in_busiest_core
if round(busiest_new_load, 5) >= round(local_new_load, 5):
lightest_threads_set = self.cores[busiest_core_id].runqueue.lightest_threads_in_rq[
lightest_thread_load_in_busiest_core]
lightest_thread_vruntime_in_busiest_core = next(iter(lightest_threads_set))
lightest_thread_in_busiest_core = next(iter(
lightest_threads_set[lightest_thread_vruntime_in_busiest_core]))
self.cores[busiest_core_id].runqueue.dequeue_task_by_thread(
thread=lightest_thread_in_busiest_core)
if lightest_thread_in_busiest_core.instructions <= 0:
raise Exception("Are you kidding me?! How come this zombie made its way here?")
self.cores[core_id].runqueue.enqueue_task(thread=lightest_thread_in_busiest_core)
break
                                elif counter == 1:  # break: nothing could be pulled from the busiest pair either; give up
                                    break_flag = 1
                                    break
        time2 = tt.time()
        Logger.lb_timer.append(time2 - time1)
self.host.cluster.cluster_scheduler.hosts_need_load_balancing.remove(self.host)
self.host.load_balancing_needed = False
def emergency_load_balance_idle_cores(self) -> None:
"""
        Emergency load balancing: pull the heaviest eligible thread from another core onto each idle core
:return:
"""
time1 = tt.time()
for core_id in self.idle_core_ids:
core = self.cores[core_id]
heaviest_thread = None
core_id_of_the_rq_containing_heaviest_load = -1
break_flag = 0
to_discard = set()
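            # threads_sorted is keyed by inverted (negated) load, so iteration starts at the heaviest thread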
            for inverted_thread_load, thread_sorted_set in self.threads_sorted.items():
                for vruntime, thread_set in thread_sorted_set.items():
                    for thread in thread_set:
                        if not thread.on_rq or thread.load <= 0 or thread.instructions <= 0:
                            to_discard.add((inverted_thread_load, thread))
                            continue
                        if thread.core is None or thread.core.id_in_cpu == core_id or \
                                len(thread.core.runqueue.rq) <= 1:
                            continue
heaviest_thread = thread
core_id_of_the_rq_containing_heaviest_load = thread.core.id_in_cpu
break_flag = 1
break
if break_flag == 1:
break
            for inverted_thread_load, thread in to_discard:
                self.remove_from_threads_sorted(thread=thread, inverted_thread_load=inverted_thread_load)
if core_id_of_the_rq_containing_heaviest_load != -1:
self.cores[core_id_of_the_rq_containing_heaviest_load].runqueue.dequeue_task_by_thread(
thread=heaviest_thread)
self.cores[core_id].runqueue.enqueue_task(thread=heaviest_thread)
# self.cores[core_id].runqueue.recalculate_cpu_requests_shares()
time2 = tt.time()
Logger.lb_timer.append(time2 - time1)
def recalculate_cpu_requests_shares(self) -> None:
"""
Recalculate CPU requests shares
:return:
"""
        for core in self.cores:
core.runqueue.recalculate_cpu_requests_shares()
def kill_zombie_threads(self) -> None:
"""
        Kill all zombie threads (threads with no remaining instructions) queued by the cluster scheduler,
        and start transmissions for subchains whose last active thread has finished
:return:
"""
transmission_time_recalculation_list = {}
for thread in self.host.cluster.cluster_scheduler.zombie_threads:
if thread.instructions > 0:
raise Exception("How is that even possible!?")
thread.kill()
if thread.parent_request is None:
continue
thread.parent_request.current_active_threads[thread.subchain_id] -= 1
if thread.parent_request.current_active_threads[thread.subchain_id] == 0:
calc_todo = thread.parent_request.init_transmission(node_in_alt_graph=thread.node_in_alt_graph)
transmission_time_recalculation_list[thread.parent_request] = calc_todo
elif thread.parent_request.current_active_threads[thread.subchain_id] < 0:
raise Exception("What the hell is going on here? Number of active threads in subchain id #" +
str(thread.subchain_id) + " belonging to request " +
str(thread.parent_request.id) + " is less than zero !!!!")
self.host.cluster.cluster_scheduler.zombie_threads = set()
if len(transmission_time_recalculation_list) != 0:
# TODO: do we really need to recalculate transmission bw on ALL LINKS? Isn't it enough to recalculate for a
# subset of links only?
self.host.cluster.topology.recalculate_transmissions_bw_on_all_links()
def plot(self, save_dir: str = None, show: bool = True):
"""
Plot the CPU requests share and threads count on run queues
:param save_dir:
:param show:
:return:
"""
share_events = {}
thread_events = {}
        step = 1000000000  #: 1 s (in ns)
_share_events = SortedDict(deepcopy(self.events))
_threads_events = SortedDict(deepcopy(self.thread_events))
timerange = np.arange(start=0, stop=self.host.cluster.sim.time + step, step=step, dtype=int)
core_range = range(len(self.cores))
df_share = pd.DataFrame.from_dict(_share_events).transpose()
df_threads = pd.DataFrame.from_dict(_threads_events).transpose()
        for t in timerange:
            if t not in _share_events:
                last_share = df_share[df_share.index < t].iloc[-1]
                last_threads_count = df_threads[df_threads.index < t].iloc[-1]
                _share_events[t] = {}
                _threads_events[t] = {}
                for core in core_range:
                    _share_events[t][core] = last_share[core]
                    _threads_events[t][core] = last_threads_count[core]
share_events_list = list(_share_events)
share_events_list_items = list(_share_events.items())
thread_events_list = list(_threads_events)
thread_events_list_items = list(_threads_events.items())
event_number = 0
# step = 1000000 #: 1ms
current_event_time = 0
xaxis_nticks = 0
yaxis_nticks = len(self.events[current_event_time].keys()) if current_event_time in self.events else 0
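        # Aggregate events into 1-second buckets: each event's value is weighted by the fraction of the
        # bucket it was active for, yielding a time-weighted average per core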
        for t in timerange:
            xaxis_nticks += 1
            current_range_end = t + step
            while event_number + 1 < len(share_events_list) and share_events_list[event_number] < current_range_end:
                if t not in share_events:
                    share_events[t] = {core_id: 0 for core_id in core_range}
                    thread_events[t] = {core_id: 0 for core_id in core_range}
                time_weight = (share_events_list[event_number + 1] - share_events_list[event_number]) / step
                for core in core_range:
                    share_events[t][core] += share_events_list_items[event_number][1][core] * time_weight
                    thread_events[t][core] += thread_events_list_items[event_number][1][core] * time_weight
                event_number += 1
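        # Build a cores x time-buckets matrix for each heatmap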
        share_matrix = [[] for _ in range(len(self.cores))]
        threads_matrix = [[] for _ in range(len(self.cores))]
        step_id = 0
        for timestamp, core_values in share_events.items():
            for core_id, value in core_values.items():
                if len(share_matrix[core_id]) < step_id + 1:
                    share_matrix[core_id].append(int(value))
                else:
                    share_matrix[core_id][step_id] = int(value)
            step_id += 1
        step_id = 0
        for timestamp, core_values in thread_events.items():
            for core_id, value in core_values.items():
                if len(threads_matrix[core_id]) < step_id + 1:
                    threads_matrix[core_id].append(value)
                else:
                    threads_matrix[core_id][step_id] = value
            step_id += 1
        fig_share = px.imshow(share_matrix,
                              labels=dict(x="Time (in secs)", y="Core ID", color="Average CPU Requests Share"),
                              aspect="auto")
        fig_threads = px.imshow(threads_matrix, labels=dict(x="Time (in secs)", y="Core ID", color="Threads count"),
                                aspect="auto")
fig_share.update_layout(title='Average cpu_requests_share load on run queues for host ' + self.host.name,
xaxis_nticks=xaxis_nticks,
yaxis_nticks=yaxis_nticks,
xaxis=dict(tickmode='array', tickvals=list(range(xaxis_nticks))),
yaxis=dict(tickmode='array', tickvals=list(range(yaxis_nticks))))
fig_threads.update_layout(title='Average threads count on run queues for host ' + self.host.name,
xaxis_nticks=xaxis_nticks,
yaxis_nticks=yaxis_nticks,
xaxis=dict(tickmode='array', tickvals=list(range(xaxis_nticks))),
yaxis=dict(tickmode='array', tickvals=list(range(yaxis_nticks))))
if show:
fig_share.show()
fig_threads.show()
if save_dir is not None:
try:
Utils.mkdir_p(save_dir)
file_name1 = f"{str(self.host.name)}-threads-lb-cpu_requests_share.html"
file_name2 = f"{str(self.host.name)}-threads-lb-threads.html"
file_path1 = os.path.join(save_dir, file_name1)
file_path2 = os.path.join(save_dir, file_name2)
file_path3 = os.path.join(save_dir, f"{str(self.host.name)}-dashboard.html")
fig_share.write_html(file_path1)
fig_threads.write_html(file_path2)
Plotter.figures_to_html(figs=[(file_name1, fig_share), (file_name2, fig_threads)], filename=file_path3)
except FileExistsError:
pass
return [fig_share, fig_threads]
def __update_events(self):
_time = self.host.cluster.sim.time
self.events[_time] = {}
self.thread_events[_time] = {}
        for core_id in range(len(self.cores)):
self.events[_time][core_id] = self.cores[core_id].runqueue.load
self.thread_events[_time][core_id] = len(self.cores[core_id].runqueue.active_threads)
def load_balance(self) -> None:
"""
        Load balance the CPU: kill zombie threads, balance threads among the run queues, fill idle cores
        via emergency balancing, and recalculate CPU request shares
:return:
"""
self.kill_zombie_threads()
if self.host.load_balancing_needed and self.host.is_active():
self.load_balance_threads_among_runqueues()
self.emergency_load_balance_idle_cores()
self.recalculate_cpu_requests_shares()
if self.host.cluster.sim.log_cpu_events:
self.__update_events()
def add_to_pairs_sorted(self, pair_id, inverted_pair_load):
"""
Add to pairs sorted dictionary
:param pair_id:
:param inverted_pair_load:
:return:
"""
if inverted_pair_load in self.pairs_sorted:
self.pairs_sorted[inverted_pair_load].add(pair_id)
else:
self.pairs_sorted[inverted_pair_load] = {pair_id}
def add_to_threads_sorted(self, thread: ReplicaThread, inverted_thread_load: int = None):
"""
Add to threads sorted dictionary
:param thread:
:param inverted_thread_load:
:return:
"""
if inverted_thread_load is None:
inverted_thread_load = thread.load * -1
if inverted_thread_load in self.threads_sorted:
if thread.vruntime in self.threads_sorted[inverted_thread_load]:
self.threads_sorted[inverted_thread_load][thread.vruntime].add(thread)
else:
self.threads_sorted[inverted_thread_load][thread.vruntime] = {thread}
else:
self.threads_sorted[inverted_thread_load] = SortedDict({thread.vruntime: {thread}})
thread.core.runqueue.add_to_lightest_threads_in_rq(thread=thread)
def remove_from_pairs_sorted(self, pair_id, inverted_pair_load):
"""
Remove from pairs sorted dictionary
:param pair_id:
:param inverted_pair_load:
:return:
"""
if inverted_pair_load in self.pairs_sorted:
self.pairs_sorted[inverted_pair_load].discard(pair_id)
if len(self.pairs_sorted[inverted_pair_load]) == 0:
del self.pairs_sorted[inverted_pair_load]
def remove_from_threads_sorted(self, thread: ReplicaThread, inverted_thread_load: int = None):
"""
Remove from threads sorted dictionary
:param thread:
:param inverted_thread_load:
:return:
"""
if inverted_thread_load is None:
inverted_thread_load = thread.load * -1
if inverted_thread_load in self.threads_sorted:
if thread.vruntime in self.threads_sorted[inverted_thread_load]:
self.threads_sorted[inverted_thread_load][thread.vruntime].discard(thread)
if len(self.threads_sorted[inverted_thread_load][thread.vruntime]) == 0:
del self.threads_sorted[inverted_thread_load][thread.vruntime]
if len(self.threads_sorted[inverted_thread_load]) == 0:
del self.threads_sorted[inverted_thread_load]
if thread.core is not None:
thread.core.runqueue.remove_from_lightest_threads_in_rq(thread=thread)
else:
raise ValueError(f"Thread {thread} is not in threads_sorted[" + str(thread.vruntime) + "] " +
"(perhaps vruntime was changed without updating threads_sorted?)")
else:
raise ValueError(f"Thread {thread} is not in threads_sorted")
def update_idle_pairs(self, core):
"""
Update idle pairs in the CPU by checking the load of the cores in the pair
:param core:
:return:
"""
other_core_id_in_pair = self.get_the_other_core_in_pair(core_id=core.id_in_cpu)
        if other_core_id_in_pair is not None:
            other_core_in_pair = self.cores[other_core_id_in_pair]
            total_active_threads_in_pair = \
                len(core.runqueue.active_threads) + len(other_core_in_pair.runqueue.active_threads)
else:
total_active_threads_in_pair = len(core.runqueue.active_threads)
if total_active_threads_in_pair > 0:
self.idle_core_pair_ids[core.pair_id].discard(core.id_in_cpu)
if len(self.idle_core_pair_ids[core.pair_id]) != 2:
self.idle_pair_ids.discard(core.pair_id)
else:
self.idle_core_pair_ids[core.pair_id].add(core.id_in_cpu)
self.idle_pair_ids.add(core.pair_id)
if len(core.runqueue.active_threads) == 0:
self.idle_core_ids.add(core.id_in_cpu)
else:
self.idle_core_ids.discard(core.id_in_cpu)
@property
def clock_rate(self) -> int:
"""
Get the clock rate of the CPU in Hertz
:return:
"""
return self._clock_rate
@clock_rate.setter
def clock_rate(self, value):
"""
        Set the clock rate of the CPU in hertz and update the derived cycles-per-nanosecond value
:param value:
:return:
"""
self._clock_rate = value
self._clock_rate_in_nanohertz = self._get_clock_rate_in_nanohertz()
@property
def clock_rate_in_nanohertz(self) -> float:
"""
        Get the clock rate of the CPU in cycles per nanosecond (clock_rate / 10**9)
:return:
"""
return self._clock_rate_in_nanohertz
@clock_rate_in_nanohertz.setter
def clock_rate_in_nanohertz(self, value):
"""
        Read-only: raises ValueError because this value is derived from clock_rate
:param value:
:return:
"""
raise ValueError("clock_rate_in_nanohertz is read-only! Set clock_rate instead.")
def __str__(self):
"""
String representation of the CPU
:return:
"""
return self.name