@@ -9,8 +9,11 @@ import dev.openfeature.sdk.events.observe
9
9
import kotlinx.coroutines.CoroutineDispatcher
10
10
import kotlinx.coroutines.CoroutineScope
11
11
import kotlinx.coroutines.Dispatchers
12
+ import kotlinx.coroutines.ExperimentalCoroutinesApi
12
13
import kotlinx.coroutines.cancel
13
14
import kotlinx.coroutines.flow.Flow
15
+ import kotlinx.coroutines.flow.distinctUntilChanged
16
+ import kotlinx.coroutines.flow.flatMapLatest
14
17
import kotlinx.coroutines.flow.onStart
15
18
import kotlinx.coroutines.flow.take
16
19
import kotlinx.coroutines.launch
@@ -42,6 +45,17 @@ internal fun FeatureProvider.observeProviderReady() = observe<OpenFeatureEvents.
42
45
}
43
46
}
44
47
48
+ /*
49
+ Observe events from currently configured Provider.
50
+ The most recent event prior to starting the observation is collected on start.
51
+ */
52
+ @OptIn(ExperimentalCoroutinesApi ::class )
53
+ fun OpenFeatureAPI.observeEvents (): Flow <OpenFeatureEvents > {
54
+ return sharedProvidersFlow.flatMapLatest { provider ->
55
+ provider.observe().distinctUntilChanged()
56
+ }
57
+ }
58
+
45
59
internal fun FeatureProvider.observeProviderError () = observe<OpenFeatureEvents .ProviderError >()
46
60
.onStart {
47
61
val status = getProviderStatus()
@@ -50,10 +64,6 @@ internal fun FeatureProvider.observeProviderError() = observe<OpenFeatureEvents.
50
64
}
51
65
}
52
66
53
- inline fun <reified T : OpenFeatureEvents > OpenFeatureAPI.observeEvents (): Flow <T >? {
54
- return getProvider()?.observe<T >()
55
- }
56
-
57
67
suspend fun FeatureProvider.awaitReadyOrError (
58
68
dispatcher : CoroutineDispatcher = Dispatchers .IO
59
69
) = suspendCancellableCoroutine { continuation ->
0 commit comments