Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka Publisher #17

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
mapper update
  • Loading branch information
akshay-ghy committed Sep 11, 2023
commit c65cbbde7db1c9988ee9bcd9bc1bb2d04c6bfe8e
2 changes: 1 addition & 1 deletion zilliqaetl/exporters/kafka_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _export_items_with_timeout(self, items):
self.export_item(item)

def export_item(self, item):
item_type = item.get("type", None)
item_type = item.pop("type", None)
has_item_type = item_type is not None
if has_item_type and item_type in self.item_type_to_topic_mapping:
topic = self.item_type_to_topic_mapping[item_type]
Expand Down
2 changes: 1 addition & 1 deletion zilliqaetl/exporters/zilliqa_item_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def get_streamer_exporter(output, topic_prefix, topic_suffix):
item_exporter = KafkaItemExporter(item_type_to_topic_mapping={
"transaction": topic_prefix + "-transactions-" + topic_suffix,
"token_transfer": topic_prefix + "-token_transfers-" + topic_suffix,
"trace": topic_prefix + "-traces-" + topic_suffix,
"transition": topic_prefix + "-transitions-" + topic_suffix,
"tx_block": topic_prefix + "-tx_blocks-" + topic_suffix,
})

Expand Down
46 changes: 20 additions & 26 deletions zilliqaetl/jobs/export_tx_blocks_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ def __init__(
export_event_logs=True,
export_exceptions=True,
export_transitions=True,
export_token_transfers=True,
export_traces=True
export_token_transfers=True
):
self.export_token_transfers = export_token_transfers
validate_range(start_block, end_block)
self.start_block = start_block
self.end_block = end_block
Expand All @@ -62,15 +62,11 @@ def __init__(
self.export_event_logs = export_event_logs
self.export_exceptions = export_exceptions
self.export_transitions = export_transitions
self.export_token_transfers = export_token_transfers
self.export_traces = export_traces

def _start(self):
self.item_exporter.open()
pass

def _export(self):
"""This method is called on job.run()"""
self.batch_work_executor.execute(
range(self.start_block, self.end_block + 1),
self._export_batch,
Expand All @@ -81,8 +77,8 @@ def _export_batch(self, block_number_batch):
items = []
for number in block_number_batch:
tx_block = map_tx_block(self.zilliqa_service.get_tx_block(number))
num_txns: int = tx_block['num_transactions']
txns = list(self.zilliqa_service.get_transactions(number)) if num_txns > 0 else []

txns = list(self.zilliqa_service.get_transactions(number)) if tx_block.get('num_transactions') > 0 else []
if self._should_export_transactions():
for txn in txns:
items.append(map_transaction(tx_block, txn))
Expand All @@ -94,25 +90,23 @@ def _export_batch(self, block_number_batch):
items.extend(map_transitions(tx_block, txn))
if self._should_export_token_transfers(txn):
token_transfers = []
token_transfers.extend(map_token_traces(tx_block, txn, txn_type="token_transfer"))
token_transfers.extend(map_token_traces(tx_block, txn))
# Since duplicate can happen for combination of "from_address", "to_address", "value",
# "call_type", "transaction_hash"
dedup_token_transfers = {token["log_index"]: {"call_type": token["call_type"],
"from_address": token["from_address"],
"to_address": token["to_address"],
"transaction_hash": token["transaction_hash"],
"value": token["value"],
"token_address": token["token_address"]}
for token in token_transfers}
unique_token_transfers = {}
for key, token_value in dedup_token_transfers.items():
if token_value not in unique_token_transfers.values():
unique_token_transfers[key] = token_value
token_transfers = [token_transfer for token_transfer in token_transfers if
token_transfer["log_index"] in unique_token_transfers.keys()]
# dedup_token_transfers = {token["log_index"]: {"call_type": token["call_type"],
# "from_address": token["from_address"],
# "to_address": token["to_address"],
# "transaction_hash": token["transaction_hash"],
# "value": token["value"],
# "token_address": token["token_address"]}
# for token in token_transfers}
# unique_token_transfers = {}
# for key, token_value in dedup_token_transfers.items():
# if token_value not in unique_token_transfers.values():
# unique_token_transfers[key] = token_value
# token_transfers = [token_transfer for token_transfer in token_transfers if
# token_transfer["log_index"] in unique_token_transfers.keys()]
items.extend(token_transfers)
if self._should_export_traces(txn):
items.extend(map_token_traces(tx_block, txn, txn_type="trace"))
tx_block['num_present_transactions'] = len(txns)
items.append(tx_block)

Expand All @@ -134,8 +128,8 @@ def _should_export_transitions(self, txn):
def _should_export_token_transfers(self, txn):
return self.export_token_transfers and txn.get('receipt')

def _should_export_traces(self, txn):
return self.export_traces and txn.get('receipt')
# def _should_export_traces(self, txn):
# return self.export_traces and txn.get('receipt')

def _end(self):
self.batch_work_executor.shutdown()
Expand Down
2 changes: 1 addition & 1 deletion zilliqaetl/mappers/ds_block_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ def map_ds_block(raw_block):
'signature': raw_block.get('signature'),
}

return block
return block
2 changes: 1 addition & 1 deletion zilliqaetl/mappers/event_log_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from zilliqaetl.utils.zilliqa_utils import to_int, json_dumps, iso_datetime_string, encode_bech32_address
from zilliqaetl.utils.zilliqa_utils import json_dumps, encode_bech32_address


def map_event_logs(tx_block, txn):
Expand Down
2 changes: 1 addition & 1 deletion zilliqaetl/mappers/exception_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ def map_exceptions(tx_block, txn):
'index': index,
'line': exception.get('line'),
'message': exception.get('message'),
}
}
34 changes: 20 additions & 14 deletions zilliqaetl/mappers/transaction_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,32 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from pyzil.account import Account

