Producer
The Spring configuration is as follows:
<rabbit:connection-factory id="connectionFactory"
                           host="localhost"
                           port="5672"
                           publisher-confirms="true"
                           virtual-host="/"
                           username="absurd"
                           password="absurd" />

<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" />
service
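import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ProducerServiceImpl implements ProducerService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String msg, String routingKey, String exchange) {
        System.err.println("err" + msg + routingKey + exchange);
        // Declare a durable fanout exchange and a durable queue named after the routing key, then bind them.
        RabbitAdmin admin = new RabbitAdmin(this.rabbitTemplate.getConnectionFactory());
        admin.declareExchange(new FanoutExchange(exchange, true, false));
        admin.declareQueue(new Queue(routingKey, true, false, false));
        admin.declareBinding(new Binding(routingKey, DestinationType.QUEUE, exchange, routingKey, null));
        // Publish through the fanout exchange (fanout exchanges ignore the routing key).
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
        // Publish again through the template's default exchange, which routes by queue name,
        // so the queue receives the message a second time.
        rabbitTemplate.convertAndSend(routingKey, msg);
    }
}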
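The service publishes each message twice: once to the named fanout exchange, and once with only a routing key, which goes through the template's default exchange (the empty-named exchange, since the `rabbitTemplate` bean sets no other). The following is a toy in-memory model of those two routing rules, not RabbitMQ client code; `ToyBroker` and all of its methods are hypothetical names made up for this illustration. It is only meant to suggest why the queue ends up holding two copies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of fanout vs. default-exchange routing (illustrative only). */
class ToyBroker {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();
    // fanout exchange name -> names of bound queues (a set, so re-binding is a no-op)
    private final Map<String, Set<String>> fanoutBindings = new HashMap<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayList<>());
    }

    void bindFanout(String exchange, String queue) {
        fanoutBindings.computeIfAbsent(exchange, k -> new LinkedHashSet<>()).add(queue);
    }

    /** Publishes a message; an empty exchange name stands for the default exchange. */
    void publish(String exchange, String routingKey, String msg) {
        if (exchange.isEmpty()) {
            // Default exchange: deliver to the queue whose name equals the routing key.
            deliver(routingKey, msg);
            return;
        }
        // Fanout exchange: ignore the routing key and copy to every bound queue.
        for (String queue : fanoutBindings.getOrDefault(exchange, Collections.emptySet())) {
            deliver(queue, msg);
        }
    }

    private void deliver(String queue, String msg) {
        List<String> target = queues.get(queue);
        if (target != null) {
            target.add(msg); // messages for undeclared queues are simply dropped in this model
        }
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }
}
```

In this model, declaring `queue`, binding it to `absurd_exchange5`, and then publishing once through each path leaves two entries in `messagesIn("queue")`. If only one delivery is wanted, dropping one of the two `convertAndSend` calls in the service would be the obvious change.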
controller
@RequestMapping(value = "/publish/{msg}/{queue}/{exchange}", method = RequestMethod.GET)
public ModelAndView publish(@PathVariable(value = "msg") String msg,
                            @PathVariable(value = "queue") String queue,
                            @PathVariable(value = "exchange") String exchange) {
    ModelAndView mv = new ModelAndView();
    producerService.sendMessage(msg, queue, exchange);
    System.out.println(msg);
    mv.setViewName("a");
    mv.addObject("msg", msg);
    return mv;
}
Consumer
Spring configuration:
<rabbit:connection-factory id="connectionFactory"
                           host="localhost"
                           publisher-confirms="true"
                           virtual-host="/"
                           username="absurd"
                           password="absurd" />

<rabbit:listener-container connection-factory="connectionFactory"
                           acknowledge="auto"
                           task-executor="taskExecutor">
    <rabbit:listener queues="queue" method="onMessage" ref="redQueueListener" />
</rabbit:listener-container>

<rabbit:queue name="queue" durable="true" />

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="5" />
    <property name="keepAliveSeconds" value="30000" />
    <property name="maxPoolSize" value="1000" />
    <property name="queueCapacity" value="200" />
</bean>

<bean id="redQueueListener" class="com.absurd.rabbitmqcustomer.RedQueueListener" />
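The sizes on the `taskExecutor` bean are easy to misread. `ThreadPoolTaskExecutor` delegates to a JDK `ThreadPoolExecutor` with a bounded queue, and that executor only starts threads beyond `corePoolSize` once the queue is full. Note also that `keepAliveSeconds` is in seconds, so 30000 is a little over eight hours. Below is a minimal sketch using the plain JDK class with the same numbers; `PoolGrowthSketch` and its methods are hypothetical names for illustration, and it says nothing about how many threads the listener container itself will actually request.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Shows when a pool sized like the taskExecutor bean grows past its core size. */
class PoolGrowthSketch {

    /** Builds a plain JDK pool with the same sizes as the taskExecutor bean. */
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                5,                              // corePoolSize
                1000,                           // maxPoolSize
                30000, TimeUnit.SECONDS,        // keepAliveSeconds
                new LinkedBlockingQueue<>(200)  // queueCapacity
        );
    }

    /** Submits `count` tasks that block until `release` is counted down. */
    static void submitBlocking(ThreadPoolExecutor pool, CountDownLatch release, int count) {
        for (int i = 0; i < count; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    /** Returns the pool size after 5, 205 and 206 blocked tasks have been submitted. */
    static int[] demo() {
        ThreadPoolExecutor pool = newPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            submitBlocking(pool, release, 5);    // fills the core threads
            int afterCore = pool.getPoolSize();
            submitBlocking(pool, release, 200);  // fills the queue, no new threads
            int afterQueueFull = pool.getPoolSize();
            submitBlocking(pool, release, 1);    // queue is full, so one extra thread starts
            int afterOverflow = pool.getPoolSize();
            return new int[] {afterCore, afterQueueFull, afterOverflow};
        } finally {
            release.countDown();  // let every blocked task finish
            pool.shutdown();
        }
    }
}
```

With every task blocked, the pool stays at 5 threads while the 200-slot queue fills, and only the 206th task causes a sixth thread to start. In other words, `maxPoolSize="1000"` only matters under sustained backlog.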
Listener method:
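package com.absurd.rabbitmqcustomer;

// SLF4J is assumed here, based on the LoggerFactory.getLogger call.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedQueueListener {

    private static Logger log = LoggerFactory.getLogger(RedQueueListener.class);

    // Invoked by the listener container for each message delivered to the queue.
    public void onMessage(String message) {
        log.info("RedQueueListener Received:" + message);
    }
}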
RabbitMQ dependency:
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.5.6.RELEASE</version>
</dependency>
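Result:
Visit http://localhost:8080/rabbitmqproducer/hello/publish/bfdbdfbdfg/queue/absurd_exchange5 (here the message is bfdbdfbdfg, the queue is queue, and the exchange is absurd_exchange5),
and the consumer receives the message.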