0%

生产者
spring 配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
<rabbit:connection-factory id="connectionFactory" host="localhost" port="5672" publisher-confirms="true" virtual-host="/" username="absurd" password="absurd" />
<!--下面只有当声明了exchange和队列才可以使用->
<!-- <rabbit:queue id="queue" durable="true" auto-delete="false" exclusive="false" name="queue"/>
<rabbit:queue id="queue2" durable="true" auto-delete="false" exclusive="false" name="queue2"/>
将队列绑定到交换路由同时与key绑定
<rabbit:fanout-exchange name="absurd_exchange" durable="true" auto-delete="false" id="absurd_exchange">
<rabbit:bindings>
<rabbit:binding queue="queue" />
<rabbit:binding queue="queue2" />
</rabbit:bindings>
</rabbit:fanout-exchange>
<rabbit:template id="rabbitTemplate" exchange="absurd_exchange" connection-factory="connectionFactory"/> -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Service
public class ProducerServiceImpl implements ProducerService{

@Autowired private RabbitTemplate rabbitTemplate;
public void sendMessage(String msg, String routingKey,String exchange) {
System.err.println("err"+msg+routingKey+exchange);
RabbitAdmin admin = new RabbitAdmin(this.rabbitTemplate.getConnectionFactory());
admin.declareExchange(new FanoutExchange(exchange,true,false));
admin.declareQueue(new Queue(routingKey,true,false,false) );
admin.declareBinding(new Binding(routingKey, DestinationType.QUEUE, exchange, routingKey, null));//如果声明了队列、exchange、绑定后就无需使用RabbitAdmin
rabbitTemplate.convertAndSend(exchange,routingKey,msg);
rabbitTemplate.convertAndSend(routingKey,msg);
}

}

controller

1
2
3
4
5
6
7
8
9
10
@RequestMapping(value="/publish/{msg}/{queue}/{exchange}",method=RequestMethod.GET)
public ModelAndView publish(@PathVariable(value="msg")String msg,@PathVariable(value="queue")String queue,@PathVariable(value="exchange")String exchange){
ModelAndView mv = new ModelAndView();
producerService.sendMessage(msg, queue,exchange);
System.out.println(msg);
mv.setViewName("a");
mv.addObject("msg", msg);
return mv;

}

消费者
spring配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    <!-- 连接工厂 -->
<rabbit:connection-factory id="connectionFactory" host="localhost" publisher-confirms="true" virtual-host="/" username="absurd" password="absurd" />
<!-- 监听器 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
<!-- queues是队列名称,可填多个,用逗号隔开, method是ref指定的Bean调用Invoke方法执行的方法名称 -->
<rabbit:listener queues="queue" method="onMessage" ref="redQueueListener" />
</rabbit:listener-container>
<!-- 队列声明 -->
<rabbit:queue name="queue" durable="true" />

<!-- 配置线程池 -->
<bean id ="taskExecutor" class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
<!-- 线程池维护线程的最少数量 -->
<property name ="corePoolSize" value ="5" />
<!-- 线程池维护线程所允许的空闲时间 -->
<property name ="keepAliveSeconds" value ="30000" />
<!-- 线程池维护线程的最大数量 -->
<property name ="maxPoolSize" value ="1000" />
<!-- 线程池所使用的缓冲队列 -->
<property name ="queueCapacity" value ="200" />
</bean>
<!-- 红色监听处理器 -->
<bean id="redQueueListener" class="com.absurd.rabbitmqcustomer.RedQueueListener" />

监听方法

1
2
3
4
5
6
7
8
9
10
11
public class RedQueueListener {
private static Logger log = LoggerFactory.getLogger(RedQueueListener.class);
/**
* 处理消息
* @param message
* void
*/
public void onMessage(String message) {
log.info("RedQueueListener Receved:" + message);
}
}

rabbitmq引入:

1
2
3
4
5
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.5.6.RELEASE</version>
</dependency>

效果:
访问http://localhost:8080/rabbitmqproducer/hello/publish/bfdbdfbdfg/queue/absurd_exchange5
消费者就能监听到消息

http://www.mongodb.org/downloads

mongod.exe –dbpath

创建数据库

1
use DATABASE_NAME

删除数据库

1
db.dropDatabase()

创建集合

