-
-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
Copy pathstreams.pyi
153 lines (141 loc) · 5.67 KB
/
streams.pyi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import ssl
import sys
from _typeshed import ReadableBuffer, StrPath
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequence, Sized
from typing import Any, Protocol, SupportsIndex
from typing_extensions import Self, TypeAlias
from . import events, protocols, transports
from .base_events import Server
if sys.platform == "win32":
__all__ = ("StreamReader", "StreamWriter", "StreamReaderProtocol", "open_connection", "start_server")
else:
__all__ = (
"StreamReader",
"StreamWriter",
"StreamReaderProtocol",
"open_connection",
"start_server",
"open_unix_connection",
"start_unix_server",
)
_ClientConnectedCallback: TypeAlias = Callable[[StreamReader, StreamWriter], Awaitable[None] | None]
class _ReaduntilBuffer(ReadableBuffer, Sized, Protocol): ...
if sys.version_info >= (3, 10):
async def open_connection(
host: str | None = None,
port: int | str | None = None,
*,
limit: int = 65536,
ssl_handshake_timeout: float | None = ...,
**kwds: Any,
) -> tuple[StreamReader, StreamWriter]: ...
async def start_server(
client_connected_cb: _ClientConnectedCallback,
host: str | Sequence[str] | None = None,
port: int | str | None = None,
*,
limit: int = 65536,
ssl_handshake_timeout: float | None = ...,
**kwds: Any,
) -> Server: ...
else:
async def open_connection(
host: str | None = None,
port: int | str | None = None,
*,
loop: events.AbstractEventLoop | None = None,
limit: int = 65536,
ssl_handshake_timeout: float | None = ...,
**kwds: Any,
) -> tuple[StreamReader, StreamWriter]: ...
async def start_server(
client_connected_cb: _ClientConnectedCallback,
host: str | None = None,
port: int | str | None = None,
*,
loop: events.AbstractEventLoop | None = None,
limit: int = 65536,
ssl_handshake_timeout: float | None = ...,
**kwds: Any,
) -> Server: ...
if sys.platform != "win32":
if sys.version_info >= (3, 10):
async def open_unix_connection(
path: StrPath | None = None, *, limit: int = 65536, **kwds: Any
) -> tuple[StreamReader, StreamWriter]: ...
async def start_unix_server(
client_connected_cb: _ClientConnectedCallback, path: StrPath | None = None, *, limit: int = 65536, **kwds: Any
) -> Server: ...
else:
async def open_unix_connection(
path: StrPath | None = None, *, loop: events.AbstractEventLoop | None = None, limit: int = 65536, **kwds: Any
) -> tuple[StreamReader, StreamWriter]: ...
async def start_unix_server(
client_connected_cb: _ClientConnectedCallback,
path: StrPath | None = None,
*,
loop: events.AbstractEventLoop | None = None,
limit: int = 65536,
**kwds: Any,
) -> Server: ...
class FlowControlMixin(protocols.Protocol):
def __init__(self, loop: events.AbstractEventLoop | None = None) -> None: ...
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
def __init__(
self,
stream_reader: StreamReader,
client_connected_cb: _ClientConnectedCallback | None = None,
loop: events.AbstractEventLoop | None = None,
) -> None: ...
def __del__(self) -> None: ...
class StreamWriter:
def __init__(
self,
transport: transports.WriteTransport,
protocol: protocols.BaseProtocol,
reader: StreamReader | None,
loop: events.AbstractEventLoop,
) -> None: ...
@property
def transport(self) -> transports.WriteTransport: ...
def write(self, data: bytes | bytearray | memoryview) -> None: ...
def writelines(self, data: Iterable[bytes | bytearray | memoryview]) -> None: ...
def write_eof(self) -> None: ...
def can_write_eof(self) -> bool: ...
def close(self) -> None: ...
def is_closing(self) -> bool: ...
async def wait_closed(self) -> None: ...
def get_extra_info(self, name: str, default: Any = None) -> Any: ...
async def drain(self) -> None: ...
if sys.version_info >= (3, 12):
async def start_tls(
self,
sslcontext: ssl.SSLContext,
*,
server_hostname: str | None = None,
ssl_handshake_timeout: float | None = None,
ssl_shutdown_timeout: float | None = None,
) -> None: ...
elif sys.version_info >= (3, 11):
async def start_tls(
self, sslcontext: ssl.SSLContext, *, server_hostname: str | None = None, ssl_handshake_timeout: float | None = None
) -> None: ...
if sys.version_info >= (3, 11):
def __del__(self) -> None: ...
class StreamReader(AsyncIterator[bytes]):
def __init__(self, limit: int = 65536, loop: events.AbstractEventLoop | None = None) -> None: ...
def exception(self) -> Exception: ...
def set_exception(self, exc: Exception) -> None: ...
def set_transport(self, transport: transports.BaseTransport) -> None: ...
def feed_eof(self) -> None: ...
def at_eof(self) -> bool: ...
def feed_data(self, data: Iterable[SupportsIndex]) -> None: ...
async def readline(self) -> bytes: ...
if sys.version_info >= (3, 13):
async def readuntil(self, separator: _ReaduntilBuffer | tuple[_ReaduntilBuffer, ...] = b"\n") -> bytes: ...
else:
async def readuntil(self, separator: _ReaduntilBuffer = b"\n") -> bytes: ...
async def read(self, n: int = -1) -> bytes: ...
async def readexactly(self, n: int) -> bytes: ...
def __aiter__(self) -> Self: ...
async def __anext__(self) -> bytes: ...