diff --git a/workflow/create_container_workchain.py b/workflow/create_container_workchain.py
index 8607a228425663b80a6ba4597e225ffebc0b46c2..50c6b8af79c5e23bf00a8d0bc76f689fe0065db3 100644
--- a/workflow/create_container_workchain.py
+++ b/workflow/create_container_workchain.py
@@ -132,14 +132,18 @@ class CreateContainerWorkChain(WorkChain):
         self.ctx.pack_paths = []
         for i, node in enumerate(self.ctx.shell_jobs):
             if node.is_finished_ok:
-                print(f"{node} finished successfully")
+                print(f"'pack' ({node}) finished successfully")
             else:
-                print(f"{node} failed with exit code {node.exit_code.status}")
+                print(f"'pack' ({node}) failed with exit code {node.exit_code.status}")
             self.ctx.pack_paths.append(node.outputs.pack_path)
 
         pack_path_equal = compare_for_equality(*self.ctx.pack_paths)
         if not pack_path_equal:
+            print("Container paths differ:")
+            for path in self.ctx.pack_paths:
+                print(f"- {path.value}")
             return self.exit_codes.PACK_DIFFERENT
+        print(self.ctx.pack_paths[0].value)
         self.out("container_path", self.ctx.pack_paths[0])
         self.out("container_paths_identical", pack_path_equal)
 
@@ -162,9 +166,9 @@ class CreateContainerWorkChain(WorkChain):
         self.ctx.hashes = []
         for i, node in enumerate(self.ctx.hash_jobs):
             if node.is_finished_ok:
-                print(f"{node} finished successfully")
+                print(f"'hash' ({node}) finished successfully")
             else:
-                print(f"{node} failed with exit code {node.exit_code.status}")
+                print(f"'hash' ({node}) failed with exit code {node.exit_code.status}")
             self.ctx.hashes.append(node.outputs.hash)
 
         hashes_equal = compare_for_equality(*self.ctx.hashes)
diff --git a/workflow/helper.py b/workflow/helper.py
index 0a57297b93dbbd2ab99b177ba27f9b528325ccae..10ed0adfd1f064596bef536b31483735582384e1 100644
--- a/workflow/helper.py
+++ b/workflow/helper.py
@@ -25,6 +25,7 @@ def launch_batch_job(
     submit=True,
     withmpi=True,
     nodes=None,
+    run_with_bash=False
 ):
     if isinstance(job_settings, aiida.orm.Dict):
         job_settings = JobSettings(**job_settings.get_dict())
@@ -49,7 +50,6 @@ def launch_batch_job(
     prepend_text = ""
     custom_scheduler_commands = ""
     options_dict = {
-        "computer": computer,
         # apptainer runs print e.g,
         # WARNING: passwd file doesn't exist in container, not updating
         "redirect_stderr": True,
@@ -88,20 +88,30 @@ def launch_batch_job(
     )
     if output_folder:
         prepend_text += f"mkdir -p {output_folder}\n"
-    options_dict.update({"prepend_text": prepend_text})
 
     # the `which` command (required by pyvista for `start_xvfb()`) may get
     # passed from the host environment. Two ways to prevent this:
     # 1. Calling `unset -f which` before executing apptainer, or
-    # 2. passing `-e` option to apptainer
+    # 2. passing `-e` option to apptainer, only works when not using mpi
+    prepend_text += "unset -f which\n"
+
+    options_dict.update({"prepend_text": prepend_text})
+
+    run_command = apptainer_exe
+    run_arguments = f"exec {apptainer_args} {{container_file}} {cmd}"
+    if run_with_bash:
+        run_command = "bash"
+        run_arguments = f"-c '{apptainer_exe} exec -e {apptainer_args} {{container_file}} {cmd}'"
+
     _, node = launch_shell_job(
-        "bash",
-        arguments=f"-c '{apptainer_exe} exec -e {apptainer_args} {{container_file}} {cmd}'",
+        run_command,
+        arguments=run_arguments,
         nodes=nodes,
         submit=submit,
         parser=parser,
         metadata={
             "options": options_dict,
+            "computer": computer
         },
     )
     return node
diff --git a/workflow/run.py b/workflow/run.py
index 633a70649e2f5679a60aa0e634531a791062ddb3..654b46e34950679f8ed96afdfb096bcd57be4fd6 100644
--- a/workflow/run.py
+++ b/workflow/run.py
@@ -33,80 +33,28 @@ if __name__ == "__main__":
     build_envs = List(list=["envinf3", "envinf4"])  # "envinf3", "envinf5"
     hpc_envs = List(
         list=[
-            # "barnard2",
-            "eve2",
-            "juwels2",
+            #"barnard",
+            "eve",
+            "juwels",
         ]
     )
 
-    mpi_benchmarks_manifest = SinglefileData(
-        f"{os.path.dirname(os.path.realpath(__file__))}/manifests/intel-mpi-benchmarks.scm")
+    ogs_manifest_template = SinglefileData(
+        f"{os.path.dirname(os.path.realpath(__file__))}/manifests/ogs-template.scm")
     arehs_manifest_template = SinglefileData(
         f"{os.path.dirname(os.path.realpath(__file__))}/manifests/arehs-template.scm")
 
-    result, node = run_get_node(
-        CreateContainerWorkChain,
-        ogs_package=Str("ogs-petsc-mkl"),
-        ogs_commit=Str("cdb32989b54b47f059c697cc12fef62338c9702c"),
-        manifest_template=arehs_manifest_template,
-        guix_channel_commit=Str("ebb715aabfc4e59ed14b964ac7fdd2a9f9f44526"),
-        guix_channel_ogs_commit=Str("901d7522bfdb710814f26f7858fb55f57769371f"),
-        guix_tune_option=Str("--tune=skylake-avx512"),
-        build_envs=build_envs,
-        compare_hashes=Bool(False)  # arehs container hashes differ, but tar.gz is equal
-    )
-    print(f"CreateContainerWorkChain finished: {node}")
-    print(f'container_hashes_identical = {result["container_hashes_identical"]}')
-    print(f'container_file = {result["container_file"]}')
-
-    container_file = result["container_file"]
-    graph_nodes.append(result["container_hashes_identical"])
-    graph_nodes.append(container_file)
-
-    nodes = []
-    job_settings = Dict(dict=JobSettings(ntasks=1, cpus_per_task=48, time=120, mem_per_cpu="2GB")._asdict())  # 8, 10
-
-    for hpc_env in hpc_envs.get_list():
-        node = submit(
-            RunWorkflowWorkChain,
-            container_file=container_file,
-            computer_name=Str(hpc_env),
-            input_repo="https://gitlab.opengeosys.org/bilke/dgr.git",
-            # TODO: 3D coupled, run the following
-            # results/plot/3D/Salz-Kissen_prism-id_1-xres_400/glacialcycle/simTH/parts_8/{{temporal,spatial}}/report.pdf \
-            # results/plot/2D/{{Ton-Nord,Ton-Sued}}_prism-id_{{1,2}}-xres_200/glacialcycle/simTHM/parts_8/{{temporal,spatial}}/report.pdf \
-            workflow_command="snakemake --cores 48 --profile profiles/developer \
-                results/plot/2D/{{Salz-Kissen,Salz-flach,Ton-Nord}}_prism-id_{{1,2}}-xres_200/glacialcycle/simT{{,H}}/{{temporal,spatial}}/report.pdf",  # parts_8
-                ",
-            job_settings=job_settings,
-        )
-        nodes.append(node)
-        print(node)
-
-    await_processes(nodes, wait_interval=10)
-    hash_outputs = []
-    for node in nodes:
-        if node.is_finished_ok:
-            print(f"simulation {node} finished successfully")
-        else:
-            print(f"simulation {node} failed with exit code {node.exit_code.status}")
-
-        hash_outputs.append(node.outputs.hash_file)
-
-    identical = compare_for_equality(*hash_outputs)
-    print(f"Workflow results identical: {identical}")
-    graph_nodes.append(identical)
-
-    exit(0)
 
     result, node = run_get_node(
         CreateContainerWorkChain,
+        ogs_package=Str("ogs-petsc"),
         ogs_commit=Str("cdb32989b54b47f059c697cc12fef62338c9702c"),
-        guix_channel_commit=Str("31fe177a97bacec643180cc5bcf8805a6cb07481"),
+        manifest_template=ogs_manifest_template,
+        guix_channel_commit=Str("ebb715aabfc4e59ed14b964ac7fdd2a9f9f44526"),
         guix_channel_ogs_commit=Str("901d7522bfdb710814f26f7858fb55f57769371f"),
         guix_tune_option=Str("--tune=skylake-avx512"),
         build_envs=build_envs,
-        # additional_manifest=mpi_benchmarks_manifest,
+        compare_hashes=Bool(False)
     )
     print(f"CreateContainerWorkChain finished: {node}")
     print(f'container_hashes_identical = {result["container_hashes_identical"]}')
@@ -142,7 +90,7 @@ if __name__ == "__main__":
         nodes.append(node)
         print(node)
 
-    await_processes(nodes, wait_interval=10)
+    await_processes(nodes, wait_interval=30)
     hash_outputs = []
     for node in nodes:
         if node.is_finished_ok:
@@ -150,7 +98,7 @@ if __name__ == "__main__":
         else:
             print(f"simulation {node} failed with exit code {node.exit_code.status}")
 
-        hash_outputs.append(node.outputs.hash)
+        hash_outputs.append(node.outputs.hash_file)
 
     identical = compare_for_equality(*hash_outputs)
     print(f"Sim results cube_1e3.prj identical: {identical}")
@@ -215,7 +163,7 @@ if __name__ == "__main__":
         else:
             print(f"simulation {node} failed with exit code {node.exit_code.status}")
 
-        hash_outputs.append(node.outputs.hash)
+        hash_outputs.append(node.outputs.hash_file)
 
     identical = compare_for_equality(*hash_outputs)
     print(
@@ -223,6 +171,59 @@ if __name__ == "__main__":
     )
     graph_nodes.append(identical)
 
+    result, node = run_get_node(
+        CreateContainerWorkChain,
+        ogs_package=Str("ogs-petsc-mkl"),
+        ogs_commit=Str("cdb32989b54b47f059c697cc12fef62338c9702c"),
+        manifest_template=arehs_manifest_template,
+        guix_channel_commit=Str("ebb715aabfc4e59ed14b964ac7fdd2a9f9f44526"),
+        guix_channel_ogs_commit=Str("901d7522bfdb710814f26f7858fb55f57769371f"),
+        guix_tune_option=Str("--tune=skylake-avx512"),
+        build_envs=build_envs,
+        compare_hashes=Bool(False)  # arehs container hashes differ, but tar.gz is equal
+    )
+    print(f"CreateContainerWorkChain finished: {node}")
+    print(f'container_hashes_identical = {result["container_hashes_identical"]}')
+    print(f'container_file = {result["container_file"]}')
+
+    container_file = result["container_file"]
+    graph_nodes.append(result["container_hashes_identical"])
+    graph_nodes.append(container_file)
+
+    nodes = []
+    job_settings = Dict(dict=JobSettings(ntasks=1, cpus_per_task=48, time=20, mem_per_cpu="2GB")._asdict())  # 8, 10
+
+    for hpc_env in hpc_envs.get_list():
+        node = submit(
+            RunWorkflowWorkChain,
+            container_file=container_file,
+            computer_name=Str(hpc_env),
+            input_repo="https://gitlab.opengeosys.org/bilke/dgr.git",
+            # TODO: 3D coupled, run the following
+            # results/plot/3D/Salz-Kissen_prism-id_1-xres_400/glacialcycle/simTH/parts_8/{{temporal,spatial}}/report.pdf \
+            # results/plot/2D/{{Ton-Nord,Ton-Sued}}_prism-id_{{1,2}}-xres_200/glacialcycle/simTHM/parts_8/{{temporal,spatial}}/report.pdf \
+            workflow_command="snakemake --cores 48 --profile profiles/developer \
+                results/plot/2D/{{Salz-Kissen,Salz-flach,Ton-Nord,Ton-Sued}}_prism-id_{{1,2}}-xres_200/glacialcycle/simT{{,H}}/{{temporal,spatial}}/report.pdf",  # parts_8
+            job_settings=job_settings,
+        )
+        nodes.append(node)
+        print(node)
+
+    await_processes(nodes, wait_interval=60)
+    hash_outputs = []
+    for node in nodes:
+        if node.is_finished_ok:
+            print(f"simulation {node} finished successfully")
+        else:
+            print(f"simulation {node} failed with exit code {node.exit_code.status}")
+
+        hash_outputs.append(node.outputs.hash_file)
+
+    identical = compare_for_equality(*hash_outputs)
+    print(f"Workflow results identical: {identical}")
+    graph_nodes.append(identical)
+
+    # Render diagram
     graph = Graph(node_sublabel_fn=custom_node_sublabels)
     for node in graph_nodes:
         graph.recurse_ancestors(
@@ -238,4 +239,5 @@ if __name__ == "__main__":
     # include_process_inputs=True,
     # annotate_links='both',
     # )
 
+    graph.graphviz.render("diagrams/last_run.dot", format="pdf")
diff --git a/workflow/run_simulation_workchain.py b/workflow/run_simulation_workchain.py
index 24a813518ffa8364d4c1e43eec08c515639889db..4191584ed21422f955bb2e5e616cd82ec6bb156e 100644
--- a/workflow/run_simulation_workchain.py
+++ b/workflow/run_simulation_workchain.py
@@ -15,11 +15,11 @@
 FolderData = DataFactory("core.folder")
 RemoteData = DataFactory("core.remote")
 
 
-def hash_parser(self, dirpath):
+def hash_parser(dirpath):
     from aiida.orm import Str
 
     return {
-        "hash": Str((dirpath / "stdout").read_text().strip()),
+        "hash_file": SinglefileData.from_string((dirpath / "stdout").read_text().strip()),
     }
 
@@ -30,7 +30,7 @@ def compare_for_equality(*hash_outputs):
     merged_file_map = {}
     for output in hash_outputs:
         file_map = {}
-        for line in output.value.split("\n"):
+        for line in output.get_content().split("\n"):
             if line:
                 parts = line.split()
                 key = parts[1]
@@ -72,7 +72,7 @@ class RunSimulationWorkChain(WorkChain):
         )
         spec.outline(cls.test)
         spec.output("remote_folder", valid_type=RemoteData)
-        spec.output("hash", valid_type=Str)
+        spec.output("hash_file", valid_type=SinglefileData)
         spec.exit_code(
             400,
             "SIM_FAILED",
@@ -106,9 +106,9 @@ class RunSimulationWorkChain(WorkChain):
             parser=hash_parser,
             metadata={
                 "options": {
-                    "computer": load_computer(computer_name.value),
                     "withmpi": False,
                 },
+                "computer": load_computer(computer_name.value),
             },
         )
-        self.out("hash", node.outputs.hash)
+        self.out("hash_file", node.outputs.hash_file)
diff --git a/workflow/run_workflow_workchain.py b/workflow/run_workflow_workchain.py
index 9809ea217ad7216eee014fc70e89bb4d0bd4446b..9fb807292cce535b6706a9926ef68e96d262a6db 100644
--- a/workflow/run_workflow_workchain.py
+++ b/workflow/run_workflow_workchain.py
@@ -71,7 +71,7 @@ class RunWorkflowWorkChain(WorkChain):
         spec.outline(cls.clone, cls.test)
         spec.output("remote_folder", valid_type=RemoteData)
         spec.output("clone_folder", valid_type=RemoteData)
-        spec.output("hash_file", valid_type=Str)
+        spec.output("hash_file", valid_type=SinglefileData)
         spec.exit_code(
             400,
             "SIM_FAILED",
@@ -107,7 +107,8 @@ class RunWorkflowWorkChain(WorkChain):
             job_settings=self.inputs.job_settings,
             computer=load_computer(self.inputs.computer_name.value),
             submit=False,
-            withmpi=False
+            withmpi=False,
+            run_with_bash=True
         )
         print(f"Ran 'workflow' {node}, exit_code = {node.exit_code}")
         if node.exit_status != 0:
@@ -122,9 +123,9 @@ class RunWorkflowWorkChain(WorkChain):
             parser=hash_parser,
             metadata={
                 "options": {
-                    "computer": load_computer(computer_name.value),
                     "withmpi": False,
                 },
+                "computer": load_computer(computer_name.value),
             },
         )
         self.out("hash_file", node.outputs.hash_file)
diff --git a/workflow/test_container_workchain.py b/workflow/test_container_workchain.py
index d6859674c658e153dfe970592ce97a27a0f11b3e..3f0a2549be360d4181b05c5d0e0c4c090da9e707 100644
--- a/workflow/test_container_workchain.py
+++ b/workflow/test_container_workchain.py
@@ -14,7 +14,7 @@
 FolderData = DataFactory("core.folder")
 
 # not re-executed when cached! Can be re-executed with metadata: "disable_cache": True
-def bandwidth_parser(self, dirpath):
+def bandwidth_parser(dirpath):
     from aiida.orm import Float
 
     lines = (dirpath / "stdout").read_text().strip().split("\n")
@@ -62,7 +62,7 @@ class TestContainerWorkChain(WorkChain):
         node = launch_batch_job(
             container_file=self.inputs.container_file,
             cmd="IMB-MPI1 PingPong",
-            job_settings=JobSettings(nodes=2, ntasks_per_node=1),
+            job_settings=JobSettings(nodes=2, ntasks_per_node=1, ntasks=2),
             computer=computer,
             parser=bandwidth_parser,
         )
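
Reviewer note (not part of the patch): with the change from a `Str` hash to a `SinglefileData` hash file, `compare_for_equality` now reads the listing via `get_content()` instead of `.value`; in both cases it treats the output as a checksum listing (one `<hash> <path>` pair per line, keyed by path, as produced by tools like `sha256sum`). Below is a minimal, AiiDA-free sketch of that per-file comparison idea; the function name and sample listings are hypothetical and for illustration only.

    # Illustrative sketch only; mirrors the per-file comparison idea of
    # compare_for_equality() without AiiDA. Sample data below is hypothetical.
    def listings_identical(*listings: str) -> bool:
        """Return True if every listing maps each file path to the same checksum."""
        merged: dict[str, set[str]] = {}
        for listing in listings:
            for line in listing.splitlines():
                if not line.strip():
                    continue
                # sha256sum-style line: "<checksum>  <path>"
                checksum, path = line.split(maxsplit=1)
                merged.setdefault(path, set()).add(checksum)
        return all(len(checksums) == 1 for checksums in merged.values())

    run_a = "deadbeefcafe  results/cube_1e3/output.pvd\n"
    run_b = "deadbeefcafe  results/cube_1e3/output.pvd\n"
    print(listings_identical(run_a, run_b))  # True -> results are bit-identical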