# Copyright (C) 2020 Michel Gokan Khan
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# This file is a part of the PerfSim project, which is now open source and available under the GPLv2.
# Written by Michel Gokan Khan, February 2020
from __future__ import annotations
import math
from typing import TYPE_CHECKING, Tuple, Union
from perfsim import Observable, ReplicaThreadLogObserver, ReplicaThreadTimelineObserver
if TYPE_CHECKING:
from perfsim import MicroserviceReplica, Process, Core, Request, MicroserviceEndpointFunction
[docs]
class ReplicaThread(Observable):
"""
This class represents a thread of execution of a microservice replica.

A thread belongs to a process, may be placed on a core, and carries the
instruction count, CPI and cache statistics of one thread of a node in the
alternative graph. It notifies its observers before it is killed and
before/after every execution slice.
"""
#: Name of the event that is notified before killing a thread.
before_killing_thread: str
#: Name of the event that is notified before executing a thread.
before_executing_thread: str
#: Name of the event that is notified after executing a thread.
after_executing_thread: str
#: Indicates whether the thread is on a core's runqueue or not.
_on_rq: bool
#: The process that the thread belongs to (set to ``None`` by ``kill()``).
_process: Process
#: The vruntime represents the virtual runtime of the thread (as in the Linux scheduler).
_vruntime: float
#: Thread's load. NOTE(review): assigned ``average_load * share`` by the
#: ``cpu_requests_share`` setter, so it may hold a float despite the ``int`` annotation.
_load: int
#: Replica that the thread belongs to.
replica: MicroserviceReplica
#: Replica's identifier in the subchain.
replica_identifier_in_subchain: int
#: Thread's cpu_requests_share (values above 1000 are rejected by the setter).
_cpu_requests_share: int
#: Thread's cpu_limits (``-1`` is used as the "no limit" marker).
_cpu_limits: int
#: The current core that the thread is running on (may be ``None``).
_core: Core
#: The index id of the thread in the node of the alternative graph.
_thread_id_in_node: int
#: The node in the alternative graph that this thread belongs to.
_node_in_alt_graph: Tuple[int, MicroserviceEndpointFunction]
#: Initialised to ``False`` in ``__init__``; presumably tracks membership in a
#: runqueue's best-effort active-thread set -- TODO confirm against the runqueue code.
__in_best_effort_active_threads: bool
#: Initialised to ``False`` in ``__init__``; presumably tracks membership in a
#: runqueue's burstable active-thread set -- TODO confirm against the runqueue code.
__in_burstable_active_threads: bool
#: Initialised to ``False`` in ``__init__``; presumably tracks membership in a
#: runqueue's guaranteed active-thread set -- TODO confirm against the runqueue code.
__in_guaranteed_active_threads: bool
# Create a thread, register it with its process and parent request, and load
# its execution parameters from the given alternative-graph node.
#
# :param process: Process that owns the thread; the thread adds itself to ``process.threads``.
# :param replica: Microservice replica the thread belongs to.
# :param replica_identifier_in_subchain: Identifier of the replica inside the subchain.
# :param node_in_alt_graph: ``(index, endpoint function)`` node the thread executes.
# :param thread_id_in_node: Index of this thread among the node's threads.
# :param subchain_id: Identifier of the subchain; also indexes ``parent_request.current_active_threads``.
# :param average_load: Multiplier applied to the thread's share when its load is computed.
# :param core: Core the thread starts on, or ``None`` if it is not placed yet.
# :param parent_request: Request that spawned the thread. NOTE(review): defaults to ``None``
#     but is dereferenced unconditionally below, so callers must always pass it.
def __init__(self,
process: Process,
replica: MicroserviceReplica,
replica_identifier_in_subchain: int,
node_in_alt_graph: Tuple[int, MicroserviceEndpointFunction],
thread_id_in_node: int,
subchain_id: int,
average_load: float = 1,
core: Core = None,
parent_request: Request = None):
self._process = process
# self._process_backup = process
# The id concatenates: simulation time, request id, request iteration id, subchain id,
# process name and the number of threads the process currently holds.
self.id = str(replica.host.cluster.sim.time) + "_" + parent_request.id + "_" + \
str(parent_request.iteration_id) + "_" + str(subchain_id) + "_" + process.pname + "_" + \
str(len(self.process.threads))
# self.__hash = int.from_bytes(self.id.encode(), byteorder='big')
# TODO: Is this really needed?!
self.process.threads.add(self)
# Backing fields are written directly here, so the property setters (and their
# side effects on the core / runqueue) do not run for these initial values.
self._load = 0
self.average_load = average_load
self._cpu_requests_share = 0
self._cpu_limits = 0
self.period = 0
self.request = 0
self._core = core
# Goes through the ``vruntime`` setter; with v == 0 nothing is removed, and the thread is
# added to the CPU's sorted threads only when a core was given.
self.vruntime = 0
self._on_rq = True
self.executed_instructions = 0
# self.cache_penalty = 0
self.__in_best_effort_active_threads = False
self.__in_burstable_active_threads = False
self.__in_guaranteed_active_threads = False
self.__in_burstable_unlimited_active_threads = False
self.__in_burstable_limited_active_threads = False
# self.total_runtime = 0
self.is_idle = 0
self.replica = replica
self.replica_identifier_in_subchain = replica_identifier_in_subchain
# Loads shares, limits, instructions, CPI and cache statistics from the endpoint function.
# Must run after ``replica``, ``average_load`` and ``_core`` are set because the setters read them.
self.set_node_in_alt_graph(node=node_in_alt_graph, thread_id_in_node=thread_id_in_node)
# -1 until ``get_exec_time_on_rq()`` computes a real value.
self.duration_to_finish = -1
self.parent_request = parent_request
self.subchain_id = subchain_id
# Book-keeping of active threads on both the process and the parent request.
self.process.active_threads_count += 1
self.parent_request.current_active_threads[self.subchain_id] += 1
# NOTE(review): the base initialiser is called only after the thread state is populated;
# presumably Observable.__init__ sets up the event registry -- confirm before reordering.
super().__init__()
# Observers are attached only when the simulation asks for debug logging / timeline logging.
if self.replica.host.cluster.sim.debug_level > 0:
self.attach_observer(ReplicaThreadLogObserver(replica_thread=self))
if self.replica.host.cluster.sim.log_timeline:
self.attach_observer(ReplicaThreadTimelineObserver(replica_thread=self))
@property
def node_in_alt_graph(self) -> Tuple[int, MicroserviceEndpointFunction]:
    """
    The ``(index, endpoint function)`` node of the alternative graph this thread executes.

    Read-only: use :meth:`set_node_in_alt_graph` to change it.
    """
    return self._node_in_alt_graph

