Skip to content

Commit 7b6a2da

Browse files
authored
Merge pull request #5504 from rhc54/cmr40/ofi
MTL OFI: send/isend split into blocking/non-blocking paths
2 parents 9a6f6e6 + 1fbbae1 commit 7b6a2da

File tree

1 file changed

+150
-79
lines changed

1 file changed

+150
-79
lines changed

ompi/mca/mtl/ofi/mtl_ofi.h

Lines changed: 150 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -238,34 +238,82 @@ ompi_mtl_ofi_isend_callback(struct fi_cq_tagged_entry *wc,
238238
}
239239

240240
__opal_attribute_always_inline__ static inline int
241-
ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
242-
struct ompi_communicator_t *comm,
243-
int dest,
244-
int tag,
245-
struct opal_convertor_t *convertor,
246-
mca_pml_base_send_mode_t mode,
247-
ompi_mtl_ofi_request_t *ofi_req)
241+
ompi_mtl_ofi_ssend_recv(ompi_mtl_ofi_request_t *ack_req,
242+
struct ompi_communicator_t *comm,
243+
fi_addr_t *src_addr,
244+
ompi_mtl_ofi_request_t *ofi_req,
245+
mca_mtl_ofi_endpoint_t *endpoint,
246+
uint64_t *match_bits,
247+
int tag)
248+
{
249+
ssize_t ret = OMPI_SUCCESS;
250+
ack_req = malloc(sizeof(ompi_mtl_ofi_request_t));
251+
252+
assert(ack_req);
253+
254+
ack_req->parent = ofi_req;
255+
ack_req->event_callback = ompi_mtl_ofi_send_ack_callback;
256+
ack_req->error_callback = ompi_mtl_ofi_send_ack_error_callback;
257+
258+
ofi_req->completion_count += 1;
259+
260+
MTL_OFI_RETRY_UNTIL_DONE(fi_trecv(ompi_mtl_ofi.ep,
261+
NULL,
262+
0,
263+
NULL,
264+
*src_addr,
265+
*match_bits | ompi_mtl_ofi.sync_send_ack,
266+
0, /* Exact match, no ignore bits */
267+
(void *) &ack_req->ctx), ret);
268+
if (OPAL_UNLIKELY(0 > ret)) {
269+
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
270+
"%s:%d: fi_trecv failed: %s(%zd)",
271+
__FILE__, __LINE__, fi_strerror(-ret), ret);
272+
free(ack_req);
273+
return ompi_mtl_ofi_get_error(ret);
274+
}
275+
276+
/* The SYNC_SEND tag bit is set for the send operation only.*/
277+
MTL_OFI_SET_SYNC_SEND(*match_bits);
278+
return OMPI_SUCCESS;
279+
}
280+
281+
__opal_attribute_always_inline__ static inline int
282+
ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
283+
struct ompi_communicator_t *comm,
284+
int dest,
285+
int tag,
286+
struct opal_convertor_t *convertor,
287+
mca_pml_base_send_mode_t mode)
248288
{
289+
ssize_t ret = OMPI_SUCCESS;
290+
ompi_mtl_ofi_request_t ofi_req;
249291
int ompi_ret;
250292
void *start;
251-
size_t length;
252-
ssize_t ret;
253293
bool free_after;
294+
size_t length;
254295
uint64_t match_bits;
255296
ompi_proc_t *ompi_proc = NULL;
256297
mca_mtl_ofi_endpoint_t *endpoint = NULL;
257298
ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */
258299
fi_addr_t src_addr = 0;
259300

301+
/**
302+
* Create a send request, start it and wait until it completes.
303+
*/
304+
ofi_req.event_callback = ompi_mtl_ofi_send_callback;
305+
ofi_req.error_callback = ompi_mtl_ofi_send_error_callback;
306+
260307
ompi_proc = ompi_comm_peer_lookup(comm, dest);
261308
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
262309

263310
ompi_ret = ompi_mtl_datatype_pack(convertor, &start, &length, &free_after);
264311
if (OMPI_SUCCESS != ompi_ret) return ompi_ret;
265312

266-
ofi_req->buffer = (free_after) ? start : NULL;
267-
ofi_req->length = length;
268-
ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
313+
ofi_req.buffer = (free_after) ? start : NULL;
314+
ofi_req.length = length;
315+
ofi_req.status.MPI_ERROR = OMPI_SUCCESS;
316+
ofi_req.completion_count = 0;
269317

270318
if (ompi_mtl_ofi.fi_cq_data) {
271319
match_bits = mtl_ofi_create_send_tag_CQD(comm->c_contextid, tag);
@@ -277,33 +325,11 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
277325
}
278326

279327
if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
280-
ack_req = malloc(sizeof(ompi_mtl_ofi_request_t));
281-
assert(ack_req);
282-
ack_req->parent = ofi_req;
283-
ack_req->event_callback = ompi_mtl_ofi_send_ack_callback;
284-
ack_req->error_callback = ompi_mtl_ofi_send_ack_error_callback;
285-
286-
ofi_req->completion_count = 2;
287-
288-
MTL_OFI_RETRY_UNTIL_DONE(fi_trecv(ompi_mtl_ofi.ep,
289-
NULL,
290-
0,
291-
NULL,
292-
src_addr,
293-
match_bits | ompi_mtl_ofi.sync_send_ack,
294-
0, /* Exact match, no ignore bits */
295-
(void *) &ack_req->ctx), ret);
296-
if (OPAL_UNLIKELY(0 > ret)) {
297-
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
298-
"%s:%d: fi_trecv failed: %s(%zd)",
299-
__FILE__, __LINE__, fi_strerror(-ret), ret);
300-
free(ack_req);
301-
return ompi_mtl_ofi_get_error(ret);
302-
}
303-
/* The SYNC_SEND tag bit is set for the send operation only.*/
304-
MTL_OFI_SET_SYNC_SEND(match_bits);
305-
} else {
306-
ofi_req->completion_count = 1;
328+
ofi_req.status.MPI_ERROR = ompi_mtl_ofi_ssend_recv(ack_req, comm, &src_addr,
329+
&ofi_req, endpoint,
330+
&match_bits, tag);
331+
if (OPAL_UNLIKELY(ofi_req.status.MPI_ERROR != OMPI_SUCCESS))
332+
goto free_request_buffer;
307333
}
308334

