DNDSR 0.2.1
Distributed Numeric Data Structure for CFV
Loading...
Searching...
No Matches
MPI.cpp
Go to the documentation of this file.
1/// @file MPI.cpp
2/// @brief Implementations of the MPI wrapper functions declared in @ref MPI.hpp:
3/// retry-aware @ref Bcast/@ref Alltoall/@ref Alltoallv/@ref Allreduce/@ref Allgather/@ref Barrier
4/// variants, lazy waits, singleton definitions, CUDA-aware probe.
5
6#include <ctime>
7#include <cstdio>
8#include <cstdlib>
9
10#include <iostream>
11#include <chrono>
12#include <thread>
13#include "MPI.hpp"
14#include "Profiling.hpp"
15
16#ifdef DNDS_UNIX_LIKE
17# include <sys/ptrace.h>
18# include <unistd.h>
19# include <sys/stat.h>
20#endif
21#if defined(_WIN32) || defined(__WINDOWS_)
22# define NOMINMAX
23# include <Windows.h>
24# include <process.h>
25#endif
26
27#ifdef NDEBUG
28# define NDEBUG_DISABLED
29# undef NDEBUG
30#endif
31
32namespace DNDS::Debug
33{
35 {
36
37#ifdef DNDS_UNIX_LIKE
38 std::ifstream fin("/proc/self/status"); // able to detect gdb
39 std::string buf;
40 int tpid = 0;
41 while (!fin.eof())
42 {
43 fin >> buf;
44 if (buf == "TracerPid:")
45 {
46 fin >> tpid;
47 break;
48 }
49 }
50 fin.close();
51 return tpid != 0;
52#endif
53#if defined(_WIN32) || defined(__WINDOWS_)
54 return IsDebuggerPresent();
55#endif
56 }
57
58 void MPIDebugHold(const MPIInfo &mpi)
59 {
60#ifdef DNDS_UNIX_LIKE
61 MPISerialDo(mpi, [&]
62 { log() << "Rank " << mpi.rank << " PID: " << getpid() << std::endl; });
63#endif
64#if defined(_WIN32) || defined(__WINDOWS_)
65 MPISerialDo(mpi, [&]
66 { log() << "Rank " << mpi.rank << " PID: " << _getpid() << std::endl; });
67#endif
68 int holdFlag = 1;
69 while (holdFlag)
70 {
71 for (MPI_int ir = 0; ir < mpi.size; ir++)
72 {
73 int newDebugFlag = 0;
74 if (mpi.rank == ir)
75 {
76 newDebugFlag = int(IsDebugged());
77 MPI_Bcast(&newDebugFlag, 1, MPI_INT, ir, mpi.comm);
78 }
79 else
80 MPI_Bcast(&newDebugFlag, 1, MPI_INT, ir, mpi.comm);
81
82 // std::cout << "DBG " << newDebugFlag;
83 if (newDebugFlag)
84 holdFlag = 0;
85 }
86 }
87 }
88
89 bool isDebugging = false;
90}
91
92namespace DNDS
93{
/// @brief Report a failed assertion on std::cerr, synchronise on mpi.comm, then abort.
///
/// @param expr Text of the failed expression (printed in quotes).
/// @param file Source file name printed in the location tag.
/// @param line Source line printed in the location tag.
/// @param info Extra message printed on the line after the location.
/// @param mpi  Communicator bundle; its comm is used for the barrier before aborting.
94 void assert_false_info_mpi(const char *expr, const char *file, int line, const std::string &info, const DNDS::MPIInfo &mpi)
95 {
// Stack trace first, then the highlighted assertion message (ANSI bright-red escape codes).
96 std::cerr << ::DNDS::getTraceString() << "\n";
97 std::cerr << "\033[91m DNDS_assertion failed\033[39m: \"" << expr << "\" at [ " << file << ":" << line << " ]\n"
98 << info << std::endl;
// NOTE(review): listing line 99 is not present in this extract — confirm its content against the repository.
// NOTE(review): MPI_Barrier only returns once every rank of mpi.comm has entered it; if a single
// rank fails the assertion this call waits instead of aborting — confirm this is intended.
100 MPI_Barrier(mpi.comm);
101 std::abort();
102 }
103}
104namespace DNDS
105{
107 {
108 static MPIBufferHandler instance;
109 return instance;
110 }
111}
112
113namespace DNDS
114{
115 std::string getTimeStamp(const MPIInfo &mpi)
116 {
117 auto result = static_cast<int64_t>(std::time(nullptr));
118 std::array<char, 512> bufTime{};
119 std::array<char, 512 + 32> buf{};
120 int64_t pid = 0;
121#ifdef DNDS_UNIX_LIKE
122 // pid = Debug::getpid();
123 pid = getpid();
124#endif
125#if defined(_WIN32) || defined(__WINDOWS_)
126 // pid = Debug::GetCurrentProcessId();
127 pid = GetCurrentProcessId();
128#endif
129 MPI_Bcast(&result, 1, MPI_INT64_T, 0, mpi.comm);
130 MPI_Bcast(&pid, 1, MPI_INT64_T, 0, mpi.comm);
131
132 auto time_result = static_cast<time_t>(result);
133
134 std::strftime(bufTime.data(), 512, "%F_%H-%M-%S", std::localtime(&time_result));
135
136 long pidc = static_cast<long>(pid);
137 std::sprintf(buf.data(), "%s_%ld", bufTime.data(), pidc);
138
139 return {buf.data()};
140 }
141}
142
143namespace DNDS::MPI
144{
145
146#define start_timer PerformanceTimer::Instance().StartTimer(PerformanceTimer::Comm)
147#define stop_timer PerformanceTimer::Instance().StopTimer(PerformanceTimer::Comm)
148 /// @brief Wrapper over MPI_Bcast that can wait by polling.
/// When CommStrategy's GetUseLazyWait() is 0 the blocking MPI_Bcast is called; otherwise
/// MPI_Ibcast is started and completed through MPI::WaitallLazy, using the lazy-wait
/// value (cast to uint64_t) as the polling interval.
/// @return The MPI return code of the blocking call, or of the lazy wait.
149 MPI_int Bcast(void *buf, MPI_int num, MPI_Datatype type, MPI_int source_rank, MPI_Comm comm)
150 {
151 int ret{0};
// NOTE(review): listing lines 152 and 161 are not present in this extract; presumably the
// start_timer / stop_timer macro invocations — confirm against the repository.
153 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
154 ret = MPI_Bcast(buf, num, type, source_rank, comm);
155 else
156 {
157 MPI_Request req{MPI_REQUEST_NULL};
// NOTE(review): the return code of MPI_Ibcast is overwritten by the wait result on the next line.
158 ret = MPI_Ibcast(buf, num, type, source_rank, comm, &req);
159 ret = MPI::WaitallLazy(1, &req, MPI_STATUSES_IGNORE, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
160 }
162 return ret;
163 }
164
/// @brief Wrapper over MPI_Alltoall that can wait by polling.
/// When CommStrategy's GetUseLazyWait() is 0 the blocking MPI_Alltoall is called; otherwise
/// MPI_Ialltoall is started and completed through MPI::WaitallLazy, using the lazy-wait
/// value (cast to uint64_t) as the polling interval.
/// @return The MPI return code of the blocking call, or of the lazy wait.
165 MPI_int Alltoall(void *send, MPI_int sendNum, MPI_Datatype typeSend, void *recv, MPI_int recvNum, MPI_Datatype typeRecv, MPI_Comm comm)
166 {
167 int ret{0};
// NOTE(review): listing lines 168 and 177 are not present in this extract; presumably the
// start_timer / stop_timer macro invocations — confirm against the repository.
169 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
170 ret = MPI_Alltoall(send, sendNum, typeSend, recv, recvNum, typeRecv, comm);
171 else
172 {
173 MPI_Request req{MPI_REQUEST_NULL};
// NOTE(review): the return code of MPI_Ialltoall is overwritten by the wait result on the next line.
174 ret = MPI_Ialltoall(send, sendNum, typeSend, recv, recvNum, typeRecv, comm, &req);
175 ret = MPI::WaitallLazy(1, &req, MPI_STATUSES_IGNORE, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
176 }
178 return ret;
179 }
180
182 void *send, MPI_int *sendSizes, MPI_int *sendStarts, MPI_Datatype sendType,
183 void *recv, MPI_int *recvSizes, MPI_int *recvStarts, MPI_Datatype recvType, MPI_Comm comm)
184 {
185 int ret{0};
187 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
188 ret = MPI_Alltoallv(
189 send, sendSizes, sendStarts, sendType,
190 recv, recvSizes, recvStarts, recvType, comm);
191 else
192 {
193 MPI_Request req{MPI_REQUEST_NULL};
194 ret = MPI_Ialltoallv(send, sendSizes, sendStarts, sendType,
195 recv, recvSizes, recvStarts, recvType, comm, &req);
196 ret = MPI::WaitallLazy(1, &req, MPI_STATUSES_IGNORE, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
197 }
199 return ret;
200 }
201
/// @brief Wrapper over MPI_Allreduce that can wait by polling.
/// When CommStrategy's GetUseLazyWait() is 0 the blocking MPI_Allreduce is called; otherwise
/// MPI_Iallreduce is started and completed through MPI::WaitallLazy, using the lazy-wait
/// value (cast to uint64_t) as the polling interval.
/// @return The MPI return code of the blocking call, or of the lazy wait.
202 MPI_int Allreduce(const void *sendbuf, void *recvbuf, MPI_int count,
203 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
204 {
205 int ret{0};
// NOTE(review): listing lines 206 and 215 are not present in this extract; presumably the
// start_timer / stop_timer macro invocations — confirm against the repository.
207 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
208 ret = MPI_Allreduce(sendbuf, recvbuf, count, datatype, op, comm);
209 else
210 {
211 MPI_Request req{MPI_REQUEST_NULL};
// NOTE(review): the return code of MPI_Iallreduce is overwritten by the wait result on the next line.
212 ret = MPI_Iallreduce(sendbuf, recvbuf, count, datatype, op, comm, &req);
213 ret = MPI::WaitallLazy(1, &req, MPI_STATUSES_IGNORE, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
214 }
216 return ret;
217 }
218
/// @brief Wrapper over MPI_Scan; always uses the blocking call (no lazy-wait branch here).
/// @return The MPI return code of MPI_Scan.
219 MPI_int Scan(const void *sendbuf, void *recvbuf, MPI_int count,
220 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
221 {
222 int ret{0}; // todo: add wait lazy?
// NOTE(review): listing lines 223 and 225 are not present in this extract; presumably the
// start_timer / stop_timer macro invocations — confirm against the repository.
224 ret = MPI_Scan(sendbuf, recvbuf, count, datatype, op, comm);
226 return ret;
227 }
228
/// @brief Wrapper over MPI_Allgather that can wait by polling.
/// When CommStrategy's GetUseLazyWait() is 0 the blocking MPI_Allgather is called; otherwise
/// MPI_Iallgather is started and completed through MPI::WaitallLazy, using the lazy-wait
/// value (cast to uint64_t) as the polling interval.
/// @return The MPI return code of the blocking call, or of the lazy wait.
229 MPI_int Allgather(const void *sendbuf, MPI_int sendcount, MPI_Datatype sendtype,
230 void *recvbuf, MPI_int recvcount,
231 MPI_Datatype recvtype, MPI_Comm comm)
232 {
233 int ret{0};
// NOTE(review): listing lines 234 and 243 are not present in this extract; presumably the
// start_timer / stop_timer macro invocations — confirm against the repository.
235 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
236 ret = MPI_Allgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm);
237 else
238 {
239 MPI_Request req{MPI_REQUEST_NULL};
// NOTE(review): the return code of MPI_Iallgather is overwritten by the wait result on the next line.
240 ret = MPI_Iallgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm, &req);
241 ret = MPI::WaitallLazy(1, &req, MPI_STATUSES_IGNORE, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
242 }
244 return ret;
245 }
246
/// @brief Wrapper over MPI_Barrier that can wait by polling.
/// When CommStrategy's GetUseLazyWait() is 0 the blocking MPI_Barrier is called; otherwise
/// MPI::BarrierLazy is used with the lazy-wait value (cast to uint64_t) as the polling interval.
/// @return The MPI return code of whichever barrier was used.
247 MPI_int Barrier(MPI_Comm comm)
248 {
249 int ret{0};
// NOTE(review): listing lines 250 and 255 are not present in this extract; presumably the
// start_timer / stop_timer macro invocations — confirm against the repository.
251 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
252 ret = MPI_Barrier(comm);
253 else
254 ret = MPI::BarrierLazy(comm, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
256 return ret;
257 }
258
259 MPI_int BarrierLazy(MPI_Comm comm, uint64_t checkNanoSecs)
260 {
261 MPI_Request req{MPI_REQUEST_NULL};
262 MPI_Status stat;
263 MPI_Ibarrier(comm, &req);
264 MPI_int ret = MPI::WaitallLazy(1, &req, &stat, checkNanoSecs);
265 if (req != MPI_REQUEST_NULL)
266 MPI_Request_free(&req);
267 return ret;
268 }
269
270 MPI_int WaitallLazy(MPI_int count, MPI_Request *reqs, MPI_Status *statuses, uint64_t checkNanoSecs)
271 {
272 MPI_int flag = 0;
273 MPI_int ret = 0;
274 while (!flag)
275 {
276 ret = MPI_Testall(count, reqs, &flag, statuses);
277 std::this_thread::sleep_for(std::chrono::nanoseconds(checkNanoSecs));
278 }
279 return ret;
280 }
281
282 MPI_int WaitallAuto(MPI_int count, MPI_Request *reqs, MPI_Status *statuses)
283 {
284 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
285 return MPI_Waitall(count, reqs, statuses);
286 else
287 return MPI::WaitallLazy(count, reqs, statuses, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
288 }
289
290#undef start_timer
291#undef stop_timer
292
293}
294
295namespace DNDS::MPI
296{
298 {
299#ifdef OPEN_MPI
300 return 1 == MPIX_Query_cuda_support();
301#else
302 return false;
303#endif
304 }
305}
306
307namespace DNDS::MPI
308{
310 {
311 static ResourceRecycler recycler;
312 return recycler;
313 }
314
/// @brief Register a cleanup callback under the key @p p.
/// @param p        Key identifying the resource; asserted not to be registered already.
/// @param nCleaner Callback stored for that key (moved into the container).
315 void ResourceRecycler::RegisterCleaner(void *p, std::function<void()> nCleaner)
316 {
// Registering the same key twice is treated as a programming error.
317 DNDS_assert(cleaners.count(p) == 0);
318 cleaners.emplace(p, std::move(nCleaner));
319 }
320
322 {
323 DNDS_assert(cleaners.count(p) == 1);
324 cleaners.erase(p);
325 }
326
328 {
329 for (auto &[k, f] : cleaners)
330 f();
331 }
332}
333
334namespace DNDS::MPI
335{
336 CommStrategy::CommStrategy()
337 {
338 try
339 {
340 auto *ret = std::getenv("DNDS_USE_LAZY_WAIT");
341 if (ret != nullptr && (std::stod(ret) != 0))
342 {
343 _use_lazy_wait = std::stod(ret);
344 auto mpi = MPIInfo();
345 mpi.setWorld();
346 // std::cout << mpi.rank << std::endl;
347 if (mpi.rank == 0)
348 log() << "Detected DNDS_USE_LAZY_WAIT, setting to " << _use_lazy_wait << std::endl;
349 MPI::BarrierLazy(mpi.comm, static_cast<uint64_t>(_use_lazy_wait));
350 }
351 }
352 // NOLINTBEGIN(bugprone-empty-catch)
353 // Empty catch intentional: env var contains a malformed
354 // number (stod/stoi throws); treat as "unset" and leave the
355 // default. Logging here would fail inside static-ctor phase.
356 catch (...)
357 {
358 }
359 // NOLINTEND(bugprone-empty-catch)
360 try
361 {
362 auto *ret = std::getenv("DNDS_ARRAY_STRATEGY_USE_IN_SITU");
363 if (ret != nullptr && (std::stoi(ret) != 0))
364 {
365 _array_strategy = InSituPack;
366 auto mpi = MPIInfo();
367 mpi.setWorld();
368 if (mpi.rank == 0)
369 log() << "Detected DNDS_ARRAY_STRATEGY_USE_IN_SITU, setting" << std::endl;
370 if (_use_lazy_wait)
371 MPI::BarrierLazy(mpi.comm, static_cast<uint64_t>(_use_lazy_wait));
372 else
373 MPI_Barrier(mpi.comm);
374 }
375 }
376 // NOLINTBEGIN(bugprone-empty-catch)
377 // Empty catch intentional: env var contains a malformed
378 // number (stod/stoi throws); treat as "unset" and leave the
379 // default. Logging here would fail inside static-ctor phase.
380 catch (...)
381 {
382 }
383 // NOLINTEND(bugprone-empty-catch)
384 try
385 {
386 auto *ret = std::getenv("DNDS_USE_STRONG_SYNC_WAIT");
387 if (ret != nullptr && (std::stoi(ret) != 0))
388 {
389 _use_strong_sync_wait = true;
390 auto mpi = MPIInfo();
391 mpi.setWorld();
392 if (mpi.rank == 0)
393 log() << "Detected DNDS_USE_STRONG_SYNC_WAIT, setting" << std::endl;
394 if (_use_lazy_wait)
395 MPI::BarrierLazy(mpi.comm, static_cast<uint64_t>(_use_lazy_wait));
396 else
397 MPI_Barrier(mpi.comm);
398 }
399 }
400 // NOLINTBEGIN(bugprone-empty-catch)
401 // Empty catch intentional: env var contains a malformed
402 // number (stod/stoi throws); treat as "unset" and leave the
403 // default. Logging here would fail inside static-ctor phase.
404 catch (...)
405 {
406 }
407 // NOLINTEND(bugprone-empty-catch)
408 try
409 {
410 auto *ret = std::getenv("DNDS_USE_ASYNC_ONE_BY_ONE");
411 if (ret != nullptr && (std::stoi(ret) != 0))
412 {
413 _use_async_one_by_one = true;
414 auto mpi = MPIInfo();
415 mpi.setWorld();
416 if (mpi.rank == 0)
417 log() << "Detected DNDS_USE_ASYNC_ONE_BY_ONE, setting" << std::endl;
418 if (bool(_use_lazy_wait))
419 MPI::BarrierLazy(mpi.comm, static_cast<uint64_t>(_use_lazy_wait));
420 else
421 MPI_Barrier(mpi.comm);
422 }
423 }
424 // NOLINTBEGIN(bugprone-empty-catch)
425 // Empty catch intentional: env var contains a malformed
426 // number (stod/stoi throws); treat as "unset" and leave the
427 // default. Logging here would fail inside static-ctor phase.
428 catch (...)
429 {
430 }
431 // NOLINTEND(bugprone-empty-catch)
432 }
433
435 {
436 static CommStrategy strategy;
437 return strategy;
438 }
439
441 {
442 return _array_strategy;
443 }
444
446 {
447 _array_strategy = t;
448 }
449
451 {
452 return _use_strong_sync_wait;
453 }
454
456 {
457 return _use_async_one_by_one;
458 }
459
461 {
462 return _use_lazy_wait;
463 }
464}
465
466namespace DNDS // TODO: get a concurrency header
467{
// Global mutex object defined here at namespace scope.
// NOTE(review): per the generated documentation it serialises host-side HDF5 calls; no locking site is visible in this file.
468 std::mutex HDF_mutex;
469}
470
471#ifdef NDEBUG_DISABLED
472# define NDEBUG
473# undef NDEBUG_DISABLED
474#endif
#define DNDS_assert(expr)
Debug-only assertion (compiled out when DNDS_NDEBUG is defined). Prints the expression + file/line + ...
Definition Errors.hpp:112
#define start_timer
Definition MPI.cpp:146
#define stop_timer
Definition MPI.cpp:147
MPI wrappers: MPIInfo, collective operations, type mapping, CommStrategy.
Wall-clock performance timer and running scalar statistics utilities.
Process-singleton managing the buffer attached to MPI for MPI_Bsend (buffered sends).
Definition MPI.hpp:600
static MPIBufferHandler & Instance()
Access the process-wide singleton.
Definition MPI.cpp:106
Process-wide singleton that selects how ArrayTransformer packs and waits for MPI messages.
Definition MPI.hpp:780
static CommStrategy & Instance()
Access the process-wide singleton.
Definition MPI.cpp:434
bool GetUseStrongSyncWait() const
Whether barriers are inserted around Waitall for profiling.
Definition MPI.cpp:450
double GetUseLazyWait() const
Polling interval (ns) for MPI::WaitallLazy. 0 means use MPI_Waitall.
Definition MPI.cpp:460
ArrayCommType GetArrayStrategy()
Current array-pack strategy.
Definition MPI.cpp:440
bool GetUseAsyncOneByOne() const
Whether transformers should use one-by-one Isend/Irecv.
Definition MPI.cpp:455
ArrayCommType
Which derived-type strategy ArrayTransformer should use.
Definition MPI.hpp:784
@ InSituPack
Manually pack / unpack into contiguous buffers.
Definition MPI.hpp:787
void SetArrayStrategy(ArrayCommType t)
Override the array-pack strategy (affects subsequently-created transformers).
Definition MPI.cpp:445
Singleton that tracks and releases long-lived MPI resources at MPI_Finalize time.
Definition MPI.hpp:289
static ResourceRecycler & Instance()
Access the process-wide singleton.
Definition MPI.cpp:309
void RegisterCleaner(void *p, std::function< void()> nCleaner)
Register a cleanup callback keyed by p.
Definition MPI.cpp:315
void clean()
Invoke all registered cleaners and drop them. Called by MPI::Finalize().
Definition MPI.cpp:327
void RemoveCleaner(void *p)
Remove a previously-registered cleaner.
Definition MPI.cpp:321
bool IsDebugged()
Whether the current process is running under a debugger. Implemented via /proc/self/status TracerPid ...
Definition MPI.cpp:34
void MPIDebugHold(const MPIInfo &mpi)
If isDebugging is set, block every rank in a busy-wait loop so the user can attach a debugger and ins...
Definition MPI.cpp:58
bool isDebugging
Flag consulted by MPIDebugHold and assert_false_info_mpi.
Definition MPI.cpp:89
MPI_int Allgather(const void *sendbuf, MPI_int sendcount, MPI_Datatype sendtype, void *recvbuf, MPI_int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
Wrapper over MPI_Allgather.
Definition MPI.cpp:229
MPI_int Bcast(void *buf, MPI_int num, MPI_Datatype type, MPI_int source_rank, MPI_Comm comm)
Wrapper over MPI_Bcast.
Definition MPI.cpp:149
MPI_int WaitallAuto(MPI_int count, MPI_Request *reqs, MPI_Status *statuses)
Wait on an array of requests, choosing between MPI_Waitall and the lazy-poll variant based on CommStr...
Definition MPI.cpp:282
MPI_int BarrierLazy(MPI_Comm comm, uint64_t checkNanoSecs)
Polling barrier that sleeps checkNanoSecs ns between MPI_Test calls. Reduces CPU spin when many ranks...
Definition MPI.cpp:259
MPI_int Alltoall(void *send, MPI_int sendNum, MPI_Datatype typeSend, void *recv, MPI_int recvNum, MPI_Datatype typeRecv, MPI_Comm comm)
Wrapper over MPI_Alltoall (fixed per-peer count).
Definition MPI.cpp:165
MPI_int Scan(const void *sendbuf, void *recvbuf, MPI_int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
Wrapper over MPI_Scan (inclusive prefix reduction).
Definition MPI.cpp:219
MPI_int Alltoallv(void *send, MPI_int *sendSizes, MPI_int *sendStarts, MPI_Datatype sendType, void *recv, MPI_int *recvSizes, MPI_int *recvStarts, MPI_Datatype recvType, MPI_Comm comm)
Wrapper over MPI_Alltoallv (variable per-peer counts + displacements).
Definition MPI.cpp:181
MPI_int Allreduce(const void *sendbuf, void *recvbuf, MPI_int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
Wrapper over MPI_Allreduce.
Definition MPI.cpp:202
MPI_int Barrier(MPI_Comm comm)
Wrapper over MPI_Barrier.
Definition MPI.cpp:247
bool isCudaAware()
Runtime probe: is the current MPI implementation configured with CUDA-aware support?...
Definition MPI.cpp:297
MPI_int WaitallLazy(MPI_int count, MPI_Request *reqs, MPI_Status *statuses, uint64_t checkNanoSecs)
Like WaitallAuto but sleeps checkNanoSecs ns between polls.
Definition MPI.cpp:270
the host side operators are provided as implemented
void MPISerialDo(const MPIInfo &mpi, F f)
Execute f on each rank serially, in rank order.
Definition MPI.hpp:749
std::string getTraceString()
Return a symbolicated stack trace for the calling thread.
Definition Defines.cpp:165
std::string getTimeStamp(const MPIInfo &mpi)
Format a human-readable timestamp from rank 0's time and PID, which are broadcast to every rank of mpi.comm.
Definition MPI.cpp:115
std::ostream & log()
Return the current DNDSR log stream (either std::cout or the installed file).
Definition Defines.cpp:50
void assert_false_info_mpi(const char *expr, const char *file, int line, const std::string &info, const DNDS::MPIInfo &mpi)
MPI-aware assertion-failure reporter.
Definition MPI.cpp:94
std::mutex HDF_mutex
Global mutex serialising host-side HDF5 calls.
Definition MPI.cpp:468
int MPI_int
MPI counterpart type for MPI_int (= C int). Used for counts and ranks in MPI calls.
Definition MPI.hpp:54
Lightweight bundle of an MPI communicator and the calling rank's coordinates.
Definition MPI.hpp:231
auto result
const tPoint const tPoint const tPoint & p