Commit 8d0e89d2 authored by Tobias Meisel

[logparser] Add type hints

parent 2e0e2a51
1 merge request: !130 Logparser
@@ -9,7 +9,7 @@ import pandas as pd
 # Helper functions
-def check_input(df, interest, context):
+def check_input(df: pd.DataFrame, interest: list[str], context: list[str]):
     diff = set(interest) - set(df.columns)
     if diff:
         msg = "Column(s) of interest ({}) is/are not present in table".format(
@@ -25,7 +25,7 @@ def check_input(df, interest, context):
     )
-def check_output(pt, interest, context):
+def check_output(pt: pd.DataFrame, interest: list[str], context: list[str]):
     if pt.empty:
         msg = "The values of {} are not associated to all of {}. Call or see fill_ogs_context".format(
             ",".join(interest), ",".join(context)
@@ -34,7 +34,7 @@ def check_output(pt, interest, context):
 # decorator for analyses
-def pre_post_check(interest, context):
+def pre_post_check(interest: list[str], context: list[str]):
     def wrap(f):
         def wrapped_f(df):
             check_input(df, interest, context)
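For orientation: pre_post_check is a decorator factory that runs check_input on the incoming DataFrame before the wrapped analysis and check_output on its result afterwards. A minimal sketch of how it is applied, modelled on the decorated analysis_convergence_coupling_iteration further down in this diff; the toy analysis function is hypothetical and pre_post_check is assumed to be in scope (e.g. defined in the same module):

import pandas as pd

@pre_post_check(interest=["dx", "x", "dx_x"], context=["time_step"])
def my_toy_analysis(df: pd.DataFrame) -> pd.DataFrame:
    # hypothetical analysis: aggregate the columns of interest per time step
    return df.pivot_table(values=["dx", "x", "dx_x"], index=["time_step"])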
@@ -54,7 +54,7 @@ Time from iteration are accumulated.
 """
-def analysis_time_step(df):
+def analysis_time_step(df: pd.DataFrame):
     interest1 = ["output_time", "time_step_solution_time", "step_size"]
     interest2 = ["assembly_time", "linear_solver_time", "dirichlet_time"]
     interest = [*interest1, *interest2]
@@ -70,7 +70,7 @@ def analysis_time_step(df):
     return dfe
-def analysis_simulation(df):
+def analysis_simulation(df: pd.DataFrame):
     interest = ["execution_time"]  # 'start_time'
     context = ["mpi_process"]
     check_input(df, interest, context)
@@ -80,7 +80,7 @@ def analysis_simulation(df):
     return pt
-def analysis_convergence_newton_iteration(df):
+def analysis_convergence_newton_iteration(df: pd.DataFrame):
     dfe_newton_iteration = df.copy()
     interest = ["dx", "x", "dx_x"]
     if "coupling_iteration" in df:
@@ -123,7 +123,7 @@ def analysis_convergence_newton_iteration(df):
     interest=["dx", "x", "dx_x"],
     context=["time_step", "coupling_iteration", "coupling_iteration_process"],
 )
-def analysis_convergence_coupling_iteration(df):
+def analysis_convergence_coupling_iteration(df: pd.DataFrame):
     # Coupling iteration column will be modified specific for coupling iteration analysis, modified data can not be used for other analysis ->copy!
     dfe_convergence_coupling_iteration = df.copy()
     interest = ["dx", "x", "dx_x"]
@@ -151,7 +151,7 @@ def analysis_convergence_coupling_iteration(df):
     return pt
-def time_step_vs_iterations(df):
+def time_step_vs_iterations(df: pd.DataFrame):
     interest = ["iteration_number"]
     context = ["time_step"]
     check_input(df, interest, context)
@@ -160,7 +160,7 @@ def time_step_vs_iterations(df):
     return pt
-def analysis_simulation_termination(df):
+def analysis_simulation_termination(df: pd.DataFrame):
     # For full print of messages consider setup jupyter notebook:
     # pd.set_option('display.max_colwidth', None)
     interest = ["message"]
@@ -175,7 +175,7 @@ def analysis_simulation_termination(df):
     return pd.DataFrame()
-def fill_ogs_context(df):
+def fill_ogs_context(df: pd.DataFrame):
     # Some columns that contain actual integer values are converted to float
     # See https://pandas.pydata.org/pandas-docs/stable/user_guide/integer_na.html
     # ToDo list of columns with integer values are known from regular expression
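The comments above refer to a general pandas behaviour: as soon as an integer column contains missing values (which happens when partial log records are combined), it is silently promoted to float64; see the linked integer-NA documentation. A generic illustration of the promotion and of pandas' nullable Int64 dtype (not specific to this code base):

import pandas as pd

s = pd.Series([1, 2, None])
print(s.dtype)                  # float64 -- the missing value forced an int -> float promotion
print(s.astype("Int64").dtype)  # Int64 -- pandas' nullable integer dtype keeps whole numbers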
...
@@ -5,6 +5,7 @@
 import re
 from pathlib import Path
+from typing import Any, Optional, Union
 from ogs6py.ogs_regexes.ogs_regexes import ogs_regexes
@@ -48,9 +49,10 @@ def try_match_serial_line(
     return None
-def mpi_processes(file_name):
+def mpi_processes(file_name: Union[str, Path]) -> int:
     occurrences = 0
-    file_name = Path(file_name)
+    if isinstance(file_name, str):
+        file_name = Path(file_name)
     with file_name.open() as file:
         lines = iter(file)
         # There is no synchronisation barrier between both info, we count both and divide
@@ -62,7 +64,28 @@ def mpi_processes(file_name):
     return int(occurrences / 2)
-def parse_file(file_name, maximum_lines=None, force_parallel=False):
-    file_name = Path(file_name)
+def parse_file(
+    file_name: Union[str, Path],
+    maximum_lines: Optional[int] = None,
+    force_parallel: bool = False,
+) -> list[Any]:
+    """
+    Parses a log file from OGS, applying regex patterns to extract specific information.
+
+    The function supports processing files in serial or parallel mode. In
+    parallel mode, a specific regex is used to match log entries from different
+    processes.
+
+    :param file_name: The path to the log file, as a string or Path object.
+    :param maximum_lines: Optional maximum number of lines to read from the file.
+                          If not provided, the whole file is read.
+    :param force_parallel: Should only be set to True if OGS was run with MPI on a single core.
+    :return: A list of extracted records based on the applied regex patterns.
+             The exact type and structure of these records depend on the regex
+             patterns and their associated processing functions.
+    """
+    if isinstance(file_name, str):
+        file_name = Path(file_name)
     ogs_res = ogs_regexes()
     parallel_log = force_parallel or mpi_processes(file_name) > 1
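For context, parse_file and the analysis helpers from the first file of this diff are typically chained into a small post-processing pipeline. A sketch of such a chain; the module paths, the log file name, and the assumption that the returned records can be fed directly into a pandas DataFrame are not part of this commit:

import pandas as pd

from ogs6py.log_parser.log_parser import parse_file            # module path assumed
from ogs6py.log_parser.common_ogs_analyses import (            # module path assumed
    fill_ogs_context,
    analysis_time_step,
)

records = parse_file("ogs.log")    # hypothetical OGS log file
df = pd.DataFrame(records)         # assumes the records are DataFrame-compatible
df = fill_ogs_context(df)          # restore integer dtypes and context columns
print(analysis_time_step(df))      # per-time-step assembly/solver timings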
...