diff --git a/FirebaseFunctions/Sources/Functions.swift b/FirebaseFunctions/Sources/Functions.swift index 51e405b2f39..27d764188bf 100644 --- a/FirebaseFunctions/Sources/Functions.swift +++ b/FirebaseFunctions/Sources/Functions.swift @@ -471,6 +471,105 @@ enum FunctionsConstants { } } + @available(iOS 13, macCatalyst 13, macOS 10.15, tvOS 13, watchOS 7, *) + public func stream(at url: URL, + withObject data: Any?, + options: HTTPSCallableOptions?, + timeout: TimeInterval) async throws + -> AsyncThrowingStream { + let context = try await contextProvider.context(options: options) + let fetcher = try makeFetcherForStreamableContent( + url: url, + data: data, + options: options, + timeout: timeout, + context: context + ) + + do { + let rawData = try await fetcher.beginFetch() + return try callableResultFromResponseAsync(data: rawData, error: nil) + } catch { + // This method always throws when `error` is not `nil`, but ideally, + // it should be refactored so it looks less confusing. + return try callableResultFromResponseAsync(data: nil, error: error) + } + } + + @available(iOS 13.0, *) + func callableResultFromResponseAsync(data: Data?, + error: Error?) throws -> AsyncThrowingStream< + HTTPSCallableResult, Error + + > { + let processedData = + try processResponseDataForStreamableContent( + from: data, + error: error + ) + + return processedData + } + + private func makeFetcherForStreamableContent(url: URL, + data: Any?, + options: HTTPSCallableOptions?, + timeout: TimeInterval, + context: FunctionsContext) throws + -> GTMSessionFetcher { + let request = URLRequest( + url: url, + cachePolicy: .useProtocolCachePolicy, + timeoutInterval: timeout + ) + let fetcher = fetcherService.fetcher(with: request) + + let data = data ?? NSNull() + let encoded = try serializer.encode(data) + let body = ["data": encoded] + let payload = try JSONSerialization.data(withJSONObject: body, options: [.fragmentsAllowed]) + fetcher.bodyData = payload + + // Set the headers for starting a streaming session. + fetcher.setRequestValue("application/json", forHTTPHeaderField: "Content-Type") + fetcher.setRequestValue("text/event-stream", forHTTPHeaderField: "Accept") + fetcher.request?.httpMethod = "POST" + if let authToken = context.authToken { + let value = "Bearer \(authToken)" + fetcher.setRequestValue(value, forHTTPHeaderField: "Authorization") + } + + if let fcmToken = context.fcmToken { + fetcher.setRequestValue(fcmToken, forHTTPHeaderField: Constants.fcmTokenHeader) + } + + if options?.requireLimitedUseAppCheckTokens == true { + if let appCheckToken = context.limitedUseAppCheckToken { + fetcher.setRequestValue( + appCheckToken, + forHTTPHeaderField: Constants.appCheckTokenHeader + ) + } + } else if let appCheckToken = context.appCheckToken { + fetcher.setRequestValue( + appCheckToken, + forHTTPHeaderField: Constants.appCheckTokenHeader + ) + } + // Remove after genStream is updated on the emulator or deployed + #if DEBUG + fetcher.allowLocalhostRequest = true + fetcher.allowedInsecureSchemes = ["http"] + #endif + // Override normal security rules if this is a local test. + if emulatorOrigin != nil { + fetcher.allowLocalhostRequest = true + fetcher.allowedInsecureSchemes = ["http"] + } + + return fetcher + } + private func makeFetcher(url: URL, data: Any?, options: HTTPSCallableOptions?, @@ -556,6 +655,58 @@ enum FunctionsConstants { return data } + @available(iOS 13, macCatalyst 13, macOS 10.15, tvOS 13, watchOS 7, *) + private func processResponseDataForStreamableContent(from data: Data?, + error: Error?) throws + -> AsyncThrowingStream< + HTTPSCallableResult, + Error + > { + return AsyncThrowingStream { continuation in + Task { + var resultArray = [String]() + do { + if let error = error { + throw error + } + + guard let data = data else { + throw NSError(domain: FunctionsErrorDomain.description, code: -1, userInfo: nil) + } + + if let dataChunk = String(data: data, encoding: .utf8) { + // We remove the "data:" field so it can be safely parsed to Json. + let dataChunkToJson = dataChunk.split(separator: "\n").map { + String($0.dropFirst(5)) + } + resultArray.append(contentsOf: dataChunkToJson) + } else { + throw NSError(domain: FunctionsErrorDomain.description, code: -1, userInfo: nil) + } + + for dataChunk in resultArray { + let json = try callableResult( + fromResponseData: dataChunk.data( + using: .utf8, + allowLossyConversion: true + ) ?? Data() + ) + continuation.yield(HTTPSCallableResult(data: json.data)) + } + + continuation.onTermination = { @Sendable _ in + // Callback for cancelling the stream + continuation.finish() + } + // Close the stream once it's done + continuation.finish() + } catch { + continuation.finish(throwing: error) + } + } + } + } + private func responseDataJSON(from data: Data) throws -> Any { let responseJSONObject = try JSONSerialization.jsonObject(with: data) @@ -564,8 +715,10 @@ enum FunctionsConstants { throw FunctionsError(.internal, userInfo: userInfo) } - // `result` is checked for backwards compatibility: - guard let dataJSON = responseJSON["data"] ?? responseJSON["result"] else { + // `result` is checked for backwards compatibility, + // `message` is checked for StramableContent: + guard let dataJSON = responseJSON["data"] ?? responseJSON["result"] ?? responseJSON["message"] + else { let userInfo = [NSLocalizedDescriptionKey: "Response is missing data field."] throw FunctionsError(.internal, userInfo: userInfo) } diff --git a/FirebaseFunctions/Sources/HTTPSCallable.swift b/FirebaseFunctions/Sources/HTTPSCallable.swift index c2281e54866..c53ddd941c9 100644 --- a/FirebaseFunctions/Sources/HTTPSCallable.swift +++ b/FirebaseFunctions/Sources/HTTPSCallable.swift @@ -143,4 +143,11 @@ open class HTTPSCallable: NSObject { try await functions .callFunction(at: url, withObject: data, options: options, timeout: timeoutInterval) } + + @available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *) + open func stream(_ data: Any? = nil) async throws + -> AsyncThrowingStream { + try await functions + .stream(at: url, withObject: data, options: options, timeout: timeoutInterval) + } } diff --git a/FirebaseFunctions/Tests/Unit/FunctionsTests.swift b/FirebaseFunctions/Tests/Unit/FunctionsTests.swift index 42e684cdf1a..a96ef8cc44b 100644 --- a/FirebaseFunctions/Tests/Unit/FunctionsTests.swift +++ b/FirebaseFunctions/Tests/Unit/FunctionsTests.swift @@ -358,4 +358,73 @@ class FunctionsTests: XCTestCase { } waitForExpectations(timeout: 1.5) } + + func testGenerateStreamContent() async throws { + let options = HTTPSCallableOptions(requireLimitedUseAppCheckTokens: true) + + let input: [String: Any] = ["data": "Why is the sky blue"] + let stream = try await functions!.stream( + at: URL(string: "http://127.0.0.1:5001/demo-project/us-central1/genStream")!, + withObject: input, + options: options, + timeout: 4.0 + ) + let result = try await response(from: stream) + XCTAssertEqual( + result, + [ + "chunk hello", + "chunk world", + "chunk this", + "chunk is", + "chunk cool", + "hello world this is cool", + ] + ) + } + + func testGenerateStreamContentCanceled() async { + let options = HTTPSCallableOptions(requireLimitedUseAppCheckTokens: true) + let input: [String: Any] = ["data": "Why is the sky blue"] + + let task = Task.detached { [self] in + let stream = try await functions!.stream( + at: URL(string: "http://127.0.0.1:5001/demo-project/us-central1/genStream")!, + withObject: input, + options: options, + timeout: 4.0 + ) + + let result = try await response(from: stream) + // Since we cancel the call we are expecting an empty array. + XCTAssertEqual( + result, + [] + ) + } + // We cancel the task and we expect a null response even if the stream was initiated. + task.cancel() + let respone = await task.result + XCTAssertNotNil(respone) + } +} + +private func response(from stream: AsyncThrowingStream) async throws -> [String] { + var response = [String]() + for try await result in stream { + // First chunk of the stream comes as NSDictionary + if let dataChunk = result.data as? NSDictionary { + for (key, value) in dataChunk { + response.append("\(key) \(value)") + } + } else { + // Last chunk is the concatenated result so we have to parse it as String else will + // fail. + if let dataString = result.data as? String { + response.append(dataString) + } + } + } + return response }