Skip to content
Snippets Groups Projects
Forked from ogs / ogs
962 commits behind the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ParallelVectorMatrixAssembler.cpp 11.95 KiB
/**
 * \file
 * \copyright
 * Copyright (c) 2012-2024, OpenGeoSys Community (http://www.opengeosys.org)
 *            Distributed under a Modified BSD License.
 *              See accompanying file LICENSE.txt or
 *              http://www.opengeosys.org/project/license
 *
 */

#include "ParallelVectorMatrixAssembler.h"

#include <cstdlib>
#include <fstream>
#include <range/v3/range/conversion.hpp>
#include <range/v3/view/iota.hpp>
#include <range/v3/view/transform.hpp>
#include <range/v3/view/zip.hpp>
#include <vector>

#include "BaseLib/StringTools.h"
#include "BaseLib/ThreadException.h"
#include "NumLib/DOF/DOFTableUtil.h"
#include "ProcessLib/Assembly/MatrixAssemblyStats.h"
#include "ProcessLib/Assembly/MatrixElementCache.h"
#include "ProcessLib/Assembly/MatrixOutput.h"
#include "ProcessLib/CoupledSolutionsForStaggeredScheme.h"

namespace
{
void assembleWithJacobianOneElement(
    const std::size_t mesh_item_id,
    ProcessLib::LocalAssemblerInterface& local_assembler,
    const NumLib::LocalToGlobalIndexMap& dof_table, const double t,
    const double dt, const GlobalVector& x, const GlobalVector& x_prev,
    std::vector<double>& local_b_data, std::vector<double>& local_Jac_data,
    ProcessLib::AbstractJacobianAssembler& jacobian_assembler,
    ProcessLib::Assembly::MultiMatrixElementCache& cache)
{
    std::vector<GlobalIndexType> const& indices =
        NumLib::getIndices(mesh_item_id, dof_table);

    local_b_data.clear();
    local_Jac_data.clear();

    auto const local_x = x.get(indices);
    auto const local_x_prev = x_prev.get(indices);
    jacobian_assembler.assembleWithJacobian(local_assembler, t, dt, local_x,
                                            local_x_prev, local_b_data,
                                            local_Jac_data);

    if (local_Jac_data.empty())
    {
        OGS_FATAL(
            "No Jacobian has been assembled! This might be due to "
            "programming errors in the local assembler of the "
            "current process.");
    }

    cache.add(local_b_data, local_Jac_data, indices);
}

void assembleWithJacobianForStaggeredSchemeOneElement(
    const std::size_t mesh_item_id,
    ProcessLib::LocalAssemblerInterface& local_assembler,
    std::vector<NumLib::LocalToGlobalIndexMap const*> const& dof_tables,
    const double t, const double dt, std::vector<GlobalVector*> const& x,
    std::vector<GlobalVector*> const& x_prev, int const process_id,
    std::vector<double>& local_b_data, std::vector<double>& local_Jac_data,
    ProcessLib::AbstractJacobianAssembler& jacobian_assembler,
    ProcessLib::Assembly::MultiMatrixElementCache& cache)
{
    std::vector<std::vector<GlobalIndexType>> indices_of_processes;
    indices_of_processes.reserve(dof_tables.size());
    transform(cbegin(dof_tables), cend(dof_tables),
              back_inserter(indices_of_processes),
              [&](auto const* dof_table)
              { return NumLib::getIndices(mesh_item_id, *dof_table); });

    auto local_coupled_xs =
        ProcessLib::getCoupledLocalSolutions(x, indices_of_processes);
    auto const local_x = MathLib::toVector(local_coupled_xs);

    auto local_coupled_x_prevs =
        ProcessLib::getCoupledLocalSolutions(x_prev, indices_of_processes);
    auto const local_x_prev = MathLib::toVector(local_coupled_x_prevs);

    std::vector<GlobalIndexType> const& indices =
        indices_of_processes[process_id];

    local_b_data.clear();
    local_Jac_data.clear();

    jacobian_assembler.assembleWithJacobianForStaggeredScheme(
        local_assembler, t, dt, local_x, local_x_prev, process_id, local_b_data,
        local_Jac_data);

    if (local_Jac_data.empty())
    {
        OGS_FATAL(
            "No Jacobian has been assembled! This might be due to "
            "programming errors in the local assembler of the "
            "current process.");
    }

    cache.add(local_b_data, local_Jac_data, indices);
}

/// Returns a vector of active element ids with corresponding local assemblers.
std::vector<
    std::tuple<std::ptrdiff_t,
               std::reference_wrapper<ProcessLib::LocalAssemblerInterface>>>
collectActiveLocalAssemblers(
    BaseLib::PolymorphicRandomAccessContainerView<
        ProcessLib::LocalAssemblerInterface> const& local_assemblers,
    std::vector<std::size_t> const& active_elements)
{
    auto id_and_local_asm = [&local_assemblers](std::size_t const id)
        -> std::tuple<std::ptrdiff_t, std::reference_wrapper<
                                          ProcessLib::LocalAssemblerInterface>>
    { return {id, local_assemblers[id]}; };

    auto create_ids_asm_pairs = [&](auto const& element_ids)
    {
        return element_ids | ranges::views::transform(id_and_local_asm) |
               ranges::to<std::vector>();
    };

    if (active_elements.empty())
    {
        return create_ids_asm_pairs(ranges::views::iota(
            static_cast<std::size_t>(0), local_assemblers.size()));
    }
    return create_ids_asm_pairs(active_elements);
}

void runAssemblyForEachLocalAssembler(
    std::vector<std::tuple<
        std::ptrdiff_t,
        std::reference_wrapper<ProcessLib::LocalAssemblerInterface>>> const&
        ids_local_assemblers,
    NumLib::LocalToGlobalIndexMap const& dof_table, double const t,
    double const dt, GlobalVector const& x, GlobalVector const& x_prev,
    std::vector<double>& local_b_data, std::vector<double>& local_Jac_data,
    ProcessLib::AbstractJacobianAssembler& jac_asm, ThreadException& exception,
    ProcessLib::Assembly::MultiMatrixElementCache& cache,
    auto local_matrix_output)
{
    // due to MSVC++ error:
    // error C3016: 'element_id': index variable in OpenMP 'for'
    // statement must have signed integral type
    std::ptrdiff_t n_elements =
        static_cast<std::ptrdiff_t>(ids_local_assemblers.size());
#pragma omp for nowait
    for (std::ptrdiff_t i = 0; i < n_elements; ++i)
    {
        if (exception)
        {
            continue;
        }
        auto [element_id, loc_asm] = ids_local_assemblers[i];

        try
        {
            assembleWithJacobianOneElement(element_id, loc_asm, dof_table, t,
                                           dt, x, x_prev, local_b_data,
                                           local_Jac_data, jac_asm, cache);
        }
        catch (...)
        {
            exception.capture();
            continue;
        }

        local_matrix_output(element_id);
    }
}

void runStaggeredAssemblyForEachLocalAssembler(
    std::vector<std::tuple<
        std::ptrdiff_t,
        std::reference_wrapper<ProcessLib::LocalAssemblerInterface>>> const&
        ids_local_assemblers,
    std::vector<NumLib::LocalToGlobalIndexMap const*> const& dof_tables,
    double const t, double const dt, std::vector<GlobalVector*> const& x,
    std::vector<GlobalVector*> const& x_prev, int const process_id,
    std::vector<double>& local_b_data, std::vector<double>& local_Jac_data,
    ProcessLib::AbstractJacobianAssembler& jac_asm, ThreadException& exception,
    ProcessLib::Assembly::MultiMatrixElementCache& cache,
    auto local_matrix_output)
{
    // due to MSVC++ error:
    // error C3016: 'element_id': index variable in OpenMP 'for'
    // statement must have signed integral type
    std::ptrdiff_t n_elements =
        static_cast<std::ptrdiff_t>(ids_local_assemblers.size());
#pragma omp for nowait
    for (std::ptrdiff_t i = 0; i < n_elements; ++i)
    {
        if (exception)
        {
            continue;
        }
        auto [element_id, loc_asm] = ids_local_assemblers[i];

        try
        {
            assembleWithJacobianForStaggeredSchemeOneElement(
                element_id, loc_asm, dof_tables, t, dt, x, x_prev, process_id,
                local_b_data, local_Jac_data, jac_asm, cache);
        }
        catch (...)
        {
            exception.capture();
            continue;
        }

        local_matrix_output(element_id);
    }
}

int getNumberOfThreads()
{
    char const* const num_threads_env = std::getenv("OGS_ASM_THREADS");

    if (!num_threads_env)
    {
        return 1;
    }

    if (std::strlen(num_threads_env) == 0)
    {
        OGS_FATAL("The environment variable OGS_ASM_THREADS is set but empty.");
    }

    std::string num_threads_str{num_threads_env};
    BaseLib::trim(num_threads_str);

    std::istringstream num_threads_iss{num_threads_str};
    int num_threads = -1;

    num_threads_iss >> num_threads;

    if (!num_threads_iss)
    {
        OGS_FATAL("Error parsing OGS_ASM_THREADS (= \"{}\").", num_threads_env);
    }

    if (!num_threads_iss.eof())
    {
        OGS_FATAL(
            "Error parsing OGS_ASM_THREADS (= \"{}\"): not read entirely, the "
            "remainder is \"{}\"",
            num_threads_env,
            num_threads_iss.str().substr(num_threads_iss.tellg()));
    }

    if (num_threads < 1)
    {
        OGS_FATAL(
            "You asked (via OGS_ASM_THREADS) to assemble with {} < 1 thread.",
            num_threads);
    }

    return num_threads;
}
}  // namespace

