Skip to content

Commit 7beffc0

Browse files
committed
WIP: Review.
1 parent 3a4a911 commit 7beffc0

5 files changed

Lines changed: 738 additions & 726 deletions

File tree

asynciterator.ts

Lines changed: 119 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ export class AsyncIterator<T> extends EventEmitter {
463463
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
464464
*/
465465
map<D>(map: (item: T, it: AsyncIterator<any>) => D | null, self?: any): AsyncIterator<D> {
466-
return new MappingIterator<T, D>(this, [bind(map, self)]);
466+
return new MappingIterator<T, D>(this, bind(map, self));
467467
}
468468

469469
/**
@@ -786,8 +786,112 @@ export class IntegerIterator extends AsyncIterator<number> {
786786
}
787787

788788

789+
export type MapItem<T, D = T, I extends AsyncIterator<D> = AsyncIterator<D>> =
790+
(item: T, iterator: I) => D;
791+
792+
/**
793+
An iterator that calls a synchronous function
794+
on every item from its source iterator.
795+
@extends module:asynciterator.AsyncIterator
796+
*/
797+
export class MappingIterator<T, D = T> extends AsyncIterator<D> {
798+
protected _source: AsyncIterator<T>;
799+
private readonly _destroySource: boolean;
800+
private readonly _mappings: MapItem<any, any, MappingIterator<T, D>>[];
801+
private readonly _mappingRoot: InternalSource<any>;
802+
803+
// This is wrong: readable should be set by listening to source events
804+
get readable() {
805+
return (this._state < CLOSED) && this._source.readable;
806+
}
807+
808+
/**
809+
* Applies the given mapping to the source iterator.
810+
*/
811+
constructor(
812+
source: AsyncIterator<T>,
813+
mapping?: MapItem<T, D, MappingIterator<T, D>>,
814+
options?: SourcedIteratorOptions,
815+
);
816+
817+
/**
818+
* Applies the given list mappings to the mapping root.
819+
*/
820+
constructor(
821+
source: AsyncIterator<T>,
822+
mappings: MapItem<any, any, MappingIterator<T, D>>[],
823+
mappingRoot: AsyncIterator<any>,
824+
options?: SourcedIteratorOptions,
825+
);
826+
827+
constructor(
828+
source: AsyncIterator<T>,
829+
mappings: MapItem<T, D, MappingIterator<T, D>> |
830+
MapItem<any, any, MappingIterator<T, D>>[] = [],
831+
mappingRoot?: AsyncIterator<any> | SourcedIteratorOptions,
832+
options: SourcedIteratorOptions = {},
833+
) {
834+
super();
835+
// Resolve optional parameters
836+
if (!isEventEmitter(mappingRoot)) {
837+
if (mappingRoot)
838+
options = mappingRoot;
839+
mappingRoot = source;
840+
}
841+
this._source = source;
842+
this._mappings = isFunction(mappings) ? [mappings] : mappings;
843+
this._mappingRoot = mappingRoot as InternalSource<any>;
844+
this._destroySource = options.destroySource !== false;
845+
846+
if (mappingRoot.done) {
847+
this.close();
848+
}
849+
else {
850+
_validateSource(mappingRoot);
851+
this._mappingRoot._destination = this;
852+
this._mappingRoot.on('end', destinationClose);
853+
this._mappingRoot.on('error', destinationEmitError);
854+
this._mappingRoot.on('readable', destinationEmitReadable);
855+
}
856+
}
857+
858+
read(): D | null {
859+
let mapped : any = null;
860+
while (mapped === null && (mapped = this._source.read()) !== null) {
861+
for (let i = 0; i < this._mappings.length; i++) {
862+
mapped = this._mappings[i](mapped, this);
863+
if (mapped === null)
864+
break;
865+
}
866+
}
867+
return mapped;
868+
}
869+
870+
map<K>(map: (item: D, it: AsyncIterator<any>) => K | null, self?: any): AsyncIterator<K> {
871+
return new MappingIterator<T, K>(this._source, [...this._mappings, bind(map, self)], this);
872+
}
873+
874+
public close() {
875+
if (this._destroySource)
876+
this._mappingRoot.destroy();
877+
super.close();
878+
}
879+
880+
/* Cleans up the source iterator and ends. */
881+
protected _end(destroy: boolean) {
882+
this._mappingRoot.removeListener('end', destinationClose);
883+
this._mappingRoot.removeListener('error', destinationEmitError);
884+
this._mappingRoot.removeListener('readable', destinationEmitReadable);
885+
delete this._mappingRoot._destination;
886+
if (this._destroySource)
887+
this._mappingRoot.destroy();
888+
super._end(destroy);
889+
}
890+
}
891+
892+
789893
/**
790-
A iterator that maintains an internal buffer of items.
894+
An iterator that maintains an internal buffer of items.
791895
This class serves as a base class for other iterators
792896
with a typically complex item generation process.
793897
@extends module:asynciterator.AsyncIterator
@@ -1252,102 +1356,23 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
12521356
}
12531357
}
12541358

1255-
function destinationEmitError<S>(this: InternalSource<S>, error: Error) {
1256-
this._destination.emit('error', error);
1359+
function destinationClose<S>(this: InternalSource<S>) {
1360+
this._destination.close();
12571361
}
12581362
function destinationCloseWhenDone<S>(this: InternalSource<S>) {
12591363
(this._destination as any)._closeWhenDone();
12601364
}
1365+
function destinationEmitReadable<S>(this: InternalSource<S>) {
1366+
this._destination.emit('readable');
1367+
}
1368+
function destinationEmitError<S>(this: InternalSource<S>, error: Error) {
1369+
this._destination.emit('error', error);
1370+
}
12611371
function destinationFillBuffer<S>(this: InternalSource<S>) {
12621372
if ((this._destination as any)._sourceStarted !== false)
12631373
(this._destination as any)._fillBuffer();
12641374
}
12651375

1266-
export class MappingIterator<T, D = T> extends AsyncIterator<D> {
1267-
private _destroySource: boolean;
1268-
1269-
get readable() {
1270-
return this.source.readable;
1271-
}
1272-
1273-
set readable(readable) {
1274-
this.source.readable = readable;
1275-
}
1276-
1277-
constructor(
1278-
protected source: AsyncIterator<T>,
1279-
private transforms: ((item: any, iterator: AsyncIterator<any>) => any)[],
1280-
private upstream: AsyncIterator<any> = source,
1281-
options: { destroySource?: boolean } = {}
1282-
) {
1283-
// Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing
1284-
// listeners to the original source
1285-
super();
1286-
this._destroySource = options.destroySource !== false;
1287-
if (upstream.done) {
1288-
this.close();
1289-
}
1290-
else {
1291-
_validateSource(upstream);
1292-
// @ts-ignore
1293-
upstream._destination = this;
1294-
upstream.on('end', onSourceEnd);
1295-
upstream.on('error', onSourceError);
1296-
upstream.on('readable', onSourceReadable);
1297-
}
1298-
}
1299-
1300-
read(): D | null {
1301-
const { source, transforms } = this;
1302-
let item, i;
1303-
while ((item = source.read()) !== null) {
1304-
i = transforms.length;
1305-
// Applies each of the transforms in sequence, and terminates
1306-
// early if a transform returns null
1307-
//
1308-
// Do not use a for-of loop here, it slows down transformations
1309-
// by approximately a factor of 2.
1310-
while (i-- >= 1 && (item = transforms[i](item, this)) !== null)
1311-
;
1312-
if (item !== null)
1313-
return item;
1314-
}
1315-
return null;
1316-
}
1317-
1318-
map<K>(map: (item: D, it: AsyncIterator<any>) => K | null, self?: any): AsyncIterator<K> {
1319-
return new MappingIterator<T, K>(this.source, [bind(map, self), ...this.transforms], this);
1320-
}
1321-
1322-
destroy(cause?: Error): void {
1323-
this.upstream.destroy(cause);
1324-
super.destroy(cause);
1325-
}
1326-
1327-
public close() {
1328-
this.upstream.removeListener('end', onSourceEnd);
1329-
this.upstream.removeListener('error', onSourceError);
1330-
this.upstream.removeListener('readable', onSourceReadable);
1331-
if (this._destroySource)
1332-
this.upstream.destroy();
1333-
scheduleTask(() => {
1334-
// @ts-ignore
1335-
delete this.upstream._destination;
1336-
delete this.source;
1337-
});
1338-
super.close();
1339-
}
1340-
}
1341-
1342-
function onSourceError<S>(this: InternalSource<S>, error: Error) {
1343-
this._destination.emit('error', error);
1344-
}
1345-
function onSourceEnd<S>(this: InternalSource<S>) {
1346-
this._destination.close();
1347-
}
1348-
function onSourceReadable<S>(this: InternalSource<S>) {
1349-
this._destination.emit('readable');
1350-
}
13511376

13521377
/**
13531378
An iterator that generates items based on a source iterator
@@ -2023,15 +2048,18 @@ function isSourceExpression<T>(object: any): object is SourceExpression<T> {
20232048
return object && (isEventEmitter(object) || isPromise(object) || isFunction(object));
20242049
}
20252050

2051+
export interface SourcedIteratorOptions {
2052+
destroySource?: boolean;
2053+
}
2054+
20262055
export interface BufferedIteratorOptions {
20272056
maxBufferSize?: number;
20282057
autoStart?: boolean;
20292058
}
20302059

2031-
export interface TransformIteratorOptions<S> extends BufferedIteratorOptions {
2060+
export interface TransformIteratorOptions<S> extends SourcedIteratorOptions, BufferedIteratorOptions {
20322061
source?: SourceExpression<S>;
20332062
optional?: boolean;
2034-
destroySource?: boolean;
20352063
}
20362064

20372065
export interface TransformOptions<S, D> extends TransformIteratorOptions<S> {

0 commit comments

Comments
 (0)