Skip to content

Commit 1d27849

Browse files
committed
feat(advanceto): implement advanceTo interface
- closes #36
1 parent 9d98836 commit 1d27849

File tree

5 files changed

+95
-23
lines changed

5 files changed

+95
-23
lines changed

src/index.ts

+13-6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ export type RxSandboxInstance = {
2727
* Flush out currently scheduled observables, fill values returned by `getMarbles`.
2828
*/
2929
flush: typeof TestScheduler.prototype.flush;
30+
/**
31+
* Flush out currently scheduled observables, only until reaches frame specfied.
32+
*/
33+
advanceTo: typeof TestScheduler.prototype.advanceTo;
3034
/**
3135
* Get array of observable value's metadata TestMessage<T> from observable
3236
* created via `hot` or `cold`. Returned array will be filled once scheduler flushes
@@ -53,10 +57,12 @@ export interface RxSandbox {
5357
*
5458
* @param {boolean} [autoFlush] Flush scheduler automatically when `getMarbles` is being called. False by default.
5559
* @param {number} [frameTimeFactor] Custom frametime factor for virtual time frame. 1 by default.
56-
*
60+
* @param {number} [maxFrameValue] Maximum frame value of marble diagram can be read.
61+
* If marble has value over max frame, it'll be ignored when scheduler flushes out.
62+
* 1000 * frameTimeFactory by default.
5763
* @return {RxSandboxInstance} instance of test scheduler interfaces.
5864
*/
59-
create(autoFlush?: boolean, frameTimeFactor?: number): RxSandboxInstance;
65+
create(autoFlush?: boolean, frameTimeFactor?: number, maxFrameValue?: number): RxSandboxInstance;
6066
/**
6167
* Utility assertion method to assert marble based observable test messages.
6268
* By default return values of sandbox functions are plain object works with
@@ -79,17 +85,18 @@ export interface RxSandbox {
7985
}
8086

8187
const rxSandbox: RxSandbox = {
82-
create: (autoFlush: boolean = false, frameTimeFactor: number = 1) => {
83-
const scheduler = new TestScheduler(autoFlush, frameTimeFactor);
88+
create: (autoFlush: boolean = false, frameTimeFactor: number = 1, maxFrameValue = 1000) => {
89+
const scheduler = new TestScheduler(autoFlush, frameTimeFactor, maxFrameValue);
8490

8591
return {
8692
hot: scheduler.createHotObservable.bind(scheduler) as typeof scheduler.createHotObservable,
8793
cold: scheduler.createColdObservable.bind(scheduler) as typeof scheduler.createColdObservable,
8894
flush: scheduler.flush.bind(scheduler) as typeof scheduler.flush,
95+
advanceTo: scheduler.advanceTo.bind(scheduler) as typeof scheduler.advanceTo,
8996
getMessages: scheduler.getMessages.bind(scheduler) as typeof scheduler.getMessages,
9097
e: <T = string>(marble: string, value?: { [key: string]: T } | null, error?: any) =>
91-
parseObservableMarble(marble, value, error, true, frameTimeFactor),
92-
s: (marble: string) => parseSubscriptionMarble(marble, frameTimeFactor)
98+
parseObservableMarble(marble, value, error, true, frameTimeFactor, frameTimeFactor * maxFrameValue),
99+
s: (marble: string) => parseSubscriptionMarble(marble, frameTimeFactor, frameTimeFactor * maxFrameValue)
93100
};
94101
},
95102
marbleAssert: marbleAssert

src/marbles/parseObservableMarble.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ const parseObservableMarble = <T = string>(
1717
value?: { [key: string]: T } | null,
1818
error?: any,
1919
materializeInnerObservables: boolean = false,
20-
frameTimeFactor = 1
20+
frameTimeFactor = 1,
21+
maxFrame = 1000
2122
): Readonly<Array<TestMessage<T | Array<TestMessage<T>>>>> => {
2223
if (marble.indexOf(SubscriptionMarbleToken.UNSUBSCRIBE) !== -1) {
2324
throw new Error(`Observable marble cannot have unsubscription marker ${SubscriptionMarbleToken.UNSUBSCRIBE}`);
@@ -29,7 +30,7 @@ const parseObservableMarble = <T = string>(
2930
const frameOffset = subscriptionIndex < 0 ? 0 : -subscriptionIndex;
3031

3132
const values = marbleTokenArray.reduce(
32-
observableTokenParseReducer(value || null, error, materializeInnerObservables, frameTimeFactor),
33+
observableTokenParseReducer(value || null, error, materializeInnerObservables, frameTimeFactor, maxFrame),
3334
{
3435
currentTimeFrame: frameOffset,
3536
messages: [],

src/marbles/parseSubscriptionMarble.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ import { SubscriptionLog } from 'rxjs/testing/SubscriptionLog';
22
import { ObservableMarbleToken } from './ObservableMarbleToken';
33
import { subscriptionTokenParseReducer } from './tokenParseReducer';
44

5-
const parseSubscriptionMarble = (marble: string | null, frameTimeFactor: number = 1) => {
5+
const parseSubscriptionMarble = (marble: string | null, frameTimeFactor: number = 1, maxFrame = 1000) => {
66
if (!marble) {
77
return new SubscriptionLog(Number.POSITIVE_INFINITY);
88
}
99

1010
const marbleTokenArray = Array.from(marble).filter(token => token !== ObservableMarbleToken.NOOP);
11-
const value = marbleTokenArray.reduce(subscriptionTokenParseReducer(frameTimeFactor), {
11+
const value = marbleTokenArray.reduce(subscriptionTokenParseReducer(frameTimeFactor, maxFrame), {
1212
currentTimeFrame: 0,
1313
subscriptionFrame: Number.POSITIVE_INFINITY,
1414
unsubscriptionFrame: Number.POSITIVE_INFINITY,

src/marbles/tokenParseReducer.ts

+11-2
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,13 @@ const observableTokenParseReducer = <T>(
115115
value: { [key: string]: T } | null,
116116
error: any,
117117
materializeInnerObservables: boolean,
118-
frameTimeFactor: number
118+
frameTimeFactor: number,
119+
maxFrame: number
119120
) => (acc: ObservableTokenParseAccumulator<T>, token: any) => {
121+
if (acc.currentTimeFrame >= maxFrame) {
122+
return acc;
123+
}
124+
120125
let message: TestMessage<T | Array<TestMessage<T>>> | null = null;
121126

122127
switch (token) {
@@ -168,10 +173,14 @@ const observableTokenParseReducer = <T>(
168173
* Reducer to traverse subscription marble diagram to generate SubscriptionLog metadata.
169174
* @param frameTimeFactor Custom timeframe factor
170175
*/
171-
const subscriptionTokenParseReducer = (frameTimeFactor: number) => (
176+
const subscriptionTokenParseReducer = (frameTimeFactor: number, maxFrame: number) => (
172177
acc: SubscriptionTokenParseAccumulator,
173178
token: string
174179
) => {
180+
if (acc.currentTimeFrame >= maxFrame) {
181+
return acc;
182+
}
183+
175184
switch (token) {
176185
case SubscriptionMarbleToken.SUBSCRIBE:
177186
acc.subscriptionFrame = acc.currentTimeFrame;

src/scheduler/TestScheduler.ts

+66-11
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { Notification } from 'rxjs/Notification';
22
import { Observable } from 'rxjs/Observable';
3-
import { VirtualTimeScheduler } from 'rxjs/scheduler/VirtualTimeScheduler';
3+
import { AsyncAction } from 'rxjs/scheduler/AsyncAction';
44
import { VirtualAction } from 'rxjs/scheduler/VirtualTimeScheduler';
5+
import { VirtualTimeScheduler } from 'rxjs/scheduler/VirtualTimeScheduler';
56
import { Subscription } from 'rxjs/Subscription';
67
import { ColdObservable } from 'rxjs/testing/ColdObservable';
78
import { HotObservable } from 'rxjs/testing/HotObservable';
@@ -18,19 +19,20 @@ class TestScheduler extends VirtualTimeScheduler {
1819
private readonly coldObservables: Array<ColdObservable<any>> = [];
1920
private readonly hotObservables: Array<HotObservable<any>> = [];
2021
private flushed: boolean = false;
22+
private flushing: boolean = false;
23+
24+
private readonly _maxFrame: number;
25+
public get maxFrame(): number {
26+
return this._maxFrame;
27+
}
2128

22-
constructor(private readonly autoFlush = false, private readonly frameTimeFactor = 1) {
29+
constructor(private readonly autoFlush: boolean, private readonly frameTimeFactor: number, maxFrameValue: number) {
2330
super(VirtualAction, Number.POSITIVE_INFINITY);
31+
this._maxFrame = maxFrameValue * frameTimeFactor;
2432
}
2533

2634
public flush(): void {
27-
const hotObservables = this.hotObservables;
28-
while (hotObservables.length > 0) {
29-
hotObservables.shift()!.setup();
30-
}
31-
32-
super.flush();
33-
this.flushed = true;
35+
this.flushUntil();
3436
}
3537

3638
public getMessages<T = string>(observable: Observable<T>, unsubscriptionMarbles: string | null = null) {
@@ -87,7 +89,7 @@ class TestScheduler extends VirtualTimeScheduler {
8789

8890
const messages = Array.isArray(marbleValue)
8991
? marbleValue
90-
: parseObservableMarble(marbleValue, value, error, false, this.frameTimeFactor) as any;
92+
: parseObservableMarble(marbleValue, value, error, false, this.frameTimeFactor, this._maxFrame) as any;
9193
const observable = new ColdObservable<T>(messages as Array<TestMessage<T | Array<TestMessage<T>>>>, this);
9294
this.coldObservables.push(observable);
9395
return observable;
@@ -104,12 +106,26 @@ class TestScheduler extends VirtualTimeScheduler {
104106

105107
const messages = Array.isArray(marbleValue)
106108
? marbleValue
107-
: parseObservableMarble(marbleValue, value, error, false, this.frameTimeFactor) as any;
109+
: parseObservableMarble(marbleValue, value, error, false, this.frameTimeFactor, this._maxFrame) as any;
108110
const subject = new HotObservable<T>(messages as Array<TestMessage<T | Array<TestMessage<T>>>>, this);
109111
this.hotObservables.push(subject);
112+
subject.setup();
110113
return subject;
111114
}
112115

116+
public advanceTo(toFrame: number): void {
117+
if (this.autoFlush) {
118+
throw new Error('Cannot advance frame manually with autoflushing scheduler');
119+
}
120+
121+
if (toFrame < 0 || toFrame < this.frame) {
122+
throw new Error(`Cannot advance frame, given frame is either negative or smaller than current frame`);
123+
}
124+
125+
this.flushUntil(toFrame);
126+
this.frame = toFrame;
127+
}
128+
113129
private materializeInnerObservable<T>(observable: Observable<any>, outerFrame: number): Array<TestMessage<T>> {
114130
const innerObservableMetadata: Array<TestMessage<T>> = [];
115131
const pushMetaData = (notification: Notification<T>) =>
@@ -123,6 +139,45 @@ class TestScheduler extends VirtualTimeScheduler {
123139

124140
return innerObservableMetadata;
125141
}
142+
143+
private peek(): AsyncAction<any> | null {
144+
const { actions } = this;
145+
return actions && actions.length > 0 ? actions[0] : null;
146+
}
147+
148+
private flushUntil(toFrame: number = this.maxFrame): void {
149+
if (this.flushing) {
150+
return;
151+
}
152+
153+
this.flushing = true;
154+
155+
const { actions } = this;
156+
let error: any;
157+
let action: AsyncAction<any> | null | undefined = null;
158+
159+
while (this.flushing && (action = this.peek()) && action.delay <= toFrame) {
160+
const action: AsyncAction<any> = actions.shift()!;
161+
this.frame = action.delay;
162+
163+
if ((error = action.execute(action.state, action.delay))) {
164+
break;
165+
}
166+
}
167+
168+
this.flushing = false;
169+
170+
if (toFrame >= this.maxFrame) {
171+
this.flushed = true;
172+
}
173+
174+
if (error) {
175+
while ((action = actions.shift())) {
176+
action.unsubscribe();
177+
}
178+
throw error;
179+
}
180+
}
126181
}
127182

128183
export { TestScheduler };

0 commit comments

Comments
 (0)