javascript - "Unsubscribe" function callback/hook in Observable "executor" function - Stack Overflow

I am confused about what the purpose of the "dispose" or "unsubscribe" function is

I am confused about the purpose of the "dispose" or "unsubscribe" function that is (optionally) returned from an observable "executor" function, like so:

const Rx = require('rxjs');

// Observable whose executor emits one value and hands back a teardown function.
// The executor parameter is named `observer` so it no longer shadows the outer `obs`.
const obs = Rx.Observable.create(observer => {

    // we are in the Observable "executor" function
    observer.next(4);

    // we return this function, which gets called if we unsubscribe
    return function () {
        console.log('disposed');
    };

});

// First subscriber: logs values, errors and completion.
const s1 = obs.subscribe(
    function (v) {
        console.log(v);
    },
    function (e) {
        console.log(e);
    },
    function () {
        // The executor never calls complete(), so this handler is not reached here.
        console.log('complete');
    }
);

// Second subscriber: subscribed to the same observable with its own handlers.
const s2 = obs.subscribe(
    function (v) {
        console.log(v);
    },
    function (e) {
        console.log(e);
    },
    function () {
        console.log('complete');
    }
);


// Unsubscribing each subscription invokes the teardown function returned above.
s1.unsubscribe();
s2.unsubscribe();

What confuses me is that such a function would actually be more likely to hold on to references in your code and therefore prevent garbage collection.

Can anyone tell me what the purpose is of returning a function in that scenario, what the function is called, and what its signature is? I am having trouble finding information about it.

I also see much more complex examples of returning a subscription from the executor function, for example this:

    // Counter compared against each subscriber's position below.
    // NOTE(review): nothing in this fragment changes `index`; presumably it is
    // advanced elsewhere — confirm against the enclosing code.
    let index = 0;

    // Subject that feeds every subscriber of queueStream; also stored on `this`.
    let obsEnqueue = this.obsEnqueue = new Rx.Subject();

    // Each subscription to queueStream registers its own `push` subscriber on obsEnqueue.
    this.queueStream = Rx.Observable.create(obs => {

        // Forward a value only when `index` modulo the number of registered
        // observers equals this subscriber's position in obsEnqueue.observers.
        // NOTE(review): looks like round-robin dispatch — confirm intent.
        const push = Rx.Subscriber.create(v => {
            if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
                obs.next(v);
            }
        });

        // The subscription returned by subscribe() is returned from the executor.
        return obsEnqueue.subscribe(push);
    });

This seems to return a subscription instead of just a plain function. Can anyone explain what's going on with this?

To make it a clear question, what is the difference between doing this:

// Subject used as the upstream source that the observable below relays.
const sub = new Rx.Subject();

// Executor that returns the subscription it creates.
const obs = Rx.Observable.create($obs => {

    $obs.next(4);
    // The subscription from sub.subscribe() is returned from the executor, so
    // Observable.create can use it as the teardown when the subscriber unsubscribes.
    return sub.subscribe($obs);

});

and not returning the result of the subscribe call:

// Subject used as the upstream source that the observable below relays.
const sub = new Rx.Subject();

// Executor that does not return the result of sub.subscribe().
const obs = Rx.Observable.create($obs => {

    $obs.next(4);
    // The subscription is discarded here: the executor returns undefined, so no
    // explicit teardown is handed back to Observable.create.
    // NOTE(review): whether $obs is still detached from `sub` on unsubscribe
    // depends on RxJS internals — confirm against the RxJS version in use.
    sub.subscribe($obs);

});

I am confused about the purpose of the "dispose" or "unsubscribe" function that is (optionally) returned from an observable "executor" function, like so:

const Rx = require('rxjs');

// Observable whose executor emits one value and hands back a teardown function.
// The executor parameter is named `observer` so it no longer shadows the outer `obs`.
const obs = Rx.Observable.create(observer => {

    // we are in the Observable "executor" function
    observer.next(4);

    // we return this function, which gets called if we unsubscribe
    return function () {
        console.log('disposed');
    };

});

// First subscriber: logs values, errors and completion.
const s1 = obs.subscribe(
    function (v) {
        console.log(v);
    },
    function (e) {
        console.log(e);
    },
    function () {
        // The executor never calls complete(), so this handler is not reached here.
        console.log('complete');
    }
);

// Second subscriber: subscribed to the same observable with its own handlers.
const s2 = obs.subscribe(
    function (v) {
        console.log(v);
    },
    function (e) {
        console.log(e);
    },
    function () {
        console.log('complete');
    }
);


