Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to fix macOS hang when building Python 2 #1343

Merged
merged 7 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/vcpkg/base/files.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ namespace vcpkg
long long tell() const noexcept;
int eof() const noexcept;
std::error_code error() const noexcept;
int error_raw() const noexcept;

const Path& path() const;
ExpectedL<Unit> try_seek_to(long long offset);
Expand All @@ -97,6 +98,9 @@ namespace vcpkg
ExpectedL<char> try_getc();
ExpectedL<Unit> try_read_all_from(long long offset, void* buffer, std::uint32_t size);
std::string read_to_end(std::error_code& ec);
// reads any remaining chunks of the file; used to implement read_to_end
void read_to_end_suffix(
std::string& output, std::error_code& ec, char* buffer, size_t buffer_size, size_t last_read);
};

struct WriteFilePointer : FilePointer
Expand Down Expand Up @@ -135,6 +139,10 @@ namespace vcpkg

ExpectedL<FileContents> try_read_contents(const Path& file_path) const;

// Tries to read `file_path`, and if the file starts with a shebang sequence #!, returns the contents of the
// file. If an I/O error occurs or the file does not start with a shebang sequence, returns an empty string.
virtual std::string best_effort_read_contents_if_shebang(const Path& file_path) const = 0;

virtual Path find_file_recursively_up(const Path& starting_dir,
const Path& filename,
std::error_code& ec) const = 0;
Expand Down
175 changes: 130 additions & 45 deletions include/vcpkg/base/parallel-algorithms.h
Original file line number Diff line number Diff line change
@@ -1,85 +1,170 @@
#pragma once

#if defined(_WIN32)
#include <vcpkg/base/system-headers.h>
#else // ^^^ _WIN32 / !_WIN32 vvv
#include <system_error>
#include <thread>
#include <type_traits>
#include <vector>
#endif // ^^^ _WIN32
#include <vcpkg/base/system.h>

#include <limits.h>

#include <algorithm>
#include <atomic>
#include <future>
#include <vector>

namespace vcpkg
{
template<class F>
inline void execute_in_parallel(size_t work_count, F&& work) noexcept
struct WorkCallbackContext
{
const size_t thread_count = static_cast<size_t>(get_concurrency());
const size_t num_threads = std::max(static_cast<size_t>(1), std::min(thread_count, work_count));
F work;
size_t work_count;
std::atomic<size_t> next_offset;

std::vector<std::future<void>> workers;
workers.reserve(num_threads - 1);
WorkCallbackContext(F init_f, size_t work_count) : work(init_f), work_count(work_count), next_offset(0) { }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
WorkCallbackContext(F init_f, size_t work_count) : work(init_f), work_count(work_count), next_offset(0) { }
WorkCallbackContext(F init_f, size_t work_count) : work(std::move(init_f)), work_count(work_count), next_offset(0) { }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a functor/callable, which are usually pointers, and for which std::move is a pessimization. (This is also why they are passed by value)

Copy link
Contributor

@ras0219-msft ras0219-msft Feb 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand why taking it by rvalue reference would be a pessimization, but how would std::move reduce performance? For simple pointers, isn't it the same as a copy?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::move forces that the value be bound to an rvalue-reference. This is why it, for example, breaks NRVO in return std::move(blah); cases. It is likely that it wouldn't matter in this case; I'm just used to 'pass the functors by value' resulting from the mess that caused us to add _Pass_fn: passing functors by reference broke the ability to inline through function pointers. Maybe my paranoia here is out of date by now but given that we control all the functors here and aren't going to ever pass ones that are expensive to copy I'm inclined to keep following the convention.

https://github.com/microsoft/STL/blob/bd3d740ae5de7255c720b8133c5d23aa131e0760/stl/inc/xutility#L558-L584


for (size_t i = 0; i < num_threads - 1; ++i)
// pre: run() is called at most SIZE_MAX - work_count times
void run()
{
workers.emplace_back(std::async(std::launch::async | std::launch::deferred, [&work]() { work(); }));
for (;;)
{
auto offset = next_offset.fetch_add(1, std::memory_order_relaxed);
if (offset >= work_count)
{
return;
}

work(offset);
}
}
work();
};

for (auto&& w : workers)
#if defined(_WIN32)
struct PtpWork
{
PtpWork(_In_ PTP_WORK_CALLBACK pfnwk, _Inout_opt_ PVOID pv, _In_opt_ PTP_CALLBACK_ENVIRON pcbe)
: ptp_work(CreateThreadpoolWork(pfnwk, pv, pcbe))
{
w.get();
}
}
PtpWork(const PtpWork&) = delete;
PtpWork& operator=(const PtpWork&) = delete;
~PtpWork()
{
if (ptp_work)
{
::WaitForThreadpoolWorkCallbacks(ptp_work, TRUE);
::CloseThreadpoolWork(ptp_work);
}
}

template<class Container, class F>
void parallel_for_each(Container&& c, F cb) noexcept
explicit operator bool() { return ptp_work != nullptr; }

void submit() { ::SubmitThreadpoolWork(ptp_work); }

private:
PTP_WORK ptp_work;
};

