Skip to content

Commit

Permalink
Implement Reverse Christmas Tree format for declarations
Browse files Browse the repository at this point in the history
  • Loading branch information
johnousterhout committed Oct 8, 2024
1 parent dbff820 commit 63c5732
Show file tree
Hide file tree
Showing 35 changed files with 1,008 additions and 570 deletions.
4 changes: 2 additions & 2 deletions homa_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ ssize_t homa_reply(int sockfd, const void *message_buf, size_t length,
const union sockaddr_in_union *dest_addr, uint64_t id)
{
struct homa_sendmsg_args args;
struct iovec vec;
struct msghdr hdr;
struct iovec vec;
int result;

args.id = id;
Expand Down Expand Up @@ -117,8 +117,8 @@ int homa_send(int sockfd, const void *message_buf, size_t length,
uint64_t completion_cookie)
{
struct homa_sendmsg_args args;
struct iovec vec;
struct msghdr hdr;
struct iovec vec;
int result;

args.id = 0;
Expand Down
21 changes: 10 additions & 11 deletions homa_grant.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ inline int homa_grant_update_incoming(struct homa_rpc *rpc, struct homa *homa)
*/
void homa_grant_add_rpc(struct homa_rpc *rpc)
{
struct homa_rpc *candidate;
struct homa *homa = rpc->hsk->homa;
struct homa_peer *peer = rpc->peer;
struct homa_peer *peer_cand;
struct homa *homa = rpc->hsk->homa;
struct homa_rpc *candidate;

/* Make sure this message is in the right place in the grantable_rpcs
* list for its peer.
Expand Down Expand Up @@ -158,11 +158,11 @@ void homa_grant_add_rpc(struct homa_rpc *rpc)
*/
void homa_grant_remove_rpc(struct homa_rpc *rpc)
{
struct homa_rpc *head;
struct homa *homa = rpc->hsk->homa;
struct homa_peer *peer = rpc->peer;
struct homa_rpc *candidate;
struct homa *homa = rpc->hsk->homa;
__u64 time = get_cycles();
struct homa_rpc *head;

if (list_empty(&rpc->grantable_links))
return;
Expand Down Expand Up @@ -382,9 +382,6 @@ void homa_grant_check_rpc(struct homa_rpc *rpc)
*/
void homa_grant_recalc(struct homa *homa, int locked)
{
int i, active, try_again;
__u64 start;

/* The tricky part of this method is that we need to release
* homa->grantable_lock before actually sending grants, because
* (a) we need to hold the RPC lock while sending grants, and
Expand All @@ -393,6 +390,8 @@ void homa_grant_recalc(struct homa *homa, int locked)
 * This array holds a copy of homa->active_rpcs.
*/
struct homa_rpc *active_rpcs[HOMA_MAX_GRANTS];
int i, active, try_again;
__u64 start;

tt_record("homa_grant_recalc starting");
INC_METRIC(grant_recalc_calls, 1);
Expand Down Expand Up @@ -423,8 +422,8 @@ void homa_grant_recalc(struct homa *homa, int locked)
homa->max_overcommit);
homa->num_active_rpcs = active;
for (i = 0; i < active; i++) {
int extra_levels;
struct homa_rpc *rpc = homa->active_rpcs[i];
int extra_levels;

active_rpcs[i] = rpc;
atomic_inc(&rpc->grants_in_progress);
Expand Down Expand Up @@ -563,10 +562,10 @@ int homa_grant_pick_rpcs(struct homa *homa, struct homa_rpc **rpcs,
*/
void homa_grant_find_oldest(struct homa *homa)
{
int max_incoming = homa->grant_window + 2*homa->fifo_grant_increment;
struct homa_rpc *rpc, *oldest;
struct homa_peer *peer;
__u64 oldest_birth;
int max_incoming = homa->grant_window + 2*homa->fifo_grant_increment;

oldest = NULL;
oldest_birth = ~0;
Expand Down Expand Up @@ -647,9 +646,9 @@ void homa_grant_free_rpc(struct homa_rpc *rpc)
*/
int homa_grantable_lock_slow(struct homa *homa, int recalc)
{
int result = 0;
__u64 start = get_cycles();
int starting_count = atomic_read(&homa->grant_recalc_count);
__u64 start = get_cycles();
int result = 0;

tt_record("beginning wait for grantable lock");
while (1) {
Expand Down
1 change: 1 addition & 0 deletions homa_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,7 @@ struct homa {
*/
int temp[4];
};

/**
* struct homa_skb_info - Additional information needed by Homa for each
* outbound DATA packet. Space is allocated for this at the very end of the
Expand Down
54 changes: 27 additions & 27 deletions homa_incoming.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ struct homa_gap *homa_gap_new(struct list_head *next, int start, int end)
*/
void homa_gap_retry(struct homa_rpc *rpc)
{
struct homa_gap *gap;
struct resend_header resend;
struct homa_gap *gap;

list_for_each_entry(gap, &rpc->msgin.gaps, links) {
resend.offset = htonl(gap->start);
Expand All @@ -103,10 +103,10 @@ void homa_gap_retry(struct homa_rpc *rpc)
void homa_add_packet(struct homa_rpc *rpc, struct sk_buff *skb)
{
struct data_header *h = (struct data_header *) skb->data;
struct homa_gap *gap, *dummy, *gap2;
int start = ntohl(h->seg.offset);
int length = homa_data_len(skb);
int end = start + length;
struct homa_gap *gap, *dummy, *gap2;

if ((start + length) > rpc->msgin.length) {
tt_record3("Packet extended past message end; id %d, offset %d, length %d",
Expand Down Expand Up @@ -210,12 +210,12 @@ int homa_copy_to_user(struct homa_rpc *rpc)
#define MAX_SKBS 20
#endif
struct sk_buff *skbs[MAX_SKBS];
int n = 0; /* Number of filled entries in skbs. */
int error = 0;
int start_offset = 0;
int end_offset = 0;
int i;
int error = 0;
__u64 start;
int n = 0; /* Number of filled entries in skbs. */
int i;

/* Tricky note: we can't hold the RPC lock while we're actually
* copying to user space, because (a) it's illegal to hold a spinlock
Expand Down Expand Up @@ -256,12 +256,12 @@ int homa_copy_to_user(struct homa_rpc *rpc)
for (i = 0; i < n; i++) {
struct data_header *h = (struct data_header *)
skbs[i]->data;
int offset = ntohl(h->seg.offset);
int pkt_length = homa_data_len(skbs[i]);
int offset = ntohl(h->seg.offset);
int buf_bytes, chunk_size;
struct iov_iter iter;
int copied = 0;
char *dst;
struct iov_iter iter;
int buf_bytes, chunk_size;

/* Each iteration of this loop copies to one
* user buffer.
Expand Down Expand Up @@ -335,26 +335,26 @@ int homa_copy_to_user(struct homa_rpc *rpc)
*/
void homa_dispatch_pkts(struct sk_buff *skb, struct homa *homa)
{
const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb);
struct data_header *h = (struct data_header *) skb->data;
__u64 id = homa_local_id(h->common.sender_id);
int dport = ntohs(h->common.dport);
struct homa_sock *hsk;
struct homa_rpc *rpc = NULL;
struct sk_buff *next;

#ifdef __UNIT_TEST__
#define MAX_ACKS 2
#else
#define MAX_ACKS 10
#endif
const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb);
struct data_header *h = (struct data_header *) skb->data;
__u64 id = homa_local_id(h->common.sender_id);
int dport = ntohs(h->common.dport);

/* Used to collect acks from data packets so we can process them
* all at the end (can't process them inline because that may
* require locking conflicting RPCs). If we run out of space just
* ignore the extra acks; they'll be regenerated later through the
* explicit mechanism.
*/
struct homa_ack acks[MAX_ACKS];
struct homa_rpc *rpc = NULL;
struct homa_sock *hsk;
struct sk_buff *next;
int num_acks = 0;

	/* Find the appropriate socket. */
Expand Down Expand Up @@ -536,8 +536,8 @@ void homa_dispatch_pkts(struct sk_buff *skb, struct homa *homa)
*/
void homa_data_pkt(struct sk_buff *skb, struct homa_rpc *rpc)
{
struct homa *homa = rpc->hsk->homa;
struct data_header *h = (struct data_header *) skb->data;
struct homa *homa = rpc->hsk->homa;

tt_record4("incoming data packet, id %d, peer 0x%x, offset %d/%d",
homa_local_id(h->common.sender_id),
Expand Down Expand Up @@ -760,12 +760,12 @@ void homa_unknown_pkt(struct sk_buff *skb, struct homa_rpc *rpc)
*/
void homa_cutoffs_pkt(struct sk_buff *skb, struct homa_sock *hsk)
{
struct cutoffs_header *h = (struct cutoffs_header *) skb->data;
const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb);
struct homa_peer *peer;
int i;
struct cutoffs_header *h = (struct cutoffs_header *) skb->data;
struct homa_peer *peer = homa_peer_find(hsk->homa->peers,
&saddr, &hsk->inet);

peer = homa_peer_find(hsk->homa->peers, &saddr, &hsk->inet);
if (!IS_ERR(peer)) {
peer->unsched_cutoffs[0] = INT_MAX;
for (i = 1; i < HOMA_MAX_PRIORITIES; i++)
Expand All @@ -789,8 +789,8 @@ void homa_need_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk,
struct common_header *h = (struct common_header *) skb->data;
const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb);
__u64 id = homa_local_id(h->sender_id);
struct ack_header ack;
struct homa_peer *peer;
struct ack_header ack;

tt_record1("Received NEED_ACK for id %d", id);

Expand Down Expand Up @@ -843,8 +843,8 @@ void homa_need_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk,
void homa_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk,
struct homa_rpc *rpc)
{
struct ack_header *h = (struct ack_header *) skb->data;
const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb);
struct ack_header *h = (struct ack_header *) skb->data;
int i, count;

if (rpc != NULL) {
Expand Down Expand Up @@ -983,8 +983,8 @@ void homa_abort_rpcs(struct homa *homa, const struct in6_addr *addr,
int port, int error)
{
struct homa_socktab_scan scan;
struct homa_sock *hsk;
struct homa_rpc *rpc, *tmp;
struct homa_sock *hsk;

rcu_read_lock();
for (hsk = homa_socktab_start_scan(homa->port_map, &scan);
Expand Down Expand Up @@ -1181,11 +1181,11 @@ int homa_register_interests(struct homa_interest *interest,
struct homa_rpc *homa_wait_for_message(struct homa_sock *hsk, int flags,
__u64 id)
{
int error, blocked = 0, polled = 0;
struct homa_rpc *result = NULL;
struct homa_interest interest;
struct homa_rpc *rpc = NULL;
uint64_t poll_start, now;
int error, blocked = 0, polled = 0;

/* Each iteration of this loop finds an RPC, but it might not be
* in a state where we can return it (e.g., there might be packets
Expand Down Expand Up @@ -1375,10 +1375,10 @@ struct homa_rpc *homa_wait_for_message(struct homa_sock *hsk, int flags,
struct homa_interest *homa_choose_interest(struct homa *homa,
struct list_head *head, int offset)
{
__u64 busy_time = get_cycles() - homa->busy_cycles;
struct homa_interest *backup = NULL;
struct list_head *pos;
struct homa_interest *interest;
__u64 busy_time = get_cycles() - homa->busy_cycles;
struct list_head *pos;

list_for_each(pos, head) {
interest = (struct homa_interest *) (((char *) pos) - offset);
Expand All @@ -1405,8 +1405,8 @@ struct homa_interest *homa_choose_interest(struct homa *homa,
*/
void homa_rpc_handoff(struct homa_rpc *rpc)
{
struct homa_interest *interest;
struct homa_sock *hsk = rpc->hsk;
struct homa_interest *interest;

if ((atomic_read(&rpc->flags) & RPC_HANDING_OFF)
|| !list_empty(&rpc->ready_links))
Expand Down
34 changes: 19 additions & 15 deletions homa_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ int homa_offload_init(void)
*/
int homa_offload_end(void)
{
int res1 = inet_del_offload(&homa_offload, IPPROTO_HOMA);
int res2 = inet6_del_offload(&homa_offload, IPPROTO_HOMA);
int res1 = inet_del_offload(&homa_offload, IPPROTO_HOMA);

return res1 ? res1 : res2;
}
Expand Down Expand Up @@ -131,6 +131,7 @@ struct sk_buff *homa_tcp_gro_receive(struct list_head *held_list,
{
struct common_header *h = (struct common_header *)
skb_transport_header(skb);

// tt_record4("homa_tcp_gro_receive got type 0x%x, flags 0x%x, "
// "urgent 0x%x, id %d", h->type, h->flags,
// ntohs(h->urgent), homa_local_id(h->sender_id));
Expand Down Expand Up @@ -277,20 +278,21 @@ struct sk_buff *homa_gro_receive(struct list_head *held_list,
* gro_list by the caller, so it will be considered for merges
* in the future.
*/
struct sk_buff *held_skb;
struct sk_buff *result = NULL;
struct homa_offload_core *offload_core = &per_cpu(homa_offload_core,
raw_smp_processor_id());
__u64 now = get_cycles();
int busy = (now - offload_core->last_gro) < homa->gro_busy_cycles;
__u32 hash;
__u64 saved_softirq_metric, softirq_cycles;
struct homa_offload_core *offload_core;
struct sk_buff *result = NULL;
__u64 *softirq_cycles_metric;
struct data_header *h_new = (struct data_header *)
skb_transport_header(skb);
struct data_header *h_new;
struct sk_buff *held_skb;
__u64 now = get_cycles();
int priority;
__u32 saddr;
__u32 hash;
int busy;

h_new = (struct data_header *) skb_transport_header(skb);
offload_core = &per_cpu(homa_offload_core, raw_smp_processor_id());
busy = (now - offload_core->last_gro) < homa->gro_busy_cycles;
offload_core->last_active = now;
if (skb_is_ipv6(skb)) {
priority = ipv6_hdr(skb)->priority;
Expand Down Expand Up @@ -468,11 +470,11 @@ void homa_gro_gen2(struct sk_buff *skb)
* balancing.
*/
struct data_header *h = (struct data_header *) skb_transport_header(skb);
int i;
int this_core = raw_smp_processor_id();
struct homa_offload_core *offload_core;
int candidate = this_core;
__u64 now = get_cycles();
struct homa_offload_core *offload_core;
int i;

for (i = CORES_TO_CHECK; i > 0; i--) {
candidate++;
Expand Down Expand Up @@ -523,11 +525,12 @@ void homa_gro_gen3(struct sk_buff *skb)
* load balancer.
*/
struct data_header *h = (struct data_header *) skb_transport_header(skb);
int i, core;
__u64 now, busy_time;
int *candidates = per_cpu(homa_offload_core, raw_smp_processor_id())
.gen3_softirq_cores;
int *candidates;
int i, core;

candidates = per_cpu(homa_offload_core,
raw_smp_processor_id()).gen3_softirq_cores;
now = get_cycles();
busy_time = now - homa->busy_cycles;

Expand Down Expand Up @@ -567,6 +570,7 @@ void homa_gro_gen3(struct sk_buff *skb)
int homa_gro_complete(struct sk_buff *skb, int hoffset)
{
struct data_header *h = (struct data_header *) skb_transport_header(skb);

// tt_record4("homa_gro_complete type %d, id %d, offset %d, count %d",
// h->common.type, homa_local_id(h->common.sender_id),
// ntohl(h->seg.offset),
Expand Down
Loading

0 comments on commit 63c5732

Please sign in to comment.