1
db.createCollection(name, options)

options:
项​​参数是可选的,所以只需要到指定的集合名称。以下是可以使用的选项列表:

字段 类型 描述
capped Boolean (可选)如果为true,则启用封顶集合。封顶集合是固定大小的集合,会自动覆盖最早的条目,当它达到其最大大小。如果指定true,则需要也指定尺寸参数。
autoIndexID Boolean (可选)如果为true,自动创建索引_id字段的默认值是false。
size number (可选)指定最大大小字节封顶集合。如果封顶如果是 true,那么你还需要指定这个字段。
max number (可选)指定封顶集合允许在文件的最大数量。

删除集合

1
db.COLLECTION_NAME.drop()

数据类型:

  • String : 这是最常用的数据类型来存储数据。在MongoDB中的字符串必须是有效的UTF-8。
  • Integer : 这种类型是用来存储一个数值。整数可以是32位或64位,这取决于您的服务器。
  • Boolean : 此类型用于存储一个布尔值 (true/ false) 。
  • Double : 这种类型是用来存储浮点值。
  • Min/ Max keys : 这种类型被用来对BSON元素的最低和最高值比较。
  • Arrays : 使用此类型的数组或列表或多个值存储到一个键。
  • Timestamp : 时间戳。这可以方便记录时的文件已被修改或添加。
  • Object : 此数据类型用于嵌入式的文件。
  • Null : 这种类型是用来存储一个Null值。
  • Symbol : 此数据类型用于字符串相同,但它通常是保留给特定符号类型的语言使用。
  • Date : 此数据类型用于存储当前日期或时间的UNIX时间格式。可以指定自己的日期和时间,日期和年,月,日到创建对象。
  • Object ID : 此数据类型用于存储文档的ID。
  • Binary data : 此数据类型用于存储二进制数据。
  • Code : 此数据类型用于存储到文档中的JavaScript代码。
  • Regular expression : 此数据类型用于存储正则表达式

基本操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
db.COLLECTION_NAME.insert(document)
db.COLLECTION_NAME.find()

db.mycol.find().pretty()//格式化
db.mycol.find({key1:value1, key2:value2}).pretty()
db.mycol.find(
{
$or: [
{key1: value1}, {key2:value2}
]
}
).pretty()

db.mycol.find("likes": {$gt:10}, $or: [{"by": "yiibai"}, {"title": "MongoDB Overview"}] }).pretty()
// 'where likes>10 AND (by = 'yiibai' OR title = 'MongoDB Overview')'
db.COLLECTION_NAME.update(SELECTIOIN_CRITERIA, UPDATED_DATA)
db.COLLECTION_NAME.save({_id:ObjectId(),NEW_DATA})
db.COLLECTION_NAME.remove(DELLETION_CRITTERIA)
db.COLLECTION_NAME.remove(DELETION_CRITERIA,1)
db.mycol.find({},{"title":1,_id:0})//1表示显示该字段0不显示
db.COLLECTION_NAME.find().limit(NUMBER)
db.COLLECTION_NAME.find().limit(NUMBER).skip(NUMBER)
db.COLLECTION_NAME.find().sort({KEY:1})

db.COLLECTION_NAME.aggregate(AGGREGATE_OPERATION)

操作 语法 例子 RDBMS 等同
Equality {:} db.mycol.find({“by”:”tutorials yiibai”}).pretty() where by = ‘tutorials yiibai’
Less Than {:{$lt:}} db.mycol.find({“likes”:{$lt:50}}).pretty() where likes < 50
Less Than Equals {:{$lte:}} db.mycol.find({“likes”:{$lte:50}}).pretty() where likes <= 50
Greater Than {:{$gt:}} db.mycol.find({“likes”:{$gt:50}}).pretty() where likes > 50
Greater Than Equals {:{$gte:}} db.mycol.find({“likes”:{$gte:50}}).pretty() where likes >= 50
Not Equals {:{$ne:}} db.mycol.find({“likes”:{$ne:50}}).pretty() where likes != 50

1
db.COLLECTION_NAME.ensureIndex({KEY:1})//索引

ensureIndex() 方法也可以接受的选项列表(可选),其下面给出的列表:

