Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for request body to be larger than 2GB on 32-bit devices #746

Merged
merged 9 commits into from
Jun 28, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ extension RequestBodyLength {
case .none:
self = .known(0)
case .byteBuffer(let buffer):
self = .known(buffer.readableBytes)
self = .known(Int64(buffer.readableBytes))
case .sequence(let length, _, _), .asyncSequence(let length, _):
self = length
}
Expand Down
8 changes: 7 additions & 1 deletion Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ extension HTTPClientRequest.Body {
public static func bytes<Bytes: RandomAccessCollection & Sendable>(
_ bytes: Bytes
) -> Self where Bytes.Element == UInt8 {
self.bytes(bytes, length: .known(bytes.count))
self.bytes(bytes, length: .known(Int64(bytes.count)))
}

/// Create an ``HTTPClientRequest/Body-swift.struct`` from a `Sequence` of bytes.
Expand Down Expand Up @@ -341,7 +341,13 @@ extension HTTPClientRequest.Body {
public static let unknown: Self = .init(storage: .unknown)

/// The size of the request body is known and exactly `count` bytes
@_disfavoredOverload
public static func known(_ count: Int) -> Self {
Lukasa marked this conversation as resolved.
Show resolved Hide resolved
.init(storage: .known(Int64(count)))
}

/// The size of the request body is known and exactly `count` bytes
public static func known(_ count: Int64) -> Self {
.init(storage: .known(count))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct HTTPRequestStateMachine {
/// The request is streaming its request body. `expectedBodyLength` has a value, if the request header contained
/// a `"content-length"` header field. If the request header contained a `"transfer-encoding" = "chunked"`
/// header field, the `expectedBodyLength` is `nil`.
case streaming(expectedBodyLength: Int?, sentBodyBytes: Int, producer: ProducerControlState)
case streaming(expectedBodyLength: Int64?, sentBodyBytes: Int64, producer: ProducerControlState)
/// The request has sent its request body and end.
case endSent
}
Expand Down Expand Up @@ -308,13 +308,13 @@ struct HTTPRequestStateMachine {
// pause. The reason for this is as follows: There might be thread synchronization
// situations in which the producer might not have received the plea to pause yet.

if let expected = expectedBodyLength, sentBodyBytes + part.readableBytes > expected {
if let expected = expectedBodyLength, sentBodyBytes + Int64(part.readableBytes) > expected {
let error = HTTPClientError.bodyLengthMismatch
self.state = .failed(error)
return .failRequest(error, .close(promise))
}

sentBodyBytes += part.readableBytes
sentBodyBytes += Int64(part.readableBytes)

let requestState: RequestState = .streaming(
expectedBodyLength: expectedBodyLength,
Expand Down Expand Up @@ -768,7 +768,7 @@ struct HTTPRequestStateMachine {
}

extension RequestFramingMetadata.Body {
var expectedLength: Int? {
var expectedLength: Int64? {
switch self {
case .fixedSize(let length): return length
case .stream: return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ internal enum RequestBodyLength: Hashable, Sendable {
/// size of the request body is not known before starting the request
case unknown
/// size of the request body is fixed and exactly `count` bytes
case known(_ count: Int)
case known(_ count: Int64)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
struct RequestFramingMetadata: Hashable {
enum Body: Hashable {
case stream
case fixedSize(Int)
case fixedSize(Int64)
}

var connectionClose: Bool
Expand Down
43 changes: 37 additions & 6 deletions Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,35 @@ extension HTTPClient {

/// Body size. If nil,`Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length`
/// header is set with the given `length`.
public var length: Int?
@available(*, deprecated, message: "Use `contentLength: Int64?` instead")
aryan-25 marked this conversation as resolved.
Show resolved Hide resolved
public var length: Int? {
get {
self.contentLength.flatMap { Int($0) }
}
set {
self.contentLength = newValue.flatMap { Int64($0) }
}
}

/// Body size. If nil,`Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length`
/// header is set with the given `contentLength`.
public var contentLength: Int64?

/// Body chunk provider.
public var stream: @Sendable (StreamWriter) -> EventLoopFuture<Void>

@usableFromInline typealias StreamCallback = @Sendable (StreamWriter) -> EventLoopFuture<Void>

@inlinable
@available(*, deprecated, message: "Use initializer with `contentLength: Int64` instead")
init(length: Int?, stream: @escaping StreamCallback) {
self.length = length
self.contentLength = length.flatMap { Int64($0) }
self.stream = stream
}
aryan-25 marked this conversation as resolved.
Show resolved Hide resolved

@inlinable
init(contentLength: Int64?, stream: @escaping StreamCallback) {
self.contentLength = contentLength.flatMap { $0 }
self.stream = stream
}

Expand All @@ -88,7 +107,7 @@ extension HTTPClient {
/// - parameters:
/// - buffer: Body `ByteBuffer` representation.
public static func byteBuffer(_ buffer: ByteBuffer) -> Body {
return Body(length: buffer.readableBytes) { writer in
return Body(contentLength: Int64(buffer.readableBytes)) { writer in
writer.write(.byteBuffer(buffer))
}
}
Expand All @@ -100,18 +119,30 @@ extension HTTPClient {
/// header is set with the given `length`.
/// - stream: Body chunk provider.
@preconcurrency
@available(*, deprecated, message: "Use `stream` with `length: Int64` instead")
aryan-25 marked this conversation as resolved.
Show resolved Hide resolved
public static func stream(length: Int? = nil, _ stream: @Sendable @escaping (StreamWriter) -> EventLoopFuture<Void>) -> Body {
return Body(length: length, stream: stream)
}


/// Create and stream body using ``StreamWriter``.
///
/// - parameters:
/// - contentLength: Body size. If nil, `Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length`
/// header is set with the given `contentLength`.
/// - bodyStream: Body chunk provider.
public static func stream(contentLength: Int64? = nil, bodyStream: @Sendable @escaping (StreamWriter) -> EventLoopFuture<Void>) -> Body {
return Body(contentLength: contentLength, stream: bodyStream)
}

/// Create and stream body using a collection of bytes.
///
/// - parameters:
/// - data: Body binary representation.
@preconcurrency
@inlinable
public static func bytes<Bytes>(_ bytes: Bytes) -> Body where Bytes: RandomAccessCollection, Bytes: Sendable, Bytes.Element == UInt8 {
return Body(length: bytes.count) { writer in
return Body(contentLength: Int64(bytes.count)) { writer in
if bytes.count <= bagOfBytesToByteBufferConversionChunkSize {
return writer.write(.byteBuffer(ByteBuffer(bytes: bytes)))
} else {
Expand All @@ -125,7 +156,7 @@ extension HTTPClient {
/// - parameters:
/// - string: Body `String` representation.
public static func string(_ string: String) -> Body {
return Body(length: string.utf8.count) { writer in
return Body(contentLength: Int64(string.utf8.count)) { writer in
if string.utf8.count <= bagOfBytesToByteBufferConversionChunkSize {
return writer.write(.byteBuffer(ByteBuffer(string: string)))
} else {
Expand Down Expand Up @@ -858,7 +889,7 @@ extension RequestBodyLength {
self = .known(0)
return
}
guard let length = body.length else {
guard let length = body.contentLength else {
self = .unknown
return
}
Expand Down
50 changes: 50 additions & 0 deletions Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,56 @@ final class AsyncAwaitEndToEndTests: XCTestCase {
}
}

struct AsyncSequenceByteBufferGenerator: AsyncSequence, Sendable, AsyncIteratorProtocol {
typealias Element = ByteBuffer

let chunkSize: Int
let totalChunks: Int
var chunksGenerated: Int = 0

init(chunkSize: Int, totalChunks: Int) {
self.chunkSize = chunkSize
self.totalChunks = totalChunks
}

mutating func next() async throws -> ByteBuffer? {
guard self.chunksGenerated < self.totalChunks else { return nil }

self.chunksGenerated += 1
return ByteBuffer(repeating: 1, count: self.chunkSize)
aryan-25 marked this conversation as resolved.
Show resolved Hide resolved
}

func makeAsyncIterator() -> AsyncSequenceByteBufferGenerator {
return self
}
}

func testEchoStreamThatHas3GBInTotal() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let bin = HTTPBin(.http1_1()) { _ in HTTPEchoHandler() }
defer { XCTAssertNoThrow(try bin.shutdown()) }

let client: HTTPClient = makeDefaultHTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup))
defer { XCTAssertNoThrow(try client.syncShutdown()) }

let logger = Logger(label: "HTTPClient", factory: StreamLogHandler.standardOutput(label:))

var request = HTTPClientRequest(url: "http://localhost:\(bin.port)/")
request.method = .POST

let sequence = AsyncSequenceByteBufferGenerator(
chunkSize: 4_194_304, // 4MB chunk
totalChunks: 768 // Total = 3GB
)
request.body = .stream(sequence, length: .unknown)

let response: HTTPClientResponse = try await client.execute(request, deadline: .now() + .seconds(30), logger: logger)
XCTAssertEqual(response.headers["content-length"], [])

_ = try await response.body.collect(upTo: 3_221_225_472)
aryan-25 marked this conversation as resolved.
Show resolved Hide resolved
}

func testPostWithAsyncSequenceOfByteBuffers() {
XCTAsyncTest {
let bin = HTTPBin(.http2(compress: false)) { _ in HTTPEchoHandler() }
Expand Down
10 changes: 5 additions & 5 deletions Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 100) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 100) { writer in
testWriter.start(writer: writer)
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down Expand Up @@ -345,7 +345,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
// Advance time by more than the idle write timeout (that's 1 millisecond) to trigger the timeout.
embedded.embeddedEventLoop.advanceTime(by: .milliseconds(2))
return testWriter.start(writer: writer)
Expand Down Expand Up @@ -384,7 +384,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
embedded.isWritable = false
embedded.pipeline.fireChannelWritabilityChanged()
// This should not trigger any errors or timeouts, because the timer isn't running
Expand Down Expand Up @@ -432,7 +432,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 2) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 2) { writer in
return testWriter.start(writer: writer, expectedErrors: [HTTPClientError.cancelled])
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down Expand Up @@ -595,7 +595,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
testWriter.start(writer: writer)
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down
4 changes: 2 additions & 2 deletions Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ class HTTP1ConnectionTests: XCTestCase {
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(
url: "http://localhost/hello/swift",
method: .POST,
body: .stream(length: 4) { writer -> EventLoopFuture<Void> in
func recursive(count: UInt8, promise: EventLoopPromise<Void>) {
body: .stream(contentLength: 4) { writer -> EventLoopFuture<Void> in
@Sendable func recursive(count: UInt8, promise: EventLoopPromise<Void>) {
aryan-25 marked this conversation as resolved.
Show resolved Hide resolved
guard count < 4 else {
return promise.succeed(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {
let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 50)

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 100) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 100) { writer in
testWriter.start(writer: writer)
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down Expand Up @@ -295,7 +295,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {

let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 5)
var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
// Advance time by more than the idle write timeout (that's 1 millisecond) to trigger the timeout.
embedded.embeddedEventLoop.advanceTime(by: .milliseconds(2))
return testWriter.start(writer: writer)
Expand Down Expand Up @@ -335,7 +335,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {

let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 5)
var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
embedded.isWritable = false
embedded.pipeline.fireChannelWritabilityChanged()
// This should not trigger any errors or timeouts, because the timer isn't running
Expand Down Expand Up @@ -385,7 +385,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {

let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 5)
var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 2) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 2) { writer in
return testWriter.start(writer: writer, expectedErrors: [HTTPClientError.cancelled])
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down
4 changes: 2 additions & 2 deletions Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class HTTP2ClientTests: XCTestCase {
let client = self.makeDefaultHTTPClient()
defer { XCTAssertNoThrow(try client.syncShutdown()) }
var response: HTTPClient.Response?
let body = HTTPClient.Body.stream(length: nil) { writer in
let body = HTTPClient.Body.stream(contentLength: nil) { writer in
writer.write(.byteBuffer(ByteBuffer(integer: UInt64(0)))).flatMap {
writer.write(.byteBuffer(ByteBuffer(integer: UInt64(0))))
}
Expand All @@ -84,7 +84,7 @@ class HTTP2ClientTests: XCTestCase {
defer { XCTAssertNoThrow(try bin.shutdown()) }
let client = self.makeDefaultHTTPClient()
defer { XCTAssertNoThrow(try client.syncShutdown()) }
let body = HTTPClient.Body.stream(length: 12) { writer in
let body = HTTPClient.Body.stream(contentLength: 12) { writer in
writer.write(.byteBuffer(ByteBuffer(integer: UInt64(0)))).flatMap {
writer.write(.byteBuffer(ByteBuffer(integer: UInt64(0))))
}
Expand Down
Loading