from zilliqaetl.utils.zilliqa_utils import to_int, iso_datetime_string, encode_bech32_pub_key, encode_bech32_address

from zilliqaetl.utils.zilliqa_utils import to_int, encode_bech32_pub_key, encode_bech32_address


# Modified acc to MS use case
def map_transaction(tx_block, txn):
block = {
'type': 'transaction',
'hash': '0x' + txn.get('ID'),
'token_address': '0x0000',
'id': {txn.get("ID")},
'block_number': tx_block.get('number'),
'block_timestamp': tx_block.get('timestamp'),
'value': to_int(txn.get('amount')),
'gas_price': to_int(txn.get('gasPrice')),
'from_address': encode_bech32_pub_key(txn.get('senderPubKey')),
'to_address': encode_bech32_address(txn.get('toAddr')),
'amount': txn.get('amount'),
# 'code': txn.get('code'),
# 'data': txn.get('data'),
# 'gas_limit': to_int(txn.get('gasLimit')),
'gas_price': txn.get('gasPrice'),
# 'nonce': to_int(txn.get('nonce')),
# 'sender_pub_key': txn.get('senderPubKey'),
'sender': encode_bech32_pub_key(txn.get('senderPubKey')),
# 'signature': txn.get('signature'),
'to_addr': encode_bech32_address(txn.get('toAddr')),
# 'version': to_int(txn.get('version')),
**map_receipt(txn)
}
block["fee"] = block.pop("gas_price") * block.pop("gas_used")
if block["receipt_status"] == 0:
block["value"] = 0
block["hash"]="0x"

return block


Expand All @@ -50,6 +54,8 @@ def map_receipt(txn):
return None

return {
'receipt_status': int(receipt.get('success')),
'gas_used': to_int(receipt.get('cumulative_gas'))
# 'accepted': receipt.get('accepted'),
'success': receipt.get('success'),
'cumulative_gas': receipt.get('cumulative_gas'),
# 'epoch_num': to_int(receipt.get('epoch_num')),
}
50 changes: 27 additions & 23 deletions zilliqaetl/mappers/transition_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from zilliqaetl.utils.zilliqa_utils import to_int, encode_bech32_address
import json

