"""Contains functions for managing numerical calculations."""
import os
import glob
import time
import psutil
import subprocess
import numpy as np
import shutil
import mesh2scattering as m2s
from packaging.version import Version
import pooch
[docs]
def build_or_fetch_numcalc(replace_existing=False):
    """Return the path of the NumCalc executable, creating it if missing.

    On Windows the executable is downloaded from Github, on all other
    systems ``make`` is run in the ``numcalc/src`` folder. In both cases the
    executable is expected in the ``numcalc/bin`` folder below the program
    root.

    Building NumCalc on Linux requires the ``build-essential`` package to be
    installed. On Ubuntu, this can be done with the following command:

    .. code-block:: bash

        sudo apt-get install build-essential

    For Mac, the ``xcode`` command line tools are required.

    Parameters
    ----------
    replace_existing : bool, optional
        If True, an existing NumCalc executable is deleted and obtained
        again. The default is False.

    Returns
    -------
    str
        NumCalc path
    """
    bin_dir = os.path.join(m2s.utils.program_root(), "numcalc", "bin")

    # Linux and Mac: compile from source
    if os.name != 'nt':
        executable = os.path.join(bin_dir, "NumCalc")
        if replace_existing and os.path.isfile(executable):
            os.remove(executable)
        if not os.path.isfile(executable):
            source_dir = os.path.join(
                m2s.utils.program_root(), "numcalc", "src")
            subprocess.run(["make"], cwd=source_dir, check=True)
        return executable

    # Windows: building the exe is difficult, use the pre-built download
    executable = os.path.join(bin_dir, 'NumCalc.exe')
    if replace_existing and os.path.isfile(executable):
        os.remove(executable)
    if os.path.exists(executable):
        return executable
    return _download_windows_build()
def _download_windows_build():
    """Fetch and unpack the Windows NumCalc build from the Github repository.

    Returns
    -------
    str
        Path of ``NumCalc.exe`` inside the ``numcalc/bin`` folder.
    """
    target_dir = os.path.join(m2s.utils.program_root(), "numcalc", "bin")

    # versions before 1.0.0 are served from the develop branch, later
    # versions from the matching version tag
    is_pre_release = Version(m2s.__version__) < Version('1.0.0')
    git_ref = 'develop' if is_pre_release else f'v{m2s.__version__}'

    archive_name = "NumCalc_WindowsExe.zip"
    downloader = pooch.create(
        # download directly into the numcalc/bin folder
        path=target_dir,
        # the remote data is on Github
        base_url=(
            "https://github.com/ahms5/Mesh2scattering/raw"
            f"/{git_ref}/release/"),
        # None: no hash is registered for the archive
        registry={archive_name: None},
    )
    downloader.fetch(
        archive_name, processor=pooch.Unzip(extract_dir=target_dir))

    return os.path.join(target_dir, 'NumCalc.exe')
