Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streamable HTTPCallable functions #14290

Draft
wants to merge 42 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
231d602
[Infra] Update functions workflow to use macOS 15 for Xcode 16 jobs (…
eBlender Nov 8, 2024
a14d964
Stremable Functions.
eBlender Dec 20, 2024
a92d7c2
Changed return type.
eBlender Dec 20, 2024
10bec1d
Lint test
eBlender Dec 20, 2024
53a2aab
Remove test function
eBlender Dec 20, 2024
758fbed
Remove old test.
eBlender Dec 20, 2024
93b6c8b
Updated function, add full test.
eBlender Dec 27, 2024
a7e8fe8
Update functions
eBlender Jan 2, 2025
6d59fcd
Update FunctionsTests.swift
eBlender Jan 2, 2025
51f02b8
Cleanup HTTPCallable
eBlender Jan 2, 2025
7b61076
Add documentation for processResponseDataForStreamableContent
eBlender Jan 2, 2025
a95449e
Update Functions.swift
eBlender Jan 2, 2025
cdc49ee
Update Functions.swift
eBlender Jan 2, 2025
426b6bc
Update FunctionsTests.swift
eBlender Jan 2, 2025
9cb0a5e
Update and Cleanup
eBlender Jan 3, 2025
ad31052
Update IntegrationTests.swift
eBlender Jan 3, 2025
6ee9000
Clean up
eBlender Jan 3, 2025
1ffe73d
Update check.sh
eBlender Jan 3, 2025
9fcd91e
Bump to Main.
eBlender Jan 6, 2025
177aa8e
Merge branch 'main' into iOS-Stremable-Functions
eBlender Jan 6, 2025
f4d678b
Cleanup
eBlender Jan 6, 2025
74557e7
Merge branch 'iOS-Stremable-Functions' of https://github.com/eBlender…
eBlender Jan 6, 2025
18f748b
Update Functions.swift
eBlender Jan 7, 2025
4f956fb
Lint check
eBlender Jan 7, 2025
4edc0ad
Function concurrency error
eBlender Jan 7, 2025
e50f69c
Update .github/workflows/functions.yml
eBlender Jan 15, 2025
7356cf9
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 15, 2025
aed47d6
Delete firebase-database-emulator.log
eBlender Jan 15, 2025
f6c6cff
Delete firebase-database-emulator.pid
eBlender Jan 15, 2025
75a7574
Update function error handling.
eBlender Jan 15, 2025
adf7366
Merge branch 'iOS-Stremable-Functions' of https://github.com/eBlender…
eBlender Jan 15, 2025
9ef7411
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
4ee820e
Update FunctionsTests.swift
eBlender Jan 16, 2025
fd68f01
Merge branch 'iOS-Stremable-Functions' of https://github.com/eBlender…
eBlender Jan 16, 2025
f27bf07
Update FunctionsTests.swift
eBlender Jan 16, 2025
1ffa4f0
Format and refactoring.
eBlender Jan 16, 2025
80f0991
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
756dc26
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
f031c1f
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
0df7f8d
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
231c7dd
Update FirebaseFunctions/Sources/Functions.swift
eBlender Jan 16, 2025
be80d63
Update FirebaseFunctions/Sources/Functions.swift
eBlender Jan 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 155 additions & 2 deletions FirebaseFunctions/Sources/Functions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,105 @@ enum FunctionsConstants {
}
}

@available(iOS 13, macCatalyst 13, macOS 10.15, tvOS 13, watchOS 7, *)
func stream(at url: URL,
eBlender marked this conversation as resolved.
Show resolved Hide resolved
withObject data: Any?,
options: HTTPSCallableOptions?,
timeout: TimeInterval) async throws
-> AsyncThrowingStream<HTTPSCallableResult, Error> {
let context = try await contextProvider.context(options: options)
let fetcher = try makeFetcherForStreamableContent(
url: url,
data: data,
options: options,
timeout: timeout,
context: context
)

do {
let rawData = try await fetcher.beginFetch()
return try callableResultFromResponseAsync(data: rawData, error: nil)
Comment on lines +490 to +491
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just came across one subtle issue is that GTMSessionFetcher is returning the entire data result at once rather than chunks of the stream. It doesn't look like GTMSessionFetcher supports receiving streams so we may have to change this up and use the standard library's URLSession and URLSession.AsyncBytes

Copy link
Author

@eBlender eBlender Jan 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll get to it. I was trying to keep it as closest as possible to HTTPCallable. It is subtle but quite important.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend following what the Vertex SDK does. Here is the method that loads the request stream: https://github.com/firebase/firebase-ios-sdk/blob/main/FirebaseVertexAI/Sources/GenerativeAIService.swift#L82-L176

I'm thinking the implementation here won't be as complex.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

} catch {
// This method always throws when `error` is not `nil`, but ideally,
// it should be refactored so it looks less confusing.
return try callableResultFromResponseAsync(data: nil, error: error)
}
}