from pyzil.crypto import zilkey

# The above tag based code would only work with the below XSGD and XIDR address only
from zilliqaetl.utils.zilliqa_utils import to_int, json_dumps, iso_datetime_string, encode_bech32_address, \
encode_bech32_address

SUPPORTED_TOKENS = ["zil1zu72vac254htqpg3mtywdcfm84l3dfd9qzww8t", "zil180v66mlw007ltdv8tq5t240y7upwgf7djklmwh",
"zil1cuf78e3p37utekgk0gtcvd3hvkrqcgt06lrnty"]

Expand Down Expand Up @@ -54,21 +55,24 @@ def map_transitions(tx_block, txn):
msg = transition.get('msg')
yield {
'type': 'transition',
'token_address': '0x0000',
'block_number': tx_block.get('number'),
'block_timestamp': tx_block.get('timestamp'),
'transaction_hash': '0x' + txn.get('ID'),
'log_index': index,
'transaction_id': txn.get('ID'),
# 'index': index,
# 'accepted': receipt.get('accepted'),
'addr': encode_bech32_address(transition.get('addr')),
'depth': transition.get('depth'),
'zill_amount': to_int(msg.get('_amount')),
# 'depth': transition.get('depth'),
'amount': msg.get('_amount'),
'recipient': encode_bech32_address(msg.get('_recipient')),
'call_type': msg.get('_tag'),
"receipt_status": int(receipt.get("success")),
"parameters": params_to_json(msg.get('params'))
# 'tag': msg.get('_tag'),
# 'params': [json_dumps(param) for param in msg.get('params')],
}


