Skip to content

Commit cae601f

Browse files
jiaxiyanwenduwan
authored and committed
coll/han: implement hierarchical scatterv
Add a scatterv implementation that optimizes large-scale communication across multiple nodes with multiple processes per node by avoiding high-incast traffic on the root process. Because *V collectives do not have an equal datatype/count on every process, this implementation cannot natively support message-size based tuning without an additional global communication. Similar to scatter, the hierarchical scatterv requires a temporary buffer and a memory copy to handle out-of-order data or non-contiguous placement in the send buffer, which results in worse performance for large messages compared to the linear implementation.

Signed-off-by: Jessie Yang <[email protected]>
1 parent 7303acf commit cae601f

File tree

9 files changed

+561
-1
lines changed

9 files changed

+561
-1
lines changed

ompi/mca/coll/han/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ coll_han_barrier.c \
2121
coll_han_bcast.c \
2222
coll_han_reduce.c \
2323
coll_han_scatter.c \
24+
coll_han_scatterv.c \
2425
coll_han_gather.c \
2526
coll_han_gatherv.c \
2627
coll_han_allreduce.c \

ompi/mca/coll/han/coll_han.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ typedef struct mca_coll_han_op_module_name_t {
193193
mca_coll_han_op_up_low_module_name_t gather;
194194
mca_coll_han_op_up_low_module_name_t gatherv;
195195
mca_coll_han_op_up_low_module_name_t scatter;
196+
mca_coll_han_op_up_low_module_name_t scatterv;
196197
} mca_coll_han_op_module_name_t;
197198

198199
/**
@@ -244,6 +245,10 @@ typedef struct mca_coll_han_component_t {
244245
uint32_t han_scatter_up_module;
245246
/* low level module for scatter */
246247
uint32_t han_scatter_low_module;
248+
/* up level module for scatterv */
249+
uint32_t han_scatterv_up_module;
250+
/* low level module for scatterv */
251+
uint32_t han_scatterv_low_module;
247252
/* name of the modules */
248253
mca_coll_han_op_module_name_t han_op_module_name;
249254
/* whether we need reproducible results
@@ -287,6 +292,7 @@ typedef struct mca_coll_han_single_collective_fallback_s {
287292
mca_coll_base_module_gatherv_fn_t gatherv;
288293
mca_coll_base_module_reduce_fn_t reduce;
289294
mca_coll_base_module_scatter_fn_t scatter;
295+
mca_coll_base_module_scatterv_fn_t scatterv;
290296
} module_fn;
291297
mca_coll_base_module_t* module;
292298
} mca_coll_han_single_collective_fallback_t;
@@ -306,6 +312,7 @@ typedef struct mca_coll_han_collectives_fallback_s {
306312
mca_coll_han_single_collective_fallback_t gather;
307313
mca_coll_han_single_collective_fallback_t gatherv;
308314
mca_coll_han_single_collective_fallback_t scatter;
315+
mca_coll_han_single_collective_fallback_t scatterv;
309316
} mca_coll_han_collectives_fallback_t;
310317

311318
/** Coll han module */
@@ -384,6 +391,8 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
384391
#define previous_scatter fallback.scatter.module_fn.scatter
385392
#define previous_scatter_module fallback.scatter.module
386393

394+
#define previous_scatterv fallback.scatterv.module_fn.scatterv
395+
#define previous_scatterv_module fallback.scatterv.module
387396

