@@ -11,6 +11,9 @@ import { SubscriptionMarbleToken } from '../marbles/SubscriptionMarbleToken';
11
11
import { TestMessage } from '../message/TestMessage' ;
12
12
import { TestMessageValue } from '../message/TestMessageValue' ;
13
13
14
+ /**
15
+ * @internal
16
+ */
14
17
class TestScheduler extends VirtualTimeScheduler {
15
18
private readonly coldObservables : Array < ColdObservable < any > > = [ ] ;
16
19
private readonly hotObservables : Array < HotObservable < any > > = [ ] ;
@@ -31,13 +34,13 @@ class TestScheduler extends VirtualTimeScheduler {
31
34
}
32
35
33
36
public getMessages < T = string > ( observable : Observable < T > , unsubscriptionMarbles : string | null = null ) {
34
- const { unsubscribedFrame } = parseSubscriptionMarble ( unsubscriptionMarbles ) ;
37
+ const { subscribedFrame, unsubscribedFrame } = this . calculateSubscriptionFrame ( observable , unsubscriptionMarbles ) ;
38
+
35
39
const observableMetadata : Array < TestMessage < T | Array < TestMessage < T > > > > = [ ] ;
36
40
const pushMetadata = ( notification : Notification < T | Array < TestMessage < T > > > ) =>
37
41
observableMetadata . push ( new TestMessageValue < T | Array < TestMessage < T > > > ( this . frame , notification ) ) ;
38
42
39
43
let subscription : Subscription | null = null ;
40
-
41
44
this . schedule ( ( ) => {
42
45
subscription = observable . subscribe (
43
46
( value : T ) =>
@@ -49,9 +52,9 @@ class TestScheduler extends VirtualTimeScheduler {
49
52
( err : any ) => pushMetadata ( Notification . createError ( err ) ) ,
50
53
( ) => pushMetadata ( Notification . createComplete ( ) )
51
54
) ;
52
- } , 0 ) ;
55
+ } , subscribedFrame ) ;
53
56
54
- if ( unsubscribedFrame !== Number . POSITIVE_INFINITY && ! ! subscription ) {
57
+ if ( unsubscribedFrame !== Number . POSITIVE_INFINITY ) {
55
58
this . schedule ( ( ) => subscription ! . unsubscribe ( ) , unsubscribedFrame ) ;
56
59
}
57
60
@@ -116,6 +119,29 @@ class TestScheduler extends VirtualTimeScheduler {
116
119
117
120
return innerObservableMetadata ;
118
121
}
122
+
123
+ private calculateSubscriptionFrame ( observable : Observable < any > , unsubscriptionMarbles : string | null = null ) {
124
+ const { subscribedFrame, unsubscribedFrame } = parseSubscriptionMarble ( unsubscriptionMarbles ) ;
125
+
126
+ if ( subscribedFrame === Number . POSITIVE_INFINITY ) {
127
+ return { subscribedFrame : 0 , unsubscribedFrame } ;
128
+ }
129
+
130
+ //looks internal of Observable implementation to determine source is hot or cold observable.
131
+ //if source is hot, subscription / unsubscription works as specified,
132
+ //if source is cold, subscription always triggers start of observable - adjust unsubscription frame as well
133
+ let source = observable ;
134
+ while ( ! ! source ) {
135
+ if ( source instanceof HotObservable ) {
136
+ return { subscribedFrame, unsubscribedFrame } ;
137
+ } else if ( source instanceof ColdObservable ) {
138
+ return { subscribedFrame : 0 , unsubscribedFrame : unsubscribedFrame - subscribedFrame } ;
139
+ }
140
+ source = ( source as any ) . source ;
141
+ }
142
+
143
+ throw new Error ( 'Cannot detect source observable type' ) ;
144
+ }
119
145
}
120
146
121
147
export { TestScheduler } ;
0 commit comments