@node_in_alt_graph.setter
def node_in_alt_graph(self, node: Tuple[int, MicroserviceEndpointFunction]):
    """
    Reject direct assignment.

    :raises AttributeError: always. ``AttributeError`` is the conventional error for a
        read-only attribute and is still caught by callers using ``except Exception``.
    """
    raise AttributeError("Cannot set node_in_alt_graph directly. Use set_node_in_alt_graph() instead.")
@property
def thread_id_in_node(self) -> int:
    """
    Index of this thread among the threads of its alternative-graph node.

    Read-only: use :meth:`set_node_in_alt_graph` to change it.
    """
    return self._thread_id_in_node

@thread_id_in_node.setter
def thread_id_in_node(self, thread_id_in_node: int):
    """
    Reject direct assignment.

    :raises AttributeError: always. ``AttributeError`` is the conventional error for a
        read-only attribute and is still caught by callers using ``except Exception``.
    """
    raise AttributeError("Cannot set thread_id_in_node directly. Use set_node_in_alt_graph() instead.")
[docs]
def set_node_in_alt_graph(self, node: Tuple[int, MicroserviceEndpointFunction], thread_id_in_node: int):
    """
    Bind the thread to a node of the alternative graph and reload its parameters.

    :param node: ``(index, endpoint function)`` node the thread should execute.
    :param thread_id_in_node: Index of this thread among the node's threads.
    """
    # Both backing fields must be in place before the values are refreshed,
    # because the refresh reads them through the read-only properties.
    self._node_in_alt_graph, self._thread_id_in_node = node, thread_id_in_node
    self._set_values_from_endpoint_function()
