1
+ import { Notification } from 'rxjs/Notification' ;
1
2
import { Observable } from 'rxjs/Observable' ;
2
- import { VirtualAction } from 'rxjs/scheduler/VirtualTimeScheduler' ;
3
3
import { VirtualTimeScheduler } from 'rxjs/scheduler/VirtualTimeScheduler' ;
4
+ import { VirtualAction } from 'rxjs/scheduler/VirtualTimeScheduler' ;
5
+ import { Subscription } from 'rxjs/Subscription' ;
4
6
import { ColdObservable } from 'rxjs/testing/ColdObservable' ;
5
7
import { HotObservable } from 'rxjs/testing/HotObservable' ;
6
8
import { parseObservableMarble } from '../marbles/parseObservableMarble' ;
9
+ import { parseSubscriptionMarble } from '../marbles/parseSubscriptionMarble' ;
7
10
import { SubscriptionMarbleToken } from '../marbles/SubscriptionMarbleToken' ;
8
11
import { TestMessage } from '../message/TestMessage' ;
12
+ import { TestMessageValue } from '../message/TestMessageValue' ;
9
13
10
14
class TestScheduler extends VirtualTimeScheduler {
11
15
private readonly coldObservables : Array < ColdObservable < any > > = [ ] ;
@@ -15,10 +19,50 @@ class TestScheduler extends VirtualTimeScheduler {
15
19
super ( VirtualAction , Number . POSITIVE_INFINITY ) ;
16
20
}
17
21
18
- public getMarbles ( ) : void {
22
+ private materializeInnerObservable < T > ( observable : Observable < any > , outerFrame : number ) : Array < TestMessage < T > > {
23
+ const innerObservableMetadata : Array < TestMessage < T > > = [ ] ;
24
+ const pushMetaData = ( notification : Notification < T > ) =>
25
+ innerObservableMetadata . push ( new TestMessageValue < T > ( this . frame - outerFrame , notification ) ) ;
26
+
27
+ observable . subscribe (
28
+ value => pushMetaData ( Notification . createNext ( value ) ) ,
29
+ err => pushMetaData ( Notification . createError ( err ) ) ,
30
+ ( ) => pushMetaData ( Notification . createComplete ( ) )
31
+ ) ;
32
+
33
+ return innerObservableMetadata ;
34
+ }
35
+
36
+ public getMarbles < T = string > ( observable : Observable < T > , unsubscriptionMarbles : string | null = null ) {
19
37
if ( this . autoFlush ) {
20
38
throw new Error ( 'not implemented' ) ;
21
39
}
40
+
41
+ const { unsubscribedFrame } = parseSubscriptionMarble ( unsubscriptionMarbles ) ;
42
+ const observableMetadata : Array < TestMessage < T | Array < TestMessage < T > > > > = [ ] ;
43
+ const pushMetadata = ( notification : Notification < T | Array < TestMessage < T > > > ) =>
44
+ observableMetadata . push ( new TestMessageValue < T | Array < TestMessage < T > > > ( this . frame , notification ) ) ;
45
+
46
+ let subscription : Subscription | null = null ;
47
+
48
+ this . schedule ( ( ) => {
49
+ subscription = observable . subscribe (
50
+ ( value : T ) =>
51
+ pushMetadata (
52
+ Notification . createNext (
53
+ value instanceof Observable ? this . materializeInnerObservable < T > ( value , this . frame ) : value
54
+ )
55
+ ) ,
56
+ ( err : any ) => pushMetadata ( Notification . createError ( err ) ) ,
57
+ ( ) => pushMetadata ( Notification . createComplete ( ) )
58
+ ) ;
59
+ } , 0 ) ;
60
+
61
+ if ( unsubscribedFrame !== Number . POSITIVE_INFINITY && ! ! subscription ) {
62
+ this . schedule ( ( ) => subscription ! . unsubscribe ( ) , unsubscribedFrame ) ;
63
+ }
64
+
65
+ return observableMetadata ;
22
66
}
23
67
24
68
public advanceBy ( _frameTime : number ) : void {
0 commit comments