309335
if (ompi_mtl_ofi.max_inject_size >= length) {
@@ -331,11 +357,12 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
331357
fi_cancel((fid_t)ompi_mtl_ofi.ep, &ack_req->ctx);
332358
free(ack_req);
333359
}
334-
return ompi_mtl_ofi_get_error(ret);
335-
}
336360

337-
ofi_req->event_callback(NULL,ofi_req);
361+
ofi_req.status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
362+
goto free_request_buffer;
363+
}
338364
} else {
365+
ofi_req.completion_count += 1;
339366
if (ompi_mtl_ofi.fi_cq_data) {
340367
MTL_OFI_RETRY_UNTIL_DONE(fi_tsenddata(ompi_mtl_ofi.ep,
341368
start,
@@ -344,52 +371,26 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
344371
comm->c_my_rank,
345372
endpoint->peer_fiaddr,
346373
match_bits,
347-
(void *) &ofi_req->ctx), ret);
374+
(void *) &ofi_req.ctx), ret);
348375
} else {
349376
MTL_OFI_RETRY_UNTIL_DONE(fi_tsend(ompi_mtl_ofi.ep,
350377
start,
351378
length,
352379
NULL,
353380
endpoint->peer_fiaddr,
354381
match_bits,
355-
(void *) &ofi_req->ctx), ret);
382+
(void *) &ofi_req.ctx), ret);
356383
}
357384
if (OPAL_UNLIKELY(0 > ret)) {
358385
char *fi_api = ompi_mtl_ofi.fi_cq_data ? "fi_tsendddata" : "fi_send";
359386
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
360387
"%s:%d: %s failed: %s(%zd)",
361388
__FILE__, __LINE__,fi_api, fi_strerror(-ret), ret);
362-
return ompi_mtl_ofi_get_error(ret);
363-
}
364-
}
365-
366-
return OMPI_SUCCESS;
367-
}
368-
369-
__opal_attribute_always_inline__ static inline int
370-
ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
371-
struct ompi_communicator_t *comm,
372-
int dest,
373-
int tag,
374-
struct opal_convertor_t *convertor,
375-
mca_pml_base_send_mode_t mode)
376-
{
377-
int ret = OMPI_SUCCESS;
378-
ompi_mtl_ofi_request_t ofi_req;
379-
380-
/**
381-
* Create a send request, start it and wait until it completes.
382-
*/
383-
ofi_req.event_callback = ompi_mtl_ofi_send_callback;
384-
ofi_req.error_callback = ompi_mtl_ofi_send_error_callback;
389+
free(fi_api);
385390

386-
ret = ompi_mtl_ofi_send_start(mtl, comm, dest, tag,
387-
convertor, mode, &ofi_req);
388-
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
389-
if (NULL != ofi_req.buffer) {
390-
free(ofi_req.buffer);
391+
ofi_req.status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
392+
goto free_request_buffer;
391393
}
392-
return ret;
393394
}
394395

