请选择 进入手机版 | 继续访问电脑版

Reactor3 源码解析六: FluxPublishOn源码剖析

[复制链接]
小小海 发表于 2021-1-2 19:41:42 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
  1.                         Flux.create(emitter -> {                                for (int i = 0; i  {                                System.out.println(Thread.currentThread().getName() + ":Source pushed " + s);                        }).publishOn(Schedulers.parallel())                        .subscribe(custer -> {                                System.out.println(Thread.currentThread().getName() + ":consume " + custer);                        });
复制代码
执行效果:
main:Source created 0
main:Source pushed 0
main:Source created 1
parallel-1:consume 0
main:Source pushed 1
main:Source created 2
parallel-1:consume 1
main:Source pushed 2
main:Source created 3
main:Source pushed 3
main:Source created 4
main:Source pushed 4
main:Source created 5
main:Source pushed 5
parallel-1:consume 2
parallel-1:consume 3
parallel-1:consume 4
parallel-1:consume 5
从执行效果可以看出:  publishOn之前的使用仍然是同步的,publishOn之后的使用酿成了异步,且线程使用了parallel-1.
下面重点研究下PublishOnSubscriber的源码,根据本节及之前的源码,我们发现FluxPublishOn,FluxPeek,FluxFilter等中间层生产者类都会在内部提供一个内部类,如PublishOnSubscriber,这个类一般饰演双重脚色,即兼职消费者和订阅关系,主要的核心逻辑都在此内部类中实现。而FluxArray,FluxCreate等sourceproducer则会提供一个内部类,仅饰演订阅关系的脚色。
PublishOnSubscriber#request
  1.                 public void request(long n) {                        if (Operators.validate(n)) {                                Operators.addCap(REQUESTED, this, n);                                //WIP also guards during request and onError is possible                                trySchedule(this, null, null);                        }                }                void trySchedule(                                @Nullable Subscription subscription,                                @Nullable Throwable suppressed,                                @Nullable Object dataSignal) {                        if (WIP.getAndIncrement(this) != 0) {                                if (cancelled) {                                        if (sourceMode == ASYNC) {                                                // delegates discarding to the queue holder to ensure there is no racing on draining from the SpScQueue                                                queue.clear();                                        }                                        else {                                                // discard given dataSignal since no more is enqueued (spec guarantees serialised onXXX calls)                                                Operators.onDiscard(dataSignal, actual.currentContext());                                        }                                }                                return;                        }                        try {                                worker.schedule(this);                        }                        catch (RejectedExecutionException ree) {                                if (sourceMode == ASYNC) {                                        // delegates discarding to the queue holder to ensure there is no racing on draining from the SpScQueue                                        queue.clear();                                } else if (outputFused) {                                        // We are the holder of the queue, but we still have to perform discarding under the guarded block                                        // to prevent any racing done by downstream                                        this.clear();                                }                                else {                                        // In all other modes we are free to discard queue immediately since there is no racing on pooling                                        Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null);                                }                                actual.onError(Operators.onRejectedExecution(ree, subscription, suppressed, dataSignal,                                                actual.currentContext()));                        }                }
复制代码
针对下游请示的request(n),PublishOnSubscriber并没有直接通报到上游,而是通过worker.schedule(this)举行调理。这是一个异步的使用,新起线程ExecutorServiceWorker,而runnable则是PublishOnSubscriber(身兼三职:订阅关系  消费者  runnable)类自己.
PublishOnSubscriber#run
[code]                public void run() {                        if (outputFused) {                                runBackfused();                        }                        else if (sourceMode == Fuseable.SYNC) {                                runSync();                        }                        else {                                runAsync();                        }                }        void runAsync() {                        int missed = 1;                        final Subscriber
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则


专注素材教程免费分享
全国免费热线电话

18768367769

周一至周日9:00-23:00

反馈建议

27428564@qq.com 在线QQ咨询

扫描二维码关注我们

Powered by Discuz! X3.4© 2001-2013 Comsenz Inc.( 蜀ICP备2021001884号-1 )