参数 类型 描述
background Boolean 在后台建立索引,以便建立索引并不能阻止其他数据库活动。指定true建立在后台。默认值是 false.
unique Boolean 创建唯一索引,以便收集不会接受插入索引键或键匹配现有的值存储在索引文档。指定创建唯一索引。默认值是 false.
name string 索引的名称。如果未指定,MongoDB中都生成一个索引名索引字段的名称和排序顺序串联.
dropDups Boolean 创建一个唯一索引的字段,可能有重复。 MongoDB的索引只有第一次出现的一个键,从集合中删除的所有文件包含该键的后续出现的。指定创建唯一索引。默认值是 false.
sparse Boolean 如果为true,指数只引用文档指定的字段。这些索引使用更少的空间,但在某些情况下,特别是各种不同的表现。默认值是 false.
expireAfterSeconds integer 指定一个值,以秒为TTL控制多久MongoDB的文档保留在此集合.
v index version 索引版本号。默认的索引版本取决于mongodb 运行的版本在创建索引时.
weights document 权重是从1到99999范围内的数,表示该字段的意义,相对于其他的索引字段分数.
default_language string 对于文本索引时,决定停止词和词干分析器和标记生成规则列表的语言。默认值是 english.
language_override string 对于文本索引时,指定的名称在文档中包含覆盖默认的语言,语言字段中。默认值是语言。

1
db.mycol.aggregate([{$group : {_id : "$by_user", num_tutorial : {$sum : 1}}}])

表达式 描述 实例
$sum 总结从集合中的所有文件所定义的值. db.mycol.aggregate([{$group : {_id : “$by_user”, num_tutorial : {$sum : “$likes”}}}])
$avg 从所有文档集合中所有给定值计算的平均. db.mycol.aggregate([{$group : {_id : “$by_user”, num_tutorial : {$avg : “$likes”}}}])
$min 获取集合中的所有文件中的相应值最小. db.mycol.aggregate([{$group : {_id : “$by_user”, num_tutorial : {$min : “$likes”}}}])
$max 获取集合中的所有文件中的相应值的最大. db.mycol.aggregate([{$group : {_id : “$by_user”, num_tutorial : {$max : “$likes”}}}])
$push 值插入到一个数组生成文档中. db.mycol.aggregate([{$group : {_id : “$by_user”, url : {$push: “$url”}}}])
$addToSet 值插入到一个数组中所得到的文档,但不会创建重复. db.mycol.aggregate([{$group : {_id : “$by_user”, url : {$addToSet : “$url”}}}])
$first 根据分组从源文档中获取的第一个文档。通常情况下,这才有意义,连同以前的一些应用 “$sort”-stage. db.mycol.aggregate([{$group : {_id : “$by_user”, first_url : {$first : “$url”}}}])
$last 根据分组从源文档中获取最后的文档。通常,这才有意义,连同以前的一些应用 “$sort”-stage. db.mycol.aggregate([{$group : {_id : “$by_user”, last_url : {$last : “$url”}}}])

设置一个副本集
在本教程中,我们将mongod实例转换成独立的副本集。要转换到副本设置遵循以下步骤:

关闭停止已经运行的MongoDB服务器。
现在启动MongoDB服务器通过指定 –replSet 选项。 –replSet 基本语法如下:

1
mongod --port "PORT" --dbpath "YOUR_DB_DATA_PATH" --replSet 

“REPLICA_SET_INSTANCE_NAME”
例子

1
mongod --port 27017 --dbpath "D:set upmongodbdata" --replSet rs0

它会启动一个mongod 实例名称rs0 ,端口为27017。启动命令提示符 rs.initiate(),并连接到这个mongod实例。在mongod客户端执行命令rs.initiate()启动一个新的副本集。要检查副本集的配置执行命令rs.conf()。要检查的状态副本sete执行命令:rs.status()。

将成员添加到副本集
将成员添加到副本集,在多台机器上启动mongod 实例。现在开始一个mongod 客户和发出命令 rs.add().

1
rs.add("mongod1.net:27017")

java
建立连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.DBCursor;
import com.mongodb.ServerAddress;

import java.util.Arrays;

// To connect to mongodb server
MongoClient mongoClient = new MongoClient( "localhost" , 27017 );
// Now connect to your databases
DB db = mongoClient.getDB( "test" );
boolean auth = db.authenticate(myUserName, myPassword);

