Skip to content

Commit

Permalink
Added nicbufs and ooo analyzers to tthoma.py
Browse files Browse the repository at this point in the history
  • Loading branch information
johnousterhout committed Dec 6, 2023
1 parent e0ce68f commit e1dbc38
Showing 1 changed file with 250 additions and 19 deletions.
269 changes: 250 additions & 19 deletions util/tthoma.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@
# the following fields:
# xmit: Time when ip*xmit was invoked
# nic: Time when the NIC transmitted the packet (if available)
# gro: Time when GRO received (the first bytes of) the packet
# gro: Time when GRO received the packet
# softirq: Time when homa_softirq processed the packet
# free: Time when skb was freed (after copying to application)
# id: RPC id on the sender
# offset: Offset of the data in the packet within its message
# length: # bytes of message data in this packet
Expand Down Expand Up @@ -611,12 +612,14 @@ def __copy_out_start(self, trace, time, core, match, interests):
def __copy_out_done(self, trace, time, core, match, interests):
num_bytes = int(match.group(1))
id = int(match.group(2))
offset = int(match.group(3))
for interest in interests:
interest.tt_copy_out_done(trace, time, core, id, num_bytes)
interest.tt_copy_out_done(trace, time, core, id, num_bytes, offset)

patterns.append({
'name': 'copy_out_done',
'regexp': 'finished copying ([-0-9.]+) bytes for id ([-0-9.]+)'
'regexp': 'finished copying ([0-9.]+) bytes for id ([0-9.]+), '
'.*last offset ([0-9.]+)'
})

def __free_skbs(self, trace, time, core, match, interests):
Expand Down Expand Up @@ -976,7 +979,7 @@ def tt_copy_out_start(self, trace, time, core, id):
stats = trace['copy']
stats['out_start'][core] = time

def tt_copy_out_done(self, trace, time, core, id, num_bytes):
def tt_copy_out_done(self, trace, time, core, id, num_bytes, offset):
global options
stats = trace['copy']
if core in stats['out_start']:
Expand Down Expand Up @@ -1909,6 +1912,210 @@ def output(self):
core_data['max_backlog'] * 1e-3,
core_data['max_backlog_time']))

#------------------------------------------------
# Analyzer: nicbufs
#------------------------------------------------
class AnalyzeNicbufs:
"""
Analyzes lifetimes of skbs for incoming packets to compute total buffer
usage for each channel and underflows of NIC buffer caches (based on
caching mechanism of Mellanox mlx5 driver).
"""

def __init__(self, dispatcher):
dispatcher.interest('AnalyzePackets')
dispatcher.interest('AnalyzeRpcs')

def output(self):
global packets, rpcs

# List of <time, type, id, core, length> records, where type is
# "alloc" or "free", id is a packet id, core is the core where
# homa_gro_receive processed the packet (in the form "node.core"),
# and length is the number of bytes consumed by the packet.
events = []

# Maps from core id (node.core) to the total number of bytes
# received so far by homa_gro_receive on that core.
core_bytes = defaultdict(lambda : 0)

# Maps from packet id to a <gro_time, core_bytes> tuple, where
# gro_time is the time when the packet was processed by homa_gro_receive
# and core_bytes is the value of core_bytes just before the packet
# was allocated.
pkt_allocs = {}

# Maps from core id to <time, active_bytes, pkid, gro_time>, where
# active_bytes is the largest number of active skb bytes seen for that
# core, time is the time when some of those bytes were finally freed,
# pid is the id of the packet freed at time, and gro_time is the time
# when that packet was processed by homa_gro_receive.
core_max = defaultdict(lambda : [0, 0, '', 0])

# Scan all packets to build the events list. Note: change packet
# ids to refer to those on the receiver, not sender.
for pkt in packets.values():
if (not 'gro' in pkt) or (not 'length' in pkt):
continue
rpc_id = pkt['id'] ^ 1
pkid = '%d:%d' % (rpc_id, pkt['offset'])
rpc = rpcs[rpc_id]
core = '%s.%d' % (rpc['node'], rpc['gro_core'])
events.append([pkt['gro'], 'alloc', pkid, core, pkt['length']])
if 'free' in pkt:
events.append([pkt['free'], 'free', pkid, core, pkt['length']])