[docs]
def register_events(self):
    """Register the three lifecycle events this thread notifies its observers about."""
    lifecycle_events = (
        "before_killing_thread",
        "before_executing_thread",
        "after_executing_thread",
    )
    for event_name in lifecycle_events:
        self.register_event(event_name)
[docs]
def kill(self) -> None:
    """
    Terminate the thread and detach it from its host, scheduler, process and runqueue.

    Observers are notified first. The thread is then removed from the host's and the
    cluster scheduler's thread collections, the host's load-balancing state is updated,
    and finally the thread is dequeued from its core's runqueue and loses its process.
    """
    self.notify_observers(self.before_killing_thread)
    self.process.active_threads_count -= 1

    host = self.core.cpu.host
    scheduler = host.cluster.cluster_scheduler
    host.threads.remove(self)
    scheduler.active_threads.remove(self)

    if host.is_active():
        # The host still has work: flag it so the scheduler rebalances it.
        host.load_balancing_needed = True
        scheduler.hosts_need_load_balancing.add(host)
    else:
        # That was the host's last active thread: retire the host from the scheduler.
        scheduler.active_hosts.remove(host)
        host.load_balancing_needed = False
        try:
            scheduler.hosts_need_load_balancing.remove(host)
        except KeyError:
            # The host was not waiting for load balancing; nothing to undo.
            pass

    self.process.threads.remove(self)
    self.core.runqueue.dequeue_task_by_thread(thread=self)
    self.on_rq = False
    # The shares of the remaining threads are not touched here: the original code notes
    # that cpu_requests_share is already recalculated in cpu.recalculate_share.
    self.process = None
def __recalculate_cache_penalty(self, millicores: Union[float, int]):
    """
    Compute the per-instruction cache-miss penalty for the given CPU allocation.

    The isolated single-core miss rate is inflated first by a term that depends on the
    logarithm of ``millicores`` and then by a term that depends on the logarithm of the
    number of active threads on the core's runqueue. The result is scaled by memory
    accesses per instruction and by the average cache-miss penalty.

    :param millicores: CPU allocation used for the CPU-size term; must be positive for ``math.log``.
    :return: The penalty, in the same unit as ``replica_avg_cache_miss_penalty``.
    """
    adjusted_miss_rate = (self.replica_single_core_isolated_cache_misses /
                          self.replica_single_core_isolated_cache_refs)

    active_thread_count = len(self.core.runqueue.active_threads)
    contention_term = 0.033420389 * math.log(active_thread_count) + 0.003341528
    cpu_size_term = -0.02509033 * math.log(millicores) + 0.17859156

    # Order matters: the CPU-size inflation is applied before the contention inflation.
    adjusted_miss_rate += adjusted_miss_rate * cpu_size_term
    adjusted_miss_rate += adjusted_miss_rate * contention_term

    accesses_per_instruction = self.replica_memory_accesses / self.original_instructions
    return accesses_per_instruction * adjusted_miss_rate * self.replica_avg_cache_miss_penalty
