From eb1bfb476f517fdad1972159e76fee800888ccff Mon Sep 17 00:00:00 2001
From: Wenqing Wang <wenqing.wang@ufz.de>
Date: Wed, 10 May 2017 12:09:25 +0200
Subject: [PATCH] [PETsc] Introduced a function setLocalAccessibleVector

to set a local vector of a processor from the global one.
---
 MathLib/LinAlg/LinAlg.cpp                     |  9 ++++++
 MathLib/LinAlg/LinAlg.h                       |  8 +++++
 MathLib/LinAlg/PETSc/PETScVector.cpp          | 29 +++++++----------
 MathLib/LinAlg/PETSc/PETScVector.h            |  7 +++--
 NumLib/ODESolver/NonlinearSolver.cpp          | 23 ++++++++++++++
 NumLib/ODESolver/TimeDiscretization.h         | 31 +++++++++++++++++++
 NumLib/ODESolver/TimeDiscretizedODESystem.cpp |  7 +++++
 ProcessLib/UncoupledProcessesTimeLoop.cpp     | 14 +++++++--
 Tests/NumLib/TestODEInt.cpp                   |  7 ++++-
 Tests/NumLib/TimeLoopSingleODE.h              |  2 ++
 10 files changed, 114 insertions(+), 23 deletions(-)