# Process the events in time order
events.sort(key=lambda t : t[0])
for time, type, pkid, core, length in events:
if type == 'alloc':
pkt_allocs[pkid] = [time, core_bytes[core]]
core_bytes[core] += length
elif type == 'free':
active_bytes = core_bytes[core] - pkt_allocs[pkid][1]
if active_bytes > core_max[core][1]:
core_max[core] = [time, active_bytes, pkid,
pkt_allocs[pkid][0]]
else:
print('Bogus event type %s in nicbufs analzyer' % (type),
file=sys.stderr)


print('\n-----------------')
print('Analyzer: nicbufs')
print('-----------------')
print('Maximum active NIC buffer space used for each GRO core over the')
print('life of the traces (assuming Mellanox mlx5 buffer cache):')
print('Active: Maximum bytes of NIC buffers used by the core (bytes')
print(' allocated on Core between when PktId was received and')
print(' when PktId was freed)')
print('PktId: Identifier (as seen by receiver) for the packet ')
print(' corresponding to Active')
print('Node: Node where Pktid was received')
print('Core: Core on which Pktid was received')
print('GRO: Time when homa_gro_receive processed Pktid on Core')
print('Free: Time when packet was freed after copying to user space')
print('Life: Packet lifetime (Free - GRO, usecs)\n')

maxes = []
for core, max in core_max.items():
time, active, pkid, gro_time = max
maxes.append([core, time, active, pkid, gro_time])
maxes.sort(key=lambda t : t[2], reverse = True)
print(' Active PktId Node Core GRO '
'Free Life')
print('-------------------------------------------------------------'
'------------')
for core, time, active, pkid, gro_time in maxes:
node, core_id = core.split('.')
print('%8d %20s %10s %4s %9.3f %9.3f %7.1f' % (active, pkid,
node, core_id, gro_time, time, time - gro_time))


#------------------------------------------------
# Analyzer: ooo
#------------------------------------------------
class AnalyzeOoo:
"""
Prints statistics about out-of-order packet arrivals. Also prints
details about out-of-order packets in the RPCs that experienced the
highest out-of-order delays (--verbose will print info for all OOO RPCs)
"""

def __init__(self, dispatcher):
dispatcher.interest('AnalyzeRpcs')

def output(self):
global rpcs, options

total_rpcs = 0
total_packets = 0
ooo_packets = 0

# Each element of this list contains a <delay, info> tuple describing
# all of the out-of-order packets in a single RPC: delay is the
# maximum delay experienced by any of the out-of-order packets, and
# info contains one or more lines of text, each line describing one
# ooo packet.
ooo_rpcs = []

# Scan the incoming packets in each RPC.
for id, rpc in rpcs.items():
if not 'gro_data' in rpc:
continue
total_rpcs += 1
pkts = rpc['gro_data']
total_packets += len(pkts)
highest_index = -1
highest_offset = -1
highest_prio = 0
max_delay = -1
info = ''
for i in range(len(pkts)):
time, offset, prio = pkts[i]
if offset > highest_offset:
highest_index = i;
highest_offset = offset
highest_prio = prio
continue

# This packet is out of order. Find the first packet received
# with higher offset than this one so we can compute how long
# this packet was delayed.
ooo_packets += 1
gap = highest_index
while gap > 0:
if pkts[gap-1][1] < offset:
break
gap -= 1
gap_time, gap_offset, gap_prio = pkts[gap]
delay = time - gap_time
if max_delay == -1:
rpc_id = '%12d' % (id)
else:
rpc_id = ' ' * 12
info += '%s %7d %10s %9.3f %6.1f %8d %3d %3d\n' % (rpc_id, offset,
rpc['node'], time, delay, highest_offset - offset,
prio, highest_prio)
if delay > max_delay:
max_delay = delay
if info:
ooo_rpcs.append([max_delay, info])

print('\n-----------------')
print('Analyzer: ooo')
print('-----------------')
print('RPCs with out-of-order packets: %d/%d (%.1f%%)' %
(len(ooo_rpcs), total_rpcs, 100.0*len(ooo_rpcs)/total_rpcs))
print('Out-of-order packets: %d/%d (%.1f%%)' %
(ooo_packets, total_packets, 100.0*ooo_packets/total_packets))