def __get_share_proportion(self) -> float:
    """
    Return the fraction of the core this thread effectively gets, cache misses included.

    The guaranteed millicores are converted to a 1024-based share, reduced by the ratio
    ``cpi / (cpi + cache_penalty)`` and normalised by the CPU's ``max_cpu_requests``.
    """
    guaranteed_millicores = self.get_relative_guaranteed_cpu_requests_share()
    penalty = self.__recalculate_cache_penalty(millicores=guaranteed_millicores)
    share_units = (1024 * guaranteed_millicores) / 1000
    effective_share = (self.cpi * share_units) / (self.cpi + penalty)
    return effective_share / self.core.cpu.max_cpu_requests
[docs]
def is_runnable(self):
    """
    Tell whether the thread can be executed right now.

    :return: A truthy value only when the thread is on a runqueue, still has
        instructions left and is placed on a core; otherwise the first falsy operand.
    """
    queued = self.on_rq
    if not queued:
        return queued
    has_instructions_left = self.instructions > 0
    if not has_instructions_left:
        return has_instructions_left
    return self.core is not None
[docs]
def exec(self, duration: int, simultaneous_flag: bool = False) -> int:
    """
    Run the thread for ``duration`` time units and consume the matching instructions.

    Observers are notified before and after the instruction counters are updated, and
    ``vruntime`` advances by ``duration`` scaled with the thread's share proportion.

    :param duration: Length of the execution slice.
    :param simultaneous_flag: Forwarded unchanged to the observers.
    :return: ``1`` if the thread has exactly zero instructions left afterwards, else ``0``.
    :raises Exception: if the thread is not runnable (see :meth:`is_runnable`).
    """
    if not self.is_runnable():
        raise Exception("You can't execute a zombie thread and/or a thread without any instructions left!")

    relative_share_proportion = self.__get_share_proportion()
    instructions_to_consume = (duration * relative_share_proportion /
                               (self.cpi * (1 / self.replica.host.cpu.clock_rate_in_nanohertz)))
    remaining_instructions = self.instructions - instructions_to_consume
    if -0.001 < remaining_instructions < 0.001:
        # Within tolerance of finishing: consume exactly what is left. Assigning the
        # remaining count directly (rather than adding the residual back) guarantees the
        # subtraction below yields exactly 0, so the ``== 0`` completion check cannot be
        # defeated by floating-point rounding.
        instructions_to_consume = self.instructions

    self.notify_observers(event_name=self.before_executing_thread,
                          simultaneous_flag=simultaneous_flag,
                          duration=duration,
                          instructions_to_consume=instructions_to_consume)
    self.instructions -= instructions_to_consume
    self.executed_instructions += instructions_to_consume
    self.vruntime += duration * relative_share_proportion
    self.notify_observers(event_name=self.after_executing_thread,
                          simultaneous_flag=simultaneous_flag,
                          duration=duration,
                          instructions_to_consume=instructions_to_consume,
                          relative_share_proportion=relative_share_proportion)
    # NOTE(review): an overshoot beyond the tolerance leaves ``instructions`` negative and
    # still returns 0 here -- confirm that callers never pass a slice longer than needed.
    return 1 if self.instructions == 0 else 0
[docs]
def get_best_effort_cpu_requests_share(self) -> int:
    """
    Return the share the thread may use on top of its guaranteed request.

    * best-effort microservice: the CPU's whole ``max_cpu_requests``;
    * burstable microservice: the gap between its ceiling (``cpu_limits``, or
      ``max_cpu_requests`` when the limit is ``-1``) and ``cpu_requests_share``;
    * guaranteed microservice: ``0``.

    :raises Exception: if the microservice reports none of the three classes.
    """
    microservice = self.process.ms_replica.microservice
    if microservice.is_best_effort():
        return self.core.cpu.max_cpu_requests
    if microservice.is_burstable():
        ceiling = self.core.cpu.max_cpu_requests if self.cpu_limits == -1 else self.cpu_limits
        return ceiling - self.cpu_requests_share
    if microservice.is_guaranteed():
        return 0
    raise Exception("Unknown microservice type")
