diff --git a/.github/workflows/causal_lm_cpp.yml b/.github/workflows/causal_lm_cpp.yml
index 5dff0a58d3..a3e3e56312 100644
--- a/.github/workflows/causal_lm_cpp.yml
+++ b/.github/workflows/causal_lm_cpp.yml
@@ -466,9 +466,13 @@ jobs:
       - name: run and compare
         run: |
           source ./ov/setupvars.sh
+          echo Running speculative_decoding_lm C++ sample...
           ./build/samples/cpp/text_generation/speculative_decoding_lm ./dolly-v2-7b/ ./dolly-v2-3b/ "Alan Turing was a" > predictions_speculative.txt
+          echo Running greedy_causal_lm C++ sample...
           ./build/samples/cpp/text_generation/greedy_causal_lm ./dolly-v2-7b/ "Alan Turing was a" > predictions_greedy.txt
+          echo Running speculative_decoding_lm Python sample...
           python ./samples/python/text_generation/speculative_decoding_lm.py ./dolly-v2-7b/ ./dolly-v2-3b/ "Alan Turing was a" > predictions_py.txt
+          echo All samples executed, checking result correctness...
           python -c "
           with open('predictions_greedy.txt', 'r') as f:
               predicted_greedy = f.readline()
@@ -476,6 +480,8 @@ jobs:
               predicted_speculative = f.readline()
           with open('predictions_py.txt', 'r') as f:
               predicted_py = f.readline()
+          print(f'Predicted greedy: {predicted_greedy}')
+          print(f'Predicted speculative: {predicted_speculative}')
           assert predicted_greedy == predicted_speculative
           assert predicted_greedy == predicted_py
           assert predicted_speculative == predicted_py
@@ -523,10 +529,13 @@ jobs:
           ```
           Question: Can you please add 2 and 3
           A:' > ./prompt.txt
-
+          echo Running prompt_lookup_decoding_lm C++ sample...
           ./build/samples/cpp/text_generation/prompt_lookup_decoding_lm ./TinyLlama-1.1B-Chat-v1.0/ "$(<prompt.txt)" > predictions_prompt_lookup.txt
+          echo Running greedy_causal_lm C++ sample...
           ./build/samples/cpp/text_generation/greedy_causal_lm ./TinyLlama-1.1B-Chat-v1.0/ "$(<prompt.txt)" > predictions_greedy.txt
+          echo Running prompt_lookup_decoding_lm Python sample...
           python ./samples/python/text_generation/prompt_lookup_decoding_lm.py ./TinyLlama-1.1B-Chat-v1.0/ "$(<prompt.txt)" > predictions_py.txt
+          echo All samples executed, checking result correctness...
           python -c "
           with open('predictions_greedy.txt', 'r') as f:
               predicted_greedy = f.readline()
@@ -534,6 +543,9 @@ jobs:
               predicted_prompt_lookup = f.readline()
           with open('predictions_py.txt', 'r') as f:
               predicted_prompt_lookup_py = f.readline()
+
+          print(f'Predicted greedy: {predicted_greedy}')
+          print(f'Predicted prompt lookup: {predicted_prompt_lookup}')
           assert predicted_greedy == predicted_prompt_lookup
           assert predicted_greedy == predicted_prompt_lookup_py
           assert predicted_prompt_lookup == predicted_prompt_lookup_py
diff --git a/src/cpp/src/continuous_batching_adapter.hpp b/src/cpp/src/continuous_batching_adapter.hpp
index 00928b342d..c1ab881371 100644
--- a/src/cpp/src/continuous_batching_adapter.hpp
+++ b/src/cpp/src/continuous_batching_adapter.hpp
@@ -1,10 +1,10 @@
-
 // Copyright (C) 2023-2025 Intel Corporation
 // SPDX-License-Identifier: Apache-2.0
 
 #include "llm_pipeline_base.hpp"
 
 #include "openvino/genai/continuous_batching_pipeline.hpp"
+#include <memory>
 
 namespace ov::genai {
 
@@ -17,29 +17,27 @@ template<class... Ts> struct overloaded : Ts... {using Ts::operator()...;};
 template<class... Ts> overloaded(Ts...) -> overloaded<Ts...>;
 
 class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
-    ContinuousBatchingPipeline m_impl;
+    std::unique_ptr<ContinuousBatchingPipeline> m_impl;
 public:
     ContinuousBatchingAdapter(
         const ov::InferRequest& request,
         const Tokenizer& tokenizer,
         OptionalGenerationConfig generation_config
-    ): LLMPipelineImplBase{dont_construct(), GenerationConfig{}},
-        m_impl{std::filesystem::path{}, SchedulerConfig{}, std::string{}} { }
-
+    ): LLMPipelineImplBase{dont_construct(), GenerationConfig{}},
+        m_impl{std::make_unique<ContinuousBatchingPipeline>(std::filesystem::path{}, SchedulerConfig{}, std::string{})} { }
+
     ContinuousBatchingAdapter(
         const std::filesystem::path& models_path,
         const Tokenizer& tokenizer,
         const SchedulerConfig& scheduler_config,
         const std::string& device,
         const ov::AnyMap& plugin_config
-    ): LLMPipelineImplBase{tokenizer, GenerationConfig()}, m_impl{
-        models_path,
-        tokenizer,
-        scheduler_config,
-        device,
-        plugin_config} {
-        m_generation_config = m_impl.get_config();
-    }
+    ): LLMPipelineImplBase{tokenizer, GenerationConfig()} {
+        auto mutable_plugin_config = plugin_config;
+        mutable_plugin_config["sampler_num_threads"] = 1;
+        m_impl = std::make_unique<ContinuousBatchingPipeline>(models_path, tokenizer, scheduler_config, device, mutable_plugin_config);
+        m_generation_config = m_impl->get_config();
+    }
 
     ContinuousBatchingAdapter(
         const std::string& model_str,
@@ -49,27 +47,22 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
         const std::string& device,
         const ov::AnyMap& plugin_config,
         const ov::genai::GenerationConfig& generation_config
-    ): LLMPipelineImplBase{tokenizer, GenerationConfig()}, m_impl{
-        model_str,
-        weights_tensor,
-        tokenizer,
-        scheduler_config,
-        device,
-        plugin_config,
-        generation_config} {}
+    ): LLMPipelineImplBase{tokenizer, GenerationConfig()} {
+        auto mutable_plugin_config = plugin_config;
+        mutable_plugin_config["sampler_num_threads"] = 1;
+        m_impl = std::make_unique<ContinuousBatchingPipeline>(model_str, weights_tensor, tokenizer, scheduler_config, device, mutable_plugin_config, generation_config);
+    }
 
     ContinuousBatchingAdapter(
         const std::filesystem::path& models_path,
         const SchedulerConfig& scheduler_config,
         const std::string& device,
         const ov::AnyMap& plugin_config
-    ): LLMPipelineImplBase{Tokenizer(models_path), GenerationConfig()}, m_impl{
-        models_path,
-        m_tokenizer,
-        scheduler_config,
-        device,
-        plugin_config} {
-        m_generation_config = m_impl.get_config();
+    ): LLMPipelineImplBase{Tokenizer(models_path), GenerationConfig()} {
+        auto mutable_plugin_config = plugin_config;
+        mutable_plugin_config["sampler_num_threads"] = 1;
+        m_impl = std::make_unique<ContinuousBatchingPipeline>(models_path, m_tokenizer, scheduler_config, device, mutable_plugin_config);
+        m_generation_config = m_impl->get_config();
     }
 
     DecodedResults generate(
@@ -90,7 +83,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
         }, inputs);
         const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config;
         // -1 == config.eos_token_id and config.validate() are handled in m_impl.
-        std::vector<ov::genai::GenerationResult> generated = m_impl.generate(prompts,
+        std::vector<ov::genai::GenerationResult> generated = m_impl->generate(prompts,
            std::vector<ov::genai::GenerationConfig>{prompts.size(), config},
            streamer
        );
@@ -181,7 +174,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
 
         const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config;
         // -1 == config.eos_token_id and config.validate() are handled in m_impl.
-        std::vector<ov::genai::EncodedGenerationResult> generated = m_impl.generate(input_ids,
+        std::vector<ov::genai::EncodedGenerationResult> generated = m_impl->generate(input_ids,
            std::vector<ov::genai::GenerationConfig>{input_ids.size(), config},
            streamer
        );
@@ -210,11 +203,11 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
     }
 
     void start_chat(const std::string& system_message) override {
-        m_impl.start_chat();
+        m_impl->start_chat();
     };
 
     void finish_chat() override {
-        m_impl.finish_chat();
+        m_impl->finish_chat();
     };
 };
diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp
index f95cd3b9c6..095d7dc4e2 100644
--- a/src/cpp/src/continuous_batching_impl.cpp
+++ b/src/cpp/src/continuous_batching_impl.cpp
@@ -142,17 +142,25 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::initialize_pipeline(
     const std::vector<KVHeadConfig>& kv_cache_config) {
     ov::Core core = utils::singleton_core();
     ov::CompiledModel compiled_model;
+    ov::AnyMap mutable_properties = properties;
+    // Extract sampler_num_threads property if exists and remove it from properties
+    size_t sampler_num_threads = std::thread::hardware_concurrency();
+    auto sampler_num_threads_it = mutable_properties.find("sampler_num_threads");
+    if (sampler_num_threads_it != mutable_properties.end()) {
+        sampler_num_threads = sampler_num_threads_it->second.as<size_t>();
+        mutable_properties.erase(sampler_num_threads_it);
+    }
 
     // TODO: remove once plugin automatically set KV cache precisions
-    apply_kv_cache_precision(model, device, properties);
+    apply_kv_cache_precision(model, device, mutable_properties);
 
     // apply LoRA
-    if (auto filtered_properties = extract_adapters_from_properties(properties, &m_generation_config.adapters)) {
+    if (auto filtered_properties = extract_adapters_from_properties(mutable_properties, &m_generation_config.adapters)) {
         m_generation_config.adapters->set_tensor_name_prefix("base_model.model.model.");
         m_adapter_controller = AdapterController(model, *m_generation_config.adapters, device);   // TODO: Make the prefix name configurable
         compiled_model = core.compile_model(model, device, *filtered_properties);
     } else {
-        compiled_model = core.compile_model(model, device, properties);
+        compiled_model = core.compile_model(model, device, mutable_properties);
     }
 
     ov::genai::utils::print_compiled_model_properties(compiled_model, "LLM with Paged Attention");
@@ -227,7 +235,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::initialize_pipeline(
             std::make_shared<ModelRunner>(infer_request, m_block_size, m_num_decoder_layers);
     }
 
-    m_sampler = std::make_shared<Sampler>(m_tokenizer);
+    m_sampler = std::make_shared<Sampler>(m_tokenizer, sampler_num_threads);
     m_sampler->set_seed(m_generation_config.rng_seed);
 
     // If eos_token_id was not provided, take value
@@ -282,8 +290,8 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
 
     _pull_awaiting_requests();
 
-
     Scheduler::Output scheduler_output;
+
     {
         static ManualTimer scheduling_timer("scheduling");
         scheduling_timer.start();
@@ -318,6 +326,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
         return;
     }
     ov::Tensor logits;
+
     {
         static ManualTimer timer("forward");
         timer.start();
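Note: the `sampler_num_threads` entry introduced above is popped from the pipeline properties inside `initialize_pipeline()`, so it is never forwarded to the OpenVINO plugin; when the key is absent the sampler falls back to `std::thread::hardware_concurrency()`. Below is a minimal, illustrative sketch of a caller opting in explicitly — the model path, device and thread count are placeholders; only the property name comes from this patch.

```cpp
#include "openvino/genai/continuous_batching_pipeline.hpp"

int main() {
    ov::genai::SchedulerConfig scheduler_config;

    ov::AnyMap properties;
    // Consumed by the sampler and stripped from the map before core.compile_model() is called.
    properties["sampler_num_threads"] = size_t{8};

    ov::genai::ContinuousBatchingPipeline pipeline(
        "/path/to/model_dir",   // placeholder path to an exported GenAI model
        scheduler_config,
        "CPU",
        properties);
    return 0;
}
```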
diff --git a/src/cpp/src/sampler.cpp b/src/cpp/src/sampler.cpp
index 7a1e079746..fe3cc8239a 100644
--- a/src/cpp/src/sampler.cpp
+++ b/src/cpp/src/sampler.cpp
@@ -1,6 +1,7 @@
 // Copyright (C) 2023-2025 Intel Corporation
 // SPDX-License-Identifier: Apache-2.0
 
+#include <future>
 #include "sampler.hpp"
 
 namespace ov::genai {
@@ -744,6 +745,144 @@ process_stop_strings(const std::set<std::string>& stop_strings, Tokenizer& token
     return result;
 }
 
+SequenceGroupSamplingInfo Sampler::sample_from_sequence_group(SequenceGroup::Ptr sequence_group, ov::Tensor sequence_group_logits,
+                                                              LogitProcessor& logit_processor, const std::pair<size_t, std::set<std::string>>& stop_strings,
+                                                              bool is_validation_mode_enabled) {
+    SequenceGroupSamplingInfo sg_sampling_info;
+    // Assistant pipeline info is relevant for speculative and prompt lookup decoding
+    AssistingPipelineInfo& assisting_pipeline_info = sg_sampling_info.get_assisting_pipeline_info();
+    const ov::genai::GenerationConfig& sampling_params = sequence_group->get_sampling_parameters();
+    const size_t output_seq_len = sequence_group->get_output_seq_len();
+    // get number of tokens to be validated
+    size_t num_tokens_to_process = sequence_group->get_num_tokens_to_validate();
+
+    if (num_tokens_to_process > output_seq_len - 1) {
+        auto delta = num_tokens_to_process - (output_seq_len - 1);
+        assisting_pipeline_info.updated_validation_len = std::max(assisting_pipeline_info.updated_validation_len, delta);
+        num_tokens_to_process -= delta;
+    }
+
+    if (sampling_params.is_greedy_decoding() || sampling_params.is_multinomial()) {
+        std::vector<Sequence::Ptr> running_sequences = sequence_group->get_running_sequences();
+        size_t num_running_sequences = sequence_group->num_running_seqs();
+        if (sampling_params.is_greedy_decoding()) {
+            OPENVINO_ASSERT(num_running_sequences == 1);
+        }
+        for (size_t running_sequence_id = 0; running_sequence_id < num_running_sequences; ++running_sequence_id) {
+            auto& running_sequence = running_sequences[running_sequence_id];
+            bool is_validation_passed = true;
+            // make `num_tokens_to_process` iteration to validate a candidate generated by `draft_model` + 1 iteration to generate one more token by `main_model`
+            for (size_t i = 0; i <= num_tokens_to_process; ++i) {
+                sg_sampling_info.sampler_output.num_generated_tokens++;
+                // calculate token offset from the end of logit
+                size_t token_offset = num_tokens_to_process - i;
+                // max counter of needed to be sampled tokens
+                OPENVINO_ASSERT(running_sequence->get_generated_len() >= token_offset);
+                size_t generated_and_verified_len = running_sequence->get_generated_len() - token_offset;
+                OPENVINO_ASSERT(sequence_group->get_max_new_tokens() >= generated_and_verified_len);
+                size_t max_num_sampled_token = sequence_group->get_max_new_tokens() - generated_and_verified_len;
+                if (max_num_sampled_token == 0) {
+                    stop_sample_tokens(running_sequence, token_offset, max_num_sampled_token, assisting_pipeline_info.max_removed_tokens_per_request);
+                    break;
+                }
+
+                // do sampling only for token validation/generation.
+                // continue in case of extending draft model sequences by main model generated tokens which
+                // should be taken to KV cache without validation
+                if (!is_validation_mode_enabled && token_offset > 0) {
+                    continue;
+                }
+
+                auto logit_vector = _get_logit_vector(sequence_group_logits, running_sequence_id, token_offset);
+                logit_processor.apply(logit_vector);
+
+                Token sampled_token;
+                bool is_generate_n_tokens = false;
+                if (sampling_params.is_greedy_decoding()) {
+                    sampled_token = { _greedy_sample(logit_vector, sampling_params.logprobs) };
+                } else {
+                    // is_multinomial()
+                    is_generate_n_tokens = sequence_group->num_total_seqs() == 1;
+                    const size_t num_tokens_per_sequence = is_generate_n_tokens ? sampling_params.num_return_sequences : 1;
+                    is_generate_n_tokens &= (num_tokens_per_sequence > 1);
+                    auto sampled_token_ids = _multinomial_sample(logit_vector, num_tokens_per_sequence);
+                    OPENVINO_ASSERT(sampled_token_ids.size(), num_tokens_per_sequence);
+                    // to create n sequence just in case of `sequence_group->num_total_seqs() == 1` and `sampling_params.num_return_sequences > 1`
+                    if (is_generate_n_tokens) {
+                        const auto forked_seq_ids = create_n_forked_sequences(sequence_group, logit_processor, sampled_token_ids);
+                        sg_sampling_info.sampler_output.m_forked_sequences.insert({running_sequences[0]->get_id(), forked_seq_ids});
+                    }
+                    sampled_token = sampled_token_ids.front();
+                    // make `_speculative_sampling` in case of previous token was not accepted in speculative decoding
+                    if (!is_validation_passed) {
+                        float p_prime = get_p_prime(running_sequence, sampled_token, token_offset + 1);
+                        assisting_pipeline_info.max_removed_tokens_per_request = std::max(assisting_pipeline_info.max_removed_tokens_per_request, token_offset);
+                        // update prob only in case candidate prob > sampled token prob
+                        if (p_prime > 0.f) {
+                            auto prob = std::exp(sampled_token.m_log_prob);
+                            prob /= p_prime;
+                            sampled_token.m_log_prob = std::log(prob);
+                        }
+                    }
+                }
+                // flag to add sampled token to generated sequence or extend logit processors only
+                bool is_extend_sequence = token_offset == 0 || is_generate_n_tokens || !is_validation_passed;
+                if (is_validation_mode_enabled && !is_extend_sequence) {
+                    is_validation_passed = validate_candidate(running_sequences[running_sequence_id], token_offset, sampled_token,
+                                                              is_extend_sequence, assisting_pipeline_info.max_removed_tokens_per_request, sampling_params.do_sample);
+                    // doing resample in case of non accepted tokens in speculative sampling
+                    if (!is_validation_passed && sampling_params.do_sample) {
+                        continue;
+                    }
+                    // update log prob just while validation process
+                    if (!is_extend_sequence) {
+                        OPENVINO_ASSERT(generated_and_verified_len < running_sequences[running_sequence_id]->get_generated_len());
+                        running_sequence->update_generated_log_prob(generated_and_verified_len, sampled_token.m_log_prob);
+                    }
+                }
+                register_new_token(sampled_token, running_sequences[running_sequence_id], logit_processor, is_extend_sequence, is_validation_mode_enabled);
+                // to exit from sampling in case of failed token validation
+                if (!is_validation_passed) {
+                    break;
+                }
+            }
+            assisting_pipeline_info.min_generated_len = std::min(assisting_pipeline_info.min_generated_len, running_sequence->get_generated_len());
+        }
+        align_all_sequence_len(sequence_group, assisting_pipeline_info.min_generated_len, logit_processor);
+        for (const auto& dropped_seq_id : _try_finish_generation(sequence_group)) {
+            sg_sampling_info.sampler_output.m_dropped_sequences.push_back(dropped_seq_id);
+        }
+    } else if (sampling_params.is_beam_search()) {
+        uint64_t request_id = sequence_group->get_request_id();
+
+        // create beam search info if we are on the first generate
+        GroupBeamSearcher* beam_searcher;
+        {
+            std::lock_guard<std::mutex> lock(m_beam_search_info_mutex);
+            if (m_beam_search_info.find(request_id) == m_beam_search_info.end()) {
+                m_beam_search_info.emplace(request_id, GroupBeamSearcher(sequence_group, m_tokenizer));
+            }
+            beam_searcher = &m_beam_search_info.at(request_id);
+        }
+
+        // current algorithm already adds new tokens to running sequences and
+        beam_searcher->select_next_tokens(sequence_group_logits, sg_sampling_info.sampler_output, stop_strings);
+
+        // check max length stop criteria
+        std::vector<Sequence::Ptr> running_sequences = sequence_group->get_running_sequences();
+        if (!sequence_group->has_finished() &&
+            running_sequences[0]->get_generated_len() == sequence_group->get_max_new_tokens()) {
+            // stop sequence by max_new_tokens
+            beam_searcher->finalize(sg_sampling_info.sampler_output);
+        }
+    }
+    // Notify handle after sampling is done.
+    // For non-streaming this is effective only when the generation is finished.
+    OPENVINO_ASSERT(num_tokens_to_process >= assisting_pipeline_info.max_removed_tokens_per_request);
+    sequence_group->notify_handle();
+    return sg_sampling_info;
+}
+
 SamplerOutput Sampler::sample(const std::vector<SequenceGroup::Ptr> & sequence_groups,
                               ov::Tensor logits,
                               bool is_validation_mode_enabled) {
@@ -753,13 +892,14 @@ SamplerOutput Sampler::sample(const std::vector<SequenceGroup::Ptr> & sequence_g
     size_t vocab_size = logits_shape[2];
 
     SamplerOutput sampler_output;
+    std::unordered_map<uint64_t, std::future<SequenceGroupSamplingInfo>> sg_sampling_future_map;
     for (size_t sequence_group_id = 0, currently_processed_tokens = 0; sequence_group_id < sequence_groups.size(); ++sequence_group_id) {
         SequenceGroup::Ptr sequence_group = sequence_groups[sequence_group_id];
         if (!sequence_group->is_scheduled())
             continue;
 
-        size_t num_running_sequences = sequence_group->num_running_seqs();
-        size_t output_seq_len = sequence_group->get_output_seq_len();
+        const size_t num_running_sequences = sequence_group->num_running_seqs();
+        const size_t output_seq_len = sequence_group->get_output_seq_len();
         const ov::genai::GenerationConfig& sampling_params = sequence_group->get_sampling_parameters();
 
         const auto request_id = sequence_group->get_request_id();
@@ -771,153 +911,62 @@ SamplerOutput Sampler::sample(const std::vector<SequenceGroup::Ptr> & sequence_g
             m_stop_strings.insert({request_id, processed_stop_string});
             sequence_group->set_stream_window_size(processed_stop_string.first);
         }
-        auto& stop_strings = m_stop_strings.at(request_id);
+        const auto& stop_strings = m_stop_strings.at(request_id);
         auto& logit_processor = m_logit_processors.at(request_id);
         const void * sequence_group_logits_data = logits_data + vocab_size * currently_processed_tokens;
         ov::Tensor sequence_group_logits(ov::element::f32, ov::Shape{num_running_sequences, output_seq_len, vocab_size}, (void *)sequence_group_logits_data);
-        size_t max_removed_tokens_per_request = 0, min_generated_len = std::numeric_limits<size_t>::max(), updated_validation_len = 0;
         if (sequence_group->requires_sampling()) {
-            // get number of token to be validated
-            auto num_tokens_to_process = sequence_group->get_num_tokens_to_validate();
-            if (num_tokens_to_process > output_seq_len - 1) {
-                auto delta = num_tokens_to_process - (output_seq_len - 1);
-                updated_validation_len = std::max(updated_validation_len, delta);
-                num_tokens_to_process -= delta;
-            }
-            if (sampling_params.is_greedy_decoding() || sampling_params.is_multinomial()) {
-                std::vector<Sequence::Ptr> running_sequences = sequence_group->get_running_sequences();
-                if (sampling_params.is_greedy_decoding()) {
-                    OPENVINO_ASSERT(num_running_sequences == 1);
-                }
-                for (size_t running_sequence_id = 0; running_sequence_id < num_running_sequences; ++running_sequence_id) {
-                    auto& running_sequence = running_sequences[running_sequence_id];
-                    bool is_validation_passed = true;
-                    // make `num_tokens_to_process` iteration to validate a candidate generated by `draft_model` + 1 iteration to generate one more token by `main_model`
-                    for (size_t i = 0; i <= num_tokens_to_process; ++i) {
-                        sampler_output.num_generated_tokens++;
-                        // calculate token offset from the end of logit
-                        size_t token_offset = num_tokens_to_process - i;
-                        // max counter of needed to be sampled tokens
-                        OPENVINO_ASSERT(running_sequence->get_generated_len() >= token_offset);
-                        size_t generated_and_verified_len = running_sequence->get_generated_len() - token_offset;
-                        OPENVINO_ASSERT(sequence_group->get_max_new_tokens() >= generated_and_verified_len);
-                        size_t max_num_sampled_token = sequence_group->get_max_new_tokens() - generated_and_verified_len;
-                        if (max_num_sampled_token == 0) {
-                            stop_sample_tokens(running_sequence, token_offset, max_num_sampled_token, max_removed_tokens_per_request);
-                            break;
-                        }
-
-                        // do sampling only for token validation/generation.
-                        // continue in case of extending draft model sequences by main model generated tokens which
-                        // should be taken to KV cache without validation
-                        if (!is_validation_mode_enabled && token_offset > 0) {
-                            continue;
-                        }
-
-                        auto logit_vector = _get_logit_vector(sequence_group_logits, running_sequence_id, token_offset);
-                        logit_processor.apply(logit_vector);
-
-                        Token sampled_token;
-                        bool is_generate_n_tokens = false;
-                        if (sampling_params.is_greedy_decoding()) {
-                            sampled_token = { _greedy_sample(logit_vector, sampling_params.logprobs) };
-                        } else {
-                            // is_multinomial()
-                            is_generate_n_tokens = sequence_group->num_total_seqs() == 1;
-                            const size_t num_tokens_per_sequence = is_generate_n_tokens ? sampling_params.num_return_sequences : 1;
-                            is_generate_n_tokens &= (num_tokens_per_sequence > 1);
-                            auto sampled_token_ids = _multinomial_sample(logit_vector, num_tokens_per_sequence);
-                            OPENVINO_ASSERT(sampled_token_ids.size(), num_tokens_per_sequence);
-                            // to create n sequence just in case of `sequence_group->num_total_seqs() == 1` and `sampling_params.num_return_sequences > 1`
-                            if (is_generate_n_tokens) {
-                                const auto forked_seq_ids = create_n_forked_sequences(sequence_group, logit_processor, sampled_token_ids);
-                                sampler_output.m_forked_sequences.insert({running_sequences[0]->get_id(), forked_seq_ids});
-                            }
-                            sampled_token = sampled_token_ids.front();
-                            // make `_speculative_sampling` in case of previous token was not accepted in speculative decoding
-                            if (!is_validation_passed) {
-                                float p_prime = get_p_prime(running_sequence, sampled_token, token_offset + 1);
-                                max_removed_tokens_per_request = std::max(max_removed_tokens_per_request, token_offset);
-                                // update prob only in case candidate prob > sampled token prob
-                                if (p_prime > 0.f) {
-                                    auto prob = std::exp(sampled_token.m_log_prob);
-                                    prob /= p_prime;
-                                    sampled_token.m_log_prob = std::log(prob);
-                                }
-                            }
-                        }
-                        // flag to add sampled token to generated sequence or extend logit processors only
-                        bool is_extend_sequence = token_offset == 0 || is_generate_n_tokens || !is_validation_passed;
-                        if (is_validation_mode_enabled && !is_extend_sequence) {
-                            is_validation_passed = validate_candidate(running_sequences[running_sequence_id], token_offset, sampled_token,
-                                                                      is_extend_sequence, max_removed_tokens_per_request, sampling_params.do_sample);
-                            // doing resample in case of non accepted tokens in specualtive sampling
-                            if (!is_validation_passed && sampling_params.do_sample) {
-                                continue;
-                            }
-                            // update log prob just while validation process
-                            if (!is_extend_sequence) {
-                                OPENVINO_ASSERT(generated_and_verified_len < running_sequences[running_sequence_id]->get_generated_len());
-                                running_sequence->update_generated_log_prob(generated_and_verified_len, sampled_token.m_log_prob);
-                            }
-                        }
-                        register_new_token(sampled_token, running_sequences[running_sequence_id], logit_processor, is_extend_sequence, is_validation_mode_enabled);
-                        // to exit from sampling in case of failed token validation
-                        if (!is_validation_passed) {
-                            break;
-                        }
-                    }
-                    min_generated_len = std::min(min_generated_len, running_sequence->get_generated_len());
-                }
-                align_all_sequence_len(sequence_group, min_generated_len, logit_processor);
-                for (const auto& dropped_seq_id : _try_finish_generation(sequence_group)) {
-                    sampler_output.m_dropped_sequences.push_back(dropped_seq_id);
-                }
-            } else if (sampling_params.is_beam_search()) {
-                uint64_t request_id = sequence_group->get_request_id();
-
-                // create beam search info if we are on the first generate
-                if (m_beam_search_info.find(request_id) == m_beam_search_info.end()) {
-                    m_beam_search_info.emplace(request_id, GroupBeamSearcher(sequence_group, m_tokenizer));
-                }
-
-                // current algorithm already adds new tokens to running sequences and
-                m_beam_search_info.at(request_id).select_next_tokens(sequence_group_logits, sampler_output, stop_strings);
-
-                // check max length stop criteria
-                std::vector<Sequence::Ptr> running_sequences = sequence_group->get_running_sequences();
-                if (!sequence_group->has_finished() &&
-                    running_sequences[0]->get_generated_len() == sequence_group->get_max_new_tokens()) {
-                    // stop sequence by max_new_tokens
-                    m_beam_search_info.at(request_id).finalize(sampler_output);
-                }
-            }
-            // Notify handle after sampling is done.
-            // For non-streaming this is effective only when the generation is finished.
-            OPENVINO_ASSERT(num_tokens_to_process >= max_removed_tokens_per_request);
-            sequence_group->notify_handle();
+            // Call sample_from_sequence_group asynchronously
+            sg_sampling_future_map[request_id] = m_thread_pool.submit(&Sampler::sample_from_sequence_group, this, sequence_group, sequence_group_logits,
+                                                                      logit_processor, stop_strings, is_validation_mode_enabled);
         } else {
             // we are in prompt processing phase when prompt is split into chunks and processed step by step
         }
+        // accumulate a number of processed tokens
+        currently_processed_tokens += output_seq_len * num_running_sequences;
+    }
+
+    // Update sequence groups internal states after sampling is done
+    for (auto& sequence_group : sequence_groups) {
+        if (!sequence_group->is_scheduled())
+            continue;
+        SequenceGroupSamplingInfo sg_sampling_info;
+        const auto request_id = sequence_group->get_request_id();
+        if (sg_sampling_future_map.find(request_id) != sg_sampling_future_map.end()) {
+            // If there is a future assigned to a sequence group we read it's result (blocking if results not available yet)
+            sg_sampling_info = sg_sampling_future_map[request_id].get();
+            sampler_output.num_generated_tokens += sg_sampling_info.sampler_output.num_generated_tokens;
+
+            // Merge sampler output from sequence group to the main one
+            sampler_output.m_dropped_sequences.insert(
+                sampler_output.m_dropped_sequences.end(),
+                sg_sampling_info.sampler_output.m_dropped_sequences.begin(),
+                sg_sampling_info.sampler_output.m_dropped_sequences.end()
+            );
+
+            for (const auto& forked_seq : sg_sampling_info.sampler_output.m_forked_sequences) {
+                sampler_output.m_forked_sequences[forked_seq.first].insert(
+                    sampler_output.m_forked_sequences[forked_seq.first].end(),
+                    forked_seq.second.begin(),
+                    forked_seq.second.end()
+                );
+            }
+        }
         // NOTE: it should be before 'get_num_scheduled_tokens' is used
         // update internal state of sequence group to reset scheduler tokens and update currently processed ones
-        auto min_validated_tokens = sequence_group->get_num_tokens_to_validate() - max_removed_tokens_per_request;
+        const AssistingPipelineInfo& assisting_pipeline_info = std::as_const(sg_sampling_info.get_assisting_pipeline_info());
         sequence_group->finish_iteration();
         // decrease sequence_group context in case of candidates generated by draft_model were not accepted by main_model
-        if (max_removed_tokens_per_request) {
-            auto min_processed_tokens = sequence_group->get_prompt_len() + min_generated_len - 1;
+        if (assisting_pipeline_info.max_removed_tokens_per_request) {
+            auto min_processed_tokens = sequence_group->get_prompt_len() + assisting_pipeline_info.min_generated_len - 1;
             sequence_group->update_processed_tokens_num(min_processed_tokens);
+            auto& logit_processor = get_logit_processor(sequence_group->get_request_id());
             logit_processor.update_generated_len(min_processed_tokens);
         }
-        if (updated_validation_len) {
-            sequence_group->set_num_validated_tokens(updated_validation_len);
+        if (assisting_pipeline_info.updated_validation_len) {
+            sequence_group->set_num_validated_tokens(assisting_pipeline_info.updated_validation_len);
         }
-
-        // accumulate a number of processed tokens
-        currently_processed_tokens += output_seq_len * num_running_sequences;
     }
-
     return sampler_output;
 }
diff --git a/src/cpp/src/sampler.hpp b/src/cpp/src/sampler.hpp
index 9768e0a7af..c53676d23c 100644
--- a/src/cpp/src/sampler.hpp
+++ b/src/cpp/src/sampler.hpp
@@ -19,6 +19,7 @@
 #include "logit_processor.hpp"
 #include "scheduler.hpp"
 #include "sequence_group.hpp"
+#include "threadpool.hpp"
 
 namespace ov::genai {
 // Handle stop_token_ids
@@ -42,6 +43,21 @@ struct SamplerOutput {
     size_t num_generated_tokens = 0;
 };
 
+struct AssistingPipelineInfo {
+    size_t max_removed_tokens_per_request = 0;
+    size_t min_generated_len = std::numeric_limits<size_t>::max();
+    size_t updated_validation_len = 0;
+};
+
+struct SequenceGroupSamplingInfo {
+    SamplerOutput sampler_output;
+    AssistingPipelineInfo assisting_pipeline_info;
+
+    AssistingPipelineInfo& get_assisting_pipeline_info() {
+        return assisting_pipeline_info;
+    }
+};
+
 class Sampler {
     class GroupBeamSearcher;
 
@@ -53,8 +69,13 @@ class Sampler {
     bool validate_candidate(Sequence::Ptr running_sequence, size_t& token_idx, Token& sampled_token,
                             bool& is_extend_sequence, size_t& max_removed_tokens, bool do_sample);
 
+    SequenceGroupSamplingInfo sample_from_sequence_group(SequenceGroup::Ptr sequence_group, ov::Tensor sequence_group_logits,
+                                                         LogitProcessor& logit_processor, const std::pair<size_t, std::set<std::string>>& stop_strings,
+                                                         bool is_validation_mode_enabled);
+
     // request ID => beam search tracking information
     std::map<uint64_t, GroupBeamSearcher> m_beam_search_info;
+    std::mutex m_beam_search_info_mutex;
 
     std::mt19937 rng_engine;
     size_t seed = rng_engine.default_seed;
@@ -65,9 +86,13 @@ class Sampler {
 
     Tokenizer m_tokenizer;
 
+    ThreadPool m_thread_pool;
+
 public:
-    Sampler() = default;
-    Sampler(Tokenizer & tokenizer) : m_tokenizer(tokenizer) {};
+    Sampler(const Sampler& rhs) = delete;
+    Sampler(Sampler&& rhs) = delete;
+    Sampler(size_t num_threads = 1): m_thread_pool(num_threads) {};
+    explicit Sampler(const Tokenizer & tokenizer, size_t num_threads = 1) : m_tokenizer(tokenizer), m_thread_pool(num_threads) {};
 
     SamplerOutput sample(const std::vector<SequenceGroup::Ptr> & sequence_groups, ov::Tensor logits, bool is_validation_mode_enabled = false);
     void set_seed(size_t new_seed) {
@@ -76,6 +101,10 @@ class Sampler {
     }
     size_t get_seed() { return seed; }
 
+    void set_tokenizer(const Tokenizer& tokenizer) {
+        m_tokenizer = tokenizer;
+    }
+
     void clear_request_info(uint64_t request_id);
 
     LogitProcessor& get_logit_processor(uint64_t request_id);
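Note: `Sampler` now owns a `ThreadPool` and is no longer copyable or movable, which is why `VLMPipeline` (last file in this diff) switches from `m_sampler = Sampler(m_tokenizer)` to `m_sampler.set_tokenizer(m_tokenizer)`. A minimal sketch of the resulting lifecycle, assuming only the constructors and setters declared in the hunk above (this is an internal header, not public API):

```cpp
#include "sampler.hpp"

void configure_sampler(const ov::genai::Tokenizer& tokenizer, size_t rng_seed) {
    // Construct once, with an explicit worker count for per-request parallel sampling.
    ov::genai::Sampler sampler(tokenizer, /*num_threads=*/4);
    sampler.set_seed(rng_seed);

    // A default-constructed sampler (single worker) can receive the tokenizer later,
    // instead of being replaced by a freshly built Sampler instance.
    ov::genai::Sampler late_bound;
    late_bound.set_tokenizer(tokenizer);
}
```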
diff --git a/src/cpp/src/threadpool.hpp b/src/cpp/src/threadpool.hpp
new file mode 100644
index 0000000000..a5576c2d6c
--- /dev/null
+++ b/src/cpp/src/threadpool.hpp
@@ -0,0 +1,70 @@
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <type_traits>
+#include <vector>
+
+class ThreadPool {
+
+private:
+    std::vector<std::thread> threads;
+    std::queue<std::function<void()>> tasks;
+    std::mutex queue_mutex;
+    std::condition_variable cv;
+    std::atomic<bool> stop{false};
+
+public:
+    ThreadPool(const ThreadPool& rhs) = delete;
+    ThreadPool(ThreadPool&& rhs) = delete;
+    ThreadPool(size_t num_threads = std::thread::hardware_concurrency())
+    {
+        for (size_t i = 0; i < num_threads; ++i) {
+            threads.emplace_back([this] {
+                while (true) {
+                    std::function<void()> task;
+                    {
+                        std::unique_lock<std::mutex> lock(queue_mutex);
+                        cv.wait(lock, [this] {
+                            return !tasks.empty() || stop;
+                        });
+                        if (stop && tasks.empty()) {
+                            return;
+                        }
+                        task = std::move(tasks.front());
+                        tasks.pop();
+                    }
+                    task();
+                }
+            });
+        }
+    }
+
+    ~ThreadPool()
+    {
+        stop = true;
+        cv.notify_all();
+        for (auto& thread : threads) {
+            thread.join();
+        }
+    }
+
+    template <class F, class... Args>
+    auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
+    {
+        using return_type = std::invoke_result_t<F, Args...>;
+        auto task = std::make_shared<std::packaged_task<return_type()>>(
+            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
+        );
+        std::future<return_type> result = task->get_future();
+        {
+            std::unique_lock<std::mutex> lock(queue_mutex);
+            tasks.emplace([task]() { (*task)(); });
+        }
+        cv.notify_one();
+        return result;
+    }
+};
diff --git a/src/cpp/src/visual_language/pipeline.cpp b/src/cpp/src/visual_language/pipeline.cpp
index a3f9859384..aefb765874 100644
--- a/src/cpp/src/visual_language/pipeline.cpp
+++ b/src/cpp/src/visual_language/pipeline.cpp
@@ -108,7 +108,7 @@ class ov::genai::VLMPipeline::VLMPipelineImpl {
             m_generation_config.set_eos_token_id(m_tokenizer.get_eos_token_id());
         }
 
-        m_sampler = Sampler(m_tokenizer);
+        m_sampler.set_tokenizer(m_tokenizer);
         m_sampler.set_seed(m_generation_config.rng_seed);
     }
 
@@ -146,7 +146,7 @@ class ov::genai::VLMPipeline::VLMPipelineImpl {
             m_generation_config.set_eos_token_id(m_tokenizer.get_eos_token_id());
         }
 
-        m_sampler = Sampler(m_tokenizer);
+        m_sampler.set_tokenizer(m_tokenizer);
         m_sampler.set_seed(m_generation_config.rng_seed);
     }
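For context, here is an illustrative, standalone driver for the new `ThreadPool` header (the lambda tasks and thread count are arbitrary; only the class itself comes from this patch). `submit()` packages the callable, hands back a `std::future` typed after its result, and a worker thread runs it; `get()` blocks until the task has finished:

```cpp
#include "threadpool.hpp"

#include <iostream>
#include <string>

int main() {
    ThreadPool pool(2);  // two worker threads

    // Each submit() returns a std::future for the callable's return value.
    auto square = pool.submit([](int x) { return x * x; }, 7);
    auto label  = pool.submit([] { return std::string("done"); });

    std::cout << square.get() << '\n';  // prints 49
    std::cout << label.get() << '\n';   // prints "done"
    return 0;
}   // ~ThreadPool() signals stop and joins both workers
```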