395396
/**
@@ -400,6 +401,7 @@ ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
400401
ompi_mtl_ofi_progress();
401402
}
402403

404+
free_request_buffer:
403405
if (OPAL_UNLIKELY(NULL != ofi_req.buffer)) {
404406
free(ofi_req.buffer);
405407
}
@@ -417,20 +419,89 @@ ompi_mtl_ofi_isend(struct mca_mtl_base_module_t *mtl,
417419
bool blocking,
418420
mca_mtl_request_t *mtl_request)
419421
{
420-
int ret = OMPI_SUCCESS;
421-
ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t*) mtl_request;
422+
ssize_t ret = OMPI_SUCCESS;
423+
ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t *) mtl_request;
424+
int ompi_ret;
425+
void *start;
426+
size_t length;
427+
bool free_after;
428+
uint64_t match_bits;
429+
ompi_proc_t *ompi_proc = NULL;
430+
mca_mtl_ofi_endpoint_t *endpoint = NULL;
431+
ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */
432+
fi_addr_t src_addr = 0;
422433

423434
ofi_req->event_callback = ompi_mtl_ofi_isend_callback;
424435
ofi_req->error_callback = ompi_mtl_ofi_send_error_callback;
425436

426-
ret = ompi_mtl_ofi_send_start(mtl, comm, dest, tag,
427-
convertor, mode, ofi_req);
437+
ompi_proc = ompi_comm_peer_lookup(comm, dest);
438+
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
428439

429-
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret && NULL != ofi_req->buffer)) {
440+
ompi_ret = ompi_mtl_datatype_pack(convertor, &start, &length, &free_after);
441+
if (OMPI_SUCCESS != ompi_ret) return ompi_ret;
442+
443+
ofi_req->buffer = (free_after) ? start : NULL;
444+
ofi_req->length = length;
445+
ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
446+
ofi_req->completion_count = 1;
447+
448+
if (ompi_mtl_ofi.fi_cq_data) {
449+
match_bits = mtl_ofi_create_send_tag_CQD(comm->c_contextid, tag);
450+
src_addr = endpoint->peer_fiaddr;
451+
} else {
452+
match_bits = mtl_ofi_create_send_tag(comm->c_contextid,
453+
comm->c_my_rank, tag);
454+
/* src_addr is ignored when FI_DIRECTED_RECV is not supported */
455+
}
456+
457+
if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
458+
ofi_req->status.MPI_ERROR = ompi_mtl_ofi_ssend_recv(ack_req, comm, &src_addr,
459+
ofi_req, endpoint,
460+
&match_bits, tag);
461+
if (OPAL_UNLIKELY(ofi_req->status.MPI_ERROR != OMPI_SUCCESS))
462+
goto free_request_buffer;
463+
}
464+
465+
if (ompi_mtl_ofi.fi_cq_data) {
466+
MTL_OFI_RETRY_UNTIL_DONE(fi_tsenddata(ompi_mtl_ofi.ep,
467+
start,
468+
length,
469+
NULL,
470+
comm->c_my_rank,
471+
endpoint->peer_fiaddr,
472+
match_bits,
473+
(void *) &ofi_req->ctx), ret);
474+
} else {
475+
MTL_OFI_RETRY_UNTIL_DONE(fi_tsend(ompi_mtl_ofi.ep,
476+
start,
477+
length,
478+
NULL,
479+
endpoint->peer_fiaddr,
480+
match_bits,
481+
(void *) &ofi_req->ctx), ret);
482+
}
483+
if (OPAL_UNLIKELY(0 > ret)) {
484+
char *fi_api;
485+
if (ompi_mtl_ofi.fi_cq_data) {
486+
asprintf( &fi_api, "fi_tsendddata") ;
487+
}
488+
else {
489+
asprintf( &fi_api, "fi_send") ;
490+
}
491+
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
492+
"%s:%d: %s failed: %s(%zd)",
493+
__FILE__, __LINE__,fi_api, fi_strerror(-ret), ret);
494+
free(fi_api);
495+
ofi_req->status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
496+
}
497+
498+
free_request_buffer:
499+
if (OPAL_UNLIKELY(OMPI_SUCCESS != ofi_req->status.MPI_ERROR
500+
&& NULL != ofi_req->buffer)) {
430501
free(ofi_req->buffer);
431502
}
432503

433-
return ret;
504+
return ofi_req->status.MPI_ERROR;
434505
}
435506

436507
/**

0 commit comments

Comments
 (0)