namespace ProcessLib::Assembly
{
ParallelVectorMatrixAssembler::ParallelVectorMatrixAssembler(
    AbstractJacobianAssembler& jacobian_assembler)
    : jacobian_assembler_{jacobian_assembler},
      num_threads_(getNumberOfThreads())
{
}

void ParallelVectorMatrixAssembler::assembleWithJacobian(
    BaseLib::PolymorphicRandomAccessContainerView<
        LocalAssemblerInterface> const& local_assemblers,
    std::vector<std::size_t> const& active_elements,
    std::vector<NumLib::LocalToGlobalIndexMap const*> const& dof_tables,
    const double t, double const dt, std::vector<GlobalVector*> const& xs,
    std::vector<GlobalVector*> const& x_prevs, int const process_id,
    GlobalVector& b, GlobalMatrix& Jac)
{
    // checks //////////////////////////////////////////////////////////////////
    if (dof_tables.size() != xs.size())
    {
        OGS_FATAL("Different number of DOF tables and solution vectors.");
    }

    std::size_t const number_of_processes = xs.size();
    // algorithm ///////////////////////////////////////////////////////////////

    auto stats = CumulativeStats<MultiStats>::create();
    ConcurrentMatrixView b_view(b);
    ConcurrentMatrixView Jac_view(Jac);

    ThreadException exception;
#pragma omp parallel num_threads(num_threads_)
    {
#ifdef _OPENMP
#pragma omp single nowait
        {
            INFO("Number of threads: {}", omp_get_num_threads());
        }
#endif

        // temporary data only stored here in order to avoid frequent memory
        // reallocations.
        std::vector<double> local_b_data;
        std::vector<double> local_Jac_data;

        // copy to avoid concurrent access
        auto const jac_asm = jacobian_assembler_.copy();
        auto stats_this_thread = stats->clone();

        MultiMatrixElementCache cache{b_view, Jac_view,
                                      stats_this_thread->data};

        auto local_matrix_output = [&](std::ptrdiff_t element_id)
        {
            local_matrix_output_(t, process_id, element_id, local_b_data,
                                 local_Jac_data);
        };

        // TODO corner case: what if all elements on a submesh are deactivated?

        // Monolithic scheme
        if (number_of_processes == 1)
        {
            assert(process_id == 0);
            auto const& dof_table = *dof_tables[0];
            auto const& x = *xs[0];
            auto const& x_prev = *x_prevs[0];

            runAssemblyForEachLocalAssembler(
                collectActiveLocalAssemblers(local_assemblers, active_elements),
                dof_table, t, dt, x, x_prev, local_b_data, local_Jac_data,
                *jac_asm, exception, cache, local_matrix_output);
        }
        else  // Staggered scheme
        {
            runStaggeredAssemblyForEachLocalAssembler(
                collectActiveLocalAssemblers(local_assemblers, active_elements),
                dof_tables, t, dt, xs, x_prevs, process_id, local_b_data,
                local_Jac_data, *jac_asm, exception, cache,
                local_matrix_output);
        }
    }

    stats->print();

    global_matrix_output_(t, process_id, b, Jac);
    exception.rethrow();
}
}  // namespace ProcessLib::Assembly