From 263f8c2059aad5b08ad45a15a187d436a27d946a Mon Sep 17 00:00:00 2001 From: John Ousterhout Date: Mon, 15 Jul 2024 21:27:05 -0700 Subject: [PATCH] Transmit Homa packets using IPPROTO_TCP as the IP protocol Still uses old TSO packet format (doesn't depend on TSO to increment the offset between segments) --- homa_impl.h | 59 ++++++++++++++++---- homa_incoming.c | 2 + homa_offload.c | 86 +++++++++++++++++++++++++++- homa_outgoing.c | 12 +++- homa_plumbing.c | 3 + homa_socktab.c | 5 ++ homa_utils.c | 13 +++-- test/mock.c | 5 ++ test/unit_homa_offload.c | 117 ++++++++++++++++++++++++++++++++++++++- 9 files changed, 280 insertions(+), 22 deletions(-) diff --git a/homa_impl.h b/homa_impl.h index b7c851d..103632c 100644 --- a/homa_impl.h +++ b/homa_impl.h @@ -239,9 +239,9 @@ struct homa_cache_line { /** * struct common_header - Wire format for the first bytes in every Homa - * packet. This must partially match the format of a TCP header so that - * Homa can piggyback on TCP segmentation offload (and possibly other - * features, such as RSS). + * packet. This must (mostly) match the format of a TCP header to enable + * Homa packets to actually be transmitted as TCP packets (and thereby + * take advantage of TSO and other features). */ struct common_header { /** @@ -257,13 +257,21 @@ struct common_header { __be16 dport; /** - * @unused1: corresponds to the sequence number field in TCP headers; + * @sequence: corresponds to the sequence number field in TCP headers; * must not be used by Homa, in case it gets incremented during TCP * offload. */ - __be32 unused1; + __be32 sequence; - __be32 unused2; + /** + * The fields below correspond to the acknowledgment field in TCP + * headers; not used by Homa, except for the low-order 8 bits, which + * specify the Homa packet type (one of the values in the + * homa_packet_type enum). + */ + __be16 ack1; + __u8 ack2; + __u8 type; /** * @doff: High order 4 bits holds the number of 4-byte chunks in a @@ -272,17 +280,35 @@ struct common_header { */ __u8 doff; - /** @type: One of the values of &enum packet_type. */ - __u8 type; + /** + * @flags: Holds TCP flags such as URG, ACK, etc. The special value + * HOMA_TCP_FLAGS is stored here to distinguish Homa-over-TCP packets + * from real TCP packets. It includes the SYN and RST flags, + * which TCP would never use together; must not include URG or FIN + * (TSO will turn off FIN for all but the last segment). + */ + __u8 flags; +#define HOMA_TCP_FLAGS 6 - __u16 unused3; + /** + * @window: Corresponds to the window field in TCP headers. Not used + * by HOMA. + */ + __be16 window; /** * @checksum: not used by Homa, but must occupy the same bytes as * the checksum in a TCP header (TSO may modify this?).*/ __be16 checksum; - __u16 unused4; + /** + * @urgent: occupies the same bytes as the urgent pointer in a TCP + * header. When Homa packets are transmitted over TCP, this has the + * special value HOMA_TCP_URGENT (which is set even though URG is + * not set) to indicate that the packet is actually a Homa packet. + */ + __be16 urgent; +#define HOMA_TCP_URGENT 0xb97d /** * @sender_id: the identifier of this RPC as used on the sender (i.e., @@ -3482,6 +3508,14 @@ static inline bool is_mapped_ipv4(const struct in6_addr x) (x.in6_u.u6_addr32[2] == htonl(0xffff))); } +static inline bool is_homa_pkt(struct sk_buff *skb) +{ + struct iphdr *iph = ip_hdr(skb); + return ((iph->protocol == IPPROTO_HOMA) || + ((iph->protocol == IPPROTO_TCP) && + (tcp_hdr(skb)->urg_ptr == htons(HOMA_TCP_URGENT)))); +} + /** * tt_addr() - Given an address, return a 4-byte id that will (hopefully) * provide a unique identifier for the address in a timetrace record. @@ -3574,6 +3608,8 @@ extern int homa_grant_update_incoming(struct homa_rpc *rpc, extern int homa_gro_complete(struct sk_buff *skb, int thoff); extern void homa_gro_gen2(struct sk_buff *skb); extern void homa_gro_gen3(struct sk_buff *skb); +extern void homa_gro_hook_tcp(void); +extern void homa_gro_unhook_tcp(void); extern struct sk_buff *homa_gro_receive(struct list_head *gro_list, struct sk_buff *skb); @@ -3732,6 +3768,9 @@ extern char *homa_symbol_for_state(struct homa_rpc *rpc); extern char *homa_symbol_for_type(uint8_t type); extern int homa_sysctl_softirq_cores(struct ctl_table *table, int write, void __user *buffer, size_t *lenp, loff_t *ppos); +extern struct sk_buff + *homa_tcp_gro_receive(struct list_head *held_list, + struct sk_buff *skb); extern void homa_timer(struct homa *homa); extern int homa_timer_main(void *transportInfo); extern void homa_unhash(struct sock *sk); diff --git a/homa_incoming.c b/homa_incoming.c index be727b2..38ec831 100644 --- a/homa_incoming.c +++ b/homa_incoming.c @@ -832,6 +832,8 @@ void homa_need_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk, ack.common.type = ACK; ack.common.sport = h->dport; ack.common.dport = h->sport; + ack.common.flags = HOMA_TCP_FLAGS; + ack.common.urgent = htons(HOMA_TCP_URGENT); ack.common.sender_id = cpu_to_be64(id); ack.num_acks = htons(homa_peer_get_acks(peer, NUM_PEER_UNACKED_IDS, ack.acks)); diff --git a/homa_offload.c b/homa_offload.c index 9d5904a..ac4f91b 100644 --- a/homa_offload.c +++ b/homa_offload.c @@ -20,6 +20,19 @@ static const struct net_offload homa_offload = { extern struct homa *homa; +/* Pointers to TCP's net_offload structures. NULL means homa_gro_hook_tcp + * hasn't been called yet. + */ +const struct net_offload *tcp_net_offload = NULL; +const struct net_offload *tcp6_net_offload = NULL; + +/* + * Identical to *tcp_net_offload except that the gro_receive function + * has been replaced. + */ +static struct net_offload hook_tcp_net_offload; +static struct net_offload hook_tcp6_net_offload; + /** * homa_offload_init() - Invoked to enable GRO and GSO. Typically invoked * when the Homa module loads. @@ -45,6 +58,72 @@ int homa_offload_end(void) return res1 ? res1 : res2; } +/** + * homa_gro_hook_tcp() - Arranges for TCP gro_receive calls to be + * mediated by this file, so that Homa-over-TCP packets can be retrieved + * and funneled through Homa. + */ +void homa_gro_hook_tcp(void) +{ + if (tcp_net_offload != NULL) + return; + + tcp_net_offload = inet_offloads[IPPROTO_TCP]; + hook_tcp_net_offload = *tcp_net_offload; + hook_tcp_net_offload.callbacks.gro_receive = homa_tcp_gro_receive; + inet_offloads[IPPROTO_TCP] = &hook_tcp_net_offload; + + tcp6_net_offload = inet6_offloads[IPPROTO_TCP]; + hook_tcp6_net_offload = *tcp6_net_offload; + hook_tcp6_net_offload.callbacks.gro_receive = homa_tcp_gro_receive; + inet6_offloads[IPPROTO_TCP] = &hook_tcp6_net_offload; +} + +/** + * homa_gro_unhook_tcp() - Reverses the effects of a previous call to + * homa_hook_tcp_gro, so that TCP packets are now passed directly to + * Tcp's gro_receive function without mediation. + */ +void homa_gro_unhook_tcp(void) +{ + if (tcp_net_offload == NULL) + return; + inet_offloads[IPPROTO_TCP] = tcp_net_offload; + tcp_net_offload = NULL; + inet6_offloads[IPPROTO_TCP] = tcp6_net_offload; + tcp6_net_offload = NULL; +} + +/** + * homa_tcp_gro_receive() - Invoked instead of TCP's normal gro_receive function + * when hooking is enabled. Identifies Homa-over-TCP packets and passes them + * to Homa; sends real TCP packets to TCP's gro_receive function. + */ +struct sk_buff *homa_tcp_gro_receive(struct list_head *held_list, + struct sk_buff *skb) +{ + struct common_header *h = (struct common_header *) + skb_transport_header(skb); + // tt_record4("homa_tcp_gro_receive got type 0x%x, flags 0x%x, " + // "urgent 0x%x, id %d", h->type, h->flags, + // ntohs(h->urgent), homa_local_id(h->sender_id)); + if ((h->flags != HOMA_TCP_FLAGS) || (ntohs(h->urgent) != HOMA_TCP_URGENT)) + return tcp_net_offload->callbacks.gro_receive(held_list, skb); + + /* Change the packet's IP protocol to Homa so that it will get + * dispatched directly to Homa in the future. + */ + if (skb_is_ipv6(skb)) { + ipv6_hdr(skb)->nexthdr = IPPROTO_HOMA; + } else { + ip_hdr(skb)->check = ~csum16_add(csum16_sub(~ip_hdr(skb)->check, + htons(ip_hdr(skb)->protocol)), + htons(IPPROTO_HOMA)); + ip_hdr(skb)->protocol = IPPROTO_HOMA; + } + return homa_gro_receive(held_list, skb); +} + /** * homa_set_softirq_cpu() - Arrange for SoftIRQ processing of a packet to * occur on a specific core (creates a socket flow table entry for the core, @@ -434,9 +513,10 @@ void homa_gro_gen3(struct sk_buff *skb) int homa_gro_complete(struct sk_buff *skb, int hoffset) { struct data_header *h = (struct data_header *) skb_transport_header(skb); -// tt_record4("homa_gro_complete type %d, id %d, offset %d, count %d", -// h->type, homa_local_id(h->sender_id), ntohl(d->seg.offset), -// NAPI_GRO_CB(skb)->count); + // tt_record4("homa_gro_complete type %d, id %d, offset %d, count %d", + // h->common.type, homa_local_id(h->common.sender_id), + // ntohl(h->seg.offset), + // NAPI_GRO_CB(skb)->count); if (homa->gro_policy & HOMA_GRO_GEN3) { homa_gro_gen3(skb); diff --git a/homa_outgoing.c b/homa_outgoing.c index fbafca8..16558b2 100644 --- a/homa_outgoing.c +++ b/homa_outgoing.c @@ -89,8 +89,12 @@ struct sk_buff *homa_new_data_packet(struct homa_rpc *rpc, sizeof(*h) - sizeof(struct data_segment)); h->common.sport = htons(rpc->hsk->port); h->common.dport = htons(rpc->dport); - homa_set_doff(h); + h->common.sequence = 0; h->common.type = DATA; + homa_set_doff(h); + h->common.flags = HOMA_TCP_FLAGS; + h->common.checksum = 0; + h->common.urgent = htons(HOMA_TCP_URGENT); h->common.sender_id = cpu_to_be64(rpc->id); h->message_length = htonl(rpc->msgout.length); h->incoming = htonl(rpc->msgout.unscheduled); @@ -311,6 +315,8 @@ int homa_xmit_control(enum homa_packet_type type, void *contents, h->type = type; h->sport = htons(rpc->hsk->port); h->dport = htons(rpc->dport); + h->flags = HOMA_TCP_FLAGS; + h->urgent = htons(HOMA_TCP_URGENT); h->sender_id = cpu_to_be64(rpc->id); return __homa_xmit_control(contents, length, rpc->peer, rpc->hsk); } @@ -425,8 +431,10 @@ void homa_xmit_unknown(struct sk_buff *skb, struct homa_sock *hsk) homa_local_id(h->sender_id)); unknown.common.sport = h->dport; unknown.common.dport = h->sport; - unknown.common.sender_id = cpu_to_be64(homa_local_id(h->sender_id)); unknown.common.type = UNKNOWN; + unknown.common.flags = HOMA_TCP_FLAGS; + unknown.common.urgent = htons(HOMA_TCP_URGENT); + unknown.common.sender_id = cpu_to_be64(homa_local_id(h->sender_id)); peer = homa_peer_find(&hsk->homa->peers, &saddr, &hsk->inet); if (!IS_ERR(peer)) __homa_xmit_control(&unknown, sizeof(unknown), peer, hsk); diff --git a/homa_plumbing.c b/homa_plumbing.c index 196376f..a0dab33 100644 --- a/homa_plumbing.c +++ b/homa_plumbing.c @@ -608,11 +608,13 @@ static int __init homa_load(void) { goto out_cleanup; } + homa_gro_hook_tcp(); tt_init("timetrace", homa->temp); return 0; out_cleanup: + homa_gro_unhook_tcp(); homa_offload_end(); unregister_net_sysctl_table(homa_ctl_header); proc_remove(metrics_dir_entry); @@ -636,6 +638,7 @@ static void __exit homa_unload(void) { tt_destroy(); + homa_gro_unhook_tcp(); if (timer_kthread) wake_up_process(timer_kthread); if (homa_offload_end() != 0) diff --git a/homa_socktab.c b/homa_socktab.c index a7d9c20..376aa12 100644 --- a/homa_socktab.c +++ b/homa_socktab.c @@ -154,6 +154,11 @@ void homa_sock_init(struct homa_sock *hsk, struct homa *homa) bucket->id = i + 1000000; } memset(&hsk->buffer_pool, 0, sizeof(hsk->buffer_pool)); + + /* This line will cause outgoing packets to be sent with TCP + * as the IP protocol (so that TSO and RSS will work better). + */ + hsk->sock.sk_protocol = IPPROTO_TCP; spin_unlock_bh(&socktab->write_lock); } diff --git a/homa_utils.c b/homa_utils.c index a2519de..19353dc 100644 --- a/homa_utils.c +++ b/homa_utils.c @@ -1316,27 +1316,28 @@ void homa_freeze_peers(struct homa *homa) /* Find a socket to use (any will do). */ hsk = homa_socktab_start_scan(&homa->port_map, &scan); if (hsk == NULL) { - printk(KERN_NOTICE "homa_freeze_peers couldn't find a socket\n"); + tt_record("homa_freeze_peers couldn't find a socket"); return; } peers = homa_peertab_get_peers(&homa->peers, &num_peers); if (peers == NULL) { - printk(KERN_NOTICE "homa_freeze_peers couldn't find peers " - "to freeze\n"); + tt_record("homa_freeze_peers couldn't find peers to freeze"); return; } freeze.common.type = FREEZE; freeze.common.sport = htons(hsk->port);; freeze.common.dport = 0; + freeze.common.flags = HOMA_TCP_FLAGS; + freeze.common.urgent = htons(HOMA_TCP_URGENT); freeze.common.sender_id = 0; for (i = 0; i < num_peers; i++) { tt_record1("Sending freeze to 0x%x", tt_addr(peers[i]->addr)); err = __homa_xmit_control(&freeze, sizeof(freeze), peers[i], hsk); if (err != 0) - printk(KERN_NOTICE "homa_freeze_peers got error %d " - "in xmit to %s\n", err, - homa_print_ipv6_addr(&peers[i]->addr)); + tt_record2("homa_freeze_peers got error %d in xmit " + "to 0x%x\n", err, + tt_addr(peers[i]->addr)); } kfree(peers); } diff --git a/test/mock.c b/test/mock.c index def8eba..1459797 100644 --- a/test/mock.c +++ b/test/mock.c @@ -170,6 +170,8 @@ struct dst_ops mock_dst_ops = {.mtu = mock_get_mtu}; struct net_device mock_net_device = { .gso_max_segs = 1000, .gso_max_size = 0}; +const struct net_offload *inet_offloads[MAX_INET_PROTOS]; +const struct net_offload *inet6_offloads[MAX_INET_PROTOS]; static struct hrtimer_clock_base clock_base; unsigned int cpu_khz = 1000000; @@ -1384,6 +1386,7 @@ struct sk_buff *mock_skb_new(struct in6_addr *saddr, struct common_header *h, ip_hdr(skb)->version = 4; ip_hdr(skb)->saddr = saddr->in6_u.u6_addr32[3]; ip_hdr(skb)->protocol = IPPROTO_HOMA; + ip_hdr(skb)->check = 0; } skb->_skb_refdst = 0; skb->hash = 3; @@ -1482,6 +1485,8 @@ void mock_teardown(void) mock_compound_order_mask = 0; mock_page_nid_mask = 0; mock_net_device.gso_max_size = 0; + memset(inet_offloads, 0, sizeof(inet_offloads)); + memset(inet6_offloads, 0, sizeof(inet6_offloads)); int count = unit_hash_size(skbs_in_use); if (count > 0) diff --git a/test/unit_homa_offload.c b/test/unit_homa_offload.c index c311265..3d4928c 100644 --- a/test/unit_homa_offload.c +++ b/test/unit_homa_offload.c @@ -11,7 +11,21 @@ extern struct homa *homa; -FIXTURE(homa_offload) { +static struct sk_buff *tcp_gro_receive(struct list_head *held_list, + struct sk_buff *skb) +{ + UNIT_LOG("; ", "tcp_gro_receive"); + return NULL; +} +static struct sk_buff *tcp6_gro_receive(struct list_head *held_list, + struct sk_buff *skb) +{ + UNIT_LOG("; ", "tcp6_gro_receive"); + return NULL; +} + + FIXTURE(homa_offload) +{ struct homa homa; struct homa_sock hsk; struct in6_addr ip; @@ -19,6 +33,8 @@ FIXTURE(homa_offload) { struct napi_struct napi; struct sk_buff *skb, *skb2; struct list_head empty_list; + struct net_offload tcp_offloads; + struct net_offload tcp6_offloads; }; FIXTURE_SETUP(homa_offload) { @@ -31,6 +47,8 @@ FIXTURE_SETUP(homa_offload) self->header = (struct data_header){.common = { .sport = htons(40000), .dport = htons(99), .type = DATA, + .flags = HOMA_TCP_FLAGS, + .urgent = HOMA_TCP_URGENT, .sender_id = cpu_to_be64(1000)}, .message_length = htonl(10000), .incoming = htonl(10000), .cutoff_version = 0, @@ -60,6 +78,11 @@ FIXTURE_SETUP(homa_offload) list_add_tail(&self->skb->list, &self->napi.gro_hash[2].list); list_add_tail(&self->skb2->list, &self->napi.gro_hash[2].list); INIT_LIST_HEAD(&self->empty_list); + self->tcp_offloads.callbacks.gro_receive = tcp_gro_receive; + inet_offloads[IPPROTO_TCP] = &self->tcp_offloads; + self->tcp6_offloads.callbacks.gro_receive = tcp6_gro_receive; + inet6_offloads[IPPROTO_TCP] = &self->tcp6_offloads; + unit_log_clear(); /* Configure so core isn't considered too busy for bypasses. */ @@ -78,6 +101,98 @@ FIXTURE_TEARDOWN(homa_offload) unit_teardown(); } +TEST_F(homa_offload, homa_gro_hook_tcp) +{ + homa_gro_hook_tcp(); + EXPECT_EQ(&homa_tcp_gro_receive, + inet_offloads[IPPROTO_TCP]->callbacks.gro_receive); + EXPECT_EQ(&homa_tcp_gro_receive, + inet6_offloads[IPPROTO_TCP]->callbacks.gro_receive); + + /* Second hook call should do nothing. */ + homa_gro_hook_tcp(); + + homa_gro_unhook_tcp(); + EXPECT_EQ(&tcp_gro_receive, + inet_offloads[IPPROTO_TCP]->callbacks.gro_receive); + EXPECT_EQ(&tcp6_gro_receive, + inet6_offloads[IPPROTO_TCP]->callbacks.gro_receive); + + /* Second unhook call should do nothing. */ + homa_gro_unhook_tcp(); + EXPECT_EQ(&tcp_gro_receive, + inet_offloads[IPPROTO_TCP]->callbacks.gro_receive); + EXPECT_EQ(&tcp6_gro_receive, + inet6_offloads[IPPROTO_TCP]->callbacks.gro_receive); +} + +TEST_F(homa_offload, homa_tcp_gro_receive__pass_to_tcp) +{ + struct sk_buff *skb; + struct common_header *h; + homa_gro_hook_tcp(); + self->header.seg.offset = htonl(6000); + skb = mock_skb_new(&self->ip, &self->header.common, 1400, 0); + h = (struct common_header *) skb_transport_header(skb); + h->flags = 0; + EXPECT_EQ(NULL, homa_tcp_gro_receive(&self->empty_list, skb)); + EXPECT_STREQ("tcp_gro_receive", unit_log_get()); + kfree_skb(skb); + unit_log_clear(); + + skb = mock_skb_new(&self->ip, &self->header.common, 1400, 0); + h = (struct common_header *)skb_transport_header(skb); + h->urgent -= 1; + EXPECT_EQ(NULL, homa_tcp_gro_receive(&self->empty_list, skb)); + EXPECT_STREQ("tcp_gro_receive", unit_log_get()); + kfree_skb(skb); + homa_gro_unhook_tcp(); +} +TEST_F(homa_offload, homa_tcp_gro_receive__pass_to_homa_ipv6) +{ + struct sk_buff *skb; + struct common_header *h; + homa_gro_hook_tcp(); + self->header.seg.offset = htonl(6000); + skb = mock_skb_new(&self->ip, &self->header.common, 1400, 0); + ip_hdr(skb)->protocol = IPPROTO_TCP; + h = (struct common_header *)skb_transport_header(skb); + h->flags = HOMA_TCP_FLAGS; + h->urgent = htons(HOMA_TCP_URGENT); + NAPI_GRO_CB(skb)->same_flow = 0; + homa_cores[cpu_number]->held_skb = NULL; + homa_cores[cpu_number]->held_bucket = 99; + EXPECT_EQ(NULL, homa_tcp_gro_receive(&self->empty_list, skb)); + EXPECT_EQ(skb, homa_cores[cpu_number]->held_skb); + EXPECT_STREQ("", unit_log_get()); + EXPECT_EQ(IPPROTO_HOMA, ipv6_hdr(skb)->nexthdr); + kfree_skb(skb); + homa_gro_unhook_tcp(); +} +TEST_F(homa_offload, homa_tcp_gro_receive__pass_to_homa_ipv4) +{ + struct sk_buff *skb; + struct common_header *h; + mock_ipv6 = false; + homa_gro_hook_tcp(); + self->header.seg.offset = htonl(6000); + skb = mock_skb_new(&self->ip, &self->header.common, 1400, 0); + ip_hdr(skb)->protocol = IPPROTO_TCP; + h = (struct common_header *)skb_transport_header(skb); + h->flags = HOMA_TCP_FLAGS; + h->urgent = htons(HOMA_TCP_URGENT); + NAPI_GRO_CB(skb)->same_flow = 0; + homa_cores[cpu_number]->held_skb = NULL; + homa_cores[cpu_number]->held_bucket = 99; + EXPECT_EQ(NULL, homa_tcp_gro_receive(&self->empty_list, skb)); + EXPECT_EQ(skb, homa_cores[cpu_number]->held_skb); + EXPECT_STREQ("", unit_log_get()); + EXPECT_EQ(IPPROTO_HOMA, ip_hdr(skb)->protocol); + EXPECT_EQ(2303, ip_hdr(skb)->check); + kfree_skb(skb); + homa_gro_unhook_tcp(); +} + TEST_F(homa_offload, homa_gso_segment_set_ip_ids) { mock_ipv6 = false;