Springboot 3.x - Reactive programming

一、Preliminary Knowledge

  1. Functional Interface
  2. Lambda expression
  3. Stream API
    1. Intermediate operation
      1. filter:Used to filter elements in a stream
      2. map:One-to-one conversion
      3. flatMap:One-to-many conversion
      4. distinct、sorted、peek、limit、skip、takeWhile…
    2. Terminal operation
      1. collect:toList/toMap/groupingBy

二、Reactor Core

1、Reactive Stream

https://www.reactive-streams.org

Java’s Reactive Streams is a standardized API for asynchronously processing data streams, designed to support backpressure mechanisms to ensure that system resources are not over-consumed due to mismatches in the speeds of producers and consumers during asynchronous data processing. The main goal of Reactive Streams is to provide a compatible and unified API that allows different libraries and frameworks to work seamlessly together, especially when dealing with large-scale data streams.

Core Concepts

The Reactive Streams API is primarily composed of the following four core interfaces:

  1. Publisher:A publisher, the source that produces the data stream.

    // The Publisher interface has only one method, subscribe, which is used to subscribe to the data stream.
    public interface Publisher<T> {
        void subscribe(Subscriber<? super T> s);
    }
    
  2. Subscriber:A subscriber consumes the data stream at the terminal.

    public interface Subscriber<T> {
        // Called when a subscriber subscribes, passing a Subscription object.
        void onSubscribe(Subscription s);
        // Called when a new data item is produced.
        void onNext(T t);
        //  Called when an error occurs.
        void onError(Throwable t);
        // Called when the data stream ends.
        void onComplete();
    }
    
  3. Subscription:A subscription manages the relationship between the publisher and the subscriber, including operations for requesting and canceling data.

    public interface Subscription {
        // Requesting the number of data items.
        void request(long n);
        // Canceling the subscription.
        void cancel();
    }
    
  4. Processor<T, R>:A processor is both a publisher and a subscriber, used to process data within the data stream.

    // The Processor interface inherits from both the Subscriber and Publisher interfaces, indicating that it can act as both a consumer and producer of data.
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
    
Backpressure Mechanism and How It Works

Backpressure is an important concept in Reactive Streams, allowing subscribers to control the rate at which they receive data from publishers to prevent publishers from producing data too quickly for subscribers to handle. The backpressure mechanism is implemented through the request method in the Subscription interface, where subscribers can call this method to specify the number of data items they are able to handle.

The key to the backpressure mechanism lies in the Subscription interface, which provides two methods: request(long n) and cancel(). The main workflow of backpressure is as follows:

  1. Subscribers Request Data: When a subscriber subscribes to a publisher, it receives a Subscription object. The subscriber calls the request method of the Subscription to request the number of data items it can handle.
  2. Publishers Respond to Requests: Based on the subscriber’s request, the publisher sends the specified number of data items to the subscriber. If the subscriber does not request new data items, the publisher will not send data.
  3. Dynamically Adjusting the Requested Quantity: Subscribers can dynamically adjust the number of requested data items based on their processing capabilities to avoid data accumulation.
Backpressure Strategies

In practice, different backpressure strategies can be adopted based on specific needs:

  1. Directly Requesting All Data: If the consumer can handle all the data, it can request all the data at once, but this may lead to high memory usage.
  2. Batch Requesting Data: Request data in batches, each time requesting a batch of data items that can be processed.
  3. Requesting Data on Demand: Dynamically adjust the number of requested data items based on the real-time processing capability of the consumer.
Common Implementations of Backpressure Strategies

Different implementations of Reactive Streams provide a variety of built-in backpressure strategies. For example, Project Reactor and RxJava both offer several backpressure strategies:

  • Buffer: Buffer all data items until the consumer processes them.
  • Drop: Discard new data items, retaining old ones.
  • Latest: Only keep the latest data item, discarding old ones.
  • Error: Throw an error when exceeding the buffer limit.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/781739.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux muduo 网络库

