Skip to content

Commit

Permalink
Merge branch 'main' into chhwang/lazy-stream-creation
Browse files Browse the repository at this point in the history
  • Loading branch information
chhwang authored Jan 15, 2025
2 parents 3a70ecd + 869cdba commit 24b8458
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 57 deletions.
6 changes: 3 additions & 3 deletions apps/nccl/src/nccl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI
if (mscclppComm->bootstrap()->getNranks() == mscclppComm->bootstrap()->getNranksPerNode())
ncclCommInitRankFallbackSingleNode(commPtr, mscclppComm, rank);

const std::string& collectiveDir = mscclpp::env().executionPlanDir;
const std::string& collectiveDir = mscclpp::env()->executionPlanDir;
if (collectiveDir != "") {
if (!std::filesystem::is_directory(collectiveDir)) {
WARN("The value of the environment variable %s is not a directory", collectiveDir.c_str());
Expand All @@ -431,7 +431,7 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI

*comm = commPtr;
#if defined(ENABLE_NPKIT)
if (mscclpp::env().npkitDumpDir != "") {
if (mscclpp::env()->npkitDumpDir != "") {
NpKit::Init(rank);
}
#endif
Expand All @@ -455,7 +455,7 @@ NCCL_API ncclResult_t ncclCommDestroy(ncclComm_t comm) {
return ncclInvalidArgument;
}
#if defined(ENABLE_NPKIT)
const std::string& npkitDumpDir = mscclpp::env().npkitDumpDir;
const std::string& npkitDumpDir = mscclpp::env()->npkitDumpDir;
if (npkitDumpDir != "") {
NpKit::Dump(npkitDumpDir);
NpKit::Shutdown();
Expand Down
40 changes: 26 additions & 14 deletions include/mscclpp/env.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,38 @@
#ifndef MSCCLPP_ENV_HPP_
#define MSCCLPP_ENV_HPP_

#include <memory>
#include <string>

namespace mscclpp {

struct Env {
class Env;

/// Get the MSCCL++ environment.
/// @return A reference to the global environment object.
std::shared_ptr<Env> env();

/// The MSCCL++ environment. The constructor reads environment variables and sets the corresponding fields.
/// Use the @ref env() function to get the environment object.
class Env {
public:
const std::string debug;
const std::string debugSubsys;
const std::string debugFile;
const std::string hcaDevices;
const std::string hostid;
const std::string socketFamily;
const std::string socketIfname;
const std::string commId;
const std::string executionPlanDir;
const std::string npkitDumpDir;
const bool cudaIpcUseDefaultStream;

private:
Env();
std::string debug;
std::string debugSubsys;
std::string debugFile;
std::string hcaDevices;
std::string hostid;
std::string socketFamily;
std::string socketIfname;
std::string commId;
std::string executionPlanDir;
std::string npkitDumpDir;
bool cudaIpcUseDefaultStream;
};

const Env &env();
friend std::shared_ptr<Env> env();
};

} // namespace mscclpp

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ version = "0.6.0"

[tool.scikit-build]
cmake.version = ">=3.25.0"
cmake.build-type = "Release"
build-dir = "build/{wheel_tag}"
wheel.packages = ["python/mscclpp", "python/mscclpp_benchmark"]
wheel.install-dir = "mscclpp"
Expand Down
4 changes: 3 additions & 1 deletion python/mscclpp/env_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Licensed under the MIT license.

#include <nanobind/nanobind.h>
#include <nanobind/stl/shared_ptr.h>
#include <nanobind/stl/string.h>

#include <mscclpp/env.hpp>

Expand All @@ -22,5 +24,5 @@ void register_env(nb::module_& m) {
.def_ro("npkit_dump_dir", &Env::npkitDumpDir)
.def_ro("cuda_ipc_use_default_stream", &Env::cudaIpcUseDefaultStream);

m.def("env", &env, nb::rv_policy::copy);
m.def("env", &env);
}
13 changes: 13 additions & 0 deletions python/test/test_mscclpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@
ethernet_interface_name = "eth0"


@parametrize_mpi_groups(1)
def test_env(mpi_group: MpiGroup):
e = env()
assert isinstance(e.debug, str)
with pytest.raises(AttributeError):
# all attributes should be read-only
e.debug = "INFO"

# should be the same object
e2 = env()
assert e == e2


def all_ranks_on_the_same_node(mpi_group: MpiGroup):
if (ethernet_interface_name in ni.interfaces()) is False:
pytest.skip(f"{ethernet_interface_name} is not an interface to use on this node")
Expand Down
6 changes: 3 additions & 3 deletions src/bootstrap/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ static uint16_t socketToPort(union SocketAddress* addr) {
/* Allow the user to force the IPv4/IPv6 interface selection */
static int envSocketFamily(void) {
int family = -1; // Family selection is not forced, will use first one found
const std::string& socketFamily = env().socketFamily;
const std::string& socketFamily = env()->socketFamily;
if (socketFamily == "") return family;

if (socketFamily == "AF_INET")
Expand Down Expand Up @@ -305,7 +305,7 @@ int FindInterfaces(char* ifNames, union SocketAddress* ifAddrs, int ifNameMaxSiz
// Allow user to force the INET socket family selection
int sock_family = envSocketFamily();
// User specified interface
const std::string& socketIfname = env().socketIfname;
const std::string& socketIfname = env()->socketIfname;
if (inputIfName) {
INFO(MSCCLPP_NET, "using iterface %s", inputIfName);
nIfs = findInterfaces(inputIfName, ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs);
Expand All @@ -319,7 +319,7 @@ int FindInterfaces(char* ifNames, union SocketAddress* ifAddrs, int ifNameMaxSiz
nIfs = findInterfaces("ib", ifNames, ifAddrs, sock_family, ifNameMaxSize, maxIfs);
// else see if we can get some hint from COMM ID
if (nIfs == 0) {
const std::string& commId = env().commId;
const std::string& commId = env()->commId;
if (commId != "") {
// Try to find interface that is in the same subnet as the IP in comm id
union SocketAddress idAddr;
Expand Down
6 changes: 3 additions & 3 deletions src/debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void mscclppDebugInit() {
pthread_mutex_unlock(&mscclppDebugLock);
return;
}
const char* mscclpp_debug = mscclpp::env().debug.c_str();
const char* mscclpp_debug = mscclpp::env()->debug.c_str();
int tempNcclDebugLevel = -1;
if (mscclpp_debug == NULL) {
tempNcclDebugLevel = MSCCLPP_LOG_NONE;
Expand All @@ -55,7 +55,7 @@ void mscclppDebugInit() {
* This can be a comma separated list such as INIT,COLL
* or ^INIT,COLL etc
*/
std::string mscclppDebugSubsysStr = mscclpp::env().debugSubsys;
std::string mscclppDebugSubsysStr = mscclpp::env()->debugSubsys;
const char* mscclppDebugSubsysEnv = mscclppDebugSubsysStr.c_str();
if (mscclppDebugSubsysStr != "") {
int invert = 0;
Expand Down Expand Up @@ -110,7 +110,7 @@ void mscclppDebugInit() {
* then create the debug file. But don't bother unless the
* MSCCLPP_DEBUG level is > VERSION
*/
const char* mscclppDebugFileEnv = mscclpp::env().debugFile.c_str();
const char* mscclppDebugFileEnv = mscclpp::env()->debugFile.c_str();
if (tempNcclDebugLevel > MSCCLPP_LOG_VERSION && mscclppDebugFileEnv != NULL) {
int c = 0;
char debugFn[PATH_MAX + 1] = "";
Expand Down
84 changes: 55 additions & 29 deletions src/env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license.

#include <cstdlib>
#include <memory>
#include <type_traits>

// clang-format off
Expand All @@ -13,7 +12,19 @@
#include "debug.h"

template <typename T>
void readEnv(const std::string &envName, T &env) {
T readEnv(const std::string &envName, const T &defaultValue) {
const char *envCstr = getenv(envName.c_str());
if (envCstr == nullptr) return defaultValue;
if constexpr (std::is_same_v<T, int>) {
return atoi(envCstr);
} else if constexpr (std::is_same_v<T, bool>) {
return (std::string(envCstr) != "0");
}
return T(envCstr);
}

template <typename T>
void readAndSetEnv(const std::string &envName, T &env) {
const char *envCstr = getenv(envName.c_str());
if (envCstr == nullptr) return;
if constexpr (std::is_same_v<T, int>) {
Expand All @@ -23,39 +34,54 @@ void readEnv(const std::string &envName, T &env) {
} else {
env = std::string(envCstr);
}
INFO(MSCCLPP_ENV, "%s=%s", envName.c_str(), envCstr);
}

template <typename T>
void logEnv(const std::string &envName, const T &env) {
if (!getenv(envName.c_str())) return;
INFO(MSCCLPP_ENV, "%s=%d", envName.c_str(), env);
}

template <>
void logEnv(const std::string &envName, const std::string &env) {
if (!getenv(envName.c_str())) return;
INFO(MSCCLPP_ENV, "%s=%s", envName.c_str(), env.c_str());
}

namespace mscclpp {

Env::Env()
: debug(),
debugSubsys(),
debugFile(),
hcaDevices(),
hostid(),
socketFamily(),
socketIfname(),
commId(),
executionPlanDir(),
npkitDumpDir(),
cudaIpcUseDefaultStream(false) {
readEnv("MSCCLPP_DEBUG", debug);
readEnv("MSCCLPP_DEBUG_SUBSYS", debugSubsys);
readEnv("MSCCLPP_DEBUG_FILE", debugFile);
readEnv("MSCCLPP_HCA_DEVICES", hcaDevices);
readEnv("MSCCLPP_HOSTID", hostid);
readEnv("MSCCLPP_SOCKET_FAMILY", socketFamily);
readEnv("MSCCLPP_SOCKET_IFNAME", socketIfname);
readEnv("MSCCLPP_COMM_ID", commId);
readEnv("MSCCLPP_EXECUTION_PLAN_DIR", executionPlanDir);
readEnv("MSCCLPP_NPKIT_DUMP_DIR", npkitDumpDir);
readEnv("MSCCLPP_CUDAIPC_USE_DEFAULT_STREAM", cudaIpcUseDefaultStream);
}
: debug(readEnv<std::string>("MSCCLPP_DEBUG", "")),
debugSubsys(readEnv<std::string>("MSCCLPP_DEBUG_SUBSYS", "")),
debugFile(readEnv<std::string>("MSCCLPP_DEBUG_FILE", "")),
hcaDevices(readEnv<std::string>("MSCCLPP_HCA_DEVICES", "")),
hostid(readEnv<std::string>("MSCCLPP_HOSTID", "")),
socketFamily(readEnv<std::string>("MSCCLPP_SOCKET_FAMILY", "")),
socketIfname(readEnv<std::string>("MSCCLPP_SOCKET_IFNAME", "")),
commId(readEnv<std::string>("MSCCLPP_COMM_ID", "")),
executionPlanDir(readEnv<std::string>("MSCCLPP_EXECUTION_PLAN_DIR", "")),
npkitDumpDir(readEnv<std::string>("MSCCLPP_NPKIT_DUMP_DIR", "")),
cudaIpcUseDefaultStream(readEnv<bool>("MSCCLPP_CUDAIPC_USE_DEFAULT_STREAM", false)) {}

const Env &env() {
static std::unique_ptr<Env> globalEnv = std::make_unique<Env>();
return *globalEnv;
std::shared_ptr<Env> env() {
static std::shared_ptr<Env> globalEnv = std::shared_ptr<Env>(new Env());
static bool logged = false;
if (!logged) {
logged = true;
// cannot log inside the constructor because of circular dependency
logEnv("MSCCLPP_DEBUG", globalEnv->debug);
logEnv("MSCCLPP_DEBUG_SUBSYS", globalEnv->debugSubsys);
logEnv("MSCCLPP_DEBUG_FILE", globalEnv->debugFile);
logEnv("MSCCLPP_HCA_DEVICES", globalEnv->hcaDevices);
logEnv("MSCCLPP_HOSTID", globalEnv->hostid);
logEnv("MSCCLPP_SOCKET_FAMILY", globalEnv->socketFamily);
logEnv("MSCCLPP_SOCKET_IFNAME", globalEnv->socketIfname);
logEnv("MSCCLPP_COMM_ID", globalEnv->commId);
logEnv("MSCCLPP_EXECUTION_PLAN_DIR", globalEnv->executionPlanDir);
logEnv("MSCCLPP_NPKIT_DUMP_DIR", globalEnv->npkitDumpDir);
logEnv("MSCCLPP_CUDAIPC_USE_DEFAULT_STREAM", globalEnv->cudaIpcUseDefaultStream);
}
return globalEnv;
}

} // namespace mscclpp
2 changes: 1 addition & 1 deletion src/ib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ MSCCLPP_API_CPP int getIBDeviceCount() {
}

std::string getHcaDevices(int deviceIndex) {
std::string envStr = env().hcaDevices;
std::string envStr = env()->hcaDevices;
if (envStr != "") {
std::vector<std::string> devices;
std::stringstream ss(envStr);
Expand Down
2 changes: 1 addition & 1 deletion src/utils_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ uint64_t computeHostHash(void) {
std::string hostName = getHostName(hashLen, '\0');
strncpy(hostHash, hostName.c_str(), hostName.size());

std::string hostid = env().hostid;
std::string hostid = env()->hostid;
if (hostid != "") {
strncpy(hostHash, hostid.c_str(), hashLen);
} else if (hostName.size() < hashLen) {
Expand Down
2 changes: 1 addition & 1 deletion test/executor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ int main(int argc, char* argv[]) {
const std::string executionPlanPath = argv[2];
const int niters = std::stoi(argv[3]);
const int ngraphIters = std::stoi(argv[4]);
const char* npkitDumpDir = mscclpp::env().npkitDumpDir.c_str();
const char* npkitDumpDir = mscclpp::env()->npkitDumpDir.c_str();
mscclpp::PacketType packetType = mscclpp::PacketType::LL16;
if (argc == 6) {
packetType = parsePacketType(argv[5]);
Expand Down
2 changes: 1 addition & 1 deletion test/mp_unit/executor_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void ExecutorTest::SetUp() {
bootstrap->initialize(id);
std::shared_ptr<mscclpp::Communicator> communicator = std::make_shared<mscclpp::Communicator>(bootstrap);
executor = std::make_shared<mscclpp::Executor>(communicator);
npkitDumpDir = mscclpp::env().npkitDumpDir;
npkitDumpDir = mscclpp::env()->npkitDumpDir;
if (npkitDumpDir != "") {
NpKit::Init(gEnv->rank);
}
Expand Down

0 comments on commit 24b8458

Please sign in to comment.