Skip to content

Commit

Permalink
Merge branch 'master' into parallel_sampling_threadpool
Browse files Browse the repository at this point in the history
  • Loading branch information
mzegla authored Jan 16, 2025
2 parents 6b0e034 + eed81fe commit 4eb8696
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 123 deletions.
31 changes: 19 additions & 12 deletions samples/python/text_generation/multinomial_causal_lm.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(self, tokenizer):
self.tokens_cache = []
self.text_queue = queue.Queue()
self.print_len = 0
self.decoded_lengths = []

def __iter__(self):
"""
Expand Down Expand Up @@ -80,30 +81,35 @@ def put(self, token_id: int) -> bool:
Returns:
bool: True if generation should be stopped, False otherwise.
"""
"""
self.tokens_cache.append(token_id)
text = self.tokenizer.decode(self.tokens_cache)
self.decoded_lengths.append(len(text))

word = ''
delay_n_tokens = 3
if len(text) > self.print_len and '\n' == text[-1]:
# Flush the cache after the new line symbol.
word = text[self.print_len:]
word = text[self.print_len:]
self.tokens_cache = []
self.decoded_lengths = []
self.print_len = 0
elif len(text) >= 3 and text[-1] == chr(65533):
elif len(text) > 0 and text[-1] == chr(65533):
# Don't print incomplete text.
pass
elif len(text) > self.print_len:
# It is possible to have a shorter text after adding new token.
# Print to output only if text length is increaesed.
word = text[self.print_len:]
self.print_len = len(text)
self.put_word(word)

self.decoded_lengths[-1] = -1
elif len(self.tokens_cache) >= delay_n_tokens:
print_until = self.decoded_lengths[-delay_n_tokens]
if print_until != -1 and print_until > self.print_len:
# It is possible to have a shorter text after adding new token.
# Print to output only if text length is increased and text is complete (print_until != -1).
word = text[self.print_len:print_until]
self.print_len = print_until
self.put_word(word)

if self.get_stop_flag():
# When generation is stopped from streamer then end is not called, need to call it here manually.
self.end()
return True # True means stop generation
return True # True means stop generation
else:
return False # False means continue generation

Expand All @@ -129,6 +135,7 @@ def __init__(self, tokenizer, tokens_len):
def put(self, token_id: int) -> bool:
if (len(self.tokens_cache) + 1) % self.tokens_len != 0:
self.tokens_cache.append(token_id)
self.decoded_lengths.append(-1)
return False
return super().put(token_id)

Expand Down
10 changes: 5 additions & 5 deletions src/cpp/src/cache_state_dumper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ class CacheStateDumper {
* @param block_mgr A block manager owning the caches.
* @param sequence_groups Sequence groups currently utilizing the cache.
*/
void dump_cache_state(const BlockManager &block_mgr, const std::vector <SequenceGroup::Ptr> &sequence_groups,
void dump_cache_state(const std::shared_ptr<BlockManager> block_mgr, const std::vector <SequenceGroup::Ptr> &sequence_groups,
size_t dump_count) {
for (size_t layer_idx = 0; layer_idx < block_mgr.m_num_layers; layer_idx++) {
for (size_t layer_idx = 0; layer_idx < block_mgr->m_num_layers; layer_idx++) {
auto per_layer_folder = get_per_layer_folder(layer_idx);
auto file_path = (per_layer_folder / (m_run_id + ".txt")).string();
std::ofstream out_stream(file_path, std::ios::out);
OPENVINO_ASSERT(out_stream.is_open());

out_stream << block_mgr.m_allocator.m_total_num_blocks << std::endl;
out_stream << block_mgr->m_allocator.m_total_num_blocks << std::endl;
out_stream << sequence_groups.size() << std::endl;
for (const auto &seq_group_ptr: sequence_groups) {
out_stream << seq_group_ptr->get_request_id() << ' ';
Expand All @@ -57,7 +57,7 @@ class CacheStateDumper {
}
out_stream << std::endl;
}
for (const auto &seq_id_and_blocks: block_mgr.m_block_table) {
for (const auto &seq_id_and_blocks: block_mgr->m_block_table) {
for (const auto &block: seq_id_and_blocks.second[layer_idx]) {
const size_t seq_id = seq_id_and_blocks.first;
out_stream << seq_id << " " << block->get_index() << " " << block->get_references_count()
Expand All @@ -70,7 +70,7 @@ class CacheStateDumper {
std::ofstream out_stream_cache_usage;

out_stream_cache_usage.open(cache_usage_file_path, std::ios::app);
out_stream_cache_usage << dump_count << ' ' << block_mgr.get_used_percentage() << std::endl;
out_stream_cache_usage << dump_count << ' ' << block_mgr->get_used_percentage() << std::endl;
out_stream_cache_usage.flush();
dump_count++;
}
Expand Down
16 changes: 8 additions & 8 deletions src/cpp/src/continuous_batching_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::ContinuousBatchingImpl(
initialize_pipeline(model, scheduler_config, properties, device_config, core);
}

ContinuousBatchingPipeline::ContinuousBatchingImpl::~ContinuousBatchingImpl() {
if (m_scheduler) {
m_scheduler->release();
}
}

void ContinuousBatchingPipeline::ContinuousBatchingImpl::_pull_awaiting_requests() {
std::lock_guard<std::mutex> lock{m_awaiting_requests_mutex};
m_requests.insert(m_requests.end(), m_awaiting_requests.begin(), m_awaiting_requests.end());
Expand Down Expand Up @@ -61,7 +67,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::initialize_pipeline(
ov::InferRequest infer_request = compiled_model.create_infer_request();

// setup KV caches
m_cache_manager = std::make_shared<CacheManager>(device_config, infer_request, core);
std::shared_ptr<CacheManager> cache_manager = std::make_shared<CacheManager>(device_config, infer_request, core);

SchedulerConfig updated_config = scheduler_config;
// update KV blocks number in scheduler config
Expand All @@ -75,8 +81,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::initialize_pipeline(
// as it may lead to performance slowdown
can_use_partial_preemption = false;
}
m_scheduler = std::make_shared<Scheduler>(device_config.get_block_size(), m_cache_manager, updated_config, device_config.get_num_layers(), can_use_partial_preemption);

m_scheduler = std::make_shared<Scheduler>(device_config.get_block_size(), cache_manager, updated_config, device_config.get_num_layers(), can_use_partial_preemption);
// model runner
bool is_use_cache_eviction = m_scheduler->get_config().use_cache_eviction;
m_model_runner = std::make_shared<ModelRunner>(infer_request, m_scheduler->get_block_size(), device_config.get_num_layers(), is_use_cache_eviction);
Expand Down Expand Up @@ -158,11 +163,6 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
SequenceGroup::CPtr sequence_group = m_requests[seq_group_id];
m_batch_size += sequence_group->num_running_seqs();
}

static ManualTimer copy_blocks_timer("scheduling");
copy_blocks_timer.start();
m_cache_manager->copy_blocks(scheduler_output.m_block_copy_map);
copy_blocks_timer.end();
}

// if no tokens were scheduled, we are out of memory => free all requests and return
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/src/continuous_batching_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ namespace ov::genai {
class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatchingPipeline::IContinuousBatchingPipeline {
protected:
std::shared_ptr<Scheduler> m_scheduler;
std::shared_ptr<CacheManager> m_cache_manager;
std::shared_ptr<ModelRunner> m_model_runner;
std::optional<AdapterController> m_adapter_controller;
std::shared_ptr<Sampler> m_sampler;
Expand Down Expand Up @@ -90,6 +89,8 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc
const ov::AnyMap& properties,
const ov::genai::GenerationConfig& generation_config,
bool is_validation_mode_enabled = false);

virtual ~ContinuousBatchingImpl();

GenerationHandle add_request(uint64_t request_id,
const ov::Tensor& input_ids,
Expand Down
29 changes: 27 additions & 2 deletions src/cpp/src/image_generation/models/autoencoder_kl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
namespace ov {
namespace genai {

namespace {

class DiagonalGaussianDistribution {
public:
explicit DiagonalGaussianDistribution(ov::Tensor parameters)
Expand Down Expand Up @@ -64,6 +66,29 @@ class DiagonalGaussianDistribution {
ov::Tensor m_mean, m_std;
};

// for BW compatibility with 2024.6.0
ov::AnyMap handle_scale_factor(std::shared_ptr<ov::Model> model, const std::string& device, ov::AnyMap properties) {
std::cout << ov::Any(properties).as<std::string>() << std::endl;

auto it = properties.find("WA_INFERENCE_PRECISION_HINT");
ov::element::Type wa_inference_precision = it != properties.end() ? it->second.as<ov::element::Type>() : ov::element::undefined;
if (it != properties.end()) {
properties.erase(it);
}

const std::vector<std::string> activation_scale_factor_path = { "runtime_options", ov::hint::activations_scale_factor.name() };
const bool activation_scale_factor_defined = model->has_rt_info(activation_scale_factor_path);

// convert WA inference precision to actual inference precision if activation_scale_factor is not defined in IR
if (device.find("GPU") != std::string::npos && !activation_scale_factor_defined && wa_inference_precision != ov::element::undefined) {
properties[ov::hint::inference_precision.name()] = wa_inference_precision;
}

return properties;
}

} // namespace

size_t get_vae_scale_factor(const std::filesystem::path& vae_config_path) {
std::ifstream file(vae_config_path);
OPENVINO_ASSERT(file.is_open(), "Failed to open ", vae_config_path);
Expand Down Expand Up @@ -207,14 +232,14 @@ AutoencoderKL& AutoencoderKL::compile(const std::string& device, const ov::AnyMa
ov::Core core = utils::singleton_core();

if (m_encoder_model) {
ov::CompiledModel encoder_compiled_model = core.compile_model(m_encoder_model, device, properties);
ov::CompiledModel encoder_compiled_model = core.compile_model(m_encoder_model, device, handle_scale_factor(m_encoder_model, device, properties));
ov::genai::utils::print_compiled_model_properties(encoder_compiled_model, "Auto encoder KL encoder model");
m_encoder_request = encoder_compiled_model.create_infer_request();
// release the original model
m_encoder_model.reset();
}

ov::CompiledModel decoder_compiled_model = core.compile_model(m_decoder_model, device, properties);
ov::CompiledModel decoder_compiled_model = core.compile_model(m_decoder_model, device, handle_scale_factor(m_decoder_model, device, properties));
ov::genai::utils::print_compiled_model_properties(decoder_compiled_model, "Auto encoder KL decoder model");
m_decoder_request = decoder_compiled_model.create_infer_request();
// release the original model
Expand Down
18 changes: 5 additions & 13 deletions src/cpp/src/image_generation/stable_diffusion_3_pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,25 +137,17 @@ class StableDiffusion3Pipeline : public DiffusionPipeline {

set_scheduler(Scheduler::from_config(root_dir / "scheduler/scheduler_config.json"));

// Temporary fix for GPU
ov::AnyMap updated_properties = properties;
if (device.find("GPU") != std::string::npos &&
updated_properties.find("INFERENCE_PRECISION_HINT") == updated_properties.end()) {
updated_properties["INFERENCE_PRECISION_HINT"] = ov::element::f32;
}

const std::string text_encoder = data["text_encoder"][1].get<std::string>();
if (text_encoder == "CLIPTextModelWithProjection") {
m_clip_text_encoder_1 =
std::make_shared<CLIPTextModelWithProjection>(root_dir / "text_encoder", device, updated_properties);
std::make_shared<CLIPTextModelWithProjection>(root_dir / "text_encoder", device, properties);
} else {
OPENVINO_THROW("Unsupported '", text_encoder, "' text encoder type");
}

const std::string text_encoder_2 = data["text_encoder_2"][1].get<std::string>();
if (text_encoder_2 == "CLIPTextModelWithProjection") {
m_clip_text_encoder_2 =
std::make_shared<CLIPTextModelWithProjection>(root_dir / "text_encoder_2", device, updated_properties);
m_clip_text_encoder_2 = std::make_shared<CLIPTextModelWithProjection>(root_dir / "text_encoder_2", device, properties);
} else {
OPENVINO_THROW("Unsupported '", text_encoder_2, "' text encoder type");
}
Expand All @@ -164,7 +156,7 @@ class StableDiffusion3Pipeline : public DiffusionPipeline {
if (!text_encoder_3_json.is_null()) {
const std::string text_encoder_3 = text_encoder_3_json.get<std::string>();
if (text_encoder_3 == "T5EncoderModel") {
m_t5_text_encoder = std::make_shared<T5EncoderModel>(root_dir / "text_encoder_3", device, updated_properties);
m_t5_text_encoder = std::make_shared<T5EncoderModel>(root_dir / "text_encoder_3", device, properties);
} else {
OPENVINO_THROW("Unsupported '", text_encoder_3, "' text encoder type");
}
Expand All @@ -180,9 +172,9 @@ class StableDiffusion3Pipeline : public DiffusionPipeline {
const std::string vae = data["vae"][1].get<std::string>();
if (vae == "AutoencoderKL") {
if (m_pipeline_type == PipelineType::TEXT_2_IMAGE)
m_vae = std::make_shared<AutoencoderKL>(root_dir / "vae_decoder", device, updated_properties);
m_vae = std::make_shared<AutoencoderKL>(root_dir / "vae_decoder", device, properties);
else if (m_pipeline_type == PipelineType::IMAGE_2_IMAGE || m_pipeline_type == PipelineType::INPAINTING) {
m_vae = std::make_shared<AutoencoderKL>(root_dir / "vae_encoder", root_dir / "vae_decoder", device, updated_properties);
m_vae = std::make_shared<AutoencoderKL>(root_dir / "vae_encoder", root_dir / "vae_decoder", device, properties);
} else {
OPENVINO_ASSERT("Unsupported pipeline type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class StableDiffusionXLPipeline : public StableDiffusionPipeline {
ov::AnyMap updated_properties = properties;
if (device.find("GPU") != std::string::npos &&
updated_properties.find("INFERENCE_PRECISION_HINT") == updated_properties.end()) {
updated_properties["INFERENCE_PRECISION_HINT"] = ov::element::f32;
updated_properties["WA_INFERENCE_PRECISION_HINT"] = ov::element::f32;
}

const std::string vae = data["vae"][1].get<std::string>();
Expand Down
Loading

0 comments on commit 4eb8696

Please sign in to comment.