Skip to content

Commit

Permalink
revert
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-gogov committed Dec 4, 2024
1 parent 98db446 commit f7e2fdc
Showing 1 changed file with 24 additions and 10 deletions.
34 changes: 24 additions & 10 deletions ydb/tests/olap/scenario/test_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
ScenarioTestHelper,
TestContext,
CreateTable,
DropTable,
)

from ydb import PrimitiveType
from typing import List, Dict, Any
from ydb.tests.olap.lib.utils import get_external_param
import threading

class TestInsert(BaseTestSet):
Expand All @@ -29,33 +29,47 @@ def _loop_upsert(self, ctx: TestContext, data: list):
for batch in data:
sth.bulk_upsert_data("log", self.schema_log, batch)

def _loop_insert(self, ctx: TestContext):
def _loop_insert(self, ctx: TestContext, rows_count: int):
sth = ScenarioTestHelper(ctx)
for i in range(100):
for i in range(rows_count):
sth.execute_query(f"$cnt = SELECT CAST(COUNT(*) AS INT64) from `{sth.get_full_path("log")}`; INSERT INTO `{sth.get_full_path("cnt") }` (key, c) values({i}, $cnt)")

def scenario_read_data_during_bulk_upsert(self, ctx: TestContext):
sth = ScenarioTestHelper(ctx)
cnt_table_name: str = "cnt"
log_table_name: str = "log"
batches_count = int(get_external_param('batches_count', '10'))
rows_count = int(get_external_param('rows_count', '1000'))
inserts_count = int(get_external_param('inserts_count', '200'))
sth.execute_scheme_query(CreateTable(cnt_table_name).with_schema(self.schema_cnt))
sth.execute_scheme_query(CreateTable(log_table_name).with_schema(self.schema_log))

data: list = []
for i in range(100):
data: List = []
for i in range(batches_count):
batch: List[Dict[str, Any]] = []
for j in range(i, 1000):
batch.append({'key' : j})
for j in range(rows_count):
batch.append({'key' : j + rows_count * i})
data.append(batch)

thread1 = threading.Thread(target=self._loop_upsert, args=[ctx, data])
thread2 = threading.Thread(target=self._loop_insert, args=[ctx])
thread2 = threading.Thread(target=self._loop_insert, args=[ctx, inserts_count])

thread1.start()
thread2.start()

thread2.join()
thread1.join()

sth.execute_scheme_query(DropTable(cnt_table_name))
sth.execute_scheme_query(DropTable(log_table_name))
rows: int = sth.get_table_rows_count(cnt_table_name)
assert rows == inserts_count
scan_result = sth.execute_scan_query(f'SELECT key, c FROM `{sth.get_full_path(cnt_table_name)}` ORDER BY key')
for i in range(rows):
if scan_result.result_set.rows[i]['key'] != i:
assert False, f"{i} ?= {scan_result.result_set.rows[i]['key']}"

rows: int = sth.get_table_rows_count(log_table_name)
assert rows == rows_count * batches_count
scan_result = sth.execute_scan_query(f'SELECT key FROM `{sth.get_full_path(log_table_name)}` ORDER BY key')
for i in range(rows):
if scan_result.result_set.rows[i]['key'] != i:
assert False, f"{i} ?= {scan_result.result_set.rows[i]['key']}"

0 comments on commit f7e2fdc

Please sign in to comment.