[docs]
def remove_outputs(
        paths, boundary=False, grid=False, log=False):
    """
    Remove output data from scattering project folder.

    Use this function to delete output that is no longer needed and is taking
    too much disk space. Data that does not exist (e.g., because it was
    removed by an earlier call) is skipped silently.

    Parameters
    ----------
    paths : str, tuple of strings
        Paths under which scattering project folders are searched. Can contain
        `*` to remove data from multiple project folders, e.g., `path/*left`
        will remove data from all folders in `path` that end with `left`.
    boundary : bool, optional
        Remove raw pressure and velocity simulated on the boundary, i.e., the
        mesh. This data is saved in
        `project_folder/NumCalc/source_*/be.out/be.*/*Boundary`
    grid : bool, optional
        Remove raw pressure and velocity simulated on the evaluation grid.
        This data is saved in
        `project_folder/NumCalc/source_*/be.out/be.*/*EvalGrid`
    log : bool, optional
        Remove log ``(*.txt, *.out)`` files in ``source_*`` dir.

    Raises
    ------
    ValueError
        If `paths` is neither a string, a tuple, nor a list.
    """
    # check input
    if isinstance(paths, str):
        paths = (paths, )
    if not isinstance(paths, (tuple, list)):
        raise ValueError(
            "paths must be a string or a tuple of strings")

    # loop paths and contained folders
    for pp, path in enumerate(paths):
        folders = glob.glob(path)

        for ff, folder in enumerate(folders):
            print(
                f"Purging path {pp+1}/{len(paths)} "
                f"folder {ff+1}/{len(folders)}")
            print(os.path.basename(folder))

            # nothing to purge without a NumCalc directory
            if not os.path.isdir(os.path.join(folder, "NumCalc")):
                continue
            sources = glob.glob(os.path.join(folder, "NumCalc", "source_*"))

            # data in source*/be.out/be.* folders -------------------------
            if boundary and grid:
                # both kinds of data requested: delete entire be.out folders
                for source in sources:
                    be_out = os.path.join(source, "be.out")
                    if os.path.isdir(be_out):
                        shutil.rmtree(be_out)
            elif boundary or grid:
                # delete only the boundary or only the grid data
                if boundary:
                    data_files = ("pBoundary", "vBoundary")
                else:
                    data_files = ("pEvalGrid", "vEvalGrid")
                for source in sources:
                    for be in glob.glob(
                            os.path.join(source, "be.out", "be.*")):
                        for data_file in data_files:
                            data_path = os.path.join(be, data_file)
                            if os.path.isfile(data_path):
                                os.remove(data_path)

            # log data -----------------------------------------------------
            if log:
                for source in sources:
                    for pattern in ("*.txt", "*.out"):
                        # glob already returns paths that include `source`,
                        # they must not be joined with it a second time
                        for log_path in glob.glob(
                                os.path.join(source, pattern)):
                            # the file check protects the `be.out` folder,
                            # which also matches `*.out`
                            if os.path.isfile(log_path):
                                os.remove(log_path)
