Skip to content

Commit

Permalink
HashedDictionaryParallelLoader exception safe constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
kitaisreal committed Apr 14, 2024
1 parent b6cfba3 commit 7ebaa4d
Showing 1 changed file with 29 additions and 16 deletions.
45 changes: 29 additions & 16 deletions src/Dictionaries/HashedDictionaryParallelLoader.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <Dictionaries/IDictionary.h>
#include <Dictionaries/DictionaryHelpers.h>
#include <Common/CurrentThread.h>
#include <Common/iota.h>
#include <Common/scope_guard_safe.h>
Expand Down Expand Up @@ -62,28 +63,40 @@ class HashedDictionaryParallelLoader : public boost::noncopyable
for (size_t shard = 0; shard < shards; ++shard)
{
shards_queues[shard].emplace(backlog);
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]

try
{
WorkerStatistic statistic;
SCOPE_EXIT_SAFE(
LOG_TRACE(dictionary.log, "Finished worker for dictionary {} shard {}, processed {} blocks, {} rows, total time {}ms",
dictionary_name, shard, statistic.total_blocks, statistic.total_rows, statistic.total_elapsed_ms);
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
{
WorkerStatistic statistic;
SCOPE_EXIT_SAFE(
LOG_TRACE(dictionary.log, "Finished worker for dictionary {} shard {}, processed {} blocks, {} rows, total time {}ms",
dictionary_name, shard, statistic.total_blocks, statistic.total_rows, statistic.total_elapsed_ms);

if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);

/// Do not account memory that was occupied by the dictionaries for the query/user context.
MemoryTrackerBlockerInThread memory_blocker;

/// Do not account memory that was occupied by the dictionaries for the query/user context.
MemoryTrackerBlockerInThread memory_blocker;
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("HashedDictLoad");

if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("HashedDictLoad");
LOG_TRACE(dictionary.log, "Starting worker for dictionary {}, shard {}", dictionary_name, shard);

LOG_TRACE(dictionary.log, "Starting worker for dictionary {}, shard {}", dictionary_name, shard);
threadWorker(shard, statistic);
});
}
catch (...)
{
for (size_t shard_to_finish = 0; shard_to_finish < shard; ++shard_to_finish)
shards_queues[shard_to_finish]->clearAndFinish();

threadWorker(shard, statistic);
});
pool.wait();
throw;
}
}
}

Expand Down

0 comments on commit 7ebaa4d

Please sign in to comment.