我的编程空间,编程开发者的网络收藏夹
学习永远不晚

源码解读Spring-Integration执行过程

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

源码解读Spring-Integration执行过程

一,前言

Spring-Integration基于Spring,在应用程序中启用了轻量级消息传递,并支持通过声明式适配器与外部系统集成。这一段官网的介绍,概况了整个Integration的用途。个人感觉消息传递是真正的重点。

如上图所示,典型的生产者-消费者模式,中间通过一个特定的通道进行数据传输,说到这,是不是隐隐感觉到queue的存在。确实事实上这个所谓的通道默认就是用的 blockingqueue。

Spring-Integration网上的资料是真少,再加上源码分析的是更少。关于Spring-Integration的基本介绍直接去官网上看更加的直观,这边就不累述了。

今天主要是看个简单的hello word进来分析下整个执行过程。

先看下代码:


<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:beans="http://www.springframework.org/schema/beans"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
			https://www.springframework.org/schema/beans/spring-beans.xsd
			http://www.springframework.org/schema/integration
			https://www.springframework.org/schema/integration/spring-integration.xsd">

	<annotation-config/>

	<channel id="oc" >
		<queue/>
	</channel>
	<beans:bean id="Beans" class="com.example.demo.Beans"/>
	
</beans:beans>

@Configuration
public class Beans {
    @ServiceActivator(inputChannel = "ic", outputChannel = "oc")
    public String sayHello(String name) {
        return "Hello " + name;
    }
}

public class HelloWorldDemo {
    @Test
    public void testDemo() throws Exception {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("/demo.xml", HelloWorldDemo.class);
        DirectChannel inputChannel = context.getBean("ic", DirectChannel.class);
        PollableChannel outputChannel = context.getBean("oc", PollableChannel.class);
        inputChannel.send(new GenericMessage<String>("World"));
        System.out.println("==> HelloWorldDemo: " + outputChannel.receive(0).getPayload());
        context.close();
    }

}

out:
==> HelloWorldDemo: Hello World

二,ServiceActivator

上面的代码演示了调用方法的入站通道适配器和标准的出站通道适配器, 它们之间是一个带注解的ServiceActivator。关于这个ServiceActivator就是一个消息端点。

消息端点的主要作用是以非侵入性方式将应用程序代码连接到消息传递框架。换句话说,理想情况下,应用程序代码应该不知道消息对象或消息管道。这类似于 MVC 范式中controller 的作用。正如controller 处理 HTTP 请求一样,消息端点处理消息。以及controller 映射到 URL 模式一样,消息端点映射到消息通道。这两种情况的目标是相同的。

ServiceActivator是用于将服务实例连接到消息传递系统的通用端点。必须配置输入消息通道,如果要调用的服务方法能够返回值,还可以提供输出消息通道。

具体流程如下图:

上面的代码比较简单,但是或许会发现我们只定义了输出通道oc,输入通道ic竟然没有定义也能正常应用,是不是很奇怪?带着疑问我们先看下ServiceActivator的源码:

注释上写的很清楚,如果输入通道不存在,将在应用程序上下文中注册具有此名称的DirectChannel 。具体在哪定义,我们后面会看到,现在不急,先一步步来看他的执行过程。

我们全局查找ServiceActivator,看他是哪边进行处理的,最后发现了MessagingAnnotationPostProcessor类,用来处理方法级消息注解的BeanPostProcessor实现。