[docs]
def manage_numcalc(project_path=None, numcalc_path=None,
                   max_ram_load=None, ram_safety_factor=1.05, max_cpu_load=90,
                   max_instances=None, wait_time=15,
                   starting_order='alternate', confirm_errors=False):
    """
    Run NumCalc on one or multiple NumCalc project folders.

    This script monitors the RAM and CPU usage and starts a new NumCalc
    instance whenever enough resources are available. The required RAM for each
    frequency step is estimated using NumCalc's `estimate_ram` option. A log
    file is written to the `project_path` containing detailed information on
    the launched frequency steps, available resources, and detected errors.

    .. note ::

        `manage_numcalc` can also be launched by running the python script
        `manage_numcalc_script.py` contained in the subfolder
        `mesh2scattering/NumCalc` of the mesh2scattering Git repository.

    Parameters
    ----------
    project_path : str, optional
        The directory to simulate: It can be path to either
        1- directory that contains multiple NumCalc project folders or
        2- one NumCalc project folder (folder containing "parameters.json").
        The default ``None`` uses ``os.getcwd()``
    numcalc_path : str, optional
        Path to the NumCalc binary. On Linux and Mac the path must end with
        ``'NumCalc'``. The default ``None`` uses the path returned by
        :py:func:`build_or_fetch_numcalc`.
    max_ram_load : number, optional
        The RAM that can maximally be used in GB. New NumCalc instances are
        only started if enough RAM is available. The default ``None`` uses all
        available RAM.
    ram_safety_factor : number, optional
        A safety factor that is applied to the estimated RAM consumption. The
        estimate is obtained using NumCalc -estimate_ram. The default of
        ``1.05`` would for example assume that 10.5 GB ram are needed if a RAM
        consumption of 10 GB was estimated by NumCalc.
    max_cpu_load : number, optional
        Maximum allowed CPU load in percent. New instances are only launched if
        the current CPU load is below this value. The default is 90 percent.
    max_instances : int, optional
        The maximum numbers of parallel NumCalc instances. If max_instances is
        ``None``, by default a new instance is launched until the number of
        available CPU cores given by ``psutil.cpu_count()`` is reached.
    wait_time : int, optional
        Delay in seconds for waiting until the RAM and CPU usage is checked
        after launching a NumCalc instance. This has to be sufficiently large
        for the RAM and CPU to be fully used by the started NumCalc instance.
        The default is 15 s but values of 60 s or even more might be required
        depending on the machine. The RAM values that ``manage_numcalc``
        outputs are usually a good indicator to check if `wait_time` is
        sufficiently high. After this initial wait time, the resources are
        checked every second. And the next instance is started, once enough
        resources are available.
    starting_order : str, optional
        Control the order in which the frequency steps are launched.

        ``'high'``
            Always launches the step with the highest possible memory
            consumption.
        ``'low'``
            Always launches the step with the lowest estimated memory
            consumption
        ``'alternate'`` (default)
            mixes the two approaches above.
    confirm_errors : bool, optional
        If True, manage_numcalc waits for user input in case an error
        occurs. The default False exits the function immediately if an error
        occurs.

    Raises
    ------
    ValueError
        If `max_ram_load` exceeds the total RAM, or if any other error is
        detected while `confirm_errors` is False. This includes frequency
        steps that are still unfinished after all instances terminated.
    Exception
        If an error is detected while `confirm_errors` is True (raised after
        the user confirmed the error).
    """

    # log_file initialization -------------------------------------------------
    if project_path is None:
        project_path = os.getcwd()

    current_time = time.strftime("%Y_%m_%d_%H-%M-%S", time.localtime())
    # the log file always lives directly in project_path. The path is complete
    # here and must not be joined with project_path again later on (this would
    # duplicate a relative project_path).
    log_file = os.path.join(
        project_path, f"manage_numcalc_{current_time}.txt")

    # remove old log-file
    if os.path.isfile(log_file):
        os.remove(log_file)

    # default values ----------------------------------------------------------
    if numcalc_path is None:
        numcalc_path = build_or_fetch_numcalc()

    ram_info = psutil.virtual_memory()
    total_ram = ram_info.total / 1073741824  # bytes -> GB (2**30 bytes)
    if max_ram_load is None:
        max_ram_load = total_ram
    elif max_ram_load > total_ram:
        raise ValueError((
            f"The maximum RAM load of {max_ram_load} GB must be smaller than "
            f"the total RAM, which is {total_ram} GB."))

    # helping variables -------------------------------------------------------

    # trick to get colored print-outs   https://stackoverflow.com/a/54955094
    text_color_red = '\033[31m'
    text_color_green = '\033[32m'
    text_color_reset = '\033[0m'

    # wait time in seconds before checking resources again if we are busy
    # (this is not the wait time directly after a NumCalc instance was
    # launched. That is given by wait time!)
    wait_time_busy = 1

    # check input -------------------------------------------------------------
    if max_instances is None:
        max_instances = psutil.cpu_count()
    elif max_instances > psutil.cpu_count():
        _raise_error(
            (f"max_instances is {max_instances} but can not be larger than "
             f"{psutil.cpu_count()} (The number of logical CPUs)"),
            text_color_red, log_file, confirm_errors)

    # Detect what the project_path or "getcwd()" is pointing to:
    if os.path.isfile(os.path.join(project_path, 'parameters.json')):
        # project_path is a NumCalc project folder
        all_projects = [project_path]
    else:
        # project_path contains multiple NumCalc project folders
        all_projects = []  # list of project folders to execute
        for subdir in os.listdir(project_path):
            if os.path.isdir(os.path.join(project_path, subdir,
                                          'ObjectMeshes', 'Reference')):
                all_projects.append(os.path.join(project_path, subdir))

    # stop if no project folders were detected
    if len(all_projects) == 0:
        message = ("manage_numcalc could not detect any NumCalc "
                   f"projects at project_path={project_path}")
        _raise_error(message, text_color_red, log_file, confirm_errors)

    # echo input parameters ---------------------------------------------------
    current_time = time.strftime("%b %d %Y, %H:%M:%S", time.localtime())
    message = ("\nStarting manage_numcalc with the following arguments "
               f"[{current_time}]\n")
    message += "-" * (len(message) - 2) + "\n"
    message += (
        f"project_path: {project_path}\n"
        f"numcalc_path: {numcalc_path}\n"
        f"max_ram_load: {max_ram_load:.2f} GB ({total_ram:.2f} GB detected, "
        f"{ram_info.available / 1073741824:.2f} GB available)\n"
        f"ram_safety_factor: {ram_safety_factor}\n"
        f"max_cpu_load: {max_cpu_load} %\n"
        f"max_instances: {max_instances} "
        f"({psutil.cpu_count()} cores detected)\n"
        f"wait_time: {wait_time} seconds\n"
        f"starting_order: {starting_order}\n"
        f"confirm_errors: {confirm_errors}\n")

    _print_message(message, text_color_reset, log_file)

    # Check for NumCalc executable --------------------------------------------
    if os.name == 'nt':  # Windows detected
        numcalc_path_base = os.path.dirname(numcalc_path)

        # files that are needed to execute NumCalc
        NumCalc_runtime_files = ['NumCalc.exe', 'libgcc_s_seh-1.dll',
                                 'libstdc++-6.dll', 'libwinpthread-1.dll']

        # Check that each required runtime file is present:
        for calc_file in NumCalc_runtime_files:
            if not os.path.isfile(os.path.join(numcalc_path_base, calc_file)):
                message = (
                    f"The file {calc_file} is missing or manage_numcalc "
                    f"did not find the containing folder 'NumCalc_WindowsExe'")
                _raise_error(message, text_color_red, log_file, confirm_errors)
    else:
        if not numcalc_path.endswith("NumCalc"):
            _raise_error(
                "numcalc_path must end with 'NumCalc'", text_color_red,
                log_file, confirm_errors)

        # `command -v` prints the resolved command if it can be executed and
        # nothing otherwise. `run` waits for the shell to finish, which avoids
        # leaving an unreaped child process behind.
        lookup = subprocess.run(
            f"command -v {numcalc_path}", stdout=subprocess.PIPE, shell=True)
        if not len(lookup.stdout):
            _raise_error(
                f"NumCalc executable does not exist at {numcalc_path}",
                text_color_red, log_file, confirm_errors)

    # Check all projects that may need to be executed -------------------------
    projects_to_run = []
    message = ("\nPer project summary of instances that will be run\n"
               "-------------------------------------------------\n")

    message += f"Detected {len(all_projects)} NumCalc projects in\n"
    message += f"{os.path.dirname(log_file)}\n"
    # print already here because _check_project might produce output that
    # should come after this
    _print_message(message, text_color_reset, log_file)
    message = "\n"

    for project in all_projects:
        all_instances, instances_to_run, _ = _check_project(
            project, numcalc_path, log_file)

        if instances_to_run is not None:
            projects_to_run.append(project)
            message += (
                f"{len(instances_to_run)}/{len(all_instances)} frequency "
                f"steps to run in '{os.path.basename(project)}'\n")
        else:
            message += f"'{os.path.basename(project)}' is already complete\n"

    _print_message(message, text_color_reset, log_file)

    # loop to process all projects --------------------------------------------
    for pp, project in enumerate(projects_to_run):

        current_time = time.strftime("%b %d %Y, %H:%M:%S", time.localtime())

        # Get number of instances in project and estimate their RAM consumption
        root_NumCalc = os.path.join(project, 'NumCalc')
        all_instances, instances_to_run, _ = \
            _check_project(project, numcalc_path, log_file)
        # _check_project returns None if the project was completed in the
        # meantime (e.g., by instances that were still running before)
        if instances_to_run is None:
            total_nr_to_run = 0
        else:
            total_nr_to_run = instances_to_run.shape[0]

        # Status printouts:
        message = (f"Started '{os.path.basename(project)}' project "
                   f"({pp + 1}/{len(projects_to_run)}, {current_time})")
        message = "\n" + message + "\n" + "-" * len(message) + "\n"
        if total_nr_to_run:
            message += (
                f"Running {total_nr_to_run}/{len(all_instances)} unfinished "
                "frequency steps in the project\n")
        else:
            message += (
                "All NumCalc simulations in this project are complete")
            _print_message(message, text_color_reset, log_file)
            continue

        _print_message(message, text_color_reset, log_file)

        # sort instances according to RAM consumption (lowest first)
        instances_to_run = instances_to_run[np.argsort(instances_to_run[:, 3])]

        # check if available memory is enough for running the instance with the
        # highest memory consumption without ever exceeding 100% of RAM.
        if max_ram_load < instances_to_run[-1, 3] * ram_safety_factor:
            # note: it IS possible to run simulations that use even more than
            # 100% of available system RAM - only the performance will be poor.
            _raise_error((
                f"Stop - not sufficient free RAM for this simulation project: "
                f"Available RAM is {round(max_ram_load, 2)} GB, but frequency"
                f" step {int(instances_to_run[-1, 1])} of source "
                f"{int(instances_to_run[-1, 0])} requires "
                f"{round(instances_to_run[-1, 3] * ram_safety_factor, 2)} "
                "GB."), text_color_red, log_file, confirm_errors)

        # assure highest first if demanded
        if starting_order != "low":
            instances_to_run = np.flip(instances_to_run, axis=0)

        # main loop for starting instances
        started_instance = False  # init
        while instances_to_run.shape[0]:

            # RAM needed to launch at least the cheapest remaining step
            ram_required = np.min(instances_to_run[:, 3]) * ram_safety_factor

            # current time and resources
            current_time = time.strftime(
                "%b %d %Y, %H:%M:%S", time.localtime())
            ram_available, ram_used = _get_current_ram(total_ram, max_ram_load)
            cpu_load = psutil.cpu_percent(.1)
            running_instances = _numcalc_instances()

            # wait if
            # - CPU usage too high
            # - number of running instances is too large
            # - not enough RAM available
            if cpu_load > max_cpu_load \
                    or running_instances >= max_instances \
                    or ram_available < ram_required:

                # print message (only done once between launching instances)
                if started_instance:
                    _print_message(
                        (f"... waiting for resources and checking every "
                         f"second ({current_time})\n"
                         f"{running_instances} NumCalc instances running at "
                         f"{cpu_load:.2f}% CPU load\n"
                         f"{round(ram_available, 2)} GB RAM available "
                         f"({ram_used:.2f} GB used), "
                         f"{round(ram_required, 2)} GB required\n"),
                        text_color_reset, log_file)
                    started_instance = False

                # wait and continue
                time.sleep(wait_time_busy)
                continue

            # find the first step in the current order whose estimate fits
            # into the available RAM (falls back to the last step)
            idx = -1
            for i, estimated_ram in enumerate(instances_to_run[:, 3]):
                idx = i
                if estimated_ram <= ram_available:
                    break

            # start new NumCalc instance
            source = int(instances_to_run[idx, 0])
            step = int(instances_to_run[idx, 1])
            frequency = float(instances_to_run[idx, 2])
            ram = float(instances_to_run[idx, 3])
            progress = total_nr_to_run - instances_to_run.shape[0] + 1
            message = (
                f"{progress}/{total_nr_to_run} starting instance from "
                f"'{os.path.basename(project)}' ({current_time})\n"
                f"source {source}, step {step}, {frequency} Hz\n"
                f"estimated {ram:.2f} GB RAM of available {ram_available:.2f} "
                "GB required\n")
            _print_message(message, text_color_reset, log_file)

            # new working directory
            cwd = os.path.join(root_NumCalc, "source_" + str(source))

            if os.name == 'nt':  # Windows detected
                # run NumCalc and route all printouts to a per-step log file.
                # The child process gets its own copy of the file handle, so
                # the handle of this process can be closed right after the
                # launch.
                log_name = os.path.join(cwd, f"NC{step}-{step}_log.txt")
                with open(log_name, "w") as log_handle:
                    subprocess.Popen(
                        f"{numcalc_path} -istart {step} -iend {step}",
                        stdout=log_handle, cwd=cwd)

            else:  # elif os.name == 'posix': Linux or Mac detected
                # run NumCalc and route all printouts to a log file
                subprocess.Popen((
                    f"{numcalc_path} -istart {step} -iend {step}"
                    f" >NC{step}-{step}_log.txt"), shell=True, cwd=cwd)

            # prepare instances for next loop
            instances_to_run = np.delete(instances_to_run, idx, 0)
            if starting_order == "alternate":
                instances_to_run = np.flip(instances_to_run, axis=0)

            started_instance = True
            time.sleep(wait_time)  # long wait to initialize RAM
        #  END of per project loop --------------------------------------------
    #  END of all projects loop -----------------------------------------------

    # wait for last NumCalc instances to finish
    current_time = time.strftime("%b %d %Y, %H:%M:%S", time.localtime())
    message = (f"\n... waiting for the last NumCalc instances to finish "
               f"(checking every second, {current_time})")
    _print_message(message, text_color_reset, log_file)
    while _numcalc_instances() != 0:
        time.sleep(wait_time_busy)

    # Check all projects for unfinished frequency steps ------------------------
    current_time = time.strftime("%b %d %Y, %H:%M:%S", time.localtime())

    message = ("\nThe following instances did not finish\n"
               "--------------------------------------\n")

    for project in all_projects:
        all_instances, instances_to_run, _ = _check_project(
            project, numcalc_path, log_file)

        if instances_to_run is None:
            continue

        if instances_to_run.shape[0] > 0:
            message += f"'{os.path.basename(project)}': "
            unfinished = [f"source {int(p[0])} step {int(p[1])}"
                          for p in instances_to_run]
            message += "; ".join(unfinished) + "\n"

    # the header has three line breaks, each unfinished project adds one
    if message.count("\n") > 3:
        message += f"Finished at {current_time}"
        _raise_error(message, text_color_reset, log_file, confirm_errors)
    else:
        message = f"\nAll NumCalc projects finished at {current_time}"
        _print_message(message, text_color_reset, log_file)

    if confirm_errors:
        input(text_color_green + 'DONE. Hit Enter to exit')
        print(text_color_reset)
