Source code for perfsim.drivers.neptune_storage_driver

from typing import TYPE_CHECKING, Dict

import neptune
from neptune import Run
from neptune.exceptions import InactiveRunException

from perfsim import ServiceChainManager, FileStorageDriver, Host

if TYPE_CHECKING:
    from perfsim import Simulation, Cluster


class NeptuneStorageDriver(FileStorageDriver):
    """
    Neptune storage driver is the class responsible for storing the results of
    the simulation in Neptune.

    Every ``save_*`` method first delegates to the :class:`FileStorageDriver`
    implementation and then sends whatever the parent returned to the Neptune
    run held in :attr:`handler`.

    The run is not created by ``__init__``; it is created by
    :meth:`init_neptune`, which :meth:`save_service_chain_result_graph` calls
    on demand when the handler is missing or no longer active.
    """

    #: The Neptune handler.
    handler: Run

    def __init__(self, name: str, project_id: str, api_token: str):
        """
        Remember the Neptune credentials and initialise the parent driver.

        :param name: Driver name, forwarded to ``FileStorageDriver.__init__``.
        :param project_id: Passed as ``project`` when the Neptune run is created.
        :param api_token: Passed as ``api_token`` when the Neptune run is created.
        """
        # Credentials are assigned before the parent initialiser runs, exactly
        # as in the original ordering, in case the parent relies on them.
        self.project_id = project_id
        self.api_token = api_token
        super().__init__(name)

    def init_neptune(self):
        """
        Create a Neptune run and store it in :attr:`handler`.

        ``neptune.init_run`` is used when the installed client exposes it;
        otherwise the older ``neptune.init`` entry point is used, so the driver
        works with either client generation.

        :return: None
        """
        create_run = getattr(neptune, "init_run", None)
        if create_run is None:
            # Older clients only provide ``init``.
            create_run = neptune.init
        self.handler = create_run(project=self.project_id, api_token=self.api_token)

    def _upload_html_contents(self, contents):
        """
        Upload every ``{neptune_path: content}`` entry as an HTML file.

        :param contents: Mapping returned by the parent ``save_*`` method.
        :return: None
        """
        for save_folder, content in contents.items():
            html_file = neptune.types.File.from_content(content, extension='html')
            self.handler[save_folder].upload(html_file)

    def save_simulation_scenario_results(self, simulation: 'Simulation'):
        """
        Save the results of the simulation scenario in Neptune.

        The Neptune run is stopped afterwards even when saving fails, so a
        failed scenario does not leave a run open.

        :param simulation: Simulation whose results are stored via ``save_all``.
        :return: The string ``"OK"``.
        """
        try:
            self.save_all(simulation)
        finally:
            # The handler may never have been created if saving failed early.
            handler = getattr(self, "handler", None)
            if handler is not None:
                handler.stop()
        return "OK"

    def save_cluster_topology_graph(self, cluster: 'Cluster', save_dir="results/topologies"):
        """
        Save the topology graph of the cluster in Neptune.

        :param cluster: Cluster forwarded to the parent implementation.
        :param save_dir: Parent output directory, also used as the Neptune path.
        :return: Whatever the parent implementation returned (uploaded as HTML).
        """
        content = super().save_cluster_topology_graph(cluster=cluster, save_dir=save_dir)
        self.handler[save_dir].upload(neptune.types.File.from_content(content, extension='html'))
        # Returned for parity with the other ``save_*`` overrides.
        return content

    def save_service_chains_original_graph(self,
                                           service_chain_managers_dict: dict[str, ServiceChainManager],
                                           save_dir="results/service_chains/{middle}/original/{middle}"):
        """
        Save the original service chains graph in Neptune.

        :param service_chain_managers_dict: Forwarded to the parent implementation.
        :param save_dir: Path template forwarded to the parent implementation.
        :return: The mapping returned by the parent; its keys are used as
            Neptune paths and its values are uploaded as HTML.
        """
        contents = super().save_service_chains_original_graph(
            service_chain_managers_dict=service_chain_managers_dict, save_dir=save_dir)
        self._upload_html_contents(contents)
        return contents

    def save_service_chains_alternative_graph(self,
                                              service_chain_managers_dict: dict[str, ServiceChainManager],
                                              save_dir="results/service_chains/{middle}/alternative"):
        """
        Save the alternative service chains graph in Neptune.

        :param service_chain_managers_dict: Forwarded to the parent implementation.
        :param save_dir: Path template forwarded to the parent implementation.
        :return: The mapping returned by the parent; its keys are used as
            Neptune paths and its values are uploaded as HTML.
        """
        contents = super().save_service_chains_alternative_graph(
            service_chain_managers_dict=service_chain_managers_dict, save_dir=save_dir)
        self._upload_html_contents(contents)
        return contents

    def save_service_chain_result_graph(self, result, save_dir="results/{result_key}"):
        """
        Save the service chain result graph in Neptune.

        The summary JSON file is uploaded first. If the handler is missing or
        the run is inactive, a new run is created and the upload is retried
        once. Each entry returned by the parent is then dispatched on its key
        suffix:

        * ``*_json``: the sibling ``.json`` file is tracked with ``track_files``;
        * ``*_raw``: the content is logged as a value;
        * anything else: the content is uploaded as an HTML file.

        :param result: Forwarded to the parent implementation.
        :param save_dir: Path template; the parent receives it unformatted.
        :return: The mapping returned by the parent implementation.
        """
        contents = super().save_service_chain_result_graph(result=result, save_dir=save_dir)

        save_dir = save_dir.format(middle="summary", result_key="results")
        file_path = save_dir + ".json"
        try:
            self.handler[save_dir].upload(file_path)
        except (InactiveRunException, AttributeError):
            # No handler yet, or the previous run was already stopped.
            self.init_neptune()
            self.handler[save_dir].upload(file_path)

        for save_path, content in contents.items():
            if save_path.endswith("_json"):
                file_path = save_path.removesuffix("_json") + ".json"
                self.handler[save_path].track_files(file_path)
            elif save_path.endswith("_raw"):
                self.handler[save_path].log(content)
            else:
                self.handler[save_path].upload(neptune.types.File.as_html(content))
        return contents

    def save_timeline_graph(self, result, save_dir="results/timeline"):
        """
        Save the timeline graph in Neptune.

        :param result: Forwarded to the parent implementation.
        :param save_dir: Parent output directory, also used as the Neptune path.
        :return: The figure returned by the parent implementation.
        """
        fig = super().save_timeline_graph(result=result, save_dir=save_dir)
        self.handler[save_dir].upload(neptune.types.File.as_html(fig))
        return fig

    def save_hosts_cores_heatmap(self,
                                 hosts_dict: Dict[str, Host],
                                 save_dir: str = "results/cpu/heatmap/{result_key}"):
        """
        Save the hosts cores heatmap in Neptune.

        For every host, the first two figures returned by the parent are
        uploaded as HTML under the host's formatted ``save_dir``.

        :param hosts_dict: Forwarded to the parent implementation.
        :param save_dir: Path template; ``{result_key}`` is replaced by the host name.
        :return: The mapping of host name to figures returned by the parent.
        """
        figs = super().save_hosts_cores_heatmap(hosts_dict=hosts_dict, save_dir=save_dir)
        for host_name, fig in figs.items():
            save_path = save_dir.format(result_key=host_name)
            self.handler[f"{save_path}/{host_name}-threads-lb-cpu_requests_share.html"].upload(
                neptune.types.File.as_html(fig[0]))
            self.handler[f"{save_path}/{host_name}-threads-lb-threads.html"].upload(
                neptune.types.File.as_html(fig[1]))
        return figs