
MPI_Waitall crashes with mixed ISend IRecv requests #12942

Closed
@chhu

Description

Background information

A recent update of ompi caused our simulation and benchmark to crash unexpectedly in MPI_Waitall.

[athos:00000] *** An error occurred in MPI_Waitall
[athos:00000] *** reported by process [3991207937,1]
[athos:00000] *** on communicator MPI_COMM_WORLD
[athos:00000] *** MPI_ERR_REQUEST: invalid request
[athos:00000] *** MPI_ERRORS_ARE_FATAL (processes in this communicator will now abort,
[athos:00000] *** and MPI will try to terminate your MPI job as well)

This was working before. After some experimenting I found a workaround: issue two separate MPI_Waitall calls, one on the ISend requests and one on the IRecv requests. So my suspicion is that mixing the two request types in a single MPI_Waitall triggers the problem.
The little benchmark code is attached.

WARNING: This is one of those bugs that I cannot reliably reproduce. About 3 out of 10 times it actually works... :(
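
Distilled, the failing and working patterns look like this (a minimal sketch of the calls involved, not the full attached benchmark; peer, tag, sbuf, rbuf and n stand in for the values used below):

MPI_Request r[2];
MPI_Isend(sbuf, n, MPI_DOUBLE, peer, tag, MPI_COMM_WORLD, &r[0]);
MPI_Irecv(rbuf, n, MPI_DOUBLE, peer, tag, MPI_COMM_WORLD, &r[1]);

// Crashes intermittently with MPI_ERR_REQUEST:
MPI_Waitall(2, r, MPI_STATUSES_IGNORE);

// Works reliably: complete the send and recv requests separately,
// e.g. one MPI_Waitall per request type (or one MPI_Wait per request):
MPI_Waitall(1, &r[0], MPI_STATUSES_IGNORE);
MPI_Waitall(1, &r[1], MPI_STATUSES_IGNORE);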

What version of Open MPI are you using? (e.g., v4.1.6, v5.0.1, git branch name and hash, etc.)

mpirun --version
mpirun (Open MPI) 5.1.0a1
(recent pull on main)

Describe how Open MPI was installed (e.g., from a source/distribution tarball, from a git clone, from an operating system distribution package, etc.)

mkdir ucx_build
git clone https://github.com/openucx/ucx.git
cd ucx
./autogen.sh
contrib/configure-release-mt --enable-optimizations --with-mlx5 --with-march --with-cuda=$CUDA_PATH --with-gdrcopy=$P/gdrcopy_build --with-mlx5-dv --prefix=$P/ucx_build
make V=1 -j
make install
cd ..
mkdir ompi_build
git clone --depth 1 https://github.com/open-mpi/ompi.git --recursive
cd ompi
./autogen.pl
./configure --prefix=$P/ompi_build --with-cuda=$CUDA_PATH --with-ucx=$P/ucx_build
make -j
make install

If you are building/installing from a git clone, please copy-n-paste the output from git submodule status.

08e41ed 3rd-party/openpmix (v1.1.3-4067-g08e41ed5)
30cadc6746ebddd69ea42ca78b964398f782e4e3 3rd-party/prrte (psrvr-v2.0.0rc1-4839-g30cadc6746)
dfff67569fb72dbf8d73a1dcf74d091dad93f71b config/oac (dfff675)

Please describe the system on which you are running

  • Operating system/version: RH8
  • Computer hardware: Epyc 1stgen w/ CUDA
  • Network type: MLX5

Details of the problem

mpicxx  -O2 -std=c++17 -march=native -w -DUSE_MPI gflops.cpp -o gflops
mpirun -n 4 ./gflops

gflops.cpp:

#include <stdio.h>
#include <iostream>
#include <string>
#include <algorithm>
#include <numeric>
#include <vector>
#include <valarray>
#include <chrono>
//#include <cblas.h>
#ifdef USE_MPI
#include <mpi.h>
#endif

using namespace std;
using namespace chrono;


double dot(const vector<double> &a, const vector<double> &b) {

	//return std::inner_product(a.begin(), a.end(), b.begin(), 0); // Comparison to STLs own dot product. Very slow.
	//return cblas_ddot (a.size(), &(a[0]), 1, &(b[0]), 1);		 // Comparison to cblas / MKL / BLIS. No speed gain.
	double result = 0;
	for (size_t i = 0; i < a.size(); i += 8)
		result += a[i + 0] * b[i + 0] + a[i + 1] * b[i + 1]   // Vectorizing hint, should produce AVX512 machine code where possible
			+ a[i + 2] * b[i + 2] + a[i + 3] * b[i + 3]
			+ a[i + 4] * b[i + 4] + a[i + 5] * b[i + 5]
			+ a[i + 6] * b[i + 6] + a[i + 7] * b[i + 7];
	return result;
}

int main(int argc, char **argv) {

	size_t mem_size = 0;
	unsigned loops = 0;
	int rank = 0, cpu_count = 1;
#ifdef USE_MPI
	MPI_Init( &argc, &argv );
	MPI_Comm communicator = MPI_COMM_WORLD;

	MPI_Comm_rank(communicator, &rank);
	MPI_Comm_size(communicator, &cpu_count);
	if (rank == 0)
		printf("MPI Enabled, node %d of %d\nOutput is [min of single rank, max of single rank, TOTAL]\nThis benchmark does not account for MPI overhead. ", rank, cpu_count);
#endif
	if (rank == 0)
		printf("A large dot product is performed that quickly evades CPU cache.\n");

	if (argc != 3) {
		if (rank == 0)
			cout << "Use: ./gflops <MByte to test per rank> <N loops>\nUsing default: 200MB / rank, 30 loops.\n";
		mem_size = 200;
		loops = 30;

	} else {
		mem_size = (size_t) stoi(string(argv[1]), nullptr);
		loops = stoi(string(argv[2]), nullptr);
	}
	mem_size *=  1024 * 1024;

	size_t elements = mem_size / sizeof(double) / 2;
	vector<double> best_gflop;
	vector<double> a(elements, 0.);
	vector<double> b(elements, 0.);

	for (size_t i = 0; i < elements; i++) {
		a[i] = double(elements) / (i + 1.);
		b[i] = 1. - a[i];
	}

	if (rank == 0)
		cout << "Dot product with 2 " << elements << " double arrays per rank\n";

#ifdef USE_MPI
	double single_ref_gflop = 1;
	if (rank == 0) {
		auto t1 = high_resolution_clock::now();
		volatile double result = 0;
		result += dot(a,b);
		result += dot(a,b);
		result += dot(a,b);
		result += dot(a,b);
		auto t2 = high_resolution_clock::now();
	
		auto duration_i = duration_cast<microseconds>(t2-t1).count();
		double duration = double(duration_i) / 1e6 / 4.;  // 4 times loop

		single_ref_gflop = (1e-9) * ((elements) / (double)duration);
		printf("Single reference for speedup: %5.3g GFLOPs\n", single_ref_gflop);
	}
	MPI_Barrier(communicator);
#endif

	int n_loops = loops;
	while (loops--) {

		volatile double result = 0; // Avoid optimization

		auto t1 = high_resolution_clock::now();

		result += dot(a, b);
		result += dot(a, b);
		result += dot(a, b);
		result += dot(a, b);

		auto t2 = high_resolution_clock::now();

		auto duration_i = duration_cast<microseconds>(t2 - t1).count();
		double duration = double(duration_i) / 1e6 / 4.;  // 4 times loop

		double gflops = (1e-9) * ((elements) / (double) duration);

		double mem_speed =
				double(elements) * 2. * double(sizeof(double))
						/ double(duration) / 1024. / 1024. / 1024.; // GB/s
#ifdef USE_MPI
		// Get min/max/sum from all ranks
		double send_data[2] = {gflops, mem_speed};
		double receive_data_sum[2];
		double receive_data_min[2];
		double receive_data_max[2];
		MPI_Allreduce(send_data, receive_data_sum, 2, MPI_DOUBLE, MPI_SUM, communicator);
		MPI_Allreduce(send_data, receive_data_min, 2, MPI_DOUBLE, MPI_MIN, communicator);
		MPI_Allreduce(send_data, receive_data_max, 2, MPI_DOUBLE, MPI_MAX, communicator);
		best_gflop.push_back(receive_data_sum[0]);
		if (rank == 0)
			printf("Run %u: [%5.3g, %5.3g, %5.3g] GFLOP/s (FMA is 1 FLOP), [%5.3g, %5.3g, %5.3g] GByte/s. Speedup: %.4g\n", n_loops - loops, receive_data_min[0],  receive_data_max[0], receive_data_sum[0],  receive_data_min[1], receive_data_max[1],  receive_data_sum[1], receive_data_sum[0] / single_ref_gflop);
#else
		best_gflop.push_back(gflops);
		if (rank == 0)
			printf("Run %u: %5.3g GFLOP/s (FMA is 1 FLOP), %5.3g GByte/s.\n", n_loops - loops, gflops, mem_speed);
#endif
	}
	if (rank == 0) {
		printf("Peak performance: %g GFLOP/s.\n",
				*max_element(best_gflop.begin(), best_gflop.end()));
		printf("Average performance: %g GFLOP/s.\n",
				accumulate(best_gflop.begin(), best_gflop.end(), 0.) / best_gflop.size());
	}
#ifdef USE_MPI
	// source: https://www.mcs.anl.gov/research/projects/mpi/tutorial/mpiexmpl/src3/pingpong/C/nbhead/main.html
	double *sbuf, *rbuf;
	int n;
	double t1, t2;
	valarray<double> tmin(0.0, cpu_count);
	int i, j, k, nloop;
	MPI_Status status, statuses[2];
	MPI_Request r[2] = {MPI_REQUEST_NULL};

	if (rank == 0)
		printf("\nMPI-Latency and Bandwidth Test\nKind\t\t\t\tbytes\ttime (sec)\tRate (MB/sec)\n");

	for (n = 1; n < 1100000*16; n *= 2) {
		if (n == 0)
			nloop = 1000;
		else
			nloop = 1000 / n;
		if (nloop < 1)
			nloop = 1;

		sbuf = (double*) malloc(n * sizeof(double));
		rbuf = (double*) malloc(n * sizeof(double));

		if (!sbuf || !rbuf) {
			fprintf( stderr,"Could not allocate send/recv buffers of size %d\n", n);
			MPI_Abort(MPI_COMM_WORLD, 1);
		}
		for (k = 0; k < 10; k++) {
			if (rank == 0) {
				for (int target = 1; target < cpu_count; target++) {
					tmin[target] = 1e8;
					/* Make sure both processes are ready */
					MPI_Sendrecv(MPI_BOTTOM, 0, MPI_INT, target, 14, MPI_BOTTOM, 0, MPI_INT, target, 14, MPI_COMM_WORLD, &status);
					t1 = MPI_Wtime();

					for (j = 0; j < nloop; j++) {
						MPI_Isend(sbuf, n, MPI_DOUBLE, target, k, MPI_COMM_WORLD, &r[0]);
						MPI_Irecv(rbuf, n, MPI_DOUBLE, target, k, MPI_COMM_WORLD, &r[1]);
						MPI_Waitall(2, r, MPI_STATUSES_IGNORE);  // crashes !
						//MPI_Wait(&r[0], MPI_STATUS_IGNORE);  // works
						//MPI_Wait(&r[1], MPI_STATUS_IGNORE);
					}
					t2 = (MPI_Wtime() - t1) / nloop;
					if (t2 < tmin[target])
						tmin[target] = t2;
				}
			} else {
				/* Make sure both processes are ready */
				MPI_Sendrecv(MPI_BOTTOM, 0, MPI_INT, 0, 14, MPI_BOTTOM, 0, MPI_INT, 0, 14, MPI_COMM_WORLD, &status);
				for (j = 0; j < nloop; j++) {
					MPI_Irecv(rbuf, n, MPI_DOUBLE, 0, k, MPI_COMM_WORLD, &r[0]);
					MPI_Isend(sbuf, n, MPI_DOUBLE, 0, k, MPI_COMM_WORLD, &r[1]);
					//MPI_Wait(&r[0], MPI_STATUS_IGNORE);  // works!
					//MPI_Wait(&r[1], MPI_STATUS_IGNORE);  
					MPI_Waitall(2, r, MPI_STATUSES_IGNORE);  // crashes!
				}
			}
			
		}

		/* rate is MB/sec for exchange, not an estimate of the
		 component for each isend/irecv */
		if (rank == 0) {
			double rate;
			double tmin_ = tmin.max();
			rate = tmin_ == 0 ? 0 : 2 * n * sizeof(double) * 1.0e-6 / tmin_;
			printf("Worst head-to-head Isend/Irecv\t%d\t%g\t%f\n", n, tmin_, rate);
		}
		free(sbuf);
		free(rbuf);
	}
	MPI_Finalize();
#endif
}
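
For completeness, the workaround we use in the simulation code (as described under "Background information": two separate MPI_Waitall calls, one over all ISend requests and one over all IRecv requests) looks roughly like this. The names (n_neighbors, neighbor, counts, send_bufs, recv_bufs) are illustrative, not taken from our actual code:

// Workaround sketch: keep send and recv requests in separate arrays and
// complete each group with its own MPI_Waitall instead of one mixed call.
std::vector<MPI_Request> send_reqs(n_neighbors), recv_reqs(n_neighbors);
for (int i = 0; i < n_neighbors; i++) {
	MPI_Irecv(recv_bufs[i], counts[i], MPI_DOUBLE, neighbor[i], tag, MPI_COMM_WORLD, &recv_reqs[i]);
	MPI_Isend(send_bufs[i], counts[i], MPI_DOUBLE, neighbor[i], tag, MPI_COMM_WORLD, &send_reqs[i]);
}
MPI_Waitall(n_neighbors, recv_reqs.data(), MPI_STATUSES_IGNORE);   // recv requests only
MPI_Waitall(n_neighbors, send_reqs.data(), MPI_STATUSES_IGNORE);   // send requests only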
