Skip to content

Commit

Permalink
Run all example binaries in CI and fix
Browse files Browse the repository at this point in the history
Summary:
Running all example binaries in CI (not only the two hard coded) and
fix broken example due to recent API change in Tasks.

Reviewed By: mbasmanova

Differential Revision: D38095807

fbshipit-source-id: ebeedee521c930e101cd33d4ab4631eb491f3268
  • Loading branch information
pedroerp authored and facebook-github-bot committed Jul 25, 2022
1 parent 49ec747 commit 0baf13f
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 34 deletions.
5 changes: 2 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,9 @@ jobs:
make fuzzertest
no_output_timeout: 5m
- run:
name: "Run the velox examples"
name: "Run Example Binaries"
command: |
_build/debug/velox/examples/velox_example_expression_eval
_build/debug/velox/examples/velox_example_opaque_type
find _build/debug/velox/examples/ -maxdepth 1 -type f -executable -exec "{}" \;
linux-build-release:
executor: build
Expand Down
46 changes: 16 additions & 30 deletions velox/examples/ScanAndSort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,10 @@ int main(int argc, char** argv) {
/*destination=*/0,
core::QueryCtx::createForTest());

// Then start the task. `maxDrivers` controls the maximum number of threads
// that will be used to run operators in this pipeline.
exec::Task::start(writeTask, /*maxDrivers=*/1);

// Ensures that the task is finished before moving on. 0 timeout means it will
// block forever until the task finishes.
auto& inlineExecutor = folly::QueuedImmediateExecutor::instance();
writeTask->stateChangeFuture(0).via(&inlineExecutor).wait();
// next() starts execution using the client thread. The loop pumps output
// vectors out of the task (there are none in this query fragment).
while (auto result = writeTask->next())
;

// At this point, the first part of the example is done; there is now a
// file encoded using dwrf at `filePath`. The next part of the example
Expand All @@ -155,27 +151,12 @@ int main(int argc, char** argv) {
.orderBy({"my_col"}, /*isPartial=*/false)
.planFragment();

// For the reader task, we also specify a `Consumer` callback, which is called
// whenever a new processed batch is available. It returns a `BlockingReason`,
// specifying whether the upstream task should be blocked (and the reason), or
// continue generating output vectors.
// Create the reader task.
auto readTask = std::make_shared<exec::Task>(
"my_read_task",
readPlanFragment,
/*destination=*/0,
core::QueryCtx::createForTest(),
[](RowVectorPtr vector, facebook::velox::ContinueFuture*) {
if (vector) {
LOG(INFO) << "Vector available after processing (scan + sort):";
for (size_t i = 0; i < vector->size(); ++i) {
LOG(INFO) << vector->toString(i);
}
}
return exec::BlockingReason::kNotBlocked;
});

// Start the task.
exec::Task::start(readTask, /*maxDrivers=*/1);
core::QueryCtx::createForTest());

// Now that we have the query fragment and Task structure set up, we will
// add data to it via `splits`.
Expand All @@ -193,12 +174,17 @@ int main(int argc, char** argv) {
// TableScan.
readTask->addSplit(scanNodeId, exec::Split{connectorSplit});

// Signal that no more splits will be added. After this point, the consumer
// callbacks will be called once the data is processed.
// Signal that no more splits will be added. After this point, calling next()
// on the task will start the plan execution using the current thread.
readTask->noMoreSplits(scanNodeId);

// Here we need to make sure main() doesn't return (and hence destructs the
// local Task object) before the task is done executing.
readTask->stateChangeFuture(0).via(&inlineExecutor).wait();
// Read output vectors and print them.
while (auto result = readTask->next()) {
LOG(INFO) << "Vector available after processing (scan + sort):";
for (size_t i = 0; i < result->size(); ++i) {
LOG(INFO) << result->toString(i);
}
}

return 0;
}
5 changes: 4 additions & 1 deletion velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,10 @@ void Driver::run(std::shared_ptr<Driver> self) {

// When Driver runs on an executor, the last operator (sink) must not produce
// any results.
VELOX_CHECK_NULL(nullResult);
VELOX_CHECK_NULL(
nullResult,
"The last operator (sink) must not produce any results. "
"Results need to be consumed by either a callback or another operator. ")

switch (reason) {
case StopReason::kBlock:
Expand Down

0 comments on commit 0baf13f

Please sign in to comment.