Skip to content

Commit

Permalink
perf improvements for creation ops
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Apr 7, 2015
1 parent f460a08 commit fcedc06
Show file tree
Hide file tree
Showing 34 changed files with 1,482 additions and 386 deletions.
60 changes: 30 additions & 30 deletions Gruntfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,19 @@ module.exports = function (grunt) {
'src/core/perf/operators/toarray.js',
'src/core/linq/observable/create.js',
'src/core/linq/observable/defer.js',
'src/core/linq/observable/empty.js',
'src/core/perf/operators/empty.js',
'src/core/perf/operators/from.js',
'src/core/perf/operators/fromarrayobservable.js','src/core/perf/operators/fromarray.js',
'src/core/linq/observable/generate.js',
'src/core/perf/operators/of.js',
'src/core/linq/observable/ofarraychanges.js',
'src/core/linq/observable/ofobjectchanges.js',
'src/core/linq/observable/never.js',
'src/core/linq/observable/pairs.js',
'src/core/perf/operators/never.js',
'src/core/perf/operators/pairs.js',
'src/core/perf/operators/range.js',
'src/core/linq/observable/repeat.js',
'src/core/linq/observable/return.js',
'src/core/linq/observable/throw.js',
'src/core/perf/operators/just.js',
'src/core/perf/operators/throw.js',
'src/core/linq/observable/using.js',

// Multiple
Expand Down Expand Up @@ -372,17 +372,17 @@ module.exports = function (grunt) {
'src/core/perf/operators/toarray.js',
'src/core/linq/observable/create.js',
'src/core/linq/observable/defer.js',
'src/core/linq/observable/empty.js',
'src/core/perf/operators/empty.js',
'src/core/perf/operators/from.js',
'src/core/perf/operators/fromarrayobservable.js','src/core/perf/operators/fromarray.js',
'src/core/linq/observable/generate.js',
'src/core/perf/operators/of.js',
'src/core/linq/observable/never.js',
'src/core/linq/observable/pairs.js',
'src/core/perf/operators/never.js',
'src/core/perf/operators/pairs.js',
'src/core/perf/operators/range.js',
'src/core/linq/observable/repeat.js',
'src/core/linq/observable/return.js',
'src/core/linq/observable/throw.js',
'src/core/perf/operators/just.js',
'src/core/perf/operators/throw.js',
'src/core/linq/observable/using.js',

// Multiple
Expand Down Expand Up @@ -665,17 +665,17 @@ module.exports = function (grunt) {
'src/core/perf/operators/toarray.js',
'src/core/linq/observable/create.js',
'src/core/linq/observable/defer.js',
'src/core/linq/observable/empty.js',
'src/core/perf/operators/empty.js',
'src/core/perf/operators/from.js',
'src/core/perf/operators/fromarrayobservable.js','src/core/perf/operators/fromarray.js',
'src/core/linq/observable/generate.js',
'src/core/linq/observable/never.js',
'src/core/perf/operators/never.js',
'src/core/perf/operators/of.js',
'src/core/linq/observable/pairs.js',
'src/core/perf/operators/pairs.js',
'src/core/perf/operators/range.js',
'src/core/linq/observable/repeat.js',
'src/core/linq/observable/return.js',
'src/core/linq/observable/throw.js',
'src/core/perf/operators/just.js',
'src/core/perf/operators/throw.js',
'src/core/linq/observable/using.js',

// Multiple
Expand Down Expand Up @@ -814,17 +814,17 @@ module.exports = function (grunt) {
'src/core/perf/operators/toarray.js',
'src/core/linq/observable/create.js',
'src/core/linq/observable/defer.js',
'src/core/linq/observable/empty.js',
'src/core/perf/operators/empty.js',
'src/core/perf/operators/from.js',
'src/core/perf/operators/fromarrayobservable.js','src/core/perf/operators/fromarray.js',
'src/core/linq/observable/generate.js',
'src/core/linq/observable/never.js',
'src/core/perf/operators/never.js',
'src/core/perf/operators/of.js',
'src/core/linq/observable/pairs.js',
'src/core/perf/operators/pairs.js',
'src/core/perf/operators/range.js',
'src/core/linq/observable/repeat.js',
'src/core/linq/observable/return.js',
'src/core/linq/observable/throw.js',
'src/core/perf/operators/just.js',
'src/core/perf/operators/throw.js',
'src/core/linq/observable/using.js',

// Multiple
Expand Down Expand Up @@ -951,16 +951,16 @@ module.exports = function (grunt) {
'src/core/perf/operators/toarray.js',
'src/core/linq/observable/create.js',
'src/core/linq/observable/defer.js',
'src/core/linq/observable/empty.js',
'src/core/perf/operators/empty.js',
'src/core/perf/operators/from.js',
'src/core/perf/operators/fromarrayobservable.js','src/core/perf/operators/fromarray.js',
'src/core/linq/observable/never.js',
'src/core/perf/operators/never.js',
'src/core/perf/operators/of.js',
'src/core/linq/observable/pairs.js',
'src/core/perf/operators/pairs.js',
'src/core/perf/operators/range.js',
'src/core/linq/observable/repeat.js',
'src/core/linq/observable/return.js',
'src/core/linq/observable/throw.js',
'src/core/perf/operators/just.js',
'src/core/perf/operators/throw.js',

// Multiple
'src/core/linq/observable/catchproto.js',
Expand Down Expand Up @@ -1121,16 +1121,16 @@ module.exports = function (grunt) {
'src/core/perf/operators/toarray.js',
'src/core/linq/observable/create.js',
'src/core/linq/observable/defer.js',
'src/core/linq/observable/empty.js',
'src/core/perf/operators/empty.js',
'src/core/perf/operators/from.js',
'src/core/perf/operators/fromarrayobservable.js','src/core/perf/operators/fromarray.js',
'src/core/linq/observable/never.js',
'src/core/perf/operators/never.js',
'src/core/perf/operators/of.js',
'src/core/linq/observable/pairs.js',
'src/core/perf/operators/pairs.js',
'src/core/perf/operators/range.js',
'src/core/linq/observable/repeat.js',
'src/core/linq/observable/return.js',
'src/core/linq/observable/throw.js',
'src/core/perf/operators/just.js',
'src/core/perf/operators/throw.js',

// Multiple
'src/core/linq/observable/catchproto.js',
Expand Down
187 changes: 147 additions & 40 deletions dist/rx.all.compat.js
Original file line number Diff line number Diff line change
Expand Up @@ -2588,6 +2588,34 @@
});
};

var EmptyObservable = (function(__super__) {
inherits(EmptyObservable, __super__);
function EmptyObservable(scheduler) {
this.scheduler = scheduler;
__super__.call(this);
}

EmptyObservable.prototype.subscribeCore = function (observer) {
var sink = new EmptySink(observer, this);
return sink.run();
};

function EmptySink(observer, parent) {
this.observer = observer;
this.parent = parent;
}

function scheduleItem(s, state) {
state.onCompleted();
}

EmptySink.prototype.run = function () {
return this.parent.scheduler.scheduleWithState(this.observer, scheduleItem);
};

return EmptyObservable;
}(ObservableBase));

/**
* Returns an empty observable sequence, using the specified scheduler to send out the single OnCompleted message.
*
Expand All @@ -2599,11 +2627,7 @@
*/
var observableEmpty = Observable.empty = function (scheduler) {
isScheduler(scheduler) || (scheduler = immediateScheduler);
return new AnonymousObservable(function (observer) {
return scheduler.scheduleWithState(null, function () {
observer.onCompleted();
});
});
return new EmptyObservable(scheduler);
};

var FromObservable = (function(__super__) {
Expand Down Expand Up @@ -2883,14 +2907,62 @@
return new FromArrayObservable(args, scheduler);
};

var NeverObservable = (function(__super__) {
inherits(NeverObservable, __super__);
function NeverObservable() {
__super__.call(this);
}

NeverObservable.prototype.subscribeCore = function (observer) {
return disposableEmpty;
};

return NeverObservable;
}(ObservableBase));

/**
* Returns a non-terminating observable sequence, which can be used to denote an infinite duration (e.g. when using reactive joins).
* Returns a non-terminating observable sequence, which can be used to denote an infinite duration (e.g. when using reactive joins).
* @returns {Observable} An observable sequence whose observers will never get called.
*/
var observableNever = Observable.never = function () {
return new AnonymousObservable(function () {
return disposableEmpty;
});
return new NeverObservable();
};

var PairsObservable = (function(__super__) {
inherits(PairsObservable, __super__);
function PairsObservable(obj, scheduler) {
this.obj = obj;
this.keys = Object.keys(obj);
this.scheduler = scheduler;
__super__.call(this);
}

PairsObservable.prototype.subscribeCore = function (observer) {
var sink = new PairsSink(observer, this);
return sink.run();
};

return PairsObservable;
}(ObservableBase));

function PairsSink(observer, parent) {
this.observer = observer;
this.parent = parent;
}

PairsSink.prototype.run = function () {
var observer = this.observer, obj = this.parent.obj, keys = this.parent.keys, len = keys.length;
function loopRecursive(i, recurse) {
if (i < len) {
var key = keys[i];
observer.onNext([key, obj[key]]);
recurse(i + 1);
} else {
observer.onCompleted();
}
}

return this.parent.scheduler.scheduleRecursiveWithState(0, loopRecursive);
};

/**
Expand All @@ -2900,19 +2972,8 @@
* @returns {Observable} An observable sequence of [key, value] pairs from the object.
*/
Observable.pairs = function (obj, scheduler) {
scheduler || (scheduler = Rx.Scheduler.currentThread);
return new AnonymousObservable(function (observer) {
var keys = Object.keys(obj), len = keys.length;
return scheduler.scheduleRecursiveWithState(0, function (idx, self) {
if (idx < len) {
var key = keys[idx];
observer.onNext([key, obj[key]]);
self(idx + 1);
} else {
observer.onCompleted();
}
});
});
scheduler || (scheduler = currentThreadScheduler);
return new PairsObservable(obj, scheduler);
};

var RangeObservable = (function(__super__) {
Expand Down Expand Up @@ -2985,6 +3046,37 @@
return observableReturn(value, scheduler).repeat(repeatCount == null ? -1 : repeatCount);
};

var JustObservable = (function(__super__) {
inherits(JustObservable, __super__);
function JustObservable(value, scheduler) {
this.value = value;
this.scheduler = scheduler;
__super__.call(this);
}

JustObservable.prototype.subscribeCore = function (observer) {
var sink = new JustSink(observer, this);
return sink.run();
};

function JustSink(observer, parent) {
this.observer = observer;
this.parent = parent;
}

function scheduleItem(s, state) {
var value = state[0], observer = state[1];
observer.onNext(value);
observer.onCompleted();
}

JustSink.prototype.run = function () {
return this.parent.scheduler.scheduleWithState([this.parent.value, this.observer], scheduleItem);
};

return JustObservable;
}(ObservableBase));

/**
* Returns an observable sequence that contains a single element, using the specified scheduler to send out observer messages.
* There is an alias called 'just' or browsers <IE9.
Expand All @@ -2994,34 +3086,49 @@
*/
var observableReturn = Observable['return'] = Observable.just = Observable.returnValue = function (value, scheduler) {
isScheduler(scheduler) || (scheduler = immediateScheduler);
return new AnonymousObservable(function (o) {
return scheduler.scheduleWithState(value, function(_,v) {
o.onNext(v);
o.onCompleted();
});
});
return new JustObservable(value, scheduler);
};

var ThrowObservable = (function(__super__) {
inherits(ThrowObservable, __super__);
function ThrowObservable(error, scheduler) {
this.error = error;
this.scheduler = scheduler;
__super__.call(this);
}

ThrowObservable.prototype.subscribeCore = function (observer) {
var sink = new ThrowSink(observer, this);
return sink.run();
};

function ThrowSink(observer, parent) {
this.observer = observer;
this.parent = parent;
}

function scheduleItem(s, state) {
var error = state[0], observer = state[1];
observer.onError(error);
}

ThrowSink.prototype.run = function () {
return this.parent.scheduler.scheduleWithState([this.parent.error, this.observer], scheduleItem);
};

return ThrowObservable;
}(ObservableBase));

/**
* Returns an observable sequence that terminates with an exception, using the specified scheduler to send out the single onError message.
* There is an alias to this method called 'throwError' for browsers <IE9.
* @param {Mixed} error An object used for the sequence's termination.
* @param {Scheduler} scheduler Scheduler to send the exceptional termination call on. If not specified, defaults to Scheduler.immediate.
* @returns {Observable} The observable sequence that terminates exceptionally with the specified exception object.
*/
var observableThrow = Observable['throw'] = Observable.throwError = function (error, scheduler) {
var observableThrow = Observable['throw'] = Observable.throwError = Observable.throwException = function (error, scheduler) {
isScheduler(scheduler) || (scheduler = immediateScheduler);
return new AnonymousObservable(function (observer) {
return scheduler.schedule(function () {
observer.onError(error);
});
});
};

/** @deprecated use #some instead */
Observable.throwException = function () {
//deprecate('throwException', 'throwError');
return Observable.throwError.apply(null, arguments);
return new ThrowObservable(error, scheduler);
};

/**
Expand Down
2 changes: 1 addition & 1 deletion dist/rx.all.compat.map

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions dist/rx.all.compat.min.js

Large diffs are not rendered by default.

Loading

0 comments on commit fcedc06

Please sign in to comment.