From efb0ad86e4f9cce443da9942da0eb31c7b6c6ea1 Mon Sep 17 00:00:00 2001
From: Lars Bilke <lars.bilke@ufz.de>
Date: Mon, 14 Oct 2024 17:09:41 +0200
Subject: [PATCH] Full workflow runs successfully

AiiDA node PKs: 50289, 51510, 52808, 52730, 51489, 50268
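
Summary of changes:
- workflow/config.ini: OGS commit, guix channel commits and tune option moved out of run.py
- workflow/create_container.py: helper around CreateContainerWorkChain that reads config.ini
- workflow/create_container_workchain.py: channel file and manifest are created once in setup; the relocatable squashfs pack runs as a separate pack_squashfs step after the hash comparison
- workflow/helper.py: drop the -e flag from 'apptainer exec' in batch jobs
- workflow/run_mpi_tests.py: MPI bandwidth test and cube_1e3.prj simulation extracted from run.py
- workflow/run.py: split into run_mpi_tests, run_ogs_sim and run_arehs; container files are loaded from existing nodes
- workflow/run_workflow_workchain.py: workflow input folder is a RemoteData pointing to the cloned dgr repository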
---
 workflow/config.ini                    |   5 +
 workflow/create_container.py           |  36 ++++++
 workflow/create_container_workchain.py |  38 ++++--
 workflow/helper.py                     |   2 +-
 workflow/run.py                        | 166 ++++++++-----------------
 workflow/run_mpi_tests.py              |  48 +++++++
 workflow/run_workflow_workchain.py     |   3 +-
 7 files changed, 174 insertions(+), 124 deletions(-)
 create mode 100644 workflow/config.ini
 create mode 100644 workflow/create_container.py
 create mode 100644 workflow/run_mpi_tests.py

diff --git a/workflow/config.ini b/workflow/config.ini
new file mode 100644
index 0000000..e67a362
--- /dev/null
+++ b/workflow/config.ini
@@ -0,0 +1,5 @@
+[container]
+ogs_commit: cdb32989b54b47f059c697cc12fef62338c9702c
+guix_channel_ogs_commit: 901d7522bfdb710814f26f7858fb55f57769371f
+guix_channel_commit: ebb715aabfc4e59ed14b964ac7fdd2a9f9f44526
+guix_tune_option: --tune=skylake-avx512
diff --git a/workflow/create_container.py b/workflow/create_container.py
new file mode 100644
index 0000000..e26681d
--- /dev/null
+++ b/workflow/create_container.py
@@ -0,0 +1,36 @@
+import os
+import configparser
+from aiida.engine import run_get_node
+from aiida.orm import Bool, SinglefileData, Str
+
+from create_container_workchain import CreateContainerWorkChain
+
+def create_container(manifest, build_envs, ogs_package="ogs-petsc", graph_nodes=None, compare_hashes=True):
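+    """Run CreateContainerWorkChain with the pins from config.ini and return the container file node."""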
+    current_dir = f"{os.path.dirname(os.path.realpath(__file__))}"
+    config = configparser.ConfigParser()
+    config.read(f"{current_dir}/config.ini")
+
+    ogs_manifest_template = SinglefileData(
+        f"{current_dir}/manifests/{manifest}")
+
+    result, node = run_get_node(
+        CreateContainerWorkChain,
+        ogs_package=Str(ogs_package),
+        ogs_commit=Str(config.get('container', 'ogs_commit')),
+        manifest_template=ogs_manifest_template,
+        guix_channel_commit=Str(config.get('container', 'guix_channel_commit')),
+        guix_channel_ogs_commit=Str(config.get('container', 'guix_channel_ogs_commit')),
+        guix_tune_option=Str(config.get('container', 'guix_tune_option')),
+        build_envs=build_envs,
+        compare_hashes=Bool(compare_hashes)
+    )
+    print(f"CreateContainerWorkChain finished: {node}")
+    print(f'container_hashes_identical = {result["container_hashes_identical"]}')
+    print(f'container_file = {result["container_file"]}')
+    ogs_container_file = result["container_file"]
+    if graph_nodes:
+        graph_nodes.append(result["container_hashes_identical"])
+        graph_nodes.append(ogs_container_file)
+
+    return ogs_container_file
diff --git a/workflow/create_container_workchain.py b/workflow/create_container_workchain.py
index 32ba181..aeccb24 100644
--- a/workflow/create_container_workchain.py
+++ b/workflow/create_container_workchain.py
@@ -76,6 +76,7 @@ class CreateContainerWorkChain(WorkChain):
             cls.compare_pack_paths,
             cls.hash,
             cls.compare_hashes,
+            cls.pack_squashfs,
             cls.store,
         )
         spec.output("container_path", valid_type=Str)
@@ -100,23 +101,23 @@ class CreateContainerWorkChain(WorkChain):
         self.ctx.computers = []
         for computer_name in self.inputs.build_envs:
             self.ctx.computers.append(load_computer(computer_name))
+        self.ctx.guix_channel_file = create_channels_file(self.inputs.guix_channel_ogs_commit)
+        self.ctx.ogs_manifest = create_ogs_manifest(self.inputs.ogs_commit, self.inputs.ogs_package,
+                                           self.inputs.manifest_template)
 
     def pack(self):
         """Submit a shell job to the daemon."""
-        guix_channel_file = create_channels_file(self.inputs.guix_channel_ogs_commit)
-        ogs_manifest = create_ogs_manifest(self.inputs.ogs_commit, self.inputs.ogs_package, self.inputs.manifest_template)
-
         for computer in self.ctx.computers:
             _, node = launch_shell_job(
                 "guix",
                 arguments="time-machine -C {channel_file} --commit={guix_channel_commit} -- pack "
-                "-RR --format=squashfs {guix_tune_option} -S /usr/bin=bin "
+                "{guix_tune_option} "  # -RR -S /usr/bin=bin --format=squashfs
                 "-m {ogs_manifest}",
                 nodes={
-                    "channel_file": guix_channel_file,
+                    "channel_file": self.ctx.guix_channel_file,
                     "guix_channel_commit": self.inputs.guix_channel_commit,
                     "guix_tune_option": self.inputs.guix_tune_option,
-                    "ogs_manifest": ogs_manifest,
+                    "ogs_manifest": self.ctx.ogs_manifest,
                 },
                 submit=True,
                 parser=pack_parser,
@@ -178,8 +179,31 @@
         self.out("container_hash", self.ctx.hashes[0])
         self.out("container_hashes_identical", hashes_equal)
 
+    def pack_squashfs(self):
+        """Submit a shell job to the daemon."""
+        for computer in self.ctx.computers:
+            _, node = launch_shell_job(
+                "guix",
+                arguments="time-machine -C {channel_file} --commit={guix_channel_commit} -- pack "
+                "-RR -S /usr/bin=bin --format=squashfs "
+                "{guix_tune_option} "
+                "-m {ogs_manifest}",
+                nodes={
+                    "channel_file": self.ctx.guix_channel_file,
+                    "guix_channel_commit": self.inputs.guix_channel_commit,
+                    "guix_tune_option": self.inputs.guix_tune_option,
+                    "ogs_manifest": self.ctx.ogs_manifest,
+                },
+                submit=True,
+                parser=pack_parser,
+                # guix may print some warnings to stderr
+                metadata={"options": {"redirect_stderr": True}, "computer": computer},
+            )
+            print(f"Submitted 'pack_squashfs' to {computer}: {node}")
+            self.to_context(shell_jobs_squashfs=append_(node))
+
     def store(self):
-        node = self.ctx.shell_jobs[0]
+        node = self.ctx.shell_jobs_squashfs[0]
         pack_path = node.outputs.pack_path
         pack_file = Path(pack_path.value).name
         results, hash_node = launch_shell_job(
diff --git a/workflow/helper.py b/workflow/helper.py
index f458642..fc5f4ce 100644
--- a/workflow/helper.py
+++ b/workflow/helper.py
@@ -101,7 +101,7 @@ def launch_batch_job(
     run_arguments = f"exec {apptainer_args} {{container_file}} {cmd}"
     if run_with_bash:
         run_command = "bash"
-        run_arguments = f"-c '{apptainer_exe} exec -e {apptainer_args} {{container_file}} {cmd}'"
+        run_arguments = f"-c '{apptainer_exe} exec {apptainer_args} {{container_file}} {cmd}'"
 
     _, node = launch_shell_job(
         run_command,
diff --git a/workflow/run.py b/workflow/run.py
index 85abb94..392eccd 100644
--- a/workflow/run.py
+++ b/workflow/run.py
@@ -1,21 +1,23 @@
 #!/usr/bin/env runaiida
-import os
 import warnings
 from pathlib import Path
 
 import aiida
 from aiida.engine import run, submit, await_processes, run_get_node
-from aiida.orm import load_computer, SinglefileData, Bool
-from aiida.tools import Graph
-
-from create_container_workchain import CreateContainerWorkChain
-from helper import JobSettings, custom_node_sublabels, launch_batch_job, compare_for_equality
+from aiida.orm import load_computer, Bool
+from aiida.orm import load_node
+from aiida.tools.visualization.graph import Graph
+
+from create_container import create_container
+from run_mpi_tests import run_mpi_tests
+# from create_container_workchain import CreateContainerWorkChain
+from helper import JobSettings, launch_batch_job, compare_for_equality
 from aiida.plugins.factories import DataFactory
 
 from run_simulation_workchain import RunSimulationWorkChain
 from partition_mesh_workchain import PartitionMeshWorkChain
 from run_workflow_workchain import RunWorkflowWorkChain
-from test_container_workchain import TestContainerWorkChain
+from helper import custom_node_sublabels
 
 Str = DataFactory("core.str")
 Int = DataFactory("core.int")
@@ -25,86 +27,42 @@ Dict = DataFactory("core.dict")
 
 warnings.filterwarnings(action='ignore',module='.*paramiko.*')
 
-if __name__ == "__main__":
-
-    aiida.load_profile()
-    graph_nodes = []  # collect interesting nodes for graph generation
-
-    build_envs = List(list=["envinf3", "envinf4"]) # "envinf3", "envinf5"
-    hpc_envs = List(
-        list=[
-            #"barnard",
-            "eve",
-            "juwels",
-        ]
-    )
-
-    ogs_manifest_template = SinglefileData(
-        f"{os.path.dirname(os.path.realpath(__file__))}/manifests/ogs-template.scm")
-    arehs_manifest_template = SinglefileData(
-        f"{os.path.dirname(os.path.realpath(__file__))}/manifests/arehs-template.scm")
-
-
-    result, node = run_get_node(
-        CreateContainerWorkChain,
-        ogs_package=Str("ogs-petsc"),
-        ogs_commit=Str("cdb32989b54b47f059c697cc12fef62338c9702c"),
-        manifest_template=ogs_manifest_template,
-        guix_channel_commit=Str("ebb715aabfc4e59ed14b964ac7fdd2a9f9f44526"),
-        guix_channel_ogs_commit=Str("901d7522bfdb710814f26f7858fb55f57769371f"),
-        guix_tune_option=Str("--tune=skylake-avx512"),
-        build_envs=build_envs,
-        compare_hashes=Bool(False)
-    )
-    print(f"CreateContainerWorkChain finished: {node}")
-    print(f'container_hashes_identical = {result["container_hashes_identical"]}')
-    print(f'container_file = {result["container_file"]}')
-
-    container_file = result["container_file"]
-    graph_nodes.append(result["container_hashes_identical"])
-    graph_nodes.append(container_file)
-
-    result = run(
-        TestContainerWorkChain, container_file=container_file, hpc_envs=hpc_envs
-    )
-    print(result["mpi_bandwidth"])
-    graph_nodes.append(result["mpi_bandwidth"])
-
-    #### Sim cube_1e3.prj ####
-    input_folder = FolderData(
-        tree=Path("../tests/ParallelFEM_GroundWaterFlow3D_DirichletBC").absolute()
-    )
-
-    job_settings = Dict(dict=JobSettings(ntasks=3)._asdict())  # nodes=3, ntasks_per_node=1
 
+def run_arehs(container_file, hpc_envs, graph_nodes=None):
     nodes = []
+    job_settings = Dict(dict=JobSettings(ntasks=1, cpus_per_task=48, time=120, mem_per_cpu="2GB")._asdict())  # 8, 10
     for hpc_env in hpc_envs.get_list():
         node = submit(
-            RunSimulationWorkChain,
+            RunWorkflowWorkChain,
             container_file=container_file,
             computer_name=Str(hpc_env),
-            input_folder=input_folder,
-            prj_file=Str("cube_1e3.prj"),
+            input_repo="https://gitlab.opengeosys.org/bilke/dgr.git",
+            # TODO: 3D coupled, run the following
+            workflow_command="snakemake --cores 48 --profile profiles/developer \
+            results/plot/3D/Salz-Kissen_prism-id_1-xres_400/glacialcycle/simTH/{{temporal,spatial}}/report.pdf \
+            results/plot/2D/{{Ton-Nord,Ton-Sued}}_prism-id_{{1,2}}-xres_200/glacialcycle/simTHM/{{temporal,spatial}}/report.pdf \
+            results/plot/2D/{{Salz-Kissen,Salz-flach,Ton-Nord,Ton-Sued}}_prism-id_{{1,2}}-xres_200/glacialcycle/simT{{,H}}/{{temporal,spatial}}/report.pdf",
+            # parts_8
             job_settings=job_settings,
         )
         nodes.append(node)
         print(node)
-
-    await_processes(nodes, wait_interval=30)
+    await_processes(nodes, wait_interval=60)
     hash_outputs = []
     for node in nodes:
         if node.is_finished_ok:
-            print(f"simulation {node} finished successfully")
+            print(f"workflow {node} finished successfully")
         else:
-            print(f"simulation {node} failed with exit code {node.exit_code.status}")
+            print(f"workflow {node} failed with exit code {node.exit_code.status}")
 
         hash_outputs.append(node.outputs.hash_file)
-
     identical = compare_for_equality(*hash_outputs)
-    print(f"Sim results cube_1e3.prj identical: {identical}")
+    print(f"Workflow results identical: {identical}")
     graph_nodes.append(identical)
 
-    #### liquid_flow.prj with structured mesh ####
+
+def run_ogs_sim(container_file, hpc_envs, graph_nodes=None):
+    """Run liquid_flow.prj with a structured mesh on each HPC environment and compare the result hashes."""
     nodes = []
     # TODO: add this to provenance, maybe add nodes-parameter to launch_batch_job
     # 250, > 112 somehow times out on juwels?
@@ -154,7 +112,6 @@ if __name__ == "__main__":
         )
         print(f"Submitted 'RunSimulationWorkChain' ({hpc_env}) {node}")
         nodes.append(node)
-
     await_processes(nodes, wait_interval=10)
     hash_outputs = []
     for node in nodes:
@@ -164,64 +121,43 @@ if __name__ == "__main__":
             print(f"simulation {node} failed with exit code {node.exit_code.status}")
 
         hash_outputs.append(node.outputs.hash_file)
-
     identical = compare_for_equality(*hash_outputs)
     print(
         f"Sim results liquid_flow.prj ({mesh_resolution.value}, {partitions.value}) identical: {identical}"
     )
     graph_nodes.append(identical)
 
-    result, node = run_get_node(
-        CreateContainerWorkChain,
-        ogs_package=Str("ogs-petsc-mkl"),
-        ogs_commit=Str("cdb32989b54b47f059c697cc12fef62338c9702c"),
-        manifest_template=arehs_manifest_template,
-        guix_channel_commit=Str("ebb715aabfc4e59ed14b964ac7fdd2a9f9f44526"),
-        guix_channel_ogs_commit=Str("901d7522bfdb710814f26f7858fb55f57769371f"),
-        guix_tune_option=Str("--tune=skylake-avx512"),
-        build_envs=build_envs,
-        compare_hashes=Bool(False)  # arehs container hashes differ, but tar.gz is equal
-    )
-    print(f"CreateContainerWorkChain finished: {node}")
-    print(f'container_hashes_identical = {result["container_hashes_identical"]}')
-    print(f'container_file = {result["container_file"]}')
 
-    container_file = result["container_file"]
-    graph_nodes.append(result["container_hashes_identical"])
-    graph_nodes.append(container_file)
+if __name__ == "__main__":
 
-    nodes = []
-    job_settings = Dict(dict=JobSettings(ntasks=1, cpus_per_task=48, time=20, mem_per_cpu="2GB")._asdict()) # 8, 10
+    aiida.load_profile()
+    graph_nodes = []  # collect interesting nodes for graph generation
 
-    for hpc_env in hpc_envs.get_list():
-        node = submit(
-            RunWorkflowWorkChain,
-            container_file=container_file,
-            computer_name=Str(hpc_env),
-            input_repo="https://gitlab.opengeosys.org/bilke/dgr.git",
-            # TODO: 3D coupled, run the following
-            # results/plot/3D/Salz-Kissen_prism-id_1-xres_400/glacialcycle/simTH/parts_8/{{temporal,spatial}}/report.pdf \
-            # results/plot/2D/{{Ton-Nord,Ton-Sued}}_prism-id_{{1,2}}-xres_200/glacialcycle/simTHM/parts_8/{{temporal,spatial}}/report.pdf \
-            workflow_command="snakemake --cores 48 --profile profiles/developer \
-            results/plot/2D/{{Salz-Kissen,Salz-flach,Ton-Nord,Ton-Sued}}_prism-id_{{1,2}}-xres_200/glacialcycle/simT{{,H}}/{{temporal,spatial}}/report.pdf", # parts_8
-            job_settings=job_settings,
-        )
-        nodes.append(node)
-        print(node)
+    build_envs = List(list=["envinf3", "envinf4"])  # , "envinf5", "nuc"
+    # To build the containers from scratch instead of loading existing nodes:
+    # ogs_container_file = create_container("ogs-template.scm", build_envs)
+    # arehs_container_file = create_container("arehs-template.scm", build_envs, "ogs-petsc-mkl", compare_hashes=False)
 
-    await_processes(nodes, wait_interval=60)
-    hash_outputs = []
-    for node in nodes:
-        if node.is_finished_ok:
-            print(f"workflow {node} finished successfully")
-        else:
-            print(f"workflow {node} failed with exit code {node.exit_code.status}")
+    ogs_container_file = load_node(pk=50289)
+    arehs_container_file = load_node(pk=51510)
+    # PKs of container file nodes from a previous run (see commit message)
 
-        hash_outputs.append(node.outputs.hash_file)
+    hpc_envs = List(
+        list=[
+            "barnard2",
+            "eve2",
+            "juwels2",
+        ]
+    )
 
-    identical = compare_for_equality(*hash_outputs)
-    print(f"Workflow results identical: {identical}")
-    graph_nodes.append(identical)
+    ### MPI bandwidth benchmark and simple PETSc sim
+    run_mpi_tests(ogs_container_file, hpc_envs, graph_nodes)
+
+    #### liquid_flow.prj with structured mesh ####
+    run_ogs_sim(ogs_container_file, hpc_envs, graph_nodes)
+
+    ### AREHS workflow
+    run_arehs(arehs_container_file, hpc_envs, graph_nodes)
 
     # Render diagram
     graph = Graph(node_sublabel_fn=custom_node_sublabels)
diff --git a/workflow/run_mpi_tests.py b/workflow/run_mpi_tests.py
new file mode 100644
index 0000000..691340f
--- /dev/null
+++ b/workflow/run_mpi_tests.py
@@ -0,0 +1,48 @@
+from pathlib import Path
+
+from aiida.engine import run, submit, await_processes
+from aiida.orm import FolderData, Dict, Str
+
+from helper import JobSettings, compare_for_equality
+from run_simulation_workchain import RunSimulationWorkChain
+from test_container_workchain import TestContainerWorkChain
+
+
+def run_mpi_tests(ogs_container_file, hpc_envs, graph_nodes=None):
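+    """Run the MPI bandwidth test and the cube_1e3.prj parallel simulation on each HPC environment."""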
+    result = run(
+        TestContainerWorkChain, container_file=ogs_container_file, hpc_envs=hpc_envs
+    )
+    print(result["mpi_bandwidth"])
+    if graph_nodes:
+        graph_nodes.append(result["mpi_bandwidth"])
+    #### Sim cube_1e3.prj ####
+    input_folder = FolderData(
+        tree=Path("../tests/ParallelFEM_GroundWaterFlow3D_DirichletBC").absolute()
+    )
+    job_settings = Dict(dict=JobSettings(ntasks=3)._asdict())  # nodes=3, ntasks_per_node=1
+    nodes = []
+    for hpc_env in hpc_envs.get_list():
+        node = submit(
+            RunSimulationWorkChain,
+            container_file=ogs_container_file,
+            computer_name=Str(hpc_env),
+            input_folder=input_folder,
+            prj_file=Str("cube_1e3.prj"),
+            job_settings=job_settings,
+        )
+        nodes.append(node)
+        print(node)
+    await_processes(nodes, wait_interval=30)
+    hash_outputs = []
+    for node in nodes:
+        if node.is_finished_ok:
+            print(f"simulation {node} finished successfully")
+        else:
+            print(f"simulation {node} failed with exit code {node.exit_code.status}")
+
+        hash_outputs.append(node.outputs.hash_file)
+    identical = compare_for_equality(*hash_outputs)
+    print(f"Sim results cube_1e3.prj identical: {identical}")
+    if graph_nodes:
+        graph_nodes.append(identical)
diff --git a/workflow/run_workflow_workchain.py b/workflow/run_workflow_workchain.py
index 5eb511b..5faccff 100644
--- a/workflow/run_workflow_workchain.py
+++ b/workflow/run_workflow_workchain.py
@@ -64,10 +64,11 @@ class RunWorkflowWorkChain(WorkChain):
         # print(self.ctx.clone_folder)
 
     def test(self):
+        computer = load_computer(self.inputs.computer_name.value)
         node = launch_batch_job(
             container_file=self.inputs.container_file,
             cmd=self.inputs.workflow_command.value,
-            input_folder=self.ctx.clone_folder, # self.out("clone_folder"),
+            input_folder=RemoteData(remote_path=f"{computer.get_workdir()}/../dgr", computer=computer), # self.out("clone_folder"),
             output_folder="results",  # folder cannot start with an underscore
             job_settings=self.inputs.job_settings,
             computer=load_computer(self.inputs.computer_name.value),
-- 
GitLab