@available(iOS 13.0, *)
func callableResultFromResponseAsync(data: Data?,
error: Error?) throws -> AsyncThrowingStream<
HTTPSCallableResult, Error

> {
let processedData =
try processResponseDataForStreamableContent(
from: data,
error: error
)

return processedData
}

private func makeFetcherForStreamableContent(url: URL,
data: Any?,
options: HTTPSCallableOptions?,
timeout: TimeInterval,
context: FunctionsContext) throws
-> GTMSessionFetcher {
let request = URLRequest(
url: url,
cachePolicy: .useProtocolCachePolicy,
timeoutInterval: timeout
)
let fetcher = fetcherService.fetcher(with: request)

let data = data ?? NSNull()
let encoded = try serializer.encode(data)
let body = ["data": encoded]
let payload = try JSONSerialization.data(withJSONObject: body, options: [.fragmentsAllowed])
fetcher.bodyData = payload

// Set the headers for starting a streaming session.
fetcher.setRequestValue("application/json", forHTTPHeaderField: "Content-Type")
fetcher.setRequestValue("text/event-stream", forHTTPHeaderField: "Accept")
fetcher.request?.httpMethod = "POST"
if let authToken = context.authToken {
let value = "Bearer \(authToken)"
fetcher.setRequestValue(value, forHTTPHeaderField: "Authorization")
}

if let fcmToken = context.fcmToken {
fetcher.setRequestValue(fcmToken, forHTTPHeaderField: Constants.fcmTokenHeader)
}

if options?.requireLimitedUseAppCheckTokens == true {
if let appCheckToken = context.limitedUseAppCheckToken {
fetcher.setRequestValue(
appCheckToken,
forHTTPHeaderField: Constants.appCheckTokenHeader
)
}
} else if let appCheckToken = context.appCheckToken {
fetcher.setRequestValue(
appCheckToken,
forHTTPHeaderField: Constants.appCheckTokenHeader
)
}
// Remove after genStream is updated on the emulator or deployed
#if DEBUG
fetcher.allowLocalhostRequest = true
fetcher.allowedInsecureSchemes = ["http"]
#endif
// Override normal security rules if this is a local test.
if emulatorOrigin != nil {
fetcher.allowLocalhostRequest = true
fetcher.allowedInsecureSchemes = ["http"]
}

return fetcher
}

