I am trying to bring spring cloud stream into a spring based web application. I was trying to define the beans as below. The reason i opted for spring-cloud-stream because of its binder implementation to switch between different broker or multiple brokers. The code I have implemented is as below.
<context:annotation-config />
<rabbit:connection-factory
id="rabbitMQconnectionFactory"
host="localhost"
username="guest"
password="guest"
port="5672"
virtual-host="/"
/>
<rabbit:admin
id="rabbitAdmin"
connection-factory="rabbitMQconnectionFactory"
/>
<bean
id="rabbitProperties" class=".springframework.boot.autoconfigure.amqp.RabbitProperties"
>
<property name="host" value="localhost"/>
<property name="port" value="5672"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="virtualHost" value="/"/>
</bean>
<bean id="rabbitExchangeQueueProvisioner" class=".springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner">
<constructor-arg ref="rabbitMQconnectionFactory"/>
</bean>
<bean id="rabbitMessageChannelBinder" class="com.journaldev.spring.config.CustomRabbitMessageChannelBinder">
<constructor-arg ref="rabbitMQconnectionFactory"/>
<constructor-arg ref="rabbitProperties"/>
<constructor-arg ref="rabbitExchangeQueueProvisioner"/>
</bean>
<bean class="com.journaldev.spring.config.RabbitBinderPostProcessor"/>
<bean id="bindingServiceProperties" class=".springframework.cloud.stream.config.BindingServiceProperties">
<property name="bindings">
<map>
<entry key="personInputChannel">
<bean class=".springframework.cloud.stream.config.BindingProperties">
<property name="destination" value="testQueue"/>
<property name="group" value="personConsumerGroup"/>
<property name="consumer">
<bean class=".springframework.cloud.stream.binder.ConsumerProperties">
<property name="concurrency" value="1"/>
</bean>
</property>
</bean>
</entry>
<entry key="personOutputChannel">
<bean class=".springframework.cloud.stream.config.BindingProperties">
<property name="destination" value="testQueue"/>
<property name="producer">
<bean class=".springframework.cloud.stream.binder.ProducerProperties">
<property name="requiredGroups">
<list>
<value>personConsumerGroup</value>
</list>
</property>
</bean>
</property>
</bean>
</entry>
<entry key="addressInputChannel">
<bean class=".springframework.cloud.stream.config.BindingProperties">
<property name="destination" value="testQueue1"/>
<property name="group" value="addressConsumerGroup"/>
<property name="consumer">
<bean class=".springframework.cloud.stream.binder.ConsumerProperties">
<property name="concurrency" value="1"/>
</bean>
</property>
</bean>
</entry>
<entry key="addressOutputChannel">
<bean class=".springframework.cloud.stream.config.BindingProperties">
<property name="destination" value="testQueue1"/>
<property name="producer">
<bean class=".springframework.cloud.stream.binder.ProducerProperties">
<property name="requiredGroups">
<list>
<value>addressConsumerGroup</value>
</list>
</property>
</bean>
</property>
</bean>
</entry>
</map>
</property>
</bean>
<bean id="binderConfiguration"
class=".springframework.cloud.stream.binder.BinderConfiguration">
<constructor-arg name="binderType" value="rabbit"/>
<constructor-arg name="properties">
<bean class="java.util.HashMap"/>
</constructor-arg>
<constructor-arg name="inheritEnvironment" value="true"/>
<constructor-arg name="defaultCandidate" value="true"/>
</bean>
<bean id="binderType" class=".springframework.cloud.stream.binder.BinderType">
<constructor-arg name="" value="rabbit"/>
<constructor-arg>
<list>
<!-- <value>.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder</value> -->
<value>.springframework.cloud.stream.binder.rabbit.config.RabbitBinderConfiguration</value>
</list>
</constructor-arg>
</bean>
<bean id="binderTypeRegistry" class=".springframework.cloud.stream.binder.DefaultBinderTypeRegistry">
<constructor-arg name="binderTypes">
<map>
<entry key="rabbit" value-ref="binderType"/>
</map>
</constructor-arg>
</bean>
<bean id="binderFactory"
class=".springframework.cloud.stream.binder.DefaultBinderFactory">
<constructor-arg>
<map>
<entry key="rabbit">
<ref bean="binderConfiguration"/>
</entry>
</map>
</constructor-arg>
<constructor-arg ref="binderTypeRegistry"/>
<constructor-arg><null/></constructor-arg>
</bean>
<bean id="bindingService" class=".springframework.cloud.stream.binding.BindingService">
<constructor-arg ref="bindingServiceProperties"/>
<constructor-arg ref="binderFactory"/>
<constructor-arg>
<bean class="com.fasterxml.jackson.databind.ObjectMapper"/>
</constructor-arg>
</bean>
<bean id="personInputChannel" class=".springframework.integration.channel.DirectChannel"/>
<bean id="personOutputChannel" class=".springframework.integration.channel.DirectChannel"/>
<bean id="addressInputChannel" class=".springframework.integration.channel.DirectChannel"/>
<bean id="addressOutputChannel" class=".springframework.integration.channel.DirectChannel"/>
<bean id="personProcessor" class="com.journaldev.spring.message.processor.PersonProcessor">
<property name="addressService" ref="addressService"/>
<property name="batchSize" value="10"/>
</bean>
<int:service-activator input-channel="personInputChannel"
ref="personConsumerFunction"
method="accept"/>
<int:service-activator input-channel="addressInputChannel"
ref="addressConsumerFunction"
method="accept" />
<!-- Define Spring Cloud Stream Functions -->
<bean id="personConsumerFunction" class="com.journaldev.spring.message.functions.PersonConsumerFunction">
<property name="processor" ref="personProcessor"/>
</bean>
<bean id="addressConsumerFunction" class="com.journaldev.spring.message.functions.AddressConsumerFunction">
<property name="addressService" ref="addressService"/>
</bean>
<!-- Spring Integration environment beans -->
<bean id="integrationConversionService" class=".springframework.context.support.ConversionServiceFactoryBean"/>
<!--bean id="integrationHeaderChannelRegistry" class=".springframework.integration.channel.DefaultHeaderChannelRegistry"/>-->
<bean id="dynamicDestinationsBindable" class=".springframework.cloud.stream.binding.DynamicDestinationsBindable"/>
<bean id="streamFunctionProperties" class=".springframework.cloud.stream.function.StreamFunctionProperties">
</bean>
<bean id="bindable" class=".springframework.cloud.stream.binding.SubscribableChannelBindingTargetFactory"/>
<bean id="outputBindingLifecycle" class=".springframework.cloud.stream.binding.OutputBindingLifecycle">
<constructor-arg ref="bindingService"/>
<constructor-arg>
<map>
<entry key="personOutputChannel" value-ref="personOutputChannel"/>
<entry key="addressOutputChannel" value-ref="addressOutputChannel"/>
</map>
</constructor-arg>
</bean>
<bean id="inputBindingLifecycle" class=".springframework.cloud.stream.binding.InputBindingLifecycle">
<constructor-arg ref="bindingService"/>
<constructor-arg>
<map>
<entry key="personInputChannel" value-ref="personInputChannel"/>
<entry key="addressInputChannel" value-ref="addressInputChannel"/>
</map>
</constructor-arg>
</bean>
<bean id="compositeMessageConverter" class=".springframework.messaging.converter.CompositeMessageConverter" primary="true">
<constructor-arg>
<list>
<bean class=".springframework.messaging.converter.MappingJackson2MessageConverter"/>
<bean class=".springframework.messaging.converter.StringMessageConverter"/>
<bean class=".springframework.integration.support.converter.SimpleMessageConverter"/>
</list>
</constructor-arg>
</bean>
<bean id="messageConverterConfigurer" class=".springframework.cloud.stream.binding.MessageConverterConfigurer">
<constructor-arg ref="bindingServiceProperties"/>
<constructor-arg ref="compositeMessageConverter"/>
<constructor-arg><null/></constructor-arg>
</bean>
<bean id="messageSourceBindingTargetFactory" class=".springframework.cloud.stream.binding.MessageSourceBindingTargetFactory">
<constructor-arg ref="compositeMessageConverter"/>
<constructor-arg ref="messageConverterConfigurer"/>
</bean>
<bean id="binderAwareChannelResolver" class=".springframework.cloud.stream.binding.BinderAwareChannelResolver">
<constructor-arg ref="bindingService"/>
<constructor-arg ref="messageSourceBindingTargetFactory"/>
<constructor-arg ref="dynamicDestinationsBindable"/>
</bean>
<bean id="lifecycleProcessor" class=".springframework.context.support.DefaultLifecycleProcessor">
<property name="timeoutPerShutdownPhase" value="30000"/>
</bean>
<bean id="springCloudStreamInitializer" class="com.journaldev.spring.config.SpringCloudStreamInitializer"/>
<!-- Message Producer Service -->
<bean id="messageProducerService" class="com.journaldev.spring.message.functions.MessageProducerService">
<property name="personOutputChannel" ref="personOutputChannel"/>
<property name="addressOutputChannel" ref="addressOutputChannel"/>
<property name="channelResolver" ref="binderAwareChannelResolver"/>
</bean>
发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744250465a4565144.html
评论列表(0条)