[docs]
def get_relative_guaranteed_cpu_requests_share(self) -> int:
    """
    Return the guaranteed CPU request share of the thread.

    With a CPU limit set (``cpu_limits != -1``) the share is capped at the CPU's
    ``max_cpu_requests``. Without a limit the share is returned unchanged.

    :raises Exception: if no limit is set and the share still holds ``-1``.
    """
    if self.cpu_limits == -1:
        # No limit configured: the request must already have been assigned.
        if self.cpu_requests_share == -1:
            raise Exception("I'm not sure this is an error, but thread supposed to get a cpu request before")
        return self.cpu_requests_share

    requested_share = self.cpu_requests_share
    cpu_capacity = self.core.cpu.max_cpu_requests
    return cpu_capacity if requested_share > cpu_capacity else requested_share
[docs]
def get_exec_time_on_rq(self) -> float:
    """
    Estimate how long the thread needs to finish its remaining instructions.

    The estimate is ``(instructions * cpi) / (clock rate * share proportion)``. It is
    also cached on ``self.duration_to_finish``.

    :return: The estimated duration.
    """
    share_proportion = self.__get_share_proportion()
    cycles_left = self.instructions * self.cpi
    effective_clock_rate = self.replica.host.cpu.clock_rate_in_nanohertz * share_proportion
    self.duration_to_finish = cycles_left / effective_clock_rate
    return self.duration_to_finish
@property
def instructions(self):
    """Number of instructions the thread still has to execute."""
    return self.__instructions

@instructions.setter
def instructions(self, v):
    """
    Store the remaining instruction count.

    A count of zero or below registers the thread in the cluster scheduler's
    ``zombie_threads`` collection (this path requires the thread to be placed on a core).
    """
    self.__instructions = v
    if not (v <= 0):
        return
    self.core.cpu.host.cluster.cluster_scheduler.zombie_threads.add(self)
@property
def process(self):
    """The process that owns this thread (``kill()`` resets it to ``None``)."""
    owner = self._process
    return owner

@process.setter
def process(self, v: Process):
    """Replace the owning process; no other book-keeping is performed here."""
    self._process = v
# if self._process is not None:
# self._process_backup = v
# @property
# def process_backup(self):
# return self._process_backup
def __lt__(self, other):
    """
    Compare two threads: a strictly smaller load wins, otherwise ``vruntime`` decides.

    :return: ``NotImplemented`` for operands that are not ``ReplicaThread`` instances, so
        Python can try the reflected operation or raise ``TypeError`` instead of silently
        treating an implicit ``None`` as ``False``.
    """
    if not isinstance(other, ReplicaThread):
        return NotImplemented
    if self.load < other.load:
        return True
    # NOTE(review): this branch is also reached when self.load > other.load, so the result
    # is not a strict lexicographic (load, vruntime) order -- confirm this is intended.
    return self.vruntime < other.vruntime
def __gt__(self, other):
    """
    Compare two threads: a strictly larger load wins, otherwise ``vruntime`` decides.

    :return: ``NotImplemented`` for operands that are not ``ReplicaThread`` instances, so
        Python can try the reflected operation or raise ``TypeError`` instead of silently
        treating an implicit ``None`` as ``False``.
    """
    if not isinstance(other, ReplicaThread):
        return NotImplemented
    if self.load > other.load:
        return True
    # NOTE(review): this branch is also reached when self.load < other.load, so the result
    # is not a strict lexicographic (load, vruntime) order -- confirm this is intended.
    return self.vruntime > other.vruntime
def __le__(self, other):
    """
    Compare two threads: a smaller-or-equal load wins, otherwise ``vruntime`` decides.

    :return: ``NotImplemented`` for operands that are not ``ReplicaThread`` instances, so
        Python can try the reflected operation or raise ``TypeError`` instead of silently
        treating an implicit ``None`` as ``False``.
    """
    if not isinstance(other, ReplicaThread):
        return NotImplemented
    if self.load <= other.load:
        return True
    # Reached only when self.load > other.load.
    # NOTE(review): vruntime can still make this True -- confirm the ordering is intended.
    return self.vruntime <= other.vruntime
