Skip to content

Commit f26fdea

Browse files
OlegDokukaviglucci
authored andcommitted
fixes request channel to drop second element (#244)
unexpectedly, in the rxjs `partition` operator the index is a local variable per subscription, thus the second subscription starts indexing from 0 once again even though the element is a second one (index should be 1 in that case). Therefore, to overcome this problem the PR introduces a local variable to track the first element. Signed-off-by: Kevin Viglucci <[email protected]>
1 parent 20990e6 commit f26fdea

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

packages/rsocket-adapter-rxjs/src/Requesters.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,22 @@ export function requestChannel<TData, RData>(
136136
rsocket: RSocket,
137137
metadata?: Map<string | number | WellKnownMimeType, Buffer>
138138
) => Observable<RData> {
139+
let once = false;
139140
const [firstValueObservable, restValuesObservable] = partition(
140141
datas.pipe(
141142
share({
142143
connector: () => new Subject(),
143144
resetOnRefCountZero: true,
144145
})
145146
),
146-
(_value, index) => index === 0
147+
(_value) => {
148+
const previous = once;
149+
if (!previous) {
150+
once = true;
151+
}
152+
153+
return !previous;
154+
}
147155
);
148156

149157
return (

0 commit comments

Comments
 (0)