diff --git a/MathLib/LinAlg/LinAlg.cpp b/MathLib/LinAlg/LinAlg.cpp
index 1f804d46c6e..3bad1aa5ed3 100644
--- a/MathLib/LinAlg/LinAlg.cpp
+++ b/MathLib/LinAlg/LinAlg.cpp
@@ -22,6 +22,11 @@ namespace MathLib { namespace LinAlg
 
 // Vector
 
+void setLocalAccessibleVector(PETScVector& x)
+{
+    x.setLocalAccessibleVector();
+}
+
 void set(PETScVector& x, double const a)
 {
     VecSet(x.getRawVector(), a);
@@ -177,6 +182,10 @@ namespace MathLib { namespace LinAlg
 
 // Vector
 
+void setLocalAccessibleVector(EigenVector& /*x*/)
+{
+}
+
 void set(EigenVector& x, double const a)
 {
     x.getRawVector().setConstant(a);
diff --git a/MathLib/LinAlg/LinAlg.h b/MathLib/LinAlg/LinAlg.h
index 06388524bce..43ad6a018c8 100644
--- a/MathLib/LinAlg/LinAlg.h
+++ b/MathLib/LinAlg/LinAlg.h
@@ -148,6 +148,10 @@ namespace LinAlg
 
 // Vector
 
+/// Set local accessible vector in order to get entries.
+/// Call this before call operator[] or get(...).
+void setLocalAccessibleVector(PETScVector& x);
+
 void set(PETScVector& x, double const a);
 
 void copy(PETScVector const& x, PETScVector& y);
@@ -206,6 +210,10 @@ namespace LinAlg
 
 // Vector
 
+/// Set local accessible vector in order to get entries.
+/// Call this before call operator[] or get(...). Not used for EIGEN
+void setLocalAccessibleVector(EigenVector& x);
+
 void set(EigenVector& x, double const a);
 
 void copy(EigenVector const& x, EigenVector& y);
diff --git a/MathLib/LinAlg/PETSc/PETScVector.cpp b/MathLib/LinAlg/PETSc/PETScVector.cpp
index 6bbf6d41c93..8381982c0b1 100644
--- a/MathLib/LinAlg/PETSc/PETScVector.cpp
+++ b/MathLib/LinAlg/PETSc/PETScVector.cpp
@@ -38,8 +38,6 @@ PETScVector::PETScVector(const PetscInt vec_size, const bool is_global_size)
     }
 
     config();
-
-    _global_v = std::unique_ptr<PetscScalar[]>{ new PetscScalar[_size] };
 }
 
 PETScVector::PETScVector(const PetscInt vec_size,
@@ -63,8 +61,6 @@ PETScVector::PETScVector(const PetscInt vec_size,
     }
 
     config();
-
-    _global_v = std::unique_ptr<PetscScalar[]>{ new PetscScalar[_size] };
 }
 
 PETScVector::PETScVector(const PETScVector &existing_vec, const bool deep_copy)
@@ -76,13 +72,6 @@ PETScVector::PETScVector(const PETScVector &existing_vec, const bool deep_copy)
     {
         VecCopy(existing_vec._v, _v);
     }
-
-    if (!_global_v)
-    {
-        _global_v = std::unique_ptr<PetscScalar[]>{ new PetscScalar[_size] };
-    }
-
-    getGlobalVector(_global_v.get());
 }
 
 PETScVector::PETScVector(PETScVector &&other)
@@ -119,13 +108,6 @@ void PETScVector::finalizeAssembly()
 {
     VecAssemblyBegin(_v);
     VecAssemblyEnd(_v);
-
-    if (!_global_v)
-    {
-        _global_v = std::unique_ptr<PetscScalar[]>{ new PetscScalar[_size] };
-    }
-
-    getGlobalVector(_global_v.get());
 }
 
 void PETScVector::gatherLocalVectors( PetscScalar local_array[],
@@ -181,6 +163,17 @@ void PETScVector::getGlobalVector(PetscScalar u[])
 #endif
 }
 
+void PETScVector::setLocalAccessibleVector()
+{
+    // TODO: use getLocalVector
+    if (!_global_v)
+    {
+        _global_v = std::unique_ptr<PetscScalar[]>{ new PetscScalar[_size] };
+    }
+
+    getGlobalVector(_global_v.get());
+}
+
 void PETScVector::copyValues(std::vector<double>& u) const
 {
     assert(u.size() == (std::size_t) (getLocalSize() + getGhostSize()));
diff --git a/MathLib/LinAlg/PETSc/PETScVector.h b/MathLib/LinAlg/PETSc/PETScVector.h
index 0ea4cb51e8f..b9722deb9d5 100644
--- a/MathLib/LinAlg/PETSc/PETScVector.h
+++ b/MathLib/LinAlg/PETSc/PETScVector.h
@@ -155,14 +155,17 @@ class PETScVector
             VecSetValues(_v, e_idxs.size(), &e_idxs[0], &sub_vec[0], INSERT_VALUES);
         }
 
-        //! Get several entries
+        /// Set local accessible vector in order to get entries.
+        /// Call this before call operator[] or get(...).
+        void setLocalAccessibleVector();
+
+        /// Get several entries
         std::vector<double> get(std::vector<IndexType> const& indices) const;
 
         // TODO preliminary
         double operator[] (PetscInt idx) const
         {
             const PetscInt id_p = (idx == -_size) ?  0 : std::abs(idx);
-            //VecGetValues(_v, 1, &id_p, &value);
             return _global_v[id_p];
         }
 
diff --git a/NumLib/ODESolver/NonlinearSolver.cpp b/NumLib/ODESolver/NonlinearSolver.cpp
index c8a36c347e1..3a0c4a24046 100644
--- a/NumLib/ODESolver/NonlinearSolver.cpp
+++ b/NumLib/ODESolver/NonlinearSolver.cpp
@@ -55,10 +55,15 @@ bool NonlinearSolver<NonlinearSolverTag::Picard>::solve(
         BaseLib::RunTime time_iteration;
         time_iteration.start();
 
+        // The function only has computation if DDC is appied,
+        // e.g. Parallel comuting.
+        MathLib::LinAlg::setLocalAccessibleVector(x);
+
         sys.preIteration(iteration, x);
 
         BaseLib::RunTime time_assembly;
         time_assembly.start();
+
         sys.assemble(x, coupling_term);
         sys.getA(A);
         sys.getRhs(rhs);
@@ -88,6 +93,10 @@ bool NonlinearSolver<NonlinearSolverTag::Picard>::solve(
         }
         else
         {
+            // The function only has computation if DDC is appied,
+            // e.g. Parallel comuting.
+            LinAlg::setLocalAccessibleVector(x_new);
+
             if (postIterationCallback)
                 postIterationCallback(iteration, x_new);
 
@@ -105,6 +114,7 @@ bool NonlinearSolver<NonlinearSolverTag::Picard>::solve(
                     // Thereby the failed solution can be used by the caller for
                     // debugging purposes.
                     LinAlg::copy(x_new, x);
+                    LinAlg::setLocalAccessibleVector(x);
                     break;
                 case IterationResult::REPEAT_ITERATION:
                     INFO(
@@ -136,6 +146,7 @@ bool NonlinearSolver<NonlinearSolverTag::Picard>::solve(
 
         // Update x s.t. in the next iteration we will compute the right delta x
         LinAlg::copy(x_new, x);
+        LinAlg::setLocalAccessibleVector(x);
 
         INFO("[time] Iteration #%u took %g s.", iteration,
              time_iteration.elapsed());
@@ -199,10 +210,15 @@ bool NonlinearSolver<NonlinearSolverTag::Newton>::solve(
         BaseLib::RunTime time_iteration;
         time_iteration.start();
 
+        // The function only has computation if DDC is appied,
+        // e.g. Parallel comuting.
+        LinAlg::setLocalAccessibleVector(x);
+
         sys.preIteration(iteration, x);
 
         BaseLib::RunTime time_assembly;
         time_assembly.start();
+
         sys.assemble(x, coupling_term);
         sys.getResidual(x, res);
         sys.getJacobian(J);
@@ -235,6 +251,11 @@ bool NonlinearSolver<NonlinearSolverTag::Newton>::solve(
                     x, _x_new_id);
             LinAlg::axpy(x_new, -_alpha, minus_delta_x);
 
+            // The function only has computation if DDC is appied,
+            // e.g. Parallel comuting.
+            LinAlg::setLocalAccessibleVector(x_new);
+
+
             if (postIterationCallback)
                 postIterationCallback(iteration, x_new);
 
@@ -261,6 +282,8 @@ bool NonlinearSolver<NonlinearSolverTag::Newton>::solve(
             // TODO could be done via swap. Note: that also requires swapping
             // the ids. Same for the Picard scheme.
             LinAlg::copy(x_new, x);  // copy new solution to x
+            LinAlg::setLocalAccessibleVector(x);
+
             NumLib::GlobalVectorProvider::provider.releaseVector(
                 x_new);
         }
diff --git a/NumLib/ODESolver/TimeDiscretization.h b/NumLib/ODESolver/TimeDiscretization.h
index 0db170d920e..eec9b707f57 100644
--- a/NumLib/ODESolver/TimeDiscretization.h
+++ b/NumLib/ODESolver/TimeDiscretization.h
@@ -220,12 +220,18 @@ public:
     {
         _t = t0;
         MathLib::LinAlg::copy(x0, _x_old);
+        // The function only has computation if DDC is appied,
+        // e.g. Parallel comuting.
+        MathLib::LinAlg::setLocalAccessibleVector(_x_old);
     }
 
     void pushState(const double /*t*/, GlobalVector const& x,
                    InternalMatrixStorage const&) override
     {
         MathLib::LinAlg::copy(x, _x_old);
+        // The function only has computation if DDC is appied,
+        // e.g. Parallel comuting.
+        MathLib::LinAlg::setLocalAccessibleVector(_x_old);
     }
 
     void nextTimestep(const double t, const double delta_t) override
@@ -270,12 +276,18 @@ public:
         _t = t0;
         _t_old = t0;
         MathLib::LinAlg::copy(x0, _x_old);
+        // The function only has computation if DDC is appied,
+        // e.g. Parallel comuting.
+        MathLib::LinAlg::setLocalAccessibleVector(_x_old);
     }
 
     void pushState(const double /*t*/, GlobalVector const& x,
                    InternalMatrixStorage const&) override
     {
         MathLib::LinAlg::copy(x, _x_old);
+        // The function only has computation if DDC is appied,
+        // e.g. Parallel comuting.
+        MathLib::LinAlg::setLocalAccessibleVector(_x_old);
     }
 
     void nextTimestep(const double t, const double delta_t) override
@@ -344,12 +356,18 @@ public:
     {
         _t = t0;
         MathLib::LinAlg::copy(x0, _x_old);
+        // The function only has computation if DDC is appied,
+        // e.g. Parallel comuting.
+        MathLib::LinAlg::setLocalAccessibleVector(_x_old);
     }
 
     void pushState(const double, GlobalVector const& x,
                    InternalMatrixStorage const& strg) override
     {
         MathLib::LinAlg::copy(x, _x_old);
+        // The function only has computation if DDC is appied,
+        // e.g. Parallel comuting.
+        MathLib::LinAlg::setLocalAccessibleVector(_x_old);
         strg.pushMatrices();
     }
 
@@ -433,6 +451,12 @@ public:
         _t = t0;
         _xs_old.push_back(
             &NumLib::GlobalVectorProvider::provider.getVector(x0));
+        for (auto x_old : _xs_old)
+        {
+            // The function only has computation if DDC is appied,
+            // e.g. Parallel comuting.
+            MathLib::LinAlg::setLocalAccessibleVector(*x_old);
+        }
     }
 
     void pushState(const double, GlobalVector const& x,
@@ -453,6 +477,13 @@ public:
             _offset = (_offset + 1) %
                       _num_steps;  // treat _xs_old as a circular buffer
         }
+
+        for (auto x_old : _xs_old)
+        {
+            // The function only has computation if DDC is appied,
+            // e.g. Parallel comuting.
+            MathLib::LinAlg::setLocalAccessibleVector(*x_old);
+        }
     }
 
     void nextTimestep(const double t, const double delta_t) override
diff --git a/NumLib/ODESolver/TimeDiscretizedODESystem.cpp b/NumLib/ODESolver/TimeDiscretizedODESystem.cpp
index 0684b2657f4..48de599f3d6 100644
--- a/NumLib/ODESolver/TimeDiscretizedODESystem.cpp
+++ b/NumLib/ODESolver/TimeDiscretizedODESystem.cpp
@@ -10,6 +10,7 @@
 #include "TimeDiscretizedODESystem.h"
 
 #include "MathLib/LinAlg/ApplyKnownSolution.h"
+#include "MathLib/LinAlg/LinAlg.h"
 #include "MathLib/LinAlg/UnifiedMatrixSetters.h"
 #include "NumLib/IndexValueVector.h"
 
@@ -79,6 +80,9 @@ void TimeDiscretizedODESystem<ODESystemTag::FirstOrderImplicitQuasilinear,
 
     auto& xdot = NumLib::GlobalVectorProvider::provider.getVector(_xdot_id);
     _time_disc.getXdot(x_new_timestep, xdot);
+    // The function only has computation if DDC is appied,
+    // e.g. Parallel comuting.
+    MathLib::LinAlg::setLocalAccessibleVector(xdot);
 
     _M->setZero();
     _K->setZero();
@@ -106,6 +110,9 @@ void TimeDiscretizedODESystem<
     //      fragile.
     auto& xdot = NumLib::GlobalVectorProvider::provider.getVector(_xdot_id);
     _time_disc.getXdot(x_new_timestep, xdot);
+    // The function only has computation if DDC is appied,
+    // e.g. Parallel comuting.
+    MathLib::LinAlg::setLocalAccessibleVector(xdot);
 
     _mat_trans->computeResidual(*_M, *_K, *_b, x_new_timestep, xdot, res);
 
diff --git a/ProcessLib/UncoupledProcessesTimeLoop.cpp b/ProcessLib/UncoupledProcessesTimeLoop.cpp
index f31970e6894..2ed13eb7e58 100644
--- a/ProcessLib/UncoupledProcessesTimeLoop.cpp
+++ b/ProcessLib/UncoupledProcessesTimeLoop.cpp
@@ -587,7 +587,7 @@ bool UncoupledProcessesTimeLoop::loop()
         for (auto& spd : _per_process_data)
         {
             auto& pcs = spd->process;
-            auto const& x0 = *_process_solutions[pcs_idx];
+            auto& x0 = *_process_solutions[pcs_idx];
 
             pcs.preTimestep(x0, t0, _timestepper->getTimeStep().dt());
             _output->doOutput(pcs, spd->process_output, 0, t0, x0);
@@ -636,7 +636,7 @@ bool UncoupledProcessesTimeLoop::loop()
         for (auto& spd : _per_process_data)
         {
             auto& pcs = spd->process;
-            auto const& x = *_process_solutions[pcs_idx];
+            auto& x = *_process_solutions[pcs_idx];
             _output->doOutputLastTimestep(pcs, spd->process_output, timestep, t,
                                           x);
 
@@ -667,6 +667,11 @@ bool UncoupledProcessesTimeLoop::solveUncoupledEquationSystems(
         const auto nonlinear_solver_succeeded =
             solveOneTimeStepOneProcess(x, timestep_id, t, dt, *spd,
                                        void_staggered_coupling_term, *_output);
+
+        // The function only has computation if DDC is appied,
+        // e.g. Parallel comuting.
+        MathLib::LinAlg::setLocalAccessibleVector(x);
+
         pcs.postTimestep(x);
         pcs.computeSecondaryVariable(t, x, void_staggered_coupling_term);
 
@@ -798,6 +803,11 @@ bool UncoupledProcessesTimeLoop::solveCoupledEquationSystemsByStaggeredScheme(
         StaggeredCouplingTerm coupled_term(
             spd->coupled_processes, _solutions_of_coupled_processes[pcs_idx],
             0.0);
+
+        // The function only has computation if DDC is appied,
+        // e.g. Parallel comuting.
+        MathLib::LinAlg::setLocalAccessibleVector(x);
+
         pcs.computeSecondaryVariable(t, x, coupled_term);
 
         _output->doOutput(pcs, spd->process_output, timestep_id, t, x);
diff --git a/Tests/NumLib/TestODEInt.cpp b/Tests/NumLib/TestODEInt.cpp
index 9e8ee9b0ce6..229f5fe7be7 100644
--- a/Tests/NumLib/TestODEInt.cpp
+++ b/Tests/NumLib/TestODEInt.cpp
@@ -19,6 +19,7 @@
 #include "BaseLib/BuildInfo.h"
 #include "NumLib/NumericsConfig.h"
 #include "NumLib/ODESolver/ConvergenceCriterionDeltaX.h"
+#include "MathLib/LinAlg/LinAlg.h"
 #include "Tests/TestTools.h"
 #include "ODEs.h"
 
@@ -103,6 +104,9 @@ public:
         if (num_timesteps > 0)
             EXPECT_TRUE(loop.loop(t0, x0, t_end, delta_t, cb));
 
+        for (auto& x :  sol.solutions)
+            MathLib::LinAlg::setLocalAccessibleVector(x);
+
         return sol;
     }
 
@@ -258,7 +262,8 @@ public:
                             TestParams::tol_picard_newton);
 
                 auto const t = sol_picard.ts[i];
-                auto const sol_analyt = ODETraits<ODE>::solution(t);
+                auto sol_analyt = ODETraits<ODE>::solution(t);
+                MathLib::LinAlg::setLocalAccessibleVector(sol_analyt);
 
                 EXPECT_NEAR(sol_picard.solutions[i][comp], sol_analyt[comp],
                             TestParams::tol_analyt);
diff --git a/Tests/NumLib/TimeLoopSingleODE.h b/Tests/NumLib/TimeLoopSingleODE.h
index 531ef5432da..1458f420a01 100644
--- a/Tests/NumLib/TimeLoopSingleODE.h
+++ b/Tests/NumLib/TimeLoopSingleODE.h
@@ -12,6 +12,7 @@
 #include "NumLib/DOF/GlobalMatrixProviders.h"
 #include "NumLib/ODESolver/TimeDiscretizedODESystem.h"
 #include "NumLib/ODESolver/NonlinearSolver.h"
+#include "MathLib/LinAlg/LinAlg.h"
 
 #include "ProcessLib/StaggeredCouplingTerm.h"
 
@@ -97,6 +98,7 @@ bool TimeLoopSingleODE<NLTag>::loop(const double t0, GlobalVector const& x0,
 
     if (time_disc.needsPreload())
     {
+        MathLib::LinAlg::setLocalAccessibleVector(x);
         _nonlinear_solver->assemble(x,
                                  ProcessLib::createVoidStaggeredCouplingTerm());
         time_disc.pushState(t0, x0,
-- 
GitLab