def __ge__(self, other):
    """
    Compare two threads: a larger-or-equal load wins, otherwise ``vruntime`` decides.

    :return: ``NotImplemented`` for operands that are not ``ReplicaThread`` instances, so
        Python can try the reflected operation or raise ``TypeError`` instead of silently
        treating an implicit ``None`` as ``False``.
    """
    if not isinstance(other, ReplicaThread):
        return NotImplemented
    if self.load >= other.load:
        return True
    # Reached only when self.load < other.load.
    # NOTE(review): vruntime can still make this True -- confirm the ordering is intended.
    return self.vruntime >= other.vruntime
# def __hash__(self):
# return self.__hash
#
# def __eq__(self, other):
# return self.__hash__() == other.__hash__()
def __str__(self):
    """Return the thread's identifier as its string representation."""
    identifier = self.id
    return str(identifier)
def _set_values_from_endpoint_function(self) -> ReplicaThread:
    """
    Load shares, limits and per-thread statistics from the bound endpoint function.

    The microservice's CPU request and limit are split evenly across the function's
    threads (``-1`` means "not set"). The request is capped at the host CPU's
    ``max_cpu_requests``. Instruction count, CPI, memory accesses and cache statistics
    are read at this thread's index.

    :return: ``self``.
    """
    func = self.node_in_alt_graph[1]
    thread_id = self.thread_id_in_node

    # The order of the property assignments is kept: each setter has its own side effects.
    if func.microservice.cpu_requests == -1:
        self.cpu_requests_share = self.replica.host.cpu.max_cpu_requests / func.threads_count
    else:
        self.cpu_requests_share = min(self.replica.host.cpu.max_cpu_requests,
                                      func.microservice.cpu_requests / func.threads_count)

    if func.microservice.cpu_limits == -1:
        self.cpu_limits = -1
    else:
        self.cpu_limits = func.microservice.cpu_limits / func.threads_count

    self.instructions = func.threads_instructions[thread_id]

    per_thread_sources = (
        ("cpi", func.threads_avg_cpi),
        ("replica_memory_accesses", func.threads_avg_mem_accesses),
        ("original_instructions", func.threads_instructions),
        ("replica_single_core_isolated_cache_misses", func.threads_single_core_isolated_cache_misses),
        ("replica_single_core_isolated_cache_refs", func.threads_single_core_isolated_cache_refs),
        ("replica_avg_cache_miss_penalty", func.threads_avg_cache_miss_penalty),
    )
    for attribute_name, values_by_thread in per_thread_sources:
        setattr(self, attribute_name, values_by_thread[thread_id])
    return self
@property
def on_rq(self):
    """Whether the thread is currently on a core's runqueue."""
    queued = self._on_rq
    return queued

@on_rq.setter
def on_rq(self, v):
    """Record whether the thread is on a runqueue; the runqueue itself is not modified."""
    self._on_rq = v
@property
def load(self):
    """Current load of the thread."""
    return self._load

@load.setter
def load(self, v):
    """
    Update the load and keep the core's sorted threads and runqueue load in sync.

    Without a core the value is simply stored. With a core and a changed value, the
    thread is re-inserted into the CPU's sorted threads under its negated load and the
    runqueue's aggregate load is adjusted by the difference.
    """
    if self.core is None:
        self._load = v
        return
    if not (self._load != v):
        # Unchanged load: nothing to re-sort.
        return
    cpu, runqueue = self.core.cpu, self.core.runqueue
    cpu.remove_from_threads_sorted(thread=self, inverted_thread_load=self._load * -1)
    runqueue.load -= self._load
    self._load = v
    cpu.add_to_threads_sorted(thread=self, inverted_thread_load=self._load * -1)
    runqueue.load += self._load