def _raise_error(message, text_color, log_file, confirm_errors):
"""Two different ways of error handling depending on `confirm_errors`."""
# error to logfile
with open(log_file, "a", encoding="utf8", newline="\n") as f:
f.write("\n\n" + message + "\n")
# error to console
if confirm_errors:
if os.name == 'nt': # Windows detected
print(message)
input("Press Enter to exit manage_numcalc")
else: # elif os.name == 'posix': Linux or Mac detected
print(text_color + message)
input(text_color + "Press Enter to exit manage_numcalc\033[0m")
raise Exception("manage_numcalc was stopped due to an error")
else:
raise ValueError(message)
def _print_message(message, text_color, log_file):
"""Print message to console and log file."""
if os.name == 'nt': # Windows detected
text_color = '' # color codes do not work as intended on Win10
print(text_color + message)
with open(log_file, "a", encoding="utf8", newline="\n") as f:
f.write(message + "\n")
def _get_current_ram(total_ram, max_ram_load):
    """
    Get the RAM that may still be used and the RAM that is currently in use.

    Parameters
    ----------
    total_ram : number
        Total RAM in GB (2**30 bytes)
    max_ram_load : number
        Maximum RAM in GB that is allowed to be used

    Returns
    -------
    ram_available : number
        ``max_ram_load`` minus the used RAM in GB, but at least 0
    ram_used : number
        ``total_ram`` minus the RAM reported as available by psutil in GB
    """
    bytes_per_gb = 1073741824
    free_gb = psutil.virtual_memory().available / bytes_per_gb
    used_gb = total_ram - free_gb
    return max([0, max_ram_load - used_gb]), used_gb
