Skip to content

Commit

Permalink
Add Jobserver::Client class
Browse files Browse the repository at this point in the history
This adds a new interface class for jobserver clients,
providing a way to acquire and release job slots easily.

Creating a concrete instance takes a Jobserver::Config as
argument, which is used to pick the appropriate implementation
and initialize it.

This commit includes both Posix and Win32 implementations.
  • Loading branch information
digit-google committed Nov 11, 2024
1 parent cecacba commit 65f9c60
Show file tree
Hide file tree
Showing 6 changed files with 547 additions and 2 deletions.
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ if(WIN32)
target_sources(libninja PRIVATE
src/subprocess-win32.cc
src/includes_normalize-win32.cc
src/jobserver-win32.cc
src/msvc_helper-win32.cc
src/msvc_helper_main-win32.cc
src/getopt.c
Expand All @@ -172,7 +173,10 @@ if(WIN32)
# errors by telling windows.h to not define those two.
add_compile_definitions(NOMINMAX)
else()
target_sources(libninja PRIVATE src/subprocess-posix.cc)
target_sources(libninja PRIVATE
src/jobserver-posix.cc
src/subprocess-posix.cc
)
if(CMAKE_SYSTEM_NAME STREQUAL "OS400" OR CMAKE_SYSTEM_NAME STREQUAL "AIX")
target_sources(libninja PRIVATE src/getopt.c)
# Build getopt.c, which can be compiled as either C or C++, as C++
Expand Down
5 changes: 4 additions & 1 deletion configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,14 +560,17 @@ def has_re2c() -> bool:
if platform.is_windows():
for name in ['subprocess-win32',
'includes_normalize-win32',
'jobserver-win32',
'msvc_helper-win32',
'msvc_helper_main-win32']:
objs += cxx(name, variables=cxxvariables)
if platform.is_msvc():
objs += cxx('minidump-win32', variables=cxxvariables)
objs += cc('getopt')
else:
objs += cxx('subprocess-posix')
for name in ['jobserver-posix',
'subprocess-posix']:
objs += cxx(name, variables=cxxvariables)
if platform.is_aix():
objs += cc('getopt')
if platform.is_msvc():
Expand Down
190 changes: 190 additions & 0 deletions src/jobserver-posix.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright 2024 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>

#include <string>

#include "jobserver.h"
#include "util.h"

namespace {

// Return true if |fd| is a fifo or pipe descriptor.
bool IsFifoDescriptor(int fd) {
struct stat info;
int ret = ::fstat(fd, &info);
return (ret == 0) && ((info.st_mode & S_IFMT) == S_IFIFO);
}

bool SetNonBlockingFd(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (!(flags & O_NONBLOCK)) {
int ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (ret < 0)
return false;
}
return true;
}

bool SetCloseOnExecFd(int fd) {
int flags = fcntl(fd, F_GETFD, 0);
if (!(flags & FD_CLOEXEC)) {
int ret = fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
if (ret < 0)
return false;
}
return true;
}

// Duplicate the descriptor and make the result non-blocking and
// close-on-exec.
bool DuplicateDescriptor(int from_fd, int* to_fd) {
int new_fd = dup(from_fd);
if (new_fd < 0) {
return false;
}
if (!SetNonBlockingFd(new_fd) || !SetCloseOnExecFd(new_fd)) {
::close(new_fd);
return false;
}
*to_fd = new_fd;
return true;
}

// Implementation of Jobserver::Client for Posix systems
class PosixJobserverClient : public Jobserver::Client {
public:
virtual ~PosixJobserverClient() {
if (write_fd_ >= 0)
::close(write_fd_);
if (read_fd_ >= 0)
::close(read_fd_);
}

Jobserver::Slot TryAcquire() override {
if (has_implicit_slot_) {
has_implicit_slot_ = false;
return Jobserver::Slot::CreateImplicit();
}
uint8_t slot_char = '\0';
int ret;
do {
ret = ::read(read_fd_, &slot_char, 1);
} while (ret < 0 && errno == EINTR);
if (ret == 1) {
return Jobserver::Slot::CreateExplicit(slot_char);
}
return Jobserver::Slot();
}

void Release(Jobserver::Slot slot) override {
if (!slot.IsValid())
return;

if (slot.IsImplicit()) {
assert(!has_implicit_slot_ && "Implicit slot cannot be released twice!");
has_implicit_slot_ = true;
return;
}

uint8_t slot_char = slot.GetExplicitValue();
int ret;
do {
ret = ::write(write_fd_, &slot_char, 1);
} while (ret < 0 && errno == EINTR);
(void)ret; // Nothing can be done in case of error here.
}

// Initialize instance with two explicit pipe file descriptors.
bool InitWithPipeFds(int read_fd, int write_fd, std::string* error) {
// Verify that the file descriptors belong to FIFOs.
if (!IsFifoDescriptor(read_fd) || !IsFifoDescriptor(write_fd)) {
*error = "Invalid file descriptors";
return false;
}
// Duplicate the file descriptors to make then non-blocking, and
// close-on-exec. This is important because the original descriptors
// might be inherited by sub-processes of this client.
if (!DuplicateDescriptor(read_fd, &read_fd_)) {
*error = "Could not duplicate read descriptor";
return false;
}
if (!DuplicateDescriptor(write_fd, &write_fd_)) {
*error = "Could not duplicate write descriptor";
// Let destructor close read_fd_.
return false;
}
return true;
}

// Initialize with FIFO file path.
bool InitWithFifo(const std::string& fifo_path, std::string* error) {
if (fifo_path.empty()) {
*error = "Empty fifo path";
return false;
}
read_fd_ = ::open(fifo_path.c_str(), O_RDONLY | O_NONBLOCK | O_CLOEXEC);
if (read_fd_ < 0) {
*error =
std::string("Error opening fifo for reading: ") + strerror(errno);
return false;
}
if (!IsFifoDescriptor(read_fd_)) {
*error = "Not a fifo path: " + fifo_path;
// Let destructor close read_fd_.
return false;
}
write_fd_ = ::open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK | O_CLOEXEC);
if (write_fd_ < 0) {
*error =
std::string("Error opening fifo for writing: ") + strerror(errno);
// Let destructor close read_fd_
return false;
}
return true;
}

private:
// Set to true if the implicit slot has not been acquired yet.
bool has_implicit_slot_ = true;

// read and write descriptors.
int read_fd_ = -1;
int write_fd_ = -1;
};

} // namespace

