-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.js
92 lines (75 loc) · 1.89 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
'use strict'
const extend = require('xtend')
const DuplexStream = require('stream').Duplex
/**
 * In-memory state container that is also an object-mode Duplex stream.
 *
 * - Writing a value into the stream replaces the state.
 * - Every state change emits an 'update' event and is pushed out of the
 *   readable side, so piped destinations receive each new state.
 * - All public accessors go through `_getState` / `_putState`, which here
 *   read and write an in-memory field.
 *
 * A `null` state is stored and announced through 'update', but it is never
 * sent over the stream: `push(null)` means end-of-stream and an object-mode
 * `write(null)` throws.
 */
class ObservableStore extends DuplexStream {
  /**
   * @param {*} [initState={}] - value the store starts out with
   */
  constructor (initState = {}) {
    // construct as duplex stream
    super({
      // pass values, not serializations
      objectMode: true,
      // a writer can end and we are still readable
      allowHalfOpen: true,
    })
    // don't buffer outgoing updates
    this.resume()
    // set init state
    this._state = initState
  }

  /**
   * Read the current state (wrapper around the internal `_getState`).
   * @returns {*} the current state
   */
  getState () {
    return this._getState()
  }

  /**
   * Replace the state, notify 'update' listeners with `newState`, and push
   * the stored state to the readable side of the stream.
   * @param {*} newState - value that becomes the new state
   */
  putState (newState) {
    this._putState(newState)
    this.emit('update', newState)
    const current = this.getState()
    // push(null) would signal end-of-stream and permanently close the
    // readable side, so a null state is not forwarded to stream consumers
    if (current !== null) {
      this.push(current)
    }
  }

  /**
   * Shallow-merge a partial state into the current state, or replace the
   * state when the given value is not a key/value object.
   *
   * - non-array object: its own enumerable properties are copied over a
   *   shallow copy of the current state (the current state object itself is
   *   not mutated). If the current state is not a non-array object, the merge
   *   starts from an empty object instead of spreading its indexes/characters.
   * - anything else (primitive, null, undefined, array): becomes the state.
   *
   * @param {*} partialState - properties to merge, or a replacement value
   */
  updateState (partialState) {
    const isMergeable = (value) =>
      value !== null && typeof value === 'object' && !Array.isArray(value)

    if (!isMergeable(partialState)) {
      this.putState(partialState)
      return
    }

    const state = this.getState()
    const base = isMergeable(state) ? state : {}
    this.putState(Object.assign({}, base, partialState))
  }

  /**
   * Subscribe to state changes.
   * @param {Function} handler - called with the new state on every 'update'
   */
  subscribe (handler) {
    this.on('update', handler)
  }

  /**
   * Unsubscribe from state changes.
   * @param {Function} handler - a handler previously passed to `subscribe`
   */
  unsubscribe (handler) {
    this.removeListener('update', handler)
  }

  //
  // private
  //

  // read from persistence
  _getState () {
    return this._state
  }

  // write to persistence
  _putState (newState) {
    this._state = newState
  }

  //
  // stream implementation
  //

  /**
   * Pipe to `dest` and immediately send it the current state, so a new
   * destination does not have to wait for the next update.
   * @param {Object} dest - writable stream to pipe into
   * @param {Object} [options] - forwarded to the Duplex `pipe` implementation
   * @returns {*} whatever the Duplex `pipe` implementation returns
   */
  pipe (dest, options) {
    const result = super.pipe(dest, options)
    const current = this.getState()
    // an object-mode write(null) throws, so a null state is not sent
    if (current !== null) {
      dest.write(current)
    }
    return result
  }

  // write from incoming stream to state
  _write (chunk, encoding, callback) {
    this.putState(chunk)
    callback()
  }

  // noop - the outgoing stream asks for data, but state is only pushed on change
  _read (size) { }
}
// expose the ObservableStore class as this module's export
module.exports = ObservableStore