template<class F>
inline void execute_in_parallel(size_t work_count, F work) noexcept
{
if (c.size() == 0)
if (work_count == 0)
{
return;
}
if (c.size() == 1)

if (work_count == 1)
{
cb(c[0]);
return;
work(size_t{});
}

std::atomic_size_t next{0};

execute_in_parallel(c.size(), [&]() {
size_t i = 0;
while (i < c.size())
WorkCallbackContext<F> context{work, work_count};
PtpWork ptp_work([](PTP_CALLBACK_INSTANCE,
void* context,
PTP_WORK) noexcept { static_cast<WorkCallbackContext<F>*>(context)->run(); },
&context,
nullptr);
if (ptp_work)
{
auto max_threads = (std::min)(work_count, static_cast<size_t>(get_concurrency()));
max_threads = (std::min)(max_threads, (SIZE_MAX - work_count) + 1u); // to avoid overflow in fetch_add
// start at 1 to account for the running thread
for (size_t i = 1; i < max_threads; ++i)
{
if (next.compare_exchange_weak(i, i + 1, std::memory_order_relaxed))
{
cb(c[i]);
}
ptp_work.submit();
}
});
}

context.run();
}
#else // ^^^ _WIN32 / !_WIN32 vvv
struct JThread
{
template<class Arg0, std::enable_if_t<!std::is_same<JThread, std::decay_t<Arg0>>::value, int> = 0>
JThread(Arg0&& arg0) : m_thread(std::forward<Arg0>(arg0))
{
}

template<class Container, class RanItTarget, class F>
void parallel_transform(const Container& c, RanItTarget out_begin, F&& cb) noexcept
~JThread() { m_thread.join(); }

JThread(const JThread&) = delete;
JThread& operator=(const JThread&) = delete;
JThread(JThread&&) = default;
JThread& operator=(JThread&&) = default;

private:
std::thread m_thread;
};

template<class F>
inline void execute_in_parallel(size_t work_count, F work) noexcept
{
if (c.size() == 0)
if (work_count == 0)
{
return;
}
if (c.size() == 1)

if (work_count == 1)
{
*out_begin = cb(c[0]);
work(size_t{});
return;
}

std::atomic_size_t next{0};

execute_in_parallel(c.size(), [&]() {
size_t i = 0;
while (i < c.size())
WorkCallbackContext<F> context{work, work_count};
auto max_threads = std::min(work_count, static_cast<size_t>(get_concurrency()));
max_threads = std::min(max_threads, (SIZE_MAX - work_count) + 1u); // to avoid overflow in fetch_add
auto bg_thread_count = max_threads - 1;
std::vector<JThread> bg_threads;
bg_threads.reserve(bg_thread_count);
for (size_t i = 0; i < bg_thread_count; ++i)
{
try
{
if (next.compare_exchange_weak(i, i + 1, std::memory_order_relaxed))
{
*(out_begin + i) = cb(c[i]);
}
bg_threads.emplace_back([&]() { context.run(); });
}
});
catch (const std::system_error&)
{
// ok, just give up trying to create threads
break;
}
}

context.run();
// destroying workers joins
}
#endif // ^^^ !_WIN32

template<class Container, class F>
void parallel_for_each(Container&& c, F cb) noexcept
{
execute_in_parallel(c.size(), [&](size_t offset) { cb(c[offset]); });
}

template<class Container, class RanItTarget, class F>
void parallel_transform(const Container& c, RanItTarget out_begin, F cb) noexcept
{
execute_in_parallel(c.size(), [&](size_t offset) { out_begin[offset] = cb(c[offset]); });
}
}
Loading
Loading