diff --git a/xdl/ps-plus/ps-plus/plugins/hdfs/libhdfs.cc b/xdl/ps-plus/ps-plus/plugins/hdfs/libhdfs.cc index e034c908..ec241480 100644 --- a/xdl/ps-plus/ps-plus/plugins/hdfs/libhdfs.cc +++ b/xdl/ps-plus/ps-plus/plugins/hdfs/libhdfs.cc @@ -74,11 +74,11 @@ Status LibHDFS::LoadAndBind() { const char* hdfs_root = getenv("HADOOP_HDFS_HOME"); if (hdfs_root == nullptr) { return Status::NotFound("HADOOP_HDFS_HOME is not set"); - } + } const std::string& libhdfs = std::string(hdfs_root) + "/lib/native/libhdfs.so"; void* dl = dlopen(libhdfs.c_str(), RTLD_LAZY); if (dl == nullptr) { - return Status::NotFound("cannot find $HADOOP_HDFS_HOME/lib/native/libhdfs.so"); + return Status::NotFound(dlerror()); } return LoadSymbols(dl); } diff --git a/xdl/ps-plus/ps-plus/scheduler/scheduler_impl.cc b/xdl/ps-plus/ps-plus/scheduler/scheduler_impl.cc index 1adbeabf..c2c4f979 100644 --- a/xdl/ps-plus/ps-plus/scheduler/scheduler_impl.cc +++ b/xdl/ps-plus/ps-plus/scheduler/scheduler_impl.cc @@ -178,8 +178,17 @@ Status SchedulerImpl::UpdateVariableInfo(Version version, unique_lock lock(m_); if (!ready_) { return Status::NotReady("Cluster is not ready"); } if (version != version_) { return VersionMismatch(version_, version); } + return InternalUpdateVariableInfo(info, result); +} + +Status SchedulerImpl::InternalUpdateVariableInfo(const vector& info, vector* result) { map m; for (const auto& i: info) { m[i.name] = i; } + for (auto& i: variable_info_) { + if (m[i.name].type == VariableInfo::Type::kHash) { + i.shape[0] = m[i.name].shape[0]; + } + } for (const auto& i: variable_info_) { m[i.name] = i; } vector v; for (const auto& it: m) { v.push_back(it.second); } @@ -187,7 +196,7 @@ Status SchedulerImpl::UpdateVariableInfo(Version version, if (!st.IsOk()) { return st; } variable_info_.clear(); for (const auto& i: *result) { variable_info_.push_back(i); } - return Status::Ok(); + return Status::Ok(); } Status SchedulerImpl::UpdateVariableVisitInfo(Version version, const std::string& var_name, int64_t ids) { @@ -376,6 +385,7 @@ Status SchedulerImpl::InternalRestore(const string& checkpoint) { { std::unique_ptr s; Status st = FileSystem::OpenReadStreamAny(checkpoint_path_ + "/checkpoints", &s); + LOG(INFO) << "st is " << st.ToString(); // Ignore st fail when we use a fresh checkpoint dir if (st.IsOk()) { size_t size; @@ -430,11 +440,20 @@ Status SchedulerImpl::InternalRestore(const string& checkpoint) { } if (old_server != service_->GetServerSize(0)) { - //TODO - return Status::NotImplemented("Server Count Change is not supported"); - } else { - variable_info_ = infos; + LOG(INFO) << "Change ps_num from " << old_server << " to " << service_->GetServerSize(0); + } + LOG(INFO) << "Info from checkpoint " << PrintVariableInfo(infos); + vector result; + vector input_infos = infos; + for (auto& i: input_infos) { + i.parts.clear(); } + Status st = InternalUpdateVariableInfo(input_infos, &result); + if (!st.IsOk()) { + return st; + } + variable_info_ = result; + LOG(INFO) << "Info from placement " << PrintVariableInfo(variable_info_); } else { infos = variable_info_; } @@ -456,6 +475,8 @@ Status SchedulerImpl::InternalRestore(const string& checkpoint) { if (!st.IsOk() && collect.IsOk()) { collect = st; } + std::string st_str = st.IsOk() ? "OK" : st.Msg(); + LOG(INFO) << "server " << id << " finish restore, status " << st_str << " waiting " << count_down - 1 << " more."; if (--count_down == 0) { lock.unlock(); result.set_value(collect); @@ -907,3 +928,19 @@ void SchedulerImpl::InternalSynchronizeLeave(Version version, int id, int64_t to } sync->Leave(id, token, cb); } + +std::string SchedulerImpl::PrintVariableInfo(const std::vector& infos) { + std::string logger = ""; + for (const auto& item : infos) { + logger += item.name + " cb); ps::Status UpdateVariableVisitInfo(Version version, const std::string& var_name, int64_t ids); ps::Status WriteMetaInfo(); + std::string PrintVariableInfo(const std::vector& infos); private: std::unique_ptr main_thread_; @@ -110,6 +111,7 @@ class SchedulerImpl { void WaitForServers(); std::vector variable_info_; + Status InternalUpdateVariableInfo(const std::vector& info, std::vector* result); Status InternalRestore(const std::string& checkpoint); Status InternalSave(const std::string& checkpoint); Status InternalTriggerStreamingDense(Version version);