From 26242cc0222ba69f145fbe298200efe849e7bf3d Mon Sep 17 00:00:00 2001
From: Thomas Fischer <thomas.fischer@ufz.de>
Date: Tue, 3 Jul 2012 12:49:28 +0200
Subject: [PATCH] [CRSMatrixPThreads] - added method to calculate a balanced
 workload

[new file SimpleTests/MatrixTests/MatVecMultPthreads.cpp]
- test matrix vector multiplication employing pthreads and balanced work

[files MathLib/LinAlg/Sparse/amuxCRS.{cpp,h}]
- implemented matrix vector multiplication employing pthreads that better balances the work
---
 MathLib/LinAlg/Sparse/CRSMatrixPThreads.h     |  61 ++++++-
 MathLib/LinAlg/Sparse/amuxCRS.cpp             |  42 ++++-
 MathLib/LinAlg/Sparse/amuxCRS.h               |   7 +-
 SimpleTests/MatrixTests/CMakeLists.txt        |  17 ++
 .../MatrixTests/MatVecMultPthreads.cpp        | 167 ++++++++++++++++++
 5 files changed, 282 insertions(+), 12 deletions(-)
 create mode 100644 SimpleTests/MatrixTests/MatVecMultPthreads.cpp

diff --git a/MathLib/LinAlg/Sparse/CRSMatrixPThreads.h b/MathLib/LinAlg/Sparse/CRSMatrixPThreads.h
index 7389f10ab56..b0f95f2922d 100644
--- a/MathLib/LinAlg/Sparse/CRSMatrixPThreads.h
+++ b/MathLib/LinAlg/Sparse/CRSMatrixPThreads.h
@@ -26,29 +26,72 @@ template<class T> class CRSMatrixPThreads : public CRSMatrix<T,unsigned>
 {
 public:
 	CRSMatrixPThreads(std::string const &fname, unsigned num_of_threads) :
-		CRSMatrix<T,unsigned>(fname), _num_of_threads (num_of_threads)
-	{}
+		CRSMatrix<T,unsigned>(fname), _n_threads (num_of_threads),
+		_workload_intervals(new unsigned[num_of_threads+1])
+	{
+		calcWorkload();
+	}
 
 	CRSMatrixPThreads(unsigned n, unsigned *iA, unsigned *jA, T* A, unsigned num_of_threads) :
-		CRSMatrix<T,unsigned>(n, iA, jA, A), _num_of_threads (num_of_threads)
-	{}
+		CRSMatrix<T,unsigned>(n, iA, jA, A), _n_threads (num_of_threads),
+		_workload_intervals(new unsigned[num_of_threads+1])
+	{
+		calcWorkload();
+	}
 
 	CRSMatrixPThreads(unsigned n1) :
-		CRSMatrix<T,unsigned>(n1), _num_of_threads (1)
-	{}
+		CRSMatrix<T,unsigned>(n1), _n_threads (1),
+		_workload_intervals(new unsigned[_n_threads+1])
+	{
+		calcWorkload();
+	}
 
 	virtual ~CRSMatrixPThreads()
-	{}
+	{
+		delete [] _workload_intervals;
+	}
 
 	virtual void amux(T d, T const * const x, T *y) const
 	{
 		amuxCRSParallelPThreads(d, SparseMatrixBase<T, unsigned>::_n_rows,
 						CRSMatrix<T, unsigned>::_row_ptr, CRSMatrix<T, unsigned>::_col_idx,
-						CRSMatrix<T, unsigned>::_data, x, y, _num_of_threads);
+						CRSMatrix<T, unsigned>::_data, x, y, _n_threads, _workload_intervals);
 	}
 
 protected:
-	unsigned _num_of_threads;
+	void calcWorkload()
+	{
+		_workload_intervals[0] = 0;
+		_workload_intervals[_n_threads] = SparseMatrixBase<T, unsigned>::_n_rows;
+
+		const unsigned work_per_core (this->getNNZ()/_n_threads);
+		for (unsigned k(1); k<_n_threads; k++) {
+			unsigned upper_bound_kth_core(k * work_per_core);
+			// search in _row_ptr array for the appropriate index
+			unsigned beg (_workload_intervals[k-1]);
+			unsigned end (_workload_intervals[_n_threads]);
+			bool found (false);
+			while (beg < end && !found) {
+				unsigned m ((end+beg)/2);
+
+				if (upper_bound_kth_core == this->_row_ptr[m]) {
+					_workload_intervals[k] = m;
+					found = true;
+				} else {
+					if (upper_bound_kth_core < this->_row_ptr[m]) {
+						end = m;
+					} else {
+						beg = m+1;
+					}
+				}
+			}
+			if (!found)
+				_workload_intervals[k] = beg;
+		}
+	}
+
+	const unsigned _n_threads;
+	unsigned *_workload_intervals;
 };
 
 } // end namespace MathLib