// Unsubscribing each subscription invokes the teardown function returned above.
s1.unsubscribe();
s2.unsubscribe();

What confuses me is that such a function would actually be more likely to hold on to references in your code and therefore prevent garbage collection.

Can anyone tell me what the purpose is of returning a function in that scenario, what the function is called, and what its signature is? I am having trouble finding information about it.

I also see much more complex examples of returning a subscription from the executor function, for example this:

    // Counter compared against each subscriber's position below.
    // NOTE(review): nothing in this fragment changes `index`; presumably it is
    // advanced elsewhere — confirm against the enclosing code.
    let index = 0;

    // Subject that feeds every subscriber of queueStream; also stored on `this`.
    let obsEnqueue = this.obsEnqueue = new Rx.Subject();

    // Each subscription to queueStream registers its own `push` subscriber on obsEnqueue.
    this.queueStream = Rx.Observable.create(obs => {

        // Forward a value only when `index` modulo the number of registered
        // observers equals this subscriber's position in obsEnqueue.observers.
        // NOTE(review): looks like round-robin dispatch — confirm intent.
        const push = Rx.Subscriber.create(v => {
            if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
                obs.next(v);
            }
        });

        // The subscription returned by subscribe() is returned from the executor.
        return obsEnqueue.subscribe(push);
    });

This seems to return a subscription instead of just a plain function. Can anyone explain what's going on with this?

To make it a clear question, what is the difference between doing this:

// Subject used as the upstream source that the observable below relays.
const sub = new Rx.Subject();

// Executor that returns the subscription it creates.
const obs = Rx.Observable.create($obs => {

    $obs.next(4);
    // The subscription from sub.subscribe() is returned from the executor, so
    // Observable.create can use it as the teardown when the subscriber unsubscribes.
    return sub.subscribe($obs);

});

and not returning the result of the subscribe call:

// Subject used as the upstream source that the observable below relays.
const sub = new Rx.Subject();

// Executor that does not return the result of sub.subscribe().
const obs = Rx.Observable.create($obs => {

    $obs.next(4);
    // The subscription is discarded here: the executor returns undefined, so no
    // explicit teardown is handed back to Observable.create.
    // NOTE(review): whether $obs is still detached from `sub` on unsubscribe
    // depends on RxJS internals — confirm against the RxJS version in use.
    sub.subscribe($obs);

});
Share Improve this question edited Jan 6, 2017 at 6:37 Mark van Straten 9,4253 gold badges40 silver badges57 bronze badges asked Jan 6, 2017 at 6:03 user5047085user5047085 1
  • I am not sure if it's actually called an "executor" function, but that's the name of the analogue in the Promises world – user5047085 Commented Jan 6, 2017 at 6:10
Add a comment  | 

1 Answer 1

Reset to default 6

The unsubscribe function that Rx.Observable.create needs to return is invoked when downstream does not listen to the stream anymore, effectively giving you time to clean up resources.

With regard to your question: .subscribe() returns the subscription on which you can call .unsubscribe(). So if you want to do something with another subscription, you can pipe that subscription through to your downstream:

    // Wraps a 300 ms interval that logs each tick. The inner subscription is
    // returned from the executor so it serves as the teardown.
    const obs = Rx.Observable.create(function ($obs) {
      const logEmission = function (i) {
        console.log(`emission: ${i}`);
      };

      return Rx.Observable.interval(300).do(logEmission).subscribe($obs);
    });

    // Take four values downstream and log each one.
    obs.take(4).subscribe(function (i) {
      console.log(`outer-emission:${i}`);
    });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.js"></script>

Without the unsubscribe function you would stop listening to the observable but the interval created internally would keep on running:

// Counter-example: the executor subscribes to an interval internally but
// returns an empty teardown, so nothing stops the inner subscription.
const obs = Rx.Observable.create($obs => {
  // `timer` holds the inner subscription; it is deliberately never unsubscribed.
  const timer = Rx.Observable.interval(300)
    .do(i => console.log('emission: ' + i))
    .take(10)
    .subscribe(
      val => $obs.next(val),
      err => $obs.error(err),
      // Observer's completion method is `complete`; a misspelled name would
      // throw a TypeError when the inner stream finishes.
      () => $obs.complete()
    );

  return function(){}; // empty unsubscribe function, internal subscription will keep on running
});
// Downstream stops after four values, yet 'emission: …' keeps being logged up to the tenth tick.
obs.take(4).subscribe(i => console.log('outer-emission:'+i));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.js"></script>

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744121923a4559440.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信