主要记录示意图和知识点框架&#xff1a; 1、阻塞、非阻塞、同步、异步 在处理IO的时候&#xff0c;阻塞和非阻塞都是同步IO&#xff0c;只有使用了特殊的API才是异步IO。 2、五种IO模型&#xff1a; 阻塞、非阻塞、IO复用、信号驱动、异步IO 3、muduo网络库 muduo网络库给用…

【python】python当当数据分析可视化聚类支持向量机预测(源码+数据集+论文)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

windows 服务器安装svn服务端、迁移svn

svn服务器版本 因为要把旧svn迁移到新的svn&#xff0c;为了保证迁移后的稳定性&#xff0c;安装包使用的旧服务器的svn服务器版本 VisualSVN-Server-3.6.1-x64.msi 安装 配置仓库路径等 其他没截图的就默认配置下一步即可。安装完成先不要启动 迁移 旧的svn服务器直接把…

Windows远程桌面实现之十五:投射浏览器摄像头到xdisp_virt以及再次模拟摄像头(一)

by fanxiushu 2024-07-01 转载或引用请注明原始作者。 本文还是围绕xdisp_virt这个软件展开&#xff0c; 再次模拟成摄像头这个比较好理解&#xff0c;早在很久前&#xff0c;其实xdisp_virt项目中就有摄像头功能&#xff0c; 只是当时是分开的&#xff0c;使用起来…

【SpringBoot】IDEA查看spring bean的依赖关系

前因&#xff1a;在研究springcloud config组件时&#xff0c;我发现config-server包下的EnvironmentController不在扫描的包路径下却可以响应客户端的请求&#xff0c;这吸引了我的注意&#xff0c;我的问题是&#xff1a;EnvironmentController是怎么被添加进bean工厂的。本章…

Golang | Leetcode Golang题解之第218题天际线问题

题目&#xff1a; 题解&#xff1a; type pair struct{ right, height int } type hp []pairfunc (h hp) Len() int { return len(h) } func (h hp) Less(i, j int) bool { return h[i].height > h[j].height } func (h hp) Swap(i, j int) { h[i], h[j]…

26_嵌入式系统网络接口

以太网接口基本原理 IEEE802标准 局域网标准协议工作在物理层和数据链路层&#xff0c;其将数据链路层又划分为两层&#xff0c;从下到上分别为介质访问控制子层(不同的MAC子层&#xff0c;与具体接入的传输介质相关),逻辑链路控制子层(统一的LLC子层&#xff0c;为上层提供统…

CosyVoice多语言、音色和情感控制模型,one-shot零样本语音克隆模型本地部署(Win/Mac),通义实验室开源

近日&#xff0c;阿里通义实验室开源了CosyVoice语音模型&#xff0c;它支持自然语音生成&#xff0c;支持多语言、音色和情感控制&#xff0c;在多语言语音生成、零样本语音生成、跨语言声音合成和指令执行能力方面表现卓越。 CosyVoice采用了总共超15万小时的数据训练&#…

GuitarPro2024音乐软件#创作神器#音乐梦想

嘿&#xff0c;亲爱的朋友们&#xff01;&#x1f44b;&#x1f44b;&#x1f44b;今天我要给你们安利一款超赞的软件——Guitar Pro。这款软件简直是吉他手的福音啊&#xff01;&#x1f389;&#x1f389;&#x1f389; Guitar Pro免费绿色永久安装包下载&#xff1a;&#…

如何快速申请免费SSL证书,实现网站HTTPS安全传输

随着互联网技术的飞速发展&#xff0c;网络安全已成为不可忽视的重要议题。HTTPS协议&#xff0c;作为HTTP协议的安全版本&#xff0c;通过SSL协议加密客户端与服务器之间的数据传输&#xff0c;从而保障信息在传输过程中的安全性。对于网站运营者而言&#xff0c;为网站部署SS…

SpringBoot测试类注入Bean失败的原因

