Skip to content

Commit 9cbb534

Browse files
committed
feat(parseobservablemarble): support flatten inner observables
1 parent d9fde83 commit 9cbb534

File tree

1 file changed

+42
-16
lines changed

1 file changed

+42
-16
lines changed

src/marbles/parseObservableMarble.ts

+42-16
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Notification } from 'rxjs/Notification';
2+
import { ColdObservable } from 'rxjs/testing/ColdObservable';
23
import { TestMessage } from '../message/TestMessage';
34
import { TestMessageValue } from '../message/TestMessageValue';
45
import { ObservableMarbleToken } from './ObservableMarbleToken';
@@ -12,7 +13,7 @@ interface TokenParseAccumulator<T> {
1213
/**
1314
* Meta values emitted by marbles (value, error, complete)
1415
*/
15-
messages: Array<TestMessage<T>>;
16+
messages: Array<TestMessage<T | Array<TestMessage<T>>>>;
1617
/**
1718
* Flag indicate values are grouped `()` and emitted simultaneously
1819
*/
@@ -27,11 +28,29 @@ interface TokenParseAccumulator<T> {
2728
expandingValue: Array<string>;
2829
}
2930

30-
const marbleTokenParseReducer = <T>(value?: { [key: string]: T } | null, error?: any, frameTimeFactor: number = 1) => (
31-
acc: TokenParseAccumulator<T>,
32-
token: any
31+
/**
32+
* Translate single token in marble diagram to correct metadata
33+
* @param {any} token Single char in marble diagram
34+
* @param {{ [key: string]: T }} [value] Custom value for marble value
35+
* @param {boolean} [materializeInnerObservables] Flatten inner observables in cold observable. False by default.
36+
*/
37+
const getMarbleTokenValue = <T>(
38+
token: any,
39+
value?: { [key: string]: T } | null,
40+
materializeInnerObservables: boolean = false
3341
) => {
34-
let message: TestMessage<T> | null = null;
42+
const customValue = value && value[token] ? value[token] : token;
43+
44+
return materializeInnerObservables && customValue instanceof ColdObservable ? customValue.messages : customValue;
45+
};
46+
47+
const marbleTokenParseReducer = <T>(
48+
value?: { [key: string]: T } | null,
49+
error?: any,
50+
materializeInnerObservables: boolean = false,
51+
frameTimeFactor: number = 1
52+
) => (acc: TokenParseAccumulator<T>, token: any) => {
53+
let message: TestMessage<T | Array<TestMessage<T>>> | null = null;
3554

3655
switch (token) {
3756
case ObservableMarbleToken.TIMEFRAME:
@@ -80,8 +99,11 @@ const marbleTokenParseReducer = <T>(value?: { [key: string]: T } | null, error?:
8099
if (acc.expandingTokenCount > 0) {
81100
acc.expandingValue.push(token);
82101
} else {
83-
const customValue = value && value[token] ? value[token] : token;
84-
message = new TestMessageValue<T>(acc.currentTimeframe, Notification.createNext<T>(customValue));
102+
const tokenValue = getMarbleTokenValue(token, value, materializeInnerObservables);
103+
message = new TestMessageValue<T | Array<TestMessage<T>>>(
104+
acc.currentTimeframe,
105+
Notification.createNext<T | Array<TestMessage<T>>>(tokenValue)
106+
);
85107
}
86108
}
87109

@@ -94,6 +116,7 @@ const marbleTokenParseReducer = <T>(value?: { [key: string]: T } | null, error?:
94116

95117
return acc;
96118
};
119+
97120
/**
98121
* Parse marble DSL diagram, generates array of TestMessageValue for metadata of each marble values to be scheduled into.
99122
*
@@ -107,9 +130,9 @@ const parseObservableMarble = <T>(
107130
marble: string,
108131
value?: { [key: string]: T } | null,
109132
error?: any,
110-
_materializeInnerObservables: boolean = false,
133+
materializeInnerObservables: boolean = false,
111134
frameTimeFactor = 1
112-
): Array<TestMessage<T>> => {
135+
): Array<TestMessage<T | Array<TestMessage<T>>>> => {
113136
if (marble.indexOf(SubscriptionMarbleToken.UNSUBSCRIBE) !== -1) {
114137
throw new Error(`Observable marble cannot have unsubscription marker ${SubscriptionMarbleToken.UNSUBSCRIBE}`);
115138
}
@@ -118,13 +141,16 @@ const parseObservableMarble = <T>(
118141
const frameOffset = subscriptionIndex < 0 ? 0 : subscriptionIndex;
119142

120143
const marbleTokenArray = Array.from(marble).filter(token => token !== ObservableMarbleToken.NOOP).slice(frameOffset);
121-
const values = marbleTokenArray.reduce(marbleTokenParseReducer(value, error, frameTimeFactor), {
122-
currentTimeframe: frameOffset,
123-
messages: [],
124-
simultaneousGrouped: false,
125-
expandingTokenCount: 0,
126-
expandingValue: []
127-
});
144+
const values = marbleTokenArray.reduce(
145+
marbleTokenParseReducer(value, error, materializeInnerObservables, frameTimeFactor),
146+
{
147+
currentTimeframe: frameOffset,
148+
messages: [],
149+
simultaneousGrouped: false,
150+
expandingTokenCount: 0,
151+
expandingValue: []
152+
}
153+
);
128154

129155
return values.messages;
130156
};

0 commit comments

Comments
 (0)