def _numcalc_instances():
    """Return the number of currently running NumCalc instances.

    A process is counted if its name ends with ``NumCalc`` (``NumCalc.exe``
    on Windows). Processes whose name can not be obtained are ignored.
    """
    numcalc_executable = 'NumCalc' if os.name != 'nt' else 'NumCalc.exe'

    num_instances = 0
    # only the name is requested; psutil sets it to None if the name of a
    # process can not be accessed
    for process in psutil.process_iter(['name']):
        name = process.info['name']
        if name and name.endswith(numcalc_executable):
            num_instances += 1

    return num_instances
def _check_project(project, numcalc_executable, log_file):
    """
    Find unfinished instances (frequency steps) in a NumCalc project folder.

    The RAM estimates are generated by running
    ``NumCalc -estimate_ram`` in every source folder that does not contain a
    `Memory.txt` file yet.

    Parameters
    ----------
    project : str
        Full path of the NumCalc project folder
    numcalc_executable : str
        Full path to the NumCalc executable
    log_file : str
        Full path to the log file

    Returns
    -------
    all_instances : numpy array
        Array of shape (N, 4) where N is the number of detected frequency
        steps in all source_* folders in the project. The first column contains
        the source number, the second the frequency step, the third the
        frequency in Hz, and the fourth the estimated RAM consumption in GB.
        N is zero if the project does not contain any source_* folders.
    instances_to_run : numpy array, None
        Array of size (M, 4) if any instances need to be run (in this case M
        gives the unfinished instances). ``None``, if all instances are
        finished.
    source_counter : int
        Number of sources in the project
    """
    # get source folders and number of sources. The folders are expected to
    # be numbered consecutively starting at source_1.
    source_counter = len(
        glob.glob(os.path.join(project, 'NumCalc', "source_*")))
    sources = [os.path.join(project, 'NumCalc', f"source_{s+1}")
               for s in range(source_counter)]

    # without sources there is nothing to run; return an empty array instead
    # of failing on variables that were never assigned
    if not sources:
        return np.empty((0, 4)), None, source_counter

    estimates_per_source = []
    unfinished = []

    # loop source_* folders
    for source_id, ff in enumerate(sources):

        # estimate RAM consumption if required
        if not os.path.isfile(os.path.join(ff, "Memory.txt")):

            _print_message(f"Obtaining RAM estimates for {ff}",
                           '\033[0m', log_file)

            if os.name == 'nt':  # Windows detected
                # run NumCalc and discard all printouts
                subprocess.run(
                    f"{numcalc_executable} -estimate_ram",
                    stdout=subprocess.DEVNULL, cwd=ff, check=True)

            else:  # elif os.name == 'posix': Linux or Mac detected
                # run NumCalc and discard all printouts
                subprocess.run(
                    [f"{numcalc_executable} -estimate_ram"],
                    shell=True, stdout=subprocess.DEVNULL, cwd=ff, check=True)

        # get RAM estimates and prepend source number
        estimates = read_ram_estimates(ff)
        estimates = np.concatenate(
            ((source_id + 1) * np.ones((estimates.shape[0], 1)), estimates),
            axis=1)
        estimates_per_source.append(estimates)

        # loop frequency steps (numbered from 1 in the output files)
        for step in range(estimates.shape[0]):
            if _step_needs_run(ff, step + 1):
                unfinished.append(estimates[step])

    all_instances = np.concatenate(estimates_per_source, axis=0)
    instances_to_run = np.vstack(unfinished) if unfinished else None

    return all_instances, instances_to_run, source_counter