diff --git a/MathLib/LinAlg/Sparse/amuxCRS.cpp b/MathLib/LinAlg/Sparse/amuxCRS.cpp
index 8acd84c8cbc..b73aeb160e7 100644
--- a/MathLib/LinAlg/Sparse/amuxCRS.cpp
+++ b/MathLib/LinAlg/Sparse/amuxCRS.cpp
@@ -54,9 +54,9 @@ void* amuxCRSpthread (void* ptr)
 	double* y(thread_param->_y);
 
 	for (unsigned i(beg_row); i<end_row; i++) {
-		y[i] = 0.0;
+		y[i] = A[iA[i]] * x[jA[iA[i]]];
 		const unsigned end (iA[i+1]);
-		for (unsigned j(iA[i]); j<end; j++) {
+		for (unsigned j(iA[i]+1); j<end; j++) {
 			y[i] += A[j] * x[jA[j]];
 		}
 		y[i] *= a;
@@ -105,6 +105,44 @@ void amuxCRSParallelPThreads (double a,
 #endif
 }
 
+void amuxCRSParallelPThreads (double a,
+	unsigned n, unsigned const * const iA, unsigned const * const jA,
+	double const * const A, double const * const x, double* y,
+	unsigned num_of_pthreads, unsigned const*const workload_intervals)
+{
+#ifdef HAVE_PTHREADS
+	// fill thread data objects
+	MatMultThreadParam** thread_param_array (new MatMultThreadParam*[num_of_pthreads]);
+	for (unsigned k(0); k<num_of_pthreads; k++) {
+		thread_param_array[k] = new MatMultThreadParam (a, workload_intervals[k], workload_intervals[k+1], iA, jA, A, x, y);
+	}
+
+	// allocate thread_array and return value array
+	pthread_t *thread_array (new pthread_t[num_of_pthreads]);
+	int *ret_vals (new int[num_of_pthreads]);
+
+	// create threads
+	for (unsigned k(0); k<num_of_pthreads; k++) {
+		ret_vals[k] = pthread_create( &(thread_array[k]), NULL, amuxCRSpthread, thread_param_array[k]);
+	}
+
+	// join threads
+	for (unsigned k(0); k<num_of_pthreads; k++) {
+		pthread_join (thread_array[k], NULL);
+	}
+
+	delete [] ret_vals;
+	for (unsigned k(0); k<num_of_pthreads; k++)
+		delete thread_param_array[k];
+	delete [] thread_param_array;
+	delete [] thread_array;
+#else
+	(void)num_of_pthreads;
+	amuxCRS (a, n, iA, jA, A, x, y);
+#endif
+}
+
+
 void amuxCRSSym (double a,
 	unsigned n, unsigned const * const iA, unsigned const * const jA,
         double const * const A, double const * const x, double* y)
diff --git a/MathLib/LinAlg/Sparse/amuxCRS.h b/MathLib/LinAlg/Sparse/amuxCRS.h
index a3a6d098b84..721f24bec53 100644
--- a/MathLib/LinAlg/Sparse/amuxCRS.h
+++ b/MathLib/LinAlg/Sparse/amuxCRS.h
@@ -31,9 +31,14 @@ void amuxCRS(FP_TYPE a, IDX_TYPE n, IDX_TYPE const * const iA, IDX_TYPE const *
 
 void amuxCRSParallelPThreads (double a,
 	unsigned n, unsigned const * const iA, unsigned const * const jA,
-        double const * const A, double const * const x, double* y,
+	double const * const A, double const * const x, double* y,
 	unsigned num_of_pthreads);
 
+void amuxCRSParallelPThreads (double a,
+	unsigned n, unsigned const * const iA, unsigned const * const jA,
+	double const * const A, double const * const x, double* y,
+	unsigned num_of_pthreads, unsigned const*const workload_intervals);
+
 #ifdef _OPENMP
 template<typename FP_TYPE, typename IDX_TYPE>
 void amuxCRSParallelOpenMP (FP_TYPE a,
diff --git a/SimpleTests/MatrixTests/CMakeLists.txt b/SimpleTests/MatrixTests/CMakeLists.txt
index c06884683a9..14975e497f5 100644
--- a/SimpleTests/MatrixTests/CMakeLists.txt
+++ b/SimpleTests/MatrixTests/CMakeLists.txt
@@ -26,6 +26,23 @@ ADD_EXECUTABLE( MatMult
 SET_TARGET_PROPERTIES(MatMult PROPERTIES FOLDER SimpleTests)
 TARGET_LINK_LIBRARIES(MatMult logog)
 
+IF ( HAVE_PTHREADS )
+ADD_EXECUTABLE( MatVecMultPthreads
+        MatVecMultPthreads.cpp
+        ${SOURCES}
+        ${HEADERS}
+)
+SET_TARGET_PROPERTIES(MatVecMultPthreads PROPERTIES FOLDER SimpleTests)
+TARGET_LINK_LIBRARIES(MatVecMultPthreads 
+	pthread
+	BaseLib
+	MathLib
+	logog)
+ENDIF ( HAVE_PTHREADS )
+
+SET_TARGET_PROPERTIES(MatMult PROPERTIES FOLDER SimpleTests)
+TARGET_LINK_LIBRARIES(MatMult logog)
+
 # Create the executable
 ADD_EXECUTABLE( MatTestRemoveRowsCols
         MatTestRemoveRowsCols.cpp
diff --git a/SimpleTests/MatrixTests/MatVecMultPthreads.cpp b/SimpleTests/MatrixTests/MatVecMultPthreads.cpp
new file mode 100644
index 00000000000..c506deb2535
--- /dev/null
+++ b/SimpleTests/MatrixTests/MatVecMultPthreads.cpp
@@ -0,0 +1,167 @@
+/**
+ * Copyright (c) 2012, OpenGeoSys Community (http://www.opengeosys.net)
+ *            Distributed under a Modified BSD License.
+ *              See accompanying file LICENSE.txt or
+ *              http://www.opengeosys.net/LICENSE.txt
+ *
+ * \file MatVecMultPthreads.cpp
+ *
+ *  Created on  Jul 3, 2012 by Thomas Fischer
+ */
+
+#include <fstream>
+#include <iostream>
+#include <cmath>
+#include <limits>
+#include <cstdlib>
+#include "sparse.h"
+#include "LinAlg/Sparse/CRSMatrix.h"
+#include "LinAlg/Sparse/CRSMatrixOpenMP.h"
+#include "LinAlg/Sparse/CRSMatrixPThreads.h"
+
+// BaseLib
+#include "RunTime.h"
+#include "CPUTime.h"
+// BaseLib/logog
+#include "logog.hpp"
+#include "formatter.hpp"
+// BaseLib/tclap
+#include "tclap/CmdLine.h"
+
+#ifdef UNIX
+#include <sys/unistd.h>
+#endif
+
+#ifdef OGS_BUILD_INFO
+#include "BuildInfo.h"
+#endif
+
+#ifdef _OPENMP
+#include <omp.h>
+#endif
+
+/**
+ * new formatter for logog
+ */
+class FormatterCustom : public logog::FormatterGCC
+{
+    virtual TOPIC_FLAGS GetTopicFlags( const logog::Topic &topic )
+    {
+        return ( Formatter::GetTopicFlags( topic ) &
+                 ~( TOPIC_FILE_NAME_FLAG | TOPIC_LINE_NUMBER_FLAG ));
+    }
+};
+
+int main(int argc, char *argv[])
+{
+	LOGOG_INITIALIZE();
+
+	TCLAP::CmdLine cmd("Simple matrix vector multiplication test employing pthreads", ' ', "0.1");
+
+	// Define a value argument and add it to the command line.
+	// A value arg defines a flag and a type of value that it expects,
+	// such as "-m matrix".
+	TCLAP::ValueArg<std::string> matrix_arg("m", "matrix", "input matrix file", true, "", "string");
+
+	// Add the argument mesh_arg to the CmdLine object. The CmdLine object
+	// uses this Arg to parse the command line.
+	cmd.add( matrix_arg );
+
+	TCLAP::ValueArg<unsigned> n_cores_arg("p", "number-cores", "number of cores to use", false, 1, "number");
+	cmd.add( n_cores_arg );
+
+	TCLAP::ValueArg<unsigned> n_mults_arg("n", "number-of-multiplications", "number of multiplications to perform", true, 10, "number");
+	cmd.add( n_mults_arg );
+
+	TCLAP::ValueArg<std::string> output_arg("o", "output", "output file", false, "", "string");
+	cmd.add( output_arg );
+
+	TCLAP::ValueArg<unsigned> verbosity_arg("v", "verbose", "level of verbosity [0 very low information, 1 much information]", false, 0, "string");
+	cmd.add( verbosity_arg );
+
+	cmd.parse( argc, argv );
+
+	std::string fname_mat (matrix_arg.getValue());
+
+	FormatterCustom *custom_format (new FormatterCustom);
+	logog::Cout *logogCout(new logog::Cout);
+	logogCout->SetFormatter(*custom_format);
+
+	logog::LogFile *logog_file(NULL);
+	if (! output_arg.getValue().empty()) {
+		logog_file = new logog::LogFile(output_arg.getValue().c_str());
+		logog_file->SetFormatter( *custom_format );
+	}
+
+#ifdef OGS_BUILD_INFO
+	INFO("%s was build with compiler %s", argv[0], CMAKE_CXX_COMPILER);
+	if (std::string(CMAKE_BUILD_TYPE).compare("Release") == 0) {
+		INFO("CXX_FLAGS: %s %s", CMAKE_CXX_FLAGS, CMAKE_CXX_FLAGS_RELEASE);
+	} else {
+		INFO("CXX_FLAGS: %s %s", CMAKE_CXX_FLAGS, CMAKE_CXX_FLAGS_DEBUG);
+	}
+#endif
+
+#ifdef UNIX
+	const int max_host_name_len (255);
+	char *hostname(new char[max_host_name_len]);
+	if (gethostname(hostname, max_host_name_len) == 0)
+		INFO("hostname: %s", hostname);
+	delete [] host_name_len;
+#endif
+
+	// *** reading matrix in crs format from file
+	std::ifstream in(fname_mat.c_str(), std::ios::in | std::ios::binary);
+	double *A(NULL);
+	unsigned *iA(NULL), *jA(NULL), n;
+	if (in) {
+		INFO("reading matrix from %s ...", fname_mat.c_str());
+		BaseLib::RunTime timer;
+		timer.start();
+		CS_read(in, n, iA, jA, A);
+		timer.stop();
+		INFO("\t- took %e s", timer.elapsed());
+	} else {
+		ERR("error reading matrix from %s", fname_mat.c_str());
+		return -1;
+	}
+	unsigned nnz(iA[n]);
+	INFO("\tParameters read: n=%d, nnz=%d", n, nnz);
+
+//#ifdef HAVE_PTHREADS
+	unsigned n_threads(n_cores_arg.getValue());
+	MathLib::CRSMatrixPThreads<double> mat (n, iA, jA, A, n_threads);
+
+	double *x(new double[n]);
+	double *y(new double[n]);
+
+	for (unsigned k(0); k<n; ++k)
+		x[k] = 1.0;
+
+	// read the number of multiplication to execute
+	unsigned n_mults (n_mults_arg.getValue());
+
+	INFO("*** %d matrix vector multiplications (MVM) with Toms amuxCRS (%d threads) ...", n_mults, n_threads);
+	BaseLib::RunTime run_timer;
+	BaseLib::CPUTime cpu_timer;
+	run_timer.start();
+	cpu_timer.start();
+	for (size_t k(0); k<n_mults; k++) {
+		mat.amux (1.0, x, y);
+	}
+	cpu_timer.stop();
+	run_timer.stop();
+
+	INFO("\t[MVM] - took %e sec cpu time, %e sec run time", cpu_timer.elapsed(), run_timer.elapsed());
+
+	delete [] x;
+	delete [] y;
+//#endif
+
+	delete custom_format;
+	delete logogCout;
+	delete logog_file;
+	LOGOG_SHUTDOWN();
+
+	return 0;
+}
-- 
GitLab