17# include <sys/ptrace.h>
21#if defined(_WIN32) || defined(__WINDOWS_)
29# define NDEBUG_DISABLED
39 std::ifstream fin(
"/proc/self/status");
45 if (buf ==
"TracerPid:")
54#if defined(_WIN32) || defined(__WINDOWS_)
55 return IsDebuggerPresent();
63 {
log() <<
"Rank " << mpi.rank <<
" PID: " << getpid() << std::endl; });
65#if defined(_WIN32) || defined(__WINDOWS_)
67 {
log() <<
"Rank " << mpi.rank <<
" PID: " << _getpid() << std::endl; });
72 for (
MPI_int ir = 0; ir < mpi.size; ir++)
78 MPI_Bcast(&newDebugFlag, 1, MPI_INT, ir, mpi.comm);
81 MPI_Bcast(&newDebugFlag, 1, MPI_INT, ir, mpi.comm);
98 std::cerr <<
"\033[91m DNDS_assertion failed\033[39m: \"" << expr <<
"\" at [ " << file <<
":" << line <<
" ]\n"
101 MPI_Barrier(mpi.comm);
118 auto result =
static_cast<int64_t
>(std::time(
nullptr));
119 std::array<char, 512> bufTime;
120 std::array<char, 512 + 32> buf;
126#if defined(_WIN32) || defined(__WINDOWS_)
128 pid = GetCurrentProcessId();
130 MPI_Bcast(&
result, 1, MPI_INT64_T, 0, mpi.comm);
131 MPI_Bcast(&pid, 1, MPI_INT64_T, 0, mpi.comm);
133 auto time_result =
static_cast<time_t
>(
result);
135 std::strftime(bufTime.data(), 512,
"%F_%H-%M-%S", std::localtime(&time_result));
137 long pidc =
static_cast<long>(pid);
138 std::sprintf(buf.data(),
"%s_%ld", bufTime.data(), pidc);
147#define __start_timer PerformanceTimer::Instance().StartTimer(PerformanceTimer::Comm)
148#define __stop_timer PerformanceTimer::Instance().StopTimer(PerformanceTimer::Comm)
155 ret = MPI_Bcast(buf, num, type, source_rank, comm);
158 MPI_Request req{MPI_REQUEST_NULL};
159 ret = MPI_Ibcast(buf, num, type, source_rank, comm, &req);
171 ret = MPI_Alltoall(send, sendNum, typeSend, recv, recvNum, typeRecv, comm);
174 MPI_Request req{MPI_REQUEST_NULL};
175 ret = MPI_Ialltoall(send, sendNum, typeSend, recv, recvNum, typeRecv, comm, &req);
183 void *send,
MPI_int *sendSizes,
MPI_int *sendStarts, MPI_Datatype sendType,
184 void *recv,
MPI_int *recvSizes,
MPI_int *recvStarts, MPI_Datatype recvType, MPI_Comm comm)
190 send, sendSizes, sendStarts, sendType,
191 recv, recvSizes, recvStarts, recvType, comm);
194 MPI_Request req{MPI_REQUEST_NULL};
195 ret = MPI_Ialltoallv(send, sendSizes, sendStarts, sendType,
196 recv, recvSizes, recvStarts, recvType, comm, &req);
204 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
209 ret = MPI_Allreduce(sendbuf, recvbuf, count, datatype, op, comm);
212 MPI_Request req{MPI_REQUEST_NULL};
213 ret = MPI_Iallreduce(sendbuf, recvbuf, count, datatype, op, comm, &req);
221 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
225 ret = MPI_Scan(sendbuf, recvbuf, count, datatype, op, comm);
231 void *recvbuf,
MPI_int recvcount,
232 MPI_Datatype recvtype, MPI_Comm comm)
237 ret = MPI_Allgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm);
240 MPI_Request req{MPI_REQUEST_NULL};
241 ret = MPI_Iallgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm, &req);
253 ret = MPI_Barrier(comm);
262 MPI_Request req{MPI_REQUEST_NULL};
264 MPI_Ibarrier(comm, &req);
266 if (req != MPI_REQUEST_NULL)
267 MPI_Request_free(&req);
277 ret = MPI_Testall(count, reqs, &flag, statuses);
278 std::this_thread::sleep_for(std::chrono::nanoseconds(checkNanoSecs));
286 return MPI_Waitall(count, reqs, statuses);
301 return 1 == MPIX_Query_cuda_support();
319 cleaners.emplace(std::make_pair(p, std::move(nCleaner)));
330 for (
auto &[k, f] : cleaners)
337 CommStrategy::CommStrategy()
341 auto *ret = std::getenv(
"DNDS_USE_LAZY_WAIT");
342 if (ret != NULL && (std::stod(ret) != 0))
344 _use_lazy_wait = std::stod(ret);
349 log() <<
"Detected DNDS_USE_LAZY_WAIT, setting to " << _use_lazy_wait << std::endl;
358 auto *ret = std::getenv(
"DNDS_ARRAY_STRATEGY_USE_IN_SITU");
359 if (ret != NULL && (std::stoi(ret) != 0))
362 auto mpi = MPIInfo();
365 log() <<
"Detected DNDS_ARRAY_STRATEGY_USE_IN_SITU, setting" << std::endl;
369 MPI_Barrier(
mpi.comm);
377 auto *ret = std::getenv(
"DNDS_USE_STRONG_SYNC_WAIT");
378 if (ret != NULL && (std::stoi(ret) != 0))
380 _use_strong_sync_wait =
true;
381 auto mpi = MPIInfo();
384 log() <<
"Detected DNDS_USE_STRONG_SYNC_WAIT, setting" << std::endl;
388 MPI_Barrier(
mpi.comm);
396 auto *ret = std::getenv(
"DNDS_USE_ASYNC_ONE_BY_ONE");
397 if (ret != NULL && (std::stoi(ret) != 0))
399 _use_async_one_by_one =
true;
400 auto mpi = MPIInfo();
403 log() <<
"Detected DNDS_USE_ASYNC_ONE_BY_ONE, setting" << std::endl;
404 if (
bool(_use_lazy_wait))
407 MPI_Barrier(
mpi.comm);
423 return _array_strategy;
433 return _use_strong_sync_wait;
438 return _use_async_one_by_one;
443 return _use_lazy_wait;
452#ifdef NDEBUG_DISABLED
454# undef NDEBUG_DISABLED
#define DNDS_assert(expr)
Debug-only assertion (compiled out when DNDS_NDEBUG is defined). On failure, prints the failing expression together with the file name and line number to std::cerr.
MPI wrappers: MPIInfo, collective operations, type mapping, CommStrategy.
Wall-clock performance timer and running scalar statistics utilities.
Process-singleton managing the buffer attached to MPI for MPI_Bsend (buffered sends).
static MPIBufferHandler & Instance()
Access the process-wide singleton.
Process-wide singleton that selects how ArrayTransformer packs and waits for MPI messages.
static CommStrategy & Instance()
Access the process-wide singleton.
bool GetUseStrongSyncWait() const
Whether barriers are inserted around Waitall for profiling.
double GetUseLazyWait() const
Polling interval (ns) for MPI::WaitallLazy. 0 means use MPI_Waitall.
ArrayCommType GetArrayStrategy()
Current array-pack strategy.
bool GetUseAsyncOneByOne() const
Whether transformers should use one-by-one Isend/Irecv.
ArrayCommType
Which derived-type strategy ArrayTransformer should use.
@ InSituPack
Manually pack / unpack into contiguous buffers.
void SetArrayStrategy(ArrayCommType t)
Override the array-pack strategy (affects subsequently-created transformers).
Singleton that tracks and releases long-lived MPI resources at MPI_Finalize time.
static ResourceRecycler & Instance()
Access the process-wide singleton.
void RegisterCleaner(void *p, std::function< void()> nCleaner)
Register a cleanup callback keyed by p.
void clean()
Invoke all registered cleaners and drop them. Called by MPI::Finalize().
void RemoveCleaner(void *p)
Remove a previously-registered cleaner.
bool IsDebugged()
Whether the current process is running under a debugger. Implemented via the TracerPid field of /proc/self/status on Linux, and IsDebuggerPresent() on Windows.
void MPIDebugHold(const MPIInfo &mpi)
If isDebugging is set, block every rank in a busy-wait loop so the user can attach a debugger and inspect each rank (each rank logs its PID to aid attaching).
bool isDebugging
Flag consulted by MPIDebugHold and assert_false_info_mpi.
MPI_int Allgather(const void *sendbuf, MPI_int sendcount, MPI_Datatype sendtype, void *recvbuf, MPI_int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
Wrapper over MPI_Allgather.
MPI_int Bcast(void *buf, MPI_int num, MPI_Datatype type, MPI_int source_rank, MPI_Comm comm)
Thin wrapper over MPI_Bcast (optionally non-blocking via MPI_Ibcast, depending on the configured strategy).
MPI_int WaitallAuto(MPI_int count, MPI_Request *reqs, MPI_Status *statuses)
Wait on an array of requests, choosing between MPI_Waitall and the lazy-poll variant based on CommStr...
MPI_int BarrierLazy(MPI_Comm comm, uint64_t checkNanoSecs)
Polling barrier that sleeps checkNanoSecs ns between MPI_Test calls. Reduces CPU spin when many ranks...
MPI_int Alltoall(void *send, MPI_int sendNum, MPI_Datatype typeSend, void *recv, MPI_int recvNum, MPI_Datatype typeRecv, MPI_Comm comm)
Wrapper over MPI_Alltoall (fixed per-peer count).
MPI_int Scan(const void *sendbuf, void *recvbuf, MPI_int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
Wrapper over MPI_Scan (inclusive prefix reduction).
MPI_int Alltoallv(void *send, MPI_int *sendSizes, MPI_int *sendStarts, MPI_Datatype sendType, void *recv, MPI_int *recvSizes, MPI_int *recvStarts, MPI_Datatype recvType, MPI_Comm comm)
Wrapper over MPI_Alltoallv (variable per-peer counts + displacements).
MPI_int Allreduce(const void *sendbuf, void *recvbuf, MPI_int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
Wrapper over MPI_Allreduce.
MPI_int Barrier(MPI_Comm comm)
Wrapper over MPI_Barrier.
bool isCudaAware()
Runtime probe: whether the current MPI implementation is built with CUDA-aware support (queried via MPIX_Query_cuda_support).
MPI_int WaitallLazy(MPI_int count, MPI_Request *reqs, MPI_Status *statuses, uint64_t checkNanoSecs)
Like WaitallAuto but sleeps checkNanoSecs ns between polls.
The host-side operators are provided as implemented here.
void MPISerialDo(const MPIInfo &mpi, F f)
Execute f on each rank serially, in rank order.
std::string getTraceString()
Return a symbolicated stack trace for the calling thread.
std::string getTimeStamp(const MPIInfo &mpi)
Format a human-readable timestamp using the calling rank as context.
std::ostream & log()
Return the current DNDSR log stream (either std::cout or the installed file).
void assert_false_info_mpi(const char *expr, const char *file, int line, const std::string &info, const DNDS::MPIInfo &mpi)
MPI-aware assertion-failure reporter.
std::mutex HDF_mutex
Global mutex serialising host-side HDF5 calls.
int MPI_int
Integer type alias (= C int) used for counts, ranks, and sizes in MPI calls.
Lightweight bundle of an MPI communicator and the calling rank's coordinates.