def _step_needs_run(source_dir, step_number):
    """Return True if a frequency step of a source has to be (re-)run.

    A step has to be run if ``be.out/be.<step>/pEvalGrid`` is missing, or if
    ``NC<step>-<step>.out`` exists but does not contain ``End time:``.
    """
    grid_file = os.path.join(
        source_dir, "be.out", f"be.{step_number}", "pEvalGrid")
    if not os.path.isfile(grid_file):
        # there are no output files, process this
        return True

    nc_out = os.path.join(source_dir, f'NC{step_number}-{step_number}.out')
    if not os.path.isfile(nc_out):
        return False

    # "NCx-x.out" contains "End time:" if the simulation was completed
    with open(nc_out, "r", encoding="utf8", newline="\n") as f:
        return 'End time:' not in f.read()
[docs]
def read_ram_estimates(folder: str):
    """
    Read estimated RAM consumption from Memory.txt.

    Note that the RAM consumption per frequency step can be estimated and
    written to `Memory.txt` by calling `NumCalc -estimate_ram`. This must
    be done before calling this function.

    Parameters
    ----------
    folder : str
        full path to the source folder containing the `Memory.txt` file from
        which the estimates are read

    Returns
    -------
    estimates : numpy array
        An array of shape `(N, 3)` where `N` is the number of frequency
        steps. The first column contains the frequency step, the second the
        frequency in Hz, and the third the estimated RAM consumption in GB.
        `N` is zero if the file does not contain any data.

    Raises
    ------
    ValueError
        If `folder` does not contain a `Memory.txt` file or if the file
        contains entries that can not be converted to float.
    """
    memory_file = os.path.join(folder, "Memory.txt")

    # check if file exists
    if not os.path.isfile(memory_file):
        raise ValueError(f"{folder} does not contain a Memory.txt file")

    # parse data to nested list. Splitting on any whitespace tolerates
    # repeated separators, and blank lines (e.g. at the end of the file) are
    # skipped instead of failing in the float conversion.
    with open(memory_file, "r") as ff:
        estimates = [
            [float(value) for value in line.split()]
            for line in ff if line.strip()]

    # keep the documented two-dimensional shape for files without data
    if not estimates:
        return np.empty((0, 3))

    return np.asarray(estimates)
