[Continuous batching] In the event of OOM, return tokens generated so far for the request (openvinotoolkit#661)
mzegla authored Jul 24, 2024
1 parent cc5e235 commit 42dd049
Showing 2 changed files with 36 additions and 46 deletions.
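The behavioral contract, in short: an out-of-memory request was already marked GenerationStatus::IGNORED before this commit, but for beam search (and for multi-sequence greedy/multinomial runs) nothing was pushed to the generation stream, so the caller received no tokens at all. With this commit, the tokens generated up to the OOM point are pushed as well. Below is a minimal sketch of the caller-visible behavior, in the spirit of the updated test further down; the import path, the SchedulerConfig fields, and the model path are assumptions, since the test's setup code lies outside the hunks shown.

# Sketch only: names marked "assumed" do not appear in the hunks below.
from openvino_genai import (  # assumed import path
    ContinuousBatchingPipeline, GenerationConfig, SchedulerConfig, Tokenizer)

model_path = "/path/to/converted/model"      # placeholder; set up as elsewhere in the test suite

generation_config = GenerationConfig()
generation_config.ignore_eos = True          # keep generating until the cache is exhausted
generation_config.max_new_tokens = 1000000

scheduler_config = SchedulerConfig()
scheduler_config.num_kv_blocks = 10          # assumed field: a tiny KV-cache that forces OOM

pipe = ContinuousBatchingPipeline(model_path, Tokenizer(model_path, {}), scheduler_config, "CPU", {})
output = pipe.generate(["What is OpenVINO?"], [generation_config])

# With this commit, the OOM-ed request still carries the tokens produced so far.
assert len(output) and len(output[0].m_generation_ids)

The test's second generate() call then verifies that the cache blocks claimed by the OOM-ed request were actually released.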
71 changes: 29 additions & 42 deletions src/cpp/src/sequence_group.hpp
@@ -425,59 +425,46 @@ class SequenceGroup {
         return m_generation_stream->get_status() == GenerationStatus::DROPPED_BY_HANDLE;
     }

-    void notify_handle() {
+    void push_outputs() {
+        GenerationOutputs outputs;
+        for (auto& sequence : m_sequences) {
+            GenerationOutput output;
+            output.generated_token_ids = sequence->get_generated_ids();
+            output.score = sequence->get_beam_search_score(m_sampling_params);
+            outputs.emplace(sequence->get_grouped_id(), output);
+        }
+        m_generation_stream->push(outputs);
+    }
+
+    void push_partial_outputs() {
+        GenerationOutputs outputs;
+        // TODO: support streaming for n seqs
+        for (auto& sequence : m_sequences) {
+            // todo: check seq.is_finished() to generate without several </s>
+            // or is it ok to use padding?
+            const auto last_gen_token = sequence->get_last_generation_output();
+            outputs.emplace(sequence->get_grouped_id(), last_gen_token);
+        }
+        m_generation_stream->push(outputs);
+    }
+
+    void notify_handle() {
         if (out_of_memory()) {
             set_generation_status(GenerationStatus::IGNORED);
         } else if (has_finished()) {
             set_generation_status(GenerationStatus::FINISHED);
         }

-        GenerationOutputs outputs;
-
         // For beam search streaming is not available, so we notify only upon finishing
         if (m_sampling_params.is_beam_search()) {
-            if (has_finished()) {
-                std::vector<Sequence::CPtr> finished_sequences = get_finished_sequences();
-
-                OPENVINO_ASSERT(finished_sequences.size() == num_total_seqs() && has_finished());
-                for (auto& sequence : finished_sequences) {
-                    GenerationOutput output;
-                    output.generated_token_ids = sequence->get_generated_ids();
-                    output.score = sequence->get_beam_search_score(m_sampling_params);
-                    outputs.emplace(sequence->get_grouped_id(), output);
-                }
-
-                if (outputs.size()) {
-                    m_generation_stream->push(outputs);
-                }
+            if (has_finished() || out_of_memory()) {
+                push_outputs();
             }
         // For greedy or multinomial sampling we decide whether to stream partial results depending on the user parameter
         } else if (m_sampling_params.is_greedy_decoding() || m_sampling_params.is_multinomial()) {
             // TO DO: Now we always stream for greedy search for the sake of benchmarking
-            if (num_total_seqs() == 1 /* m_sampling_params.stream */) {
-                // TODO: support streaming for n seqs
-                for (auto& sequence : m_sequences) {
-                    // todo: check seq.is_finished() to generate without several </s>
-                    // or is it ok to use padding?
-                    const auto last_gen_token = sequence->get_last_generation_output();
-                    outputs.emplace(sequence->get_grouped_id(), last_gen_token);
-                }
-                m_generation_stream->push(outputs);
-            } else if (has_finished()) {
-                std::vector<Sequence::CPtr> finished_sequences = get_finished_sequences();
-
-                OPENVINO_ASSERT(finished_sequences.size() == num_total_seqs() && has_finished());
-                for (auto& sequence : finished_sequences) {
-                    GenerationOutput output;
-                    output.generated_token_ids = sequence->get_generated_ids();
-                    output.score = sequence->get_cumulative_log_probs();
-                    outputs.emplace(sequence->get_grouped_id(), output);
-                }
-
-                if (outputs.size()) {
-                    m_generation_stream->push(outputs);
-                }
+            if (num_total_seqs() == 1) {
+                push_partial_outputs();
+            } else if (has_finished() || out_of_memory()) {
+                push_outputs();
             }
         }
     }
11 changes: 7 additions & 4 deletions tests/python_tests/test_sampling.py
@@ -291,8 +291,9 @@ def test_individual_generation_configs_random(tmp_path, test_struct: RandomSampl


 @pytest.mark.precommit
-def test_post_oom_health(tmp_path):
-    generation_config = get_greedy()
+@pytest.mark.parametrize("sampling_config", [get_greedy(), get_beam_search(), get_multinomial_all_parameters()])
+def test_post_oom_health(tmp_path, sampling_config):
+    generation_config = sampling_config
     generation_config.ignore_eos = True
     generation_config.max_new_tokens = 1000000

@@ -309,9 +310,11 @@ def test_post_oom_health(tmp_path):
     pipe = ContinuousBatchingPipeline(model_path.absolute().as_posix(), Tokenizer(model_path.absolute().as_posix(), {}), scheduler_config, "CPU", {})
     # First run should return incomplete response
     output = pipe.generate(["What is OpenVINO?"], generation_configs)
-    assert(len(output))
+    assert (len(output))
+    assert(len(output[0].m_generation_ids))
     # Same for the second run, here we want to make sure the cleanup works and we have free blocks after recent OOM
     output = pipe.generate(["What is OpenVINO?"], generation_configs)
-    assert(len(output))
+    assert (len(output))
+    assert(len(output[0].m_generation_ids))
     del pipe
     shutil.rmtree(model_path)
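The new parametrization exercises the OOM path under all three sampling modes rather than greedy only. The config helpers are defined elsewhere in the test module; below is a sketch of their assumed shape, using standard GenerationConfig attributes (the real helper bodies are outside the hunks shown and may set different values).

from openvino_genai import GenerationConfig  # assumed import path

def get_beam_search() -> GenerationConfig:
    # Assumed values; the real helper lives elsewhere in tests/python_tests.
    config = GenerationConfig()
    config.num_beam_groups = 3
    config.num_beams = 6
    config.diversity_penalty = 1.0
    config.num_return_sequences = config.num_beams
    return config

def get_multinomial_all_parameters() -> GenerationConfig:
    # Assumed values; the real helper lives elsewhere in tests/python_tests.
    config = GenerationConfig()
    config.do_sample = True
    config.temperature = 0.9
    config.top_p = 0.9
    config.top_k = 20
    config.num_return_sequences = 4
    return config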