@property
def vruntime(self):
    """Virtual runtime of the thread."""
    return self._vruntime

@vruntime.setter
def vruntime(self, v: float):
    """
    Update the virtual runtime and re-insert the thread into the CPU's sorted threads.

    The thread is removed first only when the new value is positive; it is (re-)added
    whenever it is placed on a core.
    """
    if v > 0 and self.core is not None:
        self.core.cpu.remove_from_threads_sorted(self)
    self._vruntime = v
    if self.core is not None:
        self.core.cpu.add_to_threads_sorted(self)

@property
def core(self):
    """Core the thread is placed on, or ``None``."""
    return self._core

@core.setter
def core(self, v: Core):
    """Move the thread to another core, updating the sorted threads of both CPUs."""
    # ``hasattr`` guards against the backing field not existing yet.
    previous_core = self._core if hasattr(self, 'core') else None
    if previous_core is not None:
        previous_core.cpu.remove_from_threads_sorted(self)
    self._core = v
    if v is not None:
        v.cpu.add_to_threads_sorted(self)
@property
def cpu_requests_share(self):
    """CPU request share currently assigned to the thread."""
    return self._cpu_requests_share

@cpu_requests_share.setter
def cpu_requests_share(self, v: int):
    """
    Assign a new CPU request share and propagate it to the load and the thread sets.

    The load is recomputed as ``average_load`` times the share converted to a 1024-based
    scale. When the thread is placed on a core and the share changed, every thread set
    registered for this thread in the runqueue has ``sum_cpu_requests`` adjusted.

    :param v: New share; values above 1000 are rejected.
    :raises Exception: if ``v`` is greater than 1000.
    """
    if v > 1000:
        # The thread may not be placed on a core yet (e.g. during construction), so fall
        # back to the replica host's CPU rather than failing with an AttributeError that
        # would hide the real error message.
        cpu = self.core.cpu if self.core is not None else self.replica.host.cpu
        error_message = "CPU requests share cannot be greater than {} in a single " \
                        "thread placed in a core. {} Given!".format(cpu.max_cpu_requests, v)
        raise Exception(error_message)

    difference = self._cpu_requests_share - v
    self._cpu_requests_share = v
    # TODO: I originally started with millicores = 1024, but I think it should be 1000. So, I converted
    #  all the millicores to 1000.
    millicores_to_share = (self._cpu_requests_share * 1024) / self.replica.host.cpu.max_cpu_requests
    self.load = self.average_load * millicores_to_share
    if self.core is not None and difference != 0:
        for thread_set in self.core.runqueue.thread_set_dict[self.id]:
            thread_set.sum_cpu_requests -= difference
@property
def cpu_limits(self):
    """CPU limit assigned to the thread (``-1`` is used elsewhere in this class as "no limit")."""
    return self._cpu_limits

@cpu_limits.setter
def cpu_limits(self, v):
    """
    Assign a new CPU limit and re-categorise the thread in its runqueue.

    When the thread is placed on a core it is taken out of the runqueue's thread sets
    before the limit changes and put back afterwards, so it is categorised by the new value.

    :param v: New limit; values above 1000 are rejected.
    :raises Exception: if ``v`` is greater than 1000.
    """
    if v > 1000:
        # The thread may not be placed on a core yet (e.g. during construction), so fall
        # back to the replica host's CPU rather than failing with an AttributeError that
        # would hide the real error message.
        cpu = self.core.cpu if self.core is not None else self.replica.host.cpu
        error_message = "CPU limits share cannot be greater than {} in a single " \
                        "thread placed in a core. {} Given!".format(cpu.max_cpu_requests, v)
        raise Exception(error_message)

    if self.core is not None:
        self.core.runqueue.decategorize_thread_from_sets(self)
    self._cpu_limits = v
    if self.core is not None:
        self.core.runqueue.categorize_thread_into_sets(self)