
http://blog.jobbole.com/24006/

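Index selectivity

Selectivity: the proportion of distinct values a column holds in the table.
Selectivity is an important reference when creating an index. For a MySQL table it is computed as:

the number of distinct values in the column divided by the total number of rows in the table.

For example: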

select count(distinct(name))/count(*) from t_base_user;

count(distinct(name))/count(*)

1.0000

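Selectivity ranges from 0.0000 to 1.0000. The larger the value, the fewer duplicates the column holds and the more an index on it helps. Primary keys and unique keys have the highest selectivity, 1.0000, while columns such as status or gender have the lowest. (This depends on data volume: with only a handful of rows the selectivity of such a column still looks high, but on a large table it is close to 0.0000, which is why an index on these columns does little good.)

Note: if the table has no rows, the computed selectivity is NULL; in every other case the value lies between 0.0000 and 1.0000.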
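
The ratio described above can be sketched in Java over an in-memory column. This is only an illustration: the class and method names are hypothetical, and unlike SQL's count(distinct ...) it does not treat NULL values specially.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalDouble;

final class SelectivitySketch {
    /** Distinct values divided by total rows; empty when there are no rows (SQL yields NULL). */
    static OptionalDouble selectivity(List<String> columnValues) {
        if (columnValues.isEmpty()) {
            return OptionalDouble.empty();
        }
        int distinct = new HashSet<>(columnValues).size();
        return OptionalDouble.of((double) distinct / columnValues.size());
    }

    public static void main(String[] args) {
        // Three distinct names out of four rows.
        System.out.println(selectivity(Arrays.asList("ann", "bob", "ann", "eve")));
    }
}
```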

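I strongly recommend computing a column's selectivity before creating an index on it, for the following reasons:

1. Single-column indexes

The selectivity of the column gives a rough idea of whether a new index on it will help and by how much. The higher the selectivity, the more noticeable the benefit.

2. Multi-column (composite) indexes

A composite index also raises the question of column order. As a rule, the column with the higher selectivity goes first so that the composite index is more effective. For example: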

select * from t_base_user where name="" and status=1;

For a statement like the one above, the composite index should be:

alter table t_base_user add index idx_name_status(name,status);

rather than:

alter table t_base_user add index idx_status_name(status,name);

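Leftmost prefix matching

MySQL matches index columns from left to right and stops at the first range condition (>, <, between, or a prefix like). For example:
select * from t_base_user where type="10" and created_at<"2017-11-03" and status=1; (for demonstration only)

Assuming a composite index on (type, created_at, status), the status condition cannot be used to narrow the index scan: matching stops at the < on created_at, so only the (type, created_at) part of the index is used. The order of the conditions in the where clause does not matter, because the optimizer rearranges them; what matters is the order of the columns in the index. For status to be used as well, the equality columns should come before the range column, that is, the index should be (type, status, created_at), and the query can then be written as:

select * from t_base_user where type=10 and status=1 and created_at<"2017-11-03"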
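
The matching rule can be modelled with a few lines of Java. This is a deliberately simplified sketch (hypothetical names, no index condition pushdown, no cost model): it walks the index columns from the left, keeps going on equality conditions, and stops after the first range condition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class LeftmostPrefixSketch {
    /** Returns the index columns that can narrow the scan under the leftmost-prefix rule. */
    static List<String> usableColumns(List<String> indexColumns,
                                      Set<String> equalityColumns,
                                      Set<String> rangeColumns) {
        List<String> used = new ArrayList<>();
        for (String column : indexColumns) {
            if (equalityColumns.contains(column)) {
                used.add(column);          // equality: keep matching to the right
            } else if (rangeColumns.contains(column)) {
                used.add(column);          // range: usable, but matching stops here
                break;
            } else {
                break;                     // gap in the prefix: nothing further is usable
            }
        }
        return used;
    }

    public static void main(String[] args) {
        Set<String> eq = new HashSet<>(Arrays.asList("type", "status"));
        Set<String> range = new HashSet<>(Arrays.asList("created_at"));
        System.out.println(usableColumns(Arrays.asList("type", "created_at", "status"), eq, range));
        System.out.println(usableColumns(Arrays.asList("type", "status", "created_at"), eq, range));
    }
}
```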

Example:
CREATE TABLE titles (
  id varchar(50) NOT NULL DEFAULT '',
  emp_no varchar(50) NOT NULL DEFAULT '',
  title varchar(50) NOT NULL DEFAULT '',
  from_date datetime DEFAULT NULL COMMENT 'from',
  to_date datetime DEFAULT NULL,
  date_create datetime DEFAULT NULL,
  date_update datetime DEFAULT NULL,
  date_delete datetime DEFAULT NULL,
  PRIMARY KEY (id),
  KEY idx_emp (emp_no,title,from_date) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

1. Full-column match
EXPLAIN SELECT * FROM titles WHERE emp_no='10001' AND title='Senior Engineer' AND from_date='1986-06-26';

id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE titles ref idx_emp idx_emp 310 const,const,const 1

Reordering the conditions gives the same result:
EXPLAIN SELECT * FROM titles WHERE from_date='1986-06-26' AND emp_no='10001' AND title='Senior Engineer';

2. Leftmost prefix match

EXPLAIN SELECT * FROM titles WHERE emp_no='10001';

id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE titles ref idx_emp idx_emp 152 const 1
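
The key_len values in the two EXPLAIN results above (310 and 152) show how many index columns were used. The arithmetic can be sketched as follows, under these assumptions: utf8 takes up to 3 bytes per character, a varchar key part carries a 2-byte length prefix, a nullable column adds 1 byte, and DATETIME occupies 5 bytes (MySQL 5.6.4 or later, no fractional seconds). The helper names are hypothetical.

```java
final class KeyLenSketch {
    /** Key length of a varchar key part: characters * bytes per character + 2-byte length prefix. */
    static int varcharKeyLen(int maxChars, int bytesPerChar, boolean nullable) {
        return maxChars * bytesPerChar + 2 + (nullable ? 1 : 0);
    }

    /** Key length of a DATETIME key part, assuming the 5-byte storage format. */
    static int datetimeKeyLen(boolean nullable) {
        return 5 + (nullable ? 1 : 0);
    }

    public static void main(String[] args) {
        int empNo = varcharKeyLen(50, 3, false);   // emp_no varchar(50) NOT NULL
        int title = varcharKeyLen(50, 3, false);   // title varchar(50) NOT NULL
        int fromDate = datetimeKeyLen(true);       // from_date datetime DEFAULT NULL
        System.out.println("emp_no only: " + empNo);
        System.out.println("all three columns: " + (empNo + title + fromDate));
    }
}
```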

3. The query matches index columns exactly, but a column in the middle is missing
EXPLAIN SELECT * FROM employees.titles WHERE emp_no='10001' AND from_date='1986-06-26';

id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE titles ref idx_emp idx_emp 152 const 1 Using index condition
EXPLAIN SELECT * FROM titles
WHERE emp_no='10001'
AND title IN ('Senior Engineer', 'Staff', 'Engineer', 'Senior Staff', 'Assistant Engineer', 'Technique Leader', 'Manager')
AND from_date='1986-06-26';
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE titles ALL idx_emp NULL 7 Using where

4. The query does not specify the first index column

EXPLAIN SELECT * FROM titles WHERE from_date='1986-06-26';

id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE titles ALL NULL NULL NULL NULL 443308 Using where

5. Matching a string prefix of a column
EXPLAIN SELECT * FROM employees.titles WHERE emp_no='10001' AND title LIKE 'Senior%';

id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE titles range PRIMARY PRIMARY 56 NULL 1 Using where

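Avoiding full table scans

  1. Try to avoid the != or <> operators in the where clause; they often make the engine give up the index and fall back to a full table scan.
  2. To optimize a query, avoid full table scans where possible; first consider creating indexes on the columns used in where and order by.
  3. Try to avoid null checks on a column in the where clause; they may make the engine give up the index and scan the whole table, for example:

select id from t where num is null

You can give num a default value of 0, make sure the column contains no null values, and then query like this:

select id from t where num=0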
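  4. Try to avoid joining conditions with or in the where clause; it may make the engine give up the index and scan the whole table, for example:

select id from t where num=10 or num=20

You can query like this instead:

select id from t where num=10
union all
select id from t where num=20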

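  5. Do not put a percent sign at the start of a like pattern; a leading wildcard prevents the index from being used. A pattern with only a trailing percent sign, as below, can still use it:

select id from t where name like 'hkjh%'

If you need to match in the middle of a string efficiently, consider full-text search.

  6. Use in and not in with care, for example:

select id from t where num in(1,2,3)

For a continuous range of values, prefer between over in:

select id from t where num between 1 and 3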

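  7. Using a parameter in the where clause can also lead to a full table scan. SQL resolves local variables only at run time, but the optimizer cannot defer the choice of an access plan until then; it has to choose at compile time. When the plan is built at compile time the variable's value is still unknown, so it cannot serve as input to index selection. The following statement performs a full table scan:

select id from t where num=@num

You can force the query to use an index instead (this is SQL Server syntax; the MySQL counterpart is the FORCE INDEX hint):

select id from t with(index(index_name)) where num=@num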

  8. Try to avoid applying functions to a column in the where clause; this makes the engine give up the index and scan the whole table. For example:

select id from t where substring(name,1,3)='abc' -- ids whose name starts with abc
select id from t where datediff(day,createdate,'2005-11-30')=0 -- ids created on 2005-11-30

should be changed to:

select id from t where name like 'abc%'
select id from t where createdate>='2005-11-30' and createdate<'2005-12-1'

Using exists instead of in

Replacing in with exists is often a good choice:

select num from a where num in(select num from b)

can be replaced with:

select num from a where exists(select 1 from b where num=a.num)

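Using numeric columns

1. Prefer numeric column types. A column that holds only numeric data should not be declared as a character type, which slows down queries and joins and increases storage cost. The engine compares strings character by character when processing queries and joins, whereas a numeric value needs only a single comparison.

2. Prefer varchar/nvarchar over char/nchar where possible. Variable-length columns take less storage, and searching within a smaller field is more efficient.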

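Temporary tables

  • Prefer table variables over temporary tables. If a table variable holds a large amount of data, keep in mind that its indexes are very limited (primary key only).
  • When creating a temporary table and inserting a large amount of data at once, use select into instead of create table to avoid generating a large amount of log and to speed things up; if the data volume is small, create table first and then insert, to ease the load on the system tables.
  • If temporary tables are used, explicitly drop all of them at the end of the stored procedure: truncate table first, then drop table. This avoids holding locks on the system tables for a long time.

Ways to restructure queries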

  1. Split one complex query into several small, simple ones; each returns its data quickly.
  2. Chop up a query: for example, deleting 100,000 rows can be split into 10 runs of 10,000 rows each.
  3. Decompose a join:

SELECT * FROM tag
JOIN tag_post ON tag_post.tag_id = tag.id
JOIN post ON tag_post.post_id = post.id
WHERE tag.name = 'mysql';

can be decomposed into:

SELECT * FROM tag WHERE name = 'mysql';
SELECT * FROM tag_post WHERE tag_id = 1234;
SELECT * FROM post WHERE post.id in (123,456,789,818);

  4. Use LIMIT 1 when only one row is needed.
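
Item 2 in the list, chopping up a large delete, can be sketched on the application side as a loop that removes one bounded batch per round. The sketch below uses an in-memory deque as a stand-in for the table; in a real program each round would be one DELETE statement with a row limit. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ChunkedDeleteSketch {
    /** Removes at most batchSize rows per round until none are left; returns the number of rounds. */
    static int deleteInBatches(Deque<Integer> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int rounds = 0;
        while (!rows.isEmpty()) {
            int n = Math.min(batchSize, rows.size());
            for (int i = 0; i < n; i++) {
                rows.removeFirst();        // stands in for one deleted row
            }
            rounds++;                      // a real loop might pause here to let other work through
        }
        return rounds;
    }

    public static void main(String[] args) {
        Deque<Integer> rows = new ArrayDeque<>();
        for (int i = 0; i < 100_000; i++) {
            rows.add(i);
        }
        System.out.println(deleteInBatches(rows, 10_000));
    }
}
```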

A worked example of optimizing a single-table query:

select * from product limit 866613,20    -- 37.44 s

select id from product limit 866613,20    -- 0.2 s (primary key index)

SELECT * FROM product WHERE ID >= (select id from product limit 866613,1) limit 20    -- 0.2 s

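Slow query basics

Optimizing data access means looking at the data being accessed, from two angles: first, whether the application requests a large amount of data from the server that it does not need; second, whether MySQL is forced to scan extra rows unnecessarily.

Typical cases of requesting unneeded data: no LIMIT (everything is returned but only 10 rows are used); Select * on a multi-table join (returning every column of every table); Select * in general (it may help with coding or code reuse, but the trade-off has to be weighed); and repeatedly querying the same data (if really needed, cache it; on mobile, local storage is well worth it).

Three indicators of extra scanning: response time (judge for yourself whether the value is reasonable), rows scanned, and rows returned. Usually rows scanned > rows returned.

Rows scanned ties in with the notion of "access type", the type column of Explain. From worst to best the types include ALL (full table scan), index (index scan), range (range scan), ref (index lookup, key_col=xx), and const (constant reference). The access types make clear that an index lets MySQL find the required rows in the most efficient way, scanning the fewest rows.

The book gives an example that compares filtering in where on an indexed column with the same query after the index is dropped: type changes from ref to ALL and the estimated rows go from 10 to 5073, a very clear difference.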

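Limitations of the MySQL query optimizer

One limitation concerns UNION: a restriction cannot be pushed down from the outer query into the inner ones. An example of the rewrite: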

(SELECT first_name,last_name
FROM sak.actor
ORDER BY last_name)
UNION ALL
(SELECT first_name,last_name
FROM sak.customer
ORDER BY last_name)
LIMIT 20;

After optimization:

(SELECT first_name,last_name
FROM sak.actor
ORDER BY last_name
LIMIT 20)
UNION ALL
(SELECT first_name,last_name
FROM sak.customer
ORDER BY last_name
LIMIT 20)
LIMIT 20;

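Equality propagation: this concerns IN lists. MySQL copies the values of an IN list to each related filter clause, so a very large IN list adds overhead and makes both optimization and execution slow.

MIN() and MAX(): MySQL does not optimize MIN() and MAX() well.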

SELECT MIN(actor_id) FROM sak.actor WHERE first_name = 'EE';

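After the rewrite (first_name is not indexed, so the original query has to scan the whole table; scanning in primary key order, the first matching row is the one with the smallest actor_id):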

SELECT actor_id FROM sak.actor USE INDEX(PRIMARY)
WHERE first_name = 'EE' LIMIT 1;

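Storage layer

rocketmq-store provides message storage and management, mainly operations on CommitLog and ConsumeQueue. A ConsumeQueue is the logical queue of messages. Each element is a fixed-length 20-byte record made up of three parts: commitLogOffset (8 bytes, the actual offset of the message in the CommitLog), size (4 bytes, the message size), and hashTag (8 bytes, the hash of the message tag). In effect the ConsumeQueue stores only the offset of each message in the CommitLog, so it can be thought of as an index file for the CommitLog.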
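
To make the 20-byte layout concrete, here is a minimal sketch that packs and unpacks one entry with java.nio.ByteBuffer. It is not RocketMQ's own code; the class and method names are hypothetical, and the field order simply follows the description above.

```java
import java.nio.ByteBuffer;

final class ConsumeQueueEntrySketch {
    static final int UNIT_SIZE = 20; // 8 (offset) + 4 (size) + 8 (tag hash)

    /** Packs one entry: commit log offset, message size, tag hash. */
    static byte[] encode(long commitLogOffset, int size, long tagHash) {
        ByteBuffer buf = ByteBuffer.allocate(UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagHash);
        return buf.array();
    }

    /** Unpacks an entry into {commitLogOffset, size, tagHash}. */
    static long[] decode(byte[] entry) {
        ByteBuffer buf = ByteBuffer.wrap(entry);
        long offset = buf.getLong();
        int size = buf.getInt();
        long tagHash = buf.getLong();
        return new long[] {offset, size, tagHash};
    }

    /** Because entries have a fixed length, the n-th entry sits at n * 20 bytes. */
    static long positionOf(long logicalIndex) {
        return logicalIndex * UNIT_SIZE;
    }

    public static void main(String[] args) {
        byte[] entry = encode(4096L, 256, 12345L);
        long[] fields = decode(entry);
        System.out.println(entry.length + " bytes: " + fields[0] + ", " + fields[1] + ", " + fields[2]);
    }
}
```

The fixed record length is what lets a consumer turn a logical queue offset into a file position with a single multiplication.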

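Main functions

  • Saves messages to the CommitLog and persists them to physical files (flush modes: synchronous, asynchronous)

  • Maintains the relationship between Topic and ConsumeQueue

  • Maintains memory-mapped files and their queue (MapedFile, MapedFileQueue)

  • Provides broker master/slave synchronization (synchronous double write, asynchronous replication)

  • Provides data recovery (normal recovery, and abnormal recovery after an OS crash, JVM crash, power loss, and so on)

  • Provides a data index service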

BrokerRole

public enum BrokerRole {
    ASYNC_MASTER,
    SYNC_MASTER,
    SLAVE;
}

FlushDiskType

public enum FlushDiskType {
    SYNC_FLUSH,  // synchronous flush
    ASYNC_FLUSH  // asynchronous flush
}

CommitLog

// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
// End of file empty MAGIC CODE cbd43194
private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
// Queue of mapped files; holds every file stored on disk
private final MappedFileQueue mappedFileQueue;
// The default message store that owns this commit log
private final DefaultMessageStore defaultMessageStore;
// Service that flushes the commit log to disk
private final FlushCommitLogService flushCommitLogService;

//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
private final FlushCommitLogService commitLogService;
// Callback for appending a message; doAppend writes the message into memory
private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
// Maps "topic-queueId" to that queue's offset
private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
private volatile long confirmOffset = -1L;

private volatile long beginTimeInLock = 0;
private final PutMessageLock putMessageLock;
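
The two magic-code expressions rely on Java operator precedence: + binds tighter than ^, so each one is evaluated as literal ^ (1880681586 + 8). A small check (hypothetical class name) reproduces the hex values given in the comments:

```java
final class MagicCodeSketch {
    /** Hex form of the message magic code, evaluated exactly as written in the field declaration. */
    static String messageMagicHex() {
        return Integer.toHexString(0xAABBCCDD ^ 1880681586 + 8);
    }

    /** Hex form of the end-of-file (blank) magic code. */
    static String blankMagicHex() {
        return Integer.toHexString(0xBBCCDDEE ^ 1880681586 + 8);
    }

    public static void main(String[] args) {
        System.out.println(messageMagicHex()); // daa320a7
        System.out.println(blankMagicHex());   // cbd43194
    }
}
```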

putMessage

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;

StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

String topic = msg.getTopic();
int queueId = msg.getQueueId();

final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}

topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

msg.setTopic(topic);
msg.setQueueId(queueId);
}
}

long eclipseTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;

// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);

if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}

result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}

eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}

if (eclipseTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
}

if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg);

return putMessageResult;
}
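
The END_OF_FILE branch of putMessage is the part that is easiest to miss: when the current mapped file cannot hold the message, a new file is obtained and the append is retried. The idea can be sketched with plain counters instead of real mapped files. This is a simplification under stated assumptions (hypothetical names, no locking, no end-of-file marker record), not the RocketMQ implementation.

```java
import java.util.ArrayList;
import java.util.List;

final class SegmentRollSketch {
    private final int segmentSize;
    // One write position per segment; the last element belongs to the active segment.
    private final List<Integer> writePositions = new ArrayList<>();

    SegmentRollSketch(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    /** Appends a record of len bytes and returns the global offset at which it starts. */
    long append(int len) {
        if (len <= 0 || len > segmentSize) {
            throw new IllegalArgumentException("record does not fit in a segment");
        }
        if (writePositions.isEmpty()) {
            writePositions.add(0);                 // first segment is created lazily
        }
        int last = writePositions.size() - 1;
        if (writePositions.get(last) + len > segmentSize) {
            // "END_OF_FILE": seal the current segment, open a new one, retry the append there.
            writePositions.set(last, segmentSize);
            writePositions.add(0);
            last++;
        }
        int pos = writePositions.get(last);
        writePositions.set(last, pos + len);
        return (long) last * segmentSize + pos;
    }

    int segmentCount() {
        return writePositions.size();
    }

    public static void main(String[] args) {
        SegmentRollSketch log = new SegmentRollSketch(100);
        System.out.println(log.append(60));  // fits in the first segment
        System.out.println(log.append(60));  // does not fit, rolls to the second segment
        System.out.println(log.segmentCount());
    }
}
```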
  • ConsumeQueue
public static final int CQ_STORE_UNIT_SIZE = 20;
private final DefaultMessageStore defaultMessageStore;
// Queue of mapped files; holds every file stored on disk
private final MappedFileQueue mappedFileQueue;
// Topic name
private final String topic;
// Queue ID
private final int queueId;
// In-memory buffer for an index entry
private final ByteBuffer byteBufferIndex;
// Storage path
private final String storePath;
// Size of each mapped file
private final int mappedFileSize;
// Largest CommitLog offset recorded in this queue
private long maxPhysicOffset = -1;
// Smallest logical offset of this queue
private volatile long minLogicOffset = 0;
private ConsumeQueueExt consumeQueueExt = null;
  • MappedFileQueue
private static final int DELETE_FILES_BATCH_MAX = 10;
// Storage path
private final String storePath;
// Size of each mapped file
private final int mappedFileSize;

private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
// Service that pre-allocates MappedFile instances
private final AllocateMappedFileService allocateMappedFileService;
// Position flushed to disk so far
private long flushedWhere = 0;
// Position committed so far
private long committedWhere = 0;
// Time the last flush completed
private volatile long storeTimestamp = 0;
  • MappedFile
// The OS page cache page size is 4 KB
public static final int OS_PAGE_SIZE = 1024 * 4;
// Total size of mapped virtual memory
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// Total number of mapped files
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// Current write position in the file
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// Position committed so far (written from writeBuffer to the FileChannel)
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// Position flushed to disk so far
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// File size
protected int fileSize;
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
protected ByteBuffer writeBuffer = null;
protected TransientStorePool transientStorePool = null;
// File name
private String fileName;
// Global offset at which this file starts, which is also the file name
private long fileFromOffset;
// The file object
private File file;
// The memory region the file is mapped to
private MappedByteBuffer mappedByteBuffer;
// Time the last flush completed
private volatile long storeTimestamp = 0;
private boolean firstCreateInQueue = false;
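
The fileFromOffset field ties a file to its place in the global log. A rough sketch of that relationship follows, assuming (this is an assumption about the on-disk convention, not something shown in the fields above) that the file name is the starting offset zero-padded to 20 digits and that all files have the same size. The helper names are hypothetical.

```java
import java.util.Locale;

final class MappedFileNameSketch {
    /** File name for a file that starts at the given global offset (assumed 20-digit convention). */
    static String fileNameFor(long fileFromOffset) {
        return String.format(Locale.ROOT, "%020d", fileFromOffset);
    }

    /** Recovers the starting offset from a file name. */
    static long offsetFromName(String fileName) {
        return Long.parseLong(fileName);
    }

    /** Index, within the file queue, of the file that contains the given global offset. */
    static int fileIndexFor(long globalOffset, long firstFileOffset, int mappedFileSize) {
        return (int) ((globalOffset - firstFileOffset) / mappedFileSize);
    }

    public static void main(String[] args) {
        int oneGiB = 1024 * 1024 * 1024;
        System.out.println(fileNameFor(oneGiB));               // second file when each file is 1 GiB
        System.out.println(fileIndexFor(oneGiB + 5L, 0L, oneGiB));
    }
}
```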

The MessageStore interface

public interface MessageStore {

// Load previously stored messages
boolean load();

/**
* Launch this message store.
*
* @throws Exception if there is any error.
*/
void start() throws Exception;

/**
* Shutdown this message store.
*/
void shutdown();

/**
* Destroy this message store. Generally, all persistent files should be removed after invocation.
*/
void destroy();

/**
* Store a message into store.
*
* @param msg Message instance to store
* @return result of store operation.
*/
PutMessageResult putMessage(final MessageExtBrokerInner msg);

/**
* Store a batch of messages.
*
* @param messageExtBatch Message batch.
* @return result of storing batch messages.
*/
PutMessageResult putMessages(final MessageExtBatch messageExtBatch);

/**
* Query at most <code>maxMsgNums</code> messages belonging to <code>topic</code> at <code>queueId</code> starting
* from given <code>offset</code>. Resulting messages will further be screened using provided message filter.
*
* @param group Consumer group that launches this query.
* @param topic Topic to query.
* @param queueId Queue ID to query.
* @param offset Logical offset to start from.
* @param maxMsgNums Maximum count of messages to query.
* @param messageFilter Message filter used to screen desired messages.
* @return Matched messages.
*/
GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final MessageFilter messageFilter);

/**
* Get maximum offset of the topic queue.
*
* @param topic Topic name.
* @param queueId Queue ID.
* @return Maximum offset at present.
*/
long getMaxOffsetInQueue(final String topic, final int queueId);

/**
* Get the minimum offset of the topic queue.
*
* @param topic Topic name.
* @param queueId Queue ID.
* @return Minimum offset at present.
*/
long getMinOffsetInQueue(final String topic, final int queueId);

/**
* Get the offset of the message in the commit log, which is also known as physical offset.
*
* @param topic Topic of the message to lookup.
* @param queueId Queue ID.
* @param consumeQueueOffset offset of consume queue.
* @return physical offset.
*/
long getCommitLogOffsetInQueue(final String topic, final int queueId, final long consumeQueueOffset);

/**
* Look up the physical offset of the message whose store timestamp is as specified.
*
* @param topic Topic of the message.
* @param queueId Queue ID.
* @param timestamp Timestamp to look up.
* @return physical offset which matches.
*/
long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp);

/**
* Look up the message by given commit log offset.
*
* @param commitLogOffset physical offset.
* @return Message whose physical offset is as specified.
*/
MessageExt lookMessageByOffset(final long commitLogOffset);

/**
* Get one message from the specified commit log offset.
*
* @param commitLogOffset commit log offset.
* @return wrapped result of the message.
*/
SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset);

/**
* Get one message from the specified commit log offset.
*
* @param commitLogOffset commit log offset.
* @param msgSize message size.
* @return wrapped result of the message.
*/
SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize);

/**
* Get the running information of this store.
*
* @return message store running info.
*/
String getRunningDataInfo();

/**
* Message store runtime information, which should generally contains various statistical information.
*
* @return runtime information of the message store in format of key-value pairs.
*/
HashMap<String, String> getRuntimeInfo();

/**
* Get the maximum commit log offset.
*
* @return maximum commit log offset.
*/
long getMaxPhyOffset();

/**
* Get the minimum commit log offset.
*
* @return minimum commit log offset.
*/
long getMinPhyOffset();

/**
* Get the store time of the earliest message in the given queue.
*
* @param topic Topic of the messages to query.
* @param queueId Queue ID to find.
* @return store time of the earliest message.
*/
long getEarliestMessageTime(final String topic, final int queueId);

/**
* Get the store time of the earliest message in this store.
*
* @return timestamp of the earliest message in this store.
*/
long getEarliestMessageTime();

/**
* Get the store time of the message specified.
*
* @param topic message topic.
* @param queueId queue ID.
* @param consumeQueueOffset consume queue offset.
* @return store timestamp of the message.
*/
long getMessageStoreTimeStamp(final String topic, final int queueId, final long consumeQueueOffset);

/**
* Get the total number of the messages in the specified queue.
*
* @param topic Topic
* @param queueId Queue ID.
* @return total number.
*/
long getMessageTotalInQueue(final String topic, final int queueId);

/**
* Get the raw commit log data starting from the given offset, which should used for replication purpose.
*
* @param offset starting offset.
* @return commit log data.
*/
SelectMappedBufferResult getCommitLogData(final long offset);

/**
* Append data to commit log.
*
* @param startOffset starting offset.
* @param data data to append.
* @return true if success; false otherwise.
*/
boolean appendToCommitLog(final long startOffset, final byte[] data);

/**
* Execute file deletion manually.
*/
void executeDeleteFilesManually();

/**
* Query messages by given key.
*
* @param topic topic of the message.
* @param key message key.
* @param maxNum maximum number of the messages possible.
* @param begin begin timestamp.
* @param end end timestamp.
*/
QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end);

/**
* Update HA master address.
*
* @param newAddr new address.
*/
void updateHaMasterAddress(final String newAddr);

/**
* Return how much the slave falls behind.
*
* @return number of bytes that slave falls behind.
*/
long slaveFallBehindMuch();

/**
* Return the current timestamp of the store.
*
* @return current time in milliseconds since 1970-01-01.
*/
long now();

/**
* Clean unused topics.
*
* @param topics all valid topics.
* @return number of the topics deleted.
*/
int cleanUnusedTopic(final Set<String> topics);

/**
* Clean expired consume queues.
*/
void cleanExpiredConsumerQueue();

/**
* Check if the given message has been swapped out of the memory.
*
* @param topic topic.
* @param queueId queue ID.
* @param consumeOffset consume queue offset.
* @return true if the message is no longer in memory; false otherwise.
*/
boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset);

/**
* Get number of the bytes that have been stored in commit log and not yet dispatched to consume queue.
*
* @return number of the bytes to dispatch.
*/
long dispatchBehindBytes();

/**
* Flush the message store to persist all data.
*
* @return maximum offset flushed to persistent storage device.
*/
long flush();

/**
* Reset written offset.
*
* @param phyOffset new offset.
* @return true if success; false otherwise.
*/
boolean resetWriteOffset(long phyOffset);

/**
* Get confirm offset.
*
* @return confirm offset.
*/
long getConfirmOffset();

/**
* Set confirm offset.
*
* @param phyOffset confirm offset to set.
*/
void setConfirmOffset(long phyOffset);

/**
* Check if the operation system page cache is busy or not.
*
* @return true if the OS page cache is busy; false otherwise.
*/
boolean isOSPageCacheBusy();

/**
* Get lock time in milliseconds of the store by far.
*
* @return lock time in milliseconds.
*/
long lockTimeMills();

/**
* Check if the transient store pool is deficient.
*
* @return true if the transient store pool is running out; false otherwise.
*/
boolean isTransientStorePoolDeficient();

/**
* Get the dispatcher list.
*
* @return list of the dispatcher.
*/
LinkedList<CommitLogDispatcher> getDispatcherList();

/**
* Get consume queue of the topic/queue.
*
* @param topic Topic.
* @param queueId Queue ID.
* @return Consume queue.
*/
ConsumeQueue getConsumeQueue(String topic, int queueId);
}

DefaultMessageStore

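The ConcurrentHashMap implementation differs quite a bit from one JDK version to another.
On 1.6:
http://ifeve.com/java-concurrent-hashmap-1/
http://ifeve.com/java-concurrent-hashmap-2/
http://www.importnew.com/16147.html
Why is ConcurrentHashMap weakly consistent? (Note that this is about JDK 1.6; according to these notes, 1.7 and 1.8 are no longer weakly consistent in that sense.)
http://ifeve.com/concurrenthashmap-weakly-consistent/

On 1.7:
http://www.blogjava.net/DLevin/archive/2013/10/18/405030.html

On the differences between 1.7 and 1.8:
http://www.importnew.com/23610.html

Next we walk through the 1.8 source.
First, the class hierarchy:
[Figure: class hierarchy of ConcurrentHashMap]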

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {

// Maximum capacity; must not exceed 1<<30
private static final int MAXIMUM_CAPACITY = 1 << 30;

// Default initial capacity; the capacity of a ConcurrentHashMap must be a power of two
private static final int DEFAULT_CAPACITY = 16;

/**
* The largest possible (non-power of two) array size.
* Needed by toArray and related methods.
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

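// Default concurrency level of the table (unused in 1.8, kept for compatibility)
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// Default load factor; 0.75 balances space and time cost
private static final float LOAD_FACTOR = 0.75f;

// Above this threshold the nodes of a bin are organized as a red-black tree instead of a linked list
static final int TREEIFY_THRESHOLD = 8;

// During a resize, a tree bin that shrinks to this size or below is converted back to a linked list
static final int UNTREEIFY_THRESHOLD = 6;

// Bins may be treeified only once the table capacity reaches this value (below it, the table is resized instead)
// It should be at least 4 times TREEIFY_THRESHOLD, to avoid conflicts between resizing and treeification
static final int MIN_TREEIFY_CAPACITY = 64;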

/**
* Minimum number of rebinnings per transfer step. Ranges are
* subdivided to allow multiple resizer threads. This value
* serves as a lower bound to avoid resizers encountering
* excessive memory contention. The value should be at least
* DEFAULT_CAPACITY.
*/
private static final int MIN_TRANSFER_STRIDE = 16;

/**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
*/
private static int RESIZE_STAMP_BITS = 16;

/**
* The maximum number of threads that can help resize.
* Must fit in 32 - RESIZE_STAMP_BITS bits.
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

/**
* The bit shift for recording size stamp in sizeCtl.
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

/*
* Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

/** Number of CPUS, to place bounds on some sizings */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/** For serialization compatibility. */
private static final ObjectStreamField[] serialPersistentFields = {
new ObjectStreamField("segments", Segment[].class),
new ObjectStreamField("segmentMask", Integer.TYPE),
new ObjectStreamField("segmentShift", Integer.TYPE)
};

// The array that stores the elements
transient volatile Node<K,V>[] table;

private transient volatile Node<K,V>[] nextTable;

/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/
private transient volatile long baseCount;

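// Control variable for table initialization and resizing.
// Negative: the table is being initialized or resized; -1 means initialization, otherwise -(1 + number of active resizing threads).
// Otherwise: while the table is null, the initial table size to use (0 for the default); after initialization, the element count at which the next resize happens.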
private transient volatile int sizeCtl;

/**
* The next table index (plus one) to split while resizing.
*/
private transient volatile int transferIndex;

/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
private transient volatile int cellsBusy;

/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;

// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}


static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
// Offset of the sizeCtl field, used for CAS operations
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
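// Offset of the first element in a Node array
ABASE = U.arrayBaseOffset(ak);
// Size in bytes of one array element
int scale = U.arrayIndexScale(ak);
// Fail if the scale is not a power of two
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// log2(scale): the shift that turns an array index into a byte offset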
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
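
The three-argument constructor above turns a requested capacity into a power-of-two table size. The sketch below reproduces that computation in isolation; the bit-smearing body of tableSizeFor is written from memory of the JDK 8 approach and the class name is hypothetical, so treat it as an illustration rather than the library source.

```java
final class TableSizeSketch {
    static final int MAXIMUM_CAPACITY = 1 << 30;

    /** Smallest power of two that is >= c (capped at MAXIMUM_CAPACITY). */
    static int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

    /** Mirrors the sizing logic of the three-argument constructor shown above. */
    static int initialTableSize(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) {
            throw new IllegalArgumentException();
        }
        if (initialCapacity < concurrencyLevel) {
            initialCapacity = concurrencyLevel;   // use at least as many bins as estimated threads
        }
        long size = (long) (1.0 + (long) initialCapacity / loadFactor);
        return (size >= (long) MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int) size);
    }

    public static void main(String[] args) {
        // 16 / 0.75 + 1 is about 22, which rounds up to 32.
        System.out.println(initialTableSize(16, 0.75f, 16));
    }
}
```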

Let us look at the most important class, Node:

static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
// The key difference from HashMap's Node: val and next are volatile, which guarantees visibility and prevents reordering
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}

public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
// Read val once into a local snapshot
(v == (u = val) || v.equals(u)));
}

// Walk the linked list to find the node
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}

Initialization

private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// A negative value means another thread is initializing the table; yield and retry
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// Arguments: the object to update, the field offset (SIZECTL, obtained in the static block), the expected value, the new value
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
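
The same pattern (lazy creation guarded by a CAS on a control field) can be sketched without Unsafe. This is a simplified illustration, not the JDK implementation: LazyTable is a hypothetical class, and AtomicInteger stands in for the Unsafe-based CAS on sizeCtl.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LazyTable {
    private volatile Object[] table;
    // 0 = use the default capacity, -1 = initialization in progress,
    // positive after initialization = next resize threshold
    private final AtomicInteger sizeCtl = new AtomicInteger(0);

    Object[] initTable() {
        Object[] tab;
        while ((tab = table) == null) {
            int sc = sizeCtl.get();
            if (sc < 0) {
                Thread.yield();                      // lost the race, let the winner finish
            } else if (sizeCtl.compareAndSet(sc, -1)) {
                try {
                    if ((tab = table) == null) {     // re-check after winning the CAS
                        int n = (sc > 0) ? sc : 16;  // 16 plays the role of DEFAULT_CAPACITY
                        table = tab = new Object[n];
                        sc = n - (n >>> 2);          // 0.75 * n
                    }
                } finally {
                    sizeCtl.set(sc);                 // publish the threshold
                }
                break;
            }
        }
        return tab;
    }

    int threshold() {
        return sizeCtl.get();
    }
}
```

With the default capacity of 16 the threshold stored back into sizeCtl is 16 - 4 = 12, and a second call returns the same array without touching sizeCtl.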

Hash function

static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
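
A short sketch of why the high bits are folded in. SpreadDemo and indexFor are hypothetical names; HASH_BITS is 0x7fffffff as in the JDK source, and the index is computed as (n - 1) & hash, the same expression putVal uses.

```java
class SpreadDemo {
    static final int HASH_BITS = 0x7fffffff; // clears the sign bit; negative hashes are reserved

    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    // Bin index for a table whose length is a power of two.
    static int indexFor(int hash, int tableLength) {
        return (tableLength - 1) & hash;
    }

    public static void main(String[] args) {
        int a = 0x10000, b = 0x20000; // these differ only above bit 15
        // Without spreading, both land in bin 0 of a 16-slot table.
        System.out.println(indexFor(a, 16) + " " + indexFor(b, 16));                 // 0 0
        // After spreading, the high bits influence the index.
        System.out.println(indexFor(spread(a), 16) + " " + indexFor(spread(b), 16)); // 1 2
    }
}
```

Masking with HASH_BITS keeps every ordinary hash non-negative, which is what lets negative values such as MOVED (-1) mark special nodes.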

Volatile read of the i-th element of the table

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

CAS on the i-th element of the table

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

Volatile write of the i-th element of the table

static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

put

public V put(K key, V value) {
return putVal(key, value, false);
}


final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// the table has not been created yet: initialize it
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// the target bin is empty: try to CAS the new node in
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// a hash of -1 (MOVED) means the bin head is a ForwardingNode: a resize is in progress, so help with the transfer
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// the target bin is not empty
else {
V oldVal = null;
// lock the bin head so that only one thread modifies this bin
synchronized (f) {
// re-read the bin head; if it is still the same node, no other thread has replaced it
if (tabAt(tab, i) == f) {
// hash >= 0 means an ordinary linked-list bin
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// found a node with the same key: replace its value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// no match: append the new node at the tail
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// the bin is a red-black tree: update or insert the node
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// once binCount reaches TREEIFY_THRESHOLD (8), try to convert the list to a red-black tree
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
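
The visible behaviour of putVal can be exercised through the public API. A minimal sketch using java.util.concurrent.ConcurrentHashMap directly; PutDemo is a hypothetical wrapper class.

```java
import java.util.concurrent.ConcurrentHashMap;

class PutDemo {
    static String run() {
        ConcurrentHashMap<String, Integer> m = new ConcurrentHashMap<>();
        Integer first = m.put("a", 1);          // no previous mapping, returns null
        Integer second = m.put("a", 2);         // onlyIfAbsent = false: replaces, returns 1
        Integer third = m.putIfAbsent("a", 3);  // onlyIfAbsent = true: keeps 2, returns 2
        boolean npe = false;
        try {
            m.put("b", null);                   // null keys and values are rejected
        } catch (NullPointerException e) {
            npe = true;
        }
        return first + "," + second + "," + third + "," + m.get("a") + "," + npe;
    }

    public static void main(String[] args) {
        System.out.println(run()); // null,1,2,2,true
    }
}
```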

A node whose hash is -1 (MOVED) is a ForwardingNode: its bin has already been moved during a resize

static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}

Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}

Converting a linked list to a red-black tree

private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// table length below MIN_TREEIFY_CAPACITY (64): resize instead of treeifying
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
// table length is at least 64: re-read the bin head at this index
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
// the bin head has not been changed by another thread
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
// build a TreeNode for each list node
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
// the first node becomes the head hd
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

tryPresize

private final void tryPresize(int size) {
// if size >= half of MAXIMUM_CAPACITY, target MAXIMUM_CAPACITY directly; otherwise round 1.5 * size + 1 up to a power of two with tableSizeFor
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// proceed only while sizeCtl >= 0; a negative value means another thread is initializing or resizing, and the loop exits
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// the table has not been created yet: initialize it
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
// as in initTable, CAS sizeCtl to -1 to mark that initialization is in progress
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// the table has not been replaced by another thread
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// the actual migration of the data happens here
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
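
The target capacity computed at the top of tryPresize can be worked through by hand. The sketch below assumes tableSizeFor rounds up to the next power of two, as the private JDK 8 helper does; PresizeDemo and presizeTarget are hypothetical names, and the helper body is written from memory rather than copied from the text above.

```java
class PresizeDemo {
    static final int MAXIMUM_CAPACITY = 1 << 30;

    // Smallest power of two >= c (assumed to match the JDK 8 helper).
    static int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

    // The value of c at the start of tryPresize(size).
    static int presizeTarget(int size) {
        return (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY
                : tableSizeFor(size + (size >>> 1) + 1);
    }

    // The threshold stored in sizeCtl after creating a table of length n.
    static int threshold(int n) {
        return n - (n >>> 2);
    }

    public static void main(String[] args) {
        // treeifyBin on a 16-slot table calls tryPresize(32): 32 + 16 + 1 = 49, rounded up to 64
        System.out.println(presizeTarget(32)); // 64
        System.out.println(threshold(64));     // 48
    }
}
```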

transfer

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
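
Two pieces of arithmetic in transfer are easy to check in isolation: the bit test hash & n that splits a bin into a low list (stays at index i) and a high list (moves to i + n), and the stride each thread claims. SplitDemo is a hypothetical class; the minimum stride of 16 is the JDK 8 value of MIN_TRANSFER_STRIDE as far as I know, so treat it as an assumption.

```java
class SplitDemo {
    static final int MIN_TRANSFER_STRIDE = 16; // assumed JDK 8 value

    // Index of a node after the table grows from n to 2n slots.
    static int newIndex(int hash, int n) {
        int i = hash & (n - 1);                 // index in the old table
        return (hash & n) == 0 ? i : i + n;     // low list keeps i, high list moves to i + n
    }

    // Number of bins a thread claims per step, as computed at the top of transfer.
    static int stride(int n, int ncpu) {
        int s = (ncpu > 1) ? (n >>> 3) / ncpu : n;
        return Math.max(s, MIN_TRANSFER_STRIDE);
    }

    public static void main(String[] args) {
        System.out.println(newIndex(5, 16));   // 5: bit 4 is clear, the node stays
        System.out.println(newIndex(21, 16));  // 21: bit 4 is set, the node moves to 5 + 16
        System.out.println(stride(4096, 4));   // 128
    }
}
```

Because only one extra bit of the hash is examined, the result always equals hash & (2n - 1), so no node needs to be rehashed.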

Script analysis

mqbroker.sh

#!/bin/sh
if [ -z "$ROCKETMQ_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"

# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done

saveddir=`pwd`

ROCKETMQ_HOME=`dirname "$PRG"`/..

# make it fully qualified
ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`

cd "$saveddir"
fi

export ROCKETMQ_HOME

sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@

BrokerStartup

public class BrokerStartup {
public static Properties properties = null;
public static CommandLine commandLine = null;
public static String configFile = null;
public static Logger log;

public static void main(String[] args) {
start(createBrokerController(args));
}

public static BrokerController start(BrokerController controller) {
try {
controller.start();
String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
if (null != controller.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
}
log.info(tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}

public static BrokerController createBrokerController(String[] args) {
// set the rocketmq.remoting.version system property to the current version number
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
// if com.rocketmq.remoting.socket.sndbuf.size is not set, default the send buffer to 131072 bytes
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 131072;
}
// the same for com.rocketmq.remoting.socket.rcvbuf.size (receive buffer)
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 131072;
}

try {
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

// roles: ASYNC_MASTER, SYNC_MASTER, SLAVE; a newly constructed MessageStoreConfig defaults to ASYNC_MASTER
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
}
}

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
+ " variable in your environment to match the location of the RocketMQ installation");
System.exit(-2);
}

String namesrvAddr = brokerConfig.getNamesrvAddr();
if (null != namesrvAddr) {
try {
String[] addrArray = namesrvAddr.split(";");
for (String addr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf(
"The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
namesrvAddr);
System.exit(-3);
}
}

switch (messageStoreConfig.getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
case SLAVE:
if (brokerConfig.getBrokerId() <= 0) {
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
}

break;
default:
break;
}

messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

if (commandLine.hasOption('p')) {
Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
} else if (commandLine.hasOption('m')) {
Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
MixAll.printObjectProperties(console, messageStoreConfig, true);
System.exit(0);
}

log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);

final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
// initialize the controller
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);

@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}, "ShutdownHook"));

return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}

}
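
The name server address check in createBrokerController expects a semicolon-separated list of host:port pairs. A minimal sketch of that validation using only the JDK; NamesrvAddrDemo and parse are hypothetical names, and this is not the code of RemotingUtil.string2SocketAddress.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

class NamesrvAddrDemo {
    // Parses "host1:port1;host2:port2" and fails fast on malformed entries.
    static List<InetSocketAddress> parse(String namesrvAddr) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String addr : namesrvAddr.split(";")) {
            int colon = addr.lastIndexOf(':');
            if (colon <= 0)
                throw new IllegalArgumentException("expected host:port, got " + addr);
            // NumberFormatException is a subclass of IllegalArgumentException
            int port = Integer.parseInt(addr.substring(colon + 1));
            // createUnresolved avoids a DNS lookup during validation
            result.add(InetSocketAddress.createUnresolved(addr.substring(0, colon), port));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("127.0.0.1:9876;192.168.0.1:9876").size()); // 2
    }
}
```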

BrokerController

private final BrokerConfig brokerConfig;
private final NettyServerConfig nettyServerConfig;
private final NettyClientConfig nettyClientConfig;
private final MessageStoreConfig messageStoreConfig;
private final ConsumerOffsetManager consumerOffsetManager;
private final ConsumerManager consumerManager;
private final ConsumerFilterManager consumerFilterManager;
private final ProducerManager producerManager;
private final ClientHousekeepingService clientHousekeepingService;
private final PullMessageProcessor pullMessageProcessor;
private final PullRequestHoldService pullRequestHoldService;
private final MessageArrivingListener messageArrivingListener;
private final Broker2Client broker2Client;
private final SubscriptionGroupManager subscriptionGroupManager;
private final ConsumerIdsChangeListener consumerIdsChangeListener;
private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
private final BrokerOuterAPI brokerOuterAPI;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerControllerScheduledThread"));
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private MessageStore messageStore;
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
private TopicConfigManager topicConfigManager;
private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor;
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor;
private ExecutorService consumerManageExecutor;
private boolean updateMasterHAServerAddrPeriodically = false;
private BrokerStats brokerStats;
private InetSocketAddress storeHost;
private BrokerFastFailure brokerFastFailure;
private Configuration configuration;

initialize

public boolean initialize() throws CloneNotSupportedException {
// load the persisted configuration of the main modules
boolean result = this.topicConfigManager.load();

result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();

if (result) {
try {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}

result = result && this.messageStore.load();

if (result) {
// create the Netty remoting servers
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
// create the thread pools
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));

this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));

this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_"));

this.clientManageExecutor = new ThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));

this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
// register the request processors
this.registerProcessor();
// schedule the periodic tasks
// record the broker statistics once a day
final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
// periodically flush consumer offsets to disk
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// periodically persist the consumer filter data to a file
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
// periodically check consumer progress in order to protect the broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);
// periodically print the queue watermarks
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}
}, 10, 1, TimeUnit.SECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
// set the name server address list
if (this.brokerConfig.getNamesrvAddr() != null) {
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
// on a slave, periodically synchronize from the master
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
// on a master, periodically print the difference between master and slave
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}

return result;
}
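
Every scheduled task in initialize wraps its body in try/catch (Throwable). The reason is documented behaviour of ScheduledExecutorService.scheduleAtFixedRate: if any execution throws, all later executions are suppressed. The sketch below demonstrates this with a task that always fails; GuardedScheduleDemo, guarded and runsWithin are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GuardedScheduleDemo {
    // Same idea as the try/catch blocks in initialize(): log and swallow.
    static Runnable guarded(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable e) {
                System.err.println("scheduled task failed: " + e);
            }
        };
    }

    // Schedules a task that always throws and reports how often it ran.
    static int runsWithin(boolean guard, int expectedRuns, long timeoutMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(expectedRuns);
        Runnable failing = () -> {
            runs.incrementAndGet();
            latch.countDown();
            throw new IllegalStateException("boom");
        };
        ses.scheduleAtFixedRate(guard ? guarded(failing) : failing, 0, 10, TimeUnit.MILLISECONDS);
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        ses.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runsWithin(false, 3, 300)); // 1: the first failure cancels the task
        System.out.println(runsWithin(true, 3, 2000) >= 3);
    }
}
```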

start

public void start() throws Exception {
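// start the message store, which includes:
// the consume queue flush service (flushes in-memory messages to disk),
// the metadata flush service,
// the store statistics service (e.g. put_tps, get_found_tps, get_miss_tps, get_transfered_tps)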
if (this.messageStore != null) {
this.messageStore.start();
}
// start the Netty ServerBootstrap: bind the port and start listening
if (this.remotingServer != null) {
this.remotingServer.start();
}

if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
// start the Netty client
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
// start the pull-request hold service
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
// periodically check the client connections
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}
// start the filter server manager
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
// register this broker with the name server
this.registerBrokerAll(true, false);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false);
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
// start the broker statistics manager
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}

if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
}

registerBrokerAll

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
// if the broker is not both readable and writable, apply the broker's permission to every topic config
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}

RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills());

if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}

this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}

RocketMQ communication module


NettyRemotingClient

 @Override
public void start() {
// thread pool for the business handlers
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});

Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
// encoder (writes the RemotingCommand header, then the body)
new NettyEncoder(),
// decoder (a LengthFieldBasedFrameDecoder, which splits the TCP byte stream into frames by length)
new NettyDecoder(),
// Netty 4.x idle detection
// readerIdleTime: idle event if nothing is read for this long; writerIdleTime: the same for writes; allIdleTime: the same for no read or write at all (120 here); 0 disables a check
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
// connection management handler: handles connect, disconnect, close and similar events
new NettyConnectManageHandler(),
// handles incoming RemotingCommand messages, i.e. what to do once the server responds
new NettyClientHandler());
}
});
// Every request creates a ResponseFuture to receive its response. If the response never arrives,
// the futures pile up in responseTable, so a timer task periodically cleans the map (scanResponseTable).
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);

if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}

scanResponseTable

public void scanResponseTable() {
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();

if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
}

for (ResponseFuture rf : rfList) {
try {
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
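
The expiry rule used by scanResponseTable (begin time + timeout + 1000 ms of grace) can be sketched with a plain map. ResponseTableDemo and Pending are hypothetical stand-ins for the RocketMQ classes, and the current time is passed in as a parameter so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ResponseTableDemo {
    // Hypothetical stand-in for ResponseFuture: only the two timing fields.
    static final class Pending {
        final long beginTimestamp;
        final long timeoutMillis;

        Pending(long beginTimestamp, long timeoutMillis) {
            this.beginTimestamp = beginTimestamp;
            this.timeoutMillis = timeoutMillis;
        }
    }

    final ConcurrentHashMap<Integer, Pending> table = new ConcurrentHashMap<>();

    // Removes every entry whose deadline plus a 1000 ms grace period has passed
    // and returns the request ids that were dropped.
    List<Integer> scan(long now) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Pending>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Pending> e = it.next();
            Pending p = e.getValue();
            if (p.beginTimestamp + p.timeoutMillis + 1000 <= now) {
                it.remove();
                expired.add(e.getKey());
            }
        }
        return expired;
    }
}
```

As in the original, removal happens first and the callbacks would run afterwards on the collected list, so a slow callback never blocks the sweep of the map.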

RocketMQ communication encoding and decoding: source code analysis

Encoding

public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
// NIO ByteBuffer; for details see http://www1350.github.io/#post/115
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
}

Decoding

public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

private static final int FRAME_MAX_LENGTH =
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));

public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}

@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
// Convert Netty's ByteBuf into an NIO ByteBuffer
// http://www.jianshu.com/p/0f93834f23de
ByteBuffer byteBuffer = frame.nioBuffer();

return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}

return null;
}
}

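Packet capture:
Three-way handshake
(image: b79ea69e-e0c3-4400-813b-66d201dab19d)

Payload
(image: wx20171122-225830 2x)

Now look at the packets that are encoded and decoded during communication:
(image: main)

Processing the header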

public ByteBuffer encodeHeader() {
return encodeHeader(this.body != null ? this.body.length : 0);
}

public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
int length = 4;

// 2> header data length
byte[] headerData;
headerData = this.headerEncode();

length += headerData.length;

// 3> body data length
length += bodyLength;

ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

// length
result.putInt(length);

// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

// header data
result.put(headerData);

result.flip();

return result;
}


public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];

result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
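
The frame layout produced by `encodeHeader` and `markProtocolType` can be checked with plain `java.nio`. The sketch below is an illustration rather than RocketMQ code: the class and method names are hypothetical, and the serialize-type code is passed as a raw byte because the actual `SerializeType` codes are not shown in this section. It builds a frame as [total length][type + header length][header][body] and reads the two leading words back.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FrameLayoutSketch {
    // Packs a 1-byte serialize-type code and a 24-bit header length into 4 bytes,
    // using the same bit layout as markProtocolType above.
    static byte[] mark(int headerLength, byte typeCode) {
        byte[] result = new byte[4];
        result[0] = typeCode;
        result[1] = (byte) ((headerLength >> 16) & 0xFF);
        result[2] = (byte) ((headerLength >> 8) & 0xFF);
        result[3] = (byte) (headerLength & 0xFF);
        return result;
    }

    // Reverse of mark(): the low 24 bits of the 4-byte word are the header length.
    static int headerLength(int word) {
        return word & 0xFFFFFF;
    }

    // Reverse of mark(): the high 8 bits are the serialize-type code.
    static byte typeCode(int word) {
        return (byte) ((word >> 24) & 0xFF);
    }

    // Builds a whole frame: [total length][type + header length][header][body].
    static ByteBuffer frame(byte[] header, byte[] body, byte typeCode) {
        int length = 4 + header.length + body.length; // does not count the length field itself
        ByteBuffer buf = ByteBuffer.allocate(4 + length);
        buf.putInt(length);
        buf.put(mark(header.length, typeCode));
        buf.put(header);
        buf.put(body);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":10}".getBytes(StandardCharsets.UTF_8); // 11 bytes
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);           // 5 bytes
        ByteBuffer buf = frame(header, body, (byte) 1);

        int total = buf.getInt();
        int word = buf.getInt();
        // prints: total=20 type=1 headerLen=11 bodyLen=5
        System.out.println("total=" + total
            + " type=" + typeCode(word)
            + " headerLen=" + headerLength(word)
            + " bodyLen=" + (total - 4 - headerLength(word)));
    }
}
```

Because the leading length field excludes itself, `NettyDecoder` can pass `(FRAME_MAX_LENGTH, 0, 4, 0, 4)` to `LengthFieldBasedFrameDecoder`: the length sits at offset 0, is 4 bytes wide, needs no adjustment, and the first 4 bytes are stripped before `RemotingCommand.decode` sees the buffer.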

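Header contents:

Field | Type | Request | Response | Description
code | short | Request operation code; the receiver performs a different operation for each code | Response result code; 0 means success, non-zero means failure |
language | byte | Language of the requester, JAVA by default | Language of the responder | See LanguageCode
version | short | Version of the requester | Version of the responder | Set from rocketmq.remoting.version
opaque | int | Identifier that distinguishes different requests on the same connection, used when multiple threads share a connection | Returned unchanged by the responder | Sequence number of the RPC request
flag | int | Communication-layer flag bits | Communication-layer flag bits | Distinguishes a normal RPC from a one-way RPC
remark | String | Custom text | Detailed error description |
extFields | HashMap<String, String> | Custom request fields | Custom response fields | Each field of the customHeader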
// Bit index of the RPC type flag: bit clear = REQUEST_COMMAND, bit set = RESPONSE_COMMAND
private static final int RPC_TYPE = 0;
// Bit index of the one-way flag: bit clear = normal RPC, bit set = one-way RPC (for example, heartbeat packets)
private static final int RPC_ONEWAY = 1;

// CommandCustomHeader is the interface that every custom header must implement. The Field[] value holds the
// member fields used when parsing that header, so this map is the field cache used during parsing.
// The two maps below cache class names and annotations, respectively.
private static final Map<Class<? extends CommandCustomHeader>, Field[]> clazzFieldsCache =
new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
private static final Map<Class, String> canonicalNameCache = new HashMap<Class, String>();
private static final Map<Field, Annotation> notNullAnnotationCache = new HashMap<Field, Annotation>();
// requestId is the RPC request sequence number. It is incremented for every request, and responseTable (discussed later) uses it as the key.
private static AtomicInteger requestId = new AtomicInteger(0);

private int code; // Distinguishes the request type (on a response it carries the result code)
private LanguageCode language = LanguageCode.JAVA; // Implementation language
private int version = 0; // RPC version number
private int opaque = requestId.getAndIncrement(); // opaque is the requestId
private int flag = 0; // Flag bits: request vs. response, normal vs. one-way RPC
private String remark; // Remark text
private HashMap<String, String> extFields; // All extFields of this RPC; effectively the packet-header data of this call
private transient CommandCustomHeader customHeader; // Header data; note the transient modifier, so it is not serialized
private transient byte[] body; // Body data; note the transient modifier, so it is not serialized
   private byte[] headerEncode() {
this.makeCustomHeaderToNet();
// Choose between RocketMQ's own serialization and JSON serialization
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}


public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}

for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}

if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}

(image: ba3e2d6c-d79a-4148-9c88-facba76ff0e1)

RocketMQ serialization

public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
// String remark
byte[] remarkBytes = null;
int remarkLen = 0;
if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
remarkLen = remarkBytes.length;
}

// HashMap<String, String> extFields
byte[] extFieldsBytes = null;
int extLen = 0;
if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
extFieldsBytes = mapSerialize(cmd.getExtFields());
extLen = extFieldsBytes.length;
}

int totalLen = calTotalLen(remarkLen, extLen);

ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
// int code(~32767)
headerBuffer.putShort((short) cmd.getCode());
// LanguageCode language
headerBuffer.put(cmd.getLanguage().getCode());
// int version(~32767)
headerBuffer.putShort((short) cmd.getVersion());
// int opaque
headerBuffer.putInt(cmd.getOpaque());
// int flag
headerBuffer.putInt(cmd.getFlag());
// String remark
if (remarkBytes != null) {
headerBuffer.putInt(remarkBytes.length);
headerBuffer.put(remarkBytes);
} else {
headerBuffer.putInt(0);
}
// HashMap<String, String> extFields;
if (extFieldsBytes != null) {
headerBuffer.putInt(extFieldsBytes.length);
headerBuffer.put(extFieldsBytes);
} else {
headerBuffer.putInt(0);
}

return headerBuffer.array();
}
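
To make the byte order of this compact header concrete, here is a small round-trip sketch. It is an assumption-laden illustration, not the library's decoder: the class and method names are hypothetical, the field order is taken from the `put` calls above, and `extFields` is always written as empty because `mapSerialize` is not shown in this section.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class CompactHeaderSketch {
    // Fixed part: code(2) + language(1) + version(2) + opaque(4) + flag(4)
    //           + remark length(4) + extFields length(4) = 21 bytes
    static final int FIXED_LEN = 2 + 1 + 2 + 4 + 4 + 4 + 4;

    // Writes the fields in the same order as rocketMQProtocolEncode above.
    static byte[] encode(int code, byte language, int version, int opaque, int flag, String remark) {
        byte[] remarkBytes = remark == null ? new byte[0] : remark.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(FIXED_LEN + remarkBytes.length);
        buf.putShort((short) code);
        buf.put(language);
        buf.putShort((short) version);
        buf.putInt(opaque);
        buf.putInt(flag);
        buf.putInt(remarkBytes.length);
        buf.put(remarkBytes);
        buf.putInt(0); // extFields are left empty in this sketch
        return buf.array();
    }

    // Reads the fields back in the same order and renders them as text.
    static String decode(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        short code = buf.getShort();
        byte language = buf.get();
        short version = buf.getShort();
        int opaque = buf.getInt();
        int flag = buf.getInt();
        byte[] remark = new byte[buf.getInt()];
        buf.get(remark);
        int extLen = buf.getInt();
        return "code=" + code + " language=" + language + " version=" + version
            + " opaque=" + opaque + " flag=" + flag
            + " remark=" + new String(remark, StandardCharsets.UTF_8) + " extLen=" + extLen;
    }

    public static void main(String[] args) {
        byte[] bytes = encode(10, (byte) 0, 251, 7, 0, "hi");
        System.out.println(bytes.length + " bytes: " + decode(bytes));
    }
}
```

Since `code` and `version` are narrowed to `short`, values above 32767 would not survive the round trip, which is what the `(~32767)` comments in the original method point out.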

JSON serialization

public static byte[] encode(final Object obj) {
final String json = toJson(obj, false);
if (json != null) {
return json.getBytes(CHARSET_UTF8);
}
return null;
}

OnewayRPC

public void markOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
this.flag |= bits;
}

@JSONField(serialize = false)
public boolean isOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
return (this.flag & bits) == bits;
}

@JSONField(serialize = false)
public boolean isResponseType() {
int bits = 1 << RPC_TYPE;
return (this.flag & bits) == bits;
}
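
The two flag bits are independent, so a single `int` can describe all four combinations. The following standalone sketch (hypothetical class and method names, same bit positions as `RPC_TYPE` and `RPC_ONEWAY` above) shows how the marking and testing methods interact.

```java
final class FlagBitsSketch {
    static final int RPC_TYPE = 0;   // bit index: clear = request, set = response
    static final int RPC_ONEWAY = 1; // bit index: clear = normal RPC, set = one-way RPC

    // Sets the response bit and leaves every other bit untouched.
    static int markResponse(int flag) {
        return flag | (1 << RPC_TYPE);
    }

    // Sets the one-way bit and leaves every other bit untouched.
    static int markOneway(int flag) {
        return flag | (1 << RPC_ONEWAY);
    }

    static boolean isResponse(int flag) {
        int bits = 1 << RPC_TYPE;
        return (flag & bits) == bits;
    }

    static boolean isOneway(int flag) {
        int bits = 1 << RPC_ONEWAY;
        return (flag & bits) == bits;
    }

    public static void main(String[] args) {
        int request = 0;                       // binary 00: normal request
        int oneway = markOneway(request);      // binary 10: one-way request
        int response = markResponse(request);  // binary 01: response
        System.out.println("oneway flag=" + oneway + " isOneway=" + isOneway(oneway)
            + " isResponse=" + isResponse(oneway));
        System.out.println("response flag=" + response + " isOneway=" + isOneway(response)
            + " isResponse=" + isResponse(response));
    }
}
```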
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}

processRequestCommand

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// processorTable caches the processor registered for each request code.
// On the client, the MQClientAPIImpl constructor registers processors for:
// CHECK_TRANSACTION_STATE(39), NOTIFY_CONSUMER_IDS_CHANGED(40), RESET_CONSUMER_CLIENT_OFFSET(220), GET_CONSUMER_STATUS_FROM_CLIENT(221), GET_CONSUMER_RUNNING_INFO(307), CONSUME_MESSAGE_DIRECTLY(309)
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();

if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}

final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
if (rpcHook != null) {
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
}

if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {

}
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());

if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};

if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}

try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}

if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}

processResponseCommand

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
// Look up the ResponseFuture that was registered for this request
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);

responseFuture.release();

responseTable.remove(opaque);

if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}

(image: sequencediagram1)

Key parts of the code:

 private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}

int sysFlag = 0;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
}

final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}

if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}

if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}

// Build the customHeader carried in the command
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// ProducerGroup
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// Topic
requestHeader.setTopic(msg.getTopic());
// DefaultTopic
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
// DefaultTopicQueueNums: number of queues created when a topic that does not exist on the server is auto-created during send (default 4)
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
// QueueId
requestHeader.setQueueId(mq.getQueueId());
// See MessageSysFlag; carries the compressed and transaction-prepared flags computed above
requestHeader.setSysFlag(sysFlag);
// Born timestamp: the client's current time
requestHeader.setBornTimestamp(System.currentTimeMillis());

requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
// reconsumeTimes: number of times the message has been retried
requestHeader.setReconsumeTimes(0);

requestHeader.setUnitMode(this.isUnitMode());
// Record whether this is a batch message
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}

String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}

SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}

if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}

return sendResult;
} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
}
}

throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
//12
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);

public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
cmd.customHeader = customHeader;
setCmdVersion(cmd);
return cmd;
}

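What the sender does:
(image: invokesync-client png)

  • Build a ResponseFuture, set its opaque value, and store it in the map keyed by opaque
  • Send the request through Netty
  • On a successful send, mark the ResponseFuture as sent; on failure, mark it as failed and remove it from the map
  • Call responseFuture.waitResponse(timeoutMillis) to obtain the response (an exception is thrown on timeout)
  • When the server's reply arrives, look up the ResponseFuture in the map by opaque, write the reply into it, and remove it from the map
  • The ResponseFuture now holds the reply, which is returned to the caller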
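
The sender-side steps above can be sketched with standard concurrency utilities. This is a simplified model under stated assumptions, not RocketMQ code: `Pending` is a hypothetical stand-in for `ResponseFuture`, the transport is a plain `IntConsumer` that receives the opaque, responses are strings, and an unchecked `IllegalStateException` stands in for `RemotingTimeoutException`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

final class SyncInvokeSketch {
    // Hypothetical, simplified stand-in for ResponseFuture.
    static final class Pending {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String r) {
            response = r;
            latch.countDown();
        }

        // Waits up to timeoutMillis; returns null if no response arrived in time.
        String waitResponse(long timeoutMillis) {
            try {
                latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status; treated as "no response"
            }
            return response;
        }
    }

    private final AtomicInteger requestId = new AtomicInteger(0);
    private final Map<Integer, Pending> responseTable = new ConcurrentHashMap<>();

    // Registers a pending entry under a fresh opaque, hands the opaque to the
    // transport, then blocks until a response arrives or the timeout expires.
    String invokeSync(IntConsumer transport, long timeoutMillis) {
        int opaque = requestId.getAndIncrement();
        Pending pending = new Pending();
        responseTable.put(opaque, pending);
        try {
            transport.accept(opaque);
            String response = pending.waitResponse(timeoutMillis);
            if (response == null) {
                throw new IllegalStateException("timeout waiting for opaque " + opaque);
            }
            return response;
        } finally {
            responseTable.remove(opaque);
        }
    }

    // Called by the receiving side when a response carrying this opaque arrives.
    void onResponse(int opaque, String response) {
        Pending pending = responseTable.remove(opaque);
        if (pending != null) {
            pending.putResponse(response);
        }
    }

    int pendingCount() {
        return responseTable.size();
    }

    public static void main(String[] args) {
        SyncInvokeSketch client = new SyncInvokeSketch();
        // Fake transport: a second thread answers with the same opaque.
        String reply = client.invokeSync(
            opaque -> new Thread(() -> client.onResponse(opaque, "pong-" + opaque)).start(),
            1000);
        System.out.println(reply + ", pending=" + client.pendingCount());
    }
}
```

Registering the entry before sending matters: a reply that arrives very quickly still finds its entry in the map, and the latch makes it safe for the reply to arrive before the caller starts waiting.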

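What the receiver does:

  • Netty receives the incoming message and dispatches it to the server side for processing
  • The processor (Processor) is looked up by the message's code
  • A new task is created in which the processor handles the message and produces a response (Response). If the message is not one-way, the opaque from the request is copied into the response and the response is sent back to the requester.
  • The task is submitted to a thread pool. Note that each request code maps to a (Processor, ExecutorService) pair, so a processor is tied to its thread pool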
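
The receiver-side lookup of a (processor, thread pool) pair by request code can be sketched as follows. The names here are hypothetical and the types are deliberately simplified: a processor is modeled as a `Function<String, String>`, the pool as a plain `Executor`, and an unsupported code yields a completed future carrying an error string instead of a `REQUEST_CODE_NOT_SUPPORTED` response command.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class ProcessorTableSketch {
    // Hypothetical pair type: a processor plus the executor that runs it.
    static final class Pair {
        final Function<String, String> processor;
        final Executor executor;

        Pair(Function<String, String> processor, Executor executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Pair> processorTable = new HashMap<>();
    private Pair defaultProcessor;

    void register(int code, Function<String, String> processor, Executor executor) {
        processorTable.put(code, new Pair(processor, executor));
    }

    void registerDefault(Function<String, String> processor, Executor executor) {
        defaultProcessor = new Pair(processor, executor);
    }

    // Finds the pair for this code (falling back to the default pair) and runs
    // the processor on the executor that was registered together with it.
    CompletableFuture<String> dispatch(int code, String request) {
        Pair matched = processorTable.get(code);
        Pair pair = matched != null ? matched : defaultProcessor;
        if (pair == null) {
            return CompletableFuture.completedFuture("REQUEST_CODE_NOT_SUPPORTED: " + code);
        }
        return CompletableFuture.supplyAsync(() -> pair.processor.apply(request), pair.executor);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        ProcessorTableSketch table = new ProcessorTableSketch();
        table.register(10, body -> "handled:" + body, pool);
        System.out.println(table.dispatch(10, "hello").join());
        System.out.println(table.dispatch(99, "hello").join());
        pool.shutdown();
    }
}
```

Pairing each processor with its own executor lets slow request types be isolated in a separate pool so that they do not starve the others.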

(image: invokesync2)

Source code:

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
// Get (or create) the channel for this address
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
// Send the request and block until the response arrives or the call times out
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
// Get the sequence number (opaque) of the request
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
// Store the responseFuture under its opaque
this.responseTable.put(opaque, responseFuture);
// Get the remote address
final SocketAddress addr = channel.remoteAddress();
// Send the request through the Netty channel; the listener records the outcome of the send in the responseFuture
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
// The send failed: remove the responseFuture registered under this opaque
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// Block until putResponse supplies a result; internally this is this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS)
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}

return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}

SendResult [sendStatus=SEND_OK, msgId=AC1107140D7C3764951D701DA4250000, offsetMsgId=AC1128E100002A9F0000000000579CFC, messageQueue=MessageQueue [topic=TopicTest, brokerName=kickseed, queueId=0], queueOffset=7595]

(images: wx20171122-231321 2x, wx20171122-231450 2x)

Catalina

digester.addRule("Server/Service/Connector",
new ConnectorCreateRule());
digester.addRule("Server/Service/Connector",
new SetAllPropertiesRule(new String[]{"executor", "sslImplementationName"}));
digester.addSetNext("Server/Service/Connector",
"addConnector",
"org.apache.catalina.connector.Connector");

ConnectorCreateRule

@Override
public void begin(String namespace, String name, Attributes attributes)
throws Exception {
Service svc = (Service)digester.peek();
Executor ex = null;
if ( attributes.getValue("executor")!=null ) {
ex = svc.getExecutor(attributes.getValue("executor"));
}
// Create the Connector according to the protocol attribute
Connector con = new Connector(attributes.getValue("protocol"));
if (ex != null) {
setExecutor(con, ex);
}
String sslImplementationName = attributes.getValue("sslImplementationName");
if (sslImplementationName != null) {
setSSLImplementationName(con, sslImplementationName);
}
digester.push(con);
}

Connector

    public Connector(String protocol) {
setProtocol(protocol);
// Instantiate protocol handler
ProtocolHandler p = null;
try {
// Load the class named by protocolHandlerClassName
Class<?> clazz = Class.forName(protocolHandlerClassName);
p = (ProtocolHandler) clazz.getConstructor().newInstance();
} catch (Exception e) {
log.error(sm.getString(
"coyoteConnector.protocolHandlerInstantiationFailed"), e);
} finally {
this.protocolHandler = p;
}

if (Globals.STRICT_SERVLET_COMPLIANCE) {
uriCharset = StandardCharsets.ISO_8859_1;
} else {
uriCharset = StandardCharsets.UTF_8;
}
}


@Deprecated
public void setProtocol(String protocol) {

boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
AprLifecycleListener.getUseAprConnector();

// Case: <Connector protocol="HTTP/1.1"> or no protocol attribute
if ("HTTP/1.1".equals(protocol) || protocol == null) {
if (aprConnector) {
// Set protocolHandlerClassName
setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
} else {
setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
}
} else if ("AJP/1.3".equals(protocol)) {
if (aprConnector) {
setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
} else {
setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
}
} else {
setProtocolHandlerClassName(protocol);
}
}

Http11AprProtocol

public Http11AprProtocol() {
super(new AprEndpoint());
}

public AbstractHttp11Protocol(AbstractEndpoint<S> endpoint) {
super(endpoint);
setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
// Initialize the ConnectionHandler
ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
setHandler(cHandler);
getEndpoint().setHandler(cHandler);
}

public AbstractProtocol(AbstractEndpoint<S> endpoint) {
this.endpoint = endpoint;
setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}


public AprEndpoint() {
// Need to override the default for maxConnections to align it with what
// was pollerSize (before the two were merged)
setMaxConnections(8 * 1024);
}

start() of AbstractProtocol

    @Override
public void start() throws Exception {

//AbstractEndpoint
endpoint.start();

// Start async timeout thread
asyncTimeout = new AsyncTimeout();
// Create the http-apr-8080-AsyncTimeout thread (a daemon thread)
// Purpose: detect timed-out requests and hand them back to the worker thread pool for processing
Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
int priority = endpoint.getThreadPriority();
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
priority = Thread.NORM_PRIORITY;
}
timeoutThread.setPriority(priority);
timeoutThread.setDaemon(true);
// Start the thread
timeoutThread.start();
}

start() of AprEndpoint (AbstractEndpoint)

    public final void start() throws Exception {
if (bindState == BindState.UNBOUND) {
//bind
bind();
bindState = BindState.BOUND_ON_START;
}
startInternal();
}





@Override
public void startInternal() throws Exception {

if (!running) {
running = true;
paused = false;

processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());

// Create worker collection
if (getExecutor() == null) {
createExecutor();
}

initializeConnectionLatch();

// Start poller thread
poller = new Poller();
poller.init();
Thread pollerThread = new Thread(poller, getName() + "-Poller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();

// Start sendfile thread
if (getUseSendfile()) {
sendfile = new Sendfile();
sendfile.init();
Thread sendfileThread =
new Thread(sendfile, getName() + "-Sendfile");
sendfileThread.setPriority(threadPriority);
sendfileThread.setDaemon(true);
sendfileThread.start();
}
// Create the Acceptor threads
startAcceptorThreads();
}
}


public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
//http-apr-8080-exec-
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}


protected final void startAcceptorThreads() {
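// Number of Acceptor threads to start (acceptorThreadCount). This is not acceptCount, the length of the accept queue: once that queue holds acceptCount connections it is full and further requests are refused (default 100).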
int count = getAcceptorThreadCount();
acceptors = new Acceptor[count];

for (int i = 0; i < count; i++) {
//new Acceptor()
acceptors[i] = createAcceptor();
//http-apr-8080-Acceptor-0
String threadName = getName() + "-Acceptor-" + i;
acceptors[i].setThreadName(threadName);
Thread t = new Thread(acceptors[i], threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}

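At this point, one http-apr-8080-AsyncTimeout thread and one or more http-apr-8080-Acceptor-N threads have been started (along with the Poller thread and, if sendfile is enabled, the Sendfile thread).

Let's focus on the Acceptor thread.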

protected class Acceptor extends AbstractEndpoint.Acceptor {

private final Log log = LogFactory.getLog(AprEndpoint.Acceptor.class);

@Override
public void run() {

int errorDelay = 0;

// Loop until a shutdown command is received
while (running) {

// Loop if endpoint is paused
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}

if (!running) {
break;
}
state = AcceptorState.RUNNING;

try {
//if we have reached max connections, wait
// connectionLimitLatch caps the number of connections; wait here once the limit is reached
countUpOrAwaitConnection();

long socket = 0;
try {
// Accept the next incoming connection from the server
// The server socket has picked up a client connection
socket = Socket.accept(serverSock);

} catch (Exception e) {
// We didn't get a socket
countDownConnection();
if (running) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw e;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;

if (running && !paused) {
// Hand this socket to an appropriate processor
if (!processSocketWithOptions(socket)) {
// Close socket right away
closeSocket(socket);
}
} else {
// Close socket right away
// No code path could have added the socket to the
// Poller so use destroySocket()
destroySocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
String msg = sm.getString("endpoint.accept.fail");
if (t instanceof Error) {
Error e = (Error) t;
if (e.getError() == 233) {
// Not an error on HP-UX so log as a warning
// so it can be filtered out on that platform
// See bug 50273
log.warn(msg, t);
} else {
log.error(msg, t);
}
} else {
log.error(msg, t);
}
}
// The processor will recycle itself when it finishes
}
state = AcceptorState.ENDED;
}
}
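
The connection cap used by the Acceptor loop (`countUpOrAwaitConnection()` before accepting, `countDownConnection()` when a connection goes away) can be approximated with a `java.util.concurrent.Semaphore`. This is a swapped-in stand-in chosen for illustration only; the class and method names below are hypothetical and Tomcat's own latch implementation is not shown in this section.

```java
import java.util.concurrent.Semaphore;

final class ConnectionLimitSketch {
    private final Semaphore permits;

    ConnectionLimitSketch(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    // Blocking variant: waits while the connection limit is reached,
    // in the spirit of countUpOrAwaitConnection().
    void countUpOrAwait() throws InterruptedException {
        permits.acquire();
    }

    // Non-blocking variant used by the demo: returns false when the limit is reached.
    boolean tryCountUp() {
        return permits.tryAcquire();
    }

    // Frees one slot, in the spirit of countDownConnection().
    void countDown() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        ConnectionLimitSketch limit = new ConnectionLimitSketch(2);
        System.out.println("first accepted:  " + limit.tryCountUp());
        System.out.println("second accepted: " + limit.tryCountUp());
        System.out.println("third accepted:  " + limit.tryCountUp()); // limit reached
        limit.countDown(); // one connection closes
        System.out.println("after a close:   " + limit.tryCountUp());
    }
}
```

In the Acceptor loop the count is taken before `Socket.accept` and given back in the `catch` block when no socket was obtained, so a failed accept does not permanently consume a slot.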
protected boolean processSocketWithOptions(long socket) {
try {
// During shutdown, executor may be null - avoid NPE
if (running) {
// Wrap the socket in an AprSocketWrapper
AprSocketWrapper wrapper = new AprSocketWrapper(Long.valueOf(socket), this);
wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
wrapper.setSecure(isSSLEnabled());
wrapper.setReadTimeout(getConnectionTimeout());
wrapper.setWriteTimeout(getConnectionTimeout());
connections.put(Long.valueOf(socket), wrapper);
// Submit a SocketWithOptionsProcessor to the thread pool for execution
getExecutor().execute(new SocketWithOptionsProcessor(wrapper));
}
} catch (RejectedExecutionException x) {
log.warn("Socket processing request was rejected for:"+socket,x);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}

SocketWithOptionsProcessor

protected class SocketWithOptionsProcessor implements Runnable {

protected SocketWrapperBase<Long> socket = null;

public SocketWithOptionsProcessor(SocketWrapperBase<Long> socket) {
this.socket = socket;
}

@Override
public void run() {

synchronized (socket) {
if (!deferAccept) {
if (setSocketOptions(socket)) {
getPoller().add(socket.getSocket().longValue(),
getConnectionTimeout(), Poll.APR_POLLIN);
} else {
// Close socket and pool
closeSocket(socket.getSocket().longValue());
socket = null;
}
} else {
// Process the request from this socket
if (!setSocketOptions(socket)) {
// Close socket and pool
closeSocket(socket.getSocket().longValue());
socket = null;
return;
}
// Process the request from this socket
// Handled by Http11AprProtocol (through the ConnectionHandler it installed on the endpoint)
Handler.SocketState state = getHandler().process(socket,
SocketEvent.OPEN_READ);
if (state == Handler.SocketState.CLOSED) {
// Close socket and pool
closeSocket(socket.getSocket().longValue());
socket = null;
}
}
}
}
}
@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
if (wrapper == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}

S socket = wrapper.getSocket();
// connections is a map that caches the processor bound to each socket
Processor processor = connections.get(socket);

if (processor != null) {
// Make sure an async timeout doesn't fire
getProtocol().removeWaitingProcessor(processor);
} else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
// Nothing to do. Endpoint requested a close and there is no
// longer a processor associated with this socket.
return SocketState.CLOSED;
}

ContainerThreadMarker.set();

try {
if (processor == null) {
String negotiatedProtocol = wrapper.getNegotiatedProtocol();
if (negotiatedProtocol != null) {
UpgradeProtocol upgradeProtocol =
getProtocol().getNegotiatedProtocol(negotiatedProtocol);
if (upgradeProtocol != null) {
processor = upgradeProtocol.getProcessor(
wrapper, getProtocol().getAdapter());
} else if (negotiatedProtocol.equals("http/1.1")) {
// Explicitly negotiated the default protocol.
// Obtain a processor below.
} else {
// TODO:
// OpenSSL 1.0.2's ALPN callback doesn't support
// failing the handshake with an error if no
// protocol can be negotiated. Therefore, we need to
// fail the connection here. Once this is fixed,
// replace the code below with the commented out
// block.
return SocketState.CLOSED;
/*
* To replace the code above once OpenSSL 1.1.0 is
* used.
// Failed to create processor. This is a bug.
throw new IllegalStateException(sm.getString(
"abstractConnectionHandler.negotiatedProcessor.fail",
negotiatedProtocol));
*/
}
}
}
if (processor == null) {
processor = recycledProcessors.pop();
}
if (processor == null) {
// Create a processor (code not shown here). This ends up in createProcessor of
// Http11AprProtocol (AbstractHttp11Protocol), which constructs an Http11Processor.
processor = getProtocol().createProcessor();
register(processor);
}

processor.setSslSupport(
wrapper.getSslSupport(getProtocol().getClientCertProvider()));

// Associate the processor with the connection
connections.put(socket, processor);

SocketState state = SocketState.CLOSED;
do {
// The most important call!
state = processor.process(wrapper, status);

if (state == SocketState.UPGRADING) {
...
//UPGRADING
}
} while ( state == SocketState.UPGRADING);

if (state == SocketState.LONG) {
// In the middle of processing a request/response. Keep the
// socket associated with the processor. Exact requirements
// depend on type of long poll
longPoll(wrapper, processor);
if (processor.isAsync()) {
getProtocol().addWaitingProcessor(processor);
}
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
connections.remove(socket);
release(processor);
wrapper.registerReadInterest();
} else if (state == SocketState.SENDFILE) {
// Sendfile in progress. If it fails, the socket will be
// closed. If it works, the socket either be added to the
// poller (or equivalent) to await more data or processed
// if there are any pipe-lined requests remaining.
} else if (state == SocketState.UPGRADED) {
// Don't add sockets back to the poller if this was a
// non-blocking write otherwise the poller may trigger
// multiple read events which may lead to thread starvation
// in the connector. The write() method will add this socket
// to the poller if necessary.
if (status != SocketEvent.OPEN_WRITE) {
longPoll(wrapper, processor);
}
} else if (state == SocketState.SUSPENDED) {
// Don't add sockets back to the poller.
// The resumeProcessing() method will add this socket
// to the poller.
} else {
...
//Upgrade
}
return state;
} finally {
ContainerThreadMarker.clear();
}

// Make sure socket/processor is removed from the list of current
// connections
connections.remove(socket);
release(processor);
return SocketState.CLOSED;
}
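
The processor lookup order in the method above (processor already bound to the socket, then a recycled one, then a newly created one) can be shown in miniature. This is a sketch under assumptions: ProcessorCacheSketch, acquire and release are hypothetical names, and a plain ArrayDeque stands in for the recycledProcessors stack without any of the thread-safety the real handler needs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical miniature of the handler's bookkeeping; names are illustrative.
class ProcessorCacheSketch {
    static class Processor {
        final int id;
        Processor(int id) { this.id = id; }
    }

    private final Map<Long, Processor> connections = new ConcurrentHashMap<>();
    private final Deque<Processor> recycled = new ArrayDeque<>();
    private int created = 0;

    // Lookup order: bound to the socket, then recycled, then newly created.
    Processor acquire(long socket) {
        Processor p = connections.get(socket);
        if (p == null) {
            p = recycled.poll();
        }
        if (p == null) {
            p = new Processor(++created);
        }
        connections.put(socket, p);
        return p;
    }

    // Like the SocketState.OPEN branch: unbind and return to the pool.
    void release(long socket) {
        Processor p = connections.remove(socket);
        if (p != null) {
            recycled.push(p);
        }
    }

    int createdCount() { return created; }

    public static void main(String[] args) {
        ProcessorCacheSketch h = new ProcessorCacheSketch();
        Processor a = h.acquire(1L);
        h.release(1L);
        Processor b = h.acquire(2L);
        System.out.println(a == b); // true: the recycled processor was reused
    }
}
```

Recycling matters because a processor carries request and response buffers, so reusing one between keep-alive requests avoids reallocating them.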


AbstractProcessorLight's process method

@Override
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
throws IOException {

SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
state = dispatch(nextDispatch.getSocketStatus());
} else if (status == SocketEvent.DISCONNECT) {
// Do nothing here, just wait for it to get recycled
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
if (state == SocketState.OPEN) {
// There may be pipe-lined data to read. If the data isn't
// processed now, execution will exit this loop and call
// release() which will recycle the processor (and input
// buffer) deleting any pipe-lined data. To avoid this,
// process it now.
state = service(socketWrapper);
}
} else if (status == SocketEvent.OPEN_WRITE) {
// Extra write event likely after async, ignore
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ){
state = service(socketWrapper);
} else {
// Default to closing the socket if the SocketEvent passed in
// is not consistent with the current state of the Processor
state = SocketState.CLOSED;
}

if (state != SocketState.CLOSED && isAsync()) {
state = asyncPostProcess();
}


if (dispatches == null || !dispatches.hasNext()) {
// Only returns non-null iterator if there are
// dispatches to process.
dispatches = getIteratorAndClearDispatches();
}
} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);

return state;
}
@Override
public SocketState service(SocketWrapperBase<?> socketWrapper)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

// Setting up the I/O
setSocketWrapper(socketWrapper);
inputBuffer.init(socketWrapper);
outputBuffer.init(socketWrapper);

// Flags
keepAlive = true;
openSocket = false;
readComplete = true;
boolean keptAlive = false;
SendfileState sendfileState = SendfileState.DONE;

while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
sendfileState == SendfileState.DONE && !endpoint.isPaused()) {

// Parsing the request header
try {
// Parse the request line (the headers follow below) into the
// org.apache.coyote.Request held as a private field of inputBuffer
if (!inputBuffer.parseRequestLine(keptAlive)) {
if (inputBuffer.getParsingRequestLinePhase() == -1) {
return SocketState.UPGRADING;
} else if (handleIncompleteRequestLineRead()) {
break;
}
}

if (endpoint.isPaused()) {
// 503 - Service unavailable
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
keptAlive = true;
// Set this every time in case limit has been changed via JMX
request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
if (!inputBuffer.parseHeaders()) {
// We've read part of the request, don't recycle it
// instead associate it with the socket
openSocket = true;
readComplete = false;
break;
}
if (!disableUploadTimeout) {
socketWrapper.setReadTimeout(connectionUploadTimeout);
}
}
} catch (IOException e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.header.parse"), e);
}
setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
break;
} catch (Throwable t) {
...
// 400 - Bad Request
response.setStatus(400);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}

// Has an upgrade been requested?
Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
boolean foundUpgrade = false;
while (connectionValues.hasMoreElements() && !foundUpgrade) {
foundUpgrade = connectionValues.nextElement().toLowerCase(
Locale.ENGLISH).contains("upgrade");
}

if (foundUpgrade) {
// Check the protocol
String requestedProtocol = request.getHeader("Upgrade");

UpgradeProtocol upgradeProtocol = httpUpgradeProtocols.get(requestedProtocol);
// Switch to the requested protocol
if (upgradeProtocol != null) {
if (upgradeProtocol.accept(request)) {
// TODO Figure out how to handle request bodies at this
// point.
response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
response.setHeader("Connection", "Upgrade");
response.setHeader("Upgrade", requestedProtocol);
action(ActionCode.CLOSE, null);
getAdapter().log(request, response, 0);

InternalHttpUpgradeHandler upgradeHandler =
upgradeProtocol.getInternalUpgradeHandler(
getAdapter(), cloneRequest(request));
UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
action(ActionCode.UPGRADE, upgradeToken);
return SocketState.UPGRADING;
}
}
}

if (!getErrorState().isError()) {
// Setting up filters, and parse some request headers
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
prepareRequest();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.request.prepare"), t);
}
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}

if (maxKeepAliveRequests == 1) {
keepAlive = false;
} else if (maxKeepAliveRequests > 0 &&
socketWrapper.decrementKeepAlive() <= 0) {
keepAlive = false;
}

// Process the request in the adapter
// Let the adapter handle the request
if (!getErrorState().isError()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
getAdapter().service(request, response);
// Handle when the response was committed before a serious
// error occurred. Throwing a ServletException should both
// set the status to 500 and set the errorException.
// If we fail here, then the response is likely already
// committed, so we can't try and set headers.
if(keepAlive && !getErrorState().isError() && !isAsync() &&
statusDropsConnection(response.getStatus())) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
} catch (InterruptedIOException e) {
setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
} catch (HeadersTooLargeException e) {
log.error(sm.getString("http11processor.request.process"), e);
// The response should not have been committed but check it
// anyway to be safe
if (response.isCommitted()) {
setErrorState(ErrorState.CLOSE_NOW, e);
} else {
response.reset();
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, e);
response.setHeader("Connection", "close"); // TODO: Remove
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("http11processor.request.process"), t);
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}

// Finish the handling of the request
rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
if (!isAsync()) {
// If this is an async request then the request ends when it has
// been completed. The AsyncContext is responsible for calling
// endRequest() in that case.
endRequest();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

// If there was an error, make sure the request is counted as
// and error, and update the statistics counter
if (getErrorState().isError()) {
response.setStatus(500);
}

if (!isAsync() || getErrorState().isError()) {
request.updateCounters();
if (getErrorState().isIoAllowed()) {
inputBuffer.nextRequest();
outputBuffer.nextRequest();
}
}

if (!disableUploadTimeout) {
int soTimeout = endpoint.getConnectionTimeout();
if(soTimeout > 0) {
socketWrapper.setReadTimeout(soTimeout);
} else {
socketWrapper.setReadTimeout(0);
}
}

rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

sendfileState = processSendfile(socketWrapper);
}

rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

if (getErrorState().isError() || endpoint.isPaused()) {
return SocketState.CLOSED;
} else if (isAsync()) {
return SocketState.LONG;
} else if (isUpgrade()) {
return SocketState.UPGRADING;
} else {
if (sendfileState == SendfileState.PENDING) {
return SocketState.SENDFILE;
} else {
if (openSocket) {
if (readComplete) {
return SocketState.OPEN;
} else {
return SocketState.LONG;
}
} else {
return SocketState.CLOSED;
}
}
}
}

So the key statement is getAdapter().service(request, response); and the adapter here is CoyoteAdapter.

@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
throws Exception {
// Obtain the org.apache.catalina.connector.Request that wraps this org.apache.coyote.Request
Request request = (Request) req.getNote(ADAPTER_NOTES);
// Likewise for the response
Response response = (Response) res.getNote(ADAPTER_NOTES);

if (request == null) {
// Create objects
request = connector.createRequest();
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);

// Link objects
request.setResponse(response);
response.setRequest(request);

// Set as notes
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);

// Set query string encoding
req.getParameters().setQueryStringCharset(connector.getURICharset());
}

if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", POWERED_BY);
}

boolean async = false;
boolean postParseSuccess = false;

req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

try {
// Parse and set Catalina and configuration specific
// request parameters
// Key step: map the request to its Host, Context and Wrapper components
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
//check valves if we support async
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
// Calling the container
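// connector.getService() returns the StandardService
// getContainer() returns the StandardEngine
// getPipeline() returns its StandardPipeline
// getFirst() returns the first Valve of that pipeline
// From the Engine's pipeline the call passes down through the Host's pipeline
// and reaches StandardContextValve, shown further below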
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
if (request.isAsync()) {
async = true;
ReadListener readListener = req.getReadListener();
if (readListener != null && request.isFinished()) {
// Possible the all data may have been read during service()
// method so this needs to be checked here
ClassLoader oldCL = null;
try {
oldCL = request.getContext().bind(false, null);
if (req.sendAllDataReadEvent()) {
req.getReadListener().onAllDataRead();
}
} finally {
request.getContext().unbind(false, oldCL);
}
}

Throwable throwable =
(Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

// If an async request was started, is not going to end once
// this container thread finishes and an error occurred, trigger
// the async error process
if (!request.isAsyncCompleting() && throwable != null) {
request.getAsyncContextInternal().setErrorState(throwable, true);
}
} else {
request.finishRequest();
response.finishResponse();
}

} catch (IOException e) {
// Ignore
} finally {
AtomicBoolean error = new AtomicBoolean(false);
res.action(ActionCode.IS_ERROR, error);

if (request.isAsyncCompleting() && error.get()) {
// Connection will be forcibly closed which will prevent
// completion happening at the usual point. Need to trigger
// call to onComplete() here.
res.action(ActionCode.ASYNC_POST_PROCESS, null);
async = false;
}

// Access log
if (!async && postParseSuccess) {
// Log only if processing was invoked.
// If postParseRequest() failed, it has already logged it.
Context context = request.getContext();
// If the context is null, it is likely that the endpoint was
// shutdown, this connection closed and the request recycled in
// a different thread. That thread will have updated the access
// log so it is OK not to update the access log here in that
// case.
if (context != null) {
context.logAccess(request, response,
System.currentTimeMillis() - req.getStartTime(), false);
}
}

req.getRequestProcessor().setWorkerThreadName(null);

// Recycle the wrapper request and response
if (!async) {
request.recycle();
response.recycle();
}
}
}
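
The getNote/setNote hand-off at the top of service can be reduced to a few lines. This is a sketch only: the two nested classes are local stand-ins for org.apache.coyote.Request and org.apache.catalina.connector.Request, and the slot index and array size are arbitrary choices for the example.

```java
// Hypothetical miniature of the "notes" hand-off; not the Tomcat classes.
class NotesSketch {
    static final int ADAPTER_NOTES = 1; // slot index chosen for the example

    // Stand-in for the low-level request: it owns a small notes array.
    static class CoyoteRequest {
        private final Object[] notes = new Object[4];
        Object getNote(int pos) { return notes[pos]; }
        void setNote(int pos, Object value) { notes[pos] = value; }
    }

    // Stand-in for the container-level request wrapping the low-level one.
    static class ConnectorRequest {
        final CoyoteRequest coyoteRequest;
        ConnectorRequest(CoyoteRequest coyoteRequest) {
            this.coyoteRequest = coyoteRequest;
        }
    }

    // Returns the wrapper cached in the note slot, creating it on first use.
    static ConnectorRequest adapt(CoyoteRequest req) {
        ConnectorRequest request = (ConnectorRequest) req.getNote(ADAPTER_NOTES);
        if (request == null) {
            request = new ConnectorRequest(req);
            req.setNote(ADAPTER_NOTES, request);
        }
        return request;
    }

    public static void main(String[] args) {
        CoyoteRequest req = new CoyoteRequest();
        System.out.println(adapt(req) == adapt(req)); // true: created once, then reused
    }
}
```

Because the low-level request is reused together with its processor, the wrapper stored in its note slot is reused too, so the creation branch is taken only the first time.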

postParseRequest

protected boolean postParseRequest(org.apache.coyote.Request req, Request request,
org.apache.coyote.Response res, Response response) throws IOException, ServletException {

// If the processor has set the scheme (AJP does this, HTTP does this if
// SSL is enabled) use this to set the secure flag as well. If the
// processor hasn't set it, use the settings from the connector
if (req.scheme().isNull()) {
// Use connector scheme and secure configuration, (defaults to
// "http" and false respectively)
req.scheme().setString(connector.getScheme());
request.setSecure(connector.getSecure());
} else {
// Use processor specified scheme to determine secure state
request.setSecure(req.scheme().equals("https"));
}

// At this point the Host header has been processed.
// Override if the proxyPort/proxyHost are set
String proxyName = connector.getProxyName();
int proxyPort = connector.getProxyPort();
if (proxyPort != 0) {
req.setServerPort(proxyPort);
} else if (req.getServerPort() == -1) {
// Not explicitly set. Use default ports based on the scheme
if (req.scheme().equals("https")) {
req.setServerPort(443);
} else {
req.setServerPort(80);
}
}
if (proxyName != null) {
req.serverName().setString(proxyName);
}

//uri
MessageBytes undecodedURI = req.requestURI();

// Check for ping OPTIONS * request
if (undecodedURI.equals("*")) {
if (req.method().equalsIgnoreCase("OPTIONS")) {
StringBuilder allow = new StringBuilder();
allow.append("GET, HEAD, POST, PUT, DELETE");
// Trace if allowed
if (connector.getAllowTrace()) {
allow.append(", TRACE");
}
// Always allow options
allow.append(", OPTIONS");
res.setHeader("Allow", allow.toString());
} else {
res.setStatus(404);
res.setMessage("Not found");
}
connector.getService().getContainer().logAccess(
request, response, 0, true);
return false;
}

MessageBytes decodedURI = req.decodedURI();

if (undecodedURI.getType() == MessageBytes.T_BYTES) {
// Copy the raw URI to the decodedURI
decodedURI.duplicate(undecodedURI);

// Parse the path parameters. This will:
// - strip out the path parameters
// - convert the decodedURI to bytes
parsePathParameters(req, request);

// URI decoding
// %xx decoding of the URL
try {
req.getURLDecoder().convert(decodedURI, false);
} catch (IOException ioe) {
res.setStatus(400);
res.setMessage("Invalid URI: " + ioe.getMessage());
connector.getService().getContainer().logAccess(
request, response, 0, true);
return false;
}
// Normalization
if (!normalize(req.decodedURI())) {
res.setStatus(400);
res.setMessage("Invalid URI");
connector.getService().getContainer().logAccess(
request, response, 0, true);
return false;
}
// Character decoding
convertURI(decodedURI, request);
// Check that the URI is still normalized
if (!checkNormalize(req.decodedURI())) {
res.setStatus(400);
res.setMessage("Invalid URI character encoding");
connector.getService().getContainer().logAccess(
request, response, 0, true);
return false;
}
} else {
/* The URI is chars or String, and has been sent using an in-memory
* protocol handler. The following assumptions are made:
* - req.requestURI() has been set to the 'original' non-decoded,
* non-normalized URI
* - req.decodedURI() has been set to the decoded, normalized form
* of req.requestURI()
*/
decodedURI.toChars();
// Remove all path parameters; any needed path parameter should be set
// using the request object rather than passing it in the URL
CharChunk uriCC = decodedURI.getCharChunk();
int semicolon = uriCC.indexOf(';');
if (semicolon > 0) {
decodedURI.setChars
(uriCC.getBuffer(), uriCC.getStart(), semicolon);
}
}

// Request mapping.
MessageBytes serverName;
if (connector.getUseIPVHosts()) {
serverName = req.localName();
if (serverName.isNull()) {
// well, they did ask for it
res.action(ActionCode.REQ_LOCAL_NAME_ATTRIBUTE, null);
}
} else {
serverName = req.serverName();
}

// Version for the second mapping loop and
// Context that we expect to get for that version
String version = null;
Context versionContext = null;
boolean mapRequired = true;

while (mapRequired) {
// This will map the the latest version by default
// Map the request to its Host, Context and Wrapper
connector.getService().getMapper().map(serverName, decodedURI,
version, request.getMappingData());

// If there is no context at this point, it is likely no ROOT context
// has been deployed
if (request.getContext() == null) {
res.setStatus(404);
res.setMessage("Not found");
// No context, so use host
Host host = request.getHost();
// Make sure there is a host (might not be during shutdown)
if (host != null) {
host.logAccess(request, response, 0, true);
}
return false;
}

// Now we have the context, we can parse the session ID from the URL
// (if any). Need to do this before we redirect in case we need to
// include the session id in the redirect
String sessionID;
if (request.getServletContext().getEffectiveSessionTrackingModes()
.contains(SessionTrackingMode.URL)) {

// Get the session ID if there was one
sessionID = request.getPathParameter(
SessionConfig.getSessionUriParamName(
request.getContext()));
if (sessionID != null) {
request.setRequestedSessionId(sessionID);
request.setRequestedSessionURL(true);
}
}

// Look for session ID in cookies and SSL session
parseSessionCookiesId(request);
parseSessionSslId(request);

sessionID = request.getRequestedSessionId();

mapRequired = false;
if (version != null && request.getContext() == versionContext) {
// We got the version that we asked for. That is it.
} else {
version = null;
versionContext = null;

Context[] contexts = request.getMappingData().contexts;
// Single contextVersion means no need to remap
// No session ID means no possibility of remap
if (contexts != null && sessionID != null) {
// Find the context associated with the session
for (int i = (contexts.length); i > 0; i--) {
Context ctxt = contexts[i - 1];
if (ctxt.getManager().findSession(sessionID) != null) {
// We found a context. Is it the one that has
// already been mapped?
if (!ctxt.equals(request.getMappingData().context)) {
// Set version so second time through mapping
// the correct context is found
version = ctxt.getWebappVersion();
versionContext = ctxt;
// Reset mapping
request.getMappingData().recycle();
mapRequired = true;
// Recycle cookies and session info in case the
// correct context is configured with different
// settings
request.recycleSessionInfo();
request.recycleCookieInfo(true);
}
break;
}
}
}
}

if (!mapRequired && request.getContext().getPaused()) {
// Found a matching context but it is paused. Mapping data will
// be wrong since some Wrappers may not be registered at this
// point.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Should never happen
}
// Reset mapping
request.getMappingData().recycle();
mapRequired = true;
}
}

// Possible redirect
MessageBytes redirectPathMB = request.getMappingData().redirectPath;
if (!redirectPathMB.isNull()) {
String redirectPath = URLEncoder.DEFAULT.encode(
redirectPathMB.toString(), StandardCharsets.UTF_8);
String query = request.getQueryString();
if (request.isRequestedSessionIdFromURL()) {
// This is not optimal, but as this is not very common, it
// shouldn't matter
redirectPath = redirectPath + ";" +
SessionConfig.getSessionUriParamName(
request.getContext()) +
"=" + request.getRequestedSessionId();
}
if (query != null) {
// This is not optimal, but as this is not very common, it
// shouldn't matter
redirectPath = redirectPath + "?" + query;
}
response.sendRedirect(redirectPath);
request.getContext().logAccess(request, response, 0, true);
return false;
}

// Filter trace method
if (!connector.getAllowTrace()
&& req.method().equalsIgnoreCase("TRACE")) {
Wrapper wrapper = request.getWrapper();
String header = null;
if (wrapper != null) {
String[] methods = wrapper.getServletMethods();
if (methods != null) {
for (int i=0; i<methods.length; i++) {
if ("TRACE".equals(methods[i])) {
continue;
}
if (header == null) {
header = methods[i];
} else {
header += ", " + methods[i];
}
}
}
}
res.setStatus(405);
res.addHeader("Allow", header);
res.setMessage("TRACE method is not allowed");
request.getContext().logAccess(request, response, 0, true);
return false;
}

doConnectorAuthenticationAuthorization(req, request);

return true;
}

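A Service contains exactly one Engine but may have several Connectors. Inside the Engine, the Engine-to-Host, Host-to-Context and Context-to-Wrapper relationships are all one-to-many. A single browser request neither needs to nor could run every Servlet of every web application deployed in Tomcat. The mapping mechanism described here exists so that, once a Connector has turned an accepted socket connection into a request, it can find exactly which Host, which Context and which Wrapper under the Engine should handle that request.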
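
To make the Host, Context, Wrapper lookup concrete, here is a heavily simplified sketch. It assumes only two rules, longest context-path prefix and exact servlet-path match; MapperSketch and its methods are hypothetical, and the real Mapper handles much more (wildcard and extension rules, the default servlet, welcome files, versions).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, much simplified mapper; not the Tomcat Mapper.
class MapperSketch {
    // host name -> (context path -> (servlet path -> wrapper name))
    private final Map<String, Map<String, Map<String, String>>> hosts = new HashMap<>();

    void addWrapper(String host, String contextPath, String servletPath, String wrapper) {
        hosts.computeIfAbsent(host, h -> new HashMap<>())
             .computeIfAbsent(contextPath, c -> new HashMap<>())
             .put(servletPath, wrapper);
    }

    // Returns "host|context|wrapper", or null when any level fails to match.
    String map(String host, String uri) {
        Map<String, Map<String, String>> contexts = hosts.get(host);
        if (contexts == null) {
            return null;
        }
        // Pick the longest context path that prefixes the URI on a '/' boundary.
        String best = null;
        for (String path : contexts.keySet()) {
            boolean matches = uri.equals(path) || uri.startsWith(path + "/");
            if (matches && (best == null || path.length() > best.length())) {
                best = path;
            }
        }
        if (best == null) {
            return null;
        }
        // Exact match on the rest of the URI inside the chosen context.
        String wrapper = contexts.get(best).get(uri.substring(best.length()));
        return wrapper == null ? null : host + "|" + best + "|" + wrapper;
    }

    public static void main(String[] args) {
        MapperSketch m = new MapperSketch();
        m.addWrapper("localhost", "", "/hello", "rootHello");
        m.addWrapper("localhost", "/shop", "/cart", "cartServlet");
        System.out.println(m.map("localhost", "/shop/cart")); // localhost|/shop|cartServlet
    }
}
```

The empty context path plays the role of the ROOT context: it matches any URI but loses to a longer, more specific context path.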

StandardPipeline has three member fields: basic (Valve), container (Container) and first (Valve).

StandardPipeline

@Override
public void addValve(Valve valve) {

// Validate that we can add this Valve
if (valve instanceof Contained)
((Contained) valve).setContainer(this.container);

// Start the new component if necessary
if (getState().isAvailable()) {
if (valve instanceof Lifecycle) {
try {
((Lifecycle) valve).start();
} catch (LifecycleException e) {
log.error("StandardPipeline.addValve: start: ", e);
}
}
}

// Add this Valve to the set associated with this Pipeline
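// If the pipeline has no ordinary valve yet, the new valve becomes the
// pipeline's first field. Otherwise it is appended to the end of the chain of
// ordinary valves. In both cases its next reference is set to the basic valve.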
if (first == null) {
first = valve;
valve.setNext(basic);
} else {
Valve current = first;
while (current != null) {
if (current.getNext() == basic) {
current.setNext(valve);
valve.setNext(basic);
break;
}
current = current.getNext();
}
}

container.fireContainerEvent(Container.ADD_VALVE_EVENT, valve);
}

// Returns the head of the ordinary-valve chain if there is one, otherwise the basic valve.
@Override
public Valve getFirst() {
if (first != null) {
return first;
}

return basic;
}

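Valves fall into two kinds: the basic valve (accessed through getBasic and setBasic) and ordinary valves (managed through addValve and removeValve). A pipeline always belongs to a container, so the API also has getContainer and setContainer. A pipeline normally has exactly one basic valve (set with setBasic) and zero or more ordinary valves (added with addValve).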
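
The ordering rule (ordinary valves first, basic valve always last) can be checked with a small model. This is a sketch assuming a singly linked chain like the one in addValve above; PipelineSketch and its nested Valve class are hypothetical and skip the lifecycle and container-event handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of a pipeline; not the Tomcat classes.
class PipelineSketch {
    static class Valve {
        final String name;
        Valve next;
        Valve(String name) { this.name = name; }

        // Record this valve, then pass the call down the chain.
        void invoke(List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.invoke(trace);
            }
        }
    }

    private Valve first;       // head of the ordinary valves, may be null
    private final Valve basic; // always the last valve in the chain

    PipelineSketch(Valve basic) { this.basic = basic; }

    // Append an ordinary valve just in front of the basic valve.
    void addValve(Valve valve) {
        if (first == null) {
            first = valve;
        } else {
            Valve current = first;
            while (current.next != basic) {
                current = current.next;
            }
            current.next = valve;
        }
        valve.next = basic;
    }

    Valve getFirst() { return first != null ? first : basic; }

    public static void main(String[] args) {
        PipelineSketch p = new PipelineSketch(new Valve("basic"));
        p.addValve(new Valve("auth"));
        p.addValve(new Valve("log"));
        List<String> trace = new ArrayList<>();
        p.getFirst().invoke(trace);
        System.out.println(trace); // [auth, log, basic]
    }
}
```

With no ordinary valves, getFirst falls back to the basic valve, which is why a pipeline without extra valves still does its container's work.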

To add an ordinary valve, declare it in the configuration (for example in server.xml): <Valve className="org.apache.catalina.authenticator.SingleSignOn" />

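So the connector.getService().getContainer().getPipeline().getFirst().invoke(request, response); call above enters the Engine's pipeline. Each container's basic valve hands the request to the pipeline of the next container down (Engine, then Host, then Context), which is how it reaches StandardContextValve: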

@Override
public final void invoke(Request request, Response response)
throws IOException, ServletException {

// Disallow any direct access to resources under WEB-INF or META-INF
MessageBytes requestPathMB = request.getRequestPathMB();
if ((requestPathMB.startsWithIgnoreCase("/META-INF/", 0))
|| (requestPathMB.equalsIgnoreCase("/META-INF"))
|| (requestPathMB.startsWithIgnoreCase("/WEB-INF/", 0))
|| (requestPathMB.equalsIgnoreCase("/WEB-INF"))) {
response.sendError(HttpServletResponse.SC_NOT_FOUND);
return;
}

// Select the Wrapper to be used for this Request
//StandardWrapper
Wrapper wrapper = request.getWrapper();
if (wrapper == null || wrapper.isUnavailable()) {
response.sendError(HttpServletResponse.SC_NOT_FOUND);
return;
}

// Acknowledge the request
try {
response.sendAcknowledgement();
} catch (IOException ioe) {
container.getLogger().error(sm.getString(
"standardContextValve.acknowledgeException"), ioe);
request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, ioe);
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
return;
}

if (request.isAsyncSupported()) {
request.setAsyncSupported(wrapper.getPipeline().isAsyncSupported());
}
// The chain continues here with the invoke method of StandardWrapperValve
wrapper.getPipeline().getFirst().invoke(request, response);
}

StandardWrapperValve

@Override
public final void invoke(Request request, Response response)
throws IOException, ServletException {

// Initialize local variables we may need
boolean unavailable = false;
Throwable throwable = null;
// This should be a Request attribute...
long t1=System.currentTimeMillis();
requestCount.incrementAndGet();
StandardWrapper wrapper = (StandardWrapper) getContainer();
Servlet servlet = null;
Context context = (Context) wrapper.getParent();

...


// Allocate a servlet instance
if (!unavailable) {
servlet = wrapper.allocate();
}
...

MessageBytes requestPathMB = request.getRequestPathMB();
DispatcherType dispatcherType = DispatcherType.REQUEST;
if (request.getDispatcherType()==DispatcherType.ASYNC) dispatcherType = DispatcherType.ASYNC;
request.setAttribute(Globals.DISPATCHER_TYPE_ATTR,dispatcherType);
request.setAttribute(Globals.DISPATCHER_REQUEST_PATH_ATTR,
requestPathMB);
// Create the filter chain for this request
// The servlet's filter chain is what finally gets called: it runs each
// filter's doFilter in turn and then the servlet itself
ApplicationFilterChain filterChain =
ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);

...
//dofilter & servlet.service
filterChain.doFilter(request.getRequest(),
response.getResponse());
...
// Release the filter chain (if any) for this request
if (filterChain != null) {
filterChain.release();
}

// Release the servlet instance
if (servlet != null) {
wrapper.deallocate(servlet);
}

// If this servlet has been marked permanently unavailable,
// unload it and release this instance
...

}

ApplicationFilterFactory

public static ApplicationFilterChain createFilterChain(ServletRequest request,
Wrapper wrapper, Servlet servlet) {
...
filterChain.setServlet(servlet);
filterChain.setServletSupportsAsync(wrapper.isAsyncSupported());
...
return filterChain;
}

ApplicationFilterChain

    @Override
public void doFilter(ServletRequest request, ServletResponse response)
throws IOException, ServletException {
...
internalDoFilter(req,res);
...
}

private void internalDoFilter(ServletRequest request,
ServletResponse response)
throws IOException, ServletException {
...
filter.doFilter(request, response, this);
...
if ((request instanceof HttpServletRequest) &&
(response instanceof HttpServletResponse) &&
Globals.IS_SECURITY_ENABLED ) {
final ServletRequest req = request;
final ServletResponse res = response;
Principal principal =
((HttpServletRequest) req).getUserPrincipal();
Object[] args = new Object[]{req, res};
SecurityUtil.doAsPrivilege("service",
servlet,
classTypeUsedInService,
args,
principal);
} else {
servlet.service(request, response);
}
...
}

Clearly, the chain ends with a call to the servlet's service method; what happens after that should be familiar to everyone.
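The control flow above can be sketched without the servlet API. This is a minimal sketch, not Tomcat's implementation: `SimpleFilter`, `SimpleServlet`, and `Chain` are hypothetical stand-ins, and it assumes the elided lines of internalDoFilter pick the next filter by position and return once that filter has run.

```java
import java.util.ArrayList;
import java.util.List;

public class FilterChainSketch {

    /** Hypothetical stand-in for javax.servlet.Filter. */
    interface SimpleFilter {
        void doFilter(String request, List<String> trace, Chain chain);
    }

    /** Hypothetical stand-in for javax.servlet.Servlet. */
    interface SimpleServlet {
        void service(String request, List<String> trace);
    }

    /** Plays the role of ApplicationFilterChain: filters first, servlet last. */
    static final class Chain {
        private final List<SimpleFilter> filters;
        private final SimpleServlet servlet;
        private int pos = 0; // index of the next filter to invoke

        Chain(List<SimpleFilter> filters, SimpleServlet servlet) {
            this.filters = filters;
            this.servlet = servlet;
        }

        void doFilter(String request, List<String> trace) {
            if (pos < filters.size()) {
                // Hand over to the next filter; it re-enters this method
                // when it calls chain.doFilter(...).
                SimpleFilter next = filters.get(pos++);
                next.doFilter(request, trace, this);
                return;
            }
            // All filters have run: the chain ends at the servlet.
            servlet.service(request, trace);
        }
    }

    /** Sends one request through two filters and a servlet; returns the call order. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        List<SimpleFilter> filters = new ArrayList<>();
        filters.add((req, t, c) -> { t.add("A:before"); c.doFilter(req, t); t.add("A:after"); });
        filters.add((req, t, c) -> { t.add("B:before"); c.doFilter(req, t); t.add("B:after"); });
        Chain chain = new Chain(filters, (req, t) -> t.add("service:" + req));
        chain.doFilter("/hello", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each filter wraps everything that comes after it, so the "before" parts run in registration order and the "after" parts run in reverse, with the servlet call in the middle.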

[Figure: sequence diagram]

The Lifecycle interface

/**
* Add a LifecycleEvent listener to this component.
*
* @param listener The listener to add
*/
public void addLifecycleListener(LifecycleListener listener); // add a listener to this component


/**
* Get the life cycle listeners associated with this life cycle. If this
* component has no listeners registered, a zero-length array is returned.
*/
public LifecycleListener[] findLifecycleListeners(); // get all listeners registered on this component


/**
* Remove a LifecycleEvent listener from this component.
*
* @param listener The listener to remove
*/
public void removeLifecycleListener(LifecycleListener listener); // remove a listener from this component

The LifecycleListener interface

public interface LifecycleListener {
/**
* Acknowledge the occurrence of the specified event.
*
* @param event LifecycleEvent that has occurred
*/
public void lifecycleEvent(LifecycleEvent event);
}

The LifecycleEvent class (extends the JDK's built-in java.util.EventObject)

public final class LifecycleEvent extends EventObject {

private static final long serialVersionUID = 1L;


// ----------------------------------------------------------- Constructors

/**
* Construct a new LifecycleEvent with the specified parameters.
*
* @param lifecycle Component on which this event occurred
* @param type Event type (required)
* @param data Event data (if any)
*/
public LifecycleEvent(Lifecycle lifecycle, String type, Object data) {

super(lifecycle);
this.type = type;
this.data = data;
}


// ----------------------------------------------------- Instance Variables


/**
* The event data associated with this event.
*/
private final Object data;


/**
* The event type this instance represents.
*/
private final String type;


// ------------------------------------------------------------- Properties


/**
* Return the event data of this event.
*/
public Object getData() {

return (this.data);

}


/**
* Return the Lifecycle on which this event occurred.
*/
public Lifecycle getLifecycle() {

return (Lifecycle) getSource();

}


/**
* Return the event type of this event.
*/
public String getType() {

return (this.type);

}


}

LifecycleSupport's fireLifecycleEvent publishes the newly constructed event object to every listener registered on the component.

In the LifecycleBase class, LifecycleSupport is initialized like this: private final LifecycleSupport lifecycle = new LifecycleSupport(this); in other words, the StandardServer (LifecycleBase) instance itself is passed in as the Lifecycle.

public void fireLifecycleEvent(String type, Object data) {

LifecycleEvent event = new LifecycleEvent(lifecycle, type, data);
LifecycleListener interested[] = listeners;
for (int i = 0; i < interested.length; i++)
interested[i].lifecycleEvent(event);

}

The createStartDigester method of the org.apache.catalina.startup.Catalina class contains this piece of code:

digester.addObjectCreate("Server/Listener",
null, // MUST be specified in the element
"className");
digester.addSetProperties("Server/Listener");
digester.addSetNext("Server/Listener",
"addLifecycleListener",
"org.apache.catalina.LifecycleListener");

LifecycleSupport

    public void addLifecycleListener(LifecycleListener listener) {

synchronized (listenersLock) {
LifecycleListener results[] =
new LifecycleListener[listeners.length + 1];
for (int i = 0; i < listeners.length; i++)
results[i] = listeners[i];
results[listeners.length] = listener;
listeners = results;
}

}

private LifecycleListener listeners[] = new LifecycleListener[0];

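If you are familiar with design patterns, you will notice that Tomcat's Lifecycle uses the observer pattern. LifecycleListener is the abstract observer: it defines a lifecycleEvent method, and the listeners that implement the interface are the concrete observers. The Lifecycle interface is the abstract subject: it defines the methods for managing observers along with the other methods it needs. The individual components are the concrete subjects, which implement all the methods of the abstract subject. A concrete subject typically holds the internal state its observers care about and notifies them whenever that state changes. Tomcat refines the pattern with two extra helper classes, LifecycleSupport and LifecycleEvent, which extend what the observers can do. LifecycleEvent carries the event type, so concrete observers can treat different events differently, which is more flexible. LifecycleSupport takes over observer management on behalf of all concrete subjects, pulling that management out into one shared implementation; a later change only has to touch LifecycleSupport rather than every concrete subject, because all of their observer operations are delegated to it.
Events are published with a push model: every published event is delivered to all of the subject's concrete observers, and each observer then decides whether it needs to process the event any further.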
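The roles described above can be sketched in a few lines. This is a simplified sketch under stated assumptions, not Tomcat's code: `Listener`, `Support`, and `Component` are hypothetical names standing in for LifecycleListener, LifecycleSupport, and a concrete component, and the event is reduced to a type string plus a data object.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleObserverSketch {

    /** Abstract observer (stand-in for LifecycleListener). */
    interface Listener {
        void onEvent(String type, Object data);
    }

    /** Helper that owns the listener array (stand-in for LifecycleSupport). */
    static final class Support {
        private final Object lock = new Object();
        private volatile Listener[] listeners = new Listener[0];

        void add(Listener listener) {
            synchronized (lock) {
                // Copy-on-write: publish a new array so fire() can iterate without locking.
                Listener[] copy = new Listener[listeners.length + 1];
                System.arraycopy(listeners, 0, copy, 0, listeners.length);
                copy[listeners.length] = listener;
                listeners = copy;
            }
        }

        void fire(String type, Object data) {
            Listener[] snapshot = listeners;
            for (Listener l : snapshot) {
                l.onEvent(type, data); // push model: every listener sees every event
            }
        }
    }

    /** Concrete subject: delegates all listener management to Support. */
    static final class Component {
        private final Support support = new Support();

        void addListener(Listener l) { support.add(l); }

        void start() {
            support.fire("before_start", null);
            support.fire("start", null);
            support.fire("after_start", null);
        }
    }

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Component component = new Component();
        // One observer records everything, the other filters by event type.
        component.addListener((type, data) -> seen.add("logAll:" + type));
        component.addListener((type, data) -> {
            if ("start".equals(type)) seen.add("startOnly:" + type);
        });
        component.start();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The second listener receives all three events but only reacts to "start", which is the "each observer decides for itself" part of the push model.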

The earliest parsing step we saw is the parsing of the XML configuration:

<Server port="8005" shutdown="SHUTDOWN">
<!-- Security listener. Documentation at /docs/config/listeners.html
<Listener className="org.apache.catalina.security.SecurityListener" />
-->
<!--APR library loader. Documentation at /docs/apr.html -->
<Listener className="org.apache.catalina.core.AprLifecycleListener" SSLEngine="on" />
<!-- Prevent memory leaks due to use of particular java/javax APIs-->
<Listener className="org.apache.catalina.core.JreMemoryLeakPreventionListener" />
<Listener className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener" />
<Listener className="org.apache.catalina.core.ThreadLocalLeakPreventionListener" />

<!-- Global JNDI resources
Documentation at /docs/jndi-resources-howto.html
-->
<GlobalNamingResources>
<!-- Editable user database that can also be used by
UserDatabaseRealm to authenticate users
-->
<Resource name="UserDatabase" auth="Container"
type="org.apache.catalina.UserDatabase"
description="User database that can be updated and saved"
factory="org.apache.catalina.users.MemoryUserDatabaseFactory"
pathname="conf/tomcat-users.xml" />
</GlobalNamingResources>

<!-- A "Service" is a collection of one or more "Connectors" that share
a single "Container" Note: A "Service" is not itself a "Container",
so you may not define subcomponents such as "Valves" at this level.
Documentation at /docs/config/service.html
-->
<Service name="Catalina">

<!--The connectors can use a shared executor, you can define one or more named thread pools-->
<!--
<Executor name="tomcatThreadPool" namePrefix="catalina-exec-"
maxThreads="150" minSpareThreads="4"/>
-->


<!-- A "Connector" represents an endpoint by which requests are received
and responses are returned. Documentation at :
Java HTTP Connector: /docs/config/http.html (blocking & non-blocking)
Java AJP Connector: /docs/config/ajp.html
APR (HTTP/AJP) Connector: /docs/apr.html
Define a non-SSL HTTP/1.1 Connector on port 8080
-->
<Connector port="8080" protocol="HTTP/1.1"
connectionTimeout="20000"
redirectPort="8443" />
<!-- A "Connector" using the shared thread pool-->
<!--
<Connector executor="tomcatThreadPool"
port="8080" protocol="HTTP/1.1"
connectionTimeout="20000"
redirectPort="8443" />
-->
<!-- Define a SSL HTTP/1.1 Connector on port 8443
This connector uses the JSSE configuration, when using APR, the
connector should be using the OpenSSL style configuration
described in the APR documentation -->
<!--
<Connector port="8443" protocol="HTTP/1.1" SSLEnabled="true"
maxThreads="150" scheme="https" secure="true"
clientAuth="false" sslProtocol="TLS" />
-->

<!-- Define an AJP 1.3 Connector on port 8009 -->
<Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />


<!-- An Engine represents the entry point (within Catalina) that processes
every request. The Engine implementation for Tomcat stand alone
analyzes the HTTP headers included with the request, and passes them
on to the appropriate Host (virtual host).
Documentation at /docs/config/engine.html -->

<!-- You should set jvmRoute to support load-balancing via AJP ie :
<Engine name="Catalina" defaultHost="localhost" jvmRoute="jvm1">
-->
<Engine name="Catalina" defaultHost="localhost">

<!--For clustering, please take a look at documentation at:
/docs/cluster-howto.html (simple how to)
/docs/config/cluster.html (reference documentation) -->
<!--
<Cluster className="org.apache.catalina.ha.tcp.SimpleTcpCluster"/>
-->

<!-- Use the LockOutRealm to prevent attempts to guess user passwords
via a brute-force attack -->
<Realm className="org.apache.catalina.realm.LockOutRealm">
<!-- This Realm uses the UserDatabase configured in the global JNDI
resources under the key "UserDatabase". Any edits
that are performed against this UserDatabase are immediately
available for use by the Realm. -->
<Realm className="org.apache.catalina.realm.UserDatabaseRealm"
resourceName="UserDatabase"/>
</Realm>

<Host name="localhost" appBase="webapps"
unpackWARs="true" autoDeploy="true">

<!-- SingleSignOn valve, share authentication between web applications
Documentation at: /docs/config/valve.html -->
<!--
<Valve className="org.apache.catalina.authenticator.SingleSignOn" />
-->

<!-- Access log processes all example.
Documentation at: /docs/config/valve.html
Note: The pattern used is equivalent to using pattern="common" -->
<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
prefix="localhost_access_log" suffix=".txt"
pattern="%h %l %u %t &quot;%r&quot; %s %b" />

</Host>
</Engine>
</Service>
</Server>

StandardServer


init in StandardServer (inherited from LifecycleBase)

@Override
public final synchronized void init() throws LifecycleException {
if (!state.equals(LifecycleState.NEW)) {
invalidTransition(Lifecycle.BEFORE_INIT_EVENT);
}
setStateInternal(LifecycleState.INITIALIZING, null, false);

try {
initInternal();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
setStateInternal(LifecycleState.FAILED, null, false);
throw new LifecycleException(
sm.getString("lifecycleBase.initFail",toString()), t);
}

setStateInternal(LifecycleState.INITIALIZED, null, false);
}

initInternal

@Override
protected void initInternal() throws LifecycleException {

super.initInternal();

// Register global String cache
// Note although the cache is global, if there are multiple Servers
// present in the JVM (may happen when embedding) then the same cache
// will be registered under multiple names
onameStringCache = register(new StringCache(), "type=StringCache");

// Register the MBeanFactory
MBeanFactory factory = new MBeanFactory();
factory.setContainer(this);
onameMBeanFactory = register(factory, "type=MBeanFactory");

// Register the naming resources
globalNamingResources.init();

// Populate the extension validator with JARs from common and shared
// class loaders
if (getCatalina() != null) {
ClassLoader cl = getCatalina().getParentClassLoader();
// Walk the class loader hierarchy. Stop at the system class loader.
// This will add the shared (if present) and common class loaders
while (cl != null && cl != ClassLoader.getSystemClassLoader()) {
if (cl instanceof URLClassLoader) {
URL[] urls = ((URLClassLoader) cl).getURLs();
for (URL url : urls) {
if (url.getProtocol().equals("file")) {
try {
File f = new File (url.toURI());
if (f.isFile() &&
f.getName().endsWith(".jar")) {
ExtensionValidator.addSystemResource(f);
}
} catch (URISyntaxException e) {
// Ignore
} catch (IOException e) {
// Ignore
}
}
}
}
cl = cl.getParent();
}
}
// Initialize each Service
for (int i = 0; i < services.length; i++) {
services[i].init();
}
}

setStateInternal in StandardServer (inherited from LifecycleBase); the fireLifecycleEvent it calls ends up in LifecycleSupport's fireLifecycleEvent

private synchronized void setStateInternal(LifecycleState state,
Object data, boolean check) throws LifecycleException {

if (log.isDebugEnabled()) {
log.debug(sm.getString("lifecycleBase.setState", this, state));
}

if (check) {
// Must have been triggered by one of the abstract methods (assume
// code in this class is correct)
// null is never a valid state
if (state == null) {
invalidTransition("null");
// Unreachable code - here to stop eclipse complaining about
// a possible NPE further down the method
return;
}

// Any method can transition to failed
// startInternal() permits STARTING_PREP to STARTING
// stopInternal() permits STOPPING_PREP to STOPPING and FAILED to
// STOPPING
if (!(state == LifecycleState.FAILED ||
(this.state == LifecycleState.STARTING_PREP &&
state == LifecycleState.STARTING) ||
(this.state == LifecycleState.STOPPING_PREP &&
state == LifecycleState.STOPPING) ||
(this.state == LifecycleState.FAILED &&
state == LifecycleState.STOPPING))) {
// No other transition permitted
invalidTransition(state.name());
}
}
// Store the LifecycleState argument in this instance's state field
this.state = state;
String lifecycleEvent = state.getLifecycleEvent();
if (lifecycleEvent != null) {
// Publish the event
fireLifecycleEvent(lifecycleEvent, data);
}
}
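The last few lines of setStateInternal rely on each state knowing which event (if any) it should publish. A minimal sketch of that idea follows; the `State` enum and `Component` class are hypothetical, the event strings are illustrative values, and the transition checks from the real method are left out.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleStateSketch {

    /** Each state optionally names the event to publish on entry (null = no event). */
    enum State {
        NEW(null),
        INITIALIZING("before_init"),
        INITIALIZED("after_init"),
        STARTING_PREP("before_start"),
        STARTING("start"),
        STARTED("after_start"),
        FAILED(null);

        private final String lifecycleEvent;

        State(String lifecycleEvent) { this.lifecycleEvent = lifecycleEvent; }

        String getLifecycleEvent() { return lifecycleEvent; }
    }

    /** Hypothetical component that records fired events instead of notifying listeners. */
    static final class Component {
        private State state = State.NEW;
        private final List<String> fired = new ArrayList<>();

        synchronized void setState(State next) {
            if (next == null) {
                throw new IllegalArgumentException("null is never a valid state");
            }
            this.state = next;                      // remember the new state
            String event = next.getLifecycleEvent();
            if (event != null) {
                fired.add(event);                   // stands in for fireLifecycleEvent(event, data)
            }
        }

        synchronized State getState() { return state; }

        synchronized List<String> firedEvents() { return new ArrayList<>(fired); }
    }

    static List<String> run() {
        Component c = new Component();
        c.setState(State.INITIALIZING);
        c.setState(State.INITIALIZED);
        c.setState(State.STARTING_PREP);
        c.setState(State.STARTING);
        c.setState(State.STARTED);
        c.setState(State.FAILED); // no event attached, so nothing is recorded
        return c.firedEvents();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Keeping the event name on the enum constant means the state-setting code never needs a switch over states; it just asks the state what to publish.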

start in StandardServer (inherited from LifecycleBase)

@Override
public final synchronized void start() throws LifecycleException {
// If start has already been called, return
if (LifecycleState.STARTING_PREP.equals(state) ||
LifecycleState.STARTING.equals(state) ||
LifecycleState.STARTED.equals(state)) {

if (log.isDebugEnabled()) {
Exception e = new LifecycleException();
log.debug(sm.getString("lifecycleBase.alreadyStarted",
toString()), e);
} else if (log.isInfoEnabled()) {
log.info(sm.getString("lifecycleBase.alreadyStarted",
toString()));
}

return;
}
// Call init
if (state.equals(LifecycleState.NEW)) {
init();
} else if (state.equals(LifecycleState.FAILED)){
//stop
stop();
} else if (!state.equals(LifecycleState.INITIALIZED) &&
!state.equals(LifecycleState.STOPPED)) {
invalidTransition(Lifecycle.BEFORE_START_EVENT);
}

setStateInternal(LifecycleState.STARTING_PREP, null, false);

try {
// Important: the component-specific start logic lives here
startInternal();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
setStateInternal(LifecycleState.FAILED, null, false);
throw new LifecycleException(
sm.getString("lifecycleBase.startFail",toString()), t);
}

if (state.equals(LifecycleState.FAILED) ||
state.equals(LifecycleState.MUST_STOP)) {
stop();
} else {
// Shouldn't be necessary but acts as a check that sub-classes are
// doing what they are supposed to.
if (!state.equals(LifecycleState.STARTING)) {
invalidTransition(Lifecycle.AFTER_START_EVENT);
}

setStateInternal(LifecycleState.STARTED, null, false);
}
}

startInternal

@Override
protected void startInternal() throws LifecycleException {

fireLifecycleEvent(CONFIGURE_START_EVENT, null);
setState(LifecycleState.STARTING);

globalNamingResources.start();

// Start all Services
synchronized (servicesLock) {
for (int i = 0; i < services.length; i++) {
services[i].start();
}
}
}
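Taken together, init/start and initInternal/startInternal form a template method: the final public methods own the state handling, and each component only supplies the "internal" hooks, which in turn call the public methods of its children. The following is a rough sketch of that shape, assuming a much smaller state set than Tomcat's; `Base` and `Composite` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleTemplateSketch {

    enum State { NEW, INITIALIZED, STARTED }

    /** Owns the state machine; subclasses only fill in the hooks. */
    abstract static class Base {
        protected final String name;
        protected final List<String> log;
        private State state = State.NEW;

        Base(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        final synchronized void init() {
            if (state != State.NEW) {
                throw new IllegalStateException(name + ": init is only valid from NEW");
            }
            log.add(name + ".init");
            initInternal();
            state = State.INITIALIZED;
        }

        final synchronized void start() {
            if (state == State.STARTED) {
                return;          // already started, nothing to do
            }
            if (state == State.NEW) {
                init();          // start() implies init() for a fresh component
            }
            log.add(name + ".start");
            startInternal();
            state = State.STARTED;
        }

        protected abstract void initInternal();

        protected abstract void startInternal();
    }

    /** A component whose hooks simply cascade to its children. */
    static final class Composite extends Base {
        private final List<Base> children = new ArrayList<>();

        Composite(String name, List<String> log) { super(name, log); }

        Composite add(Base child) {
            children.add(child);
            return this;
        }

        @Override
        protected void initInternal() {
            for (Base c : children) c.init();
        }

        @Override
        protected void startInternal() {
            for (Base c : children) c.start();
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Composite engine = new Composite("Engine", log);
        Composite service = new Composite("Service", log).add(engine);
        Composite server = new Composite("Server", log).add(service);
        server.start(); // a single call walks the whole tree
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because `server` is still NEW when start is called, the whole tree is initialized top-down first and then started top-down, which mirrors the Server, Service, Engine cascade in the code above.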

StandardService

@Override
protected void initInternal() throws LifecycleException {

super.initInternal();

if (container != null) {
container.init();
}

// Initialize any Executors
for (Executor executor : findExecutors()) {
if (executor instanceof JmxEnabled) {
((JmxEnabled) executor).setDomain(getDomain());
}
executor.init();
}

// Initialize mapper listener
mapperListener.init();

// Initialize our defined Connectors
synchronized (connectorsLock) {
for (Connector connector : connectors) {
try {
connector.init();
} catch (Exception e) {
String message = sm.getString(
"standardService.connector.initFailed", connector);
log.error(message, e);

if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"))
throw new LifecycleException(message);
}
}
}
}



@Override
protected void startInternal() throws LifecycleException {

//...
setState(LifecycleState.STARTING);

// Start the StandardEngine
if (container != null) {
synchronized (container) {
container.start();
}
}

synchronized (executors) {
for (Executor executor: executors) {
executor.start();
}
}


mapperListener.start();

// Start the Connectors
synchronized (connectorsLock) {
for (Connector connector: connectors) {
try {
// If it has already failed, don't try and start it
if (connector.getState() != LifecycleState.FAILED) {
connector.start();
}
} catch (Exception e) {
log.error(sm.getString(
"standardService.connector.startFailed",
connector), e);
}
}
}
}

StandardEngine

Its initInternal and startInternal call straight into the parent class's initInternal and startInternal:

@Override
protected synchronized void startInternal() throws LifecycleException {

// Start our subordinate components, if any
logger = null;
getLogger();
Cluster cluster = getClusterInternal();
if ((cluster != null) && (cluster instanceof Lifecycle))
((Lifecycle) cluster).start();
Realm realm = getRealmInternal();
if ((realm != null) && (realm instanceof Lifecycle))
((Lifecycle) realm).start();

// Start our child containers, if any
Container children[] = findChildren();
List<Future<Void>> results = new ArrayList<>();
for (int i = 0; i < children.length; i++) {
// StartChild implements Callable; its call() essentially just invokes Container#start
results.add(startStopExecutor.submit(new StartChild(children[i])));
}

boolean fail = false;
for (Future<Void> result : results) {
try {
result.get();
} catch (Exception e) {
log.error(sm.getString("containerBase.threadedStartFailed"), e);
fail = true;
}

}
if (fail) {
throw new LifecycleException(
sm.getString("containerBase.threadedStartFailed"));
}

// Start the Valves in our pipeline (including the basic), if any
if (pipeline instanceof Lifecycle)
((Lifecycle) pipeline).start();


setState(LifecycleState.STARTING);

// Start our thread
threadStart();

}
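The child-start loop above is a standard submit-then-join pattern on an executor. Here is a small sketch of it using only java.util.concurrent; `Child`, `startAll`, and the pool size are assumptions made for illustration, and the failure handling is reduced to counting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelStartSketch {

    /** Hypothetical child component. */
    interface Child {
        void start() throws Exception;
    }

    /** Wraps a child's start() in a Callable, like StartChild in the excerpt. */
    static final class StartChild implements Callable<Void> {
        private final Child child;

        StartChild(Child child) { this.child = child; }

        @Override
        public Void call() throws Exception {
            child.start();
            return null;
        }
    }

    /** Starts all children on a pool and returns how many of them failed. */
    static int startAll(List<Child> children, int threads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Void>> results = new ArrayList<>();
            for (Child c : children) {
                results.add(pool.submit(new StartChild(c)));
            }
            int failures = 0;
            for (Future<Void> r : results) {
                try {
                    r.get(); // blocks until this child has finished starting
                } catch (ExecutionException e) {
                    failures++; // the child's exception is available via e.getCause()
                }
            }
            return failures;
        } finally {
            pool.shutdown();
        }
    }

    static String run() {
        AtomicInteger started = new AtomicInteger();
        List<Child> children = new ArrayList<>();
        children.add(() -> started.incrementAndGet());
        children.add(() -> { throw new IllegalStateException("boom"); });
        children.add(() -> started.incrementAndGet());
        try {
            int failed = startAll(children, 2);
            return "started=" + started.get() + " failed=" + failed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Waiting on every Future before reporting is what lets the caller see all failures rather than stopping at the first one, which matches the `fail` flag in the excerpt.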

StandardHost

 @Override
protected synchronized void startInternal() throws LifecycleException {
// Set error report valve
String errorValve = getErrorReportValveClass();
if ((errorValve != null) && (!errorValve.equals(""))) {
try {
boolean found = false;
Valve[] valves = getPipeline().getValves();
for (Valve valve : valves) {
if (errorValve.equals(valve.getClass().getName())) {
found = true;
break;
}
}
if(!found) {
Valve valve =
(Valve) Class.forName(errorValve).getConstructor().newInstance();
getPipeline().addValve(valve);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
//...
}
}
super.startInternal();
}

Connector

    @Override
protected void startInternal() throws LifecycleException {

// Validate settings before starting
if (getPort() < 0) {
throw new LifecycleException(sm.getString(
"coyoteConnector.invalidPort", Integer.valueOf(getPort())));
}

setState(LifecycleState.STARTING);

try {
// In practice this is Http11AprProtocol (covered later)
protocolHandler.start();
} catch (Exception e) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
}
}

initInternal

@Override
protected void initInternal() throws LifecycleException {

super.initInternal();

// Initialize adapter; this is what is used later when requests are handed to service
adapter = new CoyoteAdapter(this);
protocolHandler.setAdapter(adapter);

// Make sure parseBodyMethodsSet has a default
if (null == parseBodyMethodsSet) {
setParseBodyMethods(getParseBodyMethods());
}

if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
getProtocolHandlerClassName()));
}
if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
protocolHandler instanceof AbstractHttp11JsseProtocol) {
AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
(AbstractHttp11JsseProtocol<?>) protocolHandler;
if (jsseProtocolHandler.isSSLEnabled() &&
jsseProtocolHandler.getSslImplementationName() == null) {
// OpenSSL is compatible with the JSSE configuration, so use it if APR is available
jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
}
}

try {
protocolHandler.init();
} catch (Exception e) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
}
}


Reference: https://juejin.im/post/5a6d6f6751882573520da54d

As we saw earlier, the startup script actually launches Tomcat through Bootstrap's main method.

Analyzing Bootstrap

Bootstrap's main

public static void main(String args[]) {

if (daemon == null) {
// Don't set daemon until init() has completed
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.init();
} catch (Throwable t) {
handleThrowable(t);
t.printStackTrace();
return;
}
daemon = bootstrap;
} else {
// When running as a service the call to stop will be on a new
// thread so make sure the correct class loader is used to prevent
// a range of class not found exceptions.
Thread.currentThread().setContextClassLoader(daemon.catalinaLoader);
}

try {
String command = "start";
if (args.length > 0) {
command = args[args.length - 1];
}

if (command.equals("startd")) {
args[args.length - 1] = "start";
daemon.load(args);
daemon.start();
} else if (command.equals("stopd")) {
args[args.length - 1] = "stop";
daemon.stop();
} else if (command.equals("start")) {
daemon.setAwait(true);
// Call load, which ultimately calls Catalina#load
daemon.load(args);
// Call start
daemon.start();
} else if (command.equals("stop")) {
daemon.stopServer(args);
} else if (command.equals("configtest")) {
daemon.load(args);
if (null==daemon.getServer()) {
System.exit(1);
}
System.exit(0);
} else {
log.warn("Bootstrap: command \"" + command + "\" does not exist.");
}
} catch (Throwable t) {
// Unwrap the Exception for clearer error reporting
if (t instanceof InvocationTargetException &&
t.getCause() != null) {
t = t.getCause();
}
handleThrowable(t);
t.printStackTrace();
System.exit(1);
}

}

init

public void init() throws Exception {

initClassLoaders();

// Set catalinaLoader as the current thread's context class loader
Thread.currentThread().setContextClassLoader(catalinaLoader);

SecurityClassLoad.securityClassLoad(catalinaLoader);

// Load the Catalina class, instantiate it, and finally assign it to catalinaDaemon
if (log.isDebugEnabled())
log.debug("Loading startup class");
Class<?> startupClass =
catalinaLoader.loadClass
("org.apache.catalina.startup.Catalina");
Object startupInstance = startupClass.newInstance();

// Invoke the object's setParentClassLoader method via reflection, passing sharedLoader as the argument.
// In other words, sharedLoader becomes the parent ClassLoader
if (log.isDebugEnabled())
log.debug("Setting startup class properties");
String methodName = "setParentClassLoader";
Class<?> paramTypes[] = new Class[1];
paramTypes[0] = Class.forName("java.lang.ClassLoader");
Object paramValues[] = new Object[1];
paramValues[0] = sharedLoader;
Method method =
startupInstance.getClass().getMethod(methodName, paramTypes);
method.invoke(startupInstance, paramValues);

catalinaDaemon = startupInstance;

}

private void initClassLoaders() {
try {
// common.loader in catalina.properties ("${catalina.base}/lib","${catalina.base}/lib/*.jar","${catalina.home}/lib","${catalina.home}/lib/*.jar")
// Create the commonLoader class loader
commonLoader = createClassLoader("common", null);
if( commonLoader == null ) {
// no config file, default to this loader - we might be in a 'single' env.
commonLoader=this.getClass().getClassLoader();
}
// server.loader (empty search path)
// Create the catalinaLoader class loader
catalinaLoader = createClassLoader("server", commonLoader);
// shared.loader (empty search path)
// Create the sharedLoader class loader
sharedLoader = createClassLoader("shared", commonLoader);
} catch (Throwable t) {
handleThrowable(t);
log.error("Class loader creation threw exception", t);
System.exit(1);
}
}
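The parent links set up by initClassLoaders, and the load-by-name plus reflective call used in init, can be tried out with the JDK alone. The sketch below is only an illustration under assumptions: it builds three URLClassLoader instances with empty search paths purely to show the hierarchy (as far as I know, Tomcat reuses the parent loader when server.loader or shared.loader is empty rather than creating a new one), and it loads java.lang.StringBuilder in place of the Catalina class.

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderSketch {

    static String run() {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        // Empty URL arrays: these loaders find nothing themselves and
        // delegate every lookup to their parent.
        try (URLClassLoader common = new URLClassLoader(new URL[0], system);
             URLClassLoader server = new URLClassLoader(new URL[0], common);
             URLClassLoader shared = new URLClassLoader(new URL[0], common)) {

            // Load a class by name through the "server" loader and instantiate it,
            // the same shape as loading org.apache.catalina.startup.Catalina.
            Class<?> startupClass = server.loadClass("java.lang.StringBuilder");
            Object instance = startupClass.getDeclaredConstructor().newInstance();

            // Call a method reflectively, the same shape as the
            // setParentClassLoader invocation in Bootstrap.init.
            Method append = startupClass.getMethod("append", String.class);
            append.invoke(instance,
                    "parent of shared is common: " + (shared.getParent() == common));

            return instance.toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Working through reflection means the bootstrap code never needs a compile-time reference to the class it starts, which is why Bootstrap can run with a minimal classpath and let the class loaders supply the rest.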

start

    public void start()
throws Exception {
if( catalinaDaemon==null ) init();

// Invoke Catalina#start via reflection
Method method = catalinaDaemon.getClass().getMethod("start", (Class [] )null);
method.invoke(catalinaDaemon, (Object [])null);

}

Analyzing Catalina

public void load() {

long t1 = System.nanoTime();

initDirs();

// Before digester - it may be needed

initNaming();

// Create the Digester
Digester digester = createStartDigester();

InputSource inputSource = null;
InputStream inputStream = null;
File file = null;
try {
// Read CatalinaBase/conf/server.xml
file = configFile();
inputStream = new FileInputStream(file);
inputSource = new InputSource(file.toURI().toURL().toString());
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("catalina.configFail", file), e);
}
}
if (inputStream == null) {
try {
// Read conf/server.xml through the class loader
inputStream = getClass().getClassLoader()
.getResourceAsStream(getConfigFile());
inputSource = new InputSource
(getClass().getClassLoader()
.getResource(getConfigFile()).toString());
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("catalina.configFail",
getConfigFile()), e);
}
}
}

// This should be included in catalina.jar
// Alternative: don't bother with xml, just create it manually.
if( inputStream==null ) {
try {
inputStream = getClass().getClassLoader()
.getResourceAsStream("server-embed.xml");
inputSource = new InputSource
(getClass().getClassLoader()
.getResource("server-embed.xml").toString());
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("catalina.configFail",
"server-embed.xml"), e);
}
}
}


if (inputStream == null || inputSource == null) {
if (file == null) {
log.warn(sm.getString("catalina.configFail",
getConfigFile() + "] or [server-embed.xml]"));
} else {
log.warn(sm.getString("catalina.configFail",
file.getAbsolutePath()));
if (file.exists() && !file.canRead()) {
log.warn("Permissions incorrect, read permission is not allowed on the file.");
}
}
return;
}

try {
inputSource.setByteStream(inputStream);
digester.push(this);
digester.parse(inputSource);
} catch (SAXParseException spe) {
log.warn("Catalina.start using " + getConfigFile() + ": " +
spe.getMessage());
return;
} catch (Exception e) {
log.warn("Catalina.start using " + getConfigFile() + ": " , e);
return;
} finally {
try {
inputStream.close();
} catch (IOException e) {
// Ignore
}
}
// Give the Server a reference to this Catalina instance, set CatalinaHome and CatalinaBase, then initialize
getServer().setCatalina(this);
getServer().setCatalinaHome(Bootstrap.getCatalinaHomeFile());
getServer().setCatalinaBase(Bootstrap.getCatalinaBaseFile());

// Stream redirection
initStreams();

// Start the new server
try {
getServer().init();
} catch (LifecycleException e) {
if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")) {
throw new java.lang.Error(e);
} else {
log.error("Catalina.start", e);
}

}

long t2 = System.nanoTime();
if(log.isInfoEnabled()) {
log.info("Initialization processed in " + ((t2 - t1) / 1000000) + " ms");
}

}

The Digester plays a key role here; the essential steps are:

// Create the Digester
Digester digester = createStartDigester();
// Read conf/server.xml and attach the stream to the input source
inputSource.setByteStream(inputStream);
// Push the current object onto the Digester stack
digester.push(this);
// Parse the XML
digester.parse(inputSource);
getServer().setCatalina(this);
// Call init on the Server interface object, i.e. StandardServer's init (which calls initInternal)
getServer().init();

Parsing the XML file in this way produces instances of org.apache.catalina.core.StandardServer, org.apache.catalina.core.StandardService, org.apache.catalina.connector.Connector, org.apache.catalina.core.StandardEngine, org.apache.catalina.core.StandardHost, org.apache.catalina.core.StandardContext, and so on.
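To make the "push an object, then let nested elements attach themselves to it" idea concrete, here is a rough sketch built on the JDK's SAX parser rather than on the Digester itself. Everything in it is a simplification: `Node` is a hypothetical stand-in for the real component classes, each element simply becomes a `Node`, and the child is attached to its parent when the element starts, which is not necessarily when the real rules fire.

```java
import java.io.StringReader;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.InputSource;
import org.xml.sax.helpers.DefaultHandler;

public class DigesterSketch {

    /** Hypothetical stand-in for StandardServer, StandardService, and friends. */
    static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();

        Node(String name) { this.name = name; }

        @Override
        public String toString() {
            return children.isEmpty() ? name : name + children;
        }
    }

    static Node parse(String xml) throws Exception {
        final Deque<Node> stack = new ArrayDeque<>();
        final Node root = new Node("Catalina");
        stack.push(root); // plays the role of digester.push(this)

        DefaultHandler handler = new DefaultHandler() {
            @Override
            public void startElement(String uri, String localName,
                                     String qName, Attributes attrs) {
                Node node = new Node(qName);     // roughly what an object-create rule does
                stack.peek().children.add(node); // roughly what a set-next rule does
                stack.push(node);                // nested elements now attach to this node
            }

            @Override
            public void endElement(String uri, String localName, String qName) {
                stack.pop();
            }
        };

        SAXParserFactory.newInstance().newSAXParser()
                .parse(new InputSource(new StringReader(xml)), handler);
        return root;
    }

    static String run() {
        String xml = "<Server><Service><Connector/><Engine><Host/></Engine></Service></Server>";
        try {
            return parse(xml).toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The stack is the whole trick: whatever sits on top when an element opens becomes the parent of the object created for that element, so the object graph ends up with the same nesting as server.xml.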

Catalina's start

public void start() {

if (getServer() == null) {
load();
}

if (getServer() == null) {
log.fatal("Cannot start server. Server instance is not configured.");
return;
}

long t1 = System.nanoTime();

// Call the Server's start
try {
getServer().start();
} catch (LifecycleException e) {
log.fatal(sm.getString("catalina.serverStartFail"), e);
try {
//destroy
getServer().destroy();
} catch (LifecycleException e1) {
log.debug("destroy() failed for failed Server ", e1);
}
return;
}

long t2 = System.nanoTime();
if(log.isInfoEnabled()) {
log.info("Server startup in " + ((t2 - t1) / 1000000) + " ms");
}

// Register the shutdown hook
if (useShutdownHook) {
if (shutdownHook == null) {
shutdownHook = new CatalinaShutdownHook();
}
Runtime.getRuntime().addShutdownHook(shutdownHook);

// If JULI is being used, disable JULI's shutdown hook since
// shutdown hooks run in parallel and log messages may be lost
// if JULI's hook completes before the CatalinaShutdownHook()
LogManager logManager = LogManager.getLogManager();
if (logManager instanceof ClassLoaderLogManager) {
((ClassLoaderLogManager) logManager).setUseShutdownHook(
false);
}
}

if (await) {
await();
stop();
}
}

Let's take a look at startup.sh

#!/bin/sh
# Better OS/400 detection: see Bugzilla 31132
os400=false
case "`uname`" in
OS400*) os400=true;;
esac

# $0 is the file name of the shell script itself
PRG="$0"

while [ -h "$PRG" ] ; do
# Resolve the symlink to the real file or directory
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`/"$link"
fi
done

# Get the directory that contains the script file
PRGDIR=`dirname "$PRG"`
EXECUTABLE=catalina.sh

# Check that target executable exists
if $os400; then
# -x will Only work on the os400 if the files are:
# 1. owned by the user
# 2. owned by the PRIMARY group of the user
# this will not work if the user belongs in secondary groups
eval
else
if [ ! -x "$PRGDIR"/"$EXECUTABLE" ]; then
echo "Cannot find $PRGDIR/$EXECUTABLE"
echo "The file is absent or does not have execute permission"
echo "This file is needed to run this program"
exit 1
fi
fi

# $@ is the list of all arguments
exec "$PRGDIR"/"$EXECUTABLE" start "$@"

So in effect it simply runs catalina.sh start $@

catalina.sh

#!/bin/sh

# OS specific support. $var _must_ be set to either true or false.
cygwin=false
darwin=false
os400=false
case "`uname`" in
CYGWIN*) cygwin=true;;
Darwin*) darwin=true;;
OS400*) os400=true;;
esac

# resolve links - $0 may be a softlink
PRG="$0"

while [ -h "$PRG" ]; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`/"$link"
fi
done

# Get standard environment variables
PRGDIR=`dirname "$PRG"`

# -z is true when CATALINA_HOME is empty; in that case use the parent of the script's directory as CATALINA_HOME
[ -z "$CATALINA_HOME" ] && CATALINA_HOME=`cd "$PRGDIR/.." >/dev/null; pwd`

# If CATALINA_BASE is not set, default it to CATALINA_HOME
[ -z "$CATALINA_BASE" ] && CATALINA_BASE="$CATALINA_HOME"

# Validate: neither path may contain a colon (:), because the colon is the classpath separator
case $CATALINA_HOME in
*:*) echo "Using CATALINA_HOME: $CATALINA_HOME";
echo "Unable to start as CATALINA_HOME contains a colon (:) character";
exit 1;
esac
case $CATALINA_BASE in
*:*) echo "Using CATALINA_BASE: $CATALINA_BASE";
echo "Unable to start as CATALINA_BASE contains a colon (:) character";
exit 1;
esac

# Ensure that any user defined CLASSPATH variables are not used on startup
CLASSPATH=

# -r: if a readable setenv.sh exists, source it (CATALINA_BASE is checked before CATALINA_HOME)
if [ -r "$CATALINA_BASE/bin/setenv.sh" ]; then
. "$CATALINA_BASE/bin/setenv.sh"
elif [ -r "$CATALINA_HOME/bin/setenv.sh" ]; then
. "$CATALINA_HOME/bin/setenv.sh"
fi

# Under Cygwin, convert any of these variables that are set to Unix-style paths with cygpath
if $cygwin; then
[ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
[ -n "$JRE_HOME" ] && JRE_HOME=`cygpath --unix "$JRE_HOME"`
[ -n "$CATALINA_HOME" ] && CATALINA_HOME=`cygpath --unix "$CATALINA_HOME"`
[ -n "$CATALINA_BASE" ] && CATALINA_BASE=`cygpath --unix "$CATALINA_BASE"`
[ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
fi

# For OS400
if $os400; then
# Set job priority to standard for interactive (interactive - 6) by using
# the interactive priority - 6, the helper threads that respond to requests
# will be running at the same priority as interactive jobs.
COMMAND='chgjob job('$JOBNAME') runpty(6)'
system $COMMAND

# Enable multi threading
export QIBM_MULTI_THREADED=Y
fi

# Source setclasspath.sh (listed below)
if $os400; then
# -r will Only work on the os400 if the files are:
# 1. owned by the user
# 2. owned by the PRIMARY group of the user
# this will not work if the user belongs in secondary groups
. "$CATALINA_HOME"/bin/setclasspath.sh
else
if [ -r "$CATALINA_HOME"/bin/setclasspath.sh ]; then
. "$CATALINA_HOME"/bin/setclasspath.sh
else
echo "Cannot find $CATALINA_HOME/bin/setclasspath.sh"
echo "This file is needed to run this program"
exit 1
fi
fi

# Append bootstrap.jar to CLASSPATH (it provides org.apache.catalina.startup.Bootstrap, covered later)
if [ ! -z "$CLASSPATH" ] ; then
CLASSPATH="$CLASSPATH":
fi
CLASSPATH="$CLASSPATH""$CATALINA_HOME"/bin/bootstrap.jar

# Set CATALINA_OUT (the log file) if it is not already set
if [ -z "$CATALINA_OUT" ] ; then
CATALINA_OUT="$CATALINA_BASE"/logs/catalina.out
fi

# Set CATALINA_TMPDIR (the temp directory) if it is not already set
if [ -z "$CATALINA_TMPDIR" ] ; then
# Define the java.io.tmpdir to use for Catalina
CATALINA_TMPDIR="$CATALINA_BASE"/temp
fi

# Append tomcat-juli.jar to CLASSPATH, preferring the copy under CATALINA_BASE
if [ -r "$CATALINA_BASE/bin/tomcat-juli.jar" ] ; then
CLASSPATH=$CLASSPATH:$CATALINA_BASE/bin/tomcat-juli.jar
else
CLASSPATH=$CLASSPATH:$CATALINA_HOME/bin/tomcat-juli.jar
fi

# Bugzilla 37848: When no TTY is available, don't output to console
have_tty=0
if [ "`tty`" != "not a tty" ]; then
have_tty=1
fi

# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
JAVA_HOME=`cygpath --absolute --windows "$JAVA_HOME"`
JRE_HOME=`cygpath --absolute --windows "$JRE_HOME"`
CATALINA_HOME=`cygpath --absolute --windows "$CATALINA_HOME"`
CATALINA_BASE=`cygpath --absolute --windows "$CATALINA_BASE"`
CATALINA_TMPDIR=`cygpath --absolute --windows "$CATALINA_TMPDIR"`
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
JAVA_ENDORSED_DIRS=`cygpath --path --windows "$JAVA_ENDORSED_DIRS"`
fi

# Set juli LogManager config file if it is present and an override has not been issued
if [ -z "$LOGGING_CONFIG" ]; then
if [ -r "$CATALINA_BASE"/conf/logging.properties ]; then
LOGGING_CONFIG="-Djava.util.logging.config.file=$CATALINA_BASE/conf/logging.properties"
else
# Bugzilla 45585
LOGGING_CONFIG="-Dnop"
fi
fi

if [ -z "$LOGGING_MANAGER" ]; then
LOGGING_MANAGER="-Djava.util.logging.manager=org.apache.juli.ClassLoaderLogManager"
fi

# Uncomment the following line to make the umask available when using the
# org.apache.catalina.security.SecurityListener
#JAVA_OPTS="$JAVA_OPTS -Dorg.apache.catalina.security.SecurityListener.UMASK=`umask`"

# ----- Execute The Requested Command -----------------------------------------

# Bugzilla 37848: only output this if we have a TTY
if [ $have_tty -eq 1 ]; then
echo "Using CATALINA_BASE: $CATALINA_BASE"
echo "Using CATALINA_HOME: $CATALINA_HOME"
echo "Using CATALINA_TMPDIR: $CATALINA_TMPDIR"
if [ "$1" = "debug" ] ; then
echo "Using JAVA_HOME: $JAVA_HOME"
else
echo "Using JRE_HOME: $JRE_HOME"
fi
echo "Using CLASSPATH: $CLASSPATH"
if [ ! -z "$CATALINA_PID" ]; then
echo "Using CATALINA_PID: $CATALINA_PID"
fi
fi

if [ "$1" = "jpda" ] ; then
if [ -z "$JPDA_TRANSPORT" ]; then
JPDA_TRANSPORT="dt_socket"
fi
if [ -z "$JPDA_ADDRESS" ]; then
JPDA_ADDRESS="localhost:8000"
fi
if [ -z "$JPDA_SUSPEND" ]; then
JPDA_SUSPEND="n"
fi
if [ -z "$JPDA_OPTS" ]; then
JPDA_OPTS="-agentlib:jdwp=transport=$JPDA_TRANSPORT,address=$JPDA_ADDRESS,server=y,suspend=$JPDA_SUSPEND"
fi
CATALINA_OPTS="$CATALINA_OPTS $JPDA_OPTS"
# shift drops $1 and moves the remaining arguments one position to the left
shift
fi

if [ "$1" = "debug" ] ; then
if $os400; then
echo "Debug command not available on OS400"
exit 1
else
shift
if [ "$1" = "-security" ] ; then
if [ $have_tty -eq 1 ]; then
echo "Using Security Manager"
fi
shift
exec "$_RUNJDB" "$LOGGING_CONFIG" $LOGGING_MANAGER $JAVA_OPTS $CATALINA_OPTS \
-Djava.endorsed.dirs="$JAVA_ENDORSED_DIRS" -classpath "$CLASSPATH" \
-sourcepath "$CATALINA_HOME"/../../java \
-Djava.security.manager \
-Djava.security.policy=="$CATALINA_BASE"/conf/catalina.policy \
-Dcatalina.base="$CATALINA_BASE" \
-Dcatalina.home="$CATALINA_HOME" \
-Djava.io.tmpdir="$CATALINA_TMPDIR" \
org.apache.catalina.startup.Bootstrap "$@" start
else
exec "$_RUNJDB" "$LOGGING_CONFIG" $LOGGING_MANAGER $JAVA_OPTS $CATALINA_OPTS \
-Djava.endorsed.dirs="$JAVA_ENDORSED_DIRS" -classpath "$CLASSPATH" \
-sourcepath "$CATALINA_HOME"/../../java \
-Dcatalina.base="$CATALINA_BASE" \
-Dcatalina.home="$CATALINA_HOME" \
-Djava.io.tmpdir="$CATALINA_TMPDIR" \
org.apache.catalina.startup.Bootstrap "$@" start
fi
fi

elif [ "$1" = "run" ]; then

shift
if [ "$1" = "-security" ] ; then
if [ $have_tty -eq 1 ]; then
echo "Using Security Manager"
fi
shift
eval exec "\"$_RUNJAVA\"" "\"$LOGGING_CONFIG\"" $LOGGING_MANAGER $JAVA_OPTS $CATALINA_OPTS \
-Djava.endorsed.dirs="\"$JAVA_ENDORSED_DIRS\"" -classpath "\"$CLASSPATH\"" \
-Djava.security.manager \
-Djava.security.policy=="\"$CATALINA_BASE/conf/catalina.policy\"" \
-Dcatalina.base="\"$CATALINA_BASE\"" \
-Dcatalina.home="\"$CATALINA_HOME\"" \
-Djava.io.tmpdir="\"$CATALINA_TMPDIR\"" \
org.apache.catalina.startup.Bootstrap "$@" start
else
eval exec "\"$_RUNJAVA\"" "\"$LOGGING_CONFIG\"" $LOGGING_MANAGER $JAVA_OPTS $CATALINA_OPTS \
-Djava.endorsed.dirs="\"$JAVA_ENDORSED_DIRS\"" -classpath "\"$CLASSPATH\"" \
-Dcatalina.base="\"$CATALINA_BASE\"" \
-Dcatalina.home="\"$CATALINA_HOME\"" \
-Djava.io.tmpdir="\"$CATALINA_TMPDIR\"" \
org.apache.catalina.startup.Bootstrap "$@" start
fi

#start
elif [ "$1" = "start" ] ; then

# Before starting, check the PID file: abort if that process is still running, otherwise remove the stale file
if [ ! -z "$CATALINA_PID" ]; then
if [ -f "$CATALINA_PID" ]; then
if [ -s "$CATALINA_PID" ]; then
echo "Existing PID file found during start."
if [ -r "$CATALINA_PID" ]; then
PID=`cat "$CATALINA_PID"`
ps -p $PID >/dev/null 2>&1
if [ $? -eq 0 ] ; then
echo "Tomcat appears to still be running with PID $PID. Start aborted."
exit 1
else
echo "Removing/clearing stale PID file."
rm -f "$CATALINA_PID" >/dev/null 2>&1
if [ $? != 0 ]; then
if [ -w "$CATALINA_PID" ]; then
cat /dev/null > "$CATALINA_PID"
else
echo "Unable to remove or clear stale PID file. Start aborted."
exit 1
fi
fi
fi
else
echo "Unable to read PID file. Start aborted."
exit 1
fi
else
rm -f "$CATALINA_PID" >/dev/null 2>&1
if [ $? != 0 ]; then
if [ ! -w "$CATALINA_PID" ]; then
echo "Unable to remove or write to empty PID file. Start aborted."
exit 1
fi
fi
fi
fi
fi

shift
touch "$CATALINA_OUT"
if [ "$1" = "-security" ] ; then
if [ $have_tty -eq 1 ]; then
echo "Using Security Manager"
fi
shift
eval "\"$_RUNJAVA\"" "\"$LOGGING_CONFIG\"" $LOGGING_MANAGER $JAVA_OPTS $CATALINA_OPTS \
-Djava.endorsed.dirs="\"$JAVA_ENDORSED_DIRS\"" -classpath "\"$CLASSPATH\"" \
-Djava.security.manager \
-Djava.security.policy=="\"$CATALINA_BASE/conf/catalina.policy\"" \
-Dcatalina.base="\"$CATALINA_BASE\"" \
-Dcatalina.home="\"$CATALINA_HOME\"" \
-Djava.io.tmpdir="\"$CATALINA_TMPDIR\"" \
org.apache.catalina.startup.Bootstrap "$@" start \
>> "$CATALINA_OUT" 2>&1 "&"

else
# The key step of start: launch Bootstrap in the background
eval "\"$_RUNJAVA\"" "\"$LOGGING_CONFIG\"" $LOGGING_MANAGER $JAVA_OPTS $CATALINA_OPTS \
-Djava.endorsed.dirs="\"$JAVA_ENDORSED_DIRS\"" -classpath "\"$CLASSPATH\"" \
-Dcatalina.base="\"$CATALINA_BASE\"" \
-Dcatalina.home="\"$CATALINA_HOME\"" \
-Djava.io.tmpdir="\"$CATALINA_TMPDIR\"" \
org.apache.catalina.startup.Bootstrap "$@" start \
>> "$CATALINA_OUT" 2>&1 "&"

fi

if [ ! -z "$CATALINA_PID" ]; then
echo $! > "$CATALINA_PID"
fi

echo "Tomcat started."

elif [ "$1" = "stop" ] ; then

shift

SLEEP=5
if [ ! -z "$1" ]; then
echo $1 | grep "[^0-9]" >/dev/null 2>&1
if [ $? -gt 0 ]; then
SLEEP=$1
shift
fi
fi

FORCE=0
if [ "$1" = "-force" ]; then
shift
FORCE=1
fi

if [ ! -z "$CATALINA_PID" ]; then
if [ -f "$CATALINA_PID" ]; then
if [ -s "$CATALINA_PID" ]; then
kill -0 `cat "$CATALINA_PID"` >/dev/null 2>&1
if [ $? -gt 0 ]; then
echo "PID file found but no matching process was found. Stop aborted."
exit 1
fi
else
echo "PID file is empty and has been ignored."
fi
else
echo "\$CATALINA_PID was set but the specified file does not exist. Is Tomcat running? Stop aborted."
exit 1
fi
fi

eval "\"$_RUNJAVA\"" $LOGGING_MANAGER $JAVA_OPTS \
-Djava.endorsed.dirs="\"$JAVA_ENDORSED_DIRS\"" -classpath "\"$CLASSPATH\"" \
-Dcatalina.base="\"$CATALINA_BASE\"" \
-Dcatalina.home="\"$CATALINA_HOME\"" \
-Djava.io.tmpdir="\"$CATALINA_TMPDIR\"" \
org.apache.catalina.startup.Bootstrap "$@" stop

# stop failed. Shutdown port disabled? Try a normal kill.
if [ $? != 0 ]; then
if [ ! -z "$CATALINA_PID" ]; then
echo "The stop command failed. Attempting to signal the process to stop through OS signal."
kill -15 `cat "$CATALINA_PID"` >/dev/null 2>&1
fi
fi

if [ ! -z "$CATALINA_PID" ]; then
if [ -f "$CATALINA_PID" ]; then
while [ $SLEEP -ge 0 ]; do
kill -0 `cat "$CATALINA_PID"` >/dev/null 2>&1
if [ $? -gt 0 ]; then
rm -f "$CATALINA_PID" >/dev/null 2>&1
if [ $? != 0 ]; then
if [ -w "$CATALINA_PID" ]; then
cat /dev/null > "$CATALINA_PID"
# If Tomcat has stopped don't try and force a stop with an empty PID file
FORCE=0
else
echo "The PID file could not be removed or cleared."
fi
fi
echo "Tomcat stopped."
break
fi
if [ $SLEEP -gt 0 ]; then
sleep 1
fi
if [ $SLEEP -eq 0 ]; then
if [ $FORCE -eq 0 ]; then
echo "Tomcat did not stop in time. PID file was not removed. To aid diagnostics a thread dump has been written to standard out."
kill -3 `cat "$CATALINA_PID"`
fi
fi
SLEEP=`expr $SLEEP - 1 `
done
fi
fi

KILL_SLEEP_INTERVAL=5
if [ $FORCE -eq 1 ]; then
if [ -z "$CATALINA_PID" ]; then
echo "Kill failed: \$CATALINA_PID not set"
else
if [ -f "$CATALINA_PID" ]; then
PID=`cat "$CATALINA_PID"`
echo "Killing Tomcat with the PID: $PID"
kill -9 $PID
while [ $KILL_SLEEP_INTERVAL -ge 0 ]; do
kill -0 `cat "$CATALINA_PID"` >/dev/null 2>&1
if [ $? -gt 0 ]; then
rm -f "$CATALINA_PID" >/dev/null 2>&1
if [ $? != 0 ]; then
if [ -w "$CATALINA_PID" ]; then
cat /dev/null > "$CATALINA_PID"
else
echo "The PID file could not be removed."
fi
fi
# Set this to zero else a warning will be issued about the process still running
KILL_SLEEP_INTERVAL=0
echo "The Tomcat process has been killed."
break
fi
if [ $KILL_SLEEP_INTERVAL -gt 0 ]; then
sleep 1
fi
KILL_SLEEP_INTERVAL=`expr $KILL_SLEEP_INTERVAL - 1 `
done
if [ $KILL_SLEEP_INTERVAL -gt 0 ]; then
echo "Tomcat has not been killed completely yet. The process might be waiting on some system call or might be UNINTERRUPTIBLE."
fi
fi
fi
fi

elif [ "$1" = "configtest" ] ; then

eval "\"$_RUNJAVA\"" $LOGGING_MANAGER $JAVA_OPTS \
-Djava.endorsed.dirs="\"$JAVA_ENDORSED_DIRS\"" -classpath "\"$CLASSPATH\"" \
-Dcatalina.base="\"$CATALINA_BASE\"" \
-Dcatalina.home="\"$CATALINA_HOME\"" \
-Djava.io.tmpdir="\"$CATALINA_TMPDIR\"" \
org.apache.catalina.startup.Bootstrap configtest
result=$?
if [ $result -ne 0 ]; then
echo "Configuration error detected!"
fi
exit $result

elif [ "$1" = "version" ] ; then

"$_RUNJAVA" \
-classpath "$CATALINA_HOME/lib/catalina.jar" \
org.apache.catalina.util.ServerInfo

else

echo "Usage: catalina.sh ( commands ... )"
echo "commands:"
if $os400; then
echo " debug Start Catalina in a debugger (not available on OS400)"
echo " debug -security Debug Catalina with a security manager (not available on OS400)"
else
echo " debug Start Catalina in a debugger"
echo " debug -security Debug Catalina with a security manager"
fi
echo " jpda start Start Catalina under JPDA debugger"
echo " run Start Catalina in the current window"
echo " run -security Start in the current window with security manager"
echo " start Start Catalina in a separate window"
echo " start -security Start in a separate window with security manager"
echo " stop Stop Catalina, waiting up to 5 seconds for the process to end"
echo " stop n Stop Catalina, waiting up to n seconds for the process to end"
echo " stop -force Stop Catalina, wait up to 5 seconds and then use kill -KILL if still running"
echo " stop n -force Stop Catalina, wait up to n seconds and then use kill -KILL if still running"
echo " configtest Run a basic syntax check on server.xml - check exit code for result"
echo " version What version of tomcat are you running?"
echo "Note: Waiting for the process to end and use of the -force option require that \$CATALINA_PID is defined"
exit 1

fi
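
The PID-file handling in the start branch above is easy to misread: it never kills anything, it only decides whether the start may proceed. Here is a minimal sketch of that decision, assuming a hypothetical function pid_file_state that prints a state instead of exiting. It probes the process with kill -0 (as the stop branch does) rather than the ps -p used by the start branch, so the demo also runs on systems with a minimal ps.

```shell
#!/bin/sh
# pid_file_state is a hypothetical function for illustration; catalina.sh
# performs these checks inline and exits instead of printing a state.
pid_file_state() {
  pid_file="$1"
  if [ ! -f "$pid_file" ]; then
    echo "absent"          # no PID file: nothing to check
  elif [ ! -s "$pid_file" ]; then
    echo "empty"           # zero-length file: catalina.sh simply removes it
  else
    pid=`cat "$pid_file"`
    if kill -0 "$pid" >/dev/null 2>&1; then
      echo "running"       # catalina.sh aborts the start in this case
    else
      echo "stale"         # catalina.sh removes the file and carries on
    fi
  fi
}

# Demo files (assumed names, created only for this example)
demo_dir=`mktemp -d`
state_absent=`pid_file_state "$demo_dir/none.pid"`

: > "$demo_dir/empty.pid"
state_empty=`pid_file_state "$demo_dir/empty.pid"`

echo $$ > "$demo_dir/self.pid"     # this shell is certainly alive
state_running=`pid_file_state "$demo_dir/self.pid"`

sleep 0 &                          # start a short-lived child...
child=$!
wait "$child"                      # ...and reap it, so its PID is no longer in use
echo "$child" > "$demo_dir/dead.pid"
state_stale=`pid_file_state "$demo_dir/dead.pid"`

echo "$state_absent $state_empty $state_running $state_stale"
```

None of this runs unless CATALINA_PID is set. Without it, the start branch skips the check entirely and writes no PID file.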

setclasspath.sh

#!/bin/sh

# Make sure prerequisite environment variables are set
if [ -z "$JAVA_HOME" -a -z "$JRE_HOME" ]; then
if $darwin; then
# Bugzilla 54390
if [ -x '/usr/libexec/java_home' ] ; then
export JAVA_HOME=`/usr/libexec/java_home`
# Bugzilla 37284 (reviewed).
elif [ -d "/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home" ]; then
export JAVA_HOME="/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home"
fi
else
JAVA_PATH=`which java 2>/dev/null`
if [ "x$JAVA_PATH" != "x" ]; then
JAVA_PATH=`dirname $JAVA_PATH 2>/dev/null`
JRE_HOME=`dirname $JAVA_PATH 2>/dev/null`
fi
if [ "x$JRE_HOME" = "x" ]; then
# XXX: Should we try other locations?
if [ -x /usr/bin/java ]; then
JRE_HOME=/usr
fi
fi
fi
if [ -z "$JAVA_HOME" -a -z "$JRE_HOME" ]; then
echo "Neither the JAVA_HOME nor the JRE_HOME environment variable is defined"
echo "At least one of these environment variable is needed to run this program"
exit 1
fi
fi
if [ -z "$JAVA_HOME" -a "$1" = "debug" ]; then
echo "JAVA_HOME should point to a JDK in order to run in debug mode."
exit 1
fi
if [ -z "$JRE_HOME" ]; then
JRE_HOME="$JAVA_HOME"
fi

# If we're running under jdb, we need a full jdk.
if [ "$1" = "debug" ] ; then
if [ "$os400" = "true" ]; then
if [ ! -x "$JAVA_HOME"/bin/java -o ! -x "$JAVA_HOME"/bin/javac ]; then
echo "The JAVA_HOME environment variable is not defined correctly"
echo "This environment variable is needed to run this program"
echo "NB: JAVA_HOME should point to a JDK not a JRE"
exit 1
fi
else
if [ ! -x "$JAVA_HOME"/bin/java -o ! -x "$JAVA_HOME"/bin/jdb -o ! -x "$JAVA_HOME"/bin/javac ]; then
echo "The JAVA_HOME environment variable is not defined correctly"
echo "This environment variable is needed to run this program"
echo "NB: JAVA_HOME should point to a JDK not a JRE"
exit 1
fi
fi
fi

# Don't override the endorsed dir if the user has set it previously
if [ -z "$JAVA_ENDORSED_DIRS" ]; then
# Set the default -Djava.endorsed.dirs argument
JAVA_ENDORSED_DIRS="$CATALINA_HOME"/endorsed
fi

# Set _RUNJAVA (and _RUNJDB, except on OS400)
_RUNJAVA="$JRE_HOME"/bin/java
if [ "$os400" != "true" ]; then
_RUNJDB="$JAVA_HOME"/bin/jdb
fi
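
When neither JAVA_HOME nor JRE_HOME is set, setclasspath.sh derives JRE_HOME from the java binary found on the PATH by stripping two path components. A small sketch of that derivation, using an assumed example path in place of the real output of which java:

```shell
#!/bin/sh
# Assumed example path; on a real system this value comes from `which java`.
JAVA_PATH=/opt/jdk/bin/java

JAVA_PATH=`dirname "$JAVA_PATH" 2>/dev/null`   # strip the file name  -> /opt/jdk/bin
JRE_HOME=`dirname "$JAVA_PATH" 2>/dev/null`    # strip the bin folder -> /opt/jdk

# setclasspath.sh then builds the launcher path from JRE_HOME
_RUNJAVA="$JRE_HOME"/bin/java

echo "JRE_HOME=$JRE_HOME"
echo "_RUNJAVA=$_RUNJAVA"
```

The script does not resolve symlinks at this point, so if the java on the PATH is a link such as /usr/bin/java, JRE_HOME ends up as /usr. The launcher still works, but JRE_HOME is not the real installation directory.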

The key part of start

eval "\"$_RUNJAVA\"" "\"$LOGGING_CONFIG\"" $LOGGING_MANAGER $JAVA_OPTS $CATALINA_OPTS \
-Djava.endorsed.dirs="\"$JAVA_ENDORSED_DIRS\"" -classpath "\"$CLASSPATH\"" \
-Dcatalina.base="\"$CATALINA_BASE\"" \
-Dcatalina.home="\"$CATALINA_HOME\"" \
-Djava.io.tmpdir="\"$CATALINA_TMPDIR\"" \
org.apache.catalina.startup.Bootstrap "$@" start \
>> "$CATALINA_OUT" 2>&1 "&"
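
Two details of this command are worth trying out on their own: the escaped quotes (\"...\") keep paths with spaces intact when eval re-parses the line, and the quoted "&" is handed to eval as text, so it is eval that puts the process in the background. The sketch below reproduces the same shape with a stand-in script instead of java. The script name, OPTS and the log file are assumptions for the demo.

```shell
#!/bin/sh
# Stand-in for the Java launcher: a tiny script stored under a path with a space.
demo_dir=`mktemp -d`
script="$demo_dir/my tool.sh"
cat > "$script" <<'EOF'
echo "args=$#"
EOF

OPTS="-Da=1 -Db=2"          # unquoted on purpose, like $JAVA_OPTS: split into words
out="$demo_dir/out.log"     # plays the role of $CATALINA_OUT

# Same structure as the Tomcat line: escaped quotes survive into the string
# that eval parses, the redirection applies to eval itself, "&" backgrounds the job.
eval sh "\"$script\"" $OPTS start \
  >> "$out" 2>&1 "&"

pid=$!                      # eval runs in the current shell, so $! is the new job
wait "$pid"
result=`cat "$out"`
echo "$result"
```

Because eval runs in the current shell, $! is available afterwards. That is what lets catalina.sh write the PID of the new JVM into CATALINA_PID right after this command.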

The key part of stop

eval "\"$_RUNJAVA\"" $LOGGING_MANAGER $JAVA_OPTS \
-Djava.endorsed.dirs="\"$JAVA_ENDORSED_DIRS\"" -classpath "\"$CLASSPATH\"" \
-Dcatalina.base="\"$CATALINA_BASE\"" \
-Dcatalina.home="\"$CATALINA_HOME\"" \
-Djava.io.tmpdir="\"$CATALINA_TMPDIR\"" \
org.apache.catalina.startup.Bootstrap "$@" stop
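
By the time this command runs, the stop branch of catalina.sh has already consumed an optional wait time and the -force flag from the argument list, so "$@" only carries whatever is left. A minimal sketch of that parsing, wrapped in a hypothetical function parse_stop_args so it can be called with different arguments:

```shell
#!/bin/sh
# parse_stop_args is a hypothetical wrapper; catalina.sh runs this logic inline
# after it has shifted away the leading "stop".
parse_stop_args() {
  SLEEP=5
  if [ ! -z "$1" ]; then
    # grep exits non-zero when $1 contains no non-digit character,
    # that is, when $1 is a plain number
    echo "$1" | grep "[^0-9]" >/dev/null 2>&1
    if [ $? -gt 0 ]; then
      SLEEP=$1
      shift
    fi
  fi

  FORCE=0
  if [ "$1" = "-force" ]; then
    shift
    FORCE=1
  fi

  echo "SLEEP=$SLEEP FORCE=$FORCE REST=$*"
}

r1=`parse_stop_args`
r2=`parse_stop_args 10`
r3=`parse_stop_args -force`
r4=`parse_stop_args 10 -force`
printf '%s\n' "$r1" "$r2" "$r3" "$r4"
```

The order matters: the wait time must come before -force, which matches the "stop n -force" form listed in the usage text of catalina.sh.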