Cancel concurrency HTTP requests after unsubscription
I have the following problem:
Many API calls go through a single API interface (Google API) and have to be limited in requests per second and in concurrency because of the Google API limits.
I use a subject (sink/call pool) that manages all API requests with mergeMap and returns each result to another, piped subject.
Because callers can unsubscribe before their API request finishes, those requests shouldn't block my sink. So I have to stop the API request (task) after unsubscription.
The issue:
I don't know how to capture this unsubscribed state correctly. What I currently do is override subscribe and unsubscribe to catch this state. It works, but it doesn't look very "RxJS"-ish to me.
How could I improve it?
import {Observable, Subject, Subscription, EMPTY} from 'rxjs';
import {mergeMap, tap} from 'rxjs/operators';

function doHeavyRequest() {
  return new Observable(subscriber => {
    // Simulate delay.
    setTimeout(() => {
      subscriber.next(1);
      subscriber.complete();
    }, 1000);
  });
}

const sink = new Subject<[Subject<any>, number]>();

sink.pipe(
  mergeMap(([subject, id]) => {
    // Stop request here if already unsubscribed.
    if (subject.closed) {
      console.log('Request cancelled:', id);
      return EMPTY;
    }
    return doHeavyRequest()
      .pipe(
        tap(res => {
          if (!subject.closed) {
            subject.next(res);
            subject.complete();
          } else {
            console.log('Request aborted:', id);
          }
        })
      );
  }, 2)
).subscribe();

// Insert request into sink.
// Overwrite subscribe and unsubscribe.
// Track unsubscription by closing the task subject.
function getSomething(id: number) {
  const task = new Subject();
  const ob = task.asObservable();
  ob.subscribe = (...args: any) => {
    const sub = Observable.prototype.subscribe.call(ob, ...args);
    sub.unsubscribe = () => {
      if (!task.isStopped) {
        task.unsubscribe();
      }
      Subscription.prototype.unsubscribe.call(sub);
    };
    return sub;
  };
  sink.next([task, id]);
  return ob;
}

// Make 3 requests and unsubscribe.
export function test() {
  const ob0 = getSomething(0);
  const ob1 = getSomething(1);
  const ob2 = getSomething(2);
  const sub0 = ob0.subscribe(e => {
    console.log('0:', e);
  });
  setTimeout(() => sub0.unsubscribe(), 1500);
  const sub1 = ob1.subscribe(e => {
    console.log('1:', e);
  });
  setTimeout(() => sub1.unsubscribe(), 900);
  const sub2 = ob2.subscribe(e => {
    console.log('2:', e);
  });
  setTimeout(() => sub2.unsubscribe(), 100);
}
See test.ts on Plunker and the console output:
https://next.plnkr.co/edit/KREjMprTrjHu2zMI?preview
angular typescript rxjs google-api-client rxjs6
edited Nov 9 at 7:26
asked Nov 8 at 23:25
Viatorus
Are you sure that unsubscribing from the API call is going to help you in this case? Even if you unsubscribe, your call will still be "in flight"; Google will not be notified that you are no longer interested in those results.
– Picci
Nov 9 at 6:52
It does help. Outstanding Google API calls that are still waiting inside the sink will not be processed if their observable has already been unsubscribed.
– Viatorus
Nov 9 at 7:03
OK, I understand
– Picci
Nov 9 at 7:32
Maybe this SO thread can give you some inspiration
– Picci
Nov 9 at 7:53
2 Answers
Accepted answer
I'm not sure I understood it properly, but it looks like you want to do some cleanup upon unsubscribing, correct?
You can add teardown logic to a single subscription like so:
const subscription = obs.subscribe(() => { /* handle values */ });
subscription.add(() => { /* Do cleanup here. This is executed upon unsubscribing. */ });
Perhaps the finalize pipeable operator might be useful as well. It runs a callback when the observable terminates, that is, on complete, on error, or on unsubscription. This varies a bit for hot observables, so be aware.
When creating an observable, you can also add teardown logic by returning a function from its subscribe function, much like piping a finalize:
const obs = new Observable(subscriber => { /* subscriber.next/error/complete somewhere */
  return () => { /* clean up resources upon unsubscribe, error, or complete */ };
});
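To make the teardown rule concrete, here is a small dependency-free sketch. It is a hand-rolled stand-in under stated assumptions, not RxJS itself: MiniObservable, MiniObserver and demoTeardown are hypothetical names invented for this illustration, and the class mimics only one rule, namely that the function returned from the producer runs exactly once, on completion or on unsubscription, whichever comes first.

```typescript
// Hand-rolled stand-in for illustration only; this is NOT the RxJS implementation.
type Teardown = () => void;

interface MiniObserver<T> {
  next(value: T): void;
  complete(): void;
}

class MiniObservable<T> {
  private readonly producer: (observer: MiniObserver<T>) => Teardown;

  constructor(producer: (observer: MiniObserver<T>) => Teardown) {
    this.producer = producer;
  }

  subscribe(onNext: (value: T) => void): { unsubscribe(): void } {
    const state: { closed: boolean; teardown?: Teardown } = { closed: false };

    // Runs the teardown at most once, for complete() and unsubscribe() alike.
    const close = (): void => {
      if (state.closed) {
        return;
      }
      state.closed = true;
      if (state.teardown) {
        state.teardown();
      }
    };

    state.teardown = this.producer({
      next: value => {
        if (!state.closed) {
          onNext(value);
        }
      },
      complete: close,
    });

    // The producer may have completed synchronously, before its teardown was known.
    if (state.closed) {
      state.teardown();
    }
    return { unsubscribe: close };
  }
}

function demoTeardown(): string[] {
  const log: string[] = [];

  // Case 1: the consumer unsubscribes before anything is emitted.
  const pending = new MiniObservable<number>(() => {
    log.push('request started');
    return () => { log.push('request cleaned up'); };
  });
  const subscription = pending.subscribe(value => { log.push(`value ${value}`); });
  subscription.unsubscribe();
  subscription.unsubscribe(); // A second call is a no-op.

  // Case 2: the producer emits once and completes on its own.
  const finished = new MiniObservable<number>(observer => {
    observer.next(1);
    observer.complete();
    return () => { log.push('completed cleaned up'); };
  });
  finished.subscribe(value => { log.push(`value ${value}`); });

  return log;
}
```

In both cases the cleanup function is the last thing that runs for that subscription, which is the hook the question needs: it is the place where a queued or running request can be marked as no longer wanted.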
edited Nov 9 at 1:29
answered Nov 9 at 1:22
Badashi
Yes, something like a cleanup. I want to prevent an API call that is stuck in the sink's pipeline (because of the limited concurrency) from running if it has already been unsubscribed.
– Viatorus
Nov 9 at 7:05
Your first example (subscription.add) does not work for me, because "I don't know" where the observable gets subscribed. The second example looks better to me, but a subject (task) doesn't have a finalize method?
– Viatorus
Nov 9 at 7:15
Since Subjects are observables, you can pipe the finalize operator on them:
const subWithFinalize = subject.pipe(finalize(cleanupFunction));
– Badashi
Nov 9 at 9:41
Thanks to @Badashi, using finalize worked and looks much better:
import {Observable, Subject, EMPTY} from 'rxjs';
import {mergeMap, tap, finalize} from 'rxjs/operators';

function doHeavyRequest() {
  return new Observable(subscriber => {
    // Simulate delay.
    setTimeout(() => {
      subscriber.next(1);
      subscriber.complete();
    }, 1000);
  });
}

const sink = new Subject<[Subject<any>, number]>();

sink.pipe(
  mergeMap(([subject, id]) => {
    // Stop request here if already unsubscribed.
    if (subject.closed) {
      console.log('Request cancelled:', id);
      return EMPTY;
    }
    return doHeavyRequest()
      .pipe(
        tap(res => {
          if (!subject.closed) {
            subject.next(res);
            subject.complete();
          } else {
            console.log('Request aborted:', id);
          }
        })
      );
  }, 2)
).subscribe();

// Insert request into sink.
// Use finalize to track unsubscription: it closes the task subject.
function getSomething(id: number) {
  const task = new Subject();
  const ob = task.pipe(finalize(() => {
    if (!task.isStopped) {
      task.unsubscribe();
    }
  }));
  sink.next([task, id]);
  return ob;
}

// Make 3 requests and unsubscribe.
export function test() {
  const ob0 = getSomething(0);
  const ob1 = getSomething(1);
  const ob2 = getSomething(2);
  const sub0 = ob0.subscribe(e => {
    console.log('0:', e);
  });
  setTimeout(() => sub0.unsubscribe(), 1500);
  const sub1 = ob1.subscribe(e => {
    console.log('1:', e);
  });
  setTimeout(() => sub1.unsubscribe(), 900);
  const sub2 = ob2.subscribe(e => {
    console.log('2:', e);
  });
  setTimeout(() => sub2.unsubscribe(), 100);
}
Output:
0: 1
Request cancelled: 2
Request aborted: 1
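The difference between the two log messages can also be sketched without RxJS. The following is a minimal, dependency-free illustration under stated assumptions: LimitedQueue, Job and Ticket are hypothetical names, and the class is a simplified stand-in for the mergeMap(..., 2) sink rather than a replacement for it. A job cancelled while it is still waiting never occupies a slot ("cancelled"), whereas a job cancelled after it has started still runs to the end, but its result is dropped ("aborted").

```typescript
// Hypothetical, dependency-free stand-in for the mergeMap(..., 2) sink; not RxJS code.
type Job<T> = (done: (result: T) => void) => void;

interface Ticket {
  cancel(): void;
}

class LimitedQueue {
  readonly log: string[] = [];
  private readonly limit: number;
  private readonly pending: Array<() => void> = [];
  private running = 0;

  constructor(limit: number) {
    this.limit = limit;
  }

  enqueue<T>(id: number, job: Job<T>, onResult: (result: T) => void): Ticket {
    const state = { cancelled: false };

    const start = (): void => {
      if (state.cancelled) {
        // Cancelled while waiting: skip the job so it never occupies a slot.
        this.log.push(`cancelled ${id}`);
        return;
      }
      this.running++;
      job(result => {
        this.running--;
        if (state.cancelled) {
          // Cancelled while running: the work is done, but the result is dropped.
          this.log.push(`aborted ${id}`);
        } else {
          onResult(result);
        }
        this.drain();
      });
    };

    this.pending.push(start);
    this.drain();
    return { cancel: () => { state.cancelled = true; } };
  }

  // Starts waiting jobs while free slots remain.
  private drain(): void {
    while (this.running < this.limit && this.pending.length > 0) {
      const next = this.pending.shift();
      if (next) {
        next();
      }
    }
  }
}
```

With a limit of 2 and three jobs, cancelling job 2 while it is queued and job 1 while it is running, then letting job 0 and job 1 finish in that order, delivers the result of job 0 and logs "cancelled 2" followed by "aborted 1". In the RxJS version, the cancel() call corresponds to the finalize callback that closes the task subject.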
answered Nov 9 at 9:59
Viatorus