Skip to content

Commit

Permalink
Transmit Homa packets using IPPROTO_TCP as the IP protocol
Browse files Browse the repository at this point in the history
Still uses old TSO packet format (doesn't depend on TSO to
increment the offset between segments)
  • Loading branch information
johnousterhout committed Jul 16, 2024
1 parent 0cb8533 commit 263f8c2
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 22 deletions.
59 changes: 49 additions & 10 deletions homa_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ struct homa_cache_line {

/**
* struct common_header - Wire format for the first bytes in every Homa
* packet. This must partially match the format of a TCP header so that
* Homa can piggyback on TCP segmentation offload (and possibly other
* features, such as RSS).
* packet. This must (mostly) match the format of a TCP header to enable
* Homa packets to actually be transmitted as TCP packets (and thereby
* take advantage of TSO and other features).
*/
struct common_header {
/**
Expand All @@ -257,13 +257,21 @@ struct common_header {
__be16 dport;

/**
* @unused1: corresponds to the sequence number field in TCP headers;
* @sequence: corresponds to the sequence number field in TCP headers;
* must not be used by Homa, in case it gets incremented during TCP
* offload.
*/
__be32 unused1;
__be32 sequence;

__be32 unused2;
/**
* The fields below correspond to the acknowledgment field in TCP
* headers; not used by Homa, except for the low-order 8 bits, which
* specify the Homa packet type (one of the values in the
* homa_packet_type enum).
*/
__be16 ack1;
__u8 ack2;
__u8 type;

/**
* @doff: High order 4 bits holds the number of 4-byte chunks in a
Expand All @@ -272,17 +280,35 @@ struct common_header {
*/
__u8 doff;

/** @type: One of the values of &enum packet_type. */
__u8 type;
/**
* @flags: Holds TCP flags such as URG, ACK, etc. The special value
* HOMA_TCP_FLAGS is stored here to distinguish Homa-over-TCP packets
* from real TCP packets. It includes the SYN and RST flags,
* which TCP would never use together; must not include URG or FIN
* (TSO will turn off FIN for all but the last segment).
*/
__u8 flags;
#define HOMA_TCP_FLAGS 6

__u16 unused3;
/**
* @window: Corresponds to the window field in TCP headers. Not used
* by HOMA.
*/
__be16 window;

/**
* @checksum: not used by Homa, but must occupy the same bytes as
* the checksum in a TCP header (TSO may modify this?).*/
__be16 checksum;

__u16 unused4;
/**
* @urgent: occupies the same bytes as the urgent pointer in a TCP
* header. When Homa packets are transmitted over TCP, this has the
* special value HOMA_TCP_URGENT (which is set even though URG is
* not set) to indicate that the packet is actually a Homa packet.
*/
__be16 urgent;
#define HOMA_TCP_URGENT 0xb97d

/**
* @sender_id: the identifier of this RPC as used on the sender (i.e.,
Expand Down Expand Up @@ -3482,6 +3508,14 @@ static inline bool is_mapped_ipv4(const struct in6_addr x)
(x.in6_u.u6_addr32[2] == htonl(0xffff)));
}

static inline bool is_homa_pkt(struct sk_buff *skb)
{
struct iphdr *iph = ip_hdr(skb);
return ((iph->protocol == IPPROTO_HOMA) ||
((iph->protocol == IPPROTO_TCP) &&
(tcp_hdr(skb)->urg_ptr == htons(HOMA_TCP_URGENT))));
}

/**
* tt_addr() - Given an address, return a 4-byte id that will (hopefully)
* provide a unique identifier for the address in a timetrace record.
Expand Down Expand Up @@ -3574,6 +3608,8 @@ extern int homa_grant_update_incoming(struct homa_rpc *rpc,
extern int homa_gro_complete(struct sk_buff *skb, int thoff);
extern void homa_gro_gen2(struct sk_buff *skb);
extern void homa_gro_gen3(struct sk_buff *skb);
extern void homa_gro_hook_tcp(void);
extern void homa_gro_unhook_tcp(void);
extern struct sk_buff
*homa_gro_receive(struct list_head *gro_list,
struct sk_buff *skb);
Expand Down Expand Up @@ -3732,6 +3768,9 @@ extern char *homa_symbol_for_state(struct homa_rpc *rpc);
extern char *homa_symbol_for_type(uint8_t type);
extern int homa_sysctl_softirq_cores(struct ctl_table *table, int write,
void __user *buffer, size_t *lenp, loff_t *ppos);
extern struct sk_buff
*homa_tcp_gro_receive(struct list_head *held_list,
struct sk_buff *skb);
extern void homa_timer(struct homa *homa);
extern int homa_timer_main(void *transportInfo);
extern void homa_unhash(struct sock *sk);
Expand Down
2 changes: 2 additions & 0 deletions homa_incoming.c
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,8 @@ void homa_need_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk,
ack.common.type = ACK;
ack.common.sport = h->dport;
ack.common.dport = h->sport;
ack.common.flags = HOMA_TCP_FLAGS;
ack.common.urgent = htons(HOMA_TCP_URGENT);
ack.common.sender_id = cpu_to_be64(id);
ack.num_acks = htons(homa_peer_get_acks(peer,
NUM_PEER_UNACKED_IDS, ack.acks));
Expand Down
86 changes: 83 additions & 3 deletions homa_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ static const struct net_offload homa_offload = {

extern struct homa *homa;

/* Pointers to TCP's net_offload structures. NULL means homa_gro_hook_tcp
* hasn't been called yet.
*/
const struct net_offload *tcp_net_offload = NULL;
const struct net_offload *tcp6_net_offload = NULL;

/*
* Identical to *tcp_net_offload except that the gro_receive function
* has been replaced.
*/
static struct net_offload hook_tcp_net_offload;
static struct net_offload hook_tcp6_net_offload;

/**
* homa_offload_init() - Invoked to enable GRO and GSO. Typically invoked
* when the Homa module loads.
Expand All @@ -45,6 +58,72 @@ int homa_offload_end(void)
return res1 ? res1 : res2;
}

/**
* homa_gro_hook_tcp() - Arranges for TCP gro_receive calls to be
* mediated by this file, so that Homa-over-TCP packets can be retrieved
* and funneled through Homa.
*/
void homa_gro_hook_tcp(void)
{
if (tcp_net_offload != NULL)
return;

tcp_net_offload = inet_offloads[IPPROTO_TCP];
hook_tcp_net_offload = *tcp_net_offload;
hook_tcp_net_offload.callbacks.gro_receive = homa_tcp_gro_receive;
inet_offloads[IPPROTO_TCP] = &hook_tcp_net_offload;

tcp6_net_offload = inet6_offloads[IPPROTO_TCP];
hook_tcp6_net_offload = *tcp6_net_offload;
hook_tcp6_net_offload.callbacks.gro_receive = homa_tcp_gro_receive;
inet6_offloads[IPPROTO_TCP] = &hook_tcp6_net_offload;
}

/**
* homa_gro_unhook_tcp() - Reverses the effects of a previous call to
* homa_hook_tcp_gro, so that TCP packets are now passed directly to
* Tcp's gro_receive function without mediation.
*/
void homa_gro_unhook_tcp(void)
{
if (tcp_net_offload == NULL)
return;
inet_offloads[IPPROTO_TCP] = tcp_net_offload;
tcp_net_offload = NULL;
inet6_offloads[IPPROTO_TCP] = tcp6_net_offload;
tcp6_net_offload = NULL;
}

/**
* homa_tcp_gro_receive() - Invoked instead of TCP's normal gro_receive function
* when hooking is enabled. Identifies Homa-over-TCP packets and passes them
* to Homa; sends real TCP packets to TCP's gro_receive function.
*/
struct sk_buff *homa_tcp_gro_receive(struct list_head *held_list,
struct sk_buff *skb)
{
struct common_header *h = (struct common_header *)
skb_transport_header(skb);
// tt_record4("homa_tcp_gro_receive got type 0x%x, flags 0x%x, "
// "urgent 0x%x, id %d", h->type, h->flags,
// ntohs(h->urgent), homa_local_id(h->sender_id));
if ((h->flags != HOMA_TCP_FLAGS) || (ntohs(h->urgent) != HOMA_TCP_URGENT))
return tcp_net_offload->callbacks.gro_receive(held_list, skb);

/* Change the packet's IP protocol to Homa so that it will get
* dispatched directly to Homa in the future.
*/
if (skb_is_ipv6(skb)) {
ipv6_hdr(skb)->nexthdr = IPPROTO_HOMA;
} else {
ip_hdr(skb)->check = ~csum16_add(csum16_sub(~ip_hdr(skb)->check,
htons(ip_hdr(skb)->protocol)),
htons(IPPROTO_HOMA));
ip_hdr(skb)->protocol = IPPROTO_HOMA;
}
return homa_gro_receive(held_list, skb);
}

/**
* homa_set_softirq_cpu() - Arrange for SoftIRQ processing of a packet to
* occur on a specific core (creates a socket flow table entry for the core,
Expand Down Expand Up @@ -434,9 +513,10 @@ void homa_gro_gen3(struct sk_buff *skb)
int homa_gro_complete(struct sk_buff *skb, int hoffset)
{
struct data_header *h = (struct data_header *) skb_transport_header(skb);
// tt_record4("homa_gro_complete type %d, id %d, offset %d, count %d",
// h->type, homa_local_id(h->sender_id), ntohl(d->seg.offset),
// NAPI_GRO_CB(skb)->count);
// tt_record4("homa_gro_complete type %d, id %d, offset %d, count %d",
// h->common.type, homa_local_id(h->common.sender_id),
// ntohl(h->seg.offset),
// NAPI_GRO_CB(skb)->count);

if (homa->gro_policy & HOMA_GRO_GEN3) {
homa_gro_gen3(skb);
Expand Down
12 changes: 10 additions & 2 deletions homa_outgoing.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,12 @@ struct sk_buff *homa_new_data_packet(struct homa_rpc *rpc,
sizeof(*h) - sizeof(struct data_segment));
h->common.sport = htons(rpc->hsk->port);
h->common.dport = htons(rpc->dport);
homa_set_doff(h);
h->common.sequence = 0;
h->common.type = DATA;
homa_set_doff(h);
h->common.flags = HOMA_TCP_FLAGS;
h->common.checksum = 0;
h->common.urgent = htons(HOMA_TCP_URGENT);
h->common.sender_id = cpu_to_be64(rpc->id);
h->message_length = htonl(rpc->msgout.length);
h->incoming = htonl(rpc->msgout.unscheduled);
Expand Down Expand Up @@ -311,6 +315,8 @@ int homa_xmit_control(enum homa_packet_type type, void *contents,
h->type = type;
h->sport = htons(rpc->hsk->port);
h->dport = htons(rpc->dport);
h->flags = HOMA_TCP_FLAGS;
h->urgent = htons(HOMA_TCP_URGENT);
h->sender_id = cpu_to_be64(rpc->id);
return __homa_xmit_control(contents, length, rpc->peer, rpc->hsk);
}
Expand Down Expand Up @@ -425,8 +431,10 @@ void homa_xmit_unknown(struct sk_buff *skb, struct homa_sock *hsk)
homa_local_id(h->sender_id));
unknown.common.sport = h->dport;
unknown.common.dport = h->sport;
unknown.common.sender_id = cpu_to_be64(homa_local_id(h->sender_id));
unknown.common.type = UNKNOWN;
unknown.common.flags = HOMA_TCP_FLAGS;
unknown.common.urgent = htons(HOMA_TCP_URGENT);
unknown.common.sender_id = cpu_to_be64(homa_local_id(h->sender_id));
peer = homa_peer_find(&hsk->homa->peers, &saddr, &hsk->inet);
if (!IS_ERR(peer))
__homa_xmit_control(&unknown, sizeof(unknown), peer, hsk);
Expand Down
3 changes: 3 additions & 0 deletions homa_plumbing.c
Original file line number Diff line number Diff line change
Expand Up @@ -608,11 +608,13 @@ static int __init homa_load(void) {
goto out_cleanup;
}

homa_gro_hook_tcp();
tt_init("timetrace", homa->temp);

return 0;

out_cleanup:
homa_gro_unhook_tcp();
homa_offload_end();
unregister_net_sysctl_table(homa_ctl_header);
proc_remove(metrics_dir_entry);
Expand All @@ -636,6 +638,7 @@ static void __exit homa_unload(void) {

tt_destroy();

homa_gro_unhook_tcp();
if (timer_kthread)
wake_up_process(timer_kthread);
if (homa_offload_end() != 0)
Expand Down
5 changes: 5 additions & 0 deletions homa_socktab.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ void homa_sock_init(struct homa_sock *hsk, struct homa *homa)
bucket->id = i + 1000000;
}
memset(&hsk->buffer_pool, 0, sizeof(hsk->buffer_pool));

/* This line will cause outgoing packets to be sent with TCP
* as the IP protocol (so that TSO and RSS will work better).
*/
hsk->sock.sk_protocol = IPPROTO_TCP;
spin_unlock_bh(&socktab->write_lock);
}

Expand Down
13 changes: 7 additions & 6 deletions homa_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -1316,27 +1316,28 @@ void homa_freeze_peers(struct homa *homa)
/* Find a socket to use (any will do). */
hsk = homa_socktab_start_scan(&homa->port_map, &scan);
if (hsk == NULL) {
printk(KERN_NOTICE "homa_freeze_peers couldn't find a socket\n");
tt_record("homa_freeze_peers couldn't find a socket");
return;
}

peers = homa_peertab_get_peers(&homa->peers, &num_peers);
if (peers == NULL) {
printk(KERN_NOTICE "homa_freeze_peers couldn't find peers "
"to freeze\n");
tt_record("homa_freeze_peers couldn't find peers to freeze");
return;
}
freeze.common.type = FREEZE;
freeze.common.sport = htons(hsk->port);;
freeze.common.dport = 0;
freeze.common.flags = HOMA_TCP_FLAGS;
freeze.common.urgent = htons(HOMA_TCP_URGENT);
freeze.common.sender_id = 0;
for (i = 0; i < num_peers; i++) {
tt_record1("Sending freeze to 0x%x", tt_addr(peers[i]->addr));
err = __homa_xmit_control(&freeze, sizeof(freeze), peers[i], hsk);
if (err != 0)
printk(KERN_NOTICE "homa_freeze_peers got error %d "
"in xmit to %s\n", err,
homa_print_ipv6_addr(&peers[i]->addr));
tt_record2("homa_freeze_peers got error %d in xmit "
"to 0x%x\n", err,
tt_addr(peers[i]->addr));
}
kfree(peers);
}
Expand Down
5 changes: 5 additions & 0 deletions test/mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ struct dst_ops mock_dst_ops = {.mtu = mock_get_mtu};
struct net_device mock_net_device = {
.gso_max_segs = 1000,
.gso_max_size = 0};
const struct net_offload *inet_offloads[MAX_INET_PROTOS];
const struct net_offload *inet6_offloads[MAX_INET_PROTOS];

static struct hrtimer_clock_base clock_base;
unsigned int cpu_khz = 1000000;
Expand Down Expand Up @@ -1384,6 +1386,7 @@ struct sk_buff *mock_skb_new(struct in6_addr *saddr, struct common_header *h,
ip_hdr(skb)->version = 4;
ip_hdr(skb)->saddr = saddr->in6_u.u6_addr32[3];
ip_hdr(skb)->protocol = IPPROTO_HOMA;
ip_hdr(skb)->check = 0;
}
skb->_skb_refdst = 0;
skb->hash = 3;
Expand Down Expand Up @@ -1482,6 +1485,8 @@ void mock_teardown(void)
mock_compound_order_mask = 0;
mock_page_nid_mask = 0;
mock_net_device.gso_max_size = 0;
memset(inet_offloads, 0, sizeof(inet_offloads));
memset(inet6_offloads, 0, sizeof(inet6_offloads));

int count = unit_hash_size(skbs_in_use);
if (count > 0)
Expand Down
Loading

0 comments on commit 263f8c2

Please sign in to comment.