Skip to content

Commit dc33071

Browse files
committed
feat(parsesubscriptionmarble): implement parseSubscriptionMarble
1 parent da6ff75 commit dc33071

File tree

1 file changed

+101
-0
lines changed

1 file changed

+101
-0
lines changed
+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import { SubscriptionLog } from 'rxjs/testing/SubscriptionLog';
2+
import { ObservableMarbleToken } from './ObservableMarbleToken';
3+
import { SubscriptionMarbleToken } from './SubscriptionMarbleToken';
4+
5+
interface SubscriptionTokenParseAccumulator {
6+
/**
7+
* Current virtual time passed
8+
*/
9+
currentTimeFrame: number;
10+
subscriptionFrame: number;
11+
unsubscriptionFrame: number;
12+
/**
13+
* Flag indicate values are grouped `()` and emitted simultaneously
14+
*/
15+
simultaneousGrouped: boolean;
16+
/**
17+
* Flag indicate timeframe expansion `...` is in progress
18+
*/
19+
expandingTokenCount: number;
20+
/**
21+
* Tokens for expanding timeframe, will be joined & parsed into number
22+
*/
23+
expandingValue: Array<string>;
24+
}
25+
26+
const subscriptionTokenParseReducer = (frameTimeFactor: number = 1) => (
27+
acc: SubscriptionTokenParseAccumulator,
28+
token: string
29+
) => {
30+
switch (token) {
31+
case SubscriptionMarbleToken.SUBSCRIBE:
32+
acc.subscriptionFrame = acc.currentTimeFrame;
33+
break;
34+
case SubscriptionMarbleToken.UNSUBSCRIBE:
35+
acc.currentTimeFrame += 1 * frameTimeFactor;
36+
acc.unsubscriptionFrame = acc.currentTimeFrame;
37+
break;
38+
case ObservableMarbleToken.TIMEFRAME_EXPAND:
39+
acc.expandingTokenCount += 1;
40+
41+
//When token reaches ...xxx..., clean up state
42+
if (acc.expandingTokenCount === 6) {
43+
acc.expandingValue.splice(0);
44+
acc.expandingTokenCount = 0;
45+
}
46+
47+
//When first ending token arrives ...xxx. , parse values and adjust timeframe
48+
if (acc.expandingTokenCount === 4) {
49+
if (acc.expandingValue.length === 0) {
50+
throw new Error(`There isn't value to expand timeframe`);
51+
}
52+
const expandedFrame = parseInt(acc.expandingValue.join(''), 10);
53+
acc.currentTimeFrame += expandedFrame * frameTimeFactor;
54+
}
55+
break;
56+
case ObservableMarbleToken.SIMULTANEOUS_START:
57+
if (acc.simultaneousGrouped) {
58+
throw new Error('Cannot nest grouped value');
59+
}
60+
acc.simultaneousGrouped = true;
61+
break;
62+
case ObservableMarbleToken.SIMULTANEOUS_END:
63+
acc.simultaneousGrouped = false;
64+
case ObservableMarbleToken.TIMEFRAME:
65+
if (acc.expandingTokenCount > 0 || acc.simultaneousGrouped) {
66+
throw new Error('Incorret timeframe specified');
67+
}
68+
case ObservableMarbleToken.ERROR:
69+
case ObservableMarbleToken.COMPLETE:
70+
default:
71+
if (acc.expandingTokenCount > 0) {
72+
acc.expandingValue.push(token);
73+
} else if (!acc.simultaneousGrouped) {
74+
acc.currentTimeFrame += 1 * frameTimeFactor;
75+
}
76+
break;
77+
}
78+
return acc;
79+
};
80+
81+
const parseSubscriptionMarble = (marble: string | null, frameTimeFactor: number = 1) => {
82+
if (!marble) {
83+
return new SubscriptionLog(Number.POSITIVE_INFINITY);
84+
}
85+
86+
const marbleTokenArray = Array.from(marble).filter(token => token !== ObservableMarbleToken.NOOP);
87+
const value = marbleTokenArray.reduce(subscriptionTokenParseReducer(frameTimeFactor), {
88+
currentTimeFrame: 0,
89+
subscriptionFrame: Number.POSITIVE_INFINITY,
90+
unsubscriptionFrame: Number.POSITIVE_INFINITY,
91+
simultaneousGrouped: false,
92+
expandingTokenCount: 0,
93+
expandingValue: []
94+
});
95+
96+
return value.unsubscriptionFrame === Number.POSITIVE_INFINITY
97+
? new SubscriptionLog(value.subscriptionFrame)
98+
: new SubscriptionLog(value.subscriptionFrame, value.unsubscriptionFrame);
99+
};
100+
101+
export { parseSubscriptionMarble };

0 commit comments

Comments
 (0)