388397
/* macro to correctly load a fallback collective module */
389398
#define HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, COLL) \
@@ -403,6 +412,7 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
403412
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, barrier); \
404413
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, bcast); \
405414
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, scatter); \
415+
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, scatterv); \
406416
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, gather); \
407417
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, gatherv); \
408418
HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, reduce); \
@@ -495,6 +505,9 @@ mca_coll_han_reduce_intra_dynamic(REDUCE_BASE_ARGS,
495505
int
496506
mca_coll_han_scatter_intra_dynamic(SCATTER_BASE_ARGS,
497507
mca_coll_base_module_t *module);
508+
int
509+
mca_coll_han_scatterv_intra_dynamic(SCATTERV_BASE_ARGS,
510+
mca_coll_base_module_t *module);
498511

499512
int mca_coll_han_barrier_intra_simple(struct ompi_communicator_t *comm,
500513
mca_coll_base_module_t *module);

ompi/mca/coll/han/coll_han_algorithms.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ mca_coll_han_algorithm_value_t* mca_coll_han_available_algorithms[COLLCOUNT] =
5959
{"simple", (fnptr_t) &mca_coll_han_scatter_intra_simple}, // 2-level
6060
{ 0 }
6161
},
62+
[SCATTERV] = (mca_coll_han_algorithm_value_t[]){
63+
{"intra", (fnptr_t) &mca_coll_han_scatterv_intra}, // 2-level
64+
{ 0 }
65+
},
6266
[GATHER] = (mca_coll_han_algorithm_value_t[]){
6367
{"intra", (fnptr_t) &mca_coll_han_gather_intra}, // 2-level
6468
{"simple", (fnptr_t) &mca_coll_han_gather_intra_simple}, // 2-level

ompi/mca/coll/han/coll_han_algorithms.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,16 @@ mca_coll_han_scatter_intra_simple(const void *sbuf, int scount,
159159
struct ompi_communicator_t *comm,
160160
mca_coll_base_module_t * module);
161161

162+
/* Scatterv */
163+
int
164+
mca_coll_han_scatterv_intra(const void *sbuf, const int *scounts,
165+
const int *displs, struct ompi_datatype_t *sdtype,
166+
void *rbuf, int rcount,
167+
struct ompi_datatype_t *rdtype,
168+
int root,
169+
struct ompi_communicator_t *comm,
170+
mca_coll_base_module_t *module);
171+
162172
/* Gather */
163173
int
164174
mca_coll_han_gather_intra(const void *sbuf, int scount,

ompi/mca/coll/han/coll_han_component.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ static int han_close(void)
156156
free(mca_coll_han_component.han_op_module_name.scatter.han_op_low_module_name);
157157
mca_coll_han_component.han_op_module_name.scatter.han_op_low_module_name = NULL;
158158

159+
free(mca_coll_han_component.han_op_module_name.scatterv.han_op_up_module_name);
160+
mca_coll_han_component.han_op_module_name.scatterv.han_op_up_module_name = NULL;
161+
free(mca_coll_han_component.han_op_module_name.scatterv.han_op_low_module_name);
162+
mca_coll_han_component.han_op_module_name.scatterv.han_op_low_module_name = NULL;
163+
159164
return OMPI_SUCCESS;
160165
}
161166

@@ -373,6 +378,18 @@ static int han_register(void)
373378
OPAL_INFO_LVL_9, &cs->han_scatter_low_module,
374379
&cs->han_op_module_name.scatter.han_op_low_module_name);
375380

