Feature: support AsyncIterable instead of Observable #600

Closed
paralin opened this issue Jun 26, 2022 · 8 comments
@paralin
Collaborator

paralin commented Jun 26, 2022

Currently the interface generated for an RPC is like:

export interface MyService {
  MyRpcCall(request: Observable<Packet>): Observable<Packet>
}

I am proposing to add a new option which instead generates:

import { Source, Sink } from 'it-stream-types'

export interface MyService {
  MyRpcCall(request: Source<Packet>): Source<Packet>
}

This would use it-pipe and the other iterables instead of Observable and rxjs.

@stephenh
Owner

@paralin sure, that sgtm. Little worried about this becoming a slippery slope to supporting ~N different stream libraries, but understood that just is what it is for now, w/o something built-in to node.

@paralin
Collaborator Author

paralin commented Jun 26, 2022

@stephenh https://www.typescriptlang.org/docs/handbook/release-notes/typescript-2-3.html

As far as I know, async iterators are a JavaScript language feature, which makes them closer to the "standard way" than rxjs Observables: they're baked into the language rather than provided by a library.

The it-stream-types and it-pipe libraries are just nice interfaces on top to make it a bit easier.
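
To illustrate the "baked into the language" point, here is a minimal sketch that produces and consumes a stream with nothing but an async generator and `for await`. `Packet` mirrors the message name used earlier in the thread; its `seq` field and the `countTo` and `collect` helpers are hypothetical names made up for this example.

```typescript
// Hypothetical message shape, standing in for a generated Packet type.
interface Packet {
  seq: number
}

// An async generator is the built-in way to produce an AsyncIterable.
async function* countTo(limit: number): AsyncIterable<Packet> {
  for (let seq = 1; seq <= limit; seq++) {
    yield { seq }
  }
}

// for await is the built-in way to consume one; no library import is needed.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = []
  for await (const item of source) {
    out.push(item)
  }
  return out
}
```

Calling `collect(countTo(3))` resolves to an array of three packets, in order.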

@stephenh
Owner

@paralin ah yeah, that's a good point! Even better.

Oh wow, yeah, that it-stream-types library is really small:

https://github.com/achingbrain/it-stream-types/blob/master/src/index.ts

Could we get by with just using AsyncIterable directly? Like if Source is just a type alias:

export type Source<T> = AsyncIterable<T> | Iterable<T>

I'm wondering if using AsyncIterable directly would still be just as ergonomic for users of it-pipe, without introducing that dependency in the ts-proto code itself?
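
A rough sketch of why the alias can be inlined without losing ergonomics: `for await` accepts both halves of the union, so a function typed this way takes arrays, sets, and async generators alike. The alias is redeclared locally here so no package is imported; `sum` and `asyncNumbers` are hypothetical helpers for the example.

```typescript
// Same shape as the alias quoted above, declared locally instead of imported.
type Source<T> = AsyncIterable<T> | Iterable<T>

// for await works on sync and async iterables, so one loop covers both cases.
async function sum(source: Source<number>): Promise<number> {
  let total = 0
  for await (const n of source) {
    total += n
  }
  return total
}

// An async producer, to show the other half of the union.
async function* asyncNumbers(): AsyncIterable<number> {
  yield 1
  yield 2
  yield 3
}
```

Both `sum([1, 2, 3])` and `sum(asyncNumbers())` type-check and resolve to the same total, which suggests callers that already hold an it-pipe style source would not need an adapter.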

@paralin
Collaborator Author

paralin commented Jun 28, 2022

Yes, I think the only difference is to replace the Observable type with AsyncIterable.

@stephenh
Owner

Ah great! That sounds good to me. I think we have a useObservable flag already...iirc? If so, maybe a useObservable=rxjs / useObservable=async?

Fwiw given AsyncIterable is built-in, I'd be fine switching the default behavior to use it instead.

@paralin
Collaborator Author

paralin commented Jun 30, 2022

I've implemented the code generation part without adding any additional package deps.

The following examples have comments indicating which it-stream-types type each one is compatible with.

Example of generated encodeTransform and decodeTransform functions:

  // encodeTransform encodes a source of message objects.
  // implements Transform<TestMessage, Uint8Array>
  // source: Source<TestMessage | TestMessage[]>
  async *encodeTransform(
    source: AsyncIterable<TestMessage | TestMessage[]> | Iterable<TestMessage | TestMessage[]>
  ): AsyncIterable<Uint8Array> {
    for await (const pkt of source) {
      if (Array.isArray(pkt)) {
        for (const p of pkt) {
          yield* [TestMessage.encode(p).finish()]
        }
      } else {
        yield* [TestMessage.encode(pkt).finish()]
      }
    }
  },

  // decodeTransform decodes a source of encoded messages.
  // implements Transform<Uint8Array, TestMessage>
  // source: Source<Uint8Array | Uint8Array[]>
  async *decodeTransform(
    source: AsyncIterable<Uint8Array | Uint8Array[]> | Iterable<Uint8Array | Uint8Array[]>
  ): AsyncIterable<TestMessage> {
    for await (const pkt of source) {
      if (Array.isArray(pkt)) {
        for (const p of pkt) {
          yield* [TestMessage.decode(p)]
        }
      } else {
        yield* [TestMessage.decode(pkt)]
      }
    }
  },

Example of the generated RPC for a bidirectional RPC:

  BidiStreaming(request: AsyncIterable<TestMessage>): AsyncIterable<TestMessage> {
    const data = TestMessage.encodeTransform(request)
    const result = this.rpc.bidirectionalStreamingRequest('simple.Test', 'BidiStreaming', data);
    return TestMessage.decodeTransform(result)
  }

I'm quite happy with how straightforward this ended up being to implement.
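
For anyone wanting to try the shape of this end to end, below is a self-contained sketch under stated assumptions. The `TestMessage` codec is a hypothetical one-byte stand-in, not the protobuf encoding the generator emits (the real `encode` returns a writer that is finished with `.finish()`), and `loopback` is a made-up transport that echoes frames back. Only the `BidiStreaming` body follows the generated code shown above.

```typescript
interface TestMessage {
  value: number
}

// Hypothetical stand-in codec: one byte per message, NOT real protobuf output.
const TestMessage = {
  encode(msg: TestMessage): Uint8Array {
    return Uint8Array.of(msg.value & 0xff)
  },
  decode(bytes: Uint8Array): TestMessage {
    return { value: bytes[0] ?? 0 }
  },
  async *encodeTransform(
    source: AsyncIterable<TestMessage> | Iterable<TestMessage>
  ): AsyncIterable<Uint8Array> {
    for await (const msg of source) {
      yield TestMessage.encode(msg)
    }
  },
  async *decodeTransform(
    source: AsyncIterable<Uint8Array> | Iterable<Uint8Array>
  ): AsyncIterable<TestMessage> {
    for await (const bytes of source) {
      yield TestMessage.decode(bytes)
    }
  },
}

interface Rpc {
  bidirectionalStreamingRequest(
    service: string,
    method: string,
    data: AsyncIterable<Uint8Array>
  ): AsyncIterable<Uint8Array>
}

// Hypothetical transport that echoes every frame back to the caller.
const loopback: Rpc = {
  async *bidirectionalStreamingRequest(
    _service: string,
    _method: string,
    data: AsyncIterable<Uint8Array>
  ): AsyncIterable<Uint8Array> {
    for await (const frame of data) {
      yield frame
    }
  },
}

class TestClient {
  private readonly rpc: Rpc
  constructor(rpc: Rpc) {
    this.rpc = rpc
  }

  // Same three steps as the generated method: encode, send, decode.
  BidiStreaming(request: AsyncIterable<TestMessage>): AsyncIterable<TestMessage> {
    const data = TestMessage.encodeTransform(request)
    const result = this.rpc.bidirectionalStreamingRequest('simple.Test', 'BidiStreaming', data)
    return TestMessage.decodeTransform(result)
  }
}

// Example request stream for the client.
async function* requests(): AsyncIterable<TestMessage> {
  yield { value: 1 }
  yield { value: 2 }
}
```

Iterating `new TestClient(loopback).BidiStreaming(requests())` with `for await` yields the same messages back, since nothing runs until the consumer starts pulling and each stage pulls from the one before it.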

@paralin
Collaborator Author

paralin commented Jun 30, 2022

See #605

paralin added a commit to paralin/ts-proto that referenced this issue Jun 30, 2022
Adds option useAsyncIterable which uses AsyncIterable instead of Observable.

For example:

  bidirectionalStreamingRequest(
    service: string,
    method: string,
    data: AsyncIterable<Uint8Array>
  ): AsyncIterable<Uint8Array>

Generates Transform async iterables for encoding and decoding:

  // encodeTransform encodes a source of message objects.
  // Transform<TestMessage, Uint8Array>
  async *encodeTransform(
    source: AsyncIterable<TestMessage | TestMessage[]> | Iterable<TestMessage | TestMessage[]>
  ): AsyncIterable<Uint8Array> {
    for await (const pkt of source) {
      if (Array.isArray(pkt)) {
        for (const p of pkt) {
          yield* [TestMessage.encode(p).finish()];
        }
      } else {
        yield* [TestMessage.encode(pkt).finish()];
      }
    }
  },

  // decodeTransform decodes a source of encoded messages.
  // Transform<Uint8Array, TestMessage>
  async *decodeTransform(
    source: AsyncIterable<Uint8Array | Uint8Array[]> | Iterable<Uint8Array | Uint8Array[]>
  ): AsyncIterable<TestMessage> {
    for await (const pkt of source) {
      if (Array.isArray(pkt)) {
        for (const p of pkt) {
          yield* [TestMessage.decode(p)];
        }
      } else {
        yield* [TestMessage.decode(pkt)];
      }
    }
  },

Generates RPC service implementations which use the Transform iterators:

  BidiStreaming(request: AsyncIterable<TestMessage>): AsyncIterable<TestMessage> {
    const data = TestMessage.encodeTransform(request);
    const result = this.rpc.bidirectionalStreamingRequest('simple.Test', 'BidiStreaming', data);
    return TestMessage.decodeTransform(result);
  }

AsyncIterables indicate a stream has ended by closing with an optional error.

Fixes stephenh#600

Signed-off-by: Christian Stewart <christian@paral.in>
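
The note in the commit message about how a stream ends can be sketched as follows: a consumer sees normal completion as the `for await` loop finishing, and sees a failed stream as an exception thrown out of the loop. The `finite`, `failing`, and `drain` names are hypothetical and exist only for this illustration.

```typescript
// Ends normally after two values.
async function* finite(): AsyncIterable<number> {
  yield 1
  yield 2
}

// Ends with an error after one value.
async function* failing(): AsyncIterable<number> {
  yield 1
  throw new Error('stream failed')
}

// Collects values until the stream ends, recording the error message if it ended with one.
async function drain(
  source: AsyncIterable<number>
): Promise<{ items: number[]; error?: string }> {
  const items: number[] = []
  try {
    for await (const n of source) {
      items.push(n)
    }
    return { items }
  } catch (err) {
    return { items, error: err instanceof Error ? err.message : String(err) }
  }
}
```

This takes the place of the separate `complete` and `error` callbacks an Observable subscriber would provide: the same `try`/`catch` used for any awaited call covers both outcomes.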
stephenh pushed a commit that referenced this issue Jul 1, 2022
# [1.116.0](v1.115.5...v1.116.0) (2022-07-01)

### Features

* add option to use async iterables ([#605](#605)) ([ca8ea8d](ca8ea8d)), closes [#600](#600)
@stephenh
Owner

stephenh commented Jul 1, 2022

🎉 This issue has been resolved in version 1.116.0 🎉

The release is available on:

Your semantic-release bot 📦🚀