// static
std::unique_ptr<Jobserver::Client> Jobserver::Client::Create(
const Jobserver::Config& config, std::string* error) {
bool success = false;
auto client = std::unique_ptr<PosixJobserverClient>(new PosixJobserverClient);
if (config.mode == Jobserver::Config::kModePipe) {
success = client->InitWithPipeFds(config.read_fd, config.write_fd, error);
} else if (config.mode == Jobserver::Config::kModePosixFifo) {
success = client->InitWithFifo(config.path, error);
} else {
*error = "Unsupported jobserver mode";
}
if (!success)
client.reset();
return client;
}
105 changes: 105 additions & 0 deletions src/jobserver-win32.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2024 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <assert.h>
#include <windows.h>

#include "jobserver.h"
#include "util.h"

namespace {

// Implementation of Jobserver::Client for Win32 systems.
// At the moment, only the semaphore scheme is supported,
// even when running under Cygwin which could support the
// pipe version, in theory.
class Win32JobserverClient : public Jobserver::Client {
public:
virtual ~Win32JobserverClient() {
// NOTE: OpenSemaphore() returns NULL on failure.
if (IsValid()) {
::CloseHandle(handle_);
}
}

Jobserver::Slot TryAcquire() override {
if (IsValid()) {
if (has_implicit_slot_) {
has_implicit_slot_ = false;
return Jobserver::Slot::CreateImplicit();
}

DWORD ret = ::WaitForSingleObject(handle_, 0);
if (ret == WAIT_OBJECT_0) {
// Hard-code value 1 for the explicit slot value.
return Jobserver::Slot::CreateExplicit(1);
}
}
return Jobserver::Slot();
}

void Release(Jobserver::Slot slot) override {
if (!slot.IsValid())
return;

if (slot.IsImplicit()) {
assert(!has_implicit_slot_ && "Implicit slot cannot be released twice!");
has_implicit_slot_ = true;
return;
}

// Nothing can be done in case of error here.
(void)::ReleaseSemaphore(handle_, 1, NULL);
}

bool InitWithSemaphore(const std::string& name, std::string* error) {
handle_ = ::OpenSemaphoreA(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, FALSE,
name.c_str());
if (handle_ == NULL) {
*error = "Error opening semaphore: " + GetLastErrorString();
return false;
}
return true;
}

protected:
bool IsValid() const {
// NOTE: OpenSemaphore() returns NULL on failure, not INVALID_HANDLE_VALUE.
return handle_ != NULL;
}

// Set to true if the implicit slot has not been acquired yet.
bool has_implicit_slot_ = true;

// Semaphore handle. NULL means not in use.
HANDLE handle_ = NULL;
};

} // namespace