def map_token_traces(tx_block, txn, txn_type):
def map_token_traces(tx_block, txn):
# TODO: Cleanup logic for adding 0x for token_transfers, only for this we have to add rest we are not adding
txn_type = 'token_transfer'
receipt = txn.get('receipt')
if receipt and receipt.get('transitions'):
for index, transition in enumerate(receipt.get('transitions')):
Expand All @@ -80,15 +84,15 @@ def map_token_traces(tx_block, txn, txn_type):
'block_number': tx_block.get('number'),
'block_timestamp': tx_block.get('timestamp'),
'transaction_hash': '0x' + txn.get('ID'),
'log_index': index,
"receipt_status": int(receipt.get("success")),
# 'log_index': index,
# "receipt_status": int(receipt.get("success")),
}
if txn_type == 'trace' and msg.get('_amount', "0") != "0":
data["value"] = msg.get('_amount')
data["from_address"] = encoded_addr
data["to_address"] = encoded_recipient
yield data
elif txn_type == 'token_transfer' and (msg.get('_amount', "0") == "0") and (
# if txn_type == 'trace' and msg.get('_amount', "0") != "0":
# data["value"] = msg.get('_amount')
# data["from_address"] = encoded_addr
# data["to_address"] = encoded_recipient
# yield data
if (msg.get('_amount', "0") == "0") and (
(encoded_addr in SUPPORTED_TOKENS) or (encoded_recipient in SUPPORTED_TOKENS)):
tag = msg.get('_tag')
params = params_to_json(msg.get('params'), json_string=False)
Expand All @@ -98,28 +102,28 @@ def map_token_traces(tx_block, txn, txn_type):
data["to_address"] = params["to"]
data["value"] = params["amount"]
data["token_address"] = encoded_recipient
data["call_type"] = tag
# data["call_type"] = tag
yield data
elif (tag == "RecipientAcceptTransfer") and ({"sender", "recipient", "amount"} <= param_keys):
data["from_address"] = params["sender"]
data["to_address"] = params["recipient"]
data["value"] = params["amount"]
data["token_address"] = encoded_addr
data["call_type"] = tag
# data["call_type"] = tag
yield data
elif (tag == "Mint") and ("amount" in param_keys) and \
(("to" in param_keys) or ("recipient" in param_keys)):
data["from_address"] = "zil1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq9yf6pz"
data["to_address"] = params.get("to", params.get("recipient", None))
data["value"] = params["amount"]
data["token_address"] = encoded_addr if (encoded_addr in SUPPORTED_TOKENS) else encoded_recipient
data["call_type"] = tag
# data["call_type"] = tag
yield data
elif (tag == "Burn") and ("amount" in param_keys) and \
(("to" in param_keys) or ("recipient" in param_keys)):
data["from_address"] = params.get("initiator", encoded_addr)
data["to_address"] = "zil1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq9yf6pz"
data["value"] = params["amount"]
data["token_address"] = encoded_addr if (encoded_addr in SUPPORTED_TOKENS) else encoded_recipient
data["call_type"] = tag
# data["call_type"] = tag
yield data
29 changes: 15 additions & 14 deletions zilliqaetl/mappers/tx_block_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,23 @@ def map_tx_block(raw_block):
body = raw_block.get('body')
block = {
'type': 'tx_block',
'token_address': '0x0000',
'number': to_int(header.get('BlockNum')),
'ds_block_number': to_int(header.get('DSBlockNum')),
# 'ds_block_number': to_int(header.get('DSBlockNum')),
'timestamp': iso_datetime_string(header.get('Timestamp')),
'version': header.get('Version'),
'gas_limit': to_int(header.get('GasLimit')),
'gas_used': to_int(header.get('GasUsed')),
'mb_info_hash': header.get('MbInfoHash'),
'tx_leader_pub_key': header.get('MinerPubKey'),
# 'version': header.get('Version'),
# 'gas_limit': to_int(header.get('GasLimit')),
# 'gas_used': to_int(header.get('GasUsed')),
# 'mb_info_hash': header.get('MbInfoHash'),
# 'tx_leader_pub_key': header.get('MinerPubKey'),
'tx_leader_address': encode_bech32_pub_key(header.get('MinerPubKey')),
'num_micro_blocks': to_int(header.get('NumMicroBlocks')),
'num_transactions': to_int(header.get('NumTxns')),
'prev_block_hash': header.get('PrevBlockHash'),
'rewards': to_int(header.get('Rewards')),
'state_delta_hash': header.get('StateDeltaHash'),
'state_root_hash': header.get('StateRootHash'),
'header_signature': body.get('HeaderSign')
# 'num_micro_blocks': to_int(header.get('NumMicroBlocks')),
# 'num_transactions': to_int(header.get('NumTxns')),
# 'prev_block_hash': header.get('PrevBlockHash'),
'rewards': header.get('Rewards'),
# 'state_delta_hash': header.get('StateDeltaHash'),
# 'state_root_hash': header.get('StateRootHash'),
# 'header_signature': body.get('HeaderSign')
}

return block
return block
7 changes: 7 additions & 0 deletions zilliqaetl/streaming/zil_stream_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ def export_all(self, start_block, end_block):
export_transitions=False,
item_exporter=self.item_exporter,
)
job = ExportTxBlocksJob(
start_block=start_block,
end_block=end_block,
zilliqa_api=self.api,
max_workers=self.max_workers,
item_exporter=self.item_exporter,
)
job.run()

def close(self):
Expand Down
3 changes: 2 additions & 1 deletion zilliqaetl/utils/zilliqa_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def iso_datetime_string(timestamp):
if isinstance(timestamp, str):
timestamp = int(timestamp)

return datetime.utcfromtimestamp(timestamp / 1000000).strftime('%Y-%m-%d %H:%M:%S')
return int(timestamp / 1000000)
# return datetime.utcfromtimestamp(timestamp / 1000000).strftime('%Y-%m-%d %H:%M:%S')


def encode_bech32_pub_key(pub_key):
Expand Down