diff --git a/src-json/define.json b/src-json/define.json index a78384d2439..d282dcdf71a 100644 --- a/src-json/define.json +++ b/src-json/define.json @@ -21,6 +21,11 @@ "define": "as3", "doc": "Defined when outputting flash9 as3 source code." }, + { + "name": "Asys", + "define": "asys", + "doc": "Defined for all platforms that support the libuv-based asys package." + }, { "name": "CheckXmlProxy", "define": "check_xml_proxy", diff --git a/src/context/common.ml b/src/context/common.ml index 500d448a493..6ebad553465 100644 --- a/src/context/common.ml +++ b/src/context/common.ml @@ -84,6 +84,8 @@ type platform_config = { pf_static : bool; (** has access to the "sys" package *) pf_sys : bool; + (** has access to the "asys" package *) + pf_asys : bool; (** captured variables handling (see before) *) pf_capture_policy : capture_policy; (** when calling a method with optional args, do we replace the missing args with "null" constants *) @@ -312,6 +314,7 @@ let default_config = { pf_static = true; pf_sys = true; + pf_asys = false; pf_capture_policy = CPNone; pf_pad_nulls = false; pf_add_final_return = false; @@ -588,6 +591,11 @@ let init_platform com pf = define com Define.Sys end else com.package_rules <- PMap.add "sys" Forbidden com.package_rules; + if com.config.pf_asys then begin + raw_define_value com.defines "target.asys" "true"; + define com Define.Asys + end else + com.package_rules <- PMap.add "asys" Forbidden com.package_rules; if com.config.pf_uses_utf16 then begin raw_define_value com.defines "target.utf16" "true"; define com Define.Utf16; diff --git a/std/asys/AsyncFileSystem.hx b/std/asys/AsyncFileSystem.hx new file mode 100644 index 00000000000..3ae72208756 --- /dev/null +++ b/std/asys/AsyncFileSystem.hx @@ -0,0 +1,52 @@ +package asys; + +import haxe.Error; +import haxe.NoData; +import haxe.async.Callback; +import haxe.io.Bytes; +import haxe.io.FilePath; +import asys.*; + +/** + This class provides methods for asynchronous operations on files and + directories. For synchronous operations, see `asys.FileSystem`. + + All methods here are asynchronous versions of the functions in + `asys.FileSystem`. Please see them for a description of the arguments and + use of each method. + + Any synchronous method that returns no value (`Void` return type) has an + extra `callback:Callback` argument. + + Any synchronous method that returns a value has an extra + `callback:Callback` argument, where `T` is the return type of the + synchronous method. + + Errors are communicated through the callbacks or in some cases thrown + immediately. +**/ +extern class AsyncFileSystem { + static function access(path:FilePath, ?mode:FileAccessMode = FileAccessMode.Ok, callback:Callback):Void; + static function chmod(path:FilePath, mode:FilePermissions, ?followSymLinks:Bool = true, callback:Callback):Void; + static function chown(path:FilePath, uid:Int, gid:Int, ?followSymLinks:Bool = true, callback:Callback):Void; + static function copyFile(src:FilePath, dest:FilePath, ?flags:FileCopyFlags, callback:Callback):Void; + static function exists(path:FilePath, callback:Callback):Void; + static function link(existingPath:FilePath, newPath:FilePath, callback:Callback):Void; + static function mkdir(path:FilePath, ?recursive:Bool, ?mode:FilePermissions, callback:Callback):Void; + static function mkdtemp(prefix:FilePath, callback:Callback):Void; + static function readdir(path:FilePath, callback:Callback>):Void; + static function readdirTypes(path:FilePath, callback:Callback>):Void; + static function readlink(path:FilePath, callback:Callback):Void; + static function realpath(path:FilePath, callback:Callback):Void; + static function rename(oldPath:FilePath, newPath:FilePath, callback:Callback):Void; + static function rmdir(path:FilePath, callback:Callback):Void; + static function stat(path:FilePath, ?followSymLinks:Bool = true, callback:Callback):Void; + static function symlink(target:FilePath, path:FilePath, ?type:String, callback:Callback):Void; + static function truncate(path:FilePath, len:Int, callback:Callback):Void; + static function unlink(path:FilePath, callback:Callback):Void; + static function utimes(path:FilePath, atime:Date, mtime:Date, callback:Callback):Void; + static function appendFile(path:FilePath, data:Bytes, ?flags:FileOpenFlags, ?mode:FilePermissions, callback:Callback):Void; + static function open(path:FilePath, ?flags:FileOpenFlags, ?mode:FilePermissions, ?binary:Bool = true, callback:Callback):Void; + static function readFile(path:FilePath, ?flags:FileOpenFlags, callback:Callback):Void; + static function writeFile(path:FilePath, data:Bytes, ?flags:FileOpenFlags, ?mode:FilePermissions, callback:Callback):Void; +} diff --git a/std/asys/CurrentProcess.hx b/std/asys/CurrentProcess.hx new file mode 100644 index 00000000000..fab11a88cbb --- /dev/null +++ b/std/asys/CurrentProcess.hx @@ -0,0 +1,62 @@ +package asys; + +import haxe.async.*; +import asys.net.Socket; +import asys.io.*; + +/** + Methods to control the current process and IPC interaction with the parent + process. +**/ +class CurrentProcess { + /** + Emitted when a message is received over IPC. `initIpc` must be called first + to initialise the IPC channel. + **/ + public static final messageSignal:Signal = new ArraySignal(); + + static var ipc:Socket; + static var ipcOut:IpcSerializer; + static var ipcIn:IpcUnserializer; + + /** + Environment variables, as available to the process when it was created. + This map can be modified, but the changes will only be visible within the + current process. Use `asys.System.setEnv` to make modifications which will + be available in other processes in the same shell. + **/ + public static var environment(default, null):Map; + + /** + Initialise the IPC channel on the given file descriptor `fd`. This should + only be used when the current process was spawned with `Process.spawn` from + another Haxe process. `fd` should correspond to the index of the `Ipc` + entry in `options.stdio`. + **/ + public static function initIpc(fd:Int):Void { + if (ipc != null) + throw "IPC already initialised"; + ipc = Socket.create(); + ipcOut = @:privateAccess new IpcSerializer(ipc); + ipcIn = @:privateAccess new IpcUnserializer(ipc); + ipc.connectFd(fd, true); + ipc.errorSignal.on(err -> trace("IPC error", err)); + ipcIn.messageSignal.on(message -> messageSignal.emit(message)); + } + + /** + Sends a message over IPC. `initIpc` must be called first to initialise the + IPC channel. + **/ + public static function send(message:IpcMessage):Void { + if (ipc == null) + throw "IPC not connected"; + ipcOut.write(message); + } + + extern public static function initUv():Void; + + extern public static function runUv(?mode:asys.uv.UVRunMode = RunDefault):Bool; + + extern public static function stopUv():Void; +} diff --git a/std/asys/DirectoryEntry.hx b/std/asys/DirectoryEntry.hx new file mode 100644 index 00000000000..959f7f10f42 --- /dev/null +++ b/std/asys/DirectoryEntry.hx @@ -0,0 +1,17 @@ +package asys; + +import haxe.io.FilePath; + +/** + An entry returned from `asys.FileSystem.readdirTypes`. +**/ +interface DirectoryEntry { + var name(get, never):FilePath; + function isBlockDevice():Bool; + function isCharacterDevice():Bool; + function isDirectory():Bool; + function isFIFO():Bool; + function isFile():Bool; + function isSocket():Bool; + function isSymbolicLink():Bool; +} diff --git a/std/asys/FileAccessMode.hx b/std/asys/FileAccessMode.hx new file mode 100644 index 00000000000..cec5e363f6f --- /dev/null +++ b/std/asys/FileAccessMode.hx @@ -0,0 +1,19 @@ +package asys; + +/** + Wrapper for file access modes. See `asys.FileSystem.access`. +**/ +enum abstract FileAccessMode(Int) { + var Ok = 0; + var Execute = 1 << 0; + var Write = 1 << 1; + var Read = 1 << 2; + + inline function new(value:Int) + this = value; + + inline function get_raw():Int return this; + + @:op(A | B) + inline function join(other:FileAccessMode):FileAccessMode return new FileAccessMode(this | other.get_raw()); +} diff --git a/std/asys/FileCopyFlags.hx b/std/asys/FileCopyFlags.hx new file mode 100644 index 00000000000..db45d4f0bb2 --- /dev/null +++ b/std/asys/FileCopyFlags.hx @@ -0,0 +1,23 @@ +package asys; + +enum abstract FileCopyFlags(Int) { + /** + Fail if destination exists. + **/ + var FailIfExists = 1 << 0; + + /** + Copy-on-write reflink if possible. + **/ + var COWClone = 1 << 1; + + /** + Copy-on-write reflink or fail. + **/ + var COWCloneForce = 1 << 2; + + inline function get_raw():Int return this; + + @:op(A | B) + inline function join(other:FileCopyFlags) return this | other.get_raw(); +} diff --git a/std/asys/FileOpenFlags.hx b/std/asys/FileOpenFlags.hx new file mode 100644 index 00000000000..47fb8573afd --- /dev/null +++ b/std/asys/FileOpenFlags.hx @@ -0,0 +1,80 @@ +package asys; + +class FileOpenFlagsImpl { + public static function fromString(flags:String):FileOpenFlags { + return (switch (flags) { + case "r": ReadOnly; + case "r+": ReadWrite; + case "rs+": ReadWrite | Sync; + case "sr+": ReadWrite | Sync; + case "w": Truncate | Create | WriteOnly; + case "w+": Truncate | Create | ReadWrite; + case "a": Append | Create | WriteOnly; + case "a+": Append | Create | ReadWrite; + case "wx": Truncate | Create | WriteOnly | Excl; + case "xw": Truncate | Create | WriteOnly | Excl; + case "wx+": Truncate | Create | ReadWrite | Excl; + case "xw+": Truncate | Create | ReadWrite | Excl; + case "ax": Append | Create | WriteOnly | Excl; + case "xa": Append | Create | WriteOnly | Excl; + case "as": Append | Create | WriteOnly | Sync; + case "sa": Append | Create | WriteOnly | Sync; + case "ax+": Append | Create | ReadWrite | Excl; + case "xa+": Append | Create | ReadWrite | Excl; + case "as+": Append | Create | ReadWrite | Sync; + case "sa+": Append | Create | ReadWrite | Sync; + case _: throw "invalid file open flags"; + }); + } +} + +/** + Flags used when opening a file with `asys.FileSystem.open` or other file + functions. Specify whether the opened file: + + - will be readable + - will be writable + - will be truncated (all data lost) first + - will be in append mode + - will be opened exclusively by this process + + Instances of this type can be created by combining flags with the bitwise or + operator: + + ```haxe + Truncate | Create | WriteOnly + ``` + + Well-known combinations of flags can be specified with a string. The + supported modes are: `r`, `r+`, `rs+`, `sr+`, `w`, `w+`, `a`, `a+`, `wx`, + `xw`, `wx+`, `xw+`, `ax`, `xa`, `as`, `sa`, `ax+`, `xa+`, `as+`, `sa+`. +**/ +@:native("asys.FileOpenFlagsImpl") +extern enum abstract FileOpenFlags(Int) { + @:from public static function fromString(flags:String):FileOpenFlags; + + inline function new(value:Int) + this = value; + + inline function get_raw():Int return this; + + @:op(A | B) + inline function join(other:FileOpenFlags):FileOpenFlags return new FileOpenFlags(this | other.get_raw()); + + // TODO: some of these don't make sense in Haxe-wrapped libuv + var Append; + var Create; + var Direct; + var Directory; + var Dsync; + var Excl; + var NoAtime; + var NoCtty; + var NoFollow; + var NonBlock; + var ReadOnly; + var ReadWrite; + var Sync; + var Truncate; + var WriteOnly; +} diff --git a/std/asys/FilePermissions.hx b/std/asys/FilePermissions.hx new file mode 100644 index 00000000000..ba38128b720 --- /dev/null +++ b/std/asys/FilePermissions.hx @@ -0,0 +1,90 @@ +package asys; + +/** + File permissions in specify whether a file can be read, written, or executed + by its owner, its owning group, and everyone else. Instances of this type + can be constructed by combining individual file permissions with the `|` + operator: + + ```haxe + ReadOwner | WriteOwner | ReadGroup | ReadOthers + ``` + + Alternatively, file permissions may be specified as a string with exactly 9 + characters, in the format `rwxrwxrwx`, where each letter may instead be a + `-` character. The first three characters represent the permissions of the + owner, the second three characters represent the permissions of the owning + group, and the last three characters represent the permissions of everyone + else. + + ```haxe + "rw-r--r--" + ``` + + Finally, file permissions may be constructed from an octal representation + using the `fromOctal` function. + + ```haxe + FilePermissions.fromOctal("644") + ``` +**/ +enum abstract FilePermissions(Int) { + @:from public static function fromString(s:String):FilePermissions { + inline function bit(cc:Int, expect:Int):Int { + return (if (cc == expect) + 1; + else if (cc == "-".code) + 0; + else + throw "invalid file permissions string"); + } + switch (s.length) { + case 9: // rwxrwxrwx + return new FilePermissions(bit(s.charCodeAt(0), "r".code) << 8 + | bit(s.charCodeAt(1), "w".code) << 7 + | bit(s.charCodeAt(2), "x".code) << 6 + | bit(s.charCodeAt(3), "r".code) << 5 + | bit(s.charCodeAt(4), "w".code) << 4 + | bit(s.charCodeAt(5), "x".code) << 3 + | bit(s.charCodeAt(6), "r".code) << 2 + | bit(s.charCodeAt(7), "w".code) << 1 + | bit(s.charCodeAt(8), "x".code)); + case _: + throw "invalid file permissions string"; + } + } + + public static function fromOctal(s:String):FilePermissions { + inline function digit(n:Int):Int { + if (n >= "0".code && n <= "7".code) return n - "0".code; + throw "invalid octal file permissions"; + } + switch (s.length) { + case 3: // 777 + return new FilePermissions(digit(s.charCodeAt(0)) << 6 + | digit(s.charCodeAt(1)) << 3 + | digit(s.charCodeAt(2))); + case _: + throw "invalid octal file permissions"; + } + } + + var None = 0; + var ExecuteOthers = 1 << 0; + var WriteOthers = 1 << 1; + var ReadOthers = 1 << 2; + var ExecuteGroup = 1 << 3; + var WriteGroup = 1 << 4; + var ReadGroup = 1 << 5; + var ExecuteOwner = 1 << 6; + var WriteOwner = 1 << 7; + var ReadOwner = 1 << 8; + + inline function new(value:Int) + this = value; + + inline function get_raw():Int return this; + + @:op(A | B) + inline function join(other:FilePermissions) return new FilePermissions(this | other.get_raw()); +} diff --git a/std/asys/FileStat.hx b/std/asys/FileStat.hx new file mode 100644 index 00000000000..6599edba380 --- /dev/null +++ b/std/asys/FileStat.hx @@ -0,0 +1,40 @@ +package asys; + +typedef FileStatData = { + var atime:Date; + var ctime:Date; + var dev:Int; + var gid:Int; + var ino:Int; + var mode:Int; + var mtime:Date; + var nlink:Int; + var rdev:Int; + var size:Int; + var uid:Int; + + var blksize:Int; + var blocks:Int; + var atimeMs:Float; + var ctimeMs:Float; + var mtimeMs:Float; + var birthtime:Date; + var birthtimeMs:Float; +}; + +@:forward +abstract FileStat(FileStatData) from FileStatData { + public function isBlockDevice():Bool return false; + + public function isCharacterDevice():Bool return false; + + public function isDirectory():Bool return false; + + public function isFIFO():Bool return false; + + public function isFile():Bool return false; + + public function isSocket():Bool return false; + + public function isSymbolicLink():Bool return false; +} diff --git a/std/asys/FileSystem.hx b/std/asys/FileSystem.hx new file mode 100644 index 00000000000..53c9d232e54 --- /dev/null +++ b/std/asys/FileSystem.hx @@ -0,0 +1,220 @@ +package asys; + +import haxe.Error; +import haxe.io.Bytes; +import haxe.io.FilePath; +import asys.io.*; + +typedef FileReadStreamCreationOptions = { + ?flags:FileOpenFlags, + ?mode:FilePermissions +} & + asys.io.FileReadStream.FileReadStreamOptions; + +/** + This class provides methods for synchronous operations on files and + directories. For asynchronous operations, see `asys.async.FileSystem`. + + Passing `null` as a path to any of the functions in this class will result + in unspecified behaviour. +**/ +extern class FileSystem { + public static inline final async = asys.AsyncFileSystem; + + /** + Tests specific user permissions for the file specified by `path`. If the + check fails, throws an exception. `mode` is one or more `FileAccessMode` + values: + + - `FileAccessMode.Ok` - file is visible to the calling process (it exists) + - `FileAccessMode.Execute` - file can be executed by the calling process + - `FileAccessMode.Write` - file can be written to by the calling process + - `FileAccessMode.Read` - file can be read from by the calling process + + Mode values can be combined with the bitwise or operator, e.g. calling + `access` with the `mode`: + + ```haxe + FileAccessMode.Execute | FileAccessMode.Read + ``` + + will check that the file is both readable and executable. + + The result of this call should not be used in a condition before a call to + e.g. `open`, because this would introduce a race condition (the file could + be deleted after the `access` call, but before the `open` call). Instead, + the latter function should be called immediately and errors should be + handled with a `try ... catch` block. + **/ + static function access(path:FilePath, ?mode:FileAccessMode = FileAccessMode.Ok):Void; + + /** + Appends `data` at the end of the file located at `path`. + **/ + static function appendFile(path:FilePath, data:Bytes, ?flags:FileOpenFlags /* a */, ?mode:FilePermissions /* 0666 */):Void; + + /** + Changes the permissions of the file specific by `path` to `mode`. + + If `path` points to a symbolic link, this function will change the + permissions of the target file, not the symbolic link itself, unless + `followSymLinks` is set to `false`. + + TODO: `followSymLinks == false` is not implemented and will throw. + **/ + static function chmod(path:FilePath, mode:FilePermissions, ?followSymLinks:Bool = true):Void; + + /** + Changes the owner and group of the file specific by `path` to `uid` and + `gid`, respectively. + + If `path` points to a symbolic link, this function will change the + permissions of the target file, not the symbolic link itself, unless + `followSymLinks` is set to `false`. + **/ + static function chown(path:FilePath, uid:Int, gid:Int, ?followSymLinks:Bool = true):Void; + + /** + Copies the file at `src` to `dest`. If `dest` exists, it is overwritten. + **/ + static function copyFile(src:FilePath, dest:FilePath /* , ?flags:FileCopyFlags */):Void; + + /** + Creates a read stream (an instance of `IReadable`) for the given path. + `options` can be used to specify how the file is opened, as well as which + part of the file will be read by the stream. + + - `options.flags` - see `open`. + - `options.mode` - see `open`. + - `options.autoClose` - whether the file should be closed automatically + once the stream is fully consumed. + - `options.start` - starting position in bytes (inclusive). + - `options.end` - end position in bytes (non-inclusive). + **/ + static function createReadStream(path:FilePath, ?options:FileReadStreamCreationOptions):FileReadStream; + + // static function createWriteStream(path:FilePath, ?options:{?flags:FileOpenFlags, ?mode:FilePermissions, ?autoClose:Bool, ?start:Int}):FileWriteStream; + + /** + Returns `true` if the file or directory specified by `path` exists. + + The result of this call should not be used in a condition before a call to + e.g. `open`, because this would introduce a race condition (the file could + be deleted after the `exists` call, but before the `open` call). Instead, + the latter function should be called immediately and errors should be + handled with a `try ... catch` block. + **/ + static function exists(path:FilePath):Bool; + + static function link(existingPath:FilePath, newPath:FilePath):Void; + + /** + Creates a directory at the path `path`, with file mode `mode`. + + If `recursive` is `false` (default), this function can only create one + directory at a time, the last component of `path`. If `recursive` is `true`, + intermediate directories will be created as needed. + **/ + static function mkdir(path:FilePath, ?recursive:Bool = false, ?mode:FilePermissions /* 0777 */):Void; + + /** + Creates a unique temporary directory. `prefix` should be a path template + ending in six `X` characters, which will be replaced with random characters. + Returns the path to the created directory. + + The generated directory needs to be manually deleted by the process. + **/ + static function mkdtemp(prefix:FilePath):FilePath; + + /** + Opens the file located at `path`. + **/ + static function open(path:FilePath, ?flags:FileOpenFlags /* a */, ?mode:FilePermissions /* 0666 */):File; + + /** + Reads the contents of a directory specified by `path`. Returns an array of + `FilePath`s relative to the specified directory (i.e. the paths are not + absolute). The array will not include `.` or `..`. + **/ + static function readdir(path:FilePath):Array; + + /** + Same as `readdir`, but returns an array of `DirectoryEntry` values instead. + **/ + static function readdirTypes(path:FilePath):Array; + + /** + Reads all the bytes of the file located at `path`. + **/ + static function readFile(path:FilePath, ?flags:FileOpenFlags /* r */):Bytes; + + /** + Returns the contents (target path) of the symbolic link located at `path`. + **/ + static function readlink(path:FilePath):FilePath; + + /** + Returns the canonical path name of `path` (which may be a relative path) + by resolving `.`, `..`, and symbolic links. + **/ + static function realpath(path:FilePath):FilePath; + + /** + Renames the file or directory located at `oldPath` to `newPath`. If a file + already exists at `newPath`, it is overwritten. If a directory already + exists at `newPath`, an exception is thrown. + **/ + static function rename(oldPath:FilePath, newPath:FilePath):Void; + + /** + Resizes the file located at `path` to exactly `len` bytes. If the file was + larger than `len` bytes, the extra data is lost. If the file was smaller + than `len` bytes, the file is extended with null bytes. + **/ + static function resize(path:FilePath, ?len:Int = 0):Void; + + /** + Deletes the directory located at `path`. If the directory is not empty or + cannot be deleted, an error is thrown. + **/ + static function rmdir(path:FilePath):Void; + + /** + Returns information about the file located at `path`. + + If `path` points to a symbolic link, this function will return information + about the target file, not the symbolic link itself, unless `followSymLinks` + is set to `false`. + **/ + static function stat(path:FilePath, ?followSymLinks:Bool = true):asys.FileStat; + + /** + Creates a symbolic link at `path`, pointing to `target`. + + The `type` argument is ignored on all platforms except `Windows`. + **/ + static function symlink(target:FilePath, path:FilePath, ?type:SymlinkType = SymlinkType.SymlinkDir):Void; + + /** + Deletes the file located at `path`. + **/ + static function unlink(path:FilePath):Void; + + /** + Modifies the system timestamps of the file located at `path`. + **/ + static function utimes(path:FilePath, atime:Date, mtime:Date):Void; + + /** + Creates a file watcher for `path`. + + @param recursive If `true`, the file watcher will signal for changes in + sub-directories of `path` as well. + **/ + static function watch(path:FilePath, ?recursive:Bool = false):FileWatcher; + + /** + Writes `data` to the file located at `path`. + **/ + static function writeFile(path:FilePath, data:Bytes, ?flags:FileOpenFlags /* w */, ?mode:FilePermissions /* 0666 */):Void; +} diff --git a/std/asys/FileWatcher.hx b/std/asys/FileWatcher.hx new file mode 100644 index 00000000000..e85a1bdc8c6 --- /dev/null +++ b/std/asys/FileWatcher.hx @@ -0,0 +1,47 @@ +package asys; + +import haxe.Error; +import haxe.NoData; +import haxe.async.*; +import haxe.io.FilePath; + +/** + File watchers can be obtained with the `asys.FileSystem.watch` method. + Instances of this class will emit signals whenever any file in their watched + path is modified. +**/ +class FileWatcher { + /** + Emitted when a watched file is modified. + **/ + public final changeSignal:Signal = new ArraySignal(); + + /** + Emitted when `this` watcher is fully closed. No further signals will be + emitted. + **/ + public final closeSignal:Signal = new ArraySignal(); + + /** + Emitted when an error occurs. + **/ + public final errorSignal:Signal = new ArraySignal(); + + private var native:FileWatcherNative; + + private function new(filename:FilePath, recursive:Bool) {} + + /** + Closes `this` watcher. This operation is asynchronous and will emit the + `closeSignal` once done. If `listener` is given, it will be added to the + `closeSignal`. + **/ + public function close(?listener:Listener):Void { + if (listener != null) + closeSignal.once(listener); + } + + extern public function ref():Void; + + extern public function unref():Void; +} diff --git a/std/asys/FileWatcherEvent.hx b/std/asys/FileWatcherEvent.hx new file mode 100644 index 00000000000..4400a72e3ea --- /dev/null +++ b/std/asys/FileWatcherEvent.hx @@ -0,0 +1,14 @@ +package asys; + +import haxe.io.FilePath; + +/** + Events emitted by the `changeSignal` of a `sys.FileWatcher`. Any file change + consists of a name change (`Rename`), a content change (`Change`), or both + (`RenameChange`). +**/ +enum FileWatcherEvent { + Rename(newPath:FilePath); + Change(path:FilePath); + RenameChange(path:FilePath); +} diff --git a/std/asys/Net.hx b/std/asys/Net.hx new file mode 100644 index 00000000000..38545a3fd64 --- /dev/null +++ b/std/asys/Net.hx @@ -0,0 +1,71 @@ +package asys; + +import haxe.NoData; +import haxe.async.*; +import asys.net.*; +import asys.net.SocketOptions.SocketConnectTcpOptions; +import asys.net.SocketOptions.SocketConnectIpcOptions; +import asys.net.Server.ServerOptions; +import asys.net.Server.ServerListenTcpOptions; +import asys.net.Server.ServerListenIpcOptions; + +enum SocketConnect { + Tcp(options:SocketConnectTcpOptions); + Ipc(options:SocketConnectIpcOptions); +} + +enum ServerListen { + Tcp(options:ServerListenTcpOptions); + Ipc(options:ServerListenIpcOptions); +} + +typedef SocketCreationOptions = SocketOptions & {?connect:SocketConnect}; + +typedef ServerCreationOptions = ServerOptions & {?listen:ServerListen}; + +/** + Network utilities. +**/ +class Net { + /** + Constructs a socket with the given `options`. If `options.connect` is + given, an appropriate `connect` method is called on the socket. If `cb` is + given, it is passed to the `connect` method, so it will be called once the + socket successfully connects or an error occurs during connecting. + + The `options` object is given both to the `Socket` constructor and to the + `connect` method. + **/ + public static function createConnection(options:SocketCreationOptions, ?cb:Callback):Socket { + var socket = Socket.create(options); + if (options != null && options.connect != null) + switch (options.connect) { + case Tcp(options): + socket.connectTcp(options, cb); + case Ipc(options): + socket.connectIpc(options, cb); + } + return socket; + } + + /** + Constructs a server with the given `options`. If `options.listen` is + given, an appropriate `listen` method is called on the server. If `cb` is + given, it is passed to the `listen` method, so it will be called for each + client that connects to the server. + + The `options` object is given both to the `Server` constructor and to the + `listen` method. + **/ + public static function createServer(?options:ServerCreationOptions, ?listener:Listener):Server { + var server = new Server(options); + if (options != null && options.listen != null) + switch (options.listen) { + case Tcp(options): + server.listenTcp(options, listener); + case Ipc(options): + server.listenIpc(options, listener); + } + return server; + } +} diff --git a/std/asys/Process.hx b/std/asys/Process.hx new file mode 100644 index 00000000000..69967759940 --- /dev/null +++ b/std/asys/Process.hx @@ -0,0 +1,224 @@ +package asys; + +import haxe.Error; +import haxe.NoData; +import haxe.async.*; +import haxe.io.*; +import asys.net.Socket; +import asys.io.*; +import asys.uv.UVProcessSpawnFlags; + +/** + Options for spawning a process. See `Process.spawn`. + + Either `stdio` or some of `stdin`, `stdout`, and `stderr` can be used define + the file descriptors for the new process: + + - `Ignore` - skip the current position. No stream or pipe will be open for + this index. + - `Inherit` - inherit the corresponding file descriptor from the current + process. Shares standard input, standard output, and standard error in + index 0, 1, and 2, respectively. In index 3 or higher, `Inherit` has the + same effect as `Ignore`. + - `Pipe(readable, writable, ?pipe)` - create or use a pipe. `readable` and + `writable` specify whether the pipe will be readable and writable from + the point of view of the spawned process. If `pipe` is given, it is used + directly, otherwise a new pipe is created. + - `Ipc` - create an IPC (inter-process communication) pipe. Only one may be + specified in `options.stdio`. This special pipe will not have an entry in + the `stdio` array of the resulting process; instead, messages can be sent + using the `send` method, and received over `messageSignal`. IPC pipes + allow sending and receiving structured Haxe data, as well as connected + sockets and pipes. +**/ +typedef ProcessSpawnOptions = { + /** + Path to the working directory. Defaults to the current working directory if + not given. + **/ + ?cwd:FilePath, + /** + Environment variables. Defaults to the environment variables of the current + process if not given. + **/ + ?env:Map, + /** + First entry in the `argv` array for the spawned process. Defaults to + `command` if not given. + **/ + ?argv0:String, + /** + Array of `ProcessIO` specifications, see `Process.spawn`. Must be `null` if + any of `stdin`, `stdout`, or `stderr` are specified. + **/ + ?stdio:Array, + /** + `ProcessIO` specification for file descriptor 0, the standard input. Must + be `null` if `stdio` is specified. + **/ + ?stdin:ProcessIO, + /** + `ProcessIO` specification for file descriptor 1, the standard output. Must + be `null` if `stdio` is specified. + **/ + ?stdout:ProcessIO, + /** + `ProcessIO` specification for file descriptor 2, the standard error. Must + be `null` if `stdio` is specified. + **/ + ?stderr:ProcessIO, + /** + When `true`, creates a detached process which can continue running after + the current process exits. Note that `unref` must be called on the spawned + process otherwise the event loop of the current process is kept alive. + **/ + ?detached:Bool, + /** + User identifier. + **/ + ?uid:Int, + /** + Group identifier. + **/ + ?gid:Int, + // ?shell:?, + /** + (Windows only.) Do not perform automatic quoting or escaping of arguments. + **/ + ?windowsVerbatimArguments:Bool, + /** + (Windows only.) Automatically hide the window of the spawned process. + **/ + ?windowsHide:Bool +}; + +/** + Class representing a spawned process. +**/ +class Process { + /** + Execute the given `command` with `args` (none by default). `options` can be + specified to change the way the process is spawned. See + `ProcessSpawnOptions` for a description of the options. + + Pipes are made available in the `stdio` array after the process is + spawned. Standard file descriptors have their own variables: + + - `stdin` - set to point to a pipe in index 0, if it exists and is + read-only for the spawned process. + - `stdout` - set to point to a pipe in index 1, if it exists and is + write-only for the spawned process. + - `stderr` - set to point to a pipe in index 2, if it exists and is + write-only for the spawned process. + + If `options.stdio` is not given, + `[Pipe(true, false), Pipe(false, true), Pipe(false, true)]` is used as a + default. + **/ + extern public static function spawn(command:String, ?args:Array, ?options:ProcessSpawnOptions):Process; + + /** + Emitted when `this` process and all of its pipes are closed. + **/ + public final closeSignal:Signal = new ArraySignal(); + + /** + Emitted when `this` process disconnects from the IPC channel, if one was + established. + **/ + public final disconnectSignal:Signal = new ArraySignal(); + + /** + Emitted when an error occurs during communication with `this` process. + **/ + public final errorSignal:Signal = new ArraySignal(); + + /** + Emitted when `this` process exits, potentially due to a signal. + **/ + public final exitSignal:Signal = new ArraySignal(); + + /** + Emitted when a message is received over IPC. The process must be created + with an `Ipc` entry in `options.stdio`; see `Process.spawn`. + **/ + public var messageSignal(default, null):Signal; + + /** + `true` when IPC communication is available, indicating that messages may be + received with `messageSignal` and sent with `send`. + **/ + public var connected(default, null):Bool = false; + + /** + Set to `true` if the `kill` was used to send a signal to `this` process. + **/ + public var killed(default, null):Bool = false; + + extern private function get_pid():Int; + + /** + Process identifier of `this` process. A PID uniquely identifies a process + on its host machine for the duration of its lifetime. + **/ + public var pid(get, never):Int; + + /** + Standard input. May be `null` - see `options.stdio` in `spawn`. + **/ + public var stdin:IWritable; + + /** + Standard output. May be `null` - see `options.stdio` in `spawn`. + **/ + public var stdout:IReadable; + + /** + Standard error. May be `null` - see `options.stdio` in `spawn`. + **/ + public var stderr:IReadable; + + /** + Pipes created between the current (host) process and `this` (spawned) + process. The order corresponds to the `ProcessIO` specifiers in + `options.stdio` in `spawn`. This array can be used to access non-standard + pipes, i.e. file descriptors 3 and higher, as well as file descriptors 0-2 + with non-standard read/write access. + **/ + public var stdio:Array; + + /** + Disconnect `this` process from the IPC channel. + **/ + extern public function disconnect():Void; + + /** + Send a signal to `this` process. + **/ + extern public function kill(?signal:Int = 7):Void; + + /** + Close `this` process handle and all pipes in `stdio`. + **/ + extern public function close(?cb:Callback):Void; + + /** + Send `data` to the process over the IPC channel. The process must be + created with an `Ipc` entry in `options.stdio`; see `Process.spawn`. + **/ + public function send(message:IpcMessage):Void { + if (!connected) + throw "IPC not connected"; + ipcOut.write(message); + } + + extern public function ref():Void; + + extern public function unref():Void; + + var ipc:Socket; + var ipcOut:asys.io.IpcSerializer; + var ipcIn:asys.io.IpcUnserializer; + + private function new() {} +} diff --git a/std/asys/ProcessExit.hx b/std/asys/ProcessExit.hx new file mode 100644 index 00000000000..3aebee4958e --- /dev/null +++ b/std/asys/ProcessExit.hx @@ -0,0 +1,16 @@ +package asys; + +/** + Represents how a process exited. +**/ +typedef ProcessExit = { + /** + Exit code of the process. Non-zero values usually indicate an error. + Specific meanings of exit codes differ from program to program. + **/ + var code:Int; + /** + Signal that cause the process to exit, or zero if none. + **/ + var signal:Int; +}; diff --git a/std/asys/ProcessIO.hx b/std/asys/ProcessIO.hx new file mode 100644 index 00000000000..d7df8fa62b9 --- /dev/null +++ b/std/asys/ProcessIO.hx @@ -0,0 +1,10 @@ +package asys; + +enum ProcessIO { + Ignore; + Inherit; + Pipe(readable:Bool, writable:Bool, ?pipe:asys.net.Socket); + Ipc; + // Stream(_); + // Fd(_); +} diff --git a/std/asys/SymlinkType.hx b/std/asys/SymlinkType.hx new file mode 100644 index 00000000000..edcd5239a69 --- /dev/null +++ b/std/asys/SymlinkType.hx @@ -0,0 +1,9 @@ +package asys; + +enum abstract SymlinkType(Int) { + var SymlinkFile = 0; + var SymlinkDir = 1; + var SymlinkJunction = 2; // Windows only + + inline function get_raw():Int return this; +} diff --git a/std/asys/System.hx b/std/asys/System.hx new file mode 100644 index 00000000000..0acd01de9f6 --- /dev/null +++ b/std/asys/System.hx @@ -0,0 +1,15 @@ +package asys; + +extern class System { + /** + Returns the current value of the environment variable `name`, or `null` if + it does not exist. + **/ + public static function getEnv(name:String):Null; + + /** + Sets the value of the environment variable `name` to `value`. If `value` is + `null`, the variable is deleted. + **/ + public static function setEnv(name:String, value:Null):Void; +} diff --git a/std/asys/Timer.hx b/std/asys/Timer.hx new file mode 100644 index 00000000000..565ca668d5a --- /dev/null +++ b/std/asys/Timer.hx @@ -0,0 +1,34 @@ +package asys; + +class Timer { + public static function delay(f:() -> Void, timeMs:Int):Timer { + var t = new Timer(timeMs); + t.run = function() { + t.stop(); + f(); + }; + return t; + } + + public static function measure(f:()->T, ?pos:haxe.PosInfos):T { + var t0 = stamp(); + var r = f(); + haxe.Log.trace((stamp() - t0) + "s", pos); + return r; + } + + public static function stamp():Float { + // TODO: libuv? + return Sys.time(); + } + + public function new(timeMs:Int) {} + + public dynamic function run():Void {} + + extern public function stop():Void; + + extern public function ref():Void; + + extern public function unref():Void; +} diff --git a/std/asys/io/AsyncFile.hx b/std/asys/io/AsyncFile.hx new file mode 100644 index 00000000000..9252e99c3c2 --- /dev/null +++ b/std/asys/io/AsyncFile.hx @@ -0,0 +1,47 @@ +package asys.io; + +import haxe.Error; +import haxe.NoData; +import haxe.async.*; +import haxe.io.Bytes; +import haxe.io.Encoding; +import asys.*; + +/** + This class provides methods for asynchronous operations on files instances. + For synchronous operations, see `asys.io.File`. To obtain an instance of + this class, use the `async` field of `asys.io.File`. + + ```haxe + var file = asys.FileSystem.open("example.txt", "r"); + file.async.readFile(contents -> trace(contents.toString())); + ``` + + All methods here are asynchronous versions of the functions in + `asys.io.File`. Please see them for a description of the arguments and + use of each method. + + Any synchronous method that returns no value (`Void` return type) has an + extra `callback:Callback` argument. + + Any synchronous method that returns a value has an extra + `callback:Callback` argument, where `T` is the return type of the + synchronous method. + + Errors are communicated through the callbacks or in some cases thrown + immediately. +**/ +extern class AsyncFile { + function chmod(mode:FilePermissions, callback:Callback):Void; + function chown(uid:Int, gid:Int, callback:Callback):Void; + function close(callback:Callback):Void; + function datasync(callback:Callback):Void; + function readBuffer(buffer:Bytes, offset:Int, length:Int, position:Int, callback:Callback<{bytesRead:Int, buffer:Bytes}>):Void; + function readFile(callback:Callback):Void; + function stat(callback:Callback):Void; + function sync(callback:Callback):Void; + function truncate(?len:Int = 0, callback:Callback):Void; + function utimes(atime:Date, mtime:Date, callback:Callback):Void; + function writeBuffer(buffer:Bytes, offset:Int, length:Int, position:Int, callback:Callback<{bytesWritten:Int, buffer:Bytes}>):Void; + function writeString(str:String, ?position:Int, ?encoding:Encoding, callback:Callback<{bytesWritten:Int, buffer:Bytes}>):Void; +} diff --git a/std/asys/io/File.hx b/std/asys/io/File.hx new file mode 100644 index 00000000000..a920244b1aa --- /dev/null +++ b/std/asys/io/File.hx @@ -0,0 +1,88 @@ +package asys.io; + +import haxe.Error; +import haxe.io.Bytes; +import haxe.io.Encoding; +import asys.*; + +/** + Class representing an open file. Some methods in this class are instance + variants of the same methods in `asys.FileSystem`. +**/ +extern class File { + private function get_async():AsyncFile; + + var async(get, never):AsyncFile; + + /** + See `asys.FileSystem.chmod`. + **/ + function chmod(mode:FilePermissions):Void; + + /** + See `asys.FileSystem.chown`. + **/ + function chown(uid:Int, gid:Int):Void; + + /** + Closes the file. Any operation after this method is called is invalid. + **/ + function close():Void; + + /** + Same as `sync`, but metadata is not flushed unless needed for subsequent + data reads to be correct. E.g. changes to the modification times are not + flushed, but changes to the filesize do. + **/ + function datasync():Void; + + /** + Reads a part of `this` file into the given `buffer`. + + @param buffer Buffer to which data will be written. + @param offset Position in `buffer` at which to start writing. + @param length Number of bytes to read from `this` file. + @param position Position in `this` file at which to start reading. + **/ + function readBuffer(buffer:Bytes, offset:Int, length:Int, position:Int):{bytesRead:Int, buffer:Bytes}; + + /** + Reads the entire contents of `this` file. + **/ + function readFile():Bytes; + + /** + See `asys.FileSystem.stat`. + **/ + function stat():FileStat; + + /** + Flushes all modified data and metadata of `this` file to the disk. + **/ + function sync():Void; + + /** + See `asys.FileSystem.truncate`. + **/ + function truncate(?len:Int = 0):Void; + + /** + See `asys.FileSystem.utimes`. + **/ + function utimes(atime:Date, mtime:Date):Void; + + /** + Writes a part of the given `buffer` into `this` file. + + @param buffer Buffer from which data will be read. + @param offset Position in `buffer` at which to start reading. + @param length Number of bytes to write to `this` file. + @param position Position in `this` file at which to start writing. + **/ + function writeBuffer(buffer:Bytes, offset:Int, length:Int, position:Int):{bytesWritten:Int, buffer:Bytes}; + + /** + Writes a string to `this` file at `position`. + **/ + function writeString(str:String, ?position:Int, ?encoding:Encoding):{bytesWritten:Int, buffer:Bytes}; +} diff --git a/std/asys/io/FileInput.hx b/std/asys/io/FileInput.hx new file mode 100644 index 00000000000..060dcc9b831 --- /dev/null +++ b/std/asys/io/FileInput.hx @@ -0,0 +1,42 @@ +package asys.io; + +import haxe.io.Bytes; + +class FileInput extends haxe.io.Input { + final file:asys.io.File; + var position:Int = 0; + + function new(file:asys.io.File) { + this.file = file; + } + + public function seek(p:Int, pos:sys.io.FileSeek):Void { + position = (switch (pos) { + case SeekBegin: p; + case SeekCur: position + p; + case SeekEnd: file.stat().size + p; + }); + } + + public function tell():Int { + return position; + } + + override public function readByte():Int { + var buf = Bytes.alloc(1); + file.readBuffer(buf, 0, 1, position++); + return buf.get(0); + } + + override public function readBytes(buf:Bytes, pos:Int, len:Int):Int { + if (pos < 0 || len < 0 || pos + len > buf.length) + throw haxe.io.Error.OutsideBounds; + var read = file.readBuffer(buf, pos, len, position).bytesRead; + position += read; + return read; + } + + override public function close():Void { + file.close(); + } +} diff --git a/std/asys/io/FileOutput.hx b/std/asys/io/FileOutput.hx new file mode 100644 index 00000000000..436196c224d --- /dev/null +++ b/std/asys/io/FileOutput.hx @@ -0,0 +1,46 @@ +package asys.io; + +import haxe.io.Bytes; + +class FileOutput extends haxe.io.Output { + final file:asys.io.File; + var position:Int = 0; + + function new(file:asys.io.File) { + this.file = file; + } + + public function seek(p:Int, pos:sys.io.FileSeek):Void { + position = (switch (pos) { + case SeekBegin: p; + case SeekCur: position + p; + case SeekEnd: file.stat().size + p; + }); + } + + public function tell():Int { + return position; + } + + override public function writeByte(byte:Int):Void { + var buf = Bytes.alloc(1); + buf.set(1, byte); + file.writeBuffer(buf, 0, 1, position++); + } + + override public function writeBytes(buf:Bytes, pos:Int, len:Int):Int { + if (pos < 0 || len < 0 || pos + len > buf.length) + throw haxe.io.Error.OutsideBounds; + var written = file.writeBuffer(buf, pos, len, position).bytesWritten; + position += written; + return written; + } + + override public function flush():Void { + file.datasync(); + } + + override public function close():Void { + file.close(); + } +} diff --git a/std/asys/io/FileReadStream.hx b/std/asys/io/FileReadStream.hx new file mode 100644 index 00000000000..b14d8a6deda --- /dev/null +++ b/std/asys/io/FileReadStream.hx @@ -0,0 +1,20 @@ +package asys.io; + +import haxe.NoData; +import haxe.async.Signal; + +typedef FileReadStreamOptions = { + ?autoClose:Bool, + ?start:Int, + ?end:Int, + ?highWaterMark:Int +}; + +extern class FileReadStream extends haxe.io.Readable { + final openSignal:Signal; + final readySignal:Signal; + + var bytesRead:Int; + var path:String; + var pending:Bool; +} diff --git a/std/asys/io/FileWriteStream.hx b/std/asys/io/FileWriteStream.hx new file mode 100644 index 00000000000..a000f03a9f4 --- /dev/null +++ b/std/asys/io/FileWriteStream.hx @@ -0,0 +1,13 @@ +package asys.io; + +import haxe.NoData; +import haxe.async.Signal; + +extern class FileWriteStream extends haxe.io.Writable { + final openSignal:Signal; + final readySignal:Signal; + + var bytesWritten:Int; + var path:String; + var pending:Bool; +} diff --git a/std/asys/io/IpcMessage.hx b/std/asys/io/IpcMessage.hx new file mode 100644 index 00000000000..580fe9acc1d --- /dev/null +++ b/std/asys/io/IpcMessage.hx @@ -0,0 +1,21 @@ +package asys.io; + +import asys.net.Socket; + +/** + A message sent over an IPC channel. Sent with `Process.send` to a sub-process + or with `CurrentProcess.send` to the parent process. Received with + `Process.messageSignal` from a sub-process, or `CurrentProcess.messageSignal` + from the parent process. +**/ +typedef IpcMessage = { + /** + The actual message. May be any data that is serializable with + `haxe.Serializer`. + **/ + var message:Any; + /** + Sockets and pipes associated with the message. Must be connected. + **/ + var ?sockets:Array; +}; diff --git a/std/asys/io/IpcSerializer.hx b/std/asys/io/IpcSerializer.hx new file mode 100644 index 00000000000..7bfafe7642e --- /dev/null +++ b/std/asys/io/IpcSerializer.hx @@ -0,0 +1,50 @@ +package asys.io; + +import haxe.Error; +import haxe.NoData; +import haxe.async.*; +import haxe.io.*; +import asys.net.Socket; + +/** + Class used internally to send messages and handles over an IPC channel. See + `Process.spawn` for creating an IPC channel and `Process.send` for sending + messages over the channel. +**/ +class IpcSerializer { + static var activeSerializer:IpcSerializer = null; + static var dummyBuffer = Bytes.ofString("s"); + + final pipe:Socket; + // final chunkSockets:Array = []; + + function new(pipe:Socket) { + this.pipe = pipe; + } + + /** + Sends `data` over the pipe. `data` will be serialized with a call to + `haxe.Serializer.run`. Objects of type `Socket` can be sent along with the + data if `handles` is provided. + **/ + public function write(message:IpcMessage):Void { + activeSerializer = this; + if (message.sockets != null) + for (socket in message.sockets) { + if (!socket.connected) + throw "cannot send unconnected socket over IPC"; + pipe.writeHandle(dummyBuffer, socket); + } + var serial = haxe.Serializer.run(message.message); + pipe.write(Bytes.ofString('${serial.length}:$serial')); + // chunkSockets.resize(0); + activeSerializer = null; + } + + /** + // TODO: see `Socket.hxUnserialize` comment + Sends `data` over the pipe. `data` will be serialized with a call to + `haxe.Serializer.run`. However, objects of type `asys.async.net.Socket` + will also be correctly serialized and can be received by the other end. + **/ +} diff --git a/std/asys/io/IpcUnserializer.hx b/std/asys/io/IpcUnserializer.hx new file mode 100644 index 00000000000..17c0c7716d7 --- /dev/null +++ b/std/asys/io/IpcUnserializer.hx @@ -0,0 +1,85 @@ +package asys.io; + +import haxe.Error; +import haxe.NoData; +import haxe.async.*; +import haxe.io.*; +import asys.net.Socket; + +/** + Class used internally to receive messages and handles over an IPC channel. + See `CurrentProcess.initIpc` for initialising IPC for a process. +**/ +class IpcUnserializer { + static var activeUnserializer:IpcUnserializer = null; + + public final messageSignal:Signal = new ArraySignal(); + public final errorSignal:Signal = new ArraySignal(); + + final pipe:Socket; + // var chunkSockets:Array = []; + var chunkLenbuf:String = ""; + var chunkBuf:StringBuf; + var chunkSize:Null = 0; + var chunkSocketCount:Int = 0; + + function new(pipe:Socket) { + this.pipe = pipe; + pipe.dataSignal.on(handleData); + } + + function handleData(data:Bytes):Void { + if (data.length == 0) + return; + try { + var data = data.toString(); + while (data != null) { + if (chunkSize == 0) { + chunkLenbuf += data; + var colonPos = chunkLenbuf.indexOf(":"); + if (colonPos != -1) { + chunkSocketCount = 0; + while (chunkLenbuf.charAt(chunkSocketCount) == "s") + chunkSocketCount++; + chunkSize = Std.parseInt(chunkLenbuf.substr(chunkSocketCount, colonPos)); + if (chunkSize == null || chunkSize <= 0) { + chunkSize = 0; + throw "invalid chunk size received"; + } + chunkBuf = new StringBuf(); + chunkBuf.add(chunkLenbuf.substr(colonPos + 1)); + chunkLenbuf = ""; + // chunkSockets.resize(0); + } + } else { + chunkBuf.add(data); + } + data = null; + if (chunkSize != 0) { + if (chunkBuf.length >= chunkSize) { + var serial = chunkBuf.toString(); + if (serial.length > chunkSize) { + data = serial.substr(chunkSize); + serial = serial.substr(0, chunkSize); + } + chunkBuf = null; + var chunkSockets = []; + if (chunkSocketCount > pipe.handlesPending) + throw "not enough handles received"; + for (i in 0...chunkSocketCount) + chunkSockets.push(pipe.readHandle()); + activeUnserializer = this; + var message = haxe.Unserializer.run(serial); + messageSignal.emit({message: message, sockets: chunkSockets}); + chunkSize = 0; + chunkSocketCount = 0; + // chunkSockets.resize(0); + activeUnserializer = null; + } + } + } + } catch (e:Dynamic) { + errorSignal.emit(e); + } + } +} diff --git a/std/asys/net/Address.hx b/std/asys/net/Address.hx new file mode 100644 index 00000000000..9f7fc965b15 --- /dev/null +++ b/std/asys/net/Address.hx @@ -0,0 +1,21 @@ +package asys.net; + +import haxe.io.Bytes; + +/** + Represents a resolved IP address. The methods from `asys.net.AddressTools` + are always available on `Address` instances. +**/ +@:using(asys.net.AddressTools) +enum Address { + /** + 32-bit IPv4 address. As an example, the IP address `127.0.0.1` is + represented as `Ipv4(0x7F000001)`. + **/ + Ipv4(raw:Int); + + /** + 128-bit IPv6 address. + **/ + Ipv6(raw:Bytes); +} diff --git a/std/asys/net/AddressTools.hx b/std/asys/net/AddressTools.hx new file mode 100644 index 00000000000..a8fa530ab3b --- /dev/null +++ b/std/asys/net/AddressTools.hx @@ -0,0 +1,223 @@ +package asys.net; + +import haxe.io.Bytes; +import asys.net.IpFamily; + +/** + Methods for converting to and from `Address` instances. +**/ +class AddressTools { + static final v4re = { + final v4seg = "(?:[0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])"; + final v4str = '${v4seg}\\.${v4seg}\\.${v4seg}\\.${v4seg}'; + new EReg('^${v4str}$$', ""); + }; + + static final v6re = { + final v4seg = "(?:[0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])"; + final v4str = '${v4seg}\\.${v4seg}\\.${v4seg}\\.${v4seg}'; + final v6seg = "(?:[0-9a-fA-F]{1,4})"; + new EReg("^(" + + '(?:${v6seg}:){7}(?:${v6seg}|:)|' + + '(?:${v6seg}:){6}(?:${v4str}|:${v6seg}|:)|' + + '(?:${v6seg}:){5}(?::${v4str}|(:${v6seg}){1,2}|:)|' + + '(?:${v6seg}:){4}(?:(:${v6seg}){0,1}:${v4str}|(:${v6seg}){1,3}|:)|' + + '(?:${v6seg}:){3}(?:(:${v6seg}){0,2}:${v4str}|(:${v6seg}){1,4}|:)|' + + '(?:${v6seg}:){2}(?:(:${v6seg}){0,3}:${v4str}|(:${v6seg}){1,5}|:)|' + + '(?:${v6seg}:){1}(?:(:${v6seg}){0,4}:${v4str}|(:${v6seg}){1,6}|:)|' + + '(?::((?::${v6seg}){0,5}:${v4str}|(?::${v6seg}){1,7}|:))' + + ")$", // "(%[0-9a-zA-Z]{1,})?$", // TODO: interface not supported + ""); + }; + + /** + Returns the IP address representing all hosts for the given IP family. + + - For IPv4, the address is `0.0.0.0`. + - For IPv6, the address is `::`. + **/ + public static function all(family:IpFamily):Address { + return (switch (family) { + case Ipv4: Ipv4(0); + case Ipv6: Ipv6(Bytes.ofHex("00000000000000000000000000000000")); + }); + } + + /** + Returns the IP address representing the local hosts for the given IP family. + + - For IPv4, the address is `127.0.0.1`. + - For IPv6, the address is `::1`. + **/ + public static function localhost(family:IpFamily):Address { + return (switch (family) { + case Ipv4: Ipv4(0x7F000001); + case Ipv6: Ipv6(Bytes.ofHex("00000000000000000000000000000001")); + }); + } + + /** + Converts an `Address` to a `String`. + + - IPv4 addresses are represented with the dotted quad format, e.g. + `192.168.0.1`. + - IPv6 addresses are represented with the standard lowercased hexadecimal + representation, with `::` used to mark a long stretch of zeros. + **/ + public static function toString(address:Address):String { + return (switch (address) { + case Ipv4(ip): + '${ip >>> 24}.${(ip >> 16) & 0xFF}.${(ip >> 8) & 0xFF}.${ip & 0xFF}'; + case Ipv6(ip): + var groups = [for (i in 0...8) (ip.get(i * 2) << 8) | ip.get(i * 2 + 1)]; + var longestRun = -1; + var longestPos = -1; + for (i in 0...8) { + if (groups[i] != 0) + continue; + var run = 1; + // TODO: skip if the longest run cannot be beaten + for (j in i + 1...8) { + if (groups[j] != 0) + break; + run++; + } + if (run > longestRun) { + longestRun = run; + longestPos = i; + } + } + inline function hex(groups:Array):String { + return groups.map(value -> StringTools.hex(value, 1).toLowerCase()).join(":"); + } + if (longestRun > 1) { + hex(groups.slice(0, longestPos)) + "::" + hex(groups.slice(longestPos + longestRun)); + } else { + hex(groups); + } + }); + } + + /** + Returns `true` if `address` represents a valid IPv4 or IPv6 address. + **/ + public static function isIp(address:String):Bool { + return isIpv4(address) || isIpv6(address); + } + + /** + Returns `true` if `address` represents a valid IPv4 address. + **/ + public static function isIpv4(address:String):Bool { + return v4re.match(address); + } + + /** + Returns `true` if `address` represents a valid IPv6 address. + **/ + public static function isIpv6(address:String):Bool { + return v6re.match(address); + } + + /** + Tries to convert the `String` `address` to an `Address` instance. Returns + the parsed `Address` or `null` if `address` does not represent a valid IP + address. + **/ + public static function toIp(address:String):Null
{ + var ipv4 = toIpv4(address); + return ipv4 != null ? ipv4 : toIpv6(address); + } + + /** + Tries to convert the `String` `address` to an IPv4 `Address` instance. + Returns the parsed `Address` or `null` if `address` does not represent a + valid IPv4 address. + **/ + public static function toIpv4(address:String):Null
{ + if (!isIpv4(address)) + return null; + var components = address.split(".").map(Std.parseInt); + return Ipv4((components[0] << 24) | (components[1] << 16) | (components[2] << 8) | components[3]); + } + + /** + Tries to convert the `String` `address` to an IPv6 `Address` instance. + Returns the parsed `Address` or `null` if `address` does not represent a + valid IPv6 address. + **/ + public static function toIpv6(address:String):Null
{ + if (!isIpv6(address)) + return null; + var buffer = Bytes.alloc(16); + buffer.fill(0, 16, 0); + function parse(component:String, res:Int):Void { + var value = Std.parseInt('0x0$component'); + buffer.set(res, value >> 8); + buffer.set(res + 1, value & 0xFF); + } + var stretch = address.split("::"); + var components = stretch[0].split(":"); + for (i in 0...components.length) + parse(components[i], i * 2); + if (stretch.length > 1) { + var end = 16; + components = stretch[1].split(":"); + if (isIpv4(components[components.length - 1])) { + end -= 4; + var ip = components.pop().split(".").map(Std.parseInt); + for (i in 0...4) + buffer.set(end + i, ip[i]); + } + end -= components.length * 2; + for (i in 0...components.length) + parse(components[i], end + i); + } + return Ipv6(buffer); + } + + /** + Returns the IPv6 version of the given `address`. IPv6 addresses are + returned unmodified, IPv4 addresses are mapped to IPv6 using the + `:ffff:0:0/96` IPv4 transition prefix. + + ```haxe + "127.0.0.1".toIpv4().mapToIpv6().toString(); // ::ffff:7f00:1 + ``` + **/ + public static function mapToIpv6(address:Address):Address { + return (switch (address) { + case Ipv4(ip): + var buffer = Bytes.alloc(16); + buffer.set(10, 0xFF); + buffer.set(11, 0xFF); + buffer.set(12, ip >>> 24); + buffer.set(13, (ip >> 16) & 0xFF); + buffer.set(14, (ip >> 8) & 0xFF); + buffer.set(15, ip & 0xFF); + Ipv6(buffer); + case _: + address; + }); + } + + /** + Returns `true` if `a` and `b` are the same IP address. + + If `ipv6mapped` is `true`, bot `a` and `b` are mapped to IPv6 (using + `mapToIpv6`) before the comparison. + **/ + public static function equals(a:Address, b:Address, ?ipv6mapped:Bool = false):Bool { + if (ipv6mapped) { + return (switch [mapToIpv6(a), mapToIpv6(b)] { + case [Ipv6(a), Ipv6(b)]: a.compare(b) == 0; + case _: false; // cannot happen? + }); + } + return (switch [a, b] { + case [Ipv4(a), Ipv4(b)]: a == b; + case [Ipv6(a), Ipv6(b)]: a.compare(b) == 0; + case _: false; + }); + } +} diff --git a/std/asys/net/Dns.hx b/std/asys/net/Dns.hx new file mode 100644 index 00000000000..0b56bb9784a --- /dev/null +++ b/std/asys/net/Dns.hx @@ -0,0 +1,24 @@ +package asys.net; + +import haxe.async.*; + +/** + Asynchronous Domain Name System (DNS) methods. +**/ +extern class Dns { + /** + Looks up the given `hostname`. `callback` will be called once the operation + completes. In case of success, the data given to callback is an array of + `asys.net.Address` instances representing all the IP addresses found + associated with the hostname. + + - `lookupOptions.family` - if not `null`, only addresses of the given IP + family will be returned. + **/ + static function lookup(hostname:String, ?lookupOptions:DnsLookupOptions, callback:Callback>):Void; + + /** + Looks up a reverse DNS entry for the given `ip`. + **/ + static function reverse(ip:Address, callback:Callback>):Void; +} diff --git a/std/asys/net/DnsLookupOptions.hx b/std/asys/net/DnsLookupOptions.hx new file mode 100644 index 00000000000..8f8708e4b31 --- /dev/null +++ b/std/asys/net/DnsLookupOptions.hx @@ -0,0 +1,16 @@ +package asys.net; + +typedef DnsLookupOptions = { + ?family:IpFamily, + ?hints:DnsHints +}; + +enum abstract DnsHints(Int) from Int { + var AddrConfig = 1 << 0; + var V4Mapped = 1 << 1; + + inline function get_raw():Int return this; + + @:op(A | B) + inline function join(other:DnsHints):DnsHints return this | other.get_raw(); +} diff --git a/std/asys/net/IpFamily.hx b/std/asys/net/IpFamily.hx new file mode 100644 index 00000000000..40e9701d350 --- /dev/null +++ b/std/asys/net/IpFamily.hx @@ -0,0 +1,9 @@ +package asys.net; + +/** + Represents a family of the Internet Protocol (IP). +**/ +enum IpFamily { + Ipv4; + Ipv6; +} diff --git a/std/asys/net/Server.hx b/std/asys/net/Server.hx new file mode 100644 index 00000000000..4f3c17cccb5 --- /dev/null +++ b/std/asys/net/Server.hx @@ -0,0 +1,61 @@ +package asys.net; + +import haxe.Error; +import haxe.NoData; +import haxe.async.*; + +typedef ServerOptions = { + ?allowHalfOpen:Bool, + ?pauseOnConnect:Bool +}; + +typedef ServerListenTcpOptions = { + ?port:Int, + ?host:String, + ?address:Address, + ?backlog:Int, + ?exclusive:Bool, + ?ipv6only:Bool +}; + +typedef ServerListenIpcOptions = { + path:String, + ?backlog:Int, + ?exclusive:Bool, + ?readableAll:Bool, + ?writableAll:Bool +}; + +class Server { + public final closeSignal:Signal = new ArraySignal(); + public final connectionSignal:Signal = new ArraySignal(); + public final errorSignal:Signal = new ArraySignal(); + public final listeningSignal:Signal = new ArraySignal(); + + public var listening(default, null):Bool; + public var maxConnections:Int; // TODO + + extern function get_localAddress():Null; + + public var localAddress(get, never):Null; + + public function new(?options:ServerOptions) {} + + // function address():SocketAddress; + + extern public function close(?callback:Callback):Void; + + // function getConnections(callback:Callback):Void; + // function listenSocket(socket:Socket, ?backlog:Int, ?listener:Listener):Void; + // function listenServer(server:Server, ?backlog:Int, ?listener:Listener):Void; + // function listenFile(file:sys.io.File, ?backlog:Int, ?listener:Listener):Void; + extern public function listenIpc(options:ServerListenIpcOptions, ?listener:Listener):Void; + + extern public function listenTcp(options:ServerListenTcpOptions, ?listener:Listener):Void; + + extern public function ref():Void; + + extern public function unref():Void; + + var listenDefer:asys.Timer; +} diff --git a/std/asys/net/Socket.hx b/std/asys/net/Socket.hx new file mode 100644 index 00000000000..a2f84c6f3c6 --- /dev/null +++ b/std/asys/net/Socket.hx @@ -0,0 +1,250 @@ +package asys.net; + +import haxe.Error; +import haxe.NoData; +import haxe.async.*; +import haxe.io.*; +import haxe.io.Readable.ReadResult; +import asys.io.*; +import asys.net.SocketOptions.SocketConnectTcpOptions; +import asys.net.SocketOptions.SocketConnectIpcOptions; + +/** + Socket object, used for clients and servers for TCP communications and IPC + (inter-process communications) over Windows named pipes and Unix local domain + sockets. + + An IPC pipe is a communication channel between two processes. It may be + uni-directional or bi-directional, depending on how it is created. Pipes can + be automatically created for spawned subprocesses with `Process.spawn`. +**/ +class Socket extends Duplex { + /** + Creates an unconnected socket or pipe instance. + + @param options.allowHalfOpen + @param options.readable Whether the socket should be readable to the + current process. + @param options.writable Whether the socket should be writable to the + current process. + **/ + public static function create(?options:SocketOptions):Socket { + // TODO: use options + return new Socket(); + } + + /** + Emitted when the handle is closed. + **/ + public final closeSignal:Signal = new ArraySignal(); + + /** + Emitted when the socket connects to a remote endpoint. + **/ + public final connectSignal:Signal = new ArraySignal(); + + // endSignal + + /** + (TCP only.) Emitted after the IP address of the hostname given in + `connectTcp` is resolved, but before the socket connects. + **/ + public final lookupSignal:Signal
= new ArraySignal(); + + /** + Emitted when a timeout occurs. See `setTimeout`. + **/ + public final timeoutSignal:Signal = new ArraySignal(); + + extern private function get_localAddress():Null; + + /** + The address of the local side of the socket connection, or `null` if not + connected. + **/ + public var localAddress(get, never):Null; + + extern private function get_remoteAddress():Null; + + /** + The address of the remote side of the socket connection, or `null` if not + connected. + **/ + public var remoteAddress(get, never):Null; + + extern private function get_handlesPending():Int; + + /** + (IPC only.) Number of pending sockets or pipes. Accessible using + `readHandle`. + **/ + public var handlesPending(get, never):Int; + + /** + `true` when `this` socket is connected to a remote host or an IPC pipe. + **/ + public var connected(default, null):Bool = false; + + /** + Connect `this` socket via TCP to the given remote. + + If neither `options.host` nor `options.address` is specified, the host + `localhost` is resolved via DNS and used as the address. At least one of + `options.host` or `options.address` must be `null`. + + `options.localAddress` and `options.localPort` can be used to specify what + address and port to use on the local machine for the outgoing connection. + If `null` or not specified, an address and/or a port will be chosen + automatically by the system when connecting. The local address and port can + be obtained using the `localAddress`. + + @param options.port Remote port to connect to. + @param options.host Hostname to connect to, will be resolved using + `Dns.resolve` to an address. `lookupSignal` will be emitted with the + resolved address before the connection is attempted. + @param options.address IPv4 or IPv6 address to connect to. + @param options.localAddress Local IPv4 or IPv6 address to connect from. + @param options.localPort Local port to connect from. + @param options.family Limit DNS lookup to the given family. + **/ + extern public function connectTcp(options:SocketConnectTcpOptions, ?cb:Callback):Void; + + /** + Connect `this` socket to an IPC pipe. + + @param options.path Pipe path. + **/ + extern public function connectIpc(options:SocketConnectIpcOptions, ?cb:Callback):Void; + + /** + Connect `this` socket to a file descriptor. Used internally to establish + IPC channels between Haxe processes. + + @param ipc Whether IPC features (sending sockets) should be enabled. + **/ + extern public function connectFd(fd:Int, ipc:Bool):Void; + + /** + Closes `this` socket and all underlying resources. + **/ + extern public function destroy(?cb:Callback):Void; + + /** + (TCP only.) Enable or disable TCP keep-alive. + + @param initialDelay Initial delay in seconds. Ignored if `enable` is + `false`. + **/ + extern public function setKeepAlive(?enable:Bool = false, ?initialDelay:Int = 0):Void; + + /** + (TCP only.) Enable or disable TCP no-delay. Enabling no-delay disables + Nagle's algorithm. + **/ + extern public function setNoDelay(?noDelay:Bool = true):Void; + + /** + Set a timeout for socket oprations. Any time activity is detected on the + socket (see below), the timer is reset to `timeout`. When the timer runs + out, `timeoutSignal` is emitted. Note that a timeout will not automatically + do anything to the socket - it is up to the `timeoutSignal` handler to + perform an action, e.g. ping the remote host or close the socket. + + Socket activity which resets the timer: + + - A chunk of data is received. + - An error occurs during reading. + - A chunk of data is written to the socket. + - Connection is established. + - (TCP only.) DNS lookup is finished (successfully or not). + + @param timeout Timeout in seconds, or `0` to disable. + **/ + public function setTimeout(timeout:Int, ?listener:Listener):Void { + timeoutTime = timeout; + timeoutReset(); + if (listener != null) + timeoutSignal.once(listener); + } + + /** + (IPC only.) Send a socket or pipe in along with the given `data`. The + socket must be connected. + **/ + extern public function writeHandle(data:Bytes, handle:Socket):Void; + + /** + (IPC only.) Receive a socket or pipe. Should only be called when + `handlesPending` is greater than zero. + **/ + extern public function readHandle():Socket; + + extern public function ref():Void; + + extern public function unref():Void; + + var connectDefer:asys.Timer; + var internalReadCalled = false; + var readStarted = false; + var connectStarted = false; + var serverSpawn:Bool = false; + var timeoutTime:Int = 0; + var timeoutTimer:asys.Timer; + + function new() { + super(); + } + + extern function initPipe(ipc:Bool):Void; + + // TODO: keep track of pending writes for finish event emission + // in `internalWrite` and `writeHandle` + function writeDone(err:Error, nd:NoData):Void { + timeoutReset(); + if (err != null) + errorSignal.emit(err); + // TODO: destroy stream and socket + } + + function timeoutTrigger():Void { + timeoutTimer = null; + timeoutSignal.emit(new NoData()); + } + + function timeoutReset():Void { + if (timeoutTimer != null) + timeoutTimer.stop(); + timeoutTimer = null; + if (timeoutTime != 0) { + timeoutTimer = asys.Timer.delay(timeoutTrigger, timeoutTime); + timeoutTimer.unref(); + } + } + + /* + // TODO: #8263 (static hxUnserialize) + // Automatic un/serialisation will not work here since hxUnserialize needs to + // call super, otherwise the socket is unusable; for now sockets are + // delivered separately in IPC. + + @:access(asys.io.IpcSerializer) + private function hxSerialize(_):Void { + if (IpcSerializer.activeSerializer == null) + throw "cannot serialize socket"; + IpcSerializer.activeSerializer.chunkSockets.push(this); + } + + @:access(asys.io.IpcUnserializer) + private function hxUnserialize(_):Void { + if (IpcUnserializer.activeUnserializer == null) + throw "cannot unserialize socket"; + trace(dataSignal, input); + var source:Socket = IpcUnserializer.activeUnserializer.chunkSockets.shift(); + this.native = source.native; + this.nativePipe = source.nativePipe; + this.nativeSocket = source.nativeSocket; + this.connected = true; + trace("successfully unserialized", this.nativeSocket); + } + */ +} diff --git a/std/asys/net/SocketAddress.hx b/std/asys/net/SocketAddress.hx new file mode 100644 index 00000000000..48283c48a76 --- /dev/null +++ b/std/asys/net/SocketAddress.hx @@ -0,0 +1,15 @@ +package asys.net; + +/** + Reperesents the address of a connected or bound `Socket` object. +**/ +enum SocketAddress { + /** + Address of a socket connected or bound to an IPv4 or IPv6 address and port. + **/ + Network(address:Address, port:Int); + /** + Filepath of a IPC pipe (Windows named pipe or Unix local domain socket). + **/ + Unix(path:String); +} diff --git a/std/asys/net/SocketOptions.hx b/std/asys/net/SocketOptions.hx new file mode 100644 index 00000000000..d6810bdf037 --- /dev/null +++ b/std/asys/net/SocketOptions.hx @@ -0,0 +1,41 @@ +package asys.net; + +/** + See `Socket.create`. +**/ +typedef SocketOptions = { + // ?file:asys.io.File, // fd in Node + ?allowHalfOpen:Bool, + ?readable:Bool, + ?writable:Bool +}; + +/** + See `Socket.connectTcp`. +**/ +typedef SocketConnectTcpOptions = { + port:Int, + ?host:String, + ?address:Address, + ?localAddress:Address, + ?localPort:Int, + ?family:IpFamily +}; + +/** + See `Socket.connectIpc`. +**/ +typedef SocketConnectIpcOptions = { + path:String +}; + +/** + See `UdpSocket.create`. +**/ +typedef UdpSocketOptions = { + ?reuseAddr:Bool, + ?ipv6Only:Bool, + ?recvBufferSize:Int, + ?sendBufferSize:Int, + // ?lookup:DnsLookupFunction +}; diff --git a/std/asys/net/UdpSocket.hx b/std/asys/net/UdpSocket.hx new file mode 100644 index 00000000000..357337ae247 --- /dev/null +++ b/std/asys/net/UdpSocket.hx @@ -0,0 +1,171 @@ +package asys.net; + +import haxe.Error; +import haxe.NoData; +import haxe.async.*; +import haxe.io.Bytes; +import asys.net.SocketOptions.UdpSocketOptions; + + +class UdpSocket { + public static function create(type:IpFamily, ?options:UdpSocketOptions, ?listener:Listener):UdpSocket { + var res = new UdpSocket(type); + // TODO: use other options, register listener + if (options == null) + options = {}; + if (options.recvBufferSize != null) + res.recvBufferSize = options.recvBufferSize; + if (options.sendBufferSize != null) + res.sendBufferSize = options.sendBufferSize; + return res; + } + + public final type:IpFamily; + + /** + Remote address and port that `this` socket is connected to. See `connect`. + **/ + public var remoteAddress(default, null):Null; + + private function get_localAddress():Null { + return try native.getSockName() catch (e:Dynamic) null; + } + + public var localAddress(get, never):Null; + + extern function get_recvBufferSize():Int; + + extern function set_recvBufferSize(size:Int):Int; + + public var recvBufferSize(get, set):Int; + + extern function get_sendBufferSize():Int; + + extern function set_sendBufferSize(size:Int):Int; + + public var sendBufferSize(get, set):Int; + + // final closeSignal:Signal; + // final connectSignal:Signal; + // final listeningSignal:Signal; + + public final errorSignal:Signal = new ArraySignal(); + + /** + Emitted when a message is received by `this` socket. See `UdpMessage`. + **/ + public final messageSignal:Signal = new ArraySignal(); + + /** + Joins the given multicast group. + **/ + extern public function addMembership(multicastAddress:String, ?multicastInterface:String):Void; + + /** + Leaves the given multicast group. + **/ + extern public function dropMembership(multicastAddress:String, ?multicastInterface:String):Void; + + /** + Binds `this` socket to a local address and port. Packets sent to the bound + address will arrive via `messageSignal`. Outgoing packets will be sent from + the given address and port. If any packet is sent without calling `bind` + first, an address and port is chosen automatically by the system - it can + be obtained with `localAddress`. + **/ + extern public function bind(?address:Address, ?port:Int):Void; + + /** + Closes `this` socket and all underlying resources. + **/ + extern public function close(?cb:Callback):Void; + + /** + Connects `this` socket to a remote address and port. Any `send` calls after + `connect` is called must not specify `address` nor `port`, they will + automatically use the ones specified in the `connect` call. + **/ + public function connect(?address:Address, port:Int):Void { + if (remoteAddress != null) + throw "already connected"; + if (address == null) + address = AddressTools.localhost(type); + remoteAddress = Network(address, port); + } + + /** + Clears any remote address and port previously set with `connect`. + **/ + public function disconnect():Void { + if (remoteAddress == null) + throw "not connected"; + remoteAddress = null; + } + + /** + Sends a message. + + @param msg Buffer from which to read the message data. + @param offset Position in `msg` at which to start reading. + @param length Length of message in bytes. + @param address Address to send the message to. Must be `null` if `this` + socket is connected. + @param port Port to send the message to. Must be `null` if `this` socket is + connected. + **/ + extern public function send(msg:Bytes, offset:Int, length:Int, ?address:Address, ?port:Int, ?cb:Callback):Void; + + /** + Sets broadcast on or off. + **/ + extern public function setBroadcast(flag:Bool):Void; + + /** + Sets the multicast interface on which to send and receive data. + **/ + extern public function setMulticastInterface(multicastInterface:String):Void; + + /** + Set IP multicast loopback on or off. Makes multicast packets loop back to + local sockets. + **/ + extern public function setMulticastLoopback(flag:Bool):Void; + + /** + Sets the multicast TTL (time-to-live). + **/ + extern public function setMulticastTTL(ttl:Int):Void; + + /** + Sets the TTL (time-to-live) for outgoing packets. + + @param ttl Number of hops. + **/ + extern public function setTTL(ttl:Int):Void; + + extern public function ref():Void; + + extern public function unref():Void; + + function new(type) { + this.type = type; + } +} + +/** + A packet received emitted by `messageSignal` of a `UdpSocket`. +**/ +typedef UdpMessage = { + /** + Message data. + **/ + var data:Bytes; + /** + Remote IPv4 or IPv6 address from which the message originates. + **/ + var remoteAddress:Address; + /** + Remote port from which the message originates. + **/ + var remotePort:Int; +}; diff --git a/std/asys/uv/UVConstants.hx b/std/asys/uv/UVConstants.hx new file mode 100644 index 00000000000..a273abe154c --- /dev/null +++ b/std/asys/uv/UVConstants.hx @@ -0,0 +1,14 @@ +package asys.uv; + +class UVConstants { + public static inline final S_IFMT = 0xF000; + public static inline final S_PERM = 0x0FFF; + + public static inline final S_IFBLK = 0x6000; + public static inline final S_IFCHR = 0x2000; + public static inline final S_IFDIR = 0x4000; + public static inline final S_IFIFO = 0x1000; + public static inline final S_IFLNK = 0xA000; + public static inline final S_IFREG = 0x8000; + public static inline final S_IFSOCK = 0xC000; +} diff --git a/std/asys/uv/UVDirentType.hx b/std/asys/uv/UVDirentType.hx new file mode 100644 index 00000000000..7a58336b2e5 --- /dev/null +++ b/std/asys/uv/UVDirentType.hx @@ -0,0 +1,12 @@ +package asys.uv; + +enum abstract UVDirentType(Int) { + var DirentUnknown = 0; + var DirentFile; + var DirentDir; + var DirentLink; + var DirentFifo; + var DirentSocket; + var DirentChar; + var DirentBlock; +} diff --git a/std/asys/uv/UVErrorType.hx b/std/asys/uv/UVErrorType.hx new file mode 100644 index 00000000000..51116a75784 --- /dev/null +++ b/std/asys/uv/UVErrorType.hx @@ -0,0 +1,388 @@ +package asys.uv; + +extern enum abstract UVErrorType(Int) { + /** + Argument list too long. + **/ + var E2BIG; + + /** + Permission denied. + **/ + var EACCES; + + /** + Address already in use. + **/ + var EADDRINUSE; + + /** + Address not available. + **/ + var EADDRNOTAVAIL; + + /** + Address family not supported. + **/ + var EAFNOSUPPORT; + + /** + Resource temporarily unavailable. + **/ + var EAGAIN; + + /** + Address family not supported. + **/ + var EAI_ADDRFAMILY; + + /** + Temporary failure. + **/ + var EAI_AGAIN; + + /** + Bad ai_flags value. + **/ + var EAI_BADFLAGS; + + /** + Invalid value for hints. + **/ + var EAI_BADHINTS; + + /** + Request canceled. + **/ + var EAI_CANCELED; + + /** + Permanent failure. + **/ + var EAI_FAIL; + + /** + Ai_family not supported. + **/ + var EAI_FAMILY; + + /** + Out of memory. + **/ + var EAI_MEMORY; + + /** + No address. + **/ + var EAI_NODATA; + + /** + Unknown node or service. + **/ + var EAI_NONAME; + + /** + Argument buffer overflow. + **/ + var EAI_OVERFLOW; + + /** + Resolved protocol is unknown. + **/ + var EAI_PROTOCOL; + + /** + Service not available for socket type. + **/ + var EAI_SERVICE; + + /** + Socket type not supported. + **/ + var EAI_SOCKTYPE; + + /** + Connection already in progress. + **/ + var EALREADY; + + /** + Bad file descriptor. + **/ + var EBADF; + + /** + Resource busy or locked. + **/ + var EBUSY; + + /** + Operation canceled. + **/ + var ECANCELED; + + /** + Invalid Unicode character. + **/ + var ECHARSET; + + /** + Software caused connection abort. + **/ + var ECONNABORTED; + + /** + Connection refused. + **/ + var ECONNREFUSED; + + /** + Connection reset by peer. + **/ + var ECONNRESET; + + /** + Destination address required. + **/ + var EDESTADDRREQ; + + /** + File already exists. + **/ + var EEXIST; + + /** + Bad address in system call argument. + **/ + var EFAULT; + + /** + File too large. + **/ + var EFBIG; + + /** + Host is unreachable. + **/ + var EHOSTUNREACH; + + /** + Interrupted system call. + **/ + var EINTR; + + /** + Invalid argument. + **/ + var EINVAL; + + /** + I/o error. + **/ + var EIO; + + /** + Socket is already connected. + **/ + var EISCONN; + + /** + Illegal operation on a directory. + **/ + var EISDIR; + + /** + Too many symbolic links encountered. + **/ + var ELOOP; + + /** + Too many open files. + **/ + var EMFILE; + + /** + Message too long. + **/ + var EMSGSIZE; + + /** + Name too long. + **/ + var ENAMETOOLONG; + + /** + Network is down. + **/ + var ENETDOWN; + + /** + Network is unreachable. + **/ + var ENETUNREACH; + + /** + File table overflow. + **/ + var ENFILE; + + /** + No buffer space available. + **/ + var ENOBUFS; + + /** + No such device. + **/ + var ENODEV; + + /** + No such file or directory. + **/ + var ENOENT; + + /** + Not enough memory. + **/ + var ENOMEM; + + /** + Machine is not on the network. + **/ + var ENONET; + + /** + Protocol not available. + **/ + var ENOPROTOOPT; + + /** + No space left on device. + **/ + var ENOSPC; + + /** + Function not implemented. + **/ + var ENOSYS; + + /** + Socket is not connected. + **/ + var ENOTCONN; + + /** + Not a directory. + **/ + var ENOTDIR; + + /** + Directory not empty. + **/ + var ENOTEMPTY; + + /** + Socket operation on non-socket. + **/ + var ENOTSOCK; + + /** + Operation not supported on socket. + **/ + var ENOTSUP; + + /** + Operation not permitted. + **/ + var EPERM; + + /** + Broken pipe. + **/ + var EPIPE; + + /** + Protocol error. + **/ + var EPROTO; + + /** + Protocol not supported. + **/ + var EPROTONOSUPPORT; + + /** + Protocol wrong type for socket. + **/ + var EPROTOTYPE; + + /** + Result too large. + **/ + var ERANGE; + + /** + Read-only file system. + **/ + var EROFS; + + /** + Cannot send after transport endpoint shutdown. + **/ + var ESHUTDOWN; + + /** + Invalid seek. + **/ + var ESPIPE; + + /** + No such process. + **/ + var ESRCH; + + /** + Connection timed out. + **/ + var ETIMEDOUT; + + /** + Text file is busy. + **/ + var ETXTBSY; + + /** + Cross-device link not permitted. + **/ + var EXDEV; + + /** + Unknown error. + **/ + var UNKNOWN; + + /** + End of file. + **/ + var EOF; + + /** + No such device or address. + **/ + var ENXIO; + + /** + Too many links. + **/ + var EMLINK; + + /** + Host is down. + **/ + var EHOSTDOWN; + + /** + Unknown error within libuv or libuv glue code. + **/ + var EOTHER; +} diff --git a/std/asys/uv/UVFsEventType.hx b/std/asys/uv/UVFsEventType.hx new file mode 100644 index 00000000000..64f03c0d5b2 --- /dev/null +++ b/std/asys/uv/UVFsEventType.hx @@ -0,0 +1,7 @@ +package asys.uv; + +enum abstract UVFsEventType(Int) { + var Rename = 1; + var Change = 2; + var RenameChange = 3; +} diff --git a/std/asys/uv/UVProcessSpawnFlags.hx b/std/asys/uv/UVProcessSpawnFlags.hx new file mode 100644 index 00000000000..3b7d0538c6a --- /dev/null +++ b/std/asys/uv/UVProcessSpawnFlags.hx @@ -0,0 +1,18 @@ +package asys.uv; + +enum abstract UVProcessSpawnFlags(Int) { + var None = 0; + var SetUid = 1 << 0; + var SetGid = 1 << 1; + var WindowsVerbatimArguments = 1 << 2; + var Detached = 1 << 3; + var WindowsHide = 1 << 4; + + function new(raw:Int) + this = raw; + + inline function get_raw():Int return this; + + @:op(A | B) + inline function join(other:UVProcessSpawnFlags) return new UVProcessSpawnFlags(this | other.get_raw()); +} diff --git a/std/asys/uv/UVRunMode.hx b/std/asys/uv/UVRunMode.hx new file mode 100644 index 00000000000..22900c82dfa --- /dev/null +++ b/std/asys/uv/UVRunMode.hx @@ -0,0 +1,7 @@ +package asys.uv; + +enum abstract UVRunMode(Int) { + var RunDefault = 0; + var RunOnce; + var RunNoWait; +} diff --git a/std/asys/uv/UVStat.hx b/std/asys/uv/UVStat.hx new file mode 100644 index 00000000000..c6e6ad76abb --- /dev/null +++ b/std/asys/uv/UVStat.hx @@ -0,0 +1,50 @@ +package asys.uv; + +class UVStat { + public final dev:Int; + public final mode:Int; + public final nlink:Int; + public final uid:Int; + public final gid:Int; + public final rdev:Int; + public final ino:Int; + public final size:Int; + public final blksize:Int; + public final blocks:Int; + public final flags:Int; + public final gen:Int; + + public function new(st_dev:Int, st_mode:Int, st_nlink:Int, st_uid:Int, st_gid:Int, st_rdev:Int, st_ino:Int, st_size:Int, st_blksize:Int, st_blocks:Int, + st_flags:Int, st_gen:Int) { + dev = st_dev; + mode = st_mode; + nlink = st_nlink; + uid = st_uid; + gid = st_gid; + rdev = st_rdev; + ino = st_ino; + size = st_size; + blksize = st_blksize; + blocks = st_blocks; + flags = st_flags; + gen = st_gen; + } + + public function isBlockDevice():Bool return (mode & asys.uv.UVConstants.S_IFMT) == asys.uv.UVConstants.S_IFBLK; + + public function isCharacterDevice():Bool return (mode & asys.uv.UVConstants.S_IFMT) == asys.uv.UVConstants.S_IFCHR; + + public function isDirectory():Bool return (mode & asys.uv.UVConstants.S_IFMT) == asys.uv.UVConstants.S_IFDIR; + + public function isFIFO():Bool return (mode & asys.uv.UVConstants.S_IFMT) == asys.uv.UVConstants.S_IFIFO; + + public function isFile():Bool return (mode & asys.uv.UVConstants.S_IFMT) == asys.uv.UVConstants.S_IFREG; + + public function isSocket():Bool return (mode & asys.uv.UVConstants.S_IFMT) == asys.uv.UVConstants.S_IFSOCK; + + public function isSymbolicLink():Bool return (mode & asys.uv.UVConstants.S_IFMT) == asys.uv.UVConstants.S_IFLNK; + + function get_permissions():FilePermissions return @:privateAccess new FilePermissions(mode & asys.uv.UVConstants.S_PERM); + + public var permissions(get, never):FilePermissions; +} diff --git a/std/haxe/Error.hx b/std/haxe/Error.hx new file mode 100644 index 00000000000..3cb4ef19e2a --- /dev/null +++ b/std/haxe/Error.hx @@ -0,0 +1,116 @@ +package haxe; + +import asys.uv.UVErrorType; +import haxe.PosInfos; + +/** + Common class for errors. +**/ +class Error { + function get_message():String { + return (switch (type) { + case UVError(UVErrorType.E2BIG): "argument list too long"; + case UVError(UVErrorType.EACCES): "permission denied"; + case UVError(UVErrorType.EADDRINUSE): "address already in use"; + case UVError(UVErrorType.EADDRNOTAVAIL): "address not available"; + case UVError(UVErrorType.EAFNOSUPPORT): "address family not supported"; + case UVError(UVErrorType.EAGAIN): "resource temporarily unavailable"; + case UVError(UVErrorType.EAI_ADDRFAMILY): "address family not supported"; + case UVError(UVErrorType.EAI_AGAIN): "temporary failure"; + case UVError(UVErrorType.EAI_BADFLAGS): "bad ai_flags value"; + case UVError(UVErrorType.EAI_BADHINTS): "invalid value for hints"; + case UVError(UVErrorType.EAI_CANCELED): "request canceled"; + case UVError(UVErrorType.EAI_FAIL): "permanent failure"; + case UVError(UVErrorType.EAI_FAMILY): "ai_family not supported"; + case UVError(UVErrorType.EAI_MEMORY): "out of memory"; + case UVError(UVErrorType.EAI_NODATA): "no address"; + case UVError(UVErrorType.EAI_NONAME): "unknown node or service"; + case UVError(UVErrorType.EAI_OVERFLOW): "argument buffer overflow"; + case UVError(UVErrorType.EAI_PROTOCOL): "resolved protocol is unknown"; + case UVError(UVErrorType.EAI_SERVICE): "service not available for socket type"; + case UVError(UVErrorType.EAI_SOCKTYPE): "socket type not supported"; + case UVError(UVErrorType.EALREADY): "connection already in progress"; + case UVError(UVErrorType.EBADF): "bad file descriptor"; + case UVError(UVErrorType.EBUSY): "resource busy or locked"; + case UVError(UVErrorType.ECANCELED): "operation canceled"; + case UVError(UVErrorType.ECHARSET): "invalid Unicode character"; + case UVError(UVErrorType.ECONNABORTED): "software caused connection abort"; + case UVError(UVErrorType.ECONNREFUSED): "connection refused"; + case UVError(UVErrorType.ECONNRESET): "connection reset by peer"; + case UVError(UVErrorType.EDESTADDRREQ): "destination address required"; + case UVError(UVErrorType.EEXIST): "file already exists"; + case UVError(UVErrorType.EFAULT): "bad address in system call argument"; + case UVError(UVErrorType.EFBIG): "file too large"; + case UVError(UVErrorType.EHOSTUNREACH): "host is unreachable"; + case UVError(UVErrorType.EINTR): "interrupted system call"; + case UVError(UVErrorType.EINVAL): "invalid argument"; + case UVError(UVErrorType.EIO): "i/o error"; + case UVError(UVErrorType.EISCONN): "socket is already connected"; + case UVError(UVErrorType.EISDIR): "illegal operation on a directory"; + case UVError(UVErrorType.ELOOP): "too many symbolic links encountered"; + case UVError(UVErrorType.EMFILE): "too many open files"; + case UVError(UVErrorType.EMSGSIZE): "message too long"; + case UVError(UVErrorType.ENAMETOOLONG): "name too long"; + case UVError(UVErrorType.ENETDOWN): "network is down"; + case UVError(UVErrorType.ENETUNREACH): "network is unreachable"; + case UVError(UVErrorType.ENFILE): "file table overflow"; + case UVError(UVErrorType.ENOBUFS): "no buffer space available"; + case UVError(UVErrorType.ENODEV): "no such device"; + case UVError(UVErrorType.ENOENT): "no such file or directory"; + case UVError(UVErrorType.ENOMEM): "not enough memory"; + case UVError(UVErrorType.ENONET): "machine is not on the network"; + case UVError(UVErrorType.ENOPROTOOPT): "protocol not available"; + case UVError(UVErrorType.ENOSPC): "no space left on device"; + case UVError(UVErrorType.ENOSYS): "function not implemented"; + case UVError(UVErrorType.ENOTCONN): "socket is not connected"; + case UVError(UVErrorType.ENOTDIR): "not a directory"; + case UVError(UVErrorType.ENOTEMPTY): "directory not empty"; + case UVError(UVErrorType.ENOTSOCK): "socket operation on non-socket"; + case UVError(UVErrorType.ENOTSUP): "operation not supported on socket"; + case UVError(UVErrorType.EPERM): "operation not permitted"; + case UVError(UVErrorType.EPIPE): "broken pipe"; + case UVError(UVErrorType.EPROTO): "protocol error"; + case UVError(UVErrorType.EPROTONOSUPPORT): "protocol not supported"; + case UVError(UVErrorType.EPROTOTYPE): "protocol wrong type for socket"; + case UVError(UVErrorType.ERANGE): "result too large"; + case UVError(UVErrorType.EROFS): "read-only file system"; + case UVError(UVErrorType.ESHUTDOWN): "cannot send after transport endpoint shutdown"; + case UVError(UVErrorType.ESPIPE): "invalid seek"; + case UVError(UVErrorType.ESRCH): "no such process"; + case UVError(UVErrorType.ETIMEDOUT): "connection timed out"; + case UVError(UVErrorType.ETXTBSY): "text file is busy"; + case UVError(UVErrorType.EXDEV): "cross-device link not permitted"; + case UVError(UVErrorType.UNKNOWN): "unknown error"; + case UVError(UVErrorType.EOF): "end of file"; + case UVError(UVErrorType.ENXIO): "no such device or address"; + case UVError(UVErrorType.EMLINK): "too many links"; + case UVError(UVErrorType.EHOSTDOWN): "host is down"; + case UVError(UVErrorType.EOTHER): "other UV error"; + case _: "unknown error"; + }); + } + + /** + A human-readable representation of the error. + **/ + public var message(get, never):String; + + /** + Position where the error was thrown. By default, this is the place where the error is constructed. + **/ + public final posInfos:PosInfos; + + /** + Error type, usable for discerning error types with `switch` statements. + **/ + public final type:ErrorType; + + public function new(type:ErrorType, ?posInfos:PosInfos) { + this.type = type; + this.posInfos = posInfos; + } + + public function toString():String { + return '$message at $posInfos'; + } +} diff --git a/std/haxe/ErrorType.hx b/std/haxe/ErrorType.hx new file mode 100644 index 00000000000..6a99b8d0d3e --- /dev/null +++ b/std/haxe/ErrorType.hx @@ -0,0 +1,5 @@ +package haxe; + +enum ErrorType { + UVError(errno:asys.uv.UVErrorType); +} diff --git a/std/haxe/NoData.hx b/std/haxe/NoData.hx new file mode 100644 index 00000000000..05de27b473d --- /dev/null +++ b/std/haxe/NoData.hx @@ -0,0 +1,10 @@ +package haxe; + +/** + Data type used to indicate the absence of a value, especially in types with + type parameters. +**/ +abstract NoData(Int) { + public inline function new() + this = 0; +} diff --git a/std/haxe/async/ArraySignal.hx b/std/haxe/async/ArraySignal.hx new file mode 100644 index 00000000000..b5639fb02a7 --- /dev/null +++ b/std/haxe/async/ArraySignal.hx @@ -0,0 +1,42 @@ +package haxe.async; + +/** + Basic implementation of a `haxe.async.Signal`. Uses an array for storing + listeners for the signal. +**/ +class ArraySignal implements Signal { + final listeners:Array> = []; + + function get_listenerCount():Int { + return listeners.length; + } + + public var listenerCount(get, never):Int; + + public function new() {} + + public function on(listener:Listener):Void { + listeners.push(listener); + } + + public function once(listener:Listener):Void { + listeners.push(function wrapped(data:T):Void { + listeners.remove(wrapped); + listener(data); + }); + } + + public function off(listener:Listener):Void { + listeners.remove(listener); + } + + public function clear():Void { + listeners.resize(0); + } + + public function emit(data:T):Void { + for (listener in listeners) { + listener(data); + } + } +} diff --git a/std/haxe/async/Callback.hx b/std/haxe/async/Callback.hx new file mode 100644 index 00000000000..d0329ff028e --- /dev/null +++ b/std/haxe/async/Callback.hx @@ -0,0 +1,65 @@ +package haxe.async; + +import haxe.Error; +import haxe.NoData; + +typedef CallbackData = (?error:Error, ?result:T) -> Void; + +/** + A callback. All callbacks in the standard library are functions which accept + two arguments: an error (`haxe.Error`) and a result (`T`). If error is + non-`null`, result must be `null`. The callback type is declared in `CallbackData`. + + This abstract defines multiple `@:from` conversions to improve readability of + callback code. +**/ +@:callable +abstract Callback(CallbackData) from CallbackData { + /** + Returns a callback of the same type as `cb` which is guaranteed to be + non-`null`. If `cb` is given and is not `null` it is returned directly. + If `cb` is `null` a dummy callback which does nothing is returned instead. + **/ + public static function nonNull(?cb:Callback):Callback { + if (cb == null) + return (_, _) -> {}; + return cb; + } + + /** + Wraps a function which takes a single optional `haxe.Error` argument into + a callback of type `Callback`. Allows: + + ```haxe + var cb:Callback = (?err) -> trace("error!", err); + ``` + **/ + @:from public static inline function fromOptionalErrorOnly(f:(?error:Error) -> Void):Callback { + return (?err:Error, ?result:NoData) -> f(err); + } + + /** + Wraps a function which takes a single `haxe.Error` argument into a callback + of type `Callback`. Allows: + + ```haxe + var cb:Callback = (err) -> trace("error!", err); + ``` + **/ + @:from public static inline function fromErrorOnly(f:(error:Error) -> Void):Callback { + return (?err:Error, ?result:NoData) -> f(err); + } + + /* + // this should not be encouraged, may mess up from(Optional)ErrorOnly + @:from static inline function fromResultOnly(f:(?result:T) -> Void):Callback return (?err:Error, ?result:T) -> f(result); + */ + + /** + Wraps a callback function declared without `?` (optional) arguments into a + callback. + **/ + @:from public static inline function fromErrorResult(f:(error:Error, result:T) -> Void):Callback { + return (?err:Error, ?result:T) -> f(err, result); + } +} diff --git a/std/haxe/async/Defer.hx b/std/haxe/async/Defer.hx new file mode 100644 index 00000000000..dd6eadfe9ee --- /dev/null +++ b/std/haxe/async/Defer.hx @@ -0,0 +1,11 @@ +package haxe.async; + +class Defer { + /** + Schedules the given function to run during the next processing tick. + Convenience shortcut for `Timer.delay(f, 0)`. + **/ + public static inline function nextTick(f:() -> Void):haxe.Timer { + return haxe.Timer.delay(f, 0); + } +} diff --git a/std/haxe/async/Listener.hx b/std/haxe/async/Listener.hx new file mode 100644 index 00000000000..da59c91f856 --- /dev/null +++ b/std/haxe/async/Listener.hx @@ -0,0 +1,19 @@ +package haxe.async; + +import haxe.NoData; + +typedef ListenerData = (data:T) -> Void; + +/** + Signal listener. A signal listener is a function which accepts one argument + and has a `Void` return type. +**/ +@:callable +abstract Listener(ListenerData) from ListenerData { + /** + This function allows a listener to a `Signal` to be defined as a + function which accepts no arguments. + **/ + @:from static inline function fromNoArguments(f:() -> Void):Listener + return(data:NoData) -> f(); +} diff --git a/std/haxe/async/Signal.hx b/std/haxe/async/Signal.hx new file mode 100644 index 00000000000..52cb0acfea4 --- /dev/null +++ b/std/haxe/async/Signal.hx @@ -0,0 +1,43 @@ +package haxe.async; + +/** + Signals are a type-safe system to emit events. A signal will calls its + listeners whenever _something_ (the event that the signal represents) happens, + passing along any relevant associated data. + + Signals which have no associated data should use `haxe.NoData` as their type + parameter. +**/ +interface Signal { + /** + Number of listeners to `this` signal. + **/ + var listenerCount(get, never):Int; + + /** + Adds a listener to `this` signal, which will be called for all signal + emissions until it is removed with `off`. + **/ + function on(listener:Listener):Void; + + /** + Adds a listener to `this` signal, which will be called only once, the next + time the signal emits. + **/ + function once(listener:Listener):Void; + + /** + Removes the given listener from `this` signal. + **/ + function off(listener:Listener):Void; + + /** + Removes all listeners from `this` signal. + **/ + function clear():Void; + + /** + Emits `data` to all current listeners of `this` signal. + **/ + function emit(data:T):Void; +} diff --git a/std/haxe/async/WrappedSignal.hx b/std/haxe/async/WrappedSignal.hx new file mode 100644 index 00000000000..b3846360e4e --- /dev/null +++ b/std/haxe/async/WrappedSignal.hx @@ -0,0 +1,52 @@ +package haxe.async; + +import haxe.NoData; + +/** + An implementation of `haxe.async.Signal` which will listen for changes in its + listeners. This is useful when a class changes its behavior depending on + whether there are any listeners to some of its signals, e.g. a `Readable` + stream will not emit data signals when there are no data handlers. +**/ +class WrappedSignal implements Signal { + final listeners:Array> = []; + public final changeSignal:Signal = new ArraySignal(); + + function get_listenerCount():Int { + return listeners.length; + } + + public var listenerCount(get, never):Int; + + public function new() {} + + public function on(listener:Listener):Void { + listeners.push(listener); + changeSignal.emit(new NoData()); + } + + public function once(listener:Listener):Void { + listeners.push(function wrapped(data:T):Void { + listeners.remove(wrapped); + changeSignal.emit(new NoData()); + listener(data); + }); + changeSignal.emit(new NoData()); + } + + public function off(listener:Listener):Void { + listeners.remove(listener); + changeSignal.emit(new NoData()); + } + + public function clear():Void { + listeners.resize(0); + changeSignal.emit(new NoData()); + } + + public function emit(data:T):Void { + for (listener in listeners) { + listener(data); + } + } +} diff --git a/std/haxe/io/Duplex.hx b/std/haxe/io/Duplex.hx new file mode 100644 index 00000000000..e2a19a64da1 --- /dev/null +++ b/std/haxe/io/Duplex.hx @@ -0,0 +1,137 @@ +package haxe.io; + +import haxe.Error; +import haxe.NoData; +import haxe.async.Signal; +import haxe.ds.List; +import haxe.io.Readable.ReadResult; + +/** + A stream which is both readable and writable. + + This is an abstract base class that should never be used directly. Instead, + child classes should override the `internalRead` and `internalWrite` methods. + See `haxe.io.Readable` and `haxe.io.Writable`. +**/ +@:access(haxe.io.Readable) +@:access(haxe.io.Writable) +class Duplex implements IReadable implements IWritable { + public final dataSignal:Signal; + public final endSignal:Signal; + public final errorSignal:Signal; + public final pauseSignal:Signal; + public final resumeSignal:Signal; + + public final drainSignal:Signal; + public final finishSignal:Signal; + public final pipeSignal:Signal; + public final unpipeSignal:Signal; + + final input:Writable; + final output:Readable; + final inputBuffer:List; + final outputBuffer:List; + + function get_inputBufferLength() { + return input.bufferLength; + } + var inputBufferLength(get, never):Int; + + function get_outputBufferLength() { + return output.bufferLength; + } + var outputBufferLength(get, never):Int; + + function new() { + input = new DuplexWritable(this); + output = new DuplexReadable(this); + dataSignal = output.dataSignal; + endSignal = output.endSignal; + errorSignal = output.errorSignal; + pauseSignal = output.pauseSignal; + resumeSignal = output.resumeSignal; + drainSignal = input.drainSignal; + finishSignal = input.finishSignal; + pipeSignal = input.pipeSignal; + unpipeSignal = input.unpipeSignal; + inputBuffer = input.buffer; + outputBuffer = output.buffer; + } + + // override by implementing classes + function internalRead(remaining:Int):ReadResult { + throw "not implemented"; + } + + function internalWrite():Void { + throw "not implemented"; + } + + inline function pop():Bytes { + return input.pop(); + } + + inline function push(chunk:Bytes):Void { + output.push(chunk); + } + + inline function asyncRead(chunks:Array, eof:Bool):Void { + output.asyncRead(chunks, eof); + } + + public inline function write(chunk:Bytes):Bool { + return input.write(chunk); + } + + public function end():Void { + input.end(); + output.asyncRead(null, true); + } + + public inline function pause():Void { + output.pause(); + } + + public inline function resume():Void { + output.resume(); + } + + public inline function pipe(to:IWritable):Void { + output.pipe(to); + } + + public inline function cork():Void { + input.cork(); + } + + public inline function uncork():Void { + input.uncork(); + } +} + +@:access(haxe.io.Duplex) +private class DuplexWritable extends Writable { + final parent:Duplex; + + public function new(parent:Duplex) { + this.parent = parent; + } + + override function internalWrite():Void { + parent.internalWrite(); + } +} + +@:access(haxe.io.Duplex) +private class DuplexReadable extends Readable { + final parent:Duplex; + + public function new(parent:Duplex) { + super(); + this.parent = parent; + } + + override function internalRead(remaining):ReadResult { + return parent.internalRead(remaining); + } +} diff --git a/std/haxe/io/FilePath.hx b/std/haxe/io/FilePath.hx new file mode 100644 index 00000000000..0b666e449fb --- /dev/null +++ b/std/haxe/io/FilePath.hx @@ -0,0 +1,33 @@ +package haxe.io; + +/** + Represents a relative or absolute file path. +**/ +abstract FilePath(String) from String { + @:from public static function encode(bytes:Bytes):FilePath { + // TODO: standard UTF-8 decoding, except any invalid bytes is replaced + // with (for example) U+FFFD, followed by the byte itself as a codepoint + return null; + } + + public function decode():Bytes { + return null; + } + + /** + The components of `this` path. + **/ + public var components(get, never):Array; + + private function get_components():Array { + return this.split("/"); + } + + @:op(A / B) + public function addComponent(other:FilePath):FilePath { + return this + "/" + other.get_raw(); + } + + private function get_raw():String + return this; +} diff --git a/std/haxe/io/IDuplex.hx b/std/haxe/io/IDuplex.hx new file mode 100644 index 00000000000..60fa4b13a21 --- /dev/null +++ b/std/haxe/io/IDuplex.hx @@ -0,0 +1,12 @@ +package haxe.io; + +/** + A stream which is both readable and writable. + + This interface should be used wherever an object that is both readable and + writable is expected, regardless of a specific implementation. See `Duplex` + for an abstract base class that can be used to implement an `IDuplex`. + + See also `IReadable` and `IWritable`. +**/ +interface IDuplex extends IReadable extends IWritable {} diff --git a/std/haxe/io/IReadable.hx b/std/haxe/io/IReadable.hx new file mode 100644 index 00000000000..7fb0b1707d7 --- /dev/null +++ b/std/haxe/io/IReadable.hx @@ -0,0 +1,63 @@ +package haxe.io; + +import haxe.Error; +import haxe.NoData; +import haxe.async.Signal; + +/** + A readable stream. + + This interface should be used wherever an object that is readable is + expected, regardless of a specific implementation. See `Readable` for an + abstract base class that can be used to implement an `IReadable`. +**/ +interface IReadable { + /** + Emitted whenever a chunk of data is available. + **/ + final dataSignal:Signal; + + /** + Emitted when the stream is finished. No further signals will be emitted by + `this` instance after `endSignal` is emitted. + **/ + final endSignal:Signal; + + /** + Emitted for any error that occurs during reading. + **/ + final errorSignal:Signal; + + /** + Emitted when `this` stream is paused. + **/ + final pauseSignal:Signal; + + /** + Emitted when `this` stream is resumed. + **/ + final resumeSignal:Signal; + + /** + Resumes flow of data. Note that this method is called automatically + whenever listeners to either `dataSignal` or `endSignal` are added. + **/ + function resume():Void; + + /** + Pauses flow of data. + **/ + function pause():Void; + + /** + Pipes the data from `this` stream to `target`. + **/ + function pipe(target:IWritable):Void; + + /** + Indicates to `this` stream that an additional `amount` bytes should be read + from the underlying data source. Note that the actual data will arrive via + `dataSignal`. + **/ + // function read(amount:Int):Void; +} diff --git a/std/haxe/io/IWritable.hx b/std/haxe/io/IWritable.hx new file mode 100644 index 00000000000..14c2124806b --- /dev/null +++ b/std/haxe/io/IWritable.hx @@ -0,0 +1,15 @@ +package haxe.io; + +import haxe.NoData; +import haxe.async.Signal; + +interface IWritable { + final drainSignal:Signal; + final finishSignal:Signal; + final pipeSignal:Signal; + final unpipeSignal:Signal; + function write(chunk:Bytes):Bool; + function end():Void; + function cork():Void; + function uncork():Void; +} diff --git a/std/haxe/io/Readable.hx b/std/haxe/io/Readable.hx new file mode 100644 index 00000000000..b6080d7845b --- /dev/null +++ b/std/haxe/io/Readable.hx @@ -0,0 +1,240 @@ +package haxe.io; + +import haxe.Error; +import haxe.NoData; +import haxe.async.*; +import haxe.ds.List; + +/** + A readable stream. + + This is an abstract base class that should never be used directly. Instead, + subclasses should override the `internalRead` method. +**/ +class Readable implements IReadable { + /** + See `IReadable.dataSignal`. + **/ + public final dataSignal:Signal; + + /** + See `IReadable.endSignal`. + **/ + public final endSignal:Signal; + + /** + See `IReadable.errorSignal`. + **/ + public final errorSignal:Signal = new ArraySignal(); + + /** + See `IReadable.pauseSignal`. + **/ + public final pauseSignal:Signal = new ArraySignal(); + + /** + See `IReadable.resumeSignal`. + **/ + public final resumeSignal:Signal = new ArraySignal(); + + /** + High water mark. `Readable` will call `internalRead` pre-emptively to fill + up the internal buffer up to this value when possible. Set to `0` to + disable pre-emptive reading. + **/ + public var highWaterMark = 8192; + + /** + Total amount of data currently in the internal buffer, in bytes. + **/ + public var bufferLength(default, null) = 0; + + /** + Whether data is flowing at the moment. When flowing, data signals will be + emitted and the internal buffer will be empty. + **/ + public var flowing(default, null) = false; + + /** + Whether this stream is finished. When `true`, no further signals will be + emmited by `this` instance. + **/ + public var done(default, null) = false; + + var buffer = new List(); + var deferred:asys.Timer; + var willEof = false; + + @:dox(show) + function new(?highWaterMark:Int = 8192) { + this.highWaterMark = highWaterMark; + var dataSignal = new WrappedSignal(); + dataSignal.changeSignal.on(() -> { + if (dataSignal.listenerCount > 0) + resume(); + }); + this.dataSignal = dataSignal; + var endSignal = new WrappedSignal(); + endSignal.changeSignal.on(() -> { + if (endSignal.listenerCount > 0) + resume(); + }); + this.endSignal = endSignal; + } + + inline function shouldFlow():Bool { + return !done && (dataSignal.listenerCount > 0 || endSignal.listenerCount > 0); + } + + function process():Void { + deferred = null; + if (!shouldFlow()) + flowing = false; + if (!flowing) + return; + + var reschedule = false; + + // pre-emptive read until HWM + if (!willEof && !done) + while (bufferLength < highWaterMark) { + switch (internalRead(highWaterMark - bufferLength)) { + case None: + break; + case Data(chunks, eof): + reschedule = true; + for (chunk in chunks) + push(chunk); + if (eof) { + willEof = true; + break; + } + } + } + + // emit data + while (buffer.length > 0 && flowing && shouldFlow()) { + reschedule = true; + dataSignal.emit(pop()); + } + + if (willEof) { + endSignal.emit(new NoData()); + flowing = false; + done = true; + return; + } + + if (!shouldFlow()) + flowing = false; + else if (reschedule) + scheduleProcess(); + } + + inline function scheduleProcess():Void { + if (deferred == null) + deferred = Defer.nextTick(process); + } + + function push(chunk:Bytes):Bool { + if (done) + throw "stream already done"; + buffer.add(chunk); + bufferLength += chunk.length; + return bufferLength < highWaterMark; + } + + /** + This method should be used internally from `internalRead` to provide data + resulting from asynchronous operations. The arguments to this method are + the same as `ReadableResult.Data`. See `internalRead` for more details. + **/ + @:dox(show) + function asyncRead(chunks:Array, eof:Bool):Void { + if (done || willEof) + throw "stream already done"; + if (chunks != null) + for (chunk in chunks) + push(chunk); + if (eof) + willEof = true; + if (chunks != null || eof) + scheduleProcess(); + } + + function pop():Bytes { + if (done) + throw "stream already done"; + var chunk = buffer.pop(); + bufferLength -= chunk.length; + return chunk; + } + + /** + This method should be overridden by a subclass. + + This method will be called as needed by `Readable`. The `remaining` + argument is an indication of how much data is needed to fill the internal + buffer up to the high water mark, or the current requested amount of data. + This method is called in a cycle until the read cycle is stopped with a + `None` return or an EOF is indicated, as described below. + + If a call to this method returns `None`, the current read cycle is + ended. This value should be returned when there is no data available at the + moment, but a read request was scheduled and will later be fulfilled by a + call to `asyncRead`. + + If a call to this method returns `Data(chunks, eof)`, `chunks` will be + added to the internal buffer. If `eof` is `true`, the read cycle is ended + and the readable stream signals an EOF (end-of-file). After an EOF, no + further calls will be made. `chunks` should not be an empty array if `eof` + is `false`. + + Code inside this method should only call `asyncRead` (asynchronously from + a callback) or provide data using the return value. + **/ + @:dox(show) + function internalRead(remaining:Int):ReadResult { + throw "not implemented"; + } + + /** + See `IReadable.resume`. + **/ + public function resume():Void { + if (done) + return; + if (!flowing) { + resumeSignal.emit(new NoData()); + flowing = true; + scheduleProcess(); + } + } + + /** + See `IReadable.pause`. + **/ + public function pause():Void { + if (done) + return; + if (flowing) { + pauseSignal.emit(new NoData()); + flowing = false; + } + } + + /** + See `IReadable.pipe`. + **/ + public function pipe(to:IWritable):Void { + throw "!"; + } +} + +/** + See `Readable.internalRead`. +**/ +enum ReadResult { + None; + Data(chunks:Array, eof:Bool); +} diff --git a/std/haxe/io/StreamTools.hx b/std/haxe/io/StreamTools.hx new file mode 100644 index 00000000000..6d8617d48b5 --- /dev/null +++ b/std/haxe/io/StreamTools.hx @@ -0,0 +1,21 @@ +package haxe.io; + +class StreamTools { + /** + Creates a pipeline out of the given streams. `input` is piped to the first + element in `intermediate`, which is piped to the next element in + `intermediate`, and so on, until the last stream is piped to `output`. If + `intermediate` is `null`, it is treated as an empty array and `input` is + connected directly to `output`. + **/ + public static function pipeline(input:IReadable, ?intermediate:Array, output:IWritable):Void { + if (intermediate == null || intermediate.length == 0) + return input.pipe(output); + + input.pipe(intermediate[0]); + for (i in 0...intermediate.length - 1) { + intermediate[i].pipe(intermediate[i + 1]); + } + intermediate[intermediate.length - 1].pipe(output); + } +} diff --git a/std/haxe/io/Transform.hx b/std/haxe/io/Transform.hx new file mode 100644 index 00000000000..3fa822f5bda --- /dev/null +++ b/std/haxe/io/Transform.hx @@ -0,0 +1,94 @@ +package haxe.io; + +import haxe.Error; +import haxe.NoData; +import haxe.async.Signal; + +@:access(haxe.io.Readable) +@:access(haxe.io.Writable) +class Transform implements IReadable implements IWritable { + public final dataSignal:Signal; + public final endSignal:Signal; + public final errorSignal:Signal; + public final pauseSignal:Signal; + public final resumeSignal:Signal; + + public final drainSignal:Signal; + public final finishSignal:Signal; + public final pipeSignal:Signal; + public final unpipeSignal:Signal; + + final input:Writable; + final output:Readable; + + var transforming:Bool = false; + + function new() { + input = new TransformWritable(this); + output = @:privateAccess new Readable(0); + dataSignal = output.dataSignal; + endSignal = output.endSignal; + errorSignal = output.errorSignal; + pauseSignal = output.pauseSignal; + resumeSignal = output.resumeSignal; + drainSignal = input.drainSignal; + finishSignal = input.finishSignal; + pipeSignal = input.pipeSignal; + unpipeSignal = input.unpipeSignal; + } + + function internalTransform(chunk:Bytes):Void { + throw "not implemented"; + } + + function push(chunk:Bytes):Void { + transforming = false; + output.asyncRead([chunk], false); + input.internalWrite(); + } + + public inline function write(chunk:Bytes):Bool { + return input.write(chunk); + } + + public function end():Void { + input.end(); + output.asyncRead(null, true); + } + + public inline function pause():Void { + output.pause(); + } + + public inline function resume():Void { + output.resume(); + } + + public inline function pipe(to:IWritable):Void { + output.pipe(to); + } + + public inline function cork():Void { + input.cork(); + } + + public inline function uncork():Void { + input.uncork(); + } +} + +@:access(haxe.io.Transform) +private class TransformWritable extends Writable { + final parent:Transform; + + public function new(parent:Transform) { + this.parent = parent; + } + + override function internalWrite():Void { + if (buffer.length > 0) { + parent.transforming = true; + parent.internalTransform(pop()); + } + } +} diff --git a/std/haxe/io/Writable.hx b/std/haxe/io/Writable.hx new file mode 100644 index 00000000000..35ac5fbbcd6 --- /dev/null +++ b/std/haxe/io/Writable.hx @@ -0,0 +1,91 @@ +package haxe.io; + +import haxe.NoData; +import haxe.async.*; +import haxe.ds.List; + +/** + A writable stream. + + This is an abstract base class that should never be used directly. Instead, + subclasses should override the `internalWrite` method. +**/ +class Writable implements IWritable { + public final drainSignal:Signal = new ArraySignal(); + public final finishSignal:Signal = new ArraySignal(); + public final pipeSignal:Signal = new ArraySignal(); + public final unpipeSignal:Signal = new ArraySignal(); + + public var highWaterMark = 8192; + public var bufferLength(default, null) = 0; + public var corkCount(default, null) = 0; + public var done(default, null) = false; + + var willDrain = false; + var willFinish = false; + var deferred:asys.Timer; + var buffer = new List(); + + // for use by implementing classes + function pop():Bytes { + var chunk = buffer.pop(); + bufferLength -= chunk.length; + if (willDrain && buffer.length == 0) { + willDrain = false; + if (deferred == null) + deferred = Defer.nextTick(() -> { + deferred = null; + drainSignal.emit(new NoData()); + }); + } + if (willFinish && buffer.length == 0) { + willFinish = false; + Defer.nextTick(() -> finishSignal.emit(new NoData())); + } + return chunk; + } + + // override by implementing classes + function internalWrite():Void { + throw "not implemented"; + } + + // for producers + public function write(chunk:Bytes):Bool { + if (done) + throw "stream already done"; + buffer.add(chunk); + bufferLength += chunk.length; + if (corkCount <= 0) + internalWrite(); + if (bufferLength >= highWaterMark) { + willDrain = true; + return false; + } + return true; + } + + public function end():Void { + corkCount = 0; + if (buffer.length > 0) + internalWrite(); + if (buffer.length > 0) + willFinish = true; + else + finishSignal.emit(new NoData()); + done = true; + } + + public function cork():Void { + if (done) + return; + corkCount++; + } + + public function uncork():Void { + if (done || corkCount <= 0) + return; + if (--corkCount == 0 && buffer.length > 0) + internalWrite(); + } +}