针对SpringBoot的测试类&#xff0c;2.2版本之前和之后是不一样的。 2.2版本之后 导包pom.xml 添加test依赖 <!-- starter-test&#xff1a;junit spring-test mockito --> <dependency><groupId>org.springframework.boot</groupId><artifac…

论文解析——FTRANS: Energy-Efficient Acceleration of Transformers using FPGA

作者及发刊详情 Li B , Pandey S , Fang H ,et al.FTRANS: energy-efficient acceleration of transformers using FPGA[J].ACM, 2020.DOI:10.1145/3370748.3406567. 摘要 正文 主要工作贡献 与CPU和GPU在执行Transformer和RoBERTa相比&#xff0c;提出的FTRANS框架获得了…

ansible常见问题配置好了密码还是报错

| FAILED! > { “msg”: “Using a SSH password instead of a key is not possible because Host Key checking is enabled and sshpass does not support this. Please add this host’s fingerprint to your known_hosts file to manage this host.” } 怎么解决&#xf…

[终端安全]-3 移动终端之硬件安全(TEE)

&#xff08;参考资料&#xff1a;TrustZone for V8-A. pdf&#xff0c;来源ARM DEVELOPER官网&#xff09; TEE&#xff08;Trusted Execution Environment&#xff0c;可信执行环境&#xff09;是用于执行敏感代码和处理敏感数据的独立安全区域&#xff1b;以ARM TrustZone为…

分布式技术栈、微服务架构 区分

1.分布式技术栈 这些技术栈都是为了更好的开发分布式架构的项目。 &#xff08;大营销平台的系统框架如下图&#xff0c;扩展的分布式技术栈&#xff09; &#xff08;1&#xff09;Dubbo——分布式技术栈 DubboNacos注册中心是应用可以分布式部署&#xff0c;并且提供RPC接…

HTML5使用<pre>标签:保留原始排版方式

在网页创作中&#xff0c;一般是通过各种标记对文字进行排版的。但是在实际应用中&#xff0c;往往需要一些特殊的排版效果&#xff0c;这样使用标记控制起来会比较麻烦。解决的方法就是保留文本格式的排版效果&#xff0c;如空格、制表符等。 如果要保留原始的文本排版效果&a…

redis并发、穿透、雪崩

Redis如何实现高并发 首先是单线程模型&#xff1a;redis采用单线程可以避免多线程下切换和竞争的开销&#xff0c;提高cpu的利用率&#xff0c;如果是多核cpu&#xff0c;可以部署多个redis实例。基于内存的数据存储&#xff1a;redis将数据存储在内存中&#xff0c;相比于硬…

回溯算法-以景点门票销售管理系统为例

1.回溯算法介绍 1.来源 回溯算法也叫试探法&#xff0c;它是一种系统地搜索问题的解的方法。 用回溯算法解决问题的一般步骤&#xff1a; 1、 针对所给问题&#xff0c;定义问题的解空间&#xff0c;它至少包含问题的一个&#xff08;最优&#xff09;解。 2 、确定易于搜…

唤醒知识循环,共筑绿色阅读梦——探索旧书回收小程序的无限可能

在这个信息爆炸的时代&#xff0c;书籍作为知识与智慧的载体&#xff0c;其重要性不言而喻。然而&#xff0c;随着电子阅读的兴起和书籍更新换代的加速&#xff0c;大量旧书被束之高阁&#xff0c;甚至面临被遗弃的命运。这不仅是对宝贵文化资源的浪费&#xff0c;也是对环境保…

12 电商高并发缓存实战

序章 项目代码缓存的数据一致性 延时双删 淘汰缓存写数据库休眠1s,再次淘汰缓存缺点:如果mysql是主从复制,去从库中拿去数据,此时同步数据还未完成,拿到的数据是旧数据。 先更新 DB,后删除缓存 采用异步延时删除策略. ①利用消息队列进行删除的补偿。②Mysql 数据库更新操…