381+
cs->han_scatterv_up_module = 0;
382+
(void) mca_coll_han_query_module_from_mca(c, "scatterv_up_module",
383+
"up level module for scatterv, 0 basic",
384+
OPAL_INFO_LVL_9, &cs->han_scatterv_up_module,
385+
&cs->han_op_module_name.scatterv.han_op_up_module_name);
386+
387+
cs->han_scatterv_low_module = 0;
388+
(void) mca_coll_han_query_module_from_mca(c, "scatterv_low_module",
389+
"low level module for scatterv, 0 basic",
390+
OPAL_INFO_LVL_9, &cs->han_scatterv_low_module,
391+
&cs->han_op_module_name.scatterv.han_op_low_module_name);
392+
376393
cs->han_reproducible = 0;
377394
(void) mca_base_component_var_register(c, "reproducible",
378395
"whether we need reproducible results "

ompi/mca/coll/han/coll_han_dynamic.c

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ bool mca_coll_han_is_coll_dynamic_implemented(COLLTYPE_T coll_id)
4646
case GATHERV:
4747
case REDUCE:
4848
case SCATTER:
49+
case SCATTERV:
4950
return true;
5051
default:
5152
return false;
@@ -1397,3 +1398,114 @@ mca_coll_han_scatter_intra_dynamic(const void *sbuf, int scount,
13971398
root, comm,
13981399
sub_module);
13991400
}
1401+
1402+
1403+
/*
1404+
* Scatterv selector:
1405+
* On a sub-communicator, checks the stored rules to find the module to use
1406+
* On the global communicator, calls the han collective implementation, or
1407+
* calls the correct module if fallback mechanism is activated
1408+
*/
1409+
int
1410+
mca_coll_han_scatterv_intra_dynamic(const void *sbuf, const int *scounts,
1411+
const int *displs, struct ompi_datatype_t *sdtype,
1412+
void *rbuf, int rcount,
1413+
struct ompi_datatype_t *rdtype,
1414+
int root,
1415+
struct ompi_communicator_t *comm,
1416+
mca_coll_base_module_t *module)
1417+
{
1418+
mca_coll_han_module_t *han_module = (mca_coll_han_module_t*) module;
1419+
TOPO_LVL_T topo_lvl = han_module->topologic_level;
1420+
mca_coll_base_module_scatterv_fn_t scatterv;
1421+
mca_coll_base_module_t *sub_module;
1422+
size_t dtype_size;
1423+
int rank, verbosity = 0;
1424+
1425+
if (!han_module->enabled) {
1426+
return han_module->previous_scatterv(sbuf, scounts, displs, sdtype, rbuf, rcount, rdtype,
1427+
root, comm, han_module->previous_scatterv_module);
1428+
}
1429+
1430+
/* v collectives do not support message-size based dynamic rules */
1431+
sub_module = get_module(SCATTERV,
1432+
MCA_COLL_HAN_ANY_MESSAGE_SIZE,
1433+
comm,
1434+
han_module);
1435+
1436+
/* First errors are always printed by rank 0 */
1437+
rank = ompi_comm_rank(comm);
1438+
if( (0 == rank) && (han_module->dynamic_errors < mca_coll_han_component.max_dynamic_errors) ) {
1439+
verbosity = 30;
1440+
}
1441+
1442+
if(NULL == sub_module) {
1443+
/*
1444+
* No valid collective module from dynamic rules
1445+
* nor from mca parameter
1446+
*/
1447+
han_module->dynamic_errors++;
1448+
opal_output_verbose(verbosity, mca_coll_han_component.han_output,
1449+
"coll:han:mca_coll_han_scatterv_intra_dynamic "
1450+
"HAN did not find any valid module for collective %d (%s) "
1451+
"with topological level %d (%s) on communicator (%s/%s). "
1452+
"Please check dynamic file/mca parameters\n",
1453+
SCATTERV, mca_coll_base_colltype_to_str(SCATTERV),
1454+
topo_lvl, mca_coll_han_topo_lvl_to_str(topo_lvl),
1455+
ompi_comm_print_cid(comm), comm->c_name);
1456+
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
1457+
"HAN/SCATTERV: No module found for the sub-communicator. "
1458+
"Falling back to another component\n"));
1459+
scatterv = han_module->previous_scatterv;
1460+
sub_module = han_module->previous_scatterv_module;
1461+
} else if (NULL == sub_module->coll_scatterv) {
1462+
/*
1463+
* No valid collective from dynamic rules
1464+
* nor from mca parameter
1465+
*/
1466+
han_module->dynamic_errors++;
1467+
opal_output_verbose(verbosity, mca_coll_han_component.han_output,
1468+
"coll:han:mca_coll_han_scatterv_intra_dynamic "
1469+
"HAN found valid module for collective %d (%s) "
1470+
"with topological level %d (%s) on communicator (%s/%s) "
1471+
"but this module cannot handle this collective. "
1472+
"Please check dynamic file/mca parameters\n",
1473+
SCATTERV, mca_coll_base_colltype_to_str(SCATTERV),
1474+
topo_lvl, mca_coll_han_topo_lvl_to_str(topo_lvl),
1475+
ompi_comm_print_cid(comm), comm->c_name);
1476+
OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
1477+
"HAN/SCATTERV: the module found for the sub-"
1478+
"communicator cannot handle the SCATTERV operation. "
1479+
"Falling back to another component\n"));
1480+
scatterv = han_module->previous_scatterv;
1481+
sub_module = han_module->previous_scatterv_module;
1482+
} else if (GLOBAL_COMMUNICATOR == topo_lvl && sub_module == module) {
1483+
/*
1484+
* No fallback mechanism activated for this configuration
1485+
* sub_module is valid
1486+
* sub_module->coll_scatterv is valid and point to this function
1487+
* Call han topological collective algorithm
1488+
*/
1489+
int algorithm_id = get_algorithm(SCATTERV,
1490+
MCA_COLL_HAN_ANY_MESSAGE_SIZE,
1491+
comm,
1492+
han_module);
1493+
scatterv = (mca_coll_base_module_scatterv_fn_t)mca_coll_han_algorithm_id_to_fn(SCATTERV, algorithm_id);
1494+
if (NULL == scatterv) { /* default behaviour */
1495+
scatterv = mca_coll_han_scatterv_intra;
1496+
}
1497+
} else {
1498+
/*
1499+
* If we get here:
1500+
* sub_module is valid
1501+
* sub_module->coll_scatterv is valid
1502+
* They point to the collective to use, according to the dynamic rules
1503+
* Selector's job is done, call the collective
1504+
*/
1505+
scatterv = sub_module->coll_scatterv;
1506+
}
1507+
1508+
return scatterv(sbuf, scounts, displs, sdtype,
1509+
rbuf, rcount, rdtype,
1510+
root, comm, sub_module);
1511+
}

