diff --git a/ogstools/studies/log_parser/common_ogs_analyses.py b/ogstools/studies/log_parser/common_ogs_analyses.py index a4a4159c252ff82ad5a035394c036848b59702e5..227860ee63d4da24fb59234dad0e1c0f90f1275b 100644 --- a/ogstools/studies/log_parser/common_ogs_analyses.py +++ b/ogstools/studies/log_parser/common_ogs_analyses.py @@ -9,7 +9,7 @@ import pandas as pd # Helper functions -def check_input(df, interest, context): +def check_input(df: pd.DataFrame, interest: list[str], context: list[str]): diff = set(interest) - set(df.columns) if diff: msg = "Column(s) of interest ({}) is/are not present in table".format( @@ -25,7 +25,7 @@ def check_input(df, interest, context): ) -def check_output(pt, interest, context): +def check_output(pt: pd.DataFrame, interest: list[str], context: list[str]): if pt.empty: msg = "The values of {} are not associated to all of {}. Call or see fill_ogs_context".format( ",".join(interest), ",".join(context) @@ -34,7 +34,7 @@ def check_output(pt, interest, context): # decorator for analyses -def pre_post_check(interest, context): +def pre_post_check(interest: list[str], context: list[str]): def wrap(f): def wrapped_f(df): check_input(df, interest, context) @@ -54,7 +54,7 @@ Time from iteration are accumulated. 
""" -def analysis_time_step(df): +def analysis_time_step(df: pd.DataFrame): interest1 = ["output_time", "time_step_solution_time", "step_size"] interest2 = ["assembly_time", "linear_solver_time", "dirichlet_time"] interest = [*interest1, *interest2] @@ -70,7 +70,7 @@ def analysis_time_step(df): return dfe -def analysis_simulation(df): +def analysis_simulation(df: pd.DataFrame): interest = ["execution_time"] # 'start_time' context = ["mpi_process"] check_input(df, interest, context) @@ -80,7 +80,7 @@ def analysis_simulation(df): return pt -def analysis_convergence_newton_iteration(df): +def analysis_convergence_newton_iteration(df: pd.DataFrame): dfe_newton_iteration = df.copy() interest = ["dx", "x", "dx_x"] if "coupling_iteration" in df: @@ -123,7 +123,7 @@ def analysis_convergence_newton_iteration(df): interest=["dx", "x", "dx_x"], context=["time_step", "coupling_iteration", "coupling_iteration_process"], ) -def analysis_convergence_coupling_iteration(df): +def analysis_convergence_coupling_iteration(df: pd.DataFrame): # Coupling iteration column will be modified specific for coupling iteration analysis, modified data can not be used for other analysis ->copy! 
dfe_convergence_coupling_iteration = df.copy() interest = ["dx", "x", "dx_x"] @@ -151,7 +151,7 @@ def analysis_convergence_coupling_iteration(df): return pt -def time_step_vs_iterations(df): +def time_step_vs_iterations(df: pd.DataFrame): interest = ["iteration_number"] context = ["time_step"] check_input(df, interest, context) @@ -160,7 +160,7 @@ def time_step_vs_iterations(df): return pt -def analysis_simulation_termination(df): +def analysis_simulation_termination(df: pd.DataFrame): # For full print of messages consider setup jupyter notebook: # pd.set_option('display.max_colwidth', None) interest = ["message"] @@ -175,7 +175,7 @@ def analysis_simulation_termination(df): return pd.DataFrame() -def fill_ogs_context(df): +def fill_ogs_context(df: pd.DataFrame): # Some columns that contain actual integer values are converted to float # See https://pandas.pydata.org/pandas-docs/stable/user_guide/integer_na.html # ToDo list of columns with integer values are known from regular expression diff --git a/ogstools/studies/log_parser/log_parser.py b/ogstools/studies/log_parser/log_parser.py index ef7c37d561bfe197bd09c27c4df8b2510abb49f7..258a8f4a7ef35b6aee662fd69acfa411f3c4fd59 100644 --- a/ogstools/studies/log_parser/log_parser.py +++ b/ogstools/studies/log_parser/log_parser.py @@ -5,6 +5,7 @@ import re from pathlib import Path +from typing import Any, Optional, Union from ogs6py.ogs_regexes.ogs_regexes import ogs_regexes @@ -48,9 +49,10 @@ def try_match_serial_line( return None -def mpi_processes(file_name): +def mpi_processes(file_name: Union[str, Path]) -> int: occurrences = 0 - file_name = Path(file_name) + if isinstance(file_name, str): + file_name = Path(file_name) with file_name.open() as file: lines = iter(file) # There is no synchronisation barrier between both info, we count both and divide @@ -62,7 +64,28 @@ def mpi_processes(file_name): return int(occurrences / 2) -def parse_file(file_name, maximum_lines=None, force_parallel=False): +def parse_file( + 
file_name: Union[str, Path], + maximum_lines: Optional[int] = None, + force_parallel: bool = False, +) -> list[Any]: + """ + Parses a log file from OGS, applying regex patterns to extract specific information. + + The function supports processing files in serial or parallel mode. In + parallel mode, a specific regex is used to match log entries from different + processes. + + :param file_name: The path to the log file, as a string or Path object. + :param maximum_lines: Optional maximum number of lines to read from the file. + If not provided, the whole file is read. + :param force_parallel: Should only be set to True if OGS was run with MPI on a single core + :return: A list of extracted records based on the applied regex patterns. + The exact type and structure of these records depend on the regex + patterns and their associated processing functions. + """ + if isinstance(file_name, str): + file_name = Path(file_name) file_name = Path(file_name) ogs_res = ogs_regexes() parallel_log = force_parallel or mpi_processes(file_name) > 1