|
| 1 | +import { Notification } from 'rxjs/Notification'; |
1 | 2 | import { TestMessage } from '../message/TestMessage';
|
| 3 | +import { TestMessageValue } from '../message/TestMessageValue'; |
| 4 | +import { ObservableMarbleToken } from './ObservableMarbleToken'; |
2 | 5 | import { SubscriptionMarbleToken } from './SubscriptionMarbleToken';
|
3 | 6 |
|
4 | 7 | const parseObservableMarble = <T>(
|
5 | 8 | marble: string,
|
6 |
| - _value?: { [key: string]: T }, |
7 |
| - _error?: any, |
8 |
| - _materializeInnerObservables: boolean = false |
| 9 | + value?: { [key: string]: T } | null, |
| 10 | + error?: any, |
| 11 | + _materializeInnerObservables: boolean = false, |
| 12 | + frameTimeFactor = 1 |
9 | 13 | ): Array<TestMessage<T>> => {
|
10 | 14 | if (marble.indexOf(SubscriptionMarbleToken.UNSUBSCRIBE) !== -1) {
|
11 | 15 | throw new Error(`Observable marble cannot have unsubscription marker ${SubscriptionMarbleToken.UNSUBSCRIBE}`);
|
12 | 16 | }
|
13 | 17 |
|
14 |
| - const messages = Array.from(marble).reduce((acc: Array<any>, _value: string) => { |
| 18 | + const subscriptionIndex = marble.indexOf(SubscriptionMarbleToken.SUBSCRIBE) * frameTimeFactor; |
| 19 | + const frameOffset = subscriptionIndex < 0 ? 0 : subscriptionIndex; |
| 20 | + |
| 21 | + const values = Array.from(marble).filter(token => token !== ObservableMarbleToken.NOOP).slice(frameOffset).reduce(( |
| 22 | + acc: { currentOffset: number; messages: Array<TestMessage<T>> }, |
| 23 | + token: any |
| 24 | + ) => { |
| 25 | + let message: TestMessage<T> | null = null; |
| 26 | + |
| 27 | + switch (token) { |
| 28 | + case ObservableMarbleToken.TIMEFRAME: |
| 29 | + acc.currentOffset += 1 * frameTimeFactor; |
| 30 | + break; |
| 31 | + case ObservableMarbleToken.ERROR: |
| 32 | + message = new TestMessageValue<T>(acc.currentOffset, Notification.createError(error || '#')); |
| 33 | + break; |
| 34 | + case ObservableMarbleToken.COMPLETE: |
| 35 | + message = new TestMessageValue<T>(acc.currentOffset, Notification.createComplete()); |
| 36 | + break; |
| 37 | + case ObservableMarbleToken.TIMEFRAME_EXPAND: |
| 38 | + break; |
| 39 | + case ObservableMarbleToken.SIMULTANEOUS_START: |
| 40 | + acc.currentOffset += 1 * frameTimeFactor; |
| 41 | + break; |
| 42 | + case ObservableMarbleToken.SIMULTANEOUS_END: |
| 43 | + break; |
| 44 | + case SubscriptionMarbleToken.SUBSCRIBE: |
| 45 | + break; |
| 46 | + default: |
| 47 | + const customValue = value && value[token] ? value[token] : token; |
| 48 | + message = new TestMessageValue<T>(acc.currentOffset, Notification.createNext<T>(customValue)); |
| 49 | + } |
| 50 | + |
| 51 | + if (!!message) { |
| 52 | + acc.messages.push(message); |
| 53 | + acc.currentOffset += 1 * frameTimeFactor; |
| 54 | + } |
| 55 | + |
15 | 56 | return acc;
|
16 |
| - }, []); |
| 57 | + }, { |
| 58 | + currentOffset: frameOffset, |
| 59 | + messages: [] |
| 60 | + }); |
17 | 61 |
|
18 |
| - return messages; |
| 62 | + return values.messages; |
19 | 63 | };
|
20 | 64 |
|
21 | 65 | export { parseObservableMarble };
|
0 commit comments