最近在学响应式编程,这里先记录下,响应式编程的一些基础内容
Reactive Streams、Reactor、WebFlux以及响应式编程之间存在密切的关系,它们共同构成了在Java生态系统中处理异步和响应式编程的一系列工具和框架。
Reactive Streams:
Reactor:
WebFlux:
响应式编程:
综上所述,Reactive Streams 提供了规范,Reactor 是一个实现了该规范的响应式编程框架,而WebFlux是Spring对于响应式编程的支持。它们共同致力于构建异步、非阻塞、响应式的应用程序。响应式编程则是一种更广义的编程范式,与Reactive Streams和Reactor等具体实现密切相关。
在java.util.concurrent.Flow 类中,定义了Reactive Streams规范
interface Publisher{ void subscribe(Subscriber super T> subscriber); }
interface Subscriber{ void onSubscribe(Subscription subscription); void onNext(T item); void onError(Throwable throwable); void onComplete(); }
onSubscribe(Subscription subscription): 在订阅关系建立时调用。通过这个方法,订阅者可以持有 Subscription 对象,以便后续请求数据和取消订阅。
onNext(T item): 在接收到新元素时调用。订阅者通过这个方法处理收到的数据。
onError(Throwable throwable): 在数据流中出现错误时调用。订阅者通过这个方法处理错误情况。
onComplete(): 在数据流完成时调用。通知订阅者数据流结束,不再有新的元素。
interface Subscription { void request(long n); void cancel(); }
request(long n): 用于请求订阅者处理指定数量的元素。订阅者通过这个方法告知发布者它可以处理多少个元素。
cancel(): 用于取消订阅关系。当订阅者不再需要接收数据时,调用此方法取消订阅。
interface Processorextends Subscriber , Publisher { }
Processor 接口是 Subscriber 和 Publisher 的组合,表示一个中间处理组件,可以同时充当订阅者和发布者的角色。
Subscriber 部分的方法:onSubscribe(Subscription subscription), onNext(T item), onError(Throwable throwable), onComplete()。
Publisher 部分的方法:subscribe(Subscriber super R> subscriber)。表示 Processor 可以被其他订阅者订阅。
泛型T即为数据流
这些方法共同构成 Reactive Streams 协议,定义了发布者和订阅者之间的协作方式,以及订阅者如何处理数据流。在实际的使用中,这些方法的实现通常需要考虑异步处理、背压机制等方面,以确保响应式编程的目标得以实现。
在 Reactive Streams 中,Publisher、Subscriber、Subscription 和 Processor 之间的协作流程如下:
有时间再补流程图
Publisher(发布者):
interface Publisher{ void subscribe(Subscriber super T> subscriber); }
Subscriber(订阅者):
interface Subscriber{ void onSubscribe(Subscription subscription); void onNext(T item); void onError(Throwable throwable); void onComplete(); }
Subscription(订阅):
interface Subscription { void request(long n); void cancel(); }
Processor(处理器):
interface Processorextends Subscriber , Publisher { }
这些接口一起构成了 Reactive Streams 的基本协议。发布者产生数据,订阅者订阅数据流并通过 onNext 方法接收元素,订阅者通过 request 方法请求处理一定数量的元素,同时可以通过 cancel 方法取消订阅。Processor 则可以用于在订阅者和发布者之间进行数据转换和处理。在 Reactive Streams 的实现中,这些接口的方法调用是异步进行的,以支持非阻塞的数据流处理。
自己实现了一个,参考了SubmissionPublisher
- 同步实现的
- 功能不完善
- 有bug
class MyPublisher implements Flow.Publisher{ MySubscription subscription; public int request ; public void publish(String item){ subscription.items.add(item); while (true) { if (request > 0) { for (int i = 0; i < request; i++) { if (!subscription.items.isEmpty()) { try { Object o = subscription.items.get(subscription.items.size() - 1); subscription.subscriber.onNext(o.toString()); subscription.items.remove(o); }catch (Exception e){ subscription.subscriber.onError(e); return; } } } } if (subscription.items.isEmpty()) { break; } } } @Override public void subscribe(Flow.Subscriber super String> subscriber) { System.out.println("第一步:绑定订阅者" ); MySubscription subscription = new MySubscription<>(subscriber,this); this.subscription = subscription; subscriber.onSubscribe(subscription); } } class MySubscriber implements Flow.Subscriber { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println("第二步:接收Subscription" ); this.subscription = subscription; // 请求订阅者处理的元素数量 subscription.request(1); } @Override public void onNext(String item) { System.out.println("第四步:推送数据" ); System.out.println("MySubscriber 消费了item = " + item); subscription.request(1); } @Override public void onError(Throwable throwable) { System.out.println("出异常了 = " + throwable); } @Override public void onComplete() { } } class MySubscription implements Flow.Subscription{ final Flow.Subscriber super T> subscriber; final MyPublisher publisher; List items = new ArrayList(); public MySubscription(Flow.Subscriber super T> subscriber, MyPublisher publisher) { this.subscriber = subscriber; this.publisher = publisher; } @Override public void request(long n) { this.publisher.request++; System.out.println("第三步:拉取请求" ); } @Override public void cancel() { } } public class FlowDemo { public static void main(String[] args) { MyPublisher myPublisher = new MyPublisher(); MySubscriber mySubscriber = new MySubscriber(); myPublisher.subscribe(mySubscriber); myPublisher.publish("111"); myPublisher.publish("222"); myPublisher.publish(null); } }
class SimplePublisher implements Flow.Publisher{ private final SubmissionPublisher publisher = new SubmissionPublisher<>(); public void publishItems() { for (int i = 1; i <= 5; i++) { publisher.submit(i); } // 发布者完成发布 publisher.close(); } @Override public void subscribe(Flow.Subscriber super Integer> subscriber) { publisher.subscribe(subscriber); } } class SimpleSubscriber implements Flow.Subscriber { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; // 请求订阅者处理的元素数量 subscription.request(1); } @Override public void onNext(Integer item) { System.out.println("Received item: " + item); // 处理完一个元素后请求下一个 subscription.request(1); } @Override public void onError(Throwable throwable) { System.err.println("Error occurred: " + throwable.getMessage()); } @Override public void onComplete() { System.out.println("Processing completed."); } } public class ReactiveStreamsExample { public static void main(String[] args) throws InterruptedException { // 创建发布者和订阅者 SimplePublisher simplePublisher = new SimplePublisher(); SimpleSubscriber simpleSubscriber = new SimpleSubscriber(); // 订阅者订阅发布者 simplePublisher.subscribe(simpleSubscriber); // 发布者发布数据 simplePublisher.publishItems(); // 睡一觉,确保数据处理完成 Thread.sleep(3000); } }
学习打卡:Java学习笔记-day05-响应式编程初探-自定义实现Reactive Streams规范