private func makeFetcher(url: URL,
data: Any?,
options: HTTPSCallableOptions?,
Expand Down Expand Up @@ -556,6 +655,58 @@ enum FunctionsConstants {
return data
}

@available(iOS 13, macCatalyst 13, macOS 10.15, tvOS 13, watchOS 7, *)
private func processResponseDataForStreamableContent(from data: Data?,
error: Error?) throws
-> AsyncThrowingStream<
HTTPSCallableResult,
Error
> {
return AsyncThrowingStream { continuation in
Task {
var resultArray = [String]()
do {
if let error = error {
throw error
}

guard let data = data else {
throw NSError(domain: FunctionsErrorDomain.description, code: -1, userInfo: nil)
}

if let dataChunk = String(data: data, encoding: .utf8) {
// We remove the "data :" field so it can be safely parsed to Json.
let dataChunkToJson = dataChunk.split(separator: "\n").map {
String($0.dropFirst(6))
}
eBlender marked this conversation as resolved.
Show resolved Hide resolved
resultArray.append(contentsOf: dataChunkToJson)
} else {
throw NSError(domain: FunctionsErrorDomain.description, code: -1, userInfo: nil)
}

for dataChunk in resultArray {
let json = try callableResult(
fromResponseData: dataChunk.data(
using: .utf8,
allowLossyConversion: true
) ?? Data()
)
continuation.yield(HTTPSCallableResult(data: json.data))
}

continuation.onTermination = { @Sendable _ in
// Callback for cancelling the stream
continuation.finish()
}
// Close the stream once it's done
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}
}
}

private func responseDataJSON(from data: Data) throws -> Any {
let responseJSONObject = try JSONSerialization.jsonObject(with: data)

Expand All @@ -564,8 +715,10 @@ enum FunctionsConstants {
throw FunctionsError(.internal, userInfo: userInfo)
}

// `result` is checked for backwards compatibility:
guard let dataJSON = responseJSON["data"] ?? responseJSON["result"] else {
// `result` is checked for backwards compatibility,
// `message` is checked for StramableContent:
guard let dataJSON = responseJSON["data"] ?? responseJSON["result"] ?? responseJSON["message"]
else {
let userInfo = [NSLocalizedDescriptionKey: "Response is missing data field."]
throw FunctionsError(.internal, userInfo: userInfo)
}
Expand Down
7 changes: 7 additions & 0 deletions FirebaseFunctions/Sources/HTTPSCallable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,11 @@ open class HTTPSCallable: NSObject {
try await functions
.callFunction(at: url, withObject: data, options: options, timeout: timeoutInterval)
}

@available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
open func stream(_ data: Any? = nil) async throws
-> AsyncThrowingStream<HTTPSCallableResult, Error> {
try await functions
.stream(at: url, withObject: data, options: options, timeout: timeoutInterval)
}
}
84 changes: 84 additions & 0 deletions FirebaseFunctions/Tests/Unit/FunctionsTests.swift
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With #14357 merged, we should be able to get these passing in CI now. This can be done by moving these new tests to the integration test file and use the emulatorURL helper method for the stream API's at URL: URL property. https://github.com/firebase/firebase-ios-sdk/blob/main/FirebaseFunctions/Tests/Integration/IntegrationTests.swift

Original file line number Diff line number Diff line change
Expand Up @@ -358,4 +358,88 @@ class FunctionsTests: XCTestCase {
}
waitForExpectations(timeout: 1.5)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think another good test case would be to test when an error is thrown.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we would have (genStream, genStreamError, genStreamNoReturn)
So I will add genStreamError and genStreamNoReturn cases as well.

SG??

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

func testGenerateStreamContent() async throws {
let options = HTTPSCallableOptions(requireLimitedUseAppCheckTokens: true)
var response = [String]()
let responseQueue = DispatchQueue(label: "responseQueue")
eBlender marked this conversation as resolved.
Show resolved Hide resolved

let input: [String: Any] = ["data": "Why is the sky blue"]
let stream = try await functions?.stream(
at: URL(string: "http://127.0.0.1:5001/demo-project/us-central1/genStream")!,
withObject: input,
options: options,
timeout: 4.0
)
// First chunk of the stream comes as NSDictionary
if let stream = stream {
eBlender marked this conversation as resolved.
Show resolved Hide resolved
for try await result in stream {
if let dataChunk = result.data as? NSDictionary {
for (key, value) in dataChunk {
responseQueue.sync {
response.append("\(key) \(value)")
}
}
} else {
// Last chunk is the concatenated result so we have to parse it as String else will
// fail.
if let dataString = result.data as? String {
responseQueue.sync {
response.append(dataString)
}
}
}
}
XCTAssertEqual(
response,
[
"chunk hello",
"chunk world",
"chunk this",
"chunk is",
"chunk cool",
"hello world this is cool",
]
)
}
XCTExpectFailure("Failed to download stream")
eBlender marked this conversation as resolved.
Show resolved Hide resolved
}

func testGenerateStreamContentCanceled() async {
let options = HTTPSCallableOptions(requireLimitedUseAppCheckTokens: true)
let input: [String: Any] = ["data": "Why is the sky blue"]

let task = Task.detached { [self] in
let stream = try await functions!.stream(
at: URL(string: "http://127.0.0.1:5001/demo-project/us-central1/genStream")!,
withObject: input,
options: options,
timeout: 4.0
)
// First chunk of the stream comes as NSDictionary
var response = [String]()
for try await result in stream {
if let dataChunk = result.data as? NSDictionary {
for (key, value) in dataChunk {
response.append("\(key) \(value)")
}
} else {
// Last chunk is the concatenated result so we have to parse it as String else will
// fail.
if let dataString = result.data as? String {
response.append(dataString)
}
}
}
eBlender marked this conversation as resolved.
Show resolved Hide resolved
// Since we cancel the call we are expecting an empty array.
XCTAssertEqual(
response,
[]
)
}
// We cancel the task and we expect a null response even if the stream was initiated.
task.cancel()
let result = await task.result
XCTAssertNotNil(result)
}
}