diff --git a/.github/workflows/unitary_tests.yml b/.github/workflows/unitary_tests.yml
index 4415605..446f42e 100644
--- a/.github/workflows/unitary_tests.yml
+++ b/.github/workflows/unitary_tests.yml
@@ -33,6 +33,6 @@ jobs:
       run: |
         python -m unittest tests/test_hashes.py
         python -m unittest tests/test_routing.py
-        python -m unittest tests/test_dht.py
+        python -m unittest tests/test_network.py
diff --git a/dht/bitarray_tests.py b/dht/bitarray_tests.py
new file mode 100644
index 0000000..b9fc145
--- /dev/null
+++ b/dht/bitarray_tests.py
@@ -0,0 +1,23 @@
+import bitarray
+import ctypes
+from bitarray import util
+import random
+from collections import deque, defaultdict
+
+rint = random.randint(0, 99)
+inthash = hash(hex(rint))
+
+barray = util.int2ba(ctypes.c_ulong(inthash).value, length=64)
+print(barray)
+print(len(barray))
+print(barray.to01())
+
+defD = {'b': 1, 'g': 2, 'r': 3, 'y': 4}
+d = defaultdict()
+for i, v in defD.items():
+    d[i] = v
+
+print(d.items())
+q = deque(d.items())
+print(list(q)[2:])  # deques don't support slice indexing, so convert to a list first
+
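The scratch file above exercises the same conversion that dht/hashes.py now relies on: Python's built-in hash() can return a negative integer, ctypes.c_ulong reinterprets it as unsigned, and bitarray.util.int2ba requires a non-negative value. A minimal sketch of that pipeline (the helper name is illustrative, and a 64-bit platform is assumed):

```python
import ctypes
from bitarray.util import int2ba

def to_unsigned_bits(value, width=64):
    # hash() may be negative; c_ulong reinterprets it as an unsigned int,
    # which int2ba requires
    unsigned = ctypes.c_ulong(hash(value)).value
    return int2ba(unsigned, length=width)

bits = to_unsigned_bits(hex(42))
assert len(bits) == 64  # fixed-width representation, upper bits zero-padded
```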
diff --git a/dht/dht.py b/dht/dht.py
index bebbeda..efcf9a2 100644
--- a/dht/dht.py
+++ b/dht/dht.py
@@ -1,5 +1,6 @@
 import random
 import time
+from collections import deque, defaultdict, OrderedDict
 from dht.key_store import KeyValueStore
 from dht.routing_table import RoutingTable
 from dht.hashes import Hash
@@ -12,28 +13,25 @@ class DHTClient:

     """ This class represents the client that participates and interacts with the simulated DHT"""

     def __repr__(self) -> str:
-        return "DHT-cli-"+self.ID
+        return "DHT-cli-"+str(self.ID)

-    def __init__(self, nodeid, network, kbucketSize:int = 20, a: int = 1, b: int = 20, stuckMaxCnt: int = 3):
+    def __init__(self, nodeid: int, network, kbucketsize: int = 20, a: int = 1, b: int = 20, steptostop: int = 3):
        """ client builder -> init all the internals & compose the routing table"""
-        # TODO: on the options given for the DHTClient, we could consider:
-        #   - latency distribution (how much time to wait before giving back any reply)
-        #   - Kbucket size
        self.ID = nodeid
        self.network = network
-        self.k = kbucketSize
-        self.rt = RoutingTable(self.ID, kbucketSize)
+        self.k = kbucketsize
+        self.rt = RoutingTable(self.ID, kbucketsize)
        self.ks = KeyValueStore()
        # DHT parameters
-        self.alpha = a # the concurrency parameter per path
-        self.beta = b # the number of peers closest to a target that must have responded for a query path to terminate
-        self.lookupStuckMaxCnt = stuckMaxCnt # Number of maximum hops the client will do without reaching a closest peer
+        self.alpha = a  # the concurrency parameter per path
+        self.beta = b  # the number of peers closest to a target that must have responded for a query path to terminate
+        self.lookupsteptostop = steptostop  # maximum number of hops the client will make without finding a closer peer
                                             # before finalizing the lookup process

    def bootstrap(self) -> str:
        """ Initialize the RoutingTable from the given network and return the count of nodes per kbucket"""
-        rtNodes = self.network.bootstrap_node(self.ID, self.k)
-        for node in rtNodes:
+        rtnodes = self.network.bootstrap_node(self.ID, self.k)
+        for node in rtnodes:
            self.rt.new_discovered_peer(node)
        # Return the summary of the RoutingTable
        return self.rt.summary()

@@ -41,7 +39,7 @@ def bootstrap(self) -> str:
    def lookup_for_hash(self, key: Hash):
        """ search for the closest peers to any given key, starting the lookup for the closest nodes in the local routing table, and contacting Alpha nodes in parallel """
-        lookupSummary = {
+        lookupsummary = {
            'targetKey': key,
            'startTime': time.time(),
            'connectionAttempts': 0,
@@ -50,11 +48,11 @@ def lookup_for_hash(self, key: Hash):
            'aggrDelay': 0,
        }

-        closestNodes = self.rt.get_closest_nodes_to(key)
-        nodesToTry = closestNodes.copy()
-        newNodes = {}
-        lookupValue = ""
-        stuckCnt = 0
+        closestnodes = self.rt.get_closest_nodes_to(key)
+        nodestotry = closestnodes.copy()
+        newnodes = defaultdict()
+        lookupvalue = ""
+        stepscnt = 0
        concurrency = 0

        def has_closer_nodes(prev, new):
            for node, dist in new.items():
@@ -68,36 +66,35 @@ def has_closer_nodes(prev, new):
            return False

        def not_tracked(total, newones):
-            newNodes = {}
+            newnodes = defaultdict()
            for node, dist in newones.items():
                if node not in total:
-                    newNodes[node] = dist
-            return newNodes
+                    newnodes[node] = dist
+            return newnodes

-        while (stuckCnt < self.lookupStuckMaxCnt) and (len(nodesToTry) > 0) :
+        while (stepscnt < self.lookupsteptostop) and (len(nodestotry) > 0):
            # ask queued nodes to try
-            for node in list(nodesToTry):
-                lookupSummary['connectionAttempts'] += 1
+            for node in list(nodestotry):
+                lookupsummary['connectionAttempts'] += 1
                try:
                    connection, conndelay = self.network.connect_to_node(self.ID, node)
-                    lookupSummary['aggrDelay'] += conndelay
-                    # TODO: should I reuse the same delay from stablishing the connection for the rest of the operations?
-                    newNodes, val, ok, closestDelay = connection.get_closest_nodes_to(key)
+                    lookupsummary['aggrDelay'] += conndelay
+                    newnodes, val, ok, closestdelay = connection.get_closest_nodes_to(key)
                    if ok:
-                        lookupValue = val
-                    lookupSummary['aggrDelay'] += closestDelay
-                    lookupSummary['successfulCons'] += 1
-                    if has_closer_nodes(closestNodes, newNodes):
-                        stuckCnt = 0
-                        nonTrackedNodes = not_tracked(closestNodes, newNodes)
-                        closestNodes.update(nonTrackedNodes)
-                        nodesToTry.update(nonTrackedNodes)
-                        nodesToTry = dict(sorted(nodesToTry.items(), key= lambda item: item[1]))
+                        lookupvalue = val
+                    lookupsummary['aggrDelay'] += closestdelay
+                    lookupsummary['successfulCons'] += 1
+                    if has_closer_nodes(closestnodes, newnodes):
+                        stepscnt = 0
+                        nontrackednodes = not_tracked(closestnodes, newnodes)
+                        closestnodes.update(nontrackednodes)
+                        nodestotry.update(nontrackednodes)
+                        nodestotry = OrderedDict(sorted(nodestotry.items(), key=lambda item: item[1]))
                    else:
-                        stuckCnt += 1
+                        stepscnt += 1
                except ConnectionError:
-                    lookupSummary['failedCons'] += 1
-                    stuckCnt += 1
+                    lookupsummary['failedCons'] += 1
+                    stepscnt += 1
                concurrency += 1
                if concurrency >= self.alpha:
                    break
@@ -106,50 +103,50 @@ def not_tracked(total, newones):
            pass

        # finish with the summary
-        lookupSummary.update({
+        lookupsummary.update({
            'finishTime': time.time(),
-            'totalNodes': len(closestNodes),
-            'value': lookupValue,
+            'totalNodes': len(closestnodes),
+            'value': lookupvalue,
        })
        # limit the output to beta number of nodes
-        closestNodes = dict(sorted(closestNodes.items(), key=lambda item: item[1])[:self.beta])
-        # the aggregated delay of the operation is included with the summary `lookupSummary['aggrDelay']`
-        return closestNodes, lookupValue, lookupSummary, lookupSummary['aggrDelay']
+        closestnodes = OrderedDict(sorted(closestnodes.items(), key=lambda item: item[1])[:self.beta])
+        # the aggregated delay of the operation is included with the summary `lookupsummary['aggrDelay']`
+        return closestnodes, lookupvalue, lookupsummary, lookupsummary['aggrDelay']
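The lookup above keeps its candidate set as a mapping from node ID to XOR distance and re-sorts it into an OrderedDict on every round, so the head of nodestotry is always the closest known peer. A toy illustration of that ordering step, with made-up IDs and distances:

```python
from collections import OrderedDict

# node id -> XOR distance to the target (illustrative values)
candidates = {7: 0b1010, 3: 0b0011, 9: 0b0100}
ordered = OrderedDict(sorted(candidates.items(), key=lambda item: item[1]))
print(list(ordered))  # [3, 9, 7] -> the closest node comes first
```

Plain dicts already preserve insertion order on Python 3.7+, so the switch to OrderedDict mostly signals intent rather than changing behavior.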
     def get_closest_nodes_to(self, key: Hash):
        """ return the closest nodes to a given key from the local routing table (local perception of the network) """
        # check if we actually have the value in the KeyValueStore, and return the content
-        closerNodes = self.rt.get_closest_nodes_to(key)
+        closernodes = self.rt.get_closest_nodes_to(key)
        val, ok = self.ks.read(key)
-        return closerNodes, val, ok
+        return closernodes, val, ok

    def provide_block_segment(self, segment) -> dict:
        """ looks for the closest nodes in the network, and sends them a segment to store """
-        provideSummary = {
-            'succesNodeIDs': [],
-            'failedNodeIDs': [],
+        providesummary = {
+            'succesNodeIDs': deque(),
+            'failedNodeIDs': deque(),
            'startTime': time.time(),
            'aggrDelay': 0,
        }

        segH = Hash(segment)
-        closestNodes, _, lookupSummary, lookupDelay = self.lookup_for_hash(segH)
-        provideSummary['aggrDelay'] += lookupDelay
-        for cn in closestNodes:
+        closestnodes, _, lookupsummary, lookupdelay = self.lookup_for_hash(segH)
+        providesummary['aggrDelay'] += lookupdelay
+        for cn in closestnodes:
            try:
-                connection, conneDelay = self.network.connect_to_node(self.ID, cn)
-                provideSummary['aggrDelay'] += conneDelay
-                storeDelay = connection.store_segment(segment)
-                provideSummary['aggrDelay'] += storeDelay
-                provideSummary['succesNodeIDs'].append(cn)
+                connection, conndelay = self.network.connect_to_node(self.ID, cn)
+                providesummary['aggrDelay'] += conndelay
+                storedelay = connection.store_segment(segment)
+                providesummary['aggrDelay'] += storedelay
+                providesummary['succesNodeIDs'].append(cn)
            except ConnectionError:
-                provideSummary['failedNodeIDs'].append(cn)
+                providesummary['failedNodeIDs'].append(cn)

-        provideSummary.update({
-            'closestNodes': closestNodes.keys(),
+        providesummary.update({
+            'closestNodes': closestnodes.keys(),
            'finishTime': time.time(),
-            'contactedPeers': lookupSummary['connectionAttempts'],
+            'contactedPeers': lookupsummary['connectionAttempts'],
        })
-        return provideSummary, provideSummary['aggrDelay']
+        return providesummary, providesummary['aggrDelay']

    def store_segment(self, segment):
        segH = Hash(segment)
@@ -175,7 +172,7 @@ class NodeStore():

    def __init__(self):
        """ init the memory NodeStore """
-        self.nodes = {}
+        self.nodes = defaultdict()

    def add_node(self, node: DHTClient):
        """ add or override existing info about a node """
@@ -200,8 +197,8 @@ def len(self):

 class ConnectionError(Exception):
    """ custom connection error exception to notify an errored connection """
-    def __init__(self, nodeID: int, error, time):
-        self.erroredNode = nodeID
+    def __init__(self, nodeid: int, error, time):
+        self.erroredNode = nodeid
        self.error = error
        self.errorTime = time

@@ -211,18 +208,18 @@ def description(self) -> str:

 class Connection():
    """ connection symbolizes the interaction that 2 DHTClients could have with each other """
-    def __init__(self, connID: int, f: int, to: DHTClient, delayRange):
-        self.connID = connID
+    def __init__(self, connid: int, f: int, to: DHTClient, delayrange):
+        self.connid = connid
        self.f = f
        self.to = to
-        if delayRange is not None:
-            self.delay = random.sample(delayRange, 1)[0]
+        if delayrange is not None:
+            self.delay = random.sample(delayrange, 1)[0]
        else:
            self.delay = 0  # ms

    def get_closest_nodes_to(self, key: Hash):
-        closerNodes, val, ok = self.to.get_closest_nodes_to(key)
-        return closerNodes, val, ok, self.delay
+        closernodes, val, ok = self.to.get_closest_nodes_to(key)
+        return closernodes, val, ok, self.delay

    def store_segment(self, segment):
        self.to.store_segment(segment)
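Connection draws its simulated latency with random.sample(delayrange, 1)[0]; random.choice expresses the same single draw more directly. A quick sketch with the range the tests use:

```python
import random

delayrange = range(10, 101, 10)  # ms, as configured in test_aggregated_delays
delay = random.sample(delayrange, 1)[0]  # what the class does today
assert delay in delayrange
delay = random.choice(delayrange)        # equivalent single draw
assert delay in delayrange
```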
@@ -237,67 +234,67 @@ class DHTNetwork:

    """ serves as the shared point between all the nodes participating in the simulation, allows nodes to communicate with each other without needing to implement an API or similar"""

-    def __init__(self, networkID: int, errorRate: int, delayRage):
+    def __init__(self, networkid: int, errorrate: int, delayrange):
        """ class initializer, it allows defining the networkID and the delays between nodes """
-        self.networkID = networkID
-        self.errorRate = errorRate # %
-        self.delayRange = delayRage # list() in ms -> i.e., (5, 100) ms | None
-        self.nodeStore = NodeStore()
-        self.errorTracker = [] # every time that an error is tracked, add it to the queue
-        self.connectionTracker = [] # every time that a connection was stablished
-        self.connectionCnt = 0
+        self.networkid = networkid
+        self.errorrate = errorrate  # %
+        self.delayrange = delayrange  # list() in ms -> i.e., (5, 100) ms | None
+        self.nodestore = NodeStore()
+        self.errortracker = deque()  # every time that an error is tracked, add it to the queue
+        self.connectiontracker = deque()  # every time that a connection was established
+        self.connectioncnt = 0

-    def add_new_node(self, newNode: DHTClient):
+    def add_new_node(self, newnode: DHTClient):
        """ add a new node to the DHT network """
-        self.nodeStore.add_node(newNode)
+        self.nodestore.add_node(newnode)

-    def connect_to_node(self, originNode: int, targetNode: int):
+    def connect_to_node(self, ognode: int, targetnode: int):
        """ get connection to the DHTclient target from the PeerStore and an associated delay or raise an error """
-        self.connectionCnt += 1
+        self.connectioncnt += 1
        try:
            # check the error rate (avoid establishing the connection if there is an error)
-            if random.randint(0, 99) < self.errorRate:
-                connerror = ConnectionError(targetNode, "simulated error", time.time())
-                self.errorTracker.append(connerror)
+            if random.randint(0, 99) < self.errorrate:
+                connerror = ConnectionError(targetnode, "simulated error", time.time())
+                self.errortracker.append(connerror)
                raise connerror
-            node = self.nodeStore.get_node(targetNode)
-            connection = Connection(self.connectionCnt, originNode, node, self.delayRange)
-            self.connectionTracker.append({
+            node = self.nodestore.get_node(targetnode)
+            connection = Connection(self.connectioncnt, ognode, node, self.delayrange)
+            self.connectiontracker.append({
                'time': time.time(),
-                'from': originNode,
-                'to': targetNode,
+                'from': ognode,
+                'to': targetnode,
                'delay': connection.delay})
            return connection, connection.delay
        except NodeNotInStoreError as e:
            connerror = ConnectionError(e.missingNode, e.description, e.time)
-            self.errorTracker.append({
+            self.errortracker.append({
                'time': connerror.errorTime,
                'error': connerror.description})
            raise connerror
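The gate random.randint(0, 99) < self.errorrate draws uniformly from 100 values, so an errorrate of n fails roughly n% of connection attempts. A quick empirical check of that reading:

```python
import random

errorrate = 50  # %
attempts = 10_000
failures = sum(random.randint(0, 99) < errorrate for _ in range(attempts))
print(failures / attempts)  # ~0.50, i.e. about errorrate percent of attempts fail
```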
-    def bootstrap_node(self, nodeid: int, bucketsize: int): # ( accuracy: int = 100 )
+    def bootstrap_node(self, nodeid: int, bucketsize: int):  # ( accuracy: int = 100 )
        """ checks among all the existing nodes in the network, which are the correct ones to fill up the routing table of the given node """
        # best way to know which nodes are the best nodes for a routing table, is to compose a rt itself
        # Accuracy = How many closest peers / K closest peers do we know (https://github.com/plprobelab/network-measurements/blob/master/results/rfm19-dht-routing-table-health.md)
        # TODO: generate a logic that selects the routing table with the given accuracy
        rt = RoutingTable(nodeid, bucketsize)
-        for node in self.nodeStore.get_nodes():
+        for node in self.nodestore.get_nodes():
            rt.new_discovered_peer(node)
        return rt.get_routing_nodes()

    def summary(self):
        """ return the summary of what happened in the network """
        return {
-            'total_nodes': self.nodeStore.len(),
-            'attempts': self.connectionCnt,
-            'successful': len(self.connectionTracker),
-            'failures': len(self.errorTracker)}
+            'total_nodes': self.nodestore.len(),
+            'attempts': self.connectioncnt,
+            'successful': len(self.connectiontracker),
+            'failures': len(self.errortracker)}

    def len(self) -> int:
-        return self.nodeStore.len()
+        return self.nodestore.len()
diff --git a/dht/hashes.py b/dht/hashes.py
index 3e1bf4c..806b777 100644
--- a/dht/hashes.py
+++ b/dht/hashes.py
@@ -1,14 +1,16 @@
 import ctypes
+from bitarray.util import int2ba

 # TODO: swap hash to SHA256 with the possibility of reusing a given seed for reproducibility
 # at the moment, I'm using the default 64bit hash function from Python
 HASH_BASE = 64

-class Hash():
+
+class Hash:
    def __init__(self, value):
        """ basic representation of a Hash object for the DHT, which includes the main utilities related to a hash """
        self.value = self.hash_key(value)
-        self.bitArray = BitArray(self.value, HASH_BASE)
+        self.bitarray = BitArray(self.value, HASH_BASE)

    # TODO: the hash values could be reproduced if the ENVIRONMENT VARIABLE PYTHONHASHSEED is set to a 64 bit integer https://docs.python.org/3/using/cmdline.html#envvar-PYTHONHASHSEED
    def hash_key(self, key):
@@ -20,62 +22,41 @@ def hash_key(self, key):
        # ensure that the hash is unsigned
        return ctypes.c_ulong(h).value

-    def xor_to(self, targetHash:int) -> int:
+    def xor_to(self, targetint: int) -> int:
        """ Returns the XOR distance between both hash values"""
-        distance = self.value ^ targetHash
-        return ctypes.c_ulong(distance).value
+        return ctypes.c_ulong(self.value ^ targetint).value

-    def xor_to_hash(self, targetHash) -> int:
+    def xor_to_hash(self, targethash) -> int:
        """ Returns the XOR distance between both hash values"""
-        distance = self.value ^ targetHash.value
-        return ctypes.c_ulong(distance).value
+        return ctypes.c_ulong(self.value ^ targethash.value).value

-    def shared_upper_bits(self, targetHash) -> int:
+    def shared_upper_bits(self, targethash) -> int:
        """ returns the number of upper sharing bits between 2 hash values """
-        targetBits = BitArray(targetHash.value, HASH_BASE)
-        sBits = self.bitArray.upper_sharing_bits(targetBits)
-        return sBits
+        return self.bitarray.upper_sharing_bits(targethash.bitarray)

    def __repr__(self) -> str:
        return str(hex(self.value))

-    def __eq__(self, targetHash) -> bool:
-        return self.value == targetHash.value
-
-    def is_smaller_than(self, targetHash) -> bool:
-        return self.value < targetHash.value
+    def __eq__(self, targethash) -> bool:
+        return self.value == targethash.value

-    def is_greater_than(self, targetHash) -> bool:
-        return self.value > targetHash.value

-class BitArray():
+class BitArray:
    """ array representation of an integer using only bits, ideal for finding matching upper bits"""
-    def __init__(self, intValue:int, base:int):
+    def __init__(self, uintval:int, base:int):
        self.base = base
-        self.bitArray = self.bin(intValue)
+        self.bitarray = int2ba(uintval, length=base)

    def __repr__(self):
-        return str(self.bitArray)
-
-    def upper_sharing_bits(self, targetBitArray) -> int:
-        sBits = 0
-        for i, bit in enumerate(self.bitArray):
-            if bit == targetBitArray.get_x_bit(i):
-                sBits += 1
+        return self.bitarray.to01()
+
+    def upper_sharing_bits(self, targetba) -> int:
+        sbits = 0
+        proc = self.bitarray ^ targetba.bitarray
+        for bit in proc:
+            if bit == 0:
+                sbits += 1
            else:
                break
-        return sBits
-
-    def get_x_bit(self, idx:int = 0):
-        return self.bitArray[idx]
+        return sbits

-    def bin(self, n):
-        s = ""
-        i = 1 << self.base-1
-        while(i > 0) :
-            if((n & i) != 0) :
-                s += "1"
-            else :
-                s += "0"
-            i = i // 2
-        return s
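The rewritten upper_sharing_bits leans on a simple identity: the number of upper bits two hashes share equals the number of leading zeros in their XOR. A self-contained check on 8-bit values (toy width, same logic as the 64-bit HASH_BASE):

```python
from bitarray.util import int2ba

a = int2ba(0b11010000, length=8)
b = int2ba(0b11001111, length=8)
xored = a ^ b  # 00011111 -> the values differ from the 4th bit onwards
shared = next((i for i, bit in enumerate(xored) if bit), len(xored))
print(shared)  # 3 -> both values agree on their 3 upper bits
```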
diff --git a/dht/key_store.py b/dht/key_store.py
index a6f5692..c9ff7f0 100644
--- a/dht/key_store.py
+++ b/dht/key_store.py
@@ -1,20 +1,22 @@
 from dht.hashes import Hash
+from collections import defaultdict

-class KeyValueStore():
+
+class KeyValueStore:
    """ Memory-storage unit that will keep track of each of the key-values that a DHT client has to keep locally"""
    def __init__(self):
        """ compose the storage unit in memory """
-        self.storage = {}
+        self.storage = defaultdict()

-    def add(self, key:Hash, value):
+    def add(self, key: Hash, value):
        """ aggregates a new value to the store, or overrides it if there was already a value for the key """
        self.storage[key.value] = value

-    def remove(self, key:Hash):
+    def remove(self, key: Hash):
        self.storage.pop(key.value)

-    def read(self, key:Hash):
+    def read(self, key: Hash):
        """ reads a value for the given Key, or return false if it wasn't found """
        try:
            value = self.storage[key.value]
@@ -24,6 +26,5 @@ def read(self, key:Hash):
            ok = False
        return value, ok

-    def summary(self) -> int:
-        """ returns the number of items stored in the local KeyValueStore """
-        return len(self.storage)
+    def __len__(self):
+        return len(self.storage)
diff --git a/dht/routing_table.py b/dht/routing_table.py
index 64e4840..0f2b187 100644
--- a/dht/routing_table.py
+++ b/dht/routing_table.py
@@ -1,49 +1,50 @@
 from ctypes import sizeof
-from dht.hashes import Hash
+from dht.hashes import Hash
+from collections import deque, defaultdict, OrderedDict

-class RoutingTable():
-    def __init__(self, localNodeID:int, bucketSize:int) -> None:
-        self.localNodeID = localNodeID
-        self.bucketSize = bucketSize
-        self.kbuckets = []
-        self.lastUpdated = 0
+class RoutingTable:
+    def __init__(self, localnodeid:int, bucketsize:int) -> None:
+        self.localnodeid = localnodeid
+        self.bucketsize = bucketsize
+        self.kbuckets = deque()
+        self.lastupdated = 0  # not really used at this time

-    def new_discovered_peer(self, nodeID:int):
+    def new_discovered_peer(self, nodeid:int):
        """ notify the routing table of a new discovered node in the network and check if it has a place in a given bucket """
        # check matching bits
-        localNodeH = Hash(self.localNodeID)
-        nodeH = Hash(nodeID)
-        sBits = localNodeH.shared_upper_bits(nodeH)
+        localnodehash = Hash(self.localnodeid)
+        nodehash = Hash(nodeid)
+        sbits = localnodehash.shared_upper_bits(nodehash)
        # Check if there is a kbucket already at that place
-        while len(self.kbuckets) < sBits+1:
+        while len(self.kbuckets) < sbits+1:
            # Fill middle kbuckets if needed
-            self.kbuckets.append(KBucket(self.localNodeID, self.bucketSize))
+            self.kbuckets.append(KBucket(self.localnodeid, self.bucketsize))
        # check/update the bucket with the newest nodeID
-        self.kbuckets[sBits] = self.kbuckets[sBits].add_peer_to_bucket(nodeID)
+        self.kbuckets[sbits] = self.kbuckets[sbits].add_peer_to_bucket(nodeid)
        return self

-    def get_closest_nodes_to(self, key:Hash):
+    def get_closest_nodes_to(self, key: Hash):
        """ return the list of Nodes (in order) close to the given key in the routing table """
-        closestNodes = {}
+        closestnodes = defaultdict()
        # check the distances for all the nodes in the rt
        for b in self.kbuckets:
-            for n in b.bucketNodes:
+            for n in b.bucketnodes:
                nH = Hash(n)
                dist = nH.xor_to_hash(key)
-                closestNodes[n] = dist
+                closestnodes[n] = dist
        # sort the dict based on dist
-        closestNodes = dict(sorted(closestNodes.items(), key=lambda item: item[1])[:self.bucketSize])
-        return closestNodes
+        closestnodes = OrderedDict(sorted(closestnodes.items(), key=lambda item: item[1])[:self.bucketsize])
+        return closestnodes
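new_discovered_peer places a peer that shares sbits upper bits with the local ID into kbuckets[sbits], growing the deque on demand so intermediate buckets always exist. The same placement logic in isolation (a sketch with plain lists standing in for KBucket):

```python
kbuckets = []

def bucket_for(sbits):
    # grow the table until the target bucket exists, as new_discovered_peer does
    while len(kbuckets) < sbits + 1:
        kbuckets.append([])
    return kbuckets[sbits]

bucket_for(3).append(42)
print(len(kbuckets))  # 4 -> buckets 0..3 were created on demand
```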
    def get_routing_nodes(self):
        # get the closest nodes to the peer
-        rtNodes = []
+        rtnodes = deque()
        for b in self.kbuckets:
-            for n in b.bucketNodes:
-                rtNodes.append(n)
-        return rtNodes
+            for n in b.bucketnodes:
+                rtnodes.append(n)
+        return rtnodes

    def __repr__(self) -> str:
        s = ""
@@ -52,68 +53,64 @@ def __repr__(self) -> str:
        return s

    def summary(self) -> str:
-        return self.__repr__()
+        return self.__repr__()

-class KBucket():
+
+class KBucket:
    """ single representation of a kademlia kbucket, which contains the closest nodes sharing X number of upper bits on their NodeID's Hashes """

-    def __init__(self, ourNodeID:int, size:int):
+    def __init__(self, localnodeid: int, size: int):
        """ initialize the kbucket with setting a max size along some other control variables """
-        self.localNodeID = ourNodeID
-        self.bucketNodes = []
-        self.bucketSize = size
-        self.lastUpdated = 0
+        self.localnodeid = localnodeid
+        self.bucketnodes = deque(maxlen=size)
+        self.bucketsize = size
+        self.lastupdated = 0

-    def add_peer_to_bucket(self, nodeID:int):
+    def add_peer_to_bucket(self, nodeid: int):
        """ check if the new node is eligible to replace a further one """
        # Check the distance between our NodeID and the remote one
-        localNodeH = Hash(self.localNodeID)
-        nodeH = Hash(nodeID)
-        dist = localNodeH.xor_to_hash(nodeH)
-
-        bucketDistances = self.get_distances_to_key(localNodeH)
-        if (self.len() > 0) and (self.len() >= self.bucketSize):
-            if bucketDistances[list(bucketDistances)[-1]] < dist:
-                pass
+        localnodehash = Hash(self.localnodeid)
+        nodehash = Hash(nodeid)
+        dist = localnodehash.xor_to_hash(nodehash)
+        bucketdistances = self.get_distances_to_key(localnodehash)
+        if (self.len() > 0) and (self.len() >= self.bucketsize):
+            if bucketdistances[deque(bucketdistances)[-1]] < dist:
+                pass
            else:
                # As the dist of the new node is smaller, add it to the list
-                bucketDistances[nodeID] = dist
+                bucketdistances[nodeid] = dist
                # Sort back the nodes with the new one and remove the last remaining item
-                bucketDistances = dict(sorted(bucketDistances.items(), key=lambda item: item[1]))
-                bucketDistances.pop(list(bucketDistances)[-1])
+                bucketdistances = OrderedDict(sorted(bucketdistances.items(), key=lambda item: item[1]))
+                bucketdistances.pop(deque(bucketdistances)[-1])
                # Update the new closest nodes in the bucket
-                self.bucketNodes = list(bucketDistances.keys())
+                self.bucketnodes = deque(bucketdistances.keys(), maxlen=len(bucketdistances))
        else:
-            self.bucketNodes.append(nodeID)
+            self.bucketnodes.append(nodeid)
        return self

-    def get_distances_to_key(self, key:Hash):
+    def get_distances_to_key(self, key: Hash):
        """ return the distances from all the nodes in the bucket to a given key """
-        distances = {}
-        for nodeID in self.bucketNodes:
-            nodeH = Hash(nodeID)
-            dist = nodeH.xor_to_hash(key)
-            distances[nodeID] = dist
-        return dict(sorted(distances.items(), key=lambda item: item[1]))
+        distances = defaultdict()
+        for nodeid in self.bucketnodes:
+            nodehash = Hash(nodeid)
+            dist = nodehash.xor_to_hash(key)
+            distances[nodeid] = dist
+        return OrderedDict(sorted(distances.items(), key=lambda item: item[1]))
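A caveat on the new deque(maxlen=size) storage: a full bounded deque silently evicts from the opposite end on append, in arrival order rather than by XOR distance, which is why add_peer_to_bucket still sorts and trims explicitly instead of relying on maxlen. A short demonstration:

```python
from collections import deque

bucket = deque(maxlen=2)
for nodeid in (1, 2, 3):
    bucket.append(nodeid)
print(bucket)  # deque([2, 3], maxlen=2) -> 1 was dropped by arrival order, not distance
```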
-    def get_x_nodes_close_to(self, key:Hash, numberOfNodes:int):
+    def get_x_nodes_close_to(self, key: Hash, nnodes: int):
        """ return the XX number of nodes close to a key from this bucket """
-        print(f"checking in bucket {numberOfNodes} nodes")
-        distances = {}
-        if numberOfNodes <= 0:
-            return distances
-        distances = self.get_distances_to_key(key)
+        distances = self.get_distances_to_key(key)  # Get only the necessary and closest nodes to the key from the kbucket
-        nodes = {}
-        for node, dist in distances.items():
-            nodes[node] = dist
-            if len(nodes) <= numberOfNodes:
-                break
-        return nodes
+        for i in list(distances)[nnodes:]:  # rely on a std list, as the size is small and it can be sliced :)
+            distances.pop(i)
+        return distances

    def len(self) -> int:
-        return len(self.bucketNodes)
+        return len(self.bucketnodes)
+
+    def __len__(self) -> int:
+        return len(self.bucketnodes)

    def __repr__(self) -> str:
        return f"{self.len()} nodes"
diff --git a/pyproject.toml b/pyproject.toml
index b494b9d..ef1cc73 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -10,7 +10,7 @@ maintainers = [
     {name = "@cortze | Mikel Cortes ", email = "cortze@protonmail.com"},
 ]
 requires-python = ">=3.10"
-dependencies = [ "pandas", "jupyter" ]
+dependencies = [ "bitarray" ]

 dynamic = [
     "version",
diff --git a/requirements.txt b/requirements.txt
index bb91194..758713d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,102 +1 @@
-anyio==3.7.1
-argon2-cffi==21.3.0
-argon2-cffi-bindings==21.2.0
-arrow==1.2.3
-asttokens==2.2.1
-async-lru==2.0.3
-attrs==23.1.0
-Babel==2.12.1
-backcall==0.2.0
-beautifulsoup4==4.12.2
-bleach==6.0.0
-certifi==2023.7.22
-cffi==1.15.1
-charset-normalizer==3.2.0
-comm==0.1.3
-debugpy==1.6.7
-decorator==5.1.1
-defusedxml==0.7.1
-exceptiongroup==1.1.2
-executing==1.2.0
-fastjsonschema==2.18.0
-fqdn==1.5.1
-idna==3.4
-ipykernel==6.25.0
-ipython==8.14.0
-ipython-genutils==0.2.0
-ipywidgets==8.0.7
-isoduration==20.11.0
-jedi==0.18.2
-Jinja2==3.1.2
-json5==0.9.14
-jsonpointer==2.4
-jsonschema==4.18.4
-jsonschema-specifications==2023.7.1
-jupyter==1.0.0
-jupyter-console==6.6.3
-jupyter-events==0.6.3
-jupyter-lsp==2.2.0
-jupyter_client==8.3.0
-jupyter_core==5.3.1
-jupyter_server==2.7.0
-jupyter_server_terminals==0.4.4
-jupyterlab==4.0.3
-jupyterlab-pygments==0.2.2
-jupyterlab-widgets==3.0.8
-jupyterlab_server==2.24.0
-MarkupSafe==2.1.3
-matplotlib-inline==0.1.6
-mistune==3.0.1
-nbclient==0.8.0
-nbconvert==7.7.3
-nbformat==5.9.1
-nest-asyncio==1.5.6
-notebook==7.0.0
-notebook_shim==0.2.3
-numpy==1.25.1
-overrides==7.3.1
-packaging==23.1
-pandas==2.0.3
-pandocfilters==1.5.0
-parso==0.8.3
-pexpect==4.8.0
-pickleshare==0.7.5
-platformdirs==3.9.1
-prometheus-client==0.17.1
-prompt-toolkit==3.0.39
-psutil==5.9.5
-ptyprocess==0.7.0
-pure-eval==0.2.2
-pycparser==2.21
-Pygments==2.15.1
-python-dateutil==2.8.2
-python-json-logger==2.0.7
-pytz==2023.3
-PyYAML==6.0.1
-pyzmq==25.1.0
-qtconsole==5.4.3
-QtPy==2.3.1
-referencing==0.30.0
-requests==2.31.0
-rfc3339-validator==0.1.4
-rfc3986-validator==0.1.1
-rpds-py==0.9.2
-Send2Trash==1.8.2
-six==1.16.0
-sniffio==1.3.0
-soupsieve==2.4.1
-stack-data==0.6.2
-terminado==0.17.1
-tinycss2==1.2.1
-tomli==2.0.1
-tornado==6.3.2
-traitlets==5.9.0
-typing_extensions==4.7.1
-tzdata==2023.3
-uri-template==1.3.0
-urllib3==2.0.4
-wcwidth==0.2.6
-webcolors==1.13
-webencodings==0.5.1
-websocket-client==1.6.1
-widgetsnbextension==4.0.8
+bitarray==2.8.0
diff --git a/tests/__init__.py b/tests/__init__.py
index 39787eb..786a803 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -1,3 +1,3 @@
 from tests.test_hashes import *
 from tests.test_routing import *
-from tests.test_dht import *
+from tests.test_network import *
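The trimmed get_x_nodes_close_to above keeps the OrderedDict returned by get_distances_to_key and drops everything past the cutoff, slicing a snapshot of the keys first because mutating a dict while iterating over it raises a RuntimeError. The pattern in isolation:

```python
from collections import OrderedDict

distances = OrderedDict([(5, 1), (8, 4), (2, 9)])  # node id -> distance, presorted
nnodes = 2
for nodeid in list(distances)[nnodes:]:  # snapshot the keys before popping
    distances.pop(nodeid)
print(distances)  # OrderedDict([(5, 1), (8, 4)]) -> the nnodes closest remain
```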
diff --git a/tests/test_dht.py b/tests/test_network.py
similarity index 54%
rename from tests/test_dht.py
rename to tests/test_network.py
index 6d3032f..099714b
--- a/tests/test_dht.py
+++ b/tests/test_network.py
@@ -12,18 +12,18 @@ def test_network(self):
        k = 20
        size = 200
        id = 0
-        errorRate = 0 # apply an error rate of 0 (to check if the logic pases)
-        delayRange = None # ms
-        network, _ = generateNetwork(k, size, id, errorRate, delayRange)
+        errorrate = 0  # apply an error rate of 0 (to check if the logic passes)
+        delayrange = None  # ms
+        network, _ = generate_network(k, size, id, errorrate, delayrange)

        # check total size of the network
-        totalnodes = network.nodeStore.len()
+        totalnodes = network.nodestore.len()
        self.assertEqual(size, totalnodes)

        # check if we could have the correct rt for any specific nodeIDs
        for nodeID in range(k):
            # the test should actually fail if an Exception is raised
-            to = random.sample(range(size-1), 1)
+            to = random.sample(range(size), 1)
            _, _ = network.connect_to_node(nodeID, to[0])

        # force the failure of the connections attempting to connect a peer that doesn't exist
@@ -41,9 +41,9 @@ def test_network_initialization(self):
        """ test that the routing tables for each nodeID are correctly initialized """
        k = 2
        size = 20
-        errorRate = 0 # apply an error rate of 0 (to check if the logic pases)
-        delayRange = None # ms
-        network, nodes = generateNetwork(k, size, 0, errorRate, delayRange)
+        errorrate = 0  # apply an error rate of 0 (to check if the logic passes)
+        delayrange = None  # ms
+        network, nodes = generate_network(k, size, 0, errorrate, delayrange)

        for node in nodes:
            summary = node.bootstrap()
@@ -58,34 +58,34 @@ def test_dht_interop(self):
        k = 10
        size = 500
        id = 0
-        errorRate = 0 # apply an error rate of 0 (to check if the logic pases)
-        delayRange = None # ms
-        _, nodes = generateNetwork(k, size, id, errorRate, delayRange)
+        errorrate = 0  # apply an error rate of 0 (to check if the logic passes)
+        delayrange = None  # ms
+        _, nodes = generate_network(k, size, id, errorrate, delayrange)

        for node in nodes:
            node.bootstrap()

-        randomSegment = "this is a simple segment of code"
-        segH = Hash(randomSegment)
+        randomsegment = "this is a simple segment of code"
+        segH = Hash(randomsegment)
        # use random node as lookup point
-        randomNodeID = random.sample(range(1, size), 1)[0]
-        rNode = nodes[randomNodeID]
-        self.assertNotEqual(rNode.network.len(), 0)
+        randomid = random.sample(range(1, size), 1)[0]
+        rnode = nodes[randomid]
+        self.assertNotEqual(rnode.network.len(), 0)

-        closestNodes, val, summary, _ = rNode.lookup_for_hash(key=segH)
-        self.assertEqual(val, "") # empty val, nothing stored yet
-        self.assertEqual(len(closestNodes), k)
+        closestnodes, val, summary, _ = rnode.lookup_for_hash(key=segH)
+        self.assertEqual(val, "")  # empty val, nothing stored yet
+        self.assertEqual(len(closestnodes), k)
        # print(f"lookup operation with {size} nodes done in {summary['finishTime'] - summary['startTime']}")

-        # validation of the lookup closestNodes vs the actual closestNodes in the network
-        validationClosestNodes = {}
+        # validation of the lookup closestnodes vs the actual closestnodes in the network
+        validationclosestnodes = {}
        for node in nodes:
            nodeH = Hash(node.ID)
            dist = nodeH.xor_to_hash(segH)
-            validationClosestNodes[node.ID] = dist
+            validationclosestnodes[node.ID] = dist

-        validationClosestNodes = dict(sorted(validationClosestNodes.items(), key=lambda item: item[1])[:k])
-        for i, node in enumerate(closestNodes):
-            self.assertEqual((node in validationClosestNodes), True)
+        validationclosestnodes = dict(sorted(validationclosestnodes.items(), key=lambda item: item[1])[:k])
+        for i, node in enumerate(closestnodes):
+            self.assertEqual((node in validationclosestnodes), True)
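test_dht_interop validates the lookup against a brute-force ground truth: XOR every node's hash with the segment hash and keep the k smallest. The same check as a tiny standalone helper over plain unsigned hashes (illustrative values):

```python
def ground_truth_closest(node_hashes, target, k):
    # node_hashes: {node_id: unsigned hash}; target: unsigned hash of the key
    dists = {nid: h ^ target for nid, h in node_hashes.items()}
    return dict(sorted(dists.items(), key=lambda item: item[1])[:k])

print(ground_truth_closest({1: 0b100, 2: 0b110, 3: 0b001}, 0b101, 2))
# {1: 1, 2: 3} -> node 1 is the closest by XOR distance
```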

    def test_dht_error_rate_on_connection(self):
        """ test if the nodes in the network actually route to the closest peer, and implicitly, if the DHTclient interface works """
        k = 1
        size = 2
        id = 0
        errorrate = 50  # apply an error rate of 50 (to check that failures are tracked)
-        delayRange = None # ms
-        network, nodes = generateNetwork(k, size, id, errorrate, delayRange)
+        delayrange = None  # ms
+        network, nodes = generate_network(k, size, id, errorrate, delayrange)

        for node in nodes:
            node.bootstrap()

@@ -119,77 +119,77 @@ def test_dht_provide_and_lookup(self):
        k = 10
        size = 500
        id = 0
-        errorRate = 0 # apply an error rate of 0 (to check if the logic pases)
-        delayRange = None # ms
-        _, nodes = generateNetwork(k, size, id, errorRate, delayRange)
+        errorrate = 0  # apply an error rate of 0 (to check if the logic passes)
+        delayrange = None  # ms
+        _, nodes = generate_network(k, size, id, errorrate, delayrange)

        for node in nodes:
            node.bootstrap()

-        randomSegment = "this is a simple segment of code"
-        segH = Hash(randomSegment)
+        rsegment = "this is a simple segment of code"
+        segH = Hash(rsegment)
        # use random node as lookup point
-        publisherNodeID = random.sample(range(1, size), 1)[0]
-        pNode = nodes[publisherNodeID]
-        self.assertNotEqual(pNode.network.len(), 0)
+        pnodeid = random.sample(range(1, size), 1)[0]
+        pnode = nodes[pnodeid]
+        self.assertNotEqual(pnode.network.len(), 0)

-        provideSummary, _ = pNode.provide_block_segment(randomSegment)
-        self.assertEqual(len(provideSummary["closestNodes"]), k)
+        psummary, _ = pnode.provide_block_segment(rsegment)
+        self.assertEqual(len(psummary["closestNodes"]), k)
        # print(f"provide operation with {size} nodes done in {provideSummary['finishTime'] - provideSummary['startTime']}")

-        interestedNodeID = random.sample(range(1, size), 1)[0]
-        iNode = nodes[interestedNodeID]
-        closestNodes, val, summary, _ = iNode.lookup_for_hash(key=segH)
-        self.assertEqual(randomSegment, val)
+        interestednodeid = random.sample(range(1, size), 1)[0]
+        inode = nodes[interestednodeid]
+        closestnodes, val, summary, _ = inode.lookup_for_hash(key=segH)
+        self.assertEqual(rsegment, val)

    def test_aggregated_delays(self):
        """ test if the interaction between the nodes in the network actually generates a compounded delay """
        k = 10
        size = 500
        id = 0
-        errorRate = 0 # apply an error rate of 0 (to check if the logic pases)
+        errorrate = 0  # apply an error rate of 0 (to check if the logic passes)
        maxDelay = 101
        minDelay = 10
-        delayRange = range(minDelay, maxDelay, 10) # ms
-        _, nodes = generateNetwork(k, size, id, errorRate, delayRange)
+        delayrange = range(minDelay, maxDelay, 10)  # ms
+        _, nodes = generate_network(k, size, id, errorrate, delayrange)

        for node in nodes:
            node.bootstrap()

        randomSegment = "this is a simple segment of code"
        segH = Hash(randomSegment)
        # use random node as lookup point
-        publisherNodeID = random.sample(range(1, size), 1)[0]
-        pNode = nodes[publisherNodeID]
-        self.assertNotEqual(pNode.network.len(), 0)
-
-        provideSummary, aggrdelay = pNode.provide_block_segment(randomSegment)
-        self.assertEqual(len(provideSummary["closestNodes"]), k)
-
-        lookupPeers = provideSummary['contactedPeers']
-        providePeers = len(provideSummary['succesNodeIDs'])
-        totdelays = lookupPeers * 2 + providePeers + 2
-        bestDelay = totdelays * minDelay
-        worstDelay = totdelays * maxDelay
-        self.assertGreater(aggrdelay, bestDelay)
-        self.assertLess(aggrdelay, worstDelay)
-
-        interestedNodeID = random.sample(range(1, size), 1)[0]
-        iNode = nodes[interestedNodeID]
-        closestNodes, val, summary, aggrdelay = iNode.lookup_for_hash(key=segH)
+        publishernodeid = random.sample(range(1, size), 1)[0]
+        pnode = nodes[publishernodeid]
+        self.assertNotEqual(pnode.network.len(), 0)
+
+        providesummary, aggrdelay = pnode.provide_block_segment(randomSegment)
+        self.assertEqual(len(providesummary["closestNodes"]), k)
+
+        lookuppeers = providesummary['contactedPeers']
+        providepeers = len(providesummary['succesNodeIDs'])
+        totdelays = lookuppeers * 2 + providepeers + 2
+        bestdelay = totdelays * minDelay
+        worstdelay = totdelays * maxDelay
+        self.assertGreater(aggrdelay, bestdelay)
+        self.assertLess(aggrdelay, worstdelay)
+
+        interestednodeid = random.sample(range(1, size), 1)[0]
+        inode = nodes[interestednodeid]
+        closestnodes, val, summary, aggrdelay = inode.lookup_for_hash(key=segH)
        self.assertEqual(randomSegment, val)

-        lookupPeers = summary['successfulCons']
-        totdelays = lookupPeers * 2
-        bestDelay = totdelays * minDelay
-        worstDelay = totdelays * maxDelay
-        self.assertGreater(aggrdelay, bestDelay)
-        self.assertLess(aggrdelay, worstDelay)
+        lookuppeers = summary['successfulCons']
+        totdelays = lookuppeers * 2
+        bestdelay = totdelays * minDelay
+        worstdelay = totdelays * maxDelay
+        self.assertGreater(aggrdelay, bestdelay)
+        self.assertLess(aggrdelay, worstdelay)


-def generateNetwork(k, size, id, errorRate, delayRate):
-    network = DHTNetwork(id, errorRate, delayRate)
-    nodeIDs = range(1, size+1, 1)
+def generate_network(k, size, id, errorrate, delayrate):
+    network = DHTNetwork(id, errorrate, delayrate)
+    nodeids = range(0, size, 1)
    nodes = []
-    for i in nodeIDs:
-        n = DHTClient(nodeid=i, network=network, kbucketSize=k, a=1, b=k, stuckMaxCnt=3)
+    for i in nodeids:
+        n = DHTClient(nodeid=i, network=network, kbucketsize=k, a=1, b=k, steptostop=3)
        network.add_new_node(n)
        nodes.append(n)
    return network, nodes
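With node IDs now starting at 0, generate_network hands the tests a fully bootstrapped simulator in a couple of lines; a minimal driver, assuming the repo's tests package is importable:

```python
from tests.test_network import generate_network

network, nodes = generate_network(k=10, size=50, id=0, errorrate=0, delayrate=None)
for node in nodes:
    node.bootstrap()
print(network.summary())  # e.g. {'total_nodes': 50, 'attempts': 0, ...}
```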
diff --git a/tests/test_routing.py b/tests/test_routing.py
index 9f75f97..2631628 100644
--- a/tests/test_routing.py
+++ b/tests/test_routing.py
@@ -4,51 +4,51 @@ class TestDHTHashes(unittest.TestCase):
    def test_kbucket(self):
-        bucketSize = 2
-        localID = 1
-        localH = Hash(localID)
-        remoteIDs = [2, 3, 4]
+        bucketsize = 2
+        localid = 1
+        localhash = Hash(localid)
+        remoteids = [2, 3, 4]

-        kbucket = KBucket(localID, bucketSize)
-        ogDistances = []
-        for id in remoteIDs:
+        kbucket = KBucket(localid, bucketsize)
+        ogs = []
+        for id in remoteids:
            h = Hash(id)
-            ogDistances.append(localH.xor_to_hash(h))
+            ogs.append(localhash.xor_to_hash(h))
            kbucket.add_peer_to_bucket(id)

        # First, check the bucket didn't go beyond the size
-        self.assertEqual(kbucket.len(), bucketSize)
+        self.assertEqual(len(kbucket), bucketsize)

-        distances = ogDistances.copy()
+        distances = ogs.copy()
        # Second, ensure that the bucket has in fact the closest nodeIDs to the local ID
-        closeNodesInBucket = kbucket.get_x_nodes_close_to(localH, bucketSize)
+        closeNodesInBucket = kbucket.get_x_nodes_close_to(localhash, bucketsize)
        for node in closeNodesInBucket:
            minDist = min(distances)
-            minIdx = get_index_of_value(ogDistances, minDist)
-            self.assertEqual(node, remoteIDs[minIdx])
+            minIdx = get_index_of_value(ogs, minDist)
+            self.assertEqual(node, remoteids[minIdx])
            distances = remove_item_from_array(distances, get_index_of_value(distances, minDist))

    def test_routing_table(self):
-        totalNodes = 40
-        bucketSize = 5
-        localID = 1
-        localH = Hash(localID)
-        remoteIDs = range(localID+1, localID+1+totalNodes, 1)
+        totalnodes = 40
+        bucketsize = 5
+        localid = 1
+        localhash = Hash(localid)
+        remoteids = range(localid+1, localid+1+totalnodes, 1)

-        routingTable = RoutingTable(localID, bucketSize)
+        rt = RoutingTable(localid, bucketsize)

-        bucketControl = {}
-        sharedBits = {}
+        bucketcontrol = {}
+        sbits = {}
        distances = []
-        for id in remoteIDs:
+        for id in remoteids:
            idH = Hash(id)
-            sBits = localH.shared_upper_bits(idH)
-            dist = localH.xor_to_hash(idH)
-            if sBits not in bucketControl.keys():
-                bucketControl[sBits] = 0
-            bucketControl[sBits] += 1
-            sharedBits[id] = sBits
-            routingTable.new_discovered_peer(id)
+            sBits = localhash.shared_upper_bits(idH)
+            dist = localhash.xor_to_hash(idH)
+            if sBits not in bucketcontrol.keys():
+                bucketcontrol[sBits] = 0
+            bucketcontrol[sBits] += 1
+            sbits[id] = sBits
+            rt.new_discovered_peer(id)
            distances.append(dist)

        # print("summary of the rt:")
@@ -58,29 +58,29 @@ def test_routing_table(self):
        # print(distances,'\n')

        # check that there is no missing bucket, and that the max number of items per bucket is maintained
-        for i, b in enumerate(routingTable.kbuckets):
-            if i in bucketControl:
-                if bucketControl[i] > bucketSize:
-                    self.assertEqual(b.len(), bucketSize)
-                else:
-                    self.assertEqual(b.len(), bucketControl[i])
+        for i, b in enumerate(rt.kbuckets):
+            if i in bucketcontrol:
+                if bucketcontrol[i] > bucketsize:
+                    self.assertEqual(b.len(), bucketsize)
+                else:
+                    self.assertEqual(b.len(), bucketcontrol[i])
            else:
                self.assertEqual(b.len(), 0)

-        randomID = totalNodes + 100
-        randomH = Hash(randomID)
+        randomid = totalnodes + 100
+        randomhash = Hash(randomid)

-        rtNodes = []
-        distanceToHash = []
-        sharedBits = {}
+        rtnodes = []
+        distancetohash = []
+        sbits = {}
        # the lookup in the routing table will give us the nodes IN the rt with the least distance to the hashes
        # Thus, only compare it with the IDs in the routing table
-        for b in routingTable.kbuckets:
-            for node in b.bucketNodes:
-                nodeH = Hash(node)
-                sharedBits[node] = nodeH.shared_upper_bits(randomH)
-                distanceToHash.append(nodeH.xor_to_hash(randomH))
-                rtNodes.append(node)
+        for b in rt.kbuckets:
+            for node in b.bucketnodes:
+                nodehash = Hash(node)
+                sbits[node] = nodehash.shared_upper_bits(randomhash)
+                distancetohash.append(nodehash.xor_to_hash(randomhash))
+                rtnodes.append(node)

        # print("summary of the lookup:")
        # print(localH.sharedUpperBits(randomH))
@@ -88,16 +88,16 @@ def test_routing_table(self):
        # print(rtNodes)
        # print(distanceToHash,'\n')

-        closestNodes = routingTable.get_closest_nodes_to(randomH)
+        closestnodes = rt.get_closest_nodes_to(randomhash)
        # print(closestNodes)

        # check if the closest nodes are actually the closest ones in the rt
-        distances_copy = distanceToHash.copy()
-        for node in closestNodes:
-            minDist = min(distances_copy)
-            minDistNode = rtNodes[get_index_of_value(distanceToHash, minDist)]
-            self.assertEqual(node, minDistNode)
-            distances_copy = remove_item_from_array(distances_copy, get_index_of_value(distances_copy,minDist))
+        distances_copy = distancetohash.copy()
+        for node in closestnodes:
+            mindist = min(distances_copy)
+            mindistnode = rtnodes[get_index_of_value(distancetohash, mindist)]
+            self.assertEqual(node, mindistnode)
+            distances_copy = remove_item_from_array(distances_copy, get_index_of_value(distances_copy, mindist))


 def get_index_of_value(array, value):