获取一个集合列表

1
2
3
4
Set colls = db.getCollectionNames();
for (String s : colls) {
System.out.println(s);
}

获取/选择一个集合

1
DBCollection coll = db.getCollection("mycol");

插入文档

1
2
3
4
5
6
7
BasicDBObject doc = new BasicDBObject("title", "MongoDB").
append("description", "database").
append("likes", 100).
append("url", "http://www.yiibai.com/mongodb/").
append("by", "yiibai.com").
;
coll.insert(doc);

查找第一个文档

1
2
DBObject myDoc = coll.findOne();
System.out.println(myDoc);

和spring配合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<mongo:mongo id="mongo" replica-set="${mongodb.url}">
<mongo:options connections-per-host="${mongo.connectionsPerHost}"
threads-allowed-to-block-for-connection-multiplier="${mongo.threadsAllowedToBlockForConnectionMultiplier}"
connect-timeout="${mongo.connectTimeout}"
max-wait-time="${mongo.maxWaitTime}"
auto-connect-retry="${mongo.autoConnectRetry}"
socket-keep-alive="${mongo.socketKeepAlive}"
socket-timeout="${mongo.socketTimeout}" slave-ok="${mongo.slaveOk}"
write-number="1" write-timeout="0" write-fsync="true" />
<mongo:options write-number="1" write-timeout="0"
write-fsync="true" />
</mongo:mongo>
<mongo:db-factory dbname="${mongodb.dbname}" mongo-ref="mongo" />
<bean id="mongoTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory" />
</bean>

使用 MongoTemplate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Person p = new Person("Joe", 34);

// Insert is used to initially store the object into the database.
mongoOps.insert(p);
log.info("Insert: " + p);

// Find
p = mongoOps.findById(p.getId(), Person.class);
log.info("Found: " + p);

// Update
mongoOps.updateFirst(query(where("name").is("Joe")), update("age", 35), Person.class);
p = mongoOps.findOne(query(where("name").is("Joe")), Person.class);
log.info("Updated: " + p);

// Delete
mongoOps.remove(p);

// Check that deletion worked
List<Person> people = mongoOps.findAll(Person.class);
log.info("Number of people = : " + people.size());

mongoOps.dropCollection(Person.class);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Query query=new Query(
Criteria.where("AAA").is(XXobj.getAAA()).
orOperator(Criteria.where("BBB").is(XXobj.getBBB()))
);
List<XXObject> result = mongoTemplate.find(query, XXObject.class);
if(result!=null && !result.isEmpty()){
return result.get(0);
}

XXObject obj = mongoTemplate.findOne(query, XXObject.class);
if(obj!=null){
return obj;
}

关于详细介绍:http://blog.csdn.net/column/details/rabbitmq.html
RabbitMQ:基于AMQP协议(Advanced Message Queue Protocol)介绍:http://www.infoq.com/cn/articles/AMQP-RabbitMQ/
ActiveMQ:基于STOMP协议

所需环境:
1.Erlang
2.RabbitMQ
3.rabbit-client.jar api

http://www.lxway.com/991402946.htm
Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
image

Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
image

Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“_”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit._” 只会匹配到“audit.irs”。我在RedHat的朋友做了一张不错的图,来表明topic交换机是如何工作的:
image

1
2
3
4
5
6
7
8
9
10
 ConnectionFactory connFactory = new ConnectionFactory();//创建连接连接到MabbitMQ 
connFactory.setUri(uri);//或 factory.setHost("localhost"); 设置ip、uri或host
Connection connection = factory.newConnection(); //创建一个连接
Channel channel = connection.createChannel(); //创建一个Channel
channel.queueDeclare(queue, true, false, false, null);//指定队列
channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //往队列中发出一条消息
//关闭频道和连接
channel.close();
connection.close();

Connecting to a broker

1
2
3
4
5
6
7
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();

uri

1
2
3
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();

Using Exchanges and Queues
声明一个exchange然后把队列和exchange和队列绑定起来(只有绑定以后,往exchange投递才会跑到相应队列)

1
2
3
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

(完整的绑定过程)

1
2
3
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

Publishing messages

1
2
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
1
2
3
channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);

delivery mode 2 (persistent), priority 1 , content-type “text/plain”.