if not ooo_rpcs:
return
print('')
print('Information about out-of-order packets, grouped by RPC and sorted')
print('so that RPCs with largest OOO delays appear first (use --verbose')
print('to display all RPCs with OOO packets):')
print('RPC: Identifier for the RPC')
print('Offset: Offset of the out-of-order packet within the RPC')
print('Node: Node on which the packet was received')
print('Time: Time when the packet was received by homa_gro_receive')
print('Delay: Time - receive time for earliest packet with higher offset')
print('Gap: Offset of highest packet received before this one, minus')
print(' offset of this packet')
print('Prio: Priority of this packet')
print('Prev: Priority of the highest-offset packet received before ')
print(' this one')
print('')
print(' RPC Offset Node Time Delay Gap Prio Prev')
print('-------------------------------------------------------------------')
ooo_rpcs.sort(key=lambda t : t[0], reverse=True)
count = 0
for delay, info in ooo_rpcs:
if (count >= 20) and not options.verbose:
break
print(info, end='')
count += 1

#------------------------------------------------
# Analyzer: packet
Expand Down Expand Up @@ -2123,6 +2330,16 @@ class AnalyzePackets:
def __init__(self, dispatcher):
return

def init_trace(self, trace):
# Maps from RPC id to a list of active data packets for that RPC
# (packets that have been received by homa_gro_receive but not
# yet copied to user space).
self.active = defaultdict(list)

# Maps from core to a list of packets that have been copied out
# to user space by that core (but not yet freed).
self.copied = defaultdict(list)

def tt_ip_xmit(self, trace, time, core, id, offset):
global packets
packets[pkt_id(id, offset)]['xmit'] = time
Expand All @@ -2133,23 +2350,37 @@ def tt_mlx_data(self, trace, time, core, peer, id, offset):

def tt_gro_data(self, trace, time, core, peer, id, offset, prio):
global packets
p = pkt_id(id^1, offset)
packets[p]['gro'] = time
packets[p]['priority'] = prio
packets[p]['id'] = id^1
packets[p]['offset'] = offset
p = packets[pkt_id(id^1, offset)]
p['gro'] = time
p['priority'] = prio
p['id'] = id^1
p['offset'] = offset
self.active[id].append(p)

def tt_softirq_data(self, trace, time, core, id, offset, msg_length):
global packets
p = pkt_id(id^1, offset)
packets[p]['softirq'] = time
packets[p]['msg_length'] = msg_length
p = packets[pkt_id(id^1, offset)]
p['softirq'] = time
p['msg_length'] = msg_length

def tt_copy_out_done(self, trace, time, core, id, num_bytes, offset):
pkts = self.active[id]
for i in range(len(pkts) -1, -1, -1):
p = pkts[i]
if p['offset'] <= offset:
self.copied[core].append(p)
pkts.pop(i)

def tt_free_skbs(self, trace, time, core, num_skbs):
for p in self.copied[core]:
p['free'] = time
self.copied[core] = []

def tt_send_data(self, trace, time, core, id, offset, length):
global packets
p = pkt_id(id, offset)
packets[p]['id'] = id
packets[p]['length'] = length
p = packets[pkt_id(id, offset)]
p['id'] = id
p['length'] = length

def tt_send_grant(self, trace, time, core, id, offset, priority):
global grants
Expand All @@ -2161,9 +2392,9 @@ def tt_mlx_grant(self, trace, time, core, peer, id, offset):

def tt_gro_grant(self, trace, time, core, peer, id, offset, priority):
global grants
p = pkt_id(id^1, offset)
grants[p]['gro'] = time
grants[p]['id'] = id^1
g = grants[pkt_id(id^1, offset)]
g['gro'] = time
g['id'] = id^1

def tt_softirq_grant(self, trace, time, core, id, offset):
global grants
Expand Down Expand Up @@ -2312,7 +2543,7 @@ def tt_copy_out_start(self, trace, time, core, id):
if not 'copy_out_start' in rpcs[id]:
rpcs[id]['copy_out_start'] = time

def tt_copy_out_done(self, trace, time, core, id, num_bytes):
def tt_copy_out_done(self, trace, time, core, id, num_bytes, offset):
global rpcs
if not id in rpcs:
self.new_rpc(id, trace['node'])
Expand Down

0 comments on commit e1dbc38

Please sign in to comment.