@Override
	public void afterPropertiesSet() {
		Assert.notNull(this.beanFactory, "BeanFactory must not be null");
		((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(
				IntegrationContextUtils.DISPOSABLES_BEAN_NAME,
				BeanDefinitionBuilder.genericBeanDefinition(Disposables.class, Disposables::new)
						.getRawBeanDefinition());
		this.postProcessors.put(Filter.class, new FilterAnnotationPostProcessor(this.beanFactory));
		this.postProcessors.put(Router.class, new RouterAnnotationPostProcessor(this.beanFactory));
		this.postProcessors.put(Transformer.class, new TransformerAnnotationPostProcessor(this.beanFactory));
		this.postProcessors.put(ServiceActivator.class, new ServiceActivatorAnnotationPostProcessor(this.beanFactory));
		this.postProcessors.put(Splitter.class, new SplitterAnnotationPostProcessor(this.beanFactory));
		this.postProcessors.put(Aggregator.class, new AggregatorAnnotationPostProcessor(this.beanFactory));
		this.postProcessors.put(InboundChannelAdapter.class,
				new InboundChannelAdapterAnnotationPostProcessor(this.beanFactory));
		this.postProcessors.put(BridgeFrom.class, new BridgeFromAnnotationPostProcessor(this.beanFactory));
		this.postProcessors.put(BridgeTo.class, new BridgeToAnnotationPostProcessor(this.beanFactory));
		Map<Class<? extends Annotation>, MethodAnnotationPostProcessor<?>> customPostProcessors =
				setupCustomPostProcessors();
		if (!CollectionUtils.isEmpty(customPostProcessors)) {
			this.postProcessors.putAll(customPostProcessors);
		}
	}

在afterPropertiesSet方法中,我们看到定义了一个后处理器postProcessors,里面注册了相关的注解处理类。包含各种消息端点处理,除了上面写的ServiceActivator,还有过滤器,路由,转换器等各种不同的端点方法。

接着往向下看,既然实现了BeanPostProcessor,那必然要用到postProcessAfterInitialization方法实现,这里的流程大概就是遍历出包含有@ServiceActivator的bean方法,用来做后续处理。我们直接看重点的代码。


Object result = postProcessor.postProcess(bean, beanName, targetMethod, annotations);

三,postProcess

AbstractMethodAnnotationPostProcessor中有个共通方法postProcess用来生成对应的端点信息。具体代码:


@Override
	public Object postProcess(Object bean, String beanName, Method method, List<Annotation> annotations) {
		Object sourceHandler = null;
		if (beanAnnotationAware() && AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) {
			if (!this.beanFactory.containsBeanDefinition(resolveTargetBeanName(method))) {
				this.logger.debug("Skipping endpoint creation; perhaps due to some '@Conditional' annotation.");
				return null;
			}
			else {
				sourceHandler = resolveTargetBeanFromMethodWithBeanAnnotation(method);
			}
		}
                //生成对应的MessageHandler,用来执行对应的注解的方法
		MessageHandler handler = createHandler(bean, method, annotations);

		if (!(handler instanceof ReactiveMessageHandlerAdapter)) {
			orderable(method, handler);
			producerOrRouter(annotations, handler);

			if (!handler.equals(sourceHandler)) {
				handler = registerHandlerBean(beanName, method, handler);
			}

			handler = annotated(method, handler);
			handler = adviceChain(beanName, annotations, handler);
		}
                //将MessageHandler实现连接到消息端点,生成对应的endpoint。
		AbstractEndpoint endpoint = createEndpoint(handler, method, annotations);
		if (endpoint != null) {
			return endpoint;
		}
		else {
			return handler;
		}
	}

这里面主要是两件事:

  • 根据模板模式中不同的createHandler抽象方法实现,生成对应的MessageHandler。譬如说我们这边的ServiceActivatorAnnotationPostProcessor
  • 将MessageHandler实现连接到消息端点,生成对应的endpoint。

1.createHandler


@Override
	protected MessageHandler createHandler(Object bean, Method method, List<Annotation> annotations) {
		AbstractReplyProducingMessageHandler serviceActivator;
		if (AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) {
			...
		else {
			serviceActivator = new ServiceActivatingHandler(bean, method);
		}

		String requiresReply = MessagingAnnotationUtils.resolveAttribute(annotations, "requiresReply", String.class);
		if (StringUtils.hasText(requiresReply)) {
			serviceActivator.setRequiresReply(resolveAttributeToBoolean(requiresReply));
		}

		String isAsync = MessagingAnnotationUtils.resolveAttribute(annotations, "async", String.class);
		if (StringUtils.hasText(isAsync)) {
			serviceActivator.setAsync(resolveAttributeToBoolean(isAsync));
		}
                //是否设置了输出通道
		setOutputChannelIfPresent(annotations, serviceActivator);
		return serviceActivator;
	}

createHandler的代码比较简单,就是根据注解中的几个属性还有对应的方法参数,生成ServiceActivatingHandler。追溯下去ServiceActivatingHandler中最后会生成一个委托对象MessagingMethodInvokerHelper用来以反射的方式来执行目标方法。

2.createEndpoint

createEndpoint字面上都能知道是生成消息端点,事实上也是,把生成的handler和对应的管道进行关联。具体看下代码体会:


	protected AbstractEndpoint createEndpoint(MessageHandler handler, @SuppressWarnings("unused") Method method,
			List<Annotation> annotations) {

		AbstractEndpoint endpoint = null;
                //取得注解中inputChannelName 
		String inputChannelName = MessagingAnnotationUtils.resolveAttribute(annotations, getInputChannelAttribute(),
				String.class);
		if (StringUtils.hasText(inputChannelName)) {
			MessageChannel inputChannel;
			try {
                //从beanFactory中取得对应的通道bean
				inputChannel = this.channelResolver.resolveDestination(inputChannelName);
			}
			catch (DestinationResolutionException e) {
                               //取不到,则自动注册一个类型为DirectChannel的inputChannel 
				if (e.getCause() instanceof NoSuchBeanDefinitionException) {
					inputChannel = new DirectChannel();
					this.beanFactory.registerSingleton(inputChannelName, inputChannel);
					inputChannel = (MessageChannel) this.beanFactory.initializeBean(inputChannel, inputChannelName);
					if (this.disposables != null) {
						this.disposables.add((DisposableBean) inputChannel);
					}
				}
				else {
					throw e;
				}
			}
			Assert.notNull(inputChannel, () -> "failed to resolve inputChannel '" + inputChannelName + "'");
                        //生成endpoint 
			endpoint = doCreateEndpoint(handler, inputChannel, annotations);
		}
		return endpoint;
	}

上面的代码中,我们就能清楚的看到为什么我们在demo中没有注册输入通道也能正常应用的原因了,从而回答之前的疑问。


protected AbstractEndpoint doCreateEndpoint(MessageHandler handler, MessageChannel inputChannel,
			List<Annotation> annotations) {
		....
		else if (inputChannel instanceof SubscribableChannel) {
                        //生成SubscribableChannel类型对应的执行端点
			return new EventDrivenConsumer((SubscribableChannel) inputChannel, handler);
		}
		else if (inputChannel instanceof PollableChannel) {
			return pollingConsumer(inputChannel, handler, pollers);
		}
		else {
			throw new IllegalArgumentException("Unsupported 'inputChannel' type: '"
					+ inputChannel.getClass().getName() + "'. " +
					"Must be one of 'SubscribableChannel', 'PollableChannel' or 'ReactiveStreamsSubscribableChannel'");
		}
	}

通道类型一共有两种,一种是发布订阅,一种是可轮询的,我们是默认是走的第一种,因为DirectChannel默认就是个SubscribableChannel。所以最终我们生成了对应的信息端点类EventDrivenConsumer。

我们先看下EventDrivenConsumer整体结构:

EventDrivenConsumer上面有一个抽象类AbstractEndpoint,最上面实现了Lifecycle接口,所以生命周期跟着容器走,我们直接跳到star方法看:


@Override
	protected void doStart() {
		this.logComponentSubscriptionEvent(true);
                //把handler和inputChannel进行绑定
		this.inputChannel.subscribe(this.handler);
		if (this.handler instanceof Lifecycle) {
			((Lifecycle) this.handler).start();
		}
	}

@Override
	public synchronized boolean addHandler(MessageHandler handler) {
		Assert.notNull(handler, "handler must not be null");
		Assert.isTrue(this.handlers.size() < this.maxSubscribers, "Maximum subscribers exceeded");
		boolean added = this.handlers.add(handler);
		if (this.handlers.size() == 1) {
			this.theOneHandler = handler;
		}
		else {
			this.theOneHandler = null;
		}
		return added;
	}

上面的代码主要就是把handler注册到inputChannel中,这样只要inputChannel通道一收到信息,就会通知他注册的handlers进行处理。代码中比较清楚的记录了一切的操作,就不多解释了。

四,发送信息

执行完上面一系列的注册,已经把这一些的通道打通了,剩下的就是真正的发送操作了。下面分析下inputChannel.send(new GenericMessage<String>("World"));看看send操作:


 
	@Override 
	public boolean send(Message<?> messageArg, long timeout) {
		...
		try {
                        //message是否需要转换
			message = convertPayloadIfNecessary(message);
		        //发送前拦截器
			if (interceptorList.getSize() > 0) {
				interceptorStack = new ArrayDeque<>();
				message = interceptorList.preSend(message, this, interceptorStack);
				if (message == null) {
					return false;
				}
			}
			if (this.metricsCaptor != null) {
				sample = this.metricsCaptor.start();
			}
                        //发送操作
			sent = doSend(message, timeout);
			if (sample != null) {
				sample.stop(sendTimer(sent));
			}
			metricsProcessed = true;

			if (debugEnabled) {
				logger.debug("postSend (sent=" + sent + ") on channel '" + this + "', message: " + message);
			}
                        //发送后拦截器
			if (interceptorStack != null) {
				interceptorList.postSend(message, this, sent);
				interceptorList.afterSendCompletion(message, this, sent, null, interceptorStack);
			}
			return sent;
		}
		catch (Exception ex) {
			...
		}
	}

真正的send操作跟下去,会发现层次极深,碍于篇幅,我们直接跟到重点代码:


@Override
	protected final void handleMessageInternal(Message<?> message) {
		Object result;
		if (this.advisedRequestHandler == null) {
                        //反射执行对应的端点方法
			result = handleRequestMessage(message);
		}
		else {
			result = doInvokeAdvisedRequestHandler(message);
		}
		if (result != null) {
                        //往outputChannel发送执行结果 
			sendOutputs(result, message);
		}
		...
	}

handleRequestMessage的操作就是用之前我们handler中的委托类MessagingMethodInvokerHelper去反射运行对应的端点方法,然后把执行结果发送outputChannel。最后我们直接定位到具体的发送操作:


@Override
	protected boolean doSend(Message<?> message, long timeout) {
		Assert.notNull(message, "'message' must not be null");
		try {
			if (this.queue instanceof BlockingQueue) {
				BlockingQueue<Message<?>> blockingQueue = (BlockingQueue<Message<?>>) this.queue;
				if (timeout > 0) {
					return blockingQueue.offer(message, timeout, TimeUnit.MILLISECONDS);
				}
				if (timeout == 0) {
					return blockingQueue.offer(message);
				}
				blockingQueue.put(message);
				return true;
			}
			else {
				try {
					return this.queue.offer(message);
				}
				finally {
					this.queueSemaphore.release();
				}
			}
		}
		catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return false;
		}
	}

看到这,我们就明白了数据的去向,存储在队列里了,生产者产生的数据就已经生成了,所以发送的操作基本上就告一段落了。

五,接收信息

数据已经生成,后面就是看如何消费操作了,下面分析下 outputChannel.receive(0).getPayload()操作:




	@Override // NOSONAR complexity
	@Nullable
	public Message<?> receive(long timeout) {
		...
		try {
			//接受前拦截器操作
			if (interceptorList.getSize() > 0) {
				interceptorStack = new ArrayDeque<>();
                                //一旦调用接收并在实际检索消息之前调用
				if (!interceptorList.preReceive(this, interceptorStack)) {
					return null;
				}
			}
                        //接收操作
			Message<?> message = doReceive(timeout);
		        ...
                        //在检索到 Message 之后但在将其返回给调用者之前立即调用。 必要时可以修改消息
			if (interceptorStack != null && message != null) {
				message = interceptorList.postReceive(message, this);
			}
                        //在接收完成后调用,而不管已引发的任何异常,从而允许适当的资源清理
			interceptorList.afterReceiveCompletion(message, this, null, interceptorStack);
			return message;
		}
		catch (RuntimeException ex) {
			...
		}
	}

最后的doReceive操作,其实大家都心知肚明了,就是从上面的队列中直接读取数据,代码比较简单,就不注释了:


@Override
	@Nullable
	protected Message<?> doReceive(long timeout) {
		try {
			if (timeout > 0) {
				if (this.queue instanceof BlockingQueue) {
					return ((BlockingQueue<Message<?>>) this.queue).poll(timeout, TimeUnit.MILLISECONDS);
				}
				else {
					return pollNonBlockingQueue(timeout);
				}
			}
			if (timeout == 0) {
				return this.queue.poll();
			}

			if (this.queue instanceof BlockingQueue) {
				return ((BlockingQueue<Message<?>>) this.queue).take();
			}
			else {
				Message<?> message = this.queue.poll();
				while (message == null) {
					this.queueSemaphore.tryAcquire(50, TimeUnit.MILLISECONDS); // NOSONAR ok to ignore result
					message = this.queue.poll();
				}
				return message;
			}
		}
		catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return null;
		}
	}

六,结语

能坚持看到这的,基本上都是勇士了。这一系列的执行过程其实还是比较绕的,我估计有些人看得也是云里雾里。其实我已经尽量精简了许多,Spring-Integration其实涉及到的应用分支更多,我这也只是十分基础的东西,我只能把我自己知道的先记录下来。如果让你对Spring-Integration产生了兴趣,那本文的目的就达到了。这需要你自己去实地操作研究下,总是有收获的。

以上就是源码简析Spring-Integration执行流程的详细内容,更多关于Spring Integration执行的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

源码解读Spring-Integration执行过程

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Spring@Bean注解深入分析源码执行过程

随着SpringBoot的流行,我们现在更多采用基于注解式的配置从而替换掉了基于XML的配置,所以本篇文章我们主要探讨基于注解的@Bean以及和其他注解的使用
2023-01-10

useEffect 返回函数执行过程源码解析

这篇文章主要为大家介绍了useEffect 返回函数执行过程源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-16

spring的异步执行使用与源码详解

这篇文章主要介绍了spring的异步执行使用与源码详解,Spring中通过在方法上设置@Async注解,可使得方法被异步调用,需要的朋友可以参考下
2023-05-20

Django执行源生mysql语句实现过程解析

1.使用extra方法解释:结果集修改器,一种提供额外查询参数的机制说明:依赖model模型 使用方式:用在where后:Book.objects.filter(publisher_id="1").extra(where=["title='
2022-05-11

【Mybatis源码解析】mapper实例化及执行流程源码分析

文章目录 简介 环境搭建 源码解析 附 基础环境:JDK17、SpringBoot3.0、mysql5.7 储备知识:《【Spring6源码・AOP】AOP源码解析》、《JDBC详细
2023-08-20

Java从源码角度解析SpringMVC执行流程

这篇文章主要介绍了Java从源码角度解析SpringMVC执行流程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
2023-05-16

ThreadPoolExecutor参数含义及源码执行流程详解

这篇文章主要为大家介绍了ThreadPoolExecutor参数含义及源码执行流程详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2022-11-13

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录