Skip to content

Commit

Permalink
logan
Browse files: browse the repository at this point in the history
  • Loading branch information
tlemane committed Nov 19, 2024
1 parent 05b0396 commit 655d6c6
Showing 1 changed file with 67 additions and 43 deletions.
110 changes: 67 additions & 43 deletions app/kmindex/query-blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ namespace kmq {
Timer gtime;
index global(o->global_index_path);


spdlog::info(
"Global index: '{}'", fs::absolute(o->global_index_path + "/").parent_path().filename().string());

Expand Down Expand Up @@ -213,7 +212,7 @@ namespace kmq {
for (std::size_t p = 0; p < infos.nb_partitions(); ++p)
{
auto s = infos.get_partition(p).substr(10);
m_partitions.push_back(std::make_unique<blob_partition>(s, infos.nb_samples(), infos.bw(), &itp_client));
m_partitions.push_back(std::make_unique<blob_partition>(s, infos.nb_samples(), infos.bw(), &itp_client));
}

for (std::size_t p = 0; p < infos.nb_partitions(); ++p)
Expand All @@ -229,69 +228,94 @@ namespace kmq {
query_result_agg aggs;
aggs.add(query_result(std::move(r), o->z, infos, false));
aggs.output(infos, o->output, o->format, o->single, o->sk_threshold);
spdlog::info("Index '{}' processed. ({})", infos.name(), timer.formatted());
spdlog::info("Index '{}' processed. ({})", infos.name(), timer.formatted());
});
}
poolL.join_all();
spdlog::info("Done ({}).", gtime.formatted());
}
else
{
for (auto& index_name : o->index_names)
{
Timer timer;
auto infos = global.get(index_name);

spdlog::info("Starting '{}' query ({} samples)", infos.name(), infos.nb_samples());
ThreadPool poolL(opt->nb_threads);

klibpp::SeqStreamIn iss(o->input.c_str());
queue_type bqueue;
std::string connectionString = std::getenv("AZURE_STORAGE_CONNECTION_STRING");
auto azure_client = BlobServiceClient::CreateFromConnectionString(connectionString);
auto itp_client = BlobContainerClient(azure_client.GetBlobContainerClient("indextheplanet"));

klibpp::SeqStreamIn iss(o->input.c_str());
std::vector<klibpp::KSeq> records;
klibpp::KSeq record;
while (iss >> record)
{
records.push_back(record);
}

klibpp::KSeq record;
iss >> record;
for (auto& index_name : o->index_names)
{
poolL.add_task([&o, &global, &index_name, &itp_client, &records](int i){

std::string connectionString = std::getenv("AZURE_STORAGE_CONNECTION_STRING");
auto azure_client = BlobServiceClient::CreateFromConnectionString(connectionString);
auto itp_client = BlobContainerClient(azure_client.GetBlobContainerClient("indextheplanet"));
Timer timer;
auto infos = global.get(index_name);

std::size_t n = record.seq.size() - infos.smer_size() + 1;
std::vector<std::unique_ptr<blob_partition>> m_partitions;
auto r = std::make_unique<query_response>(record.name, n, infos.nb_samples(), infos.bw());
std::vector<qpart_type> m_smers (infos.nb_partitions());
spdlog::info("Starting '{}' query ({} samples)", infos.name(), infos.nb_samples());

auto repart = infos.get_repartition();
auto hw = infos.get_hash_w();
loop_executor<MAX_KMER_SIZE>::exec<smer_functor>(infos.smer_size(), m_smers, record.seq, 0, infos.smer_size(), repart, hw, infos.minim_size());
std::vector<std::unique_ptr<query_response>> m_responses;
std::vector<std::unique_ptr<blob_partition>> m_partitions;
std::vector<qpart_type> m_smers (infos.nb_partitions());
auto hw = infos.get_hash_w();
auto repart = infos.get_repartition();

for (std::size_t p = 0; p < infos.nb_partitions(); ++p)
{
auto s = infos.get_partition(p).substr(10);
m_partitions.push_back(std::make_unique<blob_partition>(s, infos.nb_samples(), infos.bw(), &itp_client));
}
for (std::size_t nq = 0; nq < records.size(); ++nq)
{
std::size_t n = records[nq].seq.size() - infos.smer_size() + 1;
m_responses.push_back(std::make_unique<query_response>(records[nq].name, n, infos.nb_samples(), infos.bw()));
loop_executor<MAX_KMER_SIZE>::exec<smer_functor>(infos.smer_size(), m_smers, records[nq].seq, nq, infos.smer_size(), repart, hw, infos.minim_size());
}

ThreadPool pool(opt->nb_threads);
for (std::size_t p = 0; p < infos.nb_partitions(); ++p)
{
auto s = infos.get_partition(p).substr(10);
m_partitions.push_back(std::make_unique<blob_partition>(s, infos.nb_samples(), infos.bw(), &itp_client));
}

for (std::size_t p = 0; p < infos.nb_partitions(); ++p)
{
pool.add_task([&m_smers, &m_partitions, &r, p](int i) {
for (std::size_t p = 0; p < infos.nb_partitions(); ++p)
{
auto& smers = m_smers[p];
std::sort(std::begin(smers), std::end(smers));
for (auto& [mer, qid] : smers)
for (std::size_t ii = 0; ii < smers.size(); ++ii)
{
m_partitions[p]->query(mer.h, r->get(mer.i));
}
});
}

pool.join_all();
auto& [mer, qid] = smers[ii];
if (ii > 0)
{
auto& [prev_mer, prev_qid] = smers[ii-1];
if (prev_mer.h == mer.h)
{
std::memcpy(m_responses[qid]->get(mer.i), m_responses[prev_qid]->get(prev_mer.i), m_responses[qid]->block_size());
}
else
{
m_partitions[p]->query(mer.h, m_responses[qid]->get(mer.i));
}
}
else
{
m_partitions[p]->query(mer.h, m_responses[qid]->get(mer.i));
}

query_result_agg aggs;
aggs.add(query_result(std::move(r), o->z, infos, false));
aggs.output(infos, o->output, o->format, o->single, o->sk_threshold);
}
}

query_result_agg aggs;
for (std::size_t nq = 0; nq < records.size(); ++nq)
{
aggs.add(query_result(std::move(m_responses[nq]), o->z, infos, false));
}
aggs.output(infos, o->output, o->format, o->single, o->sk_threshold);
spdlog::info("Index '{}' processed. ({})", infos.name(), timer.formatted());
});
}

poolL.join_all();
spdlog::info("Done ({}).", gtime.formatted());
}
}

Expand Down

0 comments on commit 655d6c6

Please sign in to comment.