1
2
3
4
5
6
7
8
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("bob")
.build()),
messageBodyBytes);

自定义header

1
2
3
4
5
6
7
8
9
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);

channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build()),
messageBodyBytes);

expiration

1
2
3
4
5
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build()),
messageBodyBytes);

在确认模式下发布大量的信息到一个通道,等待确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package com.rabbitmq.examples;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;

public class ConfirmDontLoseMessages {
static int msgCount = 10000;
final static String QUEUE_NAME = "confirm-test";
static ConnectionFactory connectionFactory;

public static void main(String[] args)
throws IOException, InterruptedException
{
if (args.length > 0) {
msgCount = Integer.parseInt(args[0]);
}

connectionFactory = new ConnectionFactory();

// Consume msgCount messages.
(new Thread(new Consumer())).start();
// Publish msgCount messages and wait for confirms.
(new Thread(new Publisher())).start();
}

@SuppressWarnings("ThrowablePrintedToSystemOut")
static class Publisher implements Runnable {
public void run() {
try {
long startTime = System.currentTimeMillis();

// Setup
Connection conn = connectionFactory.newConnection();
Channel ch = conn.createChannel();
ch.queueDeclare(QUEUE_NAME, true, false, false, null);
ch.confirmSelect();

// Publish
for (long i = 0; i < msgCount; ++i) {
ch.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_BASIC,
"nop".getBytes());
}

ch.waitForConfirmsOrDie();

// Cleanup
ch.queueDelete(QUEUE_NAME);
ch.close();
conn.close();

long endTime = System.currentTimeMillis();
System.out.printf("Test took %.3fs\n",
(float)(endTime - startTime)/1000);
} catch (Throwable e) {
System.out.println("foobar :(");
System.out.print(e);
}
}
}

static class Consumer implements Runnable {
public void run() {
try {
// Setup
Connection conn = connectionFactory.newConnection();
Channel ch = conn.createChannel();
ch.queueDeclare(QUEUE_NAME, true, false, false, null);

// Consume
QueueingConsumer qc = new QueueingConsumer(ch);
ch.basicConsume(QUEUE_NAME, true, qc);
for (int i = 0; i < msgCount; ++i) {
qc.nextDelivery();
}

// Cleanup
ch.close();
conn.close();
} catch (Throwable e) {
System.out.println("Whoosh!");
System.out.print(e);
}
}
}
}

*(the AMQP specification document)[http://www.amqp.org/]

接收消息的最有效的方法是建立一个订阅使用消费者接口。将自动被交付的消息到达,而不必显式地请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});

接收个别信息

1
2
3
4
5
6
7
8
9
10
boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag();
...
channel.basicAck(method.deliveryTag, false); // autoAck = false必须设置 Channel.basicAck来确认已经接受消息

处理不被路由的消息
假如一个信息被设置强制性(mandatory)的flag不被路由的话会被送到发送端。
如果客户端没有配置返回特定通道侦听器,将放弃返回的相关消息。
为了获取这个消息,客户端可以实现ReturnListener 接口还有调用 Channel.setReturnListener

1
2
3
4
5
6
7
8
9
10
11
channel.setReturnListener(new ReturnListener() {
public void handleBasicReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
...
}
});

关闭协议

The AMQP 0-9-1 connection and channel have the following lifecycle states:

open: the object is ready to use
closing: the object has been explicitly notified to shut down locally, has issued a shutdown request to any supporting lower-layer objects, and is waiting for their shutdown procedures to complete
closed: the object has received all shutdown-complete notification(s) from any lower-layer objects, and as a consequence has shut itself down

The AMQP connection and channel objects possess the following shutdown-related methods:

addShutdownListener(ShutdownListener listener) and removeShutdownListener(ShutdownListener listener), to manage any listeners, which will be fired when the object transitions to closed state. Note that, adding a ShutdownListener to an object that is already closed will fire the listener immediately
getCloseReason(), to allow the investigation of what was the reason of the object’s shutdown
isOpen(), useful for testing whether the object is in an open state
close(int closeCode, String closeMessage), to explictly notify the object to shut down.

1
2
3
4
5
6
7
8
9
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause)
{
...
}
});

