diff --git a/README.md b/README.md index 7e945f6..0e1a6f9 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,10 @@ This repo contains an implementation of the Homa transport protocol as a Linux k sysctl mechanism. For details, see the man page `homa.7`. ## Significant recent improvements +- July 2024: introduced "TCP hijacking", where Homa packets are sent as + legitimate TCP segments (using TCP as the IP protocol) and then reclaimed + from TCP on the destination. This allows Homa to make better use of + TSO and RSS. - June 2024: refactored sk_buff management to use frags; improves efficiency significantly. - April 2024: replaced `master` branch with `main` diff --git a/homa_impl.h b/homa_impl.h index 09733c4..ad32610 100644 --- a/homa_impl.h +++ b/homa_impl.h @@ -258,8 +258,9 @@ struct common_header { /** * @sequence: corresponds to the sequence number field in TCP headers; - * must not be used by Homa, in case it gets incremented during TCP - * offload. + * used in DATA packets to hold the offset in the message of the first + * byte of data. However, when TSO is used without TCP hijacking, this + * value will only be correct in the first segment of a GSO packet. */ __be32 sequence; @@ -277,6 +278,7 @@ struct common_header { * @doff: High order 4 bits holds the number of 4-byte chunks in a * data_header (low-order bits unused). Used only for DATA packets; * must be in the same position as the data offset in a TCP header. + * Used by TSO to determine where the replicated header portion ends. */ __u8 doff; @@ -342,33 +344,66 @@ struct homa_ack { __be16 server_port; } __attribute__((packed)); -/** - * struct data_segment - Wire format for a chunk of data that is part of - * a DATA packet. A single sk_buff can hold multiple data_segments in order - * to enable send and receive offload (the idea is to carry many network - * packets of info in a single traversal of the Linux networking stack). - * A DATA sk_buff contains a data_header followed by any number of - * data_segments. +/* struct data_header - Contains data for part or all of a Homa message. + * An incoming packet consists of a data_header followed by message data. + * An outgoing packet can have this simple format as well, or it can be + * structured as a GSO packet. Homa supports two different formats for GSO + * packets, depending on whether TCP hijacking is enabled: + * + * No hijacking: TCP hijacking: + * + * |-----------------------| |-----------------------| + * | | | | + * | data_header | | data_header | + * | | | | + * |---------------------- | |-----------------------| + * | | | | + * | | | | + * | segment data | | segment data | + * | | | | + * | | | | + * |-----------------------| |-----------------------| + * | seg_header | | | + * |-----------------------| | | + * | | | segment data | + * | | | | + * | segment data | | | + * | | |-----------------------| + * | | | | + * |-----------------------| | | + * | seg_header | | segment data | + * |-----------------------| | | + * | | | | + * | | |-----------------------| + * | segment data | + * | | + * | | + * |-----------------------| + * + * With TCP hijacking, TSO will automatically adjust @common.sequence in + * the segments, so that value can be used as the offset of the data within + * the message. Without TCP hijacking, TSO will not adjust @common.sequence + * in the segments, so Homa sprinkles correct offsets (in seg_headers) + * throughout the segment data; TSO/GSO will include a different seg_header + * in each generated packet. */ -struct data_segment { + +struct seg_header { /** * @offset: Offset within message of the first byte of data in - * this segment. Segments within an sk_buff are not guaranteed - * to be in order. + * this segment. If this field is -1 it means that the packet was + * generated by GSO with TCP hijacking. In this case the true offset + * is in @common.sequence. homa_gro_receive detects this situation + * and updates this value from @common.sequence if needed, so the + * value will always be valid once the packet reaches homa_softirq. */ __be32 offset; - - /** @data: the payload of this segment. */ - char data[0]; } __attribute__((packed)); -/* struct data_header - Overall header format for a DATA sk_buff, which - * contains this header followed by any number of data_segments. - */ struct data_header { struct common_header common; - /** @message_length: Total #bytes in the *message* */ + /** @message_length: Total #bytes in the message. */ __be32 message_length; /** @@ -405,8 +440,8 @@ struct data_header { __u8 pad; - /** @seg: First of possibly many segments */ - struct data_segment seg; + /** @seg: First of possibly many segments. */ + struct seg_header seg; } __attribute__((packed)); _Static_assert(sizeof(struct data_header) <= HOMA_MAX_HEADER, "data_header too large for HOMA_MAX_HEADER; must " @@ -414,17 +449,17 @@ _Static_assert(sizeof(struct data_header) <= HOMA_MAX_HEADER, _Static_assert(sizeof(struct data_header) >= HOMA_MIN_PKT_LENGTH, "data_header too small: Homa doesn't currently have code" "to pad data packets"); -_Static_assert(((sizeof(struct data_header) - sizeof(struct data_segment)) +_Static_assert(((sizeof(struct data_header) - sizeof(struct seg_header)) & 0x3) == 0, " data_header length not a multiple of 4 bytes (required " "for TCP/TSO compatibility"); /** - * homa_rx_data_len() - Returns the total amount of message data contained - * in an incoming DATA packet. This function works only for incoming - * packets and ougoing packets that don't use GSO. + * homa_data_len() - Returns the total number of bytes in a DATA packet + * after the data_header. Note: if the packet is a GSO packet, the result + * may include metadata as well as packet data. */ -static inline int homa_rx_data_len(struct sk_buff *skb) +static inline int homa_data_len(struct sk_buff *skb) { return skb->len - skb_transport_offset(skb) - sizeof(struct data_header); } @@ -3116,6 +3151,9 @@ struct homa_skb_info { */ int data_bytes; + /** @seg_length: maximum number of data bytes in each GSO segment. */ + int seg_length; + /** * @offset: offset within the message of the first byte of data in * this packet. @@ -3306,12 +3344,15 @@ static inline struct homa_rpc_bucket *homa_server_rpc_bucket( /** * homa_set_doff() - Fills in the doff TCP header field for a Homa packet. - * @h: Packet header whose doff field is to be set. + * @h: Packet header whose doff field is to be set. + * @size: Size of the "header", bytes (must be a multiple of 4). This + * information is used only for TSO; it's the number of bytes + * that should be replicated in each segment. The bytes after + * this will be distributed among segments. */ -static inline void homa_set_doff(struct data_header *h) +static inline void homa_set_doff(struct data_header *h, int size) { - h->common.doff = (sizeof(struct data_header) - - sizeof(struct data_segment)) << 2; + h->common.doff = size << 2; } static inline struct homa_sock *homa_sk(const struct sock *sk) @@ -3594,6 +3635,8 @@ extern void homa_dst_refresh(struct homa_peertab *peertab, extern int homa_err_handler_v4(struct sk_buff *skb, u32 info); extern int homa_err_handler_v6(struct sk_buff *skb, struct inet6_skb_parm * , u8, u8, int, __be32); +extern int homa_fill_data_interleaved(struct homa_rpc *rpc, + struct sk_buff *skb, struct iov_iter *iter); extern struct homa_rpc *homa_find_client_rpc(struct homa_sock *hsk, __u64 id); extern struct homa_rpc diff --git a/homa_incoming.c b/homa_incoming.c index 17b806c..69b56de 100644 --- a/homa_incoming.c +++ b/homa_incoming.c @@ -102,7 +102,7 @@ void homa_add_packet(struct homa_rpc *rpc, struct sk_buff *skb) { struct data_header *h = (struct data_header *) skb->data; int start = ntohl(h->seg.offset); - int length = homa_rx_data_len(skb); + int length = homa_data_len(skb); int end = start + length; struct homa_gap *gap, *dummy, *gap2; @@ -253,7 +253,7 @@ int homa_copy_to_user(struct homa_rpc *rpc) struct data_header *h = (struct data_header *) skbs[i]->data; int offset = ntohl(h->seg.offset); - int pkt_length = homa_rx_data_len(skbs[i]); + int pkt_length = homa_data_len(skbs[i]); int copied = 0; char *dst; struct iovec iov; @@ -574,9 +574,9 @@ void homa_data_pkt(struct sk_buff *skb, struct homa_rpc *rpc) tt_record4("Dropping packet because no buffer space available: " "id %d, offset %d, length %d, old incoming %d", rpc->id, ntohl(h->seg.offset), - homa_rx_data_len(skb), + homa_data_len(skb), rpc->msgin.granted); - INC_METRIC(dropped_data_no_bufs, homa_rx_data_len(skb)); + INC_METRIC(dropped_data_no_bufs, homa_data_len(skb)); goto discard; } diff --git a/homa_offload.c b/homa_offload.c index ee5b7f6..fcc8b47 100644 --- a/homa_offload.c +++ b/homa_offload.c @@ -200,7 +200,7 @@ struct sk_buff *homa_gso_segment(struct sk_buff *skb, * in each segment) from data, which is divided among the segments. */ __skb_pull(skb, sizeof(struct data_header) - - sizeof(struct data_segment)); + - sizeof(struct seg_header)); segs = skb_segment(skb, features); /* Set incrementing ids in each of the segments (mimics behavior @@ -274,11 +274,17 @@ struct sk_buff *homa_gro_receive(struct list_head *held_list, // tt_record("homa_gro_receive can't pull enough data " // "from packet for trace"); if (h_new->common.type == DATA) { + if (h_new->seg.offset == -1) { + tt_record2("homa_gro_receive replaced offset %d with %d", + ntohl(h_new->seg.offset), + ntohl(h_new->common.sequence)); + h_new->seg.offset = h_new->common.sequence; + } tt_record4("homa_gro_receive got packet from 0x%x " "id %llu, offset %d, priority %d", saddr, homa_local_id(h_new->common.sender_id), ntohl(h_new->seg.offset), priority); - if ((homa_rx_data_len(skb) == ntohl(h_new->message_length)) + if ((homa_data_len(skb) == ntohl(h_new->message_length)) && (homa->gro_policy & HOMA_GRO_SHORT_BYPASS) && !busy) { INC_METRIC(gro_data_bypasses, 1); diff --git a/homa_outgoing.c b/homa_outgoing.c index 4a4cb68..97836ea 100644 --- a/homa_outgoing.c +++ b/homa_outgoing.c @@ -49,9 +49,57 @@ void homa_message_out_init(struct homa_rpc *rpc, int length) rpc->msgout.init_cycles = get_cycles(); } +/** + * homa_fill_data_interleaved() - This function is invoked to fill in the + * part of a data packet after the initial header, when GSO is being used + * but TCP hijacking is not. As result, seg_headers must be interleaved + * with the data to provide the correct offset for each segment. + * @rpc: RPC whose output message is being created. + * @skb: The packet being filled. The initial data_header was + * created and initialized by the caller and the + * homa_skb_info has been filled in with the packet geometry. + * @iter: Describes location(s) of (remaining) message data in user + * space. + */ +int homa_fill_data_interleaved(struct homa_rpc *rpc, struct sk_buff *skb, + struct iov_iter *iter) +{ + struct homa_skb_info *homa_info = homa_get_skb_info(skb); + int seg_length = homa_info->seg_length; + int bytes_left = homa_info->data_bytes; + int offset = homa_info->offset; + int err; + + /* Each iteration of the following loop adds info for one packet, + * which includes a seg_header followed by the data for that + * segment. The first seg_header was already added by the caller. + */ + while (1) { + struct seg_header seg; + if (bytes_left < seg_length) + seg_length = bytes_left; + err = homa_skb_append_from_iter(rpc->hsk->homa, skb, iter, + seg_length); + if (err != 0) + return err; + bytes_left -= seg_length; + offset += seg_length; + + if (bytes_left == 0) + break; + + seg.offset = htonl(offset); + err = homa_skb_append_to_frag(rpc->hsk->homa, skb, &seg, + sizeof(seg)); + if (err != 0) + return err; + } + return 0; +} + /** * homa_new_data_packet() - Allocate a new sk_buff and fill it with a Homa - * data packet. The resulting packet will be a TSO packet that will eventually + * data packet. The resulting packet will be a GSO packet that will eventually * be segmented by the NIC. * @rpc: RPC that packet will belong to (msgout must have been * initialized). @@ -61,10 +109,10 @@ void homa_message_out_init(struct homa_rpc *rpc, int length) * packet. * @length: How many bytes of data to include in the skb. Caller must * ensure that this amount of data isn't too much for a - * well-formed TSO packet, and that iter has at least this + * well-formed GSO packet, and that iter has at least this * much data. * @max_seg_data: Maximum number of bytes of message data that can go in - * a single segment of the TSO packet. + * a single segment of the GSO packet. * Return: A pointer to the new packet, or a negative errno. */ struct sk_buff *homa_new_data_packet(struct homa_rpc *rpc, @@ -74,24 +122,22 @@ struct sk_buff *homa_new_data_packet(struct homa_rpc *rpc, struct data_header *h; struct sk_buff *skb; struct homa_skb_info *homa_info; - int segs, bytes_left, err; + int segs, err, gso_size; /* Initialize the overall skb. */ - skb = homa_skb_new_tx(sizeof32(struct data_header) - - sizeof32(struct data_segment)); + skb = homa_skb_new_tx(sizeof32(struct data_header)); if (!skb) return ERR_PTR(-ENOMEM); /* Fill in the Homa header (which will be replicated in every - * network packet by GSO/TSO). + * network packet by GSO). */ - h = (struct data_header *) skb_put(skb, - sizeof(*h) - sizeof(struct data_segment)); + h = (struct data_header *)skb_put(skb, sizeof(struct data_header)); h->common.sport = htons(rpc->hsk->port); h->common.dport = htons(rpc->dport); h->common.sequence = htonl(offset); h->common.type = DATA; - homa_set_doff(h); + homa_set_doff(h, sizeof(struct data_header)); h->common.flags = HOMA_TCP_FLAGS; h->common.checksum = 0; h->common.urgent = htons(HOMA_TCP_URGENT); @@ -102,49 +148,34 @@ struct sk_buff *homa_new_data_packet(struct homa_rpc *rpc, homa_peer_get_acks(rpc->peer, 1, &h->ack); h->cutoff_version = rpc->peer->cutoff_version; h->retransmit = 0; + h->seg.offset = -1; + + segs = (length + max_seg_data - 1) / max_seg_data; homa_info = homa_get_skb_info(skb); homa_info->next_skb = NULL; - homa_info->wire_bytes = 0; - homa_info->data_bytes = 0; + homa_info->wire_bytes = length + segs * (sizeof(struct data_header) + + rpc->hsk->ip_header_length + HOMA_ETH_OVERHEAD); + homa_info->data_bytes = length; + homa_info->seg_length = max_seg_data; homa_info->offset = offset; - /* Each iteration of the following loop adds one segment - * (which will become a separate packet after GSO) to the skb. - */ - bytes_left = length; - segs = 0; - homa_skb_stash_pages(rpc->hsk->homa, length); - do { - int seg_size; - struct data_segment seg; - seg.offset = htonl(offset); - if (bytes_left <= max_seg_data) - seg_size = bytes_left; - else - seg_size = max_seg_data; - err = homa_skb_append_to_frag(rpc->hsk->homa, skb, &seg, - sizeof(seg)); - if (err != 0) - goto error; + if ((segs > 1) && (rpc->hsk->sock.sk_protocol != IPPROTO_TCP)) { + homa_set_doff(h, sizeof(struct data_header) - + sizeof32(struct seg_header)); + h->seg.offset = htonl(offset); + gso_size = max_seg_data + sizeof(struct seg_header); + err = homa_fill_data_interleaved(rpc, skb, iter); + } else { + gso_size = max_seg_data; err = homa_skb_append_from_iter(rpc->hsk->homa, skb, iter, - seg_size); - if (err != 0) - goto error; - bytes_left -= seg_size; - segs++; - homa_info->wire_bytes += seg_size + sizeof(struct data_segment) - + sizeof(struct data_header) - + rpc->hsk->ip_header_length - + HOMA_ETH_OVERHEAD; - homa_info->data_bytes += seg_size; - offset += seg_size; - } while (bytes_left > 0); - - if (segs > 1) - { + length); + } + if (err) + goto error; + + if (segs > 1) { skb_shinfo(skb)->gso_segs = segs; - skb_shinfo(skb)->gso_size = sizeof(struct data_segment) - + max_seg_data; + skb_shinfo(skb)->gso_size = gso_size; /* It's unclear what gso_type should be used to force software * GSO; the value below seems to work... @@ -186,7 +217,7 @@ int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter, int xmit) * header). * max_seg_data: largest amount of Homa message data that fits * in an on-the-wire packet (after segmentation). - * max_gso_data: largest amount of Homa message data that fits + * max_gso_data: largest amount of Homa message data that fits * in a GSO packet (before segmentation). */ int mtu, max_seg_data, max_gso_data; @@ -230,6 +261,7 @@ int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter, int xmit) overlap_xmit = rpc->msgout.length > 2*max_gso_data; rpc->msgout.granted = rpc->msgout.unscheduled; atomic_or(RPC_COPYING_FROM_USER, &rpc->flags); + homa_skb_stash_pages(rpc->hsk->homa, rpc->msgout.length); /* Each iteration of the loop below creates one GSO packet. */ tt_record3("starting copy from user space for id %d, length %d, " @@ -588,36 +620,34 @@ void homa_resend_data(struct homa_rpc *rpc, int start, int end, */ for (skb = rpc->msgout.packets; skb != NULL; skb = homa_info->next_skb) { - int seg_offset, offset, seg_length, segs_left, data_left; - struct data_segment seg; + int seg_offset, offset, seg_length, data_left; struct data_header *h; homa_info = homa_get_skb_info(skb); - if (homa_info->offset >= end) + offset = homa_info->offset; + if (offset >= end) break; - if (start >=(homa_info->offset + homa_info->data_bytes)) + if (start >= (offset + homa_info->data_bytes)) continue; - seg_offset = sizeof32(struct data_header) - - sizeof32(struct data_segment); - segs_left = skb_shinfo(skb)->gso_segs; + offset = homa_info->offset; + seg_offset = sizeof32(struct data_header); data_left = homa_info->data_bytes; - if (segs_left < 1) { - segs_left = 1; - seg_length = homa_rx_data_len(skb); - } else - seg_length = skb_shinfo(skb)->gso_size - sizeof32(seg); - for ( ; segs_left > 0; segs_left--, - seg_offset += sizeof32(seg) + seg_length) { + if (skb_shinfo(skb)->gso_segs <= 1) + seg_length = data_left; + else { + seg_length = homa_info->seg_length; + h = (struct data_header *) skb_transport_header(skb); + } + for ( ; data_left > 0; data_left -= seg_length, + offset += seg_length, + seg_offset += skb_shinfo(skb)->gso_size) { struct sk_buff *new_skb; struct homa_skb_info *new_homa_info; int err; - homa_skb_get(skb, &seg, seg_offset, sizeof(seg)); - offset = ntohl(seg.offset); if (seg_length > data_left) seg_length = data_left; - data_left -= seg_length; if (end <= offset) goto resend_done; @@ -626,7 +656,7 @@ void homa_resend_data(struct homa_rpc *rpc, int start, int end, /* This segment must be retransmitted. */ new_skb = homa_skb_new_tx(sizeof(struct data_header) - - sizeof(struct data_segment)); + - sizeof(struct seg_header)); if (unlikely(!new_skb)) { if (rpc->hsk->homa->verbose) printk(KERN_NOTICE "homa_resend_data " @@ -636,8 +666,9 @@ void homa_resend_data(struct homa_rpc *rpc, int start, int end, } h = (struct data_header *) __skb_put_data(new_skb, skb_transport_header(skb), - sizeof32(struct data_header) - - sizeof32(struct data_segment)); + sizeof32(struct data_header)); + h->common.sequence = htonl(offset); + h->seg.offset = htonl(offset); h->retransmit = 1; if ((offset + seg_length) <= rpc->msgout.granted) h->incoming = htonl(rpc->msgout.granted); @@ -646,7 +677,7 @@ void homa_resend_data(struct homa_rpc *rpc, int start, int end, else h->incoming = htonl(offset + seg_length); err = homa_skb_append_from_skb(rpc->hsk->homa, new_skb, - skb, seg_offset, sizeof32(seg) + seg_length); + skb, seg_offset, seg_length); if (err != 0) { printk(KERN_ERR "homa_resend_data got error %d " "from homa_skb_append_from_skb\n", @@ -662,6 +693,7 @@ void homa_resend_data(struct homa_rpc *rpc, int start, int end, + sizeof(struct data_header) + seg_length + HOMA_ETH_OVERHEAD; new_homa_info->data_bytes = seg_length; + new_homa_info->seg_length = seg_length; new_homa_info->offset = offset; tt_record3("retransmitting offset %d, length %d, id %d", offset, seg_length, rpc->id); diff --git a/homa_plumbing.c b/homa_plumbing.c index 920bd3a..93fb06c 100644 --- a/homa_plumbing.c +++ b/homa_plumbing.c @@ -529,7 +529,7 @@ static int __init homa_load(void) { printk(KERN_NOTICE "Homa module loading\n"); printk(KERN_NOTICE "Homa structure sizes: data_header %u, " - "data_segment %u, ack %u, " + "seg_header %u, ack %u, " "grant_header %u, peer %u, ip_hdr %u flowi %u " "ipv6_hdr %u, flowi6 %u " "tcp_sock %u homa_rpc %u sk_buff %u " @@ -537,7 +537,7 @@ static int __init homa_load(void) { "HOMA_MAX_BPAGES %u NR_CPUS %u " "nr_cpu_ids %u, MAX_NUMNODES %d\n", sizeof32(struct data_header), - sizeof32(struct data_segment), + sizeof32(struct seg_header), sizeof32(struct homa_ack), sizeof32(struct grant_header), sizeof32(struct homa_peer), diff --git a/homa_utils.c b/homa_utils.c index 72cba1c..2462321 100644 --- a/homa_utils.c +++ b/homa_utils.c @@ -1127,23 +1127,24 @@ char *homa_print_packet(struct sk_buff *skb, char *buffer, int buf_len) case DATA: { struct data_header *h = (struct data_header *) header; struct homa_skb_info *homa_info = homa_get_skb_info(skb); - int data_left, i, seg_length, pos; + int data_left, i, seg_length, pos, offset; if (skb_shinfo(skb)->gso_segs == 0) { - seg_length = homa_rx_data_len(skb); + seg_length = homa_data_len(skb); data_left = 0; } else { - seg_length = skb_shinfo(skb)->gso_size - - sizeof(struct data_segment); + seg_length = homa_info->seg_length; if (seg_length > homa_info->data_bytes) seg_length = homa_info->data_bytes; data_left = homa_info->data_bytes - seg_length; } + offset = ntohl(h->seg.offset); + if (offset == -1) + offset = ntohl(h->common.sequence); used = homa_snprintf(buffer, buf_len, used, ", message_length %d, offset %d, " "data_length %d, incoming %d", - ntohl(h->message_length), - ntohl(h->seg.offset), seg_length, - ntohl(h->incoming)); + ntohl(h->message_length), offset, + seg_length, ntohl(h->incoming)); if (ntohs(h->cutoff_version != 0)) used = homa_snprintf(buffer, buf_len, used, ", cutoff_version %d", @@ -1159,15 +1160,19 @@ char *homa_print_packet(struct sk_buff *skb, char *buffer, int buf_len) pos = skb_transport_offset(skb) + sizeof32(*h) + seg_length; used = homa_snprintf(buffer, buf_len, used, ", extra segs"); for (i = skb_shinfo(skb)->gso_segs - 1; i > 0; i--) { - struct data_segment seg; - homa_skb_get(skb, &seg, pos, sizeof(seg)); + if (homa_info->seg_length < skb_shinfo(skb)->gso_size) { + struct seg_header seg; + homa_skb_get(skb, &seg, pos, sizeof(seg)); + offset = ntohl(seg.offset); + } else { + offset += seg_length; + } if (seg_length > data_left) seg_length = data_left; used = homa_snprintf(buffer, buf_len, used, - " %d@%d", seg_length, - ntohl(seg.offset)); + " %d@%d", seg_length, offset); data_left -= seg_length; - pos += seg_length + sizeof(struct data_segment); + pos += skb_shinfo(skb)->gso_size; }; break; } @@ -1254,31 +1259,37 @@ char *homa_print_packet_short(struct sk_buff *skb, char *buffer, int buf_len) case DATA: { struct data_header *h = (struct data_header *)header; struct homa_skb_info *homa_info = homa_get_skb_info(skb); - struct data_segment seg; - int data_left, used, i, seg_length, pos; + int data_left, used, i, seg_length, pos, offset; + if (skb_shinfo(skb)->gso_segs == 0) { - seg_length = homa_rx_data_len(skb); + seg_length = homa_data_len(skb); data_left = 0; } else { - seg_length = skb_shinfo(skb)->gso_size - sizeof32(seg); - if (seg_length > homa_info->data_bytes) - seg_length = homa_info->data_bytes; + seg_length = homa_info->seg_length; data_left = homa_info->data_bytes - seg_length; } + offset = ntohl(h->seg.offset); + if (offset == -1) + offset = ntohl(h->common.sequence); pos = skb_transport_offset(skb) + sizeof32(*h) + seg_length; used = homa_snprintf(buffer, buf_len, 0, "DATA%s %d@%d", h->retransmit ? " retrans" : "", - seg_length, ntohl(h->seg.offset)); + seg_length, offset); for (i = skb_shinfo(skb)->gso_segs - 1; i > 0; i--) { - homa_skb_get(skb, &seg, pos, sizeof(seg)); + if (homa_info->seg_length < skb_shinfo(skb)->gso_size) { + struct seg_header seg; + homa_skb_get(skb, &seg, pos, sizeof(seg)); + offset = ntohl(seg.offset); + } else { + offset += seg_length; + } if (seg_length > data_left) seg_length = data_left; used = homa_snprintf(buffer, buf_len, used, - " %d@%d", seg_length, - ntohl(seg.offset)); + " %d@%d", seg_length, offset); data_left -= seg_length; - pos += seg_length + sizeof32(struct data_segment); + pos += skb_shinfo(skb)->gso_size; } break; } diff --git a/test/mock.c b/test/mock.c index c80372b..abe99df 100644 --- a/test/mock.c +++ b/test/mock.c @@ -611,8 +611,9 @@ int ip6_xmit(const struct sock *sk, struct sk_buff *skb, struct flowi6 *fl6, struct homa_skb_info *homa_info; homa_info = homa_get_skb_info(skb); unit_log_printf("; ", "homa_info: wire_bytes %d, data_bytes %d, " - "offset %d", homa_info->wire_bytes, - homa_info->data_bytes, homa_info->offset); + "seg_length %d, offset %d", homa_info->wire_bytes, + homa_info->data_bytes, homa_info->seg_length, + homa_info->offset); } kfree_skb(skb); return 0; @@ -1024,7 +1025,7 @@ struct sk_buff *skb_segment(struct sk_buff *head_skb, /* Split the existing packet into two packets. */ memcpy(&h, skb_transport_header(head_skb), sizeof(h)); offset = ntohl(h.seg.offset); - length = homa_rx_data_len(head_skb); + length = homa_data_len(head_skb); skb1 = mock_skb_new(&ipv6_hdr(head_skb)->saddr, &h.common, length/2, offset); offset += length/2; diff --git a/test/unit_homa_offload.c b/test/unit_homa_offload.c index bbd31fe..70de2ca 100644 --- a/test/unit_homa_offload.c +++ b/test/unit_homa_offload.c @@ -210,6 +210,34 @@ TEST_F(homa_offload, homa_gso_segment_set_ip_ids) kfree_skb(segs); } +TEST_F(homa_offload, homa_gro_receive__update_offset_from_sequence) +{ + struct sk_buff *skb, *skb2; + struct data_header *h; + + /* First call: copy offset from sequence number. */ + self->header.common.sequence = htonl(6000); + self->header.seg.offset = -1; + skb = mock_skb_new(&self->ip, &self->header.common, 1400, 0); + NAPI_GRO_CB(skb)->same_flow = 0; + homa_cores[cpu_number]->held_skb = NULL; + homa_cores[cpu_number]->held_bucket = 99; + EXPECT_EQ(NULL, homa_gro_receive(&self->empty_list, skb)); + h = (struct data_header *) skb_transport_header(skb); + EXPECT_EQ(6000, htonl(h->seg.offset)); + + /* Second call: offset already valid. */ + self->header.common.sequence = htonl(6000); + self->header.seg.offset = ntohl(5000); + skb2 = mock_skb_new(&self->ip, &self->header.common, 1400, 0); + NAPI_GRO_CB(skb2)->same_flow = 0; + EXPECT_EQ(NULL, homa_gro_receive(&self->empty_list, skb2)); + h = (struct data_header *)skb_transport_header(skb2); + EXPECT_EQ(5000, htonl(h->seg.offset)); + + kfree_skb(skb); + kfree_skb(skb2); +} TEST_F(homa_offload, homa_gro_receive__HOMA_GRO_SHORT_BYPASS) { struct in6_addr client_ip = unit_get_in_addr("196.168.0.1"); diff --git a/test/unit_homa_outgoing.c b/test/unit_homa_outgoing.c index 49b0cd9..7b04e88 100644 --- a/test/unit_homa_outgoing.c +++ b/test/unit_homa_outgoing.c @@ -86,6 +86,47 @@ TEST_F(homa_outgoing, set_priority__priority_mapping) EXPECT_STREQ("7 3", mock_xmit_prios); } +TEST_F(homa_outgoing, homa_fill_data_interleaved) +{ + struct homa_rpc *crpc = homa_rpc_new_client(&self->hsk, + &self->server_addr); + homa_rpc_unlock(crpc); + struct iov_iter *iter = unit_iov_iter((void *)1000, 5000); + homa_message_out_init(crpc, 10000); + + unit_log_clear(); + struct sk_buff *skb = homa_new_data_packet(crpc, iter, 10000, 5000, + 1500); + EXPECT_STREQ("_copy_from_iter 1500 bytes at 1000; " + "_copy_from_iter 1500 bytes at 2500; " + "_copy_from_iter 1500 bytes at 4000; " + "_copy_from_iter 500 bytes at 5500", unit_log_get()); + + char buffer[1000]; + EXPECT_STREQ("DATA from 0.0.0.0:40000, dport 99, id 2, " + "message_length 10000, offset 10000, data_length 1500, " + "incoming 10000, extra segs 1500@11500 1500@13000 " + "500@14500", + homa_print_packet(skb, buffer, sizeof(buffer))); + EXPECT_EQ(5000 + sizeof32(struct data_header) + + 3*sizeof32(struct seg_header), skb->len); + kfree_skb(skb); +} +TEST_F(homa_outgoing, homa_fill_data_interleaved__error_copying_data) +{ + struct homa_rpc *crpc = homa_rpc_new_client(&self->hsk, + &self->server_addr); + homa_rpc_unlock(crpc); + struct iov_iter *iter = unit_iov_iter((void *)1000, 5000); + homa_message_out_init(crpc, 10000); + + unit_log_clear(); + mock_copy_data_errors = 1; + struct sk_buff *skb = homa_new_data_packet(crpc, iter, 10000, 5000, + 1500); + EXPECT_EQ(EFAULT, -PTR_ERR(skb)); +} + TEST_F(homa_outgoing, homa_new_data_packet__one_segment) { struct homa_rpc *crpc = homa_rpc_new_client(&self->hsk, @@ -122,10 +163,10 @@ TEST_F(homa_outgoing, homa_new_data_packet__cant_allocate_skb) EXPECT_TRUE(IS_ERR(skb)); EXPECT_EQ(ENOMEM, -PTR_ERR(skb)); } -TEST_F(homa_outgoing, homa_new_data_packet__multiple_segments) +TEST_F(homa_outgoing, homa_new_data_packet__multiple_segments_homa_fill_data_interleaved) { struct homa_rpc *crpc = homa_rpc_new_client(&self->hsk, - &self->server_addr); + &self->server_addr); homa_rpc_unlock(crpc); struct iov_iter *iter = unit_iov_iter((void *)1000, 5000); homa_message_out_init(crpc, 10000); @@ -143,37 +184,65 @@ TEST_F(homa_outgoing, homa_new_data_packet__multiple_segments) "message_length 10000, offset 10000, data_length 1500, " "incoming 10000, extra segs 1500@11500 1500@13000 " "500@14500", - homa_print_packet(skb, buffer, sizeof(buffer))); + homa_print_packet(skb, buffer, sizeof(buffer))); - EXPECT_EQ(4*(sizeof(struct data_header) + sizeof(struct data_segment) + crpc->hsk->ip_header_length + HOMA_ETH_OVERHEAD) + 5000, homa_get_skb_info(skb)->wire_bytes); + EXPECT_EQ(4*(sizeof(struct data_header) + crpc->hsk->ip_header_length + + HOMA_ETH_OVERHEAD) + 5000, + homa_get_skb_info(skb)->wire_bytes); EXPECT_EQ(5000, homa_get_skb_info(skb)->data_bytes); kfree_skb(skb); } -TEST_F(homa_outgoing, homa_new_data_packet__cant_allocate_frag) +TEST_F(homa_outgoing, homa_new_data_packet__error_in_homa_fill_data_interleaved) { struct homa_rpc *crpc = homa_rpc_new_client(&self->hsk, &self->server_addr); homa_rpc_unlock(crpc); struct iov_iter *iter = unit_iov_iter((void *)1000, 5000); - homa_message_out_init(crpc, 500); + homa_message_out_init(crpc, 10000); unit_log_clear(); mock_alloc_page_errors = -1; - struct sk_buff *skb = homa_new_data_packet(crpc, iter, 0, 500, 2000); + struct sk_buff *skb = homa_new_data_packet(crpc, iter, 10000, 5000, + 1500); EXPECT_TRUE(IS_ERR(skb)); EXPECT_EQ(ENOMEM, -PTR_ERR(skb)); } -TEST_F(homa_outgoing, homa_new_data_packet__cant_copy_data) +TEST_F(homa_outgoing, homa_new_data_packet__multiple_segments_tcp_hijacking) { - struct homa_rpc *crpc = homa_rpc_new_client(&self->hsk, - &self->server_addr); + self->homa.hijack_tcp = 1; + struct homa_sock hsk; + mock_sock_init(&hsk, &self->homa, self->client_port+1); + struct homa_rpc *crpc = homa_rpc_new_client(&hsk, &self->server_addr); homa_rpc_unlock(crpc); struct iov_iter *iter = unit_iov_iter((void *)1000, 5000); + homa_message_out_init(crpc, 10000); + + unit_log_clear(); + struct sk_buff *skb = homa_new_data_packet(crpc, iter, 10000, 5000, + 1500); + EXPECT_STREQ("_copy_from_iter 5000 bytes at 1000", unit_log_get()); + + char buffer[1000]; + EXPECT_STREQ("DATA from 0.0.0.0:40001, dport 99, id 2, " + "message_length 10000, offset 10000, data_length 1500, " + "incoming 10000, extra segs 1500@11500 1500@13000 " + "500@14500", + homa_print_packet(skb, buffer, sizeof(buffer))); + kfree_skb(skb); + homa_sock_destroy(&hsk); +} +TEST_F(homa_outgoing, homa_new_data_packet__error_copying_data_hijacking_path) +{ + struct homa_rpc *crpc = homa_rpc_new_client(&self->hsk, + &self->server_addr); + homa_rpc_unlock(crpc); + struct iov_iter *iter = unit_iov_iter((void *) 1000, 5000); homa_message_out_init(crpc, 500); unit_log_clear(); mock_copy_data_errors = 1; - struct sk_buff *skb = homa_new_data_packet(crpc, iter, 0, 500, 2000); + struct sk_buff *skb = homa_new_data_packet(crpc, iter, 5000, 500, + 2000); EXPECT_TRUE(IS_ERR(skb)); EXPECT_EQ(EFAULT, -PTR_ERR(skb)); } @@ -190,7 +259,7 @@ TEST_F(homa_outgoing, homa_new_data_packet__gso_information) 1500); EXPECT_EQ(4, skb_shinfo(skb)->gso_segs); - EXPECT_EQ(1500 + sizeof(struct data_segment), + EXPECT_EQ(1500 + sizeof(struct seg_header), skb_shinfo(skb)->gso_size); EXPECT_EQ(SKB_GSO_TCPV6, skb_shinfo(skb)->gso_type); kfree_skb(skb); @@ -829,8 +898,9 @@ TEST_F(homa_outgoing, homa_resend_data__set_homa_info) mock_xmit_log_homa_info = 1; homa_resend_data(crpc, 8400, 8800, 2); EXPECT_STREQ("xmit DATA retrans 1400@8400; " - "homa_info: wire_bytes 1538, data_bytes 1400, offset 8400", - unit_log_get()); + "homa_info: wire_bytes 1538, data_bytes 1400, " + "seg_length 1400, offset 8400", + unit_log_get()); } TEST_F(homa_outgoing, homa_outgoing_sysctl_changed)