[docs]
def calc_and_read_ram(project_path, numcalc_executable):
    """Return the estimated memory usage of the first source of a project.

    The estimates are read from ``NumCalc/source_1/Memory.txt`` inside the
    project. If this file does not exist, it is generated first by running
    ``NumCalc -estimate_ram`` in the source folder.

    Parameters
    ----------
    project_path : str, path
        project root path.
    numcalc_executable : str, path
        Path to numcalc executable, on Windows it ends with ``NumCalc.exe``
        and on Unix system ``NumCalc``

    Returns
    -------
    ram : numpy.ndarray
        with shape (N, 3), where the columns are defined as follows:
            - id of the frequency
            - frequency itself
            - expected RAM usage in GB

    Raises
    ------
    ValueError
        If `project_path` is not an existing directory.
    """
    if not os.path.isdir(project_path):
        raise ValueError(f'No such directory {project_path}')

    source_dir = os.path.join(project_path, 'NumCalc', 'source_1')
    has_estimates = os.path.isfile(os.path.join(source_dir, "Memory.txt"))

    if not has_estimates:
        # run NumCalc and discard all printouts
        if os.name != 'nt':  # Linux or Mac detected
            subprocess.run(
                [f"{numcalc_executable} -estimate_ram"],
                shell=True, stdout=subprocess.DEVNULL,
                cwd=source_dir, check=True)
        else:  # Windows detected
            subprocess.run(
                f"{numcalc_executable} -estimate_ram",
                stdout=subprocess.DEVNULL, cwd=source_dir, check=True)

    return read_ram_estimates(source_dir)