借鉴:https://blog.csdn.net/zxfhahaha/article/details/112558429
https://blog.csdn.net/qq_46940224/article/details/131043987
https://blog.csdn.net/weixin_37672801/article/details/128803430
https://www.cnblogs.com/shamo89/p/15715091.html
https://blog.csdn.net/laohuangaa/article/details/119772453
1.事件发布的使用:
事件驱动机制其实是观察者模式(又称发布订阅)具体实现,事件对象(HtwBikeRoadSignEvent)相当于被观察对象, 事件监听器(EventListener) 相当于观察者。@EventListener()括号里面是哪个对象就是监听哪个对象。
HtwBikeRoadSignAction.java: 自己定义的类,在方法中发布事件
@Service public class HtwBikeRoadSignAction implements AssetInBoundAction { @Autowired private HtwBikeRepository htwBikeRepository; @Resource private ApplicationContext applicationContext; @Resource private CityExtInfoIntegrationService cityExtInfoIntegrationService; @Autowired private AssetAddressAndStatusConvertManager convertManager; @Override public AssetsInBoundResult apply(AssetContext context, AssetsInBoundParam param) { AssetsInBoundResult assetsResult = new AssetsInBoundResult(); ListbikeIds = Collections.emptyList(); bikeIds.add(12245L); List htwBikeEntityList = htwBikeRepository.getByIds(bikeIds); HtwBikeRoadSignEvent htwBikeRoadSignEvent = HtwBikeRoadSignEvent.builder() .context(context) .htwBikeEntities(htwBikeEntityList) .build(); //更新车辆信息 modifyHtwBikeInfo(context, htwBikeEntityList); */ // 5.发送车辆绑定事件 applicationContext.publishEvent(htwBikeRoadSignEvent); assetsResult.setInBoundResult(true); return assetsResult; } }
HtwBikeRoadSignEvent.java:
/** * 路面签收事件 */ @Data @Builder public class HtwBikeRoadSignEvent extends AssetSopEvent { /** * 操作上下文 */ private AssetContext context; /** * 车辆信息 */ private ListhtwBikeEntities; /** * 车辆更新字段 */ private Map bikeUpdateMap; /** * 锁更新字段 */ private Map lockUpdateMap; }
HtwBikeRoadSignSrmHandler.java:
/** * 车辆路面签收通知 */ @Component public class HtwBikeRoadSignSrmHandler { @Value("${srm.producer.topic}") private String srmGatewayTopic; /*@Resource(name = "exporterBlocMessageProducer") private BlocMessageProducerImpl producer;*/ @Async @EventListener(HtwBikeRoadSignEvent.class) public void onRoadSign(HtwBikeRoadSignEvent event) { AssetContext context = event.getContext(); ListhtwBikeEntities = event.getHtwBikeEntities(); AssetChangeOrderMessageDTO assetChangeOrderMessageDTO = new AssetChangeOrderMessageDTO(); assetChangeOrderMessageDTO.setChangeType(1); //producer.sendMessage(srmGatewayTopic, assetChangeOrderMessageDTO); } } @Async 添加注解表示这个方法要异步执行
2.Spring的两种监听器的实现有两种方式:
实现ApplicationListener接口 :方法是onApplicationEvent(E event);
注解@EventListener
3.原理: 也可以看这个里面的有源码贴图: https://www.cnblogs.com/shamo89/p/15715091.html 或者 https://blog.csdn.net/laohuangaa/article/details/119772453
第一部分原理:发布事件 applicationContext.publishEvent(htwBikeRoadSignEvent)方法:
ApplicationContext接口: 继承了ApplicationEventPublisher接口。ApplicationContext接口的对象在SpringBoot启动的时候自动装配有初始化。
public interface ApplicationContext extends EnvironmentCapable, ListableBeanFactory, HierarchicalBeanFactory, MessageSource, ApplicationEventPublisher, ResourcePatternResolver { @Nullable String getId(); String getApplicationName(); String getDisplayName(); long getStartupDate(); @Nullable ApplicationContext getParent(); AutowireCapableBeanFactory getAutowireCapableBeanFactory() throws IllegalStateException; }
ApplicationEventPublisher接口:
public interface ApplicationEventPublisher { default void publishEvent(ApplicationEvent event) { publishEvent((Object) event); } void publishEvent(Object event); //本文以看这个方法源码分析 }
AbstractApplicationContext.java:
public abstract class AbstractApplicationContext extends DefaultResourceLoader implements ConfigurableApplicationContext { @Override public void publishEvent(Object event) { publishEvent(event, null); } protected void publishEvent(Object event, @Nullable ResolvableType eventType) { Assert.notNull(event, "Event must not be null"); ApplicationEvent applicationEvent; if (event instanceof ApplicationEvent) { applicationEvent = (ApplicationEvent) event; } else { applicationEvent = new PayloadApplicationEvent<>(this, event); if (eventType == null) { eventType = ((PayloadApplicationEvent>) applicationEvent).getResolvableType(); } } if (this.earlyApplicationEvents != null) { this.earlyApplicationEvents.add(applicationEvent); } else { //重点看这个:调用了SimpleApplicationEventMulticaster类的multicastEvent() getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); } if (this.parent != null) { if (this.parent instanceof AbstractApplicationContext) { ((AbstractApplicationContext) this.parent).publishEvent(event, eventType); } else { this.parent.publishEvent(event); } } } }
SimpleApplicationEventMulticaster.java: 事件广播器
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster { @Nullable private Executor taskExecutor; @Nullable private ErrorHandler errorHandler; public SimpleApplicationEventMulticaster() { } @Override public void multicastEvent(ApplicationEvent event) { //调了下一个multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) 方法 multicastEvent(event, resolveDefaultEventType(event)); } @Override public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); Executor executor = getTaskExecutor(); for (ApplicationListener> listener : getApplicationListeners(event, type)) {//感兴趣可以继续看getApplicationListeners方法获取所有的监听器 //根据监听对象类型返回所有的监听器,循环调用 if (executor != null) { //线程池异步调用invokeListener() executor.execute(() -> invokeListener(listener, event)); } else { invokeListener(listener, event); } } } //执行invokeListener() protected void invokeListener(ApplicationListener> listener, ApplicationEvent event) { ErrorHandler errorHandler = getErrorHandler(); if (errorHandler != null) { try { //调用doInvokeListener() doInvokeListener(listener, event); } catch (Throwable err) { errorHandler.handleError(err); } } else { doInvokeListener(listener, event); } } //执行() @SuppressWarnings({"rawtypes", "unchecked"}) private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) { try { //这个地方调了监听器所实现的接口ApplicationListener的onApplicationEvent()。注解@EventListener其实就是生成了一个代理类,代理类也实现了接口ApplicationListener的onApplicationEvent()方法,所以原理一样 listener.onApplicationEvent(event); } catch (ClassCastException ex) { String msg = ex.getMessage(); if (msg == null || matchesClassCastMessage(msg, event.getClass())) { Log logger = LogFactory.getLog(getClass()); if (logger.isTraceEnabled()) { logger.trace("Non-matching event type for listener: " + listener, ex); } } else { throw ex; } } } }
第二部分原理:事件广播器的初始化:
SpringApplication.java: SpringApplication.run(UsualServiceApplication.class, args)启动的入口
public class SpringApplication { public ConfigurableApplicationContext run(String... args) { StopWatch stopWatch = new StopWatch(); stopWatch.start(); ConfigurableApplicationContext context = null; CollectionexceptionReporters = new ArrayList<>(); configureHeadlessProperty(); SpringApplicationRunListeners listeners = getRunListeners(args); listeners.starting(); try { ApplicationArguments applicationArguments = new DefaultApplicationArguments(args); ConfigurableEnvironment environment = prepareEnvironment(listeners, applicationArguments); configureIgnoreBeanInfo(environment); Banner printedBanner = printBanner(environment); context = createApplicationContext(); exceptionReporters = getSpringFactoriesInstances(SpringBootExceptionReporter.class, new Class[] { ConfigurableApplicationContext.class }, context); prepareContext(context, environment, listeners, applicationArguments, printedBanner); //重点在这个方法里面 refreshContext(context); afterRefresh(context, applicationArguments); stopWatch.stop(); if (this.logStartupInfo) { new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), stopWatch); } listeners.started(context); callRunners(context, applicationArguments); } catch (Throwable ex) { handleRunFailure(context, ex, exceptionReporters, listeners); throw new IllegalStateException(ex); } try { listeners.running(context); } catch (Throwable ex) { handleRunFailure(context, ex, exceptionReporters, null); throw new IllegalStateException(ex); } return context; } private void refreshContext(ConfigurableApplicationContext context) { //调用refresh() refresh((ApplicationContext) context); if (this.registerShutdownHook) { try { context.registerShutdownHook(); } catch (AccessControlException ex) { // Not allowed in some environments. } } } //执行refresh() @Deprecated protected void refresh(ApplicationContext applicationContext) { Assert.isInstanceOf(ConfigurableApplicationContext.class, applicationContext); //调用refresh((ConfigurableApplicationContext) applicationContext) refresh((ConfigurableApplicationContext) applicationContext); } //执行refresh((ConfigurableApplicationContext) applicationContext) protected void refresh(ConfigurableApplicationContext applicationContext) { //调用的是AbstractApplicationContext.java抽象类的refresh() applicationContext.refresh(); } }
AbstractApplicationContext.java抽象类:
//执行AbstractApplicationContext.java抽象类的refresh() public abstract class AbstractApplicationContext extends DefaultResourceLoader implements ConfigurableApplicationContext { @Override public void refresh() throws BeansException, IllegalStateException { synchronized (this.startupShutdownMonitor) { prepareRefresh(); ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory(); prepareBeanFactory(beanFactory); try { postProcessBeanFactory(beanFactory); invokeBeanFactoryPostProcessors(beanFactory); registerBeanPostProcessors(beanFactory); initMessageSource(); //调用初始化事件广播器方法 initApplicationEventMulticaster(); onRefresh(); registerListeners(); finishBeanFactoryInitialization(beanFactory); finishRefresh(); } catch (BeansException ex) { if (logger.isWarnEnabled()) { logger.warn("Exception encountered during context initialization - " + "cancelling refresh attempt: " + ex); } destroyBeans(); cancelRefresh(ex); throw ex; } finally { resetCommonCaches(); } } } //执行初始化事件广播器的方法 protected void initApplicationEventMulticaster() { ConfigurableListableBeanFactory beanFactory = getBeanFactory(); if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) { this.applicationEventMulticaster = beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class); if (logger.isTraceEnabled()) { logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]"); } } else { this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster); if (logger.isTraceEnabled()) { logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " + "[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]"); } } } }
ConfigurableApplicationContext接口: 继承了ApplicationContext接口
public interface ConfigurableApplicationContext extends ApplicationContext, Lifecycle, Closeable { }
ApplicationContext接口:
public interface ApplicationContext extends EnvironmentCapable, ListableBeanFactory, HierarchicalBeanFactory, MessageSource, ApplicationEventPublisher, ResourcePatternResolver { }
4.观察者模式:
包含观察者和目标(被观察者)两种对象。目标和观察者之间存在一对多的关系,通过建立目标与观察者之间的联系,当目标的某个状态发生变化时,所有与之关联的观察者都会得到通知并执行各自的任务。
也称 发布-订阅模式:
订阅:观察者向目标对象注册自己,告诉目标对象自己在观察它。
发布:当目标对象的状态发生改变时,向它所有的观察者发送通知,观察者收到通知后执行各自的操作。
优点:降低了目标与观察者之间的耦合关系。
缺点:目标与观察者之间没有完全解耦,还可能出现循环引用;当观察者对象很多时,通知的发布可能会很耗时。
5.Spring5种标准事件: Spring源码中也是用了事件发布,下面的是监听对象即事件
上下文更新事件(ContextRefreshedEvent): ApplicationContext初始化或刷新,或者说在容器实例化(refresh())时发布事件;
上下文开始事件(ContextStartedEvent):Spring容器启动时,即调用ConfigurableApplicationContext接口的start()时发布事件;
上下文停止事件(ContextStoppedEvent):Spring容器停止时,即调用ConfigurableApplicationContext接口的stop()时发布事件,关闭的容器可以通过start()重启;
上下文关闭事件(ContextClosedEvent):Spring容器关闭时,即调用ConfigurableApplicationContext接口的close()时发布事件,所有的Bean已被销毁,无法重启容器;
请求处理事件(RequestHandledEvent):当一个请求被处理完成时发布事件。