DNDSR 0.1.0.dev1+gcd065ad
Distributed Numeric Data Structure for CFV
Loading...
Searching...
No Matches
MPI.cpp
Go to the documentation of this file.
1/// @file MPI.cpp
2/// @brief Implementations of the MPI wrapper functions declared in @ref MPI.hpp:
3/// retry-aware @ref Bcast/@ref Alltoall/@ref Alltoallv/@ref Allreduce/@ref Allgather/@ref Barrier
4/// variants, lazy waits, singleton definitions, CUDA-aware probe.
5
6#include <ctime>
7#include <cstdio>
8#include <cstdlib>
9
10#include <iostream>
11#include <chrono>
12#include <thread>
13#include "MPI.hpp"
14#include "Profiling.hpp"
15
16#ifdef DNDS_UNIX_LIKE
17# include <sys/ptrace.h>
18# include <unistd.h>
19# include <sys/stat.h>
20#endif
21#if defined(_WIN32) || defined(__WINDOWS_)
22# define NOMINMAX
23# include <Windows.h>
24# include <process.h>
25#endif
26
27
28#ifdef NDEBUG
29# define NDEBUG_DISABLED
30# undef NDEBUG
31#endif
32
33namespace DNDS::Debug
34{
36 {
37
38#ifdef DNDS_UNIX_LIKE
39 std::ifstream fin("/proc/self/status"); // able to detect gdb
40 std::string buf;
41 int tpid = 0;
42 while (!fin.eof())
43 {
44 fin >> buf;
45 if (buf == "TracerPid:")
46 {
47 fin >> tpid;
48 break;
49 }
50 }
51 fin.close();
52 return tpid != 0;
53#endif
54#if defined(_WIN32) || defined(__WINDOWS_)
55 return IsDebuggerPresent();
56#endif
57 }
58
    /// @brief Block every rank in a busy-wait loop so a user can attach a
    /// debugger, then release all ranks together once a debugger is detected.
    /// @param mpi communicator bundle; all ranks of mpi.comm must call this.
    void MPIDebugHold(const MPIInfo &mpi)
    {
        // Print each rank's PID in rank order so the user knows which PID to attach to.
#ifdef DNDS_UNIX_LIKE
        MPISerialDo(mpi, [&]
                    { log() << "Rank " << mpi.rank << " PID: " << getpid() << std::endl; });
#endif
#if defined(_WIN32) || defined(__WINDOWS_)
        MPISerialDo(mpi, [&]
                    { log() << "Rank " << mpi.rank << " PID: " << _getpid() << std::endl; });
#endif
        int holdFlag = 1;
        while (holdFlag)
        {
            // Each iteration: every rank broadcasts its own IsDebugged() state in
            // rank order; if ANY rank reports a debugger attached, all ranks see
            // the same nonzero flag and leave the hold loop together.
            for (MPI_int ir = 0; ir < mpi.size; ir++)
            {
                int newDebugFlag;
                if (mpi.rank == ir)
                {
                    newDebugFlag = int(IsDebugged());
                    MPI_Bcast(&newDebugFlag, 1, MPI_INT, ir, mpi.comm);
                }
                else
                    MPI_Bcast(&newDebugFlag, 1, MPI_INT, ir, mpi.comm);

                // std::cout << "DBG " << newDebugFlag;
                if (newDebugFlag)
                    holdFlag = 0;
            }
        }
    }
89
90 bool isDebugging = false;
91}
92
93namespace DNDS
94{
    /// @brief MPI-aware assertion-failure reporter: prints a stack trace plus the
    /// failing expression, location and extra info, synchronizes, then aborts.
    /// @param expr stringified assertion expression
    /// @param file / @param line source location of the failed assertion
    /// @param info caller-supplied extra context, printed after the location
    /// @param mpi  communicator bundle used for the pre-abort barrier
    void assert_false_info_mpi(const char *expr, const char *file, int line, const std::string &info, const DNDS::MPIInfo &mpi)
    {
        std::cerr << ::DNDS::getTraceString() << "\n";
        // ANSI escapes: \033[91m = bright red, \033[39m = default foreground.
        std::cerr << "\033[91m DNDS_assertion failed\033[39m: \"" << expr << "\" at [ " << file << ":" << line << " ]\n"
                  << info << std::endl;
        // NOTE(review): barrier before abort — presumably to let every rank flush
        // its report first; hangs if only a subset of ranks asserts. Confirm intent.
        MPI_Barrier(mpi.comm);
        std::abort();
    }
104}
105namespace DNDS
106{
108 {
109 static MPIBufferHandler instance;
110 return instance;
111 }
112}
113
114namespace DNDS
115{
116 std::string getTimeStamp(const MPIInfo &mpi)
117 {
118 auto result = static_cast<int64_t>(std::time(nullptr));
119 std::array<char, 512> bufTime;
120 std::array<char, 512 + 32> buf;
121 int64_t pid = 0;
122#ifdef DNDS_UNIX_LIKE
123 // pid = Debug::getpid();
124 pid = getpid();
125#endif
126#if defined(_WIN32) || defined(__WINDOWS_)
127 // pid = Debug::GetCurrentProcessId();
128 pid = GetCurrentProcessId();
129#endif
130 MPI_Bcast(&result, 1, MPI_INT64_T, 0, mpi.comm);
131 MPI_Bcast(&pid, 1, MPI_INT64_T, 0, mpi.comm);
132
133 auto time_result = static_cast<time_t>(result);
134
135 std::strftime(bufTime.data(), 512, "%F_%H-%M-%S", std::localtime(&time_result));
136
137 long pidc = static_cast<long>(pid);
138 std::sprintf(buf.data(), "%s_%ld", bufTime.data(), pidc);
139
140 return {buf.data()};
141 }
142}
143
144namespace DNDS::MPI
145{
146
147#define __start_timer PerformanceTimer::Instance().StartTimer(PerformanceTimer::Comm)
148#define __stop_timer PerformanceTimer::Instance().StopTimer(PerformanceTimer::Comm)
149 /// @brief dumb wrapper
150 MPI_int Bcast(void *buf, MPI_int num, MPI_Datatype type, MPI_int source_rank, MPI_Comm comm)
151 {
152 int ret{0};
154 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
155 ret = MPI_Bcast(buf, num, type, source_rank, comm);
156 else
157 {
158 MPI_Request req{MPI_REQUEST_NULL};
159 ret = MPI_Ibcast(buf, num, type, source_rank, comm, &req);
160 ret = MPI::WaitallLazy(1, &req, MPI_STATUSES_IGNORE, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
161 }
163 return ret;
164 }
165
166 MPI_int Alltoall(void *send, MPI_int sendNum, MPI_Datatype typeSend, void *recv, MPI_int recvNum, MPI_Datatype typeRecv, MPI_Comm comm)
167 {
168 int ret{0};
170 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
171 ret = MPI_Alltoall(send, sendNum, typeSend, recv, recvNum, typeRecv, comm);
172 else
173 {
174 MPI_Request req{MPI_REQUEST_NULL};
175 ret = MPI_Ialltoall(send, sendNum, typeSend, recv, recvNum, typeRecv, comm, &req);
176 ret = MPI::WaitallLazy(1, &req, MPI_STATUSES_IGNORE, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
177 }
179 return ret;
180 }
181
183 void *send, MPI_int *sendSizes, MPI_int *sendStarts, MPI_Datatype sendType,
184 void *recv, MPI_int *recvSizes, MPI_int *recvStarts, MPI_Datatype recvType, MPI_Comm comm)
185 {
186 int ret{0};
188 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
189 ret = MPI_Alltoallv(
190 send, sendSizes, sendStarts, sendType,
191 recv, recvSizes, recvStarts, recvType, comm);
192 else
193 {
194 MPI_Request req{MPI_REQUEST_NULL};
195 ret = MPI_Ialltoallv(send, sendSizes, sendStarts, sendType,
196 recv, recvSizes, recvStarts, recvType, comm, &req);
197 ret = MPI::WaitallLazy(1, &req, MPI_STATUSES_IGNORE, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
198 }
200 return ret;
201 }
202
203 MPI_int Allreduce(const void *sendbuf, void *recvbuf, MPI_int count,
204 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
205 {
206 int ret{0};
208 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
209 ret = MPI_Allreduce(sendbuf, recvbuf, count, datatype, op, comm);
210 else
211 {
212 MPI_Request req{MPI_REQUEST_NULL};
213 ret = MPI_Iallreduce(sendbuf, recvbuf, count, datatype, op, comm, &req);
214 ret = MPI::WaitallLazy(1, &req, MPI_STATUSES_IGNORE, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
215 }
217 return ret;
218 }
219
220 MPI_int Scan(const void *sendbuf, void *recvbuf, MPI_int count,
221 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
222 {
223 int ret{0}; // todo: add wait lazy?
225 ret = MPI_Scan(sendbuf, recvbuf, count, datatype, op, comm);
227 return ret;
228 }
229
230 MPI_int Allgather(const void *sendbuf, MPI_int sendcount, MPI_Datatype sendtype,
231 void *recvbuf, MPI_int recvcount,
232 MPI_Datatype recvtype, MPI_Comm comm)
233 {
234 int ret{0};
236 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
237 ret = MPI_Allgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm);
238 else
239 {
240 MPI_Request req{MPI_REQUEST_NULL};
241 ret = MPI_Iallgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm, &req);
242 ret = MPI::WaitallLazy(1, &req, MPI_STATUSES_IGNORE, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
243 }
245 return ret;
246 }
247
248 MPI_int Barrier(MPI_Comm comm)
249 {
250 int ret{0};
252 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
253 ret = MPI_Barrier(comm);
254 else
255 ret = MPI::BarrierLazy(comm, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
257 return ret;
258 }
259
260 MPI_int BarrierLazy(MPI_Comm comm, uint64_t checkNanoSecs)
261 {
262 MPI_Request req{MPI_REQUEST_NULL};
263 MPI_Status stat;
264 MPI_Ibarrier(comm, &req);
265 MPI_int ret = MPI::WaitallLazy(1, &req, &stat, checkNanoSecs);
266 if (req != MPI_REQUEST_NULL)
267 MPI_Request_free(&req);
268 return ret;
269 }
270
271 MPI_int WaitallLazy(MPI_int count, MPI_Request *reqs, MPI_Status *statuses, uint64_t checkNanoSecs)
272 {
273 MPI_int flag = 0;
274 MPI_int ret;
275 while (!flag)
276 {
277 ret = MPI_Testall(count, reqs, &flag, statuses);
278 std::this_thread::sleep_for(std::chrono::nanoseconds(checkNanoSecs));
279 }
280 return ret;
281 }
282
283 MPI_int WaitallAuto(MPI_int count, MPI_Request *reqs, MPI_Status *statuses)
284 {
285 if (MPI::CommStrategy::Instance().GetUseLazyWait() == 0)
286 return MPI_Waitall(count, reqs, statuses);
287 else
288 return MPI::WaitallLazy(count, reqs, statuses, static_cast<uint64_t>(MPI::CommStrategy::Instance().GetUseLazyWait()));
289 }
290
291#undef __start_timer
292#undef __stop_timer
293
294}
295
namespace DNDS::MPI
{
    /// @brief Runtime probe: does the linked MPI library report CUDA-aware support?
    /// Only answerable under Open MPI (via its MPIX extension); otherwise false.
    bool isCudaAware()
    {
#ifdef OPEN_MPI
        return 1 == MPIX_Query_cuda_support();
#else
        return false; // no portable way to query on this MPI implementation
#endif
    }
}
307
308namespace DNDS::MPI
309{
311 {
312 static ResourceRecycler recycler;
313 return recycler;
314 }
315
316 void ResourceRecycler::RegisterCleaner(void *p, std::function<void()> nCleaner)
317 {
318 DNDS_assert(cleaners.count(p) == 0);
319 cleaners.emplace(std::make_pair(p, std::move(nCleaner)));
320 }
321
323 {
324 DNDS_assert(cleaners.count(p) == 1);
325 cleaners.erase(p);
326 }
327
329 {
330 for (auto &[k, f] : cleaners)
331 f();
332 }
333}
334
335namespace DNDS::MPI
336{
337 CommStrategy::CommStrategy()
338 {
339 try
340 {
341 auto *ret = std::getenv("DNDS_USE_LAZY_WAIT");
342 if (ret != NULL && (std::stod(ret) != 0))
343 {
344 _use_lazy_wait = std::stod(ret);
345 auto mpi = MPIInfo();
346 mpi.setWorld();
347 // std::cout << mpi.rank << std::endl;
348 if (mpi.rank == 0)
349 log() << "Detected DNDS_USE_LAZY_WAIT, setting to " << _use_lazy_wait << std::endl;
350 MPI::BarrierLazy(mpi.comm, static_cast<uint64_t>(_use_lazy_wait));
351 }
352 }
353 catch (...)
354 {
355 }
356 try
357 {
358 auto *ret = std::getenv("DNDS_ARRAY_STRATEGY_USE_IN_SITU");
359 if (ret != NULL && (std::stoi(ret) != 0))
360 {
361 _array_strategy = InSituPack;
362 auto mpi = MPIInfo();
363 mpi.setWorld();
364 if (mpi.rank == 0)
365 log() << "Detected DNDS_ARRAY_STRATEGY_USE_IN_SITU, setting" << std::endl;
366 if (_use_lazy_wait)
367 MPI::BarrierLazy(mpi.comm, static_cast<uint64_t>(_use_lazy_wait));
368 else
369 MPI_Barrier(mpi.comm);
370 }
371 }
372 catch (...)
373 {
374 }
375 try
376 {
377 auto *ret = std::getenv("DNDS_USE_STRONG_SYNC_WAIT");
378 if (ret != NULL && (std::stoi(ret) != 0))
379 {
380 _use_strong_sync_wait = true;
381 auto mpi = MPIInfo();
382 mpi.setWorld();
383 if (mpi.rank == 0)
384 log() << "Detected DNDS_USE_STRONG_SYNC_WAIT, setting" << std::endl;
385 if (_use_lazy_wait)
386 MPI::BarrierLazy(mpi.comm, static_cast<uint64_t>(_use_lazy_wait));
387 else
388 MPI_Barrier(mpi.comm);
389 }
390 }
391 catch (...)
392 {
393 }
394 try
395 {
396 auto *ret = std::getenv("DNDS_USE_ASYNC_ONE_BY_ONE");
397 if (ret != NULL && (std::stoi(ret) != 0))
398 {
399 _use_async_one_by_one = true;
400 auto mpi = MPIInfo();
401 mpi.setWorld();
402 if (mpi.rank == 0)
403 log() << "Detected DNDS_USE_ASYNC_ONE_BY_ONE, setting" << std::endl;
404 if (bool(_use_lazy_wait))
405 MPI::BarrierLazy(mpi.comm, static_cast<uint64_t>(_use_lazy_wait));
406 else
407 MPI_Barrier(mpi.comm);
408 }
409 }
410 catch (...)
411 {
412 }
413 }
414
416 {
417 static CommStrategy strategy;
418 return strategy;
419 }
420
422 {
423 return _array_strategy;
424 }
425
427 {
428 _array_strategy = t;
429 }
430
432 {
433 return _use_strong_sync_wait;
434 }
435
437 {
438 return _use_async_one_by_one;
439 }
440
442 {
443 return _use_lazy_wait;
444 }
445}
446
namespace DNDS // TODO: get a concurrency header
{
    // Global mutex used to serialise host-side HDF5 calls across threads.
    std::mutex HDF_mutex;
}
451
452#ifdef NDEBUG_DISABLED
453# define NDEBUG
454# undef NDEBUG_DISABLED
455#endif
#define DNDS_assert(expr)
Debug-only assertion (compiled out when DNDS_NDEBUG is defined). Prints the expression + file/line + ...
Definition Errors.hpp:108
#define __stop_timer
Definition MPI.cpp:148
#define __start_timer
Definition MPI.cpp:147
MPI wrappers: MPIInfo, collective operations, type mapping, CommStrategy.
Wall-clock performance timer and running scalar statistics utilities.
Process-singleton managing the buffer attached to MPI for MPI_Bsend (buffered sends).
Definition MPI.hpp:561
static MPIBufferHandler & Instance()
Access the process-wide singleton.
Definition MPI.cpp:107
Process-wide singleton that selects how ArrayTransformer packs and waits for MPI messages.
Definition MPI.hpp:729
static CommStrategy & Instance()
Access the process-wide singleton.
Definition MPI.cpp:415
bool GetUseStrongSyncWait() const
Whether barriers are inserted around Waitall for profiling.
Definition MPI.cpp:431
double GetUseLazyWait() const
Polling interval (ns) for MPI::WaitallLazy. 0 means use MPI_Waitall.
Definition MPI.cpp:441
ArrayCommType GetArrayStrategy()
Current array-pack strategy.
Definition MPI.cpp:421
bool GetUseAsyncOneByOne() const
Whether transformers should use one-by-one Isend/Irecv.
Definition MPI.cpp:436
ArrayCommType
Which derived-type strategy ArrayTransformer should use.
Definition MPI.hpp:733
@ InSituPack
Manually pack / unpack into contiguous buffers.
Definition MPI.hpp:736
void SetArrayStrategy(ArrayCommType t)
Override the array-pack strategy (affects subsequently-created transformers).
Definition MPI.cpp:426
Singleton that tracks and releases long-lived MPI resources at MPI_Finalize time.
Definition MPI.hpp:273
static ResourceRecycler & Instance()
Access the process-wide singleton.
Definition MPI.cpp:310
void RegisterCleaner(void *p, std::function< void()> nCleaner)
Register a cleanup callback keyed by p.
Definition MPI.cpp:316
void clean()
Invoke all registered cleaners and drop them. Called by MPI::Finalize().
Definition MPI.cpp:328
void RemoveCleaner(void *p)
Remove a previously-registered cleaner.
Definition MPI.cpp:322
bool IsDebugged()
Whether the current process is running under a debugger. Implemented via /proc/self/status TracerPid ...
Definition MPI.cpp:35
void MPIDebugHold(const MPIInfo &mpi)
If isDebugging is set, block every rank in a busy-wait loop so the user can attach a debugger and ins...
Definition MPI.cpp:59
bool isDebugging
Flag consulted by MPIDebugHold and assert_false_info_mpi.
Definition MPI.cpp:90
MPI_int Allgather(const void *sendbuf, MPI_int sendcount, MPI_Datatype sendtype, void *recvbuf, MPI_int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
Wrapper over MPI_Allgather.
Definition MPI.cpp:230
MPI_int Bcast(void *buf, MPI_int num, MPI_Datatype type, MPI_int source_rank, MPI_Comm comm)
dumb wrapper
Definition MPI.cpp:150
MPI_int WaitallAuto(MPI_int count, MPI_Request *reqs, MPI_Status *statuses)
Wait on an array of requests, choosing between MPI_Waitall and the lazy-poll variant based on CommStr...
Definition MPI.cpp:283
MPI_int BarrierLazy(MPI_Comm comm, uint64_t checkNanoSecs)
Polling barrier that sleeps checkNanoSecs ns between MPI_Test calls. Reduces CPU spin when many ranks...
Definition MPI.cpp:260
MPI_int Alltoall(void *send, MPI_int sendNum, MPI_Datatype typeSend, void *recv, MPI_int recvNum, MPI_Datatype typeRecv, MPI_Comm comm)
Wrapper over MPI_Alltoall (fixed per-peer count).
Definition MPI.cpp:166
MPI_int Scan(const void *sendbuf, void *recvbuf, MPI_int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
Wrapper over MPI_Scan (inclusive prefix reduction).
Definition MPI.cpp:220
MPI_int Alltoallv(void *send, MPI_int *sendSizes, MPI_int *sendStarts, MPI_Datatype sendType, void *recv, MPI_int *recvSizes, MPI_int *recvStarts, MPI_Datatype recvType, MPI_Comm comm)
Wrapper over MPI_Alltoallv (variable per-peer counts + displacements).
Definition MPI.cpp:182
MPI_int Allreduce(const void *sendbuf, void *recvbuf, MPI_int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
Wrapper over MPI_Allreduce.
Definition MPI.cpp:203
MPI_int Barrier(MPI_Comm comm)
Wrapper over MPI_Barrier.
Definition MPI.cpp:248
bool isCudaAware()
Runtime probe: is the current MPI implementation configured with CUDA-aware support?...
Definition MPI.cpp:298
MPI_int WaitallLazy(MPI_int count, MPI_Request *reqs, MPI_Status *statuses, uint64_t checkNanoSecs)
Like WaitallAuto but sleeps checkNanoSecs ns between polls.
Definition MPI.cpp:271
the host side operators are provided as implemented
void MPISerialDo(const MPIInfo &mpi, F f)
Execute f on each rank serially, in rank order.
Definition MPI.hpp:698
std::string getTraceString()
Return a symbolicated stack trace for the calling thread.
Definition Defines.cpp:162
std::string getTimeStamp(const MPIInfo &mpi)
Format a human-readable timestamp using the calling rank as context.
Definition MPI.cpp:116
std::ostream & log()
Return the current DNDSR log stream (either std::cout or the installed file).
Definition Defines.cpp:49
void assert_false_info_mpi(const char *expr, const char *file, int line, const std::string &info, const DNDS::MPIInfo &mpi)
MPI-aware assertion-failure reporter.
Definition MPI.cpp:95
std::mutex HDF_mutex
Global mutex serialising host-side HDF5 calls.
Definition MPI.cpp:449
int MPI_int
MPI counterpart type for MPI_int (= C int). Used for counts and ranks in MPI calls.
Definition MPI.hpp:43
Lightweight bundle of an MPI communicator and the calling rank's coordinates.
Definition MPI.hpp:215
auto result