// static
std::unique_ptr<Jobserver::Client> Jobserver::Client::Create(
const Jobserver::Config& config, std::string* error) {
bool success = false;
auto client =
std::unique_ptr<Win32JobserverClient>(new Win32JobserverClient());
if (config.mode == Jobserver::Config::kModeWin32Semaphore) {
success = client->InitWithSemaphore(config.path, error);
} else {
*error = "Unsupported jobserver mode";
}
if (!success)
client.reset();
return client;
}
51 changes: 51 additions & 0 deletions src/jobserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ struct Jobserver {
/// extracted from MAKEFLAGS.
int read_fd = -1;
int write_fd = -1;

/// Return true if this instance matches an active implementation mode.
/// This does not try to validate configuration parameters though.
bool HasMode() { return mode != kModeNone; }
};

/// Parse the value of a MAKEFLAGS environment variable. On success return
Expand All @@ -186,4 +190,51 @@ struct Jobserver {
/// while --jobserver-auth=NAME only work on Windows.
static bool ParseNativeMakeFlagsValue(const char* makeflags_env,
Config* config, std::string* error);

/// A Jobserver::Client instance models a client of an external GNU jobserver
/// pool, which can be implemented as a Unix FIFO, or a Windows named semaphore.
/// Usage is the following:
///
/// - Call Jobserver::Client::Create(), passing a Config value as argument,
/// (e.g. one initialized with ParseNativeMakeFlagsValue()) to create
/// a new instance.
///
/// - Call TryAcquire() to try to acquire a job slot from the pool.
/// If the result is not an invalid slot, store it until the
/// corresponding command completes, then call Release() to send it
/// back to the pool.
///
/// - It is important that all acquired slots are released to the pool,
/// even if Ninja terminates early (e.g. due to a build command failing).
///
class Client {
public:
/// Destructor.
virtual ~Client() {}

/// Try to acquire a slot from the pool. On failure, i.e. if no slot
/// can be acquired, this returns an invalid Token instance.
///
/// Note that this will always return the implicit slot value the first
/// time this is called, without reading anything from the pool, as
/// specified by the protocol. This implicit value *must* be released
/// just like any other one. In general, users of this class should not
/// care about this detail, except unit-tests.
virtual Slot TryAcquire() { return Slot(); }

/// Release a slot to the pool. Does nothing if slot is invalid,
/// or if writing to the pool fails (and if this is not the implicit slot).
/// If the pool is destroyed before Ninja, then only the implicit slot
/// can be acquired in the next calls (if it was released). This simply
/// enforces serialization of all commands, instead of blocking.
virtual void Release(Slot slot) {}

/// Create a new Client instance from a given configuration. On failure,
/// this returns null after setting |*error|. Note that it is an error to
/// call this function with |config.HasMode() == false|.
static std::unique_ptr<Client> Create(const Config&, std::string* error);

protected:
Client() = default;
};
};
Loading

0 comments on commit 65f9c60

Please sign in to comment.