ompi/mca/coll/han/coll_han_module.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ static void han_module_clear(mca_coll_han_module_t *han_module)
5454
CLEAN_PREV_COLL(han_module, gather);
5555
CLEAN_PREV_COLL(han_module, gatherv);
5656
CLEAN_PREV_COLL(han_module, scatter);
57+
CLEAN_PREV_COLL(han_module, scatterv);
5758

5859
han_module->reproducible_reduce = NULL;
5960
han_module->reproducible_reduce_module = NULL;
@@ -152,6 +153,7 @@ mca_coll_han_module_destruct(mca_coll_han_module_t * module)
152153
OBJ_RELEASE_IF_NOT_NULL(module->previous_gatherv_module);
153154
OBJ_RELEASE_IF_NOT_NULL(module->previous_reduce_module);
154155
OBJ_RELEASE_IF_NOT_NULL(module->previous_scatter_module);
156+
OBJ_RELEASE_IF_NOT_NULL(module->previous_scatterv_module);
155157

156158
han_module_clear(module);
157159
}
@@ -254,7 +256,7 @@ mca_coll_han_comm_query(struct ompi_communicator_t * comm, int *priority)
254256
han_module->super.coll_exscan = NULL;
255257
han_module->super.coll_reduce_scatter = NULL;
256258
han_module->super.coll_scan = NULL;
257-
han_module->super.coll_scatterv = NULL;
259+
han_module->super.coll_scatterv = mca_coll_han_scatterv_intra_dynamic;
258260
han_module->super.coll_barrier = mca_coll_han_barrier_intra_dynamic;
259261
han_module->super.coll_scatter = mca_coll_han_scatter_intra_dynamic;
260262
han_module->super.coll_reduce = mca_coll_han_reduce_intra_dynamic;
@@ -316,6 +318,7 @@ han_module_enable(mca_coll_base_module_t * module,
316318
HAN_SAVE_PREV_COLL_API(gatherv);
317319
HAN_SAVE_PREV_COLL_API(reduce);
318320
HAN_SAVE_PREV_COLL_API(scatter);
321+
HAN_SAVE_PREV_COLL_API(scatterv);
319322

320323
/* set reproducible algos */
321324
mca_coll_han_reduce_reproducible_decision(comm, module);
@@ -332,6 +335,7 @@ han_module_enable(mca_coll_base_module_t * module,
332335
OBJ_RELEASE_IF_NOT_NULL(han_module->previous_gatherv_module);
333336
OBJ_RELEASE_IF_NOT_NULL(han_module->previous_reduce_module);
334337
OBJ_RELEASE_IF_NOT_NULL(han_module->previous_scatter_module);
338+
OBJ_RELEASE_IF_NOT_NULL(han_module->previous_scatterv_module);
335339

336340
return OMPI_ERROR;
337341
}
@@ -354,6 +358,7 @@ mca_coll_han_module_disable(mca_coll_base_module_t * module,
354358
OBJ_RELEASE_IF_NOT_NULL(han_module->previous_gatherv_module);
355359
OBJ_RELEASE_IF_NOT_NULL(han_module->previous_reduce_module);
356360
OBJ_RELEASE_IF_NOT_NULL(han_module->previous_scatter_module);
361+
OBJ_RELEASE_IF_NOT_NULL(han_module->previous_scatterv_module);
357362

358363
han_module_clear(han_module);
359364

0 commit comments

Comments
 (0)