-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreams.js
More file actions
230 lines (202 loc) · 7.88 KB
/
Copy pathstreams.js
File metadata and controls
230 lines (202 loc) · 7.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
/* Stream functions implementations */
/*
* Creates a Stream object.
*/
function Stream(successHandler, errorHandler) {
this.successHandler = successHandler;
this.errorHandler = errorHandler;
}
Stream.prototype.Stream = function() {
return this
}
/*
* Creates a Stream object from a function.
*/
Function.prototype.Stream = function() {
var f = this; // the function
var error = function(e, ek, until) {
/* TODO: why check for undefined here?? */
if (until == undefined || !until.stop) {
ek(e)
}
}
var success = function(x, k, ek, id, until) {
if (!until.stop) {
try {
var res = f(x);
schedule(k, res, id, until);
} catch(err) {
ek(err)
}
}
}
return new Stream(success, error);
}
/*
* Creates a Stream object that is setup to handle errors. This is an interal function and should not be used by the user.
*/
Function.prototype.ErrorStream = function() {
var h = this
var success = function(x, k, id, until) {
if (!until.stop) {
schedule(k, x, id, until)
}
}
var error = function(err, k, ek, id, until) {
if (!until.stop) {
try {
var res = h(err)
schedule(k, res, id, until)
} catch (err) {
ek(err)
}
}
}
return new Stream(success, error)
}
/*
* Chains a call to g to the end of the chain.
*/
Stream.prototype.next = function (g) {
var f = this;
g = g.Stream();
var error = function(e, ek, until) {
if (!until.stop) {
ek(e)
}
}
var success = function (x, k, ek, id, until) {
if (!until.stop) {
f.successHandler(x,
function (y) { g.successHandler(y, k, ek, id, until); },
function(err) { g.errorHandler(err, ek, id, until); },
id,
until
);
}
};
return new Stream(success, error);
}
/*
* Creates a Stream object that will handle an error by calling h. This has no effect if there isn't an error.
*/
Stream.prototype.error = function(h) {
var f = this
h = h.ErrorStream()
var error = function(e, ek) {
ek(e)
}
var success = function(x, k, ek, id, until) {
if (!until.stop) {
f.successHandler(x, function(y) {h.successHandler(y, k, id, until)}, function(err) {h.errorHandler(err, k, ek, id, until)}, id, until);
}
}
return new Stream(success, error);
}
/*
* This designates the end of a stream. A stream will continue to stream data until stop evaluates to true if it's a function
* or stop happens if it's an event. Interval can be used to explicitly state the amount of time between each piece of data.
*
* Returns a StateMachine object.
*/
Stream.prototype.until = function(stop, interval) {
interval = interval !== undefined ? interval : 0
if (typeof(stop) === "function") {
return this._until_function(stop, interval);
} else if (stop instanceof Event) {
return this._until_event(stop, interval)
} else {
throw new Error("Until must be called with a function or an event.")
}
}
// an internal function for events
Stream.prototype._until_event = function(event, interval) {
var self = this
var success = function(x, k, ek) {
// the interval and calls the continuation.
self.successHandler(x,
function(y) {
self.stop = false
// Event is a StateMachine so we want to call it's success handler. Once the event happens our success function will be called which clears
event.successHandler(x, function(y) { self.stop = true; clearInterval(self.intervalId); k(y)})
self.intervalId = setInterval(function() {
self.successHandler(x,
function() {
// we don't want to do anything here since we're waiting on the event to fire
},
function(err) {
clearInterval(self.intervalId)
ek(err)
}, self.intervalId, self)
}, interval);
},
function(err) { ek(err) }, self.intervalId, self);
}
return new StateMachine(success, function(err, ek) { ek(err) })
}
// an interval function for functions with until.
Stream.prototype._until_function = function (f, interval) {
var self = this;
var success = function(x, k, ek) {
/*
* This is a little gross so here's what is happening. We don't want to wait for the first call in set interval to fire.
* This causes the entire chain to wait 'interval' before starting. So we call our success handler immediately. In order
* to ensure the chain before is ran before this stream starts, we have to put the interval code into the success handler
* of the first call. In that success handler, we test if we still want to keep going, and if so, set the interval.
*/
self.successHandler(x,
function(y) {
// now if we want to keep going, we'll set the timer
if (!f(y)) {
self.stop = false
self.intervalId = setInterval(function () {
self.successHandler(x,
function(y) {
// this chain has succeeded, clear the timer and call the continuation
if (!self.stop && f(y)) {
clearInterval(self.intervalId);
self.stop = true;
schedule(k, y)
}
},
function(err) {
// something in the chain failed and wasn't dealt with, so just pass it on
if (!self.stop) {
clearInterval(self.intervalId);
self.stop = true
ek(err)
}
}, self.intervalId, self);
}, interval);
} else {
schedule(k, y)
}
},
function(err) {
// this means an error has been thrown and wasn't dealt with, so just pass it on
ek(err);
}, self.intervalId, self);
}
return new StateMachine(success,
function(err, ek) {
ek(err)
});
}
/*
* Runs the stream indefinitely with x as the argument.
*/
Stream.prototype.run = function (x) {
this.until(function() {return false}).run(x)
}
/*
* Runs the stream indefinitely with f as a cleanup function to be called at the end of each firing.
*/
Stream.prototype.done = function(f, x) {
this.next(f).run(x);
}
/*
* A stream is not supported on a Stream object.
*/
Stream.prototype.stream = function () {
throw new Error("Cannot call .stream on a Stream object");
}