ShutdownSignalException包含了关闭时的错误异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void shutdownCompleted(ShutdownSignalException cause)
{
if (cause.isHardError())
{
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication())
{
Method reason = cause.getReason();
...
}
...
} else {
Channel ch = (Channel)cause.getReference();
...
}
}

原子性的使用open

1
2
3
4
5
6
7
8
9
10
11
public void brokenMethod(Channel channel)
{
if (channel.isOpen())
{
// The following code depends on the channel being in open state.
// However there is a possibility of the change in the channel state
// between isOpen() and basicQos(1) call
...
channel.basicQos(1);//告诉RabbitMQ同一时间给一个消息给消费者
}
}

处于无效状态时应该抓取异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void validMethod(Channel channel)
{
try {
...
channel.basicQos(1);
} catch (ShutdownSignalException sse) {
// possibly check if channel was closed
// by the time we started action and reasons for
// closing it
...
} catch (IOException ioe) {
// check why connection was closed
...
}
}

连接设置
设置pool数

1
2
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

使用地址列表

1
2
3
Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
, new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);

心跳超时(Heartbeat Timeout) Heartbeats guide
自定义线程工厂

1
2
3
4
import com.google.appengine.api.ThreadManager;

ConnectionFactory cf = new ConnectionFactory();
cf.setThreadFactory(ThreadManager.backgroundThreadFactory());

Automatic Recovery From Network Failures

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
factory.setAutomaticRecoveryEnabled(true);
// connection that will recover automatically
Connection conn = factory.newConnection();


ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.setNetworkRecoveryInterval(10000);


ConnectionFactory factory = new ConnectionFactory();

Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};
factory.newConnection(addresses);

The RPC (Request/Reply) Pattern

1
2
3
4
5
6
7
8
9
10
import com.rabbitmq.client.RpcClient;

RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);



byte[] primitiveCall(byte[] message);
String stringCall(String message)
Map mapCall(Map message)
Map mapCall(Object[] keyValuePairs)

安装过程略去,配置下GRADLE_HOME和GRADLE_HOME\bin

  1. 创建一个空目录,新建build.gradle
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
apply plugin: 'idea'
apply plugin: 'java'
apply plugin: 'war'
sourceCompatibility = 1.7

repositories {
mavenCentral()
}

dependencies {
compile 'org.springframework.boot:spring-boot-starter-web:1.3.5.RELEASE'
compile 'log4j:log4j:1.2.17'
}


task createJavaProject << {
sourceSets*.java.srcDirs*.each { it.mkdirs() }
sourceSets*.resources.srcDirs*.each { it.mkdirs()}
}

task createWebProject(dependsOn: 'createJavaProject') << {
def webAppDir = file("$webAppDirName")
webAppDir.mkdirs()
}

2.gradle idea
3.gradle createWebProject
3.gradle build

经常会遇到一种场景,直接访问某些权限被拒绝后跳转登陆页面,然而ajax不会跳转
这个时候使用全局的:

1
2
3
4
5
6
7
8
9
10
11
12
  $(function(){
//全局的ajax访问,处理ajax清求时sesion超时
$.ajaxSetup({
complete:function(XMLHttpRequest,textStatus){
var sessionstatus=XMLHttpRequest.getResponseHeader("sessionstatus"); //通过XMLHttpRequest取得响应头,sessionstatus,
if(sessionstatus=="timeout"){
//如果超时就处理 ,指定要跳转的页面
window.location.replace(urlconfig.url.ctx+"/login.jsp");
}
}
})
})

在拦截器里面:

1
2
3
4
5
6
7
8
9
if (httpRequest.getHeader("x-requested-with") != null
&& httpRequest.getHeader("x-requested-with").equalsIgnoreCase("XMLHttpRequest"))// 如果是ajax请求响应头会有,x-requested-with;
{
httpResponse.setHeader("sessionstatus", "timeout");// 在响应头设置session状态
httpResponse.setStatus(403);
return false;
} else {
httpResponse.sendRedirect(httpResponse.encodeRedirectURL("/login.jsp"));
}

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

1
$ hexo new "My New Post"

More info: Writing

Run server

1
$ hexo server

More info: Server

Generate static files

1
$ hexo generate

More info: Generating

Deploy to remote sites

1
$ hexo deploy

More info: Deployment