Skip to content

Commit

Permalink
execute task decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
Yostra committed Apr 29, 2021
1 parent ea10fcb commit bb4ed35
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 6 deletions.
14 changes: 9 additions & 5 deletions chia/full_node/coin_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def create(cls, db_wrapper: DBWrapper, cache_size: uint32 = uint32(60000))

await self.coin_record_db.execute("CREATE INDEX IF NOT EXISTS coin_spent on coin_record(spent)")

await self.coin_record_db.execute("CREATE INDEX IF NOT EXISTS coin_spent on coin_record(puzzle_hash)")
await self.coin_record_db.execute("CREATE INDEX IF NOT EXISTS coin_puzzle_hash on coin_record(puzzle_hash)")

await self.coin_record_db.commit()
self.coin_record_cache = LRUCache(cache_size)
Expand Down Expand Up @@ -106,15 +106,17 @@ async def new_block(self, block: FullBlock, tx_additions: List[Coin], tx_removal

# Checks DB and DiffStores for CoinRecord with coin_name and returns it
async def get_coin_record(self, coin_name: bytes32) -> Optional[CoinRecord]:
cached = self.coin_record_cache.get(coin_name.hex())
cached = self.coin_record_cache.get(coin_name)
if cached is not None:
return cached
cursor = await self.coin_record_db.execute("SELECT * from coin_record WHERE coin_name=?", (coin_name.hex(),))
row = await cursor.fetchone()
await cursor.close()
if row is not None:
coin = Coin(bytes32(bytes.fromhex(row[6])), bytes32(bytes.fromhex(row[5])), uint64.from_bytes(row[7]))
return CoinRecord(coin, row[1], row[2], row[3], row[4], row[8])
record = CoinRecord(coin, row[1], row[2], row[3], row[4], row[8])
self.coin_record_cache.put(record.coin.name(), record)
return record
return None

async def get_coins_added_at_height(self, height: uint32) -> List[CoinRecord]:
Expand Down Expand Up @@ -205,7 +207,7 @@ async def rollback_to_block(self, block_index: int):
coin_record.coinbase,
coin_record.timestamp,
)
self.coin_record_cache.put(coin_record.coin.name().hex(), new_record)
self.coin_record_cache.put(coin_record.coin.name(), new_record)
if int(coin_record.confirmed_block_index) > block_index:
delete_queue.append(coin_name)

Expand All @@ -223,6 +225,9 @@ async def rollback_to_block(self, block_index: int):

# Store CoinRecord in DB and ram cache
async def _add_coin_record(self, record: CoinRecord, allow_replace: bool) -> None:
if self.coin_record_cache.get(record.coin.name()) is not None:
self.coin_record_cache.remove(record.coin.name())

cursor = await self.coin_record_db.execute(
f"INSERT {'OR REPLACE ' if allow_replace else ''}INTO coin_record VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
Expand All @@ -238,7 +243,6 @@ async def _add_coin_record(self, record: CoinRecord, allow_replace: bool) -> Non
),
)
await cursor.close()
self.coin_record_cache.put(record.coin.name().hex(), record)

# Update coin_record to be spent in DB
async def _set_spent(self, coin_name: bytes32, index: uint32) -> uint64:
Expand Down
3 changes: 2 additions & 1 deletion chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from chia.types.mempool_item import MempoolItem
from chia.types.peer_info import PeerInfo
from chia.types.unfinished_block import UnfinishedBlock
from chia.util.api_decorators import api_request, peer_required, bytes_required
from chia.util.api_decorators import api_request, peer_required, bytes_required, execute_task
from chia.util.generator_tools import get_block_header
from chia.util.hash import std_hash
from chia.util.ints import uint8, uint32, uint64, uint128
Expand Down Expand Up @@ -92,6 +92,7 @@ async def respond_peers_introducer(
await peer.close()
return None

@execute_task
@peer_required
@api_request
async def new_peak(self, request: full_node_protocol.NewPeak, peer: ws.WSChiaConnection) -> Optional[Message]:
Expand Down
9 changes: 9 additions & 0 deletions chia/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ def __init__(
self.app_shut_down_task: Optional[asyncio.Task] = None
self.received_message_callback: Optional[Callable] = None
self.api_tasks: Dict[bytes32, asyncio.Task] = {}
self.execute_tasks: Set[bytes32] = set()

self.tasks_from_peer: Dict[bytes32, Set[bytes32]] = {}
self.banned_peers: Dict[str, float] = {}
self.invalid_protocol_ban_seconds = 10
Expand Down Expand Up @@ -465,6 +467,8 @@ def cancel_tasks_from_peer(self, peer_id: bytes32):

task_ids = self.tasks_from_peer[peer_id]
for task_id in task_ids:
if task_id in self.execute_tasks:
continue
task = self.api_tasks[task_id]
task.cancel()

Expand Down Expand Up @@ -501,6 +505,9 @@ async def api_call(full_message: Message, connection: WSChiaConnection, task_id)
if self.api.api_ready is False:
return None

if hasattr(f, "execute_task"):
self.execute_tasks.add(task_id)

if hasattr(f, "peer_required"):
coroutine = f(full_message.data, connection)
else:
Expand Down Expand Up @@ -542,6 +549,8 @@ async def wrapped_coroutine() -> Optional[Message]:
self.api_tasks.pop(task_id)
if task_id in self.tasks_from_peer[connection.peer_node_id]:
self.tasks_from_peer[connection.peer_node_id].remove(task_id)
if task_id in self.execute_tasks:
self.execute_tasks.remove(task_id)

task_id = token_bytes()
api_task = asyncio.create_task(api_call(payload_inc, connection_inc, task_id))
Expand Down
8 changes: 8 additions & 0 deletions chia/util/api_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,11 @@ def inner():
return func

return inner()


def execute_task(func):
def inner():
setattr(func, "execute_task", True)
return func

return inner()

0 comments on commit bb4ed35

Please sign in to comment.