Cancel concurrency HTTP requests after unsubscription











I have the following problem:



Many API calls go through a single API interface (Google API) and have to be limited in requests per second and in concurrency because of the Google API limits.



I use a subject (sink/call pool), which manages all API requests with mergeMap and returns a result to another, piped subject.
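
For reference, here is a minimal sketch of how the second argument of mergeMap limits concurrency. The names queue, jobs and started are hypothetical, and the manually completed subjects only stand in for real requests. The point it illustrates: the projection function for a queued item is not called until a slot becomes free.

```typescript
import { Subject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Hypothetical stand-ins for three requests; each is finished by hand.
const jobs = [new Subject<string>(), new Subject<string>(), new Subject<string>()];
const started: number[] = [];
const queue = new Subject<number>();

queue.pipe(
  // At most 2 inner observables are subscribed at the same time.
  mergeMap(i => {
    started.push(i);
    return jobs[i];
  }, 2)
).subscribe();

queue.next(0);
queue.next(1);
queue.next(2); // buffered: both slots are taken

const beforeSlotFrees = [...started]; // [0, 1]
jobs[0].complete();                   // frees a slot, job 2 starts now
const afterSlotFrees = [...started];  // [0, 1, 2]

console.log(beforeSlotFrees, afterSlotFrees);
```

Because the projection runs late, a check placed inside it can skip a queued request whose consumer has already gone away.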



Because callers can unsubscribe from an API request before it finishes, such requests shouldn't block my sink. So I have to stop the API request (task) after unsubscription.



The issue:
I don't know how to capture this unsubscribed state correctly. What I currently do is overwrite subscribe and unsubscribe to catch this state. It works, but it does not look very "rxjs"-ish to me.



How could I improve it?



import {Observable, Subject, Subscription, EMPTY} from 'rxjs';
import {mergeMap, tap} from 'rxjs/operators';

function doHeavyRequest() {
  return new Observable(subscriber => {
    // Simulate delay.
    setTimeout(() => {
      subscriber.next(1);
      subscriber.complete();
    }, 1000);
  });
}

const sink = new Subject<[Subject<any>, number]>();

sink.pipe(
  mergeMap(([subject, id]) => {
    // Stop request here if already unsubscribed.
    if (subject.closed) {
      console.log('Request cancelled:', id);
      return EMPTY;
    }
    return doHeavyRequest()
      .pipe(
        tap(res => {
          if (!subject.closed) {
            subject.next(res);
            subject.complete();
          } else {
            console.log('Request aborted:', id);
          }
        })
      );
  }, 2)
).subscribe();

// Insert request into sink.
// Overwrite subscribe and unsubscribe.
// Track unsubscription by closing the task subject.
function getSomething(id: number) {
  const task = new Subject();

  const ob = task.asObservable();

  ob.subscribe = (...args: any) => {
    const sub = Observable.prototype.subscribe.call(ob, ...args);
    sub.unsubscribe = () => {
      if (!task.isStopped)
        task.unsubscribe();
      Subscription.prototype.unsubscribe.call(sub);
    };
    return sub;
  };

  sink.next([task, id]);

  return ob;
}

// Make 3 requests and unsubscribe.
export function test() {
  const ob0 = getSomething(0);
  const ob1 = getSomething(1);
  const ob2 = getSomething(2);

  const sub0 = ob0.subscribe(e => {
    console.log('0:', e);
  });
  setTimeout(() => sub0.unsubscribe(), 1500);

  const sub1 = ob1.subscribe(e => {
    console.log('1:', e);
  });
  setTimeout(() => sub1.unsubscribe(), 900);

  const sub2 = ob2.subscribe(e => {
    console.log('2:', e);
  });
  setTimeout(() => sub2.unsubscribe(), 100);
}


See the test.ts at plunker and the console output:



https://next.plnkr.co/edit/KREjMprTrjHu2zMI?preview










  • Are you sure that unsubscribing from the API call is going to help you in this case? Even if you unsubscribe, your call will still be "in flight"; Google will not be notified that you are no longer interested in those results.
    – Picci
    Nov 9 at 6:52










  • It does help. No outstanding Google API call will be processed (inside the sink) if its observable was unsubscribed beforehand.
    – Viatorus
    Nov 9 at 7:03










  • OK, I understand
    – Picci
    Nov 9 at 7:32










  • Maybe this SO thread can give you some inspiration
    – Picci
    Nov 9 at 7:53















angular typescript rxjs google-api-client rxjs6






edited Nov 9 at 7:26

























asked Nov 8 at 23:25









Viatorus













2 Answers

















Accepted answer (score: 1)










I'm not sure I understood it properly, but it looks like you want to do some cleanup upon unsubscribing, correct?



You can add teardown logic to a single subscription like so:



const subscription = obs.subscribe(() => { /* handle value */ });
subscription.add(() => { /* do cleanup here. This is executed upon unsubscribing. */ });
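
As a small runnable sketch of the same idea (the names source and log are made up for the illustration), the teardown registered with add runs once, when this particular subscription is unsubscribed:

```typescript
import { Subject } from 'rxjs';

const log: string[] = [];
const source = new Subject<number>();

const subscription = source.subscribe(value => {
  log.push(`value: ${value}`);
});

// Teardown logic attached to this one subscription only.
subscription.add(() => {
  log.push('cleanup');
});

source.next(1);
subscription.unsubscribe(); // runs the registered teardown
source.next(2);             // ignored: nobody is subscribed anymore

console.log(log); // [ 'value: 1', 'cleanup' ]
```

This only helps when you control the place where subscribe is called, since the teardown lives on the subscription and not on the observable.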


Perhaps the finalize pipeable operator might be useful as well. It runs a callback when the observable terminates, whether through complete, error, or unsubscription. The behavior varies a bit for hot observables, so be aware.
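
A short sketch of finalize, using two hypothetical subjects named first and second, shows the callback firing both when the consumer unsubscribes early and when the source completes on its own:

```typescript
import { Subject } from 'rxjs';
import { finalize } from 'rxjs/operators';

const events: string[] = [];

// Case 1: the consumer unsubscribes before the source finishes.
const first = new Subject<number>();
const firstSub = first
  .pipe(finalize(() => { events.push('first finalized'); }))
  .subscribe(v => { events.push(`first: ${v}`); });
firstSub.unsubscribe();

// Case 2: the source completes by itself.
const second = new Subject<number>();
second
  .pipe(finalize(() => { events.push('second finalized'); }))
  .subscribe(v => { events.push(`second: ${v}`); });
second.next(1);
second.complete();

console.log(events); // [ 'first finalized', 'second: 1', 'second finalized' ]
```

Note that the callback belongs to the piped observable, so it runs once per subscription to that observable, wherever the subscribe call happens.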



When creating an observable, you can also add teardown logic by returning a function from the subscribe function you pass to the constructor, much like piping a finalize:



const obs = new Observable(subscriber => {
  /* subscriber.next/error/complete somewhere */
  return () => { /* cleanup resources upon unsubscribe OR complete */ };
});
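
Applied to a simulated delayed request like the one in the question, a returned teardown could clear the pending timer. This is only a sketch: delayedRequest, timerCleared and received are names invented here, and a real HTTP call would need its own way of aborting.

```typescript
import { Observable } from 'rxjs';

// Flag used only to make the teardown visible in this sketch.
let timerCleared = false;

function delayedRequest(): Observable<number> {
  return new Observable<number>(subscriber => {
    // Simulate a slow request.
    const timer = setTimeout(() => {
      subscriber.next(1);
      subscriber.complete();
    }, 1000);

    // Teardown: runs on unsubscribe as well as on complete or error.
    return () => {
      clearTimeout(timer);
      timerCleared = true;
    };
  });
}

const received: number[] = [];
const sub = delayedRequest().subscribe(v => {
  received.push(v);
});

// Unsubscribing right away cancels the timer, so nothing is ever emitted.
sub.unsubscribe();

console.log(timerCleared, received.length);
```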





  • Yes, something like a cleanup. If an API call is stuck in the sink's pipeline (because of the limited concurrency) and has already been unsubscribed, I want to prevent it from running.
    – Viatorus
    Nov 9 at 7:05










  • Your first example (subscription.add) does not work for me, because I don't know where the observable gets subscribed. The second example looks better to me, but a subject (task) doesn't have a finalize method?
    – Viatorus
    Nov 9 at 7:15








  • Since Subjects are observables, you can pipe the finalize operator on them: const subWithFinalize = subject.pipe(finalize(cleanupFunction)).
    – Badashi
    Nov 9 at 9:41


















Answer (score: 0)













Thanks to @Badashi, using finalize worked and looks much better:



import {Observable, Subject, EMPTY} from 'rxjs';
import {mergeMap, tap, finalize} from 'rxjs/operators';

function doHeavyRequest() {
  return new Observable(subscriber => {
    // Simulate delay.
    setTimeout(() => {
      subscriber.next(1);
      subscriber.complete();
    }, 1000);
  });
}

const sink = new Subject<[Subject<any>, number]>();

sink.pipe(
  mergeMap(([subject, id]) => {
    // Stop request here if already unsubscribed.
    if (subject.closed) {
      console.log('Request cancelled:', id);
      return EMPTY;
    }
    return doHeavyRequest()
      .pipe(
        tap(res => {
          if (!subject.closed) {
            subject.next(res);
            subject.complete();
          } else {
            console.log('Request aborted:', id);
          }
        })
      );
  }, 2)
).subscribe();

// Insert request into sink.
// Use finalize to detect unsubscription
// and close the task subject in that case.
function getSomething(id: number) {
  const task = new Subject();
  const ob = task.pipe(finalize(() => {
    if (!task.isStopped) {
      task.unsubscribe();
    }
  }));

  sink.next([task, id]);

  return ob;
}

// Make 3 requests and unsubscribe.
export function test() {
  const ob0 = getSomething(0);
  const ob1 = getSomething(1);
  const ob2 = getSomething(2);

  const sub0 = ob0.subscribe(e => {
    console.log('0:', e);
  });
  setTimeout(() => sub0.unsubscribe(), 1500);

  const sub1 = ob1.subscribe(e => {
    console.log('1:', e);
  });
  setTimeout(() => sub1.unsubscribe(), 900);

  const sub2 = ob2.subscribe(e => {
    console.log('2:', e);
  });
  setTimeout(() => sub2.unsubscribe(), 100);
}


Output:



0: 1
Request cancelled: 2
Request aborted: 1
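
The order of these three lines can be reproduced without timers. The following is a sketch under assumptions, not the code that produced the output above: the requests array of manually driven subjects is a hypothetical replacement for doHeavyRequest, and the log array replaces console.log so the sequence can be inspected.

```typescript
import { EMPTY, Observable, Subject } from 'rxjs';
import { finalize, mergeMap, tap } from 'rxjs/operators';

const log: string[] = [];
// Hypothetical stand-ins for the HTTP calls, driven by hand below.
const requests = [new Subject<number>(), new Subject<number>(), new Subject<number>()];
const sink = new Subject<[Subject<number>, number]>();

sink.pipe(
  mergeMap(([task, id]) => {
    // Runs only once a slot is free, so a queued task may already be closed.
    if (task.closed) {
      log.push(`cancelled ${id}`);
      return EMPTY;
    }
    return requests[id].pipe(
      tap(res => {
        if (task.closed) {
          log.push(`aborted ${id}`);
          return;
        }
        task.next(res);
        task.complete();
      })
    );
  }, 2)
).subscribe();

function getSomething(id: number): Observable<number> {
  const task = new Subject<number>();
  const ob = task.pipe(finalize(() => {
    // Close the task only if it did not finish normally.
    if (!task.isStopped) {
      task.unsubscribe();
    }
  }));
  sink.next([task, id]);
  return ob;
}

getSomething(0).subscribe(v => { log.push(`0: ${v}`); });
const sub1 = getSomething(1).subscribe(v => { log.push(`1: ${v}`); });
const sub2 = getSomething(2).subscribe(v => { log.push(`2: ${v}`); });

sub2.unsubscribe();     // request 2 is still queued
sub1.unsubscribe();     // request 1 is already running
requests[0].next(1);    // delivered to the first consumer
requests[0].complete(); // frees a slot, request 2 is skipped
requests[1].next(1);    // result arrives, but nobody wants it
requests[1].complete();

console.log(log); // [ '0: 1', 'cancelled 2', 'aborted 1' ]
```

Request 2 is dropped before it starts because its task was closed while it waited in the queue, whereas request 1 had already started and can only have its result discarded.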





Accepted answer: answered Nov 9 at 1:22 by Badashi (edited Nov 9 at 1:29)












    • Yes, something like a cleanup. I want to prevent an API call which stucks in the pipeline of the sink (because of limited concurrency) if it is already unsubscribed.
      – Viatorus
      Nov 9 at 7:05










    • Your first example (subscription.add) does not work for me, because "I don't know" where the observable get subscribed. The second example looks better for me but a subject (task) doesn't have a finalize method?
      – Viatorus
      Nov 9 at 7:15








    • 1




      Since Subjects are observables, you can pipe the finalize operator on them: const subWithFinalize = subject.pipe(finalize(cleanupFunction)).
      – Badashi
      Nov 9 at 9:41


















    • Yes, something like a cleanup. I want to prevent an API call which stucks in the pipeline of the sink (because of limited concurrency) if it is already unsubscribed.
      – Viatorus
      Nov 9 at 7:05










    • Your first example (subscription.add) does not work for me, because "I don't know" where the observable get subscribed. The second example looks better for me but a subject (task) doesn't have a finalize method?
      – Viatorus
      Nov 9 at 7:15








    • 1




      Since Subjects are observables, you can pipe the finalize operator on them: const subWithFinalize = subject.pipe(finalize(cleanupFunction)).
      – Badashi
      Nov 9 at 9:41
















    Yes, something like a cleanup. I want to prevent an API call which stucks in the pipeline of the sink (because of limited concurrency) if it is already unsubscribed.
    – Viatorus
    Nov 9 at 7:05




    Yes, something like a cleanup. I want to prevent an API call which stucks in the pipeline of the sink (because of limited concurrency) if it is already unsubscribed.
    – Viatorus
    Nov 9 at 7:05












    Your first example (subscription.add) does not work for me, because "I don't know" where the observable get subscribed. The second example looks better for me but a subject (task) doesn't have a finalize method?
    – Viatorus
    Nov 9 at 7:15






    Your first example (subscription.add) does not work for me, because "I don't know" where the observable get subscribed. The second example looks better for me but a subject (task) doesn't have a finalize method?
    – Viatorus
    Nov 9 at 7:15






    1




    1




    Since Subjects are observables, you can pipe the finalize operator on them: const subWithFinalize = subject.pipe(finalize(cleanupFunction)).
    – Badashi
    Nov 9 at 9:41




    Since Subjects are observables, you can pipe the finalize operator on them: const subWithFinalize = subject.pipe(finalize(cleanupFunction)).
    – Badashi
    Nov 9 at 9:41












    up vote
    0
    down vote













    Thanks to @Badashi, using finalize worked and looks much better:



    import {Observable, Subject, Subscription, Subscribable, EMPTY} from 'rxjs';
    import {mergeMap, tap, finalize} from 'rxjs/operators';

    function doHeavyRequest() {
    return new Observable(subscribe => {
    // Simulate delay.
    setTimeout(() => {
    subscribe.next(1);
    subscribe.complete();
    }, 1000);
    });
    }

    const sink = new Subject<[Subject<any>, number]>();

    sink.pipe(
    mergeMap(([subject, id]) => {
    // Stop request here if already unsubscribed.
    if (subject.closed) {
    console.log('Request cancelled:', id);
    return EMPTY;
    }
    return doHeavyRequest()
    .pipe(
    tap(res => {
    if (!subject.closed) {
    subject.next(res);
    subject.complete();
    } else {
    console.log('Request aborted:', id);
    }
    })
    );
    }, 2)
    ).subscribe();

    // Insert request into sink.
    // Overwrite subscribe and unsubscribe.
    // Track unsubscribe.
    function getSomething(id: number) {
    const task = new Subject();
    const ob = task.pipe(finalize(() => {
    if (!task.isStopped) {
    task.unsubscribe();
    }
    }));

    sink.next([task, id]);

    return ob;
    }

    // Make 3 requests and unsubscribe.
    export function test() {
    const ob0 = getSomething(0);
    const ob1 = getSomething(1);
    const ob2 = getSomething(2);

    const sub0 = ob0.subscribe(e => {
    console.log('0:', e);
    });
    setTimeout(() => sub0.unsubscribe(), 1500);

    const sub1 = ob1.subscribe(e => {
    console.log('1:', e);
    });
    setTimeout(() => sub1.unsubscribe(), 900);

    const sub2 = ob2.subscribe(e => {
    console.log('2:', e);
    });
    setTimeout(() => sub2.unsubscribe(), 100);
    }


    Output:



    0: 1
    Request cancelled: 2
    Request aborted: 1





    share|improve this answer

























      up vote
      0
      down vote













      Thanks to @Badashi, using finalize worked and looks much better:



      import {Observable, Subject, Subscription, Subscribable, EMPTY} from 'rxjs';
      import {mergeMap, tap, finalize} from 'rxjs/operators';

      function doHeavyRequest() {
      return new Observable(subscribe => {
      // Simulate delay.
      setTimeout(() => {
      subscribe.next(1);
      subscribe.complete();
      }, 1000);
      });
      }

      const sink = new Subject<[Subject<any>, number]>();

      sink.pipe(
      mergeMap(([subject, id]) => {
      // Stop request here if already unsubscribed.
      if (subject.closed) {
      console.log('Request cancelled:', id);
      return EMPTY;
      }
      return doHeavyRequest()
      .pipe(
      tap(res => {
      if (!subject.closed) {
      subject.next(res);
      subject.complete();
      } else {
      console.log('Request aborted:', id);
      }
      })
      );
      }, 2)
      ).subscribe();

      // Insert request into sink.
      // Overwrite subscribe and unsubscribe.
      // Track unsubscribe.
      function getSomething(id: number) {
      const task = new Subject();
      const ob = task.pipe(finalize(() => {
      if (!task.isStopped) {
      task.unsubscribe();
      }
      }));

      sink.next([task, id]);

      return ob;
      }

      // Make 3 requests and unsubscribe.
      export function test() {
      const ob0 = getSomething(0);
      const ob1 = getSomething(1);
      const ob2 = getSomething(2);

      const sub0 = ob0.subscribe(e => {
      console.log('0:', e);
      });
      setTimeout(() => sub0.unsubscribe(), 1500);

      const sub1 = ob1.subscribe(e => {
      console.log('1:', e);
      });
      setTimeout(() => sub1.unsubscribe(), 900);

      const sub2 = ob2.subscribe(e => {
      console.log('2:', e);
      });
      setTimeout(() => sub2.unsubscribe(), 100);
      }


      Output:



      0: 1
      Request cancelled: 2
      Request aborted: 1















        answered Nov 9 at 9:59









        Viatorus
