Source code for mesh2scattering.output.output

"""Write output for NumCalc."""

import os
import warnings
import json
import numpy as np
import pyfar as pf
import glob
import sofar as sf
import csv
import re


[docs] def write_pressure(folder): """ Process NumCalc output and write data to disk. Processing the data is done in the following steps 1. Read project parameter from `parameters.json` 2. use :py:func:`~write_output_report` to parse files in project_folder/NumCalc/source_*/NC*.out, write project report to project_folder/report/report_source_*.csv. Raise a warning if any issues were detected and write report_issues.txt to the same folder 3. Read simulated pressures from project_folder/NumCalc/source_*/be.out. This and the following steps are done, even if an issue was detected in the previous step 4. save the sound pressure for each evaluation grid to save the a SOFA file Parameters ---------- folder : str, optional The path of the NumCalc project folder, i.e., the folder containing the subfolders EvaluationsGrids, NumCalc, and ObjectMeshes. """ if not os.path.exists(folder): raise ValueError( 'Folder need to exists.') # write the project report and check for issues print('\n Writing the project report ...') found_issues, report = write_output_report(folder) if found_issues: warnings.warn(report, stacklevel=2) # read sample data evaluationGrids, params = _read_numcalc( folder) # process BEM data for writing HRTFs and HRIRs to SOFA files for i_grid, grid in enumerate(evaluationGrids): # get pressure as SOFA object (all following steps are run on SOFA # objects. This way they are available to other users as well) source_position = np.array(params['sources']) source_type = params['source_type'] if source_type == 'Plane wave': source_position = -source_position*99 receiver_position = np.array(evaluationGrids[grid]['nodes'][:, 1:4]) # apply symmetry of reference sample data = pf.FrequencyData( evaluationGrids[grid]['pressure'], params['frequencies']) receiver_coords = _cart_coordinates(receiver_position) source_coords = _cart_coordinates(source_position) source_coords.weights = params['sources_weights'] receiver_coords.weights = np.array( params['evaluation_grids_weights'][i_grid]) # write data c = float(params['speed_of_sound']) Lbyl = params['structural_wavelength'] / c * data.frequencies sofa = _create_pressure_sofa( data, Lbyl, source_coords, receiver_coords, structural_wavelength=params['structural_wavelength'], structural_wavelength_x=params['structural_wavelength_x'], structural_wavelength_y=params['structural_wavelength_y'], sample_diameter=params['sample_diameter'], speed_of_sound=params['speed_of_sound'], density_of_medium=params['density_of_medium'], mesh2scattering_version=params['mesh2scattering_version'], model_scale=params['model_scale'], symmetry_azimuth=params['symmetry_azimuth'], symmetry_rotational=params['symmetry_rotational'], ) sofa.GLOBAL_Title = folder.split(os.sep)[-1] file_path = os.path.join( folder, '..', f'{params["project_title"]}_{grid}.pressure.sofa') # write scattered sound pressure to SOFA file sf.write_sofa(file_path, sofa)
def _create_pressure_sofa( data, Lbyl, sources, receivers, structural_wavelength, structural_wavelength_x, structural_wavelength_y, sample_diameter, speed_of_sound, density_of_medium,mesh2scattering_version, model_scale=1, symmetry_azimuth=None, symmetry_rotational=False, ): """Write complex pressure data to SOFA object from NumCalc simulation. Parameters ---------- data : :py:class:`~pyfar.classes.audio.FrequencyData` the complex pressure data. Lbyl : numpy.ndarray structural wavelength over wavelength of sound. sources : :py:class:`~pyfar.classes.coordinates.Coordinates` source positions in cartesian coordinates. receivers : :py:class:`~pyfar.classes.coordinates.Coordinates` Receivers positions in cartesian coordinates. structural_wavelength : float structural wavelength in main direction (x) in meters. structural_wavelength_x : float structural wavelength in x direction in meters. structural_wavelength_y : float structural wavelength in y direction in meters. sample_diameter : float diameter of the sample in meters. speed_of_sound : float speed of sound in m/s density_of_medium : float density of the medium in kg/m^3. mesh2scattering_version : str version string of mesh2scattering model_scale : int, optional scale of the surface, by default 1. symmetry_azimuth : list, optional the azimuth angles in degree, where the symmetry axes are, by default []. symmetry_rotational : bool, optional Whether the surface is rotational symmetric, by default False Returns ------- sf.Sofa sofa file. """ # create empty SOFA object convention = 'GeneralTF' if type( data) is pf.FrequencyData else 'GeneralFIR' assert speed_of_sound is not None assert speed_of_sound > 250 assert speed_of_sound < 350 sofa = sf.Sofa(convention) # write meta data sofa.GLOBAL_ApplicationName = 'Mesh2scattering' sofa.GLOBAL_ApplicationVersion = mesh2scattering_version sofa.GLOBAL_History = 'numerically simulated data' # Source and receiver data sofa.EmitterPosition = sources.cartesian sofa.EmitterPosition_Units = 'meter' sofa.EmitterPosition_Type = 'cartesian' sources_sph = sources.spherical_elevation sources_sph = pf.rad2deg(sources_sph) sofa.SourcePosition = sources_sph sofa.SourcePosition_Units = 'degree, degree, metre' sofa.SourcePosition_Type = 'spherical' sofa.ReceiverPosition = receivers.cartesian sofa.ReceiverPosition_Units = 'meter' sofa.ReceiverPosition_Type = 'cartesian' if type(data) is pf.FrequencyData: Lbyl = np.array(Lbyl) if structural_wavelength == 0: f = Lbyl else: f = Lbyl/structural_wavelength*speed_of_sound sofa.N = data.frequencies sofa.add_variable( 'OriginalFrequencies', f, 'double', 'N') sofa.add_variable( 'RealScaleFrequencies', f/model_scale, 'double', 'N') sofa.add_variable( 'Lbyl', Lbyl, 'double', 'N') # HRTF/HRIR data if data.cshape[0] != sources.csize: data.freq = np.swapaxes(data.freq, 0, 1) sofa.Data_Real = np.real(data.freq) sofa.Data_Imag = np.imag(data.freq) else: sofa.Data_IR = data.time sofa.Data_SamplingRate = data.sampling_rate sofa.Data_Delay = np.zeros((1, receivers.csize)) sofa.add_variable( 'SampleStructuralWavelength', structural_wavelength, 'double', 'I') sofa.add_variable( 'SampleStructuralWavelengthX', structural_wavelength_x, 'double', 'I') sofa.add_variable( 'SampleStructuralWavelengthY', structural_wavelength_y, 'double', 'I') sofa.add_variable( 'SampleModelScale', model_scale, 'double', 'I') sofa.add_variable( 'SampleDiameter', sample_diameter, 'double', 'I') sofa.add_variable( 'SpeedOfSound', speed_of_sound, 'double', 'I') sofa.add_variable( 'DensityOfMedium', density_of_medium, 'double', 'I') sofa.add_variable( 'ReceiverWeights', receivers.weights, 'double', 'R') sofa.add_variable( 'SourceWeights', sources.weights, 'double', 'E') if symmetry_azimuth is None: symmetry_azimuth_str = '' else: symmetry_azimuth_str = ','.join([f'{a}' for a in symmetry_azimuth]) sofa.add_variable( 'SampleSymmetryAzimuth', symmetry_azimuth_str, 'string', 'S') sofa.add_variable( 'SampleSymmetryRotational', 1 if symmetry_rotational else 0, 'double', 'I') return sofa def _cart_coordinates(xyz): return pf.Coordinates(xyz[:, 0], xyz[:, 1], xyz[:, 2]) def _read_numcalc(folder=None): """ Process NumCalc output and write data to disk. Processing the data is done in the following steps 1. Read project parameter `from parameters.json` 2. use :py:func:`~write_output_report` to parse files in project_folder/NumCalc/source_*/NC*.out, write project report to project_folder/report/report_source_*.csv. Raise a warning if any issues were detected and write report_issues.txt to the same folder 3. Read simulated pressures from project_folder/NumCalc/source_*/be.out. This and the following steps are done, even if an issue was detected in the previous step 4. write the reflected sound pressure to sofa file. Parameters ---------- folder : str, optional The path of the NumCalc project folder, i.e., the folder containing the subfolders EvaluationsGrids, NumCalc, and ObjectMeshes. The default, ``None`` uses the current working directory. """ # check input if folder is None: folder = os.getcwd() # check and load parameters, required parameters are: # mesh2scattering_version, reference, computeHRIRs, speedOfSound, # densityOfAir, # sources_num, sourceType, sources, sourceArea, # num_frequencies, frequencies params = os.path.join(folder, 'parameters.json') if not os.path.isfile(params): raise ValueError(( f"The folder {folder} is not a valid Mesh2scattering project. " "It must contain the file 'parameters.json'")) with open(params, "r") as file: params = json.load(file) # get source positions source_coords = pf.Coordinates() source_coords.cartesian = np.array(params['sources']) # get the evaluation grids evaluationGrids, _ = _read_nodes_and_elements( os.path.join(folder, 'EvaluationGrids')) # Load EvaluationGrid data num_sources = source_coords.csize if not len(evaluationGrids) == 0: pressure, _ = _read_numcalc_data( num_sources, len(params["frequencies"]), folder, 'pEvalGrid') # save to struct cnt = 0 for grid in evaluationGrids: evaluationGrids[grid]["pressure"] = pressure[ cnt:cnt+evaluationGrids[grid]["num_nodes"], :, :] cnt = cnt + evaluationGrids[grid]["num_nodes"] receiver_coords = evaluationGrids[grid]["nodes"][:, 1:4] receiver_coords = pf.Coordinates( receiver_coords[..., 0], receiver_coords[..., 1], receiver_coords[..., 2]) return evaluationGrids, params
[docs] def write_output_report(folder=None): r""" Generate project report from NumCalc output files. NumCalc (mesh2scattering's numerical core) writes information about the simulations to the files `NC*.out` located under `NumCalc/source_*`. The file `NC.out` exists if NumCalc was ran without the additional command line parameters ``-istart`` and ``-iend``. If these parameters were used, there is at least one `NC\*-\*.out`. If this is the case, information from `NC\*-\*.out` overwrites information from NC.out in the project report. .. note:: The project reports are written to the files `report/report_source_*.csv`. If issues were detected, they are listed in `report/report_issues.csv`. The report contain the following information Frequency step The index of the frequency. Frequency in Hz The frequency in Hz. NC input Name of the input file from which the information was taken. Input check passed Contains a 1 if the check of the input data passed and a 0 otherwise. If the check failed for one frequency, the following frequencies might be affected as well. Converged Contains a 1 if the simulation converged and a 0 otherwise. If the simulation did not converge, the relative error might be high. Num. iterations The number of iterations that were required to converge relative error The relative error of the final simulation Comp. time total The total computation time in seconds Comp. time assembling The computation time for assembling the matrices in seconds Comp. time solving The computation time for solving the matrices in seconds Comp. time post-proc The computation time for post-processing the results in seconds Parameters ---------- folder : str, optional The path of the NumCalc project folder, i.e., the folder containing the subfolders EvaluationsGrids, NumCalc, and ObjectMeshes. The default, ``None`` uses the current working directory. Returns ------- found_issues : bool ``True`` if issues were found, ``False`` otherwise report : str The report or an empty string if no issues were found """ if folder is None: folder = os.getcwd() if folder is None: folder = os.getcwd() # get sources and number of sources and frequencies sources = glob.glob(os.path.join(folder, "NumCalc", "source_*")) num_sources = len(sources) with open(os.path.join(folder, "parameters.json"), "r") as file: params = json.load(file) # sort source files (not read in correct order in some cases) nums = [int(source.split("_")[-1]) for source in sources] sources = np.array(sources) sources = sources[np.argsort(nums)] # parse all NC*.out files for all sources all_files, fundamentals, out, out_names = _parse_nc_out_files( sources, num_sources, len(params["frequencies"])) # write report as csv file _write_project_reports(folder, all_files, out, out_names) # look for errors report = _check_project_report(folder, fundamentals, out) found_issues = True if report else False return found_issues, report
def _read_nodes_and_elements(folder, objects=None): """ Read the nodes and elements of the evaluation grids or object meshes. Parameters ---------- folder : str Folder containing the object. Must end with EvaluationGrids or Object Meshes objects : str, options Name of the object. The default ``None`` reads all objects in folder Returns ------- grids : dict One item per object (with the item name being the object name). Each item has the sub-items `nodes`, `elements`, `num_nodes`, `num_elements` gridsNumNodes : int Number of nodes in all grids """ # check input if os.path.basename(folder) not in ['EvaluationGrids', 'ObjectMeshes']: raise ValueError('folder must be EvaluationGrids or ObjectMeshes!') if objects is None: objects = os.listdir(folder) # discard hidden folders that might occur on Mac OS objects = [o for o in objects if not o.startswith('.')] elif isinstance(objects, str): objects = [objects] grids = {} gridsNumNodes = 0 for grid in objects: tmpNodes = np.loadtxt(os.path.join( folder, grid, 'Nodes.txt'), delimiter=' ', skiprows=1, dtype=np.float64) tmpElements = np.loadtxt(os.path.join( folder, grid, 'Elements.txt'), delimiter=' ', skiprows=1, dtype=np.float64) grids[grid] = { "nodes": tmpNodes, "elements": tmpElements, "num_nodes": tmpNodes.shape[0], "num_elements": tmpElements.shape[0]} gridsNumNodes += grids[grid]['num_nodes'] return grids, gridsNumNodes def _read_numcalc_data(n_sources, n_frequencies, folder, data): """Read the sound pressure on the object meshes or evaluation grid.""" pressure = [] if data not in ['pBoundary', 'pEvalGrid', 'vBoundary', 'vEvalGrid']: raise ValueError( 'data must be pBoundary, pEvalGrid, vBoundary, or vEvalGrid') for source in range(n_sources): tmpFilename = os.path.join( folder, 'NumCalc', f'source_{source+1}', 'be.out') tmpPressure, indices = _load_results( tmpFilename, data, n_frequencies) pressure.append(tmpPressure) pressure = np.transpose(np.array(pressure), (2, 0, 1)) return pressure, indices def _load_results(foldername, filename, num_frequencies): """ Load results of the BEM calculation. Parameters ---------- foldername : string The folder from which the data is loaded. The data to be read is located in the folder be.out inside NumCalc/source_* filename : string The kind of data that is loaded pBoundary The sound pressure on the object mesh vBoundary The sound velocity on the object mesh pEvalGrid The sound pressure on the evaluation grid vEvalGrid The sound velocity on the evaluation grid num_frequencies : int the number of simulated frequencies Returns ------- data : numpy array Pressure or abs velocity values of shape (num_frequencies, numEntries) """ # ---------------------check number of header and data lines--------------- current_file = os.path.join(foldername, 'be.1', filename) numDatalines = None with open(current_file) as file: line = csv.reader(file, delimiter=' ', skipinitialspace=True) for _idx, li in enumerate(line): # read number of data points and head lines if len(li) == 2 and not ( li[0].startswith("mesh2scattering") or li[0].startswith( "Mesh2HRTF")): numDatalines = int(li[1]) # read starting index elif numDatalines and len(li) > 2: start_index = int(li[0]) break if numDatalines is None: raise ValueError( f'{current_file} is empty!') # ------------------------------load data---------------------------------- dtype = complex if filename.startswith("p") else float data = np.zeros((num_frequencies, numDatalines), dtype=dtype) for ii in range(num_frequencies): tmpData = [] current_file = os.path.join(foldername, 'be.%d' % (ii+1), filename) with open(current_file) as file: line = csv.reader(file, delimiter=' ', skipinitialspace=True) for li in line: # data lines have 3 ore more entries if len(li) < 3 or li[0].startswith("Mesh"): continue if filename.startswith("p"): tmpData.append(complex(float(li[1]), float(li[2]))) elif filename == "vBoundary": tmpData.append(np.abs(complex(float(li[1]), float(li[2])))) elif filename == "vEvalGrid": tmpData.append(np.sqrt( np.abs(complex(float(li[1]), float(li[2])))**2 + np.abs(complex(float(li[3]), float(li[4])))**2 + np.abs(complex(float(li[5]), float(li[6])))**2)) data[ii, :] = tmpData if tmpData else np.nan return data, np.arange(start_index, numDatalines + start_index) def _check_project_report(folder, fundamentals, out): # return if there are no fundamental errors or other issues if not all(all(f) for f in fundamentals) and not np.any(out == -1) \ and np.all(out[:, 3:5]): return # report detailed errors report = "" for ss in range(out.shape[2]): # currently we detect frequencies that were not calculated and # frequencies with convergence issues missing = "Frequency steps that were not calculated:\n" input_test = "Frequency steps with bad input:\n" convergence = "Frequency steps that did not converge:\n" any_missing = False any_input_failed = False any_convergence = False # loop frequencies for ff in range(out.shape[0]): f = out[ff, :, ss] # no value for frequency if f[1] == -1: any_missing = True missing += f"{int(f[0])}, " continue # input data failed if f[3] == 0: any_input_failed = True input_test += f"{int(f[0])}, " # convergence value is zero if f[4] == 0: any_convergence = True convergence += f"{int(f[0])}, " if any_missing or any_input_failed or any_convergence: report += f"Detected issues for source {ss+1}\n" report += "----------------------------\n" if any_missing: report += missing[:-2] + "\n\n" if any_input_failed: report += input_test[:-2] + "\n\n" if any_convergence: report += convergence[:-2] + "\n\n" if not report: report = ("\nDetected unknown issues\n" "-----------------------\n" "Check the project reports in report,\n" "and the NC*.out files in NumCalc/source_*\n\n") report += ("For more information check report/report_source_*.csv " "and the NC*.out files located at NumCalc/source_*") # write to disk report_name = os.path.join( folder, "report", "report_issues.txt") with open(report_name, "w") as f_id: f_id.write(report) return report def _write_project_reports(folder, all_files, out, out_names): """ Write project report to disk at folder/report/report_source_*.csv. For description of input parameter refer to write_output_report and _parse_nc_out_files """ if not os.path.exists(os.path.join(folder, 'report')): os.mkdir(os.path.join(folder, 'report')) # loop sources for ss in range(out.shape[2]): report = ", ".join(out_names) + "\n" # loop frequencies for ff in range(out.shape[0]): f = out[ff, :, ss] report += ( f"{int(f[0])}, " # frequency step f"{float(f[1])}, " # frequency in Hz f"{all_files[ss][int(f[2])]}," # NC*.out file f"{int(f[3])}, " # input check f"{int(f[4])}, " # convergence f"{int(f[5])}, " # number of iterations f"{float(f[6])}, " # relative error f"{int(f[7])}, " # total computation time f"{int(f[8])}, " # assembling equations time f"{int(f[9])}, " # solving equations time f"{int(f[10])}\n" # post-processing time ) # write to disk report_name = os.path.join( folder, "report", f"report_source_{ss + 1}.csv") with open(report_name, "w") as f_id: f_id.write(report) def _parse_nc_out_files(sources, num_sources, num_frequencies): """ Parse all NC*.out files for all sources. This function should never raise a value error, regardless of how mess NC*.out files are. Looking for error is done at a later step. Parameters ---------- sources : list of strings full path to the source folders num_sources : int number of sources - len(num_sources) num_frequencies : int number of frequency steps Returns ------- out : numpy array containing the extracted information for each frequency step out_names : list of string verbal information about the columns of `out` """ # array for reporting fundamental errors fundamentals = [] all_files = [] # array for saving the detailed report out_names = ["frequency step", # 0 "frequency in Hz", # 1 "NC input file", # 2 "Input check passed", # 3 "Converged", # 4 "Num. iterations", # 5 "relative error", # 6 "Comp. time total", # 7 "Comp. time assembling", # 8 "Comp. time solving", # 9 "Comp. time post-proc."] # 10 out = -np.ones((num_frequencies, 11, num_sources)) # values for steps out[:, 0] = np.arange(1, num_frequencies + 1)[..., np.newaxis] # values indicating failed input check and non-convergence out[:, 3] = 0 out[:, 4] = 0 # regular expression for finding a number that can be int or float re_number = r"(\d+(?:\.\d+)?)" # loop sources for ss, source in enumerate(sources): # list of NC*.out files for parsing files = glob.glob(os.path.join(source, "NC*.out")) # make sure that NC.out is first nc_out = os.path.join(source, "NC.out") if nc_out in files and files.index(nc_out): files = [files.pop(files.index(nc_out))] + files # update fundamentals fundamentals.append([0 for f in range(len(files))]) all_files.append([os.path.basename(f) for f in files]) # get content from all NC*.out for ff, file in enumerate(files): # read the file and join all lines with open(file, "r") as f_id: lines = f_id.readlines() lines = "".join(lines) # split header and steps lines = lines.split( ">> S T E P N U M B E R A N D F R E Q U E N C Y <<") # look for fundamental errors if len(lines) == 1: fundamentals[ss][ff] = 1 continue # parse frequencies (skip header) for line in lines[1:]: # find frequency step idx = re.search(r'Step \d+,', line) if idx: step = int(line[idx.start()+5:idx.end()-1]) # write number of input file (replaced by string later) out[step-1, 2, ss] = ff # find frequency idx = re.search(f'Frequency = {re_number} Hz', line) if idx: out[step-1, 1, ss] = float( line[idx.start()+12:idx.end()-3]) # check if the input data was ok if "Too many integral points in the theta" not in line: out[step-1, 3, ss] = 1 # check and write convergence if 'Maximum number of iterations is reached!' not in line: out[step-1, 4, ss] = 1 # check iterations idx = re.search(r'number of iterations = \d+,', line) if idx: out[step-1, 5, ss] = int(line[idx.start()+23:idx.end()-1]) # check relative error idx = re.search('relative error = .+', line) if idx: out[step-1, 6, ss] = float(line[idx.start()+17:idx.end()]) # check time stats # -- assembling idx = re.search( r'Assembling the equation system : \d+', line) if idx: out[step-1, 8, ss] = float(line[idx.start()+41:idx.end()]) # -- solving idx = re.search( r'Solving the equation system : \d+', line) if idx: out[step-1, 9, ss] = float(line[idx.start()+41:idx.end()]) # -- post-pro idx = re.search( r'Post processing : \d+', line) if idx: out[step-1, 10, ss] = float(line[idx.start()+41:idx.end()]) # -- total idx = re.search( r'Total : \d+', line) if idx: out[step-1, 7, ss] = float(line[idx.start()+41:idx.end()]) return all_files, fundamentals, out, out_names def read_evaluation_grid(name): """ Read NumCalc evaluation grid. Parameters ---------- name : str Name of the folder containing the nodes of the evaluation grid in Nodes.txt show : bool, optional If ``True`` the points of the evaluation grid are plotted. The default is ``False``. Returns ------- coordinates : pyfar Coordinates The points of the evaluation grid as a pyfar Coordinates object """ # check if the grid exists if not os.path.isfile(os.path.join(name, "Nodes.txt")): raise ValueError(f"{os.path.join(name, 'Nodes.txt')} does not exist") # read the nodes with open(os.path.join(name, "Nodes.txt"), "r") as f_id: nodes = f_id.readlines() # get number of nodes N = int(nodes[0].strip()) points = np.zeros((N, 3)) # get points (first entry is node number) for nn in range(N): node = nodes[nn+1].strip().split(" ") points[nn, 0] = float(node[1]) points[nn, 1] = float(node[2]) points[nn, 2] = float(node[3]) # make coordinates object coordinates = pf.Coordinates(points[:, 0], points[:, 1], points[:, 2]) return coordinates