0%

SQL解析流程

sequencediagram01

SQL解析引擎

SQLParsingEngine

sql分析引擎

会先使用词法分析引擎进行词法分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@RequiredArgsConstructor
public final class SQLParsingEngine {
//数据库类型,如mysql
private final DatabaseType dbType;

private final String sql;

private final ShardingRule shardingRule;

/**
* Parse SQL.
*
* @return parsed SQL statement
*/
public SQLStatement parse() {
//根据sql和数据库类型创建词法分析引擎
LexerEngine lexerEngine = LexerEngineFactory.newInstance(dbType, sql);
//读入第一个标记
lexerEngine.nextToken();
//使用sql解析工厂创建sql解析器并解析
return SQLParserFactory.newInstance(dbType, lexerEngine.getCurrentToken().getType(), shardingRule, lexerEngine).parse();
}
}

SQLParserFactory

sql解析器工厂,负责根据sql第一个标记类型创建出相应的解析器

  • 解析器工厂(负责根据数据库类型选择具体解析器)分类:

    SELECT<—>SelectParserFactory、INSERT<—>InsertParserFactory、UPDATE<—>UpdateParserFactory、DELETE<—>DeleteParserFactory、CREATE<—>CreateParserFactory、ALTER<—>AlterParserFactory、DROP<—>DropParserFactory、TRUNCATE<—>TruncateParserFactory

  • 解析器分类:

    SELECT<—>AbstractSelectParser、INSERT<—>AbstractInsertParser、UPDATE<—>AbstractUpdateParser、DELETE<—>AbstractDeleteParser、CREATE<—>AbstractCreateParser、ALTER<—>AbstractAlterParser、DROP<—>AbstractDropParser、TRUNCATE<—>AbstractTruncateParser

SQL解析器

SQLParser

解析器

diagram

1
2
3
4
5
6
7
8
9
public interface SQLParser {

/**
* Parse SQL.
*
* @return SQL statement
*/
SQLStatement parse();
}

插入

以插入为例

2121212

插入语句解析器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public abstract class AbstractInsertParser implements SQLParser {

@Getter(AccessLevel.PROTECTED)
private final ShardingRule shardingRule;

@Getter(AccessLevel.PROTECTED)
private final LexerEngine lexerEngine;

private final AbstractInsertClauseParserFacade insertClauseParserFacade;

public AbstractInsertParser(final ShardingRule shardingRule, final LexerEngine lexerEngine, final AbstractInsertClauseParserFacade insertClauseParserFacade) {
this.shardingRule = shardingRule;
this.lexerEngine = lexerEngine;
this.insertClauseParserFacade = insertClauseParserFacade;
}

@Override
public final DMLStatement parse() {
//读取下一个标记,比如INSERT INTO t_order (user_id, status) VALUES (10, 'INIT')
//就读取到INTO
lexerEngine.nextToken();
//创建InsertStatement
InsertStatement result = new InsertStatement();
//读取INTO后面的表名
insertClauseParserFacade.getInsertIntoClauseParser().parse(result);
//读取插入的列
insertClauseParserFacade.getInsertColumnsClauseParser().parse(result);
//不支持INSERT SELECT
if (lexerEngine.equalAny(DefaultKeyword.SELECT, Symbol.LEFT_PAREN)) {
throw new UnsupportedOperationException("Cannot INSERT SELECT");
}
//读取VALUES 后面
insertClauseParserFacade.getInsertValuesClauseParser().parse(result);
//读取SET 后面
insertClauseParserFacade.getInsertSetClauseParser().parse(result);
//处理自增键转化为GeneratedKeyToken
appendGenerateKey(result);
return result;
}

private void appendGenerateKey(final InsertStatement insertStatement) {
String tableName = insertStatement.getTables().getSingleTableName();
//获取自增列列名
Optional<String> generateKeyColumn = shardingRule.getGenerateKeyColumn(tableName);
if (!generateKeyColumn.isPresent() || null != insertStatement.getGeneratedKey()) {
return;
}
//拿到刚才解析的所有列名
ItemsToken columnsToken = new ItemsToken(insertStatement.getColumnsListLastPosition());
columnsToken.getItems().add(generateKeyColumn.get());
insertStatement.getSqlTokens().add(columnsToken);
//处理自增id
insertStatement.getSqlTokens().add(new GeneratedKeyToken(insertStatement.getValuesListLastPosition()));
}
}

diagram

AbstractInsertClauseParserFacade

门面模式

1
2
3
4
5
6
7
8
9
10
11
12
@RequiredArgsConstructor
@Getter
public abstract class AbstractInsertClauseParserFacade {

private final InsertIntoClauseParser insertIntoClauseParser;

private final InsertColumnsClauseParser insertColumnsClauseParser;

private final InsertValuesClauseParser insertValuesClauseParser;

private final InsertSetClauseParser insertSetClauseParser;
}

diagram

InsertIntoClauseParser

INTO 部分解析

1
2
3
4
5
6
7
8
9
10
public void parse(final InsertStatement insertStatement) {
lexerEngine.unsupportedIfEqual(getUnsupportedKeywordsBeforeInto());
//一直读取直到结束或者 "INTO"
lexerEngine.skipUntil(DefaultKeyword.INTO);
//读取"INTO"下一个标记
lexerEngine.nextToken();
//解析表
tableReferencesClauseParser.parse(insertStatement, true);
skipBetweenTableAndValues(insertStatement);
}

表解析器MySQLTableReferencesClauseParser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public final void parse(final SQLStatement sqlStatement, final boolean isSingleTableOnly) {
do {
parseTableReference(sqlStatement, isSingleTableOnly);
//','分割
} while (lexerEngine.skipIfEqual(Symbol.COMMA));
}

@Override
protected void parseTableReference(final SQLStatement sqlStatement, final boolean isSingleTableOnly) {
parseTableFactor(sqlStatement, isSingleTableOnly);
//解析PARTITION,Mysql不支持。
parsePartition();
//解析使用索引
parseIndexHint(sqlStatement);
}

protected final void parseTableFactor(final SQLStatement sqlStatement, final boolean isSingleTableOnly) {
//"INTO"下一个标记开始的下标
//如:INSERT INTO t_order (user_id, status) VALUES (10, 'INIT')
//是12
final int beginPosition = lexerEngine.getCurrentToken().getEndPosition() - lexerEngine.getCurrentToken().getLiterals().length();
//"INTO"下一个字面量,就是逻辑表名
//如:INSERT INTO t_order (user_id, status) VALUES (10, 'INIT')
//是t_order
String literals = lexerEngine.getCurrentToken().getLiterals();
//下一个标记,如(、AS等,
lexerEngine.nextToken();
//不能支持`schema.table`
if (lexerEngine.equalAny(Symbol.DOT)) {
throw new UnsupportedOperationException("Cannot support SQL for `schema.table`");
}
//移除 '`'和 '"'
String tableName = SQLUtil.getExactlyValue(literals);
//解析 AS ,拿到别名,并跳到别名下一个标记
Optional<String> alias = aliasClauseParser.parse();

if (isSingleTableOnly || shardingRule.tryFindTableRule(tableName).isPresent() || shardingRule.findBindingTableRule(tableName).isPresent()
|| shardingRule.getDataSourceMap().containsKey(shardingRule.getDefaultDataSourceName())) {
//添加sqlToken (12,t_order)
sqlStatement.getSqlTokens().add(new TableToken(beginPosition, literals));
//添加表名和别名
sqlStatement.getTables().add(new Table(tableName, alias));
}
//解析join
parseJoinTable(sqlStatement);
if (isSingleTableOnly && !sqlStatement.getTables().isSingleTable()) {
throw new UnsupportedOperationException("Cannot support Multiple-Table.");
}

private void parseIndexHint(final SQLStatement sqlStatement) {
//USE、IGNORE、FORCE
if (getLexerEngine().skipIfEqual(DefaultKeyword.USE, MySQLKeyword.IGNORE, MySQLKeyword.FORCE)) {
//INDEX、KEY、FOR、JOIN、ORDER、GROUP、BY
getLexerEngine().skipAll(DefaultKeyword.INDEX, DefaultKeyword.KEY, DefaultKeyword.FOR, DefaultKeyword.JOIN, DefaultKeyword.ORDER, DefaultKeyword.GROUP, DefaultKeyword.BY);
getLexerEngine().skipParentheses(sqlStatement);
}
}

别名解析器AliasClauseParser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public Optional<String> parse() {
//解析到AS了,就在往下读一个标记
if (lexerEngine.skipIfEqual(DefaultKeyword.AS)) {
//读到符号返回不存在
if (lexerEngine.equalAny(Symbol.values())) {
return Optional.absent();
}
//接下来的字面量去 '`'和 '"'
String result = SQLUtil.getExactlyValue(lexerEngine.getCurrentToken().getLiterals());
//往下读
lexerEngine.nextToken();
//返回别名
return Optional.of(result);
}
//直接别名的
if (lexerEngine.equalAny(
Literals.IDENTIFIER, Literals.CHARS, DefaultKeyword.USER, DefaultKeyword.END, DefaultKeyword.CASE, DefaultKeyword.KEY, DefaultKeyword.INTERVAL, DefaultKeyword.CONSTRAINT)) {
String result = SQLUtil.getExactlyValue(lexerEngine.getCurrentToken().getLiterals());
lexerEngine.nextToken();
//返回别名
return Optional.of(result);
}
return Optional.absent();
}

InsertColumnsClauseParser

列 部分解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void parse(final InsertStatement insertStatement) {
Collection<Column> result = new LinkedList<>();
// "("开头
if (lexerEngine.equalAny(Symbol.LEFT_PAREN)) {
//刚才解析出来的表名
String tableName = insertStatement.getTables().getSingleTableName();
//获取该表的分片规则列的列名
Optional<String> generateKeyColumn = shardingRule.getGenerateKeyColumn(tableName);
int count = 0;
//读取INTO 的所有列名
do {
lexerEngine.nextToken();
String columnName = SQLUtil.getExactlyValue(lexerEngine.getCurrentToken().getLiterals());
result.add(new Column(columnName, tableName));
lexerEngine.nextToken();
if (generateKeyColumn.isPresent() && generateKeyColumn.get().equalsIgnoreCase(columnName)) {
//记下需要自增的列的位置
insertStatement.setGenerateKeyColumnIndex(count);
}
count++;
} while (!lexerEngine.equalAny(Symbol.RIGHT_PAREN) && !lexerEngine.equalAny(Assist.END));
//记录最后一列结束位置
insertStatement.setColumnsListLastPosition(lexerEngine.getCurrentToken().getEndPosition() - lexerEngine.getCurrentToken().getLiterals().length());
//跳过")"
lexerEngine.nextToken();
}
//设置列名
insertStatement.getColumns().addAll(result);
}

InsertValuesClauseParser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void parse(final InsertStatement insertStatement) {
Collection<Keyword> valueKeywords = new LinkedList<>();
//VALUES
valueKeywords.add(DefaultKeyword.VALUES);
//mysql是VALUE
valueKeywords.addAll(Arrays.asList(getSynonymousKeywordsForValues()));
//读到VALUES或VALUE,接着读下一个
if (lexerEngine.skipIfEqual(valueKeywords.toArray(new Keyword[valueKeywords.size()]))) {
//记录VALUES或VALUE后面开始的位置
insertStatement.setAfterValuesPosition(lexerEngine.getCurrentToken().getEndPosition() - lexerEngine.getCurrentToken().getLiterals().length());
//VALUES或VALUE的值和表名组成Condition
parseValues(insertStatement);
//如果是","表示批量插入的写法
if (lexerEngine.equalAny(Symbol.COMMA)) {
parseMultipleValues(insertStatement);
}
}
}


private void parseValues(final InsertStatement insertStatement) {
//跳过"("
lexerEngine.accept(Symbol.LEFT_PAREN);
List<SQLExpression> sqlExpressions = new LinkedList<>();
do {
//表达式,就是每一个值,逗号隔开
sqlExpressions.add(expressionClauseParser.parse(insertStatement));
} while (lexerEngine.skipIfEqual(Symbol.COMMA));
//记录结束位置
insertStatement.setValuesListLastPosition(lexerEngine.getCurrentToken().getEndPosition() - lexerEngine.getCurrentToken().getLiterals().length());
int count = 0;
//列名和值组装成条件Condition
for (Column each : insertStatement.getColumns()) {
SQLExpression sqlExpression = sqlExpressions.get(count);
insertStatement.getConditions().add(new Condition(each, sqlExpression), shardingRule);
if (insertStatement.getGenerateKeyColumnIndex() == count) {
insertStatement.setGeneratedKey(createGeneratedKey(each, sqlExpression));
}
count++;
}
//跳过")"
lexerEngine.accept(Symbol.RIGHT_PAREN);
}

查询

select语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
SELECT
[ALL | DISTINCT | DISTINCTROW ]
[HIGH_PRIORITY]
[STRAIGHT_JOIN]
[SQL_SMALL_RESULT][SQL_BIG_RESULT] [SQL_BUFFER_RESULT]
[SQL_CACHE | SQL_NO_CACHE][SQL_CALC_FOUND_ROWS]
select_expr [, select_expr ...]
[FROM table_references
[PARTITION partition_list]
[WHERE where_condition]
[GROUP BY {col_name | expr | position}
[ASC | DESC], ... [WITH ROLLUP]]
[HAVING where_condition]
[ORDER BY {col_name | expr | position}
[ASC | DESC], ...]
[LIMIT {[offset,] row_count | row_count OFFSET offset}]
[PROCEDURE procedure_name(argument_list)]
[INTO OUTFILE 'file_name'
[CHARACTER SET charset_name]
export_options
| INTO DUMPFILE 'file_name'
| INTO var_name [, var_name]]
[FOR UPDATE | LOCK IN SHARE MODE]]

select_expr 指的是你想获取的列,至少要有一个

table_references 指的是表或者表中的行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
tbl_name [[AS] alias] [index_hint_list]

index_hint_list:
index_hint [index_hint] ...

index_hint:
USE {INDEX|KEY}
[FOR {JOIN|ORDER BY|GROUP BY}] ([index_list])
| IGNORE {INDEX|KEY}
[FOR {JOIN|ORDER BY|GROUP BY}] (index_list)
| FORCE {INDEX|KEY}
[FOR {JOIN|ORDER BY|GROUP BY}] (index_list)

index_list:
index_name [, index_name] ...

select … partition 分区 。https://dev.mysql.com/doc/refman/5.7/en/partitioning-selection.html

Expression Syntax

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
expr:
expr OR expr
| expr || expr
| expr XOR expr
| expr AND expr
| expr && expr
| NOT expr
| ! expr
| boolean_primary IS [NOT] {TRUE | FALSE | UNKNOWN}
| boolean_primary

boolean_primary:
boolean_primary IS [NOT] NULL
| boolean_primary <=> predicate
| boolean_primary comparison_operator predicate
| boolean_primary comparison_operator {ALL | ANY} (subquery)
| predicate

comparison_operator: = | >= | > | <= | < | <> | !=

predicate:
bit_expr [NOT] IN (subquery)
| bit_expr [NOT] IN (expr [, expr] ...)
| bit_expr [NOT] BETWEEN bit_expr AND predicate
| bit_expr SOUNDS LIKE bit_expr
| bit_expr [NOT] LIKE simple_expr [ESCAPE simple_expr]
| bit_expr [NOT] REGEXP bit_expr
| bit_expr

bit_expr:
bit_expr | bit_expr
| bit_expr & bit_expr
| bit_expr << bit_expr
| bit_expr >> bit_expr
| bit_expr + bit_expr
| bit_expr - bit_expr
| bit_expr * bit_expr
| bit_expr / bit_expr
| bit_expr DIV bit_expr
| bit_expr MOD bit_expr
| bit_expr % bit_expr
| bit_expr ^ bit_expr
| bit_expr + interval_expr
| bit_expr - interval_expr
| simple_expr

simple_expr:
literal
| identifier
| function_call
| simple_expr COLLATE collation_name
| param_marker
| variable
| simple_expr || simple_expr
| + simple_expr
| - simple_expr
| ~ simple_expr
| ! simple_expr
| BINARY simple_expr
| (expr [, expr] ...)
| ROW (expr, expr [, expr] ...)
| (subquery)
| EXISTS (subquery)
| {identifier expr}
| match_expr
| case_expr
| interval_expr

Operator Precedence

运算符的优先级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
INTERVAL
BINARY, COLLATE
!
- (unary minus), ~ (unary bit inversion)
^
*, /, DIV, %, MOD
-, +
<<, >>
&
|
= (comparison), <=>, >=, >, <=, <, <>, !=, IS, LIKE, REGEXP, IN
BETWEEN, CASE, WHEN, THEN, ELSE
NOT
AND, &&
XOR
OR, ||
= (assignment), :=

查询语句解析器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@RequiredArgsConstructor
@Getter(AccessLevel.PROTECTED)
public abstract class AbstractSelectParser implements SQLParser {

private static final String DERIVED_COUNT_ALIAS = "AVG_DERIVED_COUNT_%s";

private static final String DERIVED_SUM_ALIAS = "AVG_DERIVED_SUM_%s";

private static final String ORDER_BY_DERIVED_ALIAS = "ORDER_BY_DERIVED_%s";

private static final String GROUP_BY_DERIVED_ALIAS = "GROUP_BY_DERIVED_%s";

private final ShardingRule shardingRule;

private final LexerEngine lexerEngine;

private final AbstractSelectClauseParserFacade selectClauseParserFacade;

private final List<SelectItem> items = new LinkedList<>();

@Override
public final SelectStatement parse() {
//解析成SelectStatement
SelectStatement result = parseInternal();
//是否包含子查询,包含的话合并子查询语句
if (result.containsSubQuery()) {
result = result.mergeSubQueryStatement();
}
// TODO move to rewrite
appendDerivedColumns(result);
appendDerivedOrderBy(result);
return result;
}

private SelectStatement parseInternal() {
SelectStatement result = new SelectStatement();
//跳过第一个SELECT
lexerEngine.nextToken();
parseInternal(result);
return result;
}


}

MySQLSelectParser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
protected void parseInternal(final SelectStatement selectStatement) {
//解析distinct
parseDistinct();
//解析Option
parseSelectOption();
//解析列
parseSelectList(selectStatement, getItems());
//解析from
parseFrom(selectStatement);
//解析WHERE
parseWhere(getShardingRule(), selectStatement, getItems());
//解析Group By
parseGroupBy(selectStatement);
//解析Having
parseHaving();
//解析Order By
parseOrderBy(selectStatement);
//解析Limit
parseLimit(selectStatement);
//解析
parseSelectRest();
}

AbstractSelectClauseParserFacade

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RequiredArgsConstructor
@Getter
public abstract class AbstractSelectClauseParserFacade {

private final DistinctClauseParser distinctClauseParser;

private final SelectListClauseParser selectListClauseParser;

private final TableReferencesClauseParser tableReferencesClauseParser;

private final WhereClauseParser whereClauseParser;

private final GroupByClauseParser groupByClauseParser;

private final HavingClauseParser havingClauseParser;

private final OrderByClauseParser orderByClauseParser;

private final SelectRestClauseParser selectRestClauseParser;
}

解析Distinct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RequiredArgsConstructor
public class DistinctClauseParser implements SQLClauseParser {

private final LexerEngine lexerEngine;

/**
* Parse distinct.
*/
public final void parse() {
lexerEngine.skipAll(DefaultKeyword.ALL);
Collection<Keyword> distinctKeywords = new LinkedList<>();
distinctKeywords.add(DefaultKeyword.DISTINCT);
distinctKeywords.addAll(Arrays.asList(getSynonymousKeywordsForDistinct()));
lexerEngine.unsupportedIfEqual(distinctKeywords.toArray(new Keyword[distinctKeywords.size()]));
}

protected Keyword[] getSynonymousKeywordsForDistinct() {
return new Keyword[0];
}
}

解析Option

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RequiredArgsConstructor
public final class MySQLSelectOptionClauseParser implements SQLClauseParser {

private final LexerEngine lexerEngine;

/**
* 解析Option.
*/
public void parse() {
//HIGH_PRIORITY、STRAIGHT_JOIN、SQL_SMALL_RESULT、SQL_BIG_RESULT、SQL_BUFFER_RESULT、SQL_CACHE、SQL_NO_CACHE、SQL_CALC_FOUND_ROWS
lexerEngine.skipAll(MySQLKeyword.HIGH_PRIORITY, MySQLKeyword.STRAIGHT_JOIN,
MySQLKeyword.SQL_SMALL_RESULT, MySQLKeyword.SQL_BIG_RESULT, MySQLKeyword.SQL_BUFFER_RESULT, MySQLKeyword.SQL_CACHE, MySQLKeyword.SQL_NO_CACHE, MySQLKeyword.SQL_CALC_FOUND_ROWS);
}
}

解析列

SelectListClauseParser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void parse(final SelectStatement selectStatement, final List<SelectItem> items) {
do {
selectStatement.getItems().add(parseSelectItem(selectStatement));
//','分割
} while (lexerEngine.skipIfEqual(Symbol.COMMA));
selectStatement.setSelectListLastPosition(lexerEngine.getCurrentToken().getEndPosition() - lexerEngine.getCurrentToken().getLiterals().length());
items.addAll(selectStatement.getItems());
}

//解析列
private SelectItem parseSelectItem(final SelectStatement selectStatement) {
//跳过CONNECT_BY_ROOT
lexerEngine.skipIfEqual(getSkippedKeywordsBeforeSelectItem());
SelectItem result;
//Mysql是false,没有行号。sqlserver是ROW_NUMBER
if (isRowNumberSelectItem()) {
result = parseRowNumberSelectItem(selectStatement);
// 解析'*'
} else if (isStarSelectItem()) {
selectStatement.setContainStar(true);
//StarSelectItem
result = parseStarSelectItem();
//解析MAX, MIN, SUM, AVG, COUNT
} else if (isAggregationSelectItem()) {
//转化AggregationSelectItem
result = parseAggregationSelectItem(selectStatement);
parseRestSelectItem(selectStatement);
//其他情况,也就是直接列名的
} else {
result = new CommonSelectItem(SQLUtil.getExactlyValue(parseCommonSelectItem(selectStatement) + parseRestSelectItem(selectStatement)), aliasClauseParser.parse());
}
return result;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private String parseCommonSelectItem(final SelectStatement selectStatement) {
String literals = lexerEngine.getCurrentToken().getLiterals();
int position = lexerEngine.getCurrentToken().getEndPosition() - literals.length();
StringBuilder result = new StringBuilder();
result.append(literals);
lexerEngine.nextToken();
//'('开头
if (lexerEngine.equalAny(Symbol.LEFT_PAREN)) {
//跳过括号内所有令牌,并返回括号内内容
result.append(lexerEngine.skipParentheses(selectStatement));
//字面量带'.'
} else if (lexerEngine.equalAny(Symbol.DOT)) {
//解析表名
String tableName = SQLUtil.getExactlyValue(literals);
//看绑定的逻辑表名有没有
if (shardingRule.tryFindTableRule(tableName).isPresent() || shardingRule.findBindingTableRule(tableName).isPresent()) {
//添加到TableToken
selectStatement.getSqlTokens().add(new TableToken(position, literals));
}

result.append(lexerEngine.getCurrentToken().getLiterals());
lexerEngine.nextToken();
//列名
result.append(lexerEngine.getCurrentToken().getLiterals());
lexerEngine.nextToken();
}
return result.toString();
}

private String parseRestSelectItem(final SelectStatement selectStatement) {
StringBuilder result = new StringBuilder();
while (lexerEngine.equalAny(Symbol.getOperators())) {
result.append(lexerEngine.getCurrentToken().getLiterals());
lexerEngine.nextToken();
result.append(parseCommonSelectItem(selectStatement));
}
return result.toString();
}

解析FROM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected final void parseFrom(final SelectStatement selectStatement) {
//不支持SELECT * INTO
lexerEngine.unsupportedIfEqual(DefaultKeyword.INTO);
if (lexerEngine.skipIfEqual(DefaultKeyword.FROM)) {
parseTable(selectStatement);
}
}


private void parseTable(final SelectStatement selectStatement) {
//'(',解析子查询
if (lexerEngine.skipIfEqual(Symbol.LEFT_PAREN)) {
//子查询,在调用一次parseInternal
selectStatement.setSubQueryStatement(parseInternal());
//跳过WHERE
if (lexerEngine.equalAny(DefaultKeyword.WHERE, Assist.END)) {
return;
}
}
//解析表名。TableReferencesClauseParser已经分析过了
selectClauseParserFacade.getTableReferencesClauseParser().parse(selectStatement, false);
}

解析Where

1
2
3
4
5
6
7
8
9
10
11
12
protected final void parseWhere(final ShardingRule shardingRule, final SelectStatement selectStatement, final List<SelectItem> items) {
selectClauseParserFacade.getWhereClauseParser().parse(shardingRule, selectStatement, items);
}

public void parse(final ShardingRule shardingRule, final SQLStatement sqlStatement, final List<SelectItem> items) {
//AliasClauseParser 别名解析器,分析过了
aliasClauseParser.parse();
//Where
if (lexerEngine.skipIfEqual(DefaultKeyword.WHERE)) {
parseConditions(shardingRule, sqlStatement, items);
}
}
1
2
3
4
5
6
7
private void parseConditions(final ShardingRule shardingRule, final SQLStatement sqlStatement, final List<SelectItem> items) {
do {
parseComparisonCondition(shardingRule, sqlStatement, items);
} while (lexerEngine.skipIfEqual(DefaultKeyword.AND));
//不支持OR,3.X支持了。。。我这里源码是2.x的所以没有
lexerEngine.unsupportedIfEqual(DefaultKeyword.OR);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private void parseComparisonCondition(final ShardingRule shardingRule, final SQLStatement sqlStatement, final List<SelectItem> items) {
//跳过'('
lexerEngine.skipIfEqual(Symbol.LEFT_PAREN);
//表达式解析器
SQLExpression left = expressionClauseParser.parse(sqlStatement);
//'='
if (lexerEngine.skipIfEqual(Symbol.EQ)) {
parseEqualCondition(shardingRule, sqlStatement, left);
lexerEngine.skipIfEqual(Symbol.RIGHT_PAREN);
return;
}
//IN
if (lexerEngine.skipIfEqual(DefaultKeyword.IN)) {
parseInCondition(shardingRule, sqlStatement, left);
lexerEngine.skipIfEqual(Symbol.RIGHT_PAREN);
return;
}
//BETWEEN
if (lexerEngine.skipIfEqual(DefaultKeyword.BETWEEN)) {
parseBetweenCondition(shardingRule, sqlStatement, left);
lexerEngine.skipIfEqual(Symbol.RIGHT_PAREN);
return;
}
//处理rowNumber
if (sqlStatement instanceof SelectStatement && isRowNumberCondition(items, left)) {
if (lexerEngine.skipIfEqual(Symbol.LT, Symbol.LT_EQ)) {
parseRowCountCondition((SelectStatement) sqlStatement);
return;
}
if (lexerEngine.skipIfEqual(Symbol.GT, Symbol.GT_EQ)) {
parseOffsetCondition((SelectStatement) sqlStatement);
return;
}
}
//处理其他自定义条件,mysql就是REGEXP
List<Keyword> otherConditionOperators = new LinkedList<>(Arrays.asList(getCustomizedOtherConditionOperators()));
//'<'、'<='、'>'、'>='、'<>'、'!='、'!>'、'!<'、'LIKE'、'IS'
otherConditionOperators.addAll(
Arrays.asList(Symbol.LT, Symbol.LT_EQ, Symbol.GT, Symbol.GT_EQ, Symbol.LT_GT, Symbol.BANG_EQ, Symbol.BANG_GT, Symbol.BANG_LT, DefaultKeyword.LIKE, DefaultKeyword.IS));
//处理条件
if (lexerEngine.skipIfEqual(otherConditionOperators.toArray(new Keyword[otherConditionOperators.size()]))) {
parseOtherCondition(sqlStatement);
}
//处理NOT
if (lexerEngine.skipIfEqual(DefaultKeyword.NOT)) {
lexerEngine.nextToken();
lexerEngine.skipIfEqual(Symbol.LEFT_PAREN);
parseOtherCondition(sqlStatement);
lexerEngine.skipIfEqual(Symbol.RIGHT_PAREN);
}
lexerEngine.skipIfEqual(Symbol.RIGHT_PAREN);
}

解析Group By

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final void parse(final SelectStatement selectStatement) {
if (!lexerEngine.skipIfEqual(DefaultKeyword.GROUP)) {
return;
}
lexerEngine.accept(DefaultKeyword.BY);
while (true) {
addGroupByItem(expressionClauseParser.parse(selectStatement), selectStatement);
//不是',',也就是不是多个就跳出
if (!lexerEngine.equalAny(Symbol.COMMA)) {
break;
}
//处理下一个
lexerEngine.nextToken();
}
//WITH、ROLLUP
lexerEngine.skipAll(getSkippedKeywordAfterGroupBy());
selectStatement.setGroupByLastPosition(lexerEngine.getCurrentToken().getEndPosition() - lexerEngine.getCurrentToken().getLiterals().length());
}

关于ROLLUP https://dev.mysql.com/doc/refman/5.7/en/group-by-modifiers.html

解析Group By后面的东西

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void addGroupByItem(final SQLExpression sqlExpression, final SelectStatement selectStatement) {
//如果是Oracle不支持ROLLUP、CUBE、GROUPING
lexerEngine.unsupportedIfEqual(getUnsupportedKeywordBeforeGroupByItem());
OrderType orderByType = OrderType.ASC;
//ASC,默认ASC
if (lexerEngine.equalAny(DefaultKeyword.ASC)) {
lexerEngine.nextToken();
//DESC
} else if (lexerEngine.skipIfEqual(DefaultKeyword.DESC)) {
orderByType = OrderType.DESC;
}
OrderItem orderItem;
if (sqlExpression instanceof SQLPropertyExpression) {
SQLPropertyExpression sqlPropertyExpression = (SQLPropertyExpression) sqlExpression;
orderItem = new OrderItem(SQLUtil.getExactlyValue(sqlPropertyExpression.getOwner().getName()), SQLUtil.getExactlyValue(sqlPropertyExpression.getName()), orderByType, OrderType.ASC,
selectStatement.getAlias(SQLUtil.getExactlyValue(sqlPropertyExpression.getOwner() + "." + SQLUtil.getExactlyValue(sqlPropertyExpression.getName()))));
} else if (sqlExpression instanceof SQLIdentifierExpression) {
SQLIdentifierExpression sqlIdentifierExpression = (SQLIdentifierExpression) sqlExpression;
orderItem = new OrderItem(
SQLUtil.getExactlyValue(sqlIdentifierExpression.getName()), orderByType, OrderType.ASC, selectStatement.getAlias(SQLUtil.getExactlyValue(sqlIdentifierExpression.getName())));
} else if (sqlExpression instanceof SQLIgnoreExpression) {
SQLIgnoreExpression sqlIgnoreExpression = (SQLIgnoreExpression) sqlExpression;
orderItem = new OrderItem(sqlIgnoreExpression.getExpression(), orderByType, OrderType.ASC, selectStatement.getAlias(sqlIgnoreExpression.getExpression()));
} else {
return;
}
selectStatement.getGroupByItems().add(orderItem);
}

合并子查询

1
2
3
4
5
6
public SelectStatement mergeSubQueryStatement() {
SelectStatement result = processLimitForSubQuery();
processItems(result);
processOrderByItems(result);
return result;
}

分库分表配置

TableRuleConfiguration##build—>TableRule

TableRuleConfiguration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//逻辑表,表名
//例:订单数据根据主键尾数拆分为10张表,分别是t_order_0到t_order_9,他们的逻辑表名为t_order。
private String logicTable;
//
//如:ds_jdbc.t_order_${0..9}
//
private String actualDataNodes;
//数据库分库策略
private ShardingStrategyConfiguration databaseShardingStrategyConfig;
//分表策略
private ShardingStrategyConfiguration tableShardingStrategyConfig;
//自增列
private String keyGeneratorColumnName;
//id生成器类名
private String keyGeneratorClass;
  • actualDataNodes
1
2
//使用groovy表达式解析出真实数据库节点
List<String> actualDataNodes = new InlineExpressionParser(this.actualDataNodes).evaluate();

InlineExpressionParser

内联表达式解析器

inlineExpression是它唯一的参数和构造参数。如ds_jdbc.t_order_${[0, 9]}这样的表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public List<String> evaluate() {
if (null == inlineExpression) {
return Collections.emptyList();
}
return flatten(evaluate(split()));
}
// ds_jdbc.t_order_${[0,9]} ,ds_jdbc.t_order_${1..8}
//result: ["ds_jdbc.t_order_${[0,9]}", "ds_jdbc.t_order_${1..8}"]
private List<String> split() {
List<String> result = new ArrayList<>();
StringBuilder segment = new StringBuilder();
int bracketsDepth = 0;
for (int i = 0; i < inlineExpression.length(); i++) {
char each = inlineExpression.charAt(i);
switch (each) {
// ','
case SPLITTER:
if (bracketsDepth > 0) {
segment.append(each);
} else {
result.add(segment.toString().trim());
segment.setLength(0);
}
break;
case '$':
if ('{' == inlineExpression.charAt(i + 1)) {
bracketsDepth++;
}
segment.append(each);
break;
case '}':
if (bracketsDepth > 0) {
bracketsDepth--;
}
segment.append(each);
break;
default:
segment.append(each);
break;
}
}
if (segment.length() > 0) {
result.add(segment.toString().trim());
}
return result;
}

//Groovy表达式
//inlineExpressions: "ds_jdbc.t_order_${[0,9]} ,ds_jdbc.t_order_${1..8}"
//result : [ds_jdbc.t_order_[0, 9], ds_jdbc.t_order_[1, 2, 3, 4, 5, 6, 7, 8]] -> List<GString>
private List<Object> evaluate(final List<String> inlineExpressions) {
List<Object> result = new ArrayList<>(inlineExpressions.size());
GroovyShell shell = new GroovyShell();
for (String each : inlineExpressions) {
StringBuilder expression = new StringBuilder(each);
if (!each.startsWith("\"")) {
expression.insert(0, "\"");
}
if (!each.endsWith("\"")) {
expression.append("\"");
}
result.add(shell.evaluate(expression.toString()));
}
return result;
}

//segments: "[ds_jdbc.t_order_[0, 9], ds_jdbc.t_order_[1, 2, 3, 4, 5, 6, 7, 8]]" -> List<GString>
//result : "ds_jdbc.t_order_0","ds_jdbc.t_order_1","ds_jdbc.t_order_2","ds_jdbc.t_order_3","ds_jdbc.t_order_4","ds_jdbc.t_order_5","ds_jdbc.t_order_6","ds_jdbc.t_order_7","ds_jdbc.t_order_8","ds_jdbc.t_order_9"
private List<String> flatten(final List<Object> segments) {
List<String> result = new ArrayList<>();
for (Object each : segments) {
if (each instanceof GString) {
result.addAll(assemblyCartesianSegments((GString) each));
} else {
result.add(each.toString());
}
}
return result;
}

TableRule

1
2
3
4
5
6
7
8
9
10
11
12
private final String logicTable;
//静态分库分表数据单元
//"ds_jdbc.t_order_0","ds_jdbc.t_order_1","ds_jdbc.t_order_2","ds_jdbc.t_order_3","ds_jdbc.t_order_4","ds_jdbc.t_order_5","ds_jdbc.t_order_6","ds_jdbc.t_order_7","ds_jdbc.t_order_8","ds_jdbc.t_order_9"
private final List<DataNode> actualDataNodes;

private final ShardingStrategy databaseShardingStrategy;

private final ShardingStrategy tableShardingStrategy;

private final String generateKeyColumn;

private final KeyGenerator keyGenerator;

ShardingStrategyConfiguration

  • StandardShardingStrategyConfiguration 标准分片策略

    1
    2
    3
    4
    5
    6
    //分片列名
    private final String shardingColumn;
    //用于处理=和IN的分片
    private final String preciseAlgorithmClassName;
    //处理BETWEEN AND分片
    private final String rangeAlgorithmClassName;
  • ComplexShardingStrategyConfiguration

1
2
3
private final String shardingColumns;
//
private final String algorithmClassName;
  • InlineShardingStrategyConfiguration
1
2
3
4
private final String shardingColumn;
//分片表达式,
//如:t_user_${u_id % 8} 表示t_user表按照u_id按8取模分成8个表,表名称为t_user_0到t_user_7。
private final String algorithmExpression;
  • HintShardingStrategyConfiguration
1
private final String algorithmClassName;
  • NoneShardingStrategyConfiguration

ShardingStrategy

  • StandardShardingStrategy
1
2
3
4
5
private final String shardingColumn;
//用于处理=和IN的算法
private final PreciseShardingAlgorithm preciseShardingAlgorithm;
//这个是可选的,处理BETWEEN AND算法
private final Optional<RangeShardingAlgorithm> rangeShardingAlgorithm;
  • ComplexShardingStrategy 复合分片策略
1
2
3
4
@Getter
private final Collection<String> shardingColumns;
//复合分片算法
private final ComplexKeysShardingAlgorithm shardingAlgorithm;
  • InlineShardingStrategy 行表达式分片策略
1
2
3
private final String shardingColumn;
//表达式转化成groovy闭包
private final Closure<?> closure;
  • HintShardingStrategy 通过Hint而非SQL解析的方式分片的策略
1
2
3
4
@Getter
private final Collection<String> shardingColumns;

private final HintShardingAlgorithm shardingAlgorithm;
  • NoneShardingStrategy 不分片的策略
1
private final Collection<String> shardingColumns = Collections.emptyList();

ShardingRuleConfiguration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

private String defaultDataSourceName;

private Collection<TableRuleConfiguration> tableRuleConfigs = new LinkedList<>();

private Collection<String> bindingTableGroups = new LinkedList<>();

private ShardingStrategyConfiguration defaultDatabaseShardingStrategyConfig;

private ShardingStrategyConfiguration defaultTableShardingStrategyConfig;

private String defaultKeyGeneratorClass;

private Collection<MasterSlaveRuleConfiguration> masterSlaveRuleConfigs =
new LinkedList<>();

ShardingRule

1
2
3
4
5
6
7
8
9
10
11
12
13
private final Map<String, DataSource> dataSourceMap;

private final String defaultDataSourceName;
//表规则配置对象集合
private final Collection<TableRule> tableRules;

private final Collection<BindingTableRule> bindingTableRules = new LinkedList<>();
//分库策略
private final ShardingStrategy defaultDatabaseShardingStrategy;
//分表策略
private final ShardingStrategy defaultTableShardingStrategy;

private final KeyGenerator defaultKeyGenerator;

重要的接口

1
2
3
4
5
6
7
8
//分片值规则
public interface ShardingValue {
//获取逻辑表名
String getLogicTableName();

//获取列名
String getColumnName();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
//表映射单元
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class TableUnit {
//真实数据源名
private final String dataSourceName;
//逻辑表名
private final String logicTableName;
//真实表名
private final String actualTableName;
}
1
2
3
4
5
6
7
8
9
10
11
//SQL执行单元
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class SQLExecutionUnit {
//真实数据源
private final String dataSource;
//真实sql
private final String sql;
}
1
2
3
4
5
6
7
8
9
10
11
//SQL路由结果
@RequiredArgsConstructor
@Getter
public final class SQLRouteResult {

private final SQLStatement sqlStatement;

private final Set<SQLExecutionUnit> executionUnits = new LinkedHashSet<>();

private final List<Number> generatedKeys = new LinkedList<>();
}

执行流程

121212

路由

SQLRouter

SQL 路由器接口

1
2
3
4
5
6
7
public interface SQLRouter {
//解析sql
SQLStatement parse(String logicSQL, int parametersSize);

//路由sql
SQLRouteResult route(String logicSQL, List<Object> parameters, SQLStatement sqlStatement);
}

有两种实现:

  • DatabaseHintSQLRouter:通过提示且仅路由至数据库的SQL路由器
1
2
3
4
5
@Override
public SQLStatement parse(final String logicSQL, final int parametersSize) {
//通过词法分析分析出sql的类型
return new SQLJudgeEngine(logicSQL).judge();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
// TODO insert SQL need parse gen key
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
//SQL路由结果
SQLRouteResult result = new SQLRouteResult(sqlStatement);
//路由
RoutingResult routingResult = new DatabaseHintRoutingEngine(shardingRule.getDataSourceMap(), (HintShardingStrategy) shardingRule.getDefaultDatabaseShardingStrategy()).route();
for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), logicSQL));
}
//打印sql
if (showSQL) {
SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits(), parameters);
}
return result;
}

DatabaseHintRoutingEngine 数据库提示路由引擎

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final Map<String, DataSource> dataSourceMap;
//hint路由策略
private final HintShardingStrategy databaseShardingStrategy;

@Override
public RoutingResult route() {
//获取分片键值
Optional<ShardingValue> shardingValue = HintManagerHolder.getDatabaseShardingValue(new ShardingKey(HintManagerHolder.DB_TABLE_NAME, HintManagerHolder.DB_COLUMN_NAME));
Preconditions.checkState(shardingValue.isPresent());
log.debug("Before database sharding only db:{} sharding values: {}", dataSourceMap.keySet(), shardingValue.get());
//路由
Collection<String> routingDataSources;
routingDataSources = databaseShardingStrategy.doSharding(dataSourceMap.keySet(), Collections.singletonList(shardingValue.get()));
Preconditions.checkState(!routingDataSources.isEmpty(), "no database route info");
log.debug("After database sharding only result: {}", routingDataSources);
RoutingResult result = new RoutingResult();
//TableUnit
for (String each : routingDataSources) {
result.getTableUnits().getTableUnits().add(new TableUnit(each, "", ""));
}
return result;
}

HintShardingStrategy hint路由策略

1
2
3
4
5
6
7
8
@Override
public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<ShardingValue> shardingValues) {
//通过HintShardingAlgorithm接口路由
Collection<String> shardingResult = shardingAlgorithm.doSharding(availableTargetNames, shardingValues.iterator().next());
Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
result.addAll(shardingResult);
return result;
}
  • ParsingSQLRouter:需要解析的SQL路由器

    解析

1
2
3
4
5
6
7
8
9
@Override
public SQLStatement parse(final String logicSQL, final int parametersSize) {
SQLParsingEngine parsingEngine = new SQLParsingEngine(databaseType, logicSQL, shardingRule);
SQLStatement result = parsingEngine.parse();
if (result instanceof InsertStatement) {
((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize);
}
return result;
}

SQLParsingEngine 是sql解析引擎,下一章讲到。经过sql解析引擎解析后得到SQLStatement,如果是InsertStatement会改写sql处理 GenerateKeyToken。关于sql改写后面会讲到。

InsertStatement

被sql解析引擎解析后,最重要的是返回了SQLStatement,拿insert举例就是InsertStatement

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//sql类型,比如DML
private final SQLType type;

private final Tables tables = new Tables();
//条件<列名,值>
private final Conditions conditions = new Conditions();
//所有token,比如TableToken(t_order)、ItemToken(order_id)、GeneratedKeyToken(57)
private final List<SQLToken> sqlTokens = new LinkedList<>();

private int parametersIndex;
//所有列
private final Collection<Column> columns = new LinkedList<>();
//批量条件<列名,值>
private final List<Conditions> multipleConditions = new LinkedList<>();
//最后一列结束位置
private int columnsListLastPosition;
//自增列是第几个,-1表示没有
private int generateKeyColumnIndex = -1;
//VALUES或VALUE后面值开始的位置
private int afterValuesPosition;
//VALUES或VALUE后面值结束位置
private int valuesListLastPosition;

private GeneratedKey generatedKey;

路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Override
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
//SQL路由结果
SQLRouteResult result = new SQLRouteResult(sqlStatement);
if (sqlStatement instanceof InsertStatement && null != ((InsertStatement) sqlStatement).getGeneratedKey()) {
//处理插入sql的主键
processGeneratedKey(parameters, (InsertStatement) sqlStatement, result);
}
//路由
RoutingResult routingResult = route(parameters, sqlStatement);
//SQL重写引擎
SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, databaseType, sqlStatement);
boolean isSingleRouting = routingResult.isSingleRouting();
if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) {
// 处理分页
processLimit(parameters, (SelectStatement) sqlStatement, isSingleRouting);
}
//重写
SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting);
// 笛卡尔积结果生成 ExecutionUnit
if (routingResult instanceof CartesianRoutingResult) {
for (CartesianDataSource cartesianDataSource : ((CartesianRoutingResult) routingResult).getRoutingDataSources()) {

for (CartesianTableReference cartesianTableReference : cartesianDataSource.getRoutingTableReferences()) {
result.getExecutionUnits().add(new SQLExecutionUnit(cartesianDataSource.getDataSource(), rewriteEngine.generateSQL(cartesianTableReference, sqlBuilder)));
}
}
} else {
//将每个逻辑表名转化成真实表名
for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder)));
}
}
// 打印 SQL
if (showSQL) {
SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits(), parameters);
}
return result;
}


private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) {
//获取所有sql语句中的逻辑表名
Collection<String> tableNames = sqlStatement.getTables().getTableNames();
RoutingEngine routingEngine;
if (tableNames.isEmpty()) {
routingEngine = new DatabaseAllRoutingEngine(shardingRule.getDataSourceMap());
//1.只有一个逻辑表名2.是否表名全为BindingTable3.所有逻辑表名都在默认数据库
} else if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames) || shardingRule.isAllInDefaultDataSource(tableNames)) {
//使用第一个表名做路由。
//如:SELECT * FROM t_order o join t_order_item i ON o.order_id = i.order_id 则使用t_order
routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);
} else {
// TODO config for cartesian set
//可配置是否执行笛卡尔积
routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
}
//路由
return routingEngine.route();
}

路由引擎

RoutingEngine

1
2
3
4
5
6
7
8
9
public interface RoutingEngine {

/**
* Route.
*
* @return routing result
*/
RoutingResult route();
}

SimpleRoutingEngine

1
2
3
4
5
6
7
private final ShardingRule shardingRule;
//参数
private final List<Object> parameters;
//逻辑表名
private final String logicTableName;

private final SQLStatement sqlStatement;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public RoutingResult route() {
//获取规则
TableRule tableRule = shardingRule.getTableRule(logicTableName);
//Hint则使用分片管理器获取,否则使用数据库分片算法。获取逻辑表、列名、条件值
List<ShardingValue> databaseShardingValues = getDatabaseShardingValues(tableRule);
//表分片算法。获取逻辑表、列名、条件值
List<ShardingValue> tableShardingValues = getTableShardingValues(tableRule);
//根据逻辑表、列名、条件值路由出真实数据库节点
Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingValues);
Collection<DataNode> routedDataNodes = new LinkedList<>();
for (String each : routedDataSources) {
routedDataNodes.addAll(routeTables(tableRule, each, tableShardingValues));
}
//生成RoutingResult。包含TableUnit集合。TableUnit包括真实数据源、逻辑表名、真实表名
return generateRoutingResult(routedDataNodes);
}

private Collection<String> routeDataSources(final TableRule tableRule, final List<ShardingValue> databaseShardingValues) {
//获取所有真实数据库名
Collection<String> availableTargetDatabases = tableRule.getActualDatasourceNames();
if (databaseShardingValues.isEmpty()) {
return availableTargetDatabases;
}
//ShardingStrategy#doSharding
Collection<String> result = shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(availableTargetDatabases, databaseShardingValues);
Preconditions.checkState(!result.isEmpty(), "no database route info");
return result;
}

分库规则

假如user_id模2分库,demo_ds_0、demo_ds_1 shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration("user_id", "demo_ds_${user_id % 2}"));

如SELECT o.* FROM t_order o WHERE o.user_id= 10

则返回ListShardingValue ,t_order(逻辑表)、user_id(列名)、10(值)

1
2
3
4
5
6
private List<ShardingValue> getDatabaseShardingValues(final TableRule tableRule) {
//获取分库规则
ShardingStrategy strategy = shardingRule.getDatabaseShardingStrategy(tableRule);
//分库的列名
return HintManagerHolder.isUseShardingHint() ? getDatabaseShardingValuesFromHint(strategy.getShardingColumns()) : getShardingValues(strategy.getShardingColumns());
}
1
2
3
4
5
6
7
8
9
10
11
private List<ShardingValue> getShardingValues(final Collection<String> shardingColumns) {
List<ShardingValue> result = new ArrayList<>(shardingColumns.size());
//查看sql的条件里面是否能找到分库规则使用的列名
for (String each : shardingColumns) {
Optional<Condition> condition = sqlStatement.getConditions().find(new Column(each, logicTableName));
if (condition.isPresent()) {
result.add(condition.get().getShardingValue(parameters));
}
}
return result;
}

Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ShardingValue getShardingValue(final List<Object> parameters) {
List<Comparable<?>> conditionValues = getValues(parameters);
switch (operator) {
//'='或者"IN"返回 ListShardingValue
case EQUAL:
case IN:
return new ListShardingValue<>(column.getTableName(), column.getName(), conditionValues);
//"BETWEEN"返回RangeShardingValue
case BETWEEN:
return new RangeShardingValue<>(column.getTableName(), column.getName(), Range.range(conditionValues.get(0), BoundType.CLOSED, conditionValues.get(1), BoundType.CLOSED));
default:
throw new UnsupportedOperationException(operator.getExpression());
}
}

分表规则

假如按order_id模2分表 shardingRuleConfig.setDefaultTableShardingStrategyConfig(new StandardShardingStrategyConfiguration("order_id", ModuloShardingTableAlgorithm.class.getName()));

1
2
3
4
private List<ShardingValue> getTableShardingValues(final TableRule tableRule) {
ShardingStrategy strategy = shardingRule.getTableShardingStrategy(tableRule);
return HintManagerHolder.isUseShardingHint() ? getTableShardingValuesFromHint(strategy.getShardingColumns()) : getShardingValues(strategy.getShardingColumns());
}

分片策略实现

1
2
3
4
5
public interface ShardingStrategy {
//availableTargetNames真实数据库名集合
//SQL 的逻辑表、列名、条件(分片值)集合
Collection<String> doSharding(Collection<String> availableTargetNames, Collection<ShardingValue> shardingValues);
}

StandardShardingStrategy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<ShardingValue> shardingValues) {
//获取第一个分片值
ShardingValue shardingValue = shardingValues.iterator().next();
Collection<String> shardingResult = shardingValue instanceof ListShardingValue
? doSharding(availableTargetNames, (ListShardingValue) shardingValue) : doSharding(availableTargetNames, (RangeShardingValue) shardingValue);
Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
result.addAll(shardingResult);
return result;
}


private Collection<String> doSharding(final Collection<String> availableTargetNames, final ListShardingValue<?> shardingValue) {
Collection<String> result = new LinkedList<>();
//ListShardingValue-》List<PreciseShardingValue>
//每个值都划出来,使用preciseShardingAlgorithm策略,获取真实数据库名
for (PreciseShardingValue<?> each : transferToPreciseShardingValues(shardingValue)) {
result.add(preciseShardingAlgorithm.doSharding(availableTargetNames, each));
}
return result;
}


private Collection<String> doSharding(final Collection<String> availableTargetNames, final RangeShardingValue<?> shardingValue) {
if (!rangeShardingAlgorithm.isPresent()) {
throw new UnsupportedOperationException("Cannot find range sharding strategy in sharding rule.");
}
//rangeShardingAlgorithm策略获取真实数据库名
return rangeShardingAlgorithm.get().doSharding(availableTargetNames, shardingValue);
}
1
2
3
4
5
6
7
8
9
10
11
12
public final class ModuloShardingAlgorithm implements PreciseShardingAlgorithm<Integer> {

@Override
public String doSharding(final Collection<String> availableTargetNames, final PreciseShardingValue<Integer> shardingValue) {
for (String each : availableTargetNames) {
if (each.endsWith(shardingValue.getValue() % 2 + "")) {
return each;
}
}
throw new IllegalArgumentException();
}
}

SQL重写

SQLRewriteEngine

1
2
3
4
5
6
7
8
9
private final ShardingRule shardingRule;
//原始sql
private final String originalSQL;
//数据库类型
private final DatabaseType databaseType;
//所有的SQL词根
private final List<SQLToken> sqlTokens = new LinkedList<>();

private final SQLStatement sqlStatement;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public SQLBuilder rewrite(final boolean isRewriteLimit) {
SQLBuilder result = new SQLBuilder();
if (sqlTokens.isEmpty()) {
result.appendLiterals(originalSQL);
return result;
}
int count = 0;
//sqlTokens按beginPos次序排序
sortByBeginPosition();
for (SQLToken each : sqlTokens) {
if (0 == count) {
//第一个词素加入
result.appendLiterals(originalSQL.substring(0, each.getBeginPosition()));
}
TableToken(t_order)、ItemToken(order_id)、GeneratedKeyToken(57)
//表名
if (each instanceof TableToken) {
appendTableToken(result, (TableToken) each, count, sqlTokens);
//ItemToken 如列名
} else if (each instanceof ItemsToken) {
appendItemsToken(result, (ItemsToken) each, count, sqlTokens);
//limit 后面的rowCount、offset
} else if (each instanceof RowCountToken) {
appendLimitRowCount(result, (RowCountToken) each, count, sqlTokens, isRewriteLimit);
//limit 后面的offset
} else if (each instanceof OffsetToken) {
appendLimitOffsetToken(result, (OffsetToken) each, count, sqlTokens, isRewriteLimit);
//转换order
} else if (each instanceof OrderByToken) {
appendOrderByToken(result, count, sqlTokens);
}
count++;
}
return result;
}
1
2
3
public String generateSQL(final TableUnit tableUnit, final SQLBuilder sqlBuilder) {
return sqlBuilder.toSQL(getTableTokens(tableUnit));
}

根据TableUnit转换逻辑表名和真实表名

1
2
3
4
5
6
7
8
9
private Map<String, String> getTableTokens(final TableUnit tableUnit) {
Map<String, String> tableTokens = new HashMap<>();
tableTokens.put(tableUnit.getLogicTableName(), tableUnit.getActualTableName());
Optional<BindingTableRule> bindingTableRule = shardingRule.findBindingTableRule(tableUnit.getLogicTableName());
if (bindingTableRule.isPresent()) {
tableTokens.putAll(getBindingTableTokens(tableUnit, bindingTableRule.get()));
}
return tableTokens;
}

处理自增键

ParsingSQLRouter

1
2
3
4
5
6
7
8
9
10
11
12
private void processGeneratedKey(final List<Object> parameters, final InsertStatement insertStatement, final SQLRouteResult sqlRouteResult) {
GeneratedKey generatedKey = insertStatement.getGeneratedKey();
if (parameters.isEmpty()) {
sqlRouteResult.getGeneratedKeys().add(generatedKey.getValue());
} else if (parameters.size() == generatedKey.getIndex()) {
Number key = shardingRule.generateKey(insertStatement.getTables().getSingleTableName());
parameters.add(key);
setGeneratedKeys(sqlRouteResult, key);
} else if (-1 != generatedKey.getIndex()) {
setGeneratedKeys(sqlRouteResult, (Number) parameters.get(generatedKey.getIndex()));
}
}

ShardingRule

1
2
3
4
5
6
7
8
9
10
public Number generateKey(final String logicTableName) {
Optional<TableRule> tableRule = tryFindTableRule(logicTableName);
if (!tableRule.isPresent()) {
throw new ShardingJdbcException("Cannot find strategy for generate keys.");
}
if (null != tableRule.get().getKeyGenerator()) {
return tableRule.get().getKeyGenerator().generateKey();
}
return defaultKeyGenerator.generateKey();
}
1
2
3
4
5
6
7
8
9
public interface KeyGenerator {

/**
* Generate key.
*
* @return generated key
*/
Number generateKey();
}

基本概念

先普及几个词汇

Lexer: 词法分析器。Lexical analyzer,简称Lexer

Literals :字面量

Symbol: 词法符号

Dictionary: 字典

tokenize: 标记化

lexeme: 词素。词素是组成编程语言的最小的有意义的单元实体。生成的词素最后会组成一个token列表,每一个token都包含一个lexeme

Token: 标记。一个字符串,是构成源代码的最小单位。从输入字符流中生成标记的过程叫作标记化(tokenization),在这个过程中,词法分析器还会对标记进行分类。

LexerEngine

词法分析引擎

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
@RequiredArgsConstructor
public final class LexerEngine {

private final Lexer lexer;

/**
* Get input string.
*
* @return inputted string
*/
public String getInput() {
return lexer.getInput();
}

/**
* Analyse next token.
*/
public void nextToken() {
lexer.nextToken();
}

/**
* Get current token.
*
* @return current token
*/
public Token getCurrentToken() {
return lexer.getCurrentToken();
}

/**
* 跳过括号内所有令牌,并返回括号内内容
*
* @param sqlStatement SQL statement
* @return skipped string
*/
public String skipParentheses(final SQLStatement sqlStatement) {
StringBuilder result = new StringBuilder("");
int count = 0;
//'('开头
if (Symbol.LEFT_PAREN == lexer.getCurrentToken().getType()) {
//括号后的下标
final int beginPosition = lexer.getCurrentToken().getEndPosition();
//'('
result.append(Symbol.LEFT_PAREN.getLiterals());
//接着读取
lexer.nextToken();
while (true) {
//如果是'?'
if (equalAny(Symbol.QUESTION)) {
//参数下标加1
sqlStatement.increaseParametersIndex();
}
//结束或者')'跳出
if (Assist.END == lexer.getCurrentToken().getType() || (Symbol.RIGHT_PAREN == lexer.getCurrentToken().getType() && 0 == count)) {
break;
}
//还是'('记下层数
if (Symbol.LEFT_PAREN == lexer.getCurrentToken().getType()) {
count++;
//')'
} else if (Symbol.RIGHT_PAREN == lexer.getCurrentToken().getType()) {
count--;
}
//接着读取
lexer.nextToken();
}
//括号里的都加进去
result.append(lexer.getInput().substring(beginPosition, lexer.getCurrentToken().getEndPosition()));
//跳过')'
lexer.nextToken();
}
return result.toString();
}

/**
* Assert current token type should equals input token and go to next token type.
*
* @param tokenType token type
*/
public void accept(final TokenType tokenType) {
if (lexer.getCurrentToken().getType() != tokenType) {
throw new SQLParsingException(lexer, tokenType);
}
lexer.nextToken();
}

/**
* Adjust current token equals one of input tokens or not.
*
* @param tokenTypes to be adjusted token types
* @return current token equals one of input tokens or not
*/
public boolean equalAny(final TokenType... tokenTypes) {
for (TokenType each : tokenTypes) {
if (each == lexer.getCurrentToken().getType()) {
return true;
}
}
return false;
}

/**
* Skip current token if equals one of input tokens.
*
* @param tokenTypes to be adjusted token types
* @return skipped current token or not
*/
public boolean skipIfEqual(final TokenType... tokenTypes) {
if (equalAny(tokenTypes)) {
lexer.nextToken();
return true;
}
return false;
}

/**
* Skip all input tokens.
*
* @param tokenTypes to be skipped token types
*/
public void skipAll(final TokenType... tokenTypes) {
Set<TokenType> tokenTypeSet = Sets.newHashSet(tokenTypes);
while (tokenTypeSet.contains(lexer.getCurrentToken().getType())) {
lexer.nextToken();
}
}

/**
* Skip until one of input tokens.
*
* @param tokenTypes to be skipped untiled token types
*/
public void skipUntil(final TokenType... tokenTypes) {
Set<TokenType> tokenTypeSet = Sets.newHashSet(tokenTypes);
tokenTypeSet.add(Assist.END);
while (!tokenTypeSet.contains(lexer.getCurrentToken().getType())) {
lexer.nextToken();
}
}

/**
* Throw unsupported exception if current token equals one of input tokens.
*
* @param tokenTypes to be adjusted token types
*/
public void unsupportedIfEqual(final TokenType... tokenTypes) {
if (equalAny(tokenTypes)) {
throw new SQLParsingUnsupportedException(lexer.getCurrentToken().getType());
}
}

/**
* Throw unsupported exception if current token not equals one of input tokens.
*
* @param tokenTypes to be adjusted token types
*/
public void unsupportedIfNotSkip(final TokenType... tokenTypes) {
if (!skipIfEqual(tokenTypes)) {
throw new SQLParsingUnsupportedException(lexer.getCurrentToken().getType());
}
}

/**
* Get database type.
*
* @return database type
*/
public DatabaseType getDatabaseType() {
if (lexer instanceof MySQLLexer) {
return DatabaseType.MySQL;
}
if (lexer instanceof OracleLexer) {
return DatabaseType.Oracle;
}
if (lexer instanceof SQLServerLexer) {
return DatabaseType.SQLServer;
}
if (lexer instanceof PostgreSQLLexer) {
return DatabaseType.PostgreSQL;
}
throw new UnsupportedOperationException(String.format("Cannot support lexer class: %s", lexer.getClass().getCanonicalName()));
}
}

Lexer

image

1
2
3
4
5
6
7
8
9
10
11
12
13
@RequiredArgsConstructor
public class Lexer {
//sql
@Getter
private final String input;
//
private final Dictionary dictionary;
//偏移量,从0开始
private int offset;

@Getter
private Token currentToken;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Analyse next token.
*/
public final void nextToken() {
//跳过空白字符、注释
skipIgnoredToken();
//
//判断变量,MySQLLexer是'@'开头
if (isVariableBegin()) {
//扫描变量
currentToken = new Tokenizer(input, dictionary, offset).scanVariable();
//mysql没有Nchar。sqlserver有。不详细展开,就是扫描nchar的
} else if (isNCharBegin()) {
currentToken = new Tokenizer(input, dictionary, ++offset).scanChars();
//字母、'`'、'_'、'$' 开头的
} else if (isIdentifierBegin()) {
currentToken = new Tokenizer(input, dictionary, offset).scanIdentifier();
//'0x'开头,十六进制
} else if (isHexDecimalBegin()) {
//扫描十六进制
currentToken = new Tokenizer(input, dictionary, offset).scanHexDecimal();
//数字开头、非标识符+'.'+数字、"-."、'-'+数字
} else if (isNumberBegin()) {
//扫描数字
currentToken = new Tokenizer(input, dictionary, offset).scanNumber();
//符号'(' 、')' 、'[' 、 ']'、'{'、'}'、 '+' 、 '-' 、 '*' 、 '/' 、'%' 、 '^' 、'='、 '>' 、'<' 、 '~' 、'!'、 '?'、 '&' 、'|' 、'.'、':' 、 '#'、 ',' 、 ';'
} else if (isSymbolBegin()) {
//扫描符号
currentToken = new Tokenizer(input, dictionary, offset).scanSymbol();
// '\'' 或'\"'开头的字符
} else if (isCharsBegin()) {
//扫描单引号'或双引号"括起来的东西
currentToken = new Tokenizer(input, dictionary, offset).scanChars();
//读完了
} else if (isEnd()) {
currentToken = new Token(Assist.END, "", offset);
} else {
throw new SQLParsingException(this, Assist.ERROR);
}
offset = currentToken.getEndPosition();
}

标识符开头:字母、’`’、’_’、’$’ 开头的

skipIgnoredToken

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void skipIgnoredToken() {
//跳过空白符
offset = new Tokenizer(input, dictionary, offset).skipWhitespace();
//MySQLLexer是 '/' == getCurrentChar(0) && '*' == getCurrentChar(1) && '!' == getCurrentChar(2)
//注释开头
while (isHintBegin()) {
//算到注释结束
offset = new Tokenizer(input, dictionary, offset).skipHint();
//跳过空白符
offset = new Tokenizer(input, dictionary, offset).skipWhitespace();
}
//跳过单行注释、多行注释
// '//'或'--'或'/*'开头
while (isCommentBegin()) {
offset = new Tokenizer(input, dictionary, offset).skipComment();
//跳过空白符
offset = new Tokenizer(input, dictionary, offset).skipWhitespace();
}
}

skipHint

处理开头*/结束的东西

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public int skipHint() {
// HINT_BEGIN_SYMBOL_LENGTH = 3
return untilCommentAndHintTerminateSign(HINT_BEGIN_SYMBOL_LENGTH);
}

private int untilCommentAndHintTerminateSign(final int beginSymbolLength) {
int length = beginSymbolLength;
//非 */
while (!isMultipleLineCommentEnd(charAt(offset + length), charAt(offset + length + 1))) {
//如果是EOI = '0x1A'
if (CharType.isEndOfInput(charAt(offset + length))) {
throw new UnterminatedCharException("*/");
}
length++;
}
//offset加中间字符加*/的长度,COMMENT_AND_HINT_END_SYMBOL_LENGTH = 2
return offset + length + COMMENT_AND_HINT_END_SYMBOL_LENGTH;
}

private boolean isMultipleLineCommentEnd(final char ch, final char next) {
return '*' == ch && '/' == next;
}

skipComment

处理注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public int skipComment() {
//当前字符
char current = charAt(offset);
//下一个
char next = charAt(offset + 1);
// 单行注释 '//' 或者'--'
if (isSingleLineCommentBegin(current, next)) {
//跳过这行
return skipSingleLineComment(COMMENT_BEGIN_SYMBOL_LENGTH);
//一个'#'
} else if ('#' == current) {
//跳过这行
return skipSingleLineComment(MYSQL_SPECIAL_COMMENT_BEGIN_SYMBOL_LENGTH);
//跳过多行注释
} else if (isMultipleLineCommentBegin(current, next)) {
return skipMultiLineComment();
}
return offset;
}


private boolean isSingleLineCommentBegin(final char ch, final char next) {
return '/' == ch && '/' == next || '-' == ch && '-' == next;
}


private int skipSingleLineComment(final int commentSymbolLength) {
int length = commentSymbolLength;
//非 EOI = '0x1A'或'\n'换行
while (!CharType.isEndOfInput(charAt(offset + length)) && '\n' != charAt(offset + length)) {
length++;
}
return offset + length + 1;
}

scanVariable

处理变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public Token scanVariable() {
int length = 1;
// 第二个也是'@' 就是局部变量@@
if ('@' == charAt(offset + 1)) {
length++;
}
//变量的符号
while (isVariableChar(charAt(offset + length))) {
length++;
}
//变量
return new Token(Literals.VARIABLE, input.substring(offset, offset + length), offset + length);
}

private boolean isVariableChar(final char ch) {
//变量标识符或者'.'
return isIdentifierChar(ch) || '.' == ch;
}

private boolean isIdentifierChar(final char ch) {
//字母、数字、'_'、'$'、'#'
return CharType.isAlphabet(ch) || CharType.isDigital(ch) || '_' == ch || '$' == ch || '#' == ch;
}

scanIdentifier

处理标识符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public Token scanIdentifier() {
// 第一个字符是'`'
if ('`' == charAt(offset)) {
//比如 "`a`" 就长度3
int length = getLengthUntilTerminatedChar('`');
return new Token(Literals.IDENTIFIER, input.substring(offset, offset + length), offset + length);
}
int length = 0;
//字母、数字、'_'、'$'、'#'
while (isIdentifierChar(charAt(offset + length))) {
length++;
}
//取出标识符
String literals = input.substring(offset, offset + length);
//Order 或者 Group
if (isAmbiguousIdentifier(literals)) {
return new Token(processAmbiguousIdentifier(offset + length, literals), literals, offset + length);
}
// 识别关键字,返回类型,不是关键字则使用IDENTIFIER类型
return new Token(dictionary.findTokenType(literals, Literals.IDENTIFIER), literals, offset + length);
}


private int getLengthUntilTerminatedChar(final char terminatedChar) {
int length = 1;
// 接下来不是 '`'或者连续两个'`'
while (terminatedChar != charAt(offset + length) || hasEscapeChar(terminatedChar, offset + length)) {
//一直匹配不到结束,写的sql有问题
if (offset + length >= input.length()) {
throw new UnterminatedCharException(terminatedChar);
}
//连续两个'`'的,往后再多走一位。走两位
if (hasEscapeChar(terminatedChar, offset + length)) {
length++;
}
length++;
}
return length + 1;
}


private boolean hasEscapeChar(final char charIdentifier, final int offset) {
return charIdentifier == charAt(offset) && charIdentifier == charAt(offset + 1);
}



private TokenType processAmbiguousIdentifier(final int offset, final String literals) {
int i = 0;
while (CharType.isWhitespace(charAt(offset + i))) {
i++;
}
if (DefaultKeyword.BY.name().equalsIgnoreCase(String.valueOf(new char[] {charAt(offset + i), charAt(offset + i + 1)}))) {
//查找关键字类型GROUP或者Order
return dictionary.findTokenType(literals);
}
return Literals.IDENTIFIER;
}

scanHexDecimal

处理十六进制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Token scanHexDecimal() {
int length = HEX_BEGIN_SYMBOL_LENGTH;
//负数
if ('-' == charAt(offset + length)) {
length++;
}
while (isHex(charAt(offset + length))) {
length++;
}
return new Token(Literals.HEX, input.substring(offset, offset + length), offset + length);
}

private boolean isHex(final char ch) {
return ch >= 'A' && ch <= 'F' || ch >= 'a' && ch <= 'f' || CharType.isDigital(ch);
}

scanNumber

处理数字

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public Token scanNumber() {
int length = 0;
//负数
if ('-' == charAt(offset + length)) {
length++;
}
//数字长度
length += getDigitalLength(offset + length);
boolean isFloat = false;
//小数部分
if ('.' == charAt(offset + length)) {
isFloat = true;
length++;
length += getDigitalLength(offset + length);
}
//科学技术法 'e'或'E'
if (isScientificNotation(offset + length)) {
isFloat = true;
length++;
if ('+' == charAt(offset + length) || '-' == charAt(offset + length)) {
length++;
}
length += getDigitalLength(offset + length);
}
//'f'、'F'、'd'、'D'
if (isBinaryNumber(offset + length)) {
isFloat = true;
length++;
}
return new Token(isFloat ? Literals.FLOAT : Literals.INT, input.substring(offset, offset + length), offset + length);
}

scanSymbol

处理符号包起来的

其中Symbol后面讲述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public Token scanSymbol() {
int length = 0;
//跳过符号
while (CharType.isSymbol(charAt(offset + length))) {
length++;
}
String literals = input.substring(offset, offset + length);
Symbol symbol;
//通过字面量查找词法符号.
while (null == (symbol = Symbol.literalsOf(literals))) {
literals = input.substring(offset, offset + --length);
}
return new Token(symbol, literals, offset + length);
}

scanChars

处理单引号或双引号扩起来的

1
2
3
4
5
6
7
8
public Token scanChars() {
return scanChars(charAt(offset));
}
//如'avv'、"abb"
private Token scanChars(final char terminatedChar) {
int length = getLengthUntilTerminatedChar(terminatedChar);
return new Token(Literals.CHARS, input.substring(offset + 1, offset + length - 1), offset + length);
}

Token

Token有三个参数:

  • type(TokenType): INT, FLOAT, HEX, CHARS, IDENTIFIER, VARIABLE
  • literals(String): 字面量
  • endPosition(int): 字符结束的位置

Dictionary

字典

包含一个map,包含每个关键字,Keyword的实现类都是一些枚举常量

1
2
 //所有关键字
private final Map<String, Keyword> tokens = new HashMap<>(1024);

比如MySQLLexer 实现的时候就是使用MySQLKeyword.values()作为构造参数,构造后填充tokens

1
2
3
4
5
6
7
8
9
10
11
12
13
14

public Dictionary(final Keyword... dialectKeywords) {
fill(dialectKeywords);
}

private void fill(final Keyword... dialectKeywords) {
for (DefaultKeyword each : DefaultKeyword.values()) {
tokens.put(each.name(), each);
}
for (Keyword each : dialectKeywords) {
tokens.put(each.toString(), each);
}
}
}

image

下面这两个方法用来进行词法分析的时候根据字典识别出类型。

1
2
3
4
5
6
7
8
9
10
11
12
TokenType findTokenType(final String literals, final TokenType defaultTokenType) {
String key = null == literals ? null : literals.toUpperCase();
return tokens.containsKey(key) ? tokens.get(key) : defaultTokenType;
}

TokenType findTokenType(final String literals) {
String key = null == literals ? null : literals.toUpperCase();
if (tokens.containsKey(key)) {
return tokens.get(key);
}
throw new IllegalArgumentException();
}

Symbol

词法符号

包含一个map,包含每个符号,Symbol的都是一些枚举常量

1
private static Map<String, Symbol> symbols = new HashMap<>(128);

获取符号具体类型

1
2
3
public static Symbol literalsOf(final String literals) {
return symbols.get(literals);
}

调用流程

sequencediagram1

执行sql的时候,sql根据词法分析出sql的类型从而判断走主库还是从库

MasterSlaveDataSource

MasterSlaveDataSourceFactory.createDataSource(createDataSourceMap(), masterSlaveRuleConfig)我们可以找到读写分离实现最重要的在于MasterSlaveDataSource这个数据源的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Getter
public class MasterSlaveDataSource extends AbstractDataSourceAdapter {
//数据源映射
private Map<String, DataSource> dataSourceMap;

private MasterSlaveRule masterSlaveRule;

public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Map<String, Object> configMap) throws SQLException {
super(getAllDataSources(dataSourceMap, masterSlaveRuleConfig.getMasterDataSourceName(), masterSlaveRuleConfig.getSlaveDataSourceNames()));
this.dataSourceMap = dataSourceMap;
this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig);
if (!configMap.isEmpty()) {
ConfigMapContext.getInstance().getMasterSlaveConfig().putAll(configMap);
}
}

private static Collection<DataSource> getAllDataSources(final Map<String, DataSource> dataSourceMap, final String masterDataSourceName, final Collection<String> slaveDataSourceNames) {
Collection<DataSource> result = new LinkedList<>();
result.add(dataSourceMap.get(masterDataSourceName));
for (String each : slaveDataSourceNames) {
result.add(dataSourceMap.get(each));
}
return result;
}

/**
* Get map of all actual data source name and all actual data sources.
*
* @return map of all actual data source name and all actual data sources
*/
public Map<String, DataSource> getAllDataSources() {
Map<String, DataSource> result = new HashMap<>(masterSlaveRule.getSlaveDataSourceNames().size() + 1, 1);
result.put(masterSlaveRule.getMasterDataSourceName(), dataSourceMap.get(masterSlaveRule.getMasterDataSourceName()));
for (String each : masterSlaveRule.getSlaveDataSourceNames()) {
result.put(each, dataSourceMap.get(each));
}
return result;
}

/**
* Renew master-slave data source.
*
* @param dataSourceMap data source map
* @param masterSlaveRuleConfig new master-slave rule configuration
*/
public void renew(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig) {
this.dataSourceMap = dataSourceMap;
this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig);
}

@Override
public MasterSlaveConnection getConnection() {
return new MasterSlaveConnection(this);
}
}

MasterSlaveConnection

基本上没啥好看的直接都是跟后面两个相关

MasterSlaveStatement

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
@Getter
public final class MasterSlaveStatement extends AbstractStatementAdapter {

private final MasterSlaveConnection connection;

private final MasterSlaveRouter masterSlaveRouter;

private final int resultSetType;

private final int resultSetConcurrency;

private final int resultSetHoldability;

private final Collection<Statement> routedStatements = new LinkedList<>();

public MasterSlaveStatement(final MasterSlaveConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency) {
this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
super(Statement.class);
this.connection = connection;
masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule());
this.resultSetType = resultSetType;
this.resultSetConcurrency = resultSetConcurrency;
this.resultSetHoldability = resultSetHoldability;
}

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
Collection<String> dataSourceNames = masterSlaveRouter.route(sqlStatement.getType());
Preconditions.checkState(1 == dataSourceNames.size(), "Cannot support executeQuery for DML or DDL");
Statement statement = connection.getConnection(dataSourceNames.iterator().next()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
return statement.executeQuery(sql);
}

@Override
public int executeUpdate(final String sql) throws SQLException {
int result = 0;
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result += statement.executeUpdate(sql);
}
return result;
}

@Override
public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
int result = 0;
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result += statement.executeUpdate(sql, autoGeneratedKeys);
}
return result;
}

@Override
public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException {
int result = 0;
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result += statement.executeUpdate(sql, columnIndexes);
}
return result;
}

@Override
public int executeUpdate(final String sql, final String[] columnNames) throws SQLException {
int result = 0;
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result += statement.executeUpdate(sql, columnNames);
}
return result;
}

@Override
public boolean execute(final String sql) throws SQLException {
boolean result = false;
//根据sql语句判断出是哪种SQLStatement
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result = statement.execute(sql);
}
return result;
}

@Override
public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException {
boolean result = false;
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result = statement.execute(sql, autoGeneratedKeys);
}
return result;
}

@Override
public boolean execute(final String sql, final int[] columnIndexes) throws SQLException {
boolean result = false;
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result = statement.execute(sql, columnIndexes);
}
return result;
}

@Override
public boolean execute(final String sql, final String[] columnNames) throws SQLException {
boolean result = false;
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result = statement.execute(sql, columnNames);
}
return result;
}

@Override
public ResultSet getGeneratedKeys() throws SQLException {
Preconditions.checkState(1 == routedStatements.size());
return routedStatements.iterator().next().getGeneratedKeys();
}

@Override
public ResultSet getResultSet() throws SQLException {
Preconditions.checkState(1 == routedStatements.size());
return routedStatements.iterator().next().getResultSet();
}
}

MasterSlaveRouter(4月份新加的)

根据sql类型判断主从

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@RequiredArgsConstructor
public final class MasterSlaveRouter {

private final MasterSlaveRule masterSlaveRule;

/**
* Route Master slave.
*
* @param sqlType SQL type
* @return data source name
*/
// TODO for multiple masters may return more than one data source
public Collection<String> route(final SQLType sqlType) {
if (isMasterRoute(sqlType)) {
MasterVisitedManager.setMasterVisited();
return Collections.singletonList(masterSlaveRule.getMasterDataSourceName());
} else {
return Collections.singletonList(masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames())));
}
}

private boolean isMasterRoute(final SQLType sqlType) {
//1.非SELECT语句2.判断过放到ThreadLocal里3.通过Hint管理器设置强制masterRouteOnly
return SQLType.DQL != sqlType || MasterVisitedManager.isMasterVisited() || HintManagerHolder.isMasterRouteOnly();
}
}

SQLJudgeEngine

通过词法分析分析出sql的类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@RequiredArgsConstructor
public final class SQLJudgeEngine {

private final String sql;

/**
* judge SQL Type only.
*
* @return SQL statement
*/
public SQLStatement judge() {
LexerEngine lexerEngine = LexerEngineFactory.newInstance(DatabaseType.MySQL, sql);
//获取token
lexerEngine.nextToken();
while (true) {
//获取token的type
TokenType tokenType = lexerEngine.getCurrentToken().getType();
if (tokenType instanceof Keyword) {
//select
if (isDQL(tokenType)) {
//SelectStatement
return getDQLStatement();
}
//INSERT、UPDATE、DELETE
if (isDML(tokenType)) {
//InsertStatement 或者DMLStatement
return getDMLStatement(tokenType);
}
//CREATE、ALTER、DROP、TRUNCATE
if (isDDL(tokenType)) {
//DDLStatement
return getDDLStatement();
}
//SET、COMMIT、ROLLBACK、SAVEPOINT、BEGIN
if (isTCL(tokenType)) {
//TCLStatement
return getTCLStatement();
}
//USE、DESC、DESCRIBE、SHOW
if (isDAL(tokenType)) {
//UseStatement或DescribeStatement或ShowxxxStatement
return getDALStatement(tokenType, lexerEngine);
}
}
if (tokenType instanceof Assist && Assist.END == tokenType) {
throw new SQLParsingException("Unsupported SQL statement: [%s]", sql);
}
lexerEngine.nextToken();
}
}

private boolean isDAL(final TokenType tokenType) {
return DefaultKeyword.USE == tokenType || DefaultKeyword.DESC == tokenType || MySQLKeyword.DESCRIBE == tokenType || MySQLKeyword.SHOW == tokenType;
}


private SQLStatement getDALStatement(final TokenType tokenType, final LexerEngine lexerEngine) {
if (DefaultKeyword.USE == tokenType) {
return new UseStatement();
}
if (DefaultKeyword.DESC == tokenType || MySQLKeyword.DESCRIBE == tokenType) {
return new DescribeStatement();
}
return getShowStatement(lexerEngine);
}

private SQLStatement getShowStatement(final LexerEngine lexerEngine) {
lexerEngine.nextToken();
if (MySQLKeyword.DATABASES == lexerEngine.getCurrentToken().getType()) {
return new ShowDatabasesStatement();
}
if (MySQLKeyword.TABLES == lexerEngine.getCurrentToken().getType()) {
return new ShowTablesStatement();
}
if (MySQLKeyword.COLUMNS == lexerEngine.getCurrentToken().getType()) {
return new ShowColumnsStatement();
}
return new ShowOtherStatement();
}
}

LexerEngine

LexerEngineFactory会根据数据库来选择适合的分词器

mysql对应MySQLLexer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class LexerEngineFactory {

/**
* Create lexical analysis engine instance.
*
* @param dbType database type
* @param sql SQL
* @return lexical analysis engine instance
*/
public static LexerEngine newInstance(final DatabaseType dbType, final String sql) {
switch (dbType) {
case H2:
case MySQL:
return new LexerEngine(new MySQLLexer(sql));
case Oracle:
return new LexerEngine(new OracleLexer(sql));
case SQLServer:
return new LexerEngine(new SQLServerLexer(sql));
case PostgreSQL:
return new LexerEngine(new PostgreSQLLexer(sql));
default:
throw new UnsupportedOperationException(String.format("Cannot support database [%s].", dbType));
}
}
}

关于词法分析,下一章单独分析

JDBC接口

1
2
3
4
5
6
7
public interface DataSource  extends CommonDataSource, Wrapper {
//建立和数据源的连接
Connection getConnection() throws SQLException;

Connection getConnection(String username, String password)
throws SQLException;
}

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
public interface Connection  extends Wrapper, AutoCloseable {
//创建一个Statement对象,可以用来发送sql语句
Statement createStatement() throws SQLException;

//创建一个PreparedStatement对象,发送参数化的sql语句。sql语句将会被预编译存储在对象
PreparedStatement prepareStatement(String sql)
throws SQLException;

//创建一个CallableStatement对象,发送调用存储过程语句
CallableStatement prepareCall(String sql) throws SQLException;

//将给定的SQL语句转换成系统的原生SQL语法
String nativeSQL(String sql) throws SQLException;

//设置自动提交参数
void setAutoCommit(boolean autoCommit) throws SQLException;

boolean getAutoCommit() throws SQLException;

//发送commit请求
void commit() throws SQLException;

//发送rollback请求
void rollback() throws SQLException;

//不等到连接自动释放,马上关闭连接
void close() throws SQLException;

boolean isClosed() throws SQLException;

//获取数据库元数据
DatabaseMetaData getMetaData() throws SQLException;

//设置只读
void setReadOnly(boolean readOnly) throws SQLException;

boolean isReadOnly() throws SQLException;

//设置catalog(数据库名)
void setCatalog(String catalog) throws SQLException;

String getCatalog() throws SQLException;

/**
* A constant indicating that transactions are not supported.
*/
int TRANSACTION_NONE = 0;

//读未提交级别
int TRANSACTION_READ_UNCOMMITTED = 1;

//读提交级别rc
int TRANSACTION_READ_COMMITTED = 2;

//可重复读rr
int TRANSACTION_REPEATABLE_READ = 4;

//串行化
int TRANSACTION_SERIALIZABLE = 8;

//设置连接的事务隔离级别
void setTransactionIsolation(int level) throws SQLException;

int getTransactionIsolation() throws SQLException;

//获取此 Connection 对象上的调用报告的第一个警告。如果有多个警告,则后续警告将被链接到第一个警告,可以通过对之前获得的警告调用 SQLWarning.getNextWarning 方法获取。
SQLWarning getWarnings() throws SQLException;

//清除为此 Connection 对象报告的所有警告。调用此方法后,在为此 Connection 对象报告新的警告前,getWarnings 方法将返回 null。
void clearWarnings() throws SQLException;


//--------------------------JDBC 2.0-----------------------------

//创建一个 Statement 对象,该对象将生成具有给定类型和并发性的 ResultSet 对象。此方法与上述 createStatement 方法相同,但它允许重写默认结果集类型和并发性。
//resultSetType - 结果集类型,它是 ResultSet.TYPE_FORWARD_ONLY、ResultSet.TYPE_SCROLL_INSENSITIVE 或 ResultSet.TYPE_SCROLL_SENSITIVE 之一
//resultSetConcurrency - 并发类型;它是 ResultSet.CONCUR_READ_ONLY 或 ResultSet.CONCUR_UPDATABLE 之一
Statement createStatement(int resultSetType, int resultSetConcurrency)
throws SQLException;

//创建一个 PreparedStatement 对象,该对象将生成具有给定类型和并发性的 ResultSet 对象。此方法与上述 prepareStatement 方法相同,但它允许重写默认结果集类型和并发性。
//resultSetType - 结果集类型,它是 ResultSet.TYPE_FORWARD_ONLY、ResultSet.TYPE_SCROLL_INSENSITIVE 或 ResultSet.TYPE_SCROLL_SENSITIVE 之一
//resultSetConcurrency - 并发类型,它是 ResultSet.CONCUR_READ_ONLY 或 ResultSet.CONCUR_UPDATABLE 之一
PreparedStatement prepareStatement(String sql, int resultSetType,
int resultSetConcurrency)
throws SQLException;

CallableStatement prepareCall(String sql, int resultSetType,
int resultSetConcurrency) throws SQLException;

//
java.util.Map<String,Class<?>> getTypeMap() throws SQLException;


void setTypeMap(java.util.Map<String,Class<?>> map) throws SQLException;

//--------------------------JDBC 3.0-----------------------------

// ResultSet.HOLD_CURSORS_OVER_COMMIT or ResultSet.CLOSE_CURSORS_AT_COMMIT
void setHoldability(int holdability) throws SQLException;

int getHoldability() throws SQLException;

//有时候一个事务可能是一组复杂的语句,因此可能想要回滚到事务中某个特殊的点。JDBC Savepoint帮我们在事务中创建检查点(checkpoint),这样就可以回滚到指定点。当事务提交或者整个事务回滚后,为事务产生的任何保存点都会自动释放并变为无效。把事务回滚到一个保存点,会使其他所有保存点自动释放并变为无效。
Savepoint setSavepoint() throws SQLException;

Savepoint setSavepoint(String name) throws SQLException;

//回滚到检查点
void rollback(Savepoint savepoint) throws SQLException;

//移除检查点
void releaseSavepoint(Savepoint savepoint) throws SQLException;

Statement createStatement(int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException;

PreparedStatement prepareStatement(String sql, int resultSetType,
int resultSetConcurrency, int resultSetHoldability)
throws SQLException;

CallableStatement prepareCall(String sql, int resultSetType,
int resultSetConcurrency,
int resultSetHoldability) throws SQLException;

//autoGeneratedKeys - 标志是否自增key应该返回Statement.RETURN_GENERATED_KEYS or Statement.NO_GENERATED_KEYS
PreparedStatement prepareStatement(String sql, int autoGeneratedKeys)
throws SQLException;

//columnIndexes - 列下标数组指示插入行哪些列应该返回
PreparedStatement prepareStatement(String sql, int columnIndexes[])
throws SQLException;

PreparedStatement prepareStatement(String sql, String columnNames[])
throws SQLException;

Clob createClob() throws SQLException;

Blob createBlob() throws SQLException;

NClob createNClob() throws SQLException;

SQLXML createSQLXML() throws SQLException;

//
boolean isValid(int timeout) throws SQLException;

//
void setClientInfo(String name, String value)
throws SQLClientInfoException;

void setClientInfo(Properties properties)
throws SQLClientInfoException;

String getClientInfo(String name)
throws SQLException;

Properties getClientInfo()
throws SQLException;

//
Array createArrayOf(String typeName, Object[] elements) throws
SQLException;

//
Struct createStruct(String typeName, Object[] attributes)
throws SQLException;

//--------------------------JDBC 4.1 -----------------------------

//schema
void setSchema(String schema) throws SQLException;

String getSchema() throws SQLException;

//
void abort(Executor executor) throws SQLException;

//
void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException;

int getNetworkTimeout() throws SQLException;
}

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
public interface Statement extends Wrapper, AutoCloseable {

//执行sql语句返回一个ResultSet
ResultSet executeQuery(String sql) throws SQLException;

//执行一个增删改语句,或者没返回值的ddl语句。返回更新行数
int executeUpdate(String sql) throws SQLException;

//
void close() throws SQLException;

//返回列字符最大字节或二进制最大字节,0表示没有限制
int getMaxFieldSize() throws SQLException;

void setMaxFieldSize(int max) throws SQLException;

//返回最大行数
int getMaxRows() throws SQLException;

void setMaxRows(int max) throws SQLException;

//true以允许对转义处理。 否则为 false
void setEscapeProcessing(boolean enable) throws SQLException;

//返回语句的等待超时时间,0表示一直等待
int getQueryTimeout() throws SQLException;

void setQueryTimeout(int seconds) throws SQLException;

//
void cancel() throws SQLException;

SQLWarning getWarnings() throws SQLException;

void clearWarnings() throws SQLException;

//设置游标名
void setCursorName(String name) throws SQLException;

//----------------------- Multiple Results --------------------------

//返回true代表是一个ResultSet对象,false无返回或者返回的是更新行数
boolean execute(String sql) throws SQLException;

ResultSet getResultSet() throws SQLException;

int getUpdateCount() throws SQLException;

//无返回:((stmt.getMoreResults() == false) && (stmt.getUpdateCount() == -1))
boolean getMoreResults() throws SQLException;


//--------------------------JDBC 2.0-----------------------------


//ResultSet.FETCH_FORWARD、ResultSet.FETCH_REVERSE、ResultSet.FETCH_UNKNOWN
void setFetchDirection(int direction) throws SQLException;

int getFetchDirection() throws SQLException;

//游标拿的行数
void setFetchSize(int rows) throws SQLException;

int getFetchSize() throws SQLException;

//ResultSet.CONCUR_READ_ONLY、ResultSet.CONCUR_UPDATABLE
int getResultSetConcurrency() throws SQLException;

//ResultSet.TYPE_FORWARD_ONLY、ResultSet.TYPE_SCROLL_INSENSITIVE、ResultSet.TYPE_SCROLL_SENSITIVE
int getResultSetType() throws SQLException;

//添加sql语句,executeBatch将批量执行
void addBatch( String sql ) throws SQLException;

void clearBatch() throws SQLException;

//批量执行返回每一句成功行数
int[] executeBatch() throws SQLException;

Connection getConnection() throws SQLException;

//--------------------------JDBC 3.0-----------------------------

/**
* The constant indicating that the current <code>ResultSet</code> object
* should be closed when calling <code>getMoreResults</code>.
*
* @since 1.4
*/
int CLOSE_CURRENT_RESULT = 1;

/**
* The constant indicating that the current <code>ResultSet</code> object
* should not be closed when calling <code>getMoreResults</code>.
*
* @since 1.4
*/
int KEEP_CURRENT_RESULT = 2;

/**
* The constant indicating that all <code>ResultSet</code> objects that
* have previously been kept open should be closed when calling
* <code>getMoreResults</code>.
*
* @since 1.4
*/
int CLOSE_ALL_RESULTS = 3;

/**
* The constant indicating that a batch statement executed successfully
* but that no count of the number of rows it affected is available.
*
* @since 1.4
*/
int SUCCESS_NO_INFO = -2;

/**
* The constant indicating that an error occurred while executing a
* batch statement.
*
* @since 1.4
*/
int EXECUTE_FAILED = -3;

/**
* The constant indicating that generated keys should be made
* available for retrieval.
*
* @since 1.4
*/
int RETURN_GENERATED_KEYS = 1;

/**
* The constant indicating that generated keys should not be made
* available for retrieval.
*
* @since 1.4
*/
int NO_GENERATED_KEYS = 2;

//Statement.CLOSE_CURRENT_RESULT、Statement.KEEP_CURRENT_RESULT、Statement.CLOSE_ALL_RESULTS
boolean getMoreResults(int current) throws SQLException;

//获取自增key
ResultSet getGeneratedKeys() throws SQLException;

//update,Statement.RETURN_GENERATED_KEYS、Statement.NO_GENERATED_KEY
int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException;

//带参数
int executeUpdate(String sql, int columnIndexes[]) throws SQLException;

//
int executeUpdate(String sql, String columnNames[]) throws SQLException;

//Statement.RETURN_GENERATED_KEYS、Statement.NO_GENERATED_KEY
boolean execute(String sql, int autoGeneratedKeys) throws SQLException;

//带参数
boolean execute(String sql, int columnIndexes[]) throws SQLException;

//
boolean execute(String sql, String columnNames[]) throws SQLException;

//
int getResultSetHoldability() throws SQLException;

boolean isClosed() throws SQLException;

//
void setPoolable(boolean poolable)
throws SQLException;

boolean isPoolable()
throws SQLException;
}

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public interface PreparedStatement extends Statement {
ResultSet executeQuery() throws SQLException;

int executeUpdate() throws SQLException;

//setxxx

void clearParameters() throws SQLException;

void setObject(int parameterIndex, Object x, int targetSqlType)
throws SQLException;

void setObject(int parameterIndex, Object x) throws SQLException;

boolean execute() throws SQLException;

//--------------------------JDBC 2.0-----------------------------
void addBatch() throws SQLException;

//setxxxx

ResultSetMetaData getMetaData() throws SQLException;

//------------------------- JDBC 3.0 -----------------------------------
void setURL(int parameterIndex, java.net.URL x) throws SQLException;

ParameterMetaData getParameterMetaData() throws SQLException;

//------------------------- JDBC 4.0 -----------------------------------

void setRowId(int parameterIndex, RowId x) throws SQLException;

//setxxx
//-----
}

image

image

1
2
3
4
5
6
7
8
//此接口描述访问哪些由代理代表的包装资源的标准机制,允许对资源代理的直接访问
public interface Wrapper {
//返回一个对象,该对象实现给定接口,以允许访问非标准方法或代理未公开的标准方法
<T> T unwrap(java.lang.Class<T> iface) throws java.sql.SQLException;

//如果调用此方法的对象实现接口参数,或者是实现接口参数的对象的直接或间接包装器,则返回 true
boolean isWrapperFor(java.lang.Class<?> iface) throws java.sql.SQLException;
}

unsupported包

sharding-jdbc-core/java.io.shardingjdbc.core.jdbc.unsupported下都是一些抽象类,对不支持的方法返回SQLFeatureNotSupportedException

image

adapter包

sharding-jdbc-core/java.io.shardingjdbc.core.jdbc.adapter里面都是一些适配类

WrapperAdapter

实现wrapper接口unwrap和isWrapperFor。另外提供给子类recordMethodInvocation、replayMethodsInvocation、throwSQLExceptionIfNecessary

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class WrapperAdapter implements Wrapper {

private final Collection<JdbcMethodInvocation> jdbcMethodInvocations = new ArrayList<>();

@SuppressWarnings("unchecked")
@Override
public final <T> T unwrap(final Class<T> iface) throws SQLException {
if (isWrapperFor(iface)) {
return (T) this;
}
throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
}

@Override
public final boolean isWrapperFor(final Class<?> iface) {
return iface.isInstance(this);
}

/**
* 记录方法调用.
*
* @param targetClass 目标类
* @param methodName 方法名称
* @param argumentTypes 参数类型
* @param arguments 参数
*/
public final void recordMethodInvocation(final Class<?> targetClass, final String methodName, final Class<?>[] argumentTypes, final Object[] arguments) {
try {
jdbcMethodInvocations.add(new JdbcMethodInvocation(targetClass.getMethod(methodName, argumentTypes), arguments));
} catch (final NoSuchMethodException ex) {
throw new ShardingJdbcException(ex);
}
}

/**
* 回放记录的方法调用.
*
* @param target 目标对象
*/
public final void replayMethodsInvocation(final Object target) {
for (JdbcMethodInvocation each : jdbcMethodInvocations) {
each.invoke(target);
}
}

protected void throwSQLExceptionIfNecessary(final Collection<SQLException> exceptions) throws SQLException {
if (exceptions.isEmpty()) {
return;
}
SQLException ex = new SQLException();
for (SQLException each : exceptions) {
ex.setNextException(each);
}
throw ex;
}
}

JdbcMethodInvocation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RequiredArgsConstructor
public class JdbcMethodInvocation {

@Getter
private final Method method;

@Getter
private final Object[] arguments;

/**
* Invoke JDBC method.
*
* @param target target object
*/
public void invoke(final Object target) {
try {
method.invoke(target, arguments);
} catch (final IllegalAccessException | InvocationTargetException ex) {
throw new ShardingJdbcException("Invoke jdbc method exception", ex);
}
}
}

AbstractDataSourceAdapter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOperationDataSource {
@Getter
private final DatabaseType databaseType;

private PrintWriter logWriter = new PrintWriter(System.out);

public AbstractDataSourceAdapter(final Collection<DataSource> dataSources) throws SQLException {
databaseType = getDatabaseType(dataSources);
}

protected DatabaseType getDatabaseType(final Collection<DataSource> dataSources) throws SQLException {
DatabaseType result = null;
for (DataSource each : dataSources) {
DatabaseType databaseType = getDatabaseType(each);
Preconditions.checkState(null == result || result.equals(databaseType), String.format("Database type inconsistent with '%s' and '%s'", result, databaseType));
result = databaseType;
}
return result;
}

private DatabaseType getDatabaseType(final DataSource dataSource) throws SQLException {
if (dataSource instanceof AbstractDataSourceAdapter) {
return ((AbstractDataSourceAdapter) dataSource).databaseType;
}
try (Connection connection = dataSource.getConnection()) {
return DatabaseType.valueFrom(connection.getMetaData().getDatabaseProductName());
}
}

@Override
public final PrintWriter getLogWriter() {
return logWriter;
}

@Override
public final void setLogWriter(final PrintWriter out) {
this.logWriter = out;
}

@Override
public final Logger getParentLogger() {
return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
}

@Override
public final Connection getConnection(final String username, final String password) throws SQLException {
return getConnection();
}
}

AbstractConnectionAdapter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection {
//缓存数据源
private final Map<String, Connection> cachedConnections = new HashMap<>();

private boolean autoCommit = true;

private boolean readOnly = true;

private boolean closed;

private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;

/**
* Get database connection.
*
* @param dataSourceName data source name
* @return database connection
* @throws SQLException SQL exception
*/
public final Connection getConnection(final String dataSourceName) throws SQLException {
if (cachedConnections.containsKey(dataSourceName)) {
return cachedConnections.get(dataSourceName);
}
//从子类获取数据源
DataSource dataSource = getDataSourceMap().get(dataSourceName);
Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
//获取连接
Connection result = dataSource.getConnection();
cachedConnections.put(dataSourceName, result);
//回放记录的方法调用
replayMethodsInvocation(result);
return result;
}

protected abstract Map<String, DataSource> getDataSourceMap();

protected void removeCache(final Connection connection) {
cachedConnections.values().remove(connection);
}

@Override
public final boolean getAutoCommit() {
return autoCommit;
}

@Override
public final void setAutoCommit(final boolean autoCommit) throws SQLException {
this.autoCommit = autoCommit;
//记录方法调用
recordMethodInvocation(Connection.class, "setAutoCommit", new Class[] {boolean.class}, new Object[] {autoCommit});
for (Connection each : cachedConnections.values()) {
each.setAutoCommit(autoCommit);
}
}

@Override
public final void commit() throws SQLException {
Collection<SQLException> exceptions = new LinkedList<>();
for (Connection each : cachedConnections.values()) {
try {
each.commit();
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}

@Override
public final void rollback() throws SQLException {
Collection<SQLException> exceptions = new LinkedList<>();
for (Connection each : cachedConnections.values()) {
try {
each.rollback();
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}

@Override
public void close() throws SQLException {
closed = true;
HintManagerHolder.clear();
MasterVisitedManager.clear();
Collection<SQLException> exceptions = new LinkedList<>();
for (Connection each : cachedConnections.values()) {
try {
each.close();
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}

@Override
public final boolean isClosed() {
return closed;
}

@Override
public final boolean isReadOnly() {
return readOnly;
}

@Override
public final void setReadOnly(final boolean readOnly) throws SQLException {
this.readOnly = readOnly;
recordMethodInvocation(Connection.class, "setReadOnly", new Class[] {boolean.class}, new Object[] {readOnly});
for (Connection each : cachedConnections.values()) {
each.setReadOnly(readOnly);
}
}

@Override
public final int getTransactionIsolation() {
return transactionIsolation;
}

@Override
public final void setTransactionIsolation(final int level) throws SQLException {
transactionIsolation = level;
//记录
recordMethodInvocation(Connection.class, "setTransactionIsolation", new Class[] {int.class}, new Object[] {level});
for (Connection each : cachedConnections.values()) {
each.setTransactionIsolation(level);
}
}

// ------- Consist with MySQL driver implementation -------

@Override
public SQLWarning getWarnings() {
return null;
}

@Override
public void clearWarnings() {
}

@Override
public final int getHoldability() {
return ResultSet.CLOSE_CURSORS_AT_COMMIT;
}

@Override
public final void setHoldability(final int holdability) {
}
}

AbstractStatementAdapter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
@RequiredArgsConstructor
public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperationStatement {

private final Class<? extends Statement> targetClass;

private boolean closed;

private boolean poolable;

private int fetchSize;

@Override
public final void close() throws SQLException {
closed = true;
Collection<SQLException> exceptions = new LinkedList<>();
for (Statement each : getRoutedStatements()) {
try {
each.close();
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
getRoutedStatements().clear();
throwSQLExceptionIfNecessary(exceptions);
}

@Override
public final boolean isClosed() {
return closed;
}

@Override
public final boolean isPoolable() {
return poolable;
}

@Override
public final void setPoolable(final boolean poolable) throws SQLException {
this.poolable = poolable;
//记录调用
recordMethodInvocation(targetClass, "setPoolable", new Class[] {boolean.class}, new Object[] {poolable});
for (Statement each : getRoutedStatements()) {
each.setPoolable(poolable);
}
}

@Override
public final int getFetchSize() {
return fetchSize;
}

@Override
public final void setFetchSize(final int rows) throws SQLException {
this.fetchSize = rows;
recordMethodInvocation(targetClass, "setFetchSize", new Class[] {int.class}, new Object[] {rows});
for (Statement each : getRoutedStatements()) {
each.setFetchSize(rows);
}
}

@Override
public final void setEscapeProcessing(final boolean enable) throws SQLException {
recordMethodInvocation(targetClass, "setEscapeProcessing", new Class[] {boolean.class}, new Object[] {enable});
for (Statement each : getRoutedStatements()) {
each.setEscapeProcessing(enable);
}
}

@Override
public final void cancel() throws SQLException {
for (Statement each : getRoutedStatements()) {
each.cancel();
}
}

@Override
public final int getUpdateCount() throws SQLException {
long result = 0;
boolean hasResult = false;
for (Statement each : getRoutedStatements()) {
if (each.getUpdateCount() > -1) {
hasResult = true;
}
//累加起来
result += each.getUpdateCount();
}
if (result > Integer.MAX_VALUE) {
result = Integer.MAX_VALUE;
}
return hasResult ? Long.valueOf(result).intValue() : -1;
}

@Override
public SQLWarning getWarnings() {
return null;
}

@Override
public void clearWarnings() {
}

@Override
public final boolean getMoreResults() {
return false;
}

@Override
public final boolean getMoreResults(final int current) {
return false;
}

@Override
public final int getMaxFieldSize() throws SQLException {
return getRoutedStatements().isEmpty() ? 0 : getRoutedStatements().iterator().next().getMaxFieldSize();
}

@Override
public final void setMaxFieldSize(final int max) throws SQLException {
recordMethodInvocation(targetClass, "setMaxFieldSize", new Class[] {int.class}, new Object[] {max});
for (Statement each : getRoutedStatements()) {
each.setMaxFieldSize(max);
}
}

// TODO Confirm MaxRows for multiple databases is need special handle. eg: 10 statements maybe MaxRows / 10
@Override
public final int getMaxRows() throws SQLException {
return getRoutedStatements().isEmpty() ? -1 : getRoutedStatements().iterator().next().getMaxRows();
}

@Override
public final void setMaxRows(final int max) throws SQLException {
recordMethodInvocation(targetClass, "setMaxRows", new Class[] {int.class}, new Object[] {max});
for (Statement each : getRoutedStatements()) {
each.setMaxRows(max);
}
}

@Override
public final int getQueryTimeout() throws SQLException {
return getRoutedStatements().isEmpty() ? 0 : getRoutedStatements().iterator().next().getQueryTimeout();
}

@Override
public final void setQueryTimeout(final int seconds) throws SQLException {
recordMethodInvocation(targetClass, "setQueryTimeout", new Class[] {int.class}, new Object[] {seconds});
for (Statement each : getRoutedStatements()) {
each.setQueryTimeout(seconds);
}
}
//抽象,子类实现,路由语句对象集合
protected abstract Collection<? extends Statement> getRoutedStatements();
}

AbstractShardingPreparedStatementAdapter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public abstract class AbstractShardingPreparedStatementAdapter extends AbstractUnsupportedOperationPreparedStatement {
//记录的设置参数方法数组
private final List<SetParameterMethodInvocation> setParameterMethodInvocations = new LinkedList<>();

@Getter
private final List<Object> parameters = new ArrayList<>();

//记录占位符参数
private void setParameter(final int parameterIndex, final Object value) {
if (parameters.size() == parameterIndex - 1) {
parameters.add(value);
return;
}
for (int i = parameters.size(); i <= parameterIndex - 1; i++) {
parameters.add(null);
}
parameters.set(parameterIndex - 1, value);
}
//记录设置参数方法调用
private void recordSetParameter(final String methodName, final Class[] argumentTypes, final Object... arguments) {
try {
setParameterMethodInvocations.add(new SetParameterMethodInvocation(PreparedStatement.class.getMethod(methodName, argumentTypes), arguments, arguments[1]));
} catch (final NoSuchMethodException ex) {
throw new ShardingJdbcException(ex);
}
}
//回放记录的设置参数方法调用
protected void replaySetParameter(final PreparedStatement preparedStatement, final List<Object> parameters) {
setParameterMethodInvocations.clear();
addParameters(parameters);
for (SetParameterMethodInvocation each : setParameterMethodInvocations) {
each.invoke(preparedStatement);
}
}

private void addParameters(final List<Object> parameters) {
for (int i = 0; i < parameters.size(); i++) {
recordSetParameter("setObject", new Class[]{int.class, Object.class}, i + 1, parameters.get(i));
}
}

@Override
public final void clearParameters() {
parameters.clear();
setParameterMethodInvocations.clear();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final class SetParameterMethodInvocation extends JdbcMethodInvocation {

@Getter
private final int index;

@Getter
private final Object value;

public SetParameterMethodInvocation(final Method method, final Object[] arguments, final Object value) {
super(method, arguments);
this.index = (int) arguments[0];
this.value = value;
}

/**
* Set argument.
*
* @param value argument value
*/
public void changeValueArgument(final Object value) {
getArguments()[1] = value;
}
}

AbstractResultSetAdapter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
@Slf4j
public abstract class AbstractResultSetAdapter extends AbstractUnsupportedOperationResultSet {
@Getter
private final List<ResultSet> resultSets;

@Getter
private final Statement statement;

private boolean closed;

public AbstractResultSetAdapter(final List<ResultSet> resultSets, final Statement statement) {
Preconditions.checkArgument(!resultSets.isEmpty());
this.resultSets = resultSets;
this.statement = statement;
}

@Override
public final ResultSetMetaData getMetaData() throws SQLException {
return resultSets.get(0).getMetaData();
}

@Override
public int findColumn(final String columnLabel) throws SQLException {
return resultSets.get(0).findColumn(columnLabel);
}

@Override
public final void close() throws SQLException {
closed = true;
Collection<SQLException> exceptions = new LinkedList<>();
for (ResultSet each : resultSets) {
try {
each.close();
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}

@Override
public final boolean isClosed() {
return closed;
}

@Override
public final void setFetchDirection(final int direction) throws SQLException {
Collection<SQLException> exceptions = new LinkedList<>();
for (ResultSet each : resultSets) {
try {
each.setFetchDirection(direction);
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}

@Override
public final int getFetchDirection() throws SQLException {
return resultSets.get(0).getFetchDirection();
}

@Override
public final void setFetchSize(final int rows) throws SQLException {
Collection<SQLException> exceptions = new LinkedList<>();
for (ResultSet each : resultSets) {
try {
each.setFetchSize(rows);
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}

@Override
public final int getFetchSize() throws SQLException {
return resultSets.get(0).getFetchSize();
}

@Override
public final int getType() throws SQLException {
return resultSets.get(0).getType();
}

@Override
public final int getConcurrency() throws SQLException {
return resultSets.get(0).getConcurrency();
}

@Override
public final SQLWarning getWarnings() throws SQLException {
return resultSets.get(0).getWarnings();
}

@Override
public final void clearWarnings() throws SQLException {
Collection<SQLException> exceptions = new LinkedList<>();
for (ResultSet each : resultSets) {
try {
each.clearWarnings();
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}
}

简介

Sharding-JDBC是一个开源的分布式数据库中间件解决方案。它在Java的JDBC层以对业务应用零侵入的方式额外提供数据分片,读写分离,柔性事务和分布式治理能力。并在其基础上提供封装了MySQL协议的服务端版本,用于完成对异构语言的支持。

(还没写完这个系列的时候就已经更名成sharding-sphere了)

官方文档:http://shardingjdbc.io/docs_cn/00-overview/

http://shardingsphere.io/document/cn/overview/

官方github地址:https://github.com/shardingjdbc/sharding-jdbc

https://github.com/sharding-sphere/sharding-sphere

功能列表

1. 数据分片

  • 支持分库 + 分表
  • 支持聚合,分组,排序,分页,关联查询等复杂查询语句
  • 支持常见的DML,DDL,TCL以及数据库管理语句
  • 支持=,BETWEEN,IN的分片操作符
  • 自定义的灵活分片策略,支持多分片键共用,支持inline表达式
  • 基于Hint的强制路由
  • 支持分布式主键

2. 读写分离

  • 支持一主多从的读写分离
  • 支持同一线程内的数据一致性
  • 支持分库分表与读写分离共同使用
  • 支持基于Hint的强制主库路由

3. 柔性事务

  • 最大努力送达型事务
  • TCC型事务(TBD)

4. 分布式治理

  • 支持配置中心,可动态修改配置
  • 支持客户端熔断和失效转移
  • 支持Open Tracing协议
Read more »

spring万恶的xml配置虽然恶心,但是也不乏良好的设计。如何自定义xml呢?

创建XML Schema文件

什么是XML Schema

  • XML Schema 描述了 XML文档的结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:tool="http://www.springframework.org/schema/tool"
xmlns="http://code.alibabatech.com/schema/dubbo"
targetNamespace="http://code.alibabatech.com/schema/dubbo">

<xsd:import namespace="http://www.w3.org/XML/1998/namespace"/>
<xsd:import namespace="http://www.springframework.org/schema/beans"/>
<xsd:import namespace="http://www.springframework.org/schema/tool"/>

<xsd:complexType name="annotationType">
<xsd:attribute name="id" type="xsd:ID">
<xsd:annotation>
<xsd:documentation><![CDATA[ The unique identifier for a bean. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="package" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The scan package. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>

<xsd:element name="annotation" type="annotationType">
<xsd:annotation>
<xsd:documentation><![CDATA[ The annotation config ]]></xsd:documentation>
</xsd:annotation>
</xsd:element>

</xsd:schema>

简易元素

语法

1
<xsd:element name="xxx" type="yyy"/>

这里的xsd:element 被称为简易元素。name是名称,type是类型。XML Schema 拥有很多内建的数据类型。

实例

这是一些 XML 元素:

1
2
3
<lastname>Refsnes</lastname>
<age>36</age>
<dateborn>1970-03-27</dateborn>

这是相应的简易元素定义:

1
2
3
<xsd:element name="lastname" type="xsd:string"/>
<xsd:element name="age" type="xsd:integer"/>
<xsd:element name="dateborn" type="xsd:date"/>

XSD 属性

简易元素无法拥有属性。假如某个元素拥有属性,它就会被当作某种复合类型。但是属性本身总是作为简易类型被声明的。

语法

1
<xsd:attribute name="xxx" type="yyy"/>

实例

这是一些 XML 元素:

1
2
3
<lastname>Refsnes</lastname>
<age>36</age>
<dateborn>1970-03-27</dateborn>

这是相应的简易元素定义:

1
2
3
<xsd:element name="lastname" type="xsd:string"/>
<xsd:element name="age" type="xsd:integer"/>
<xsd:element name="dateborn" type="xsd:date"/>

默认值

当没有其他的值被规定时,默认值就会自动分配给元素。

在下面的例子中,缺省值是 “red”:

1
<xsd:element name="color" type="xsd:string" default="red"/>

固定值

固定值同样会自动分配给元素,并且您无法规定另外一个值。

在下面的例子中,固定值是 “red”:

1
<xsd:element name="color" type="xsd:string" fixed="red"/>

可选的和必需的属性

在缺省的情况下,属性是可选的。如需规定属性为必选,请使用 “use” 属性:

1
<xsd:attribute name="lang" type="xsd:string" use="required"/>

XSD 限定 / Facets

限定(restriction)用于为 XML 元素或者属性定义可接受的值。对 XML 元素的限定被称为 facet。

对值的限定(minInclusive/maxInclusive)

下面的例子定义了带有一个限定且名为 “age” 的元素。age 的值不能低于 0 或者高于 120:

1
2
3
4
5
6
7
8
<xsd:element name="age">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="0"/>
<xsd:maxInclusive value="120"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>
对一组值的限定(enumeration)

如需把 XML 元素的内容限制为一组可接受的值,我们要使用枚举约束(enumeration constraint)。

下面的例子定义了带有一个限定的名为 “car” 的元素。可接受的值只有:Audi, Golf, BMW:

1
2
3
4
5
6
7
8
9
<xsd:element name="car">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="Audi"/>
<xsd:enumeration value="Golf"/>
<xsd:enumeration value="BMW"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>

上面的例子也可以被写为:

1
2
3
4
5
6
7
8
<xsd:element name="car" type="carType"/>
<xsd:simpleType name="carType">
<xsd:restriction base="xsd:string">
<xsd:enumeration value="Audi"/>
<xsd:enumeration value="Golf"/>
<xsd:enumeration value="BMW"/>
</xsd:restriction>
</xsd:simpleType>

注意: 在这种情况下,类型 “carType” 可被其他元素使用,因为它不是 “car” 元素的组成部分。

对一系列值的限定(pattern)

如需把 XML 元素的内容限制定义为一系列可使用的数字或字母,我们要使用模式约束(pattern constraint)。

下面的例子定义了带有一个限定的名为 “letter” 的元素。可接受的值只有小写字母 a - z 其中的一个:

1
2
3
4
5
6
7
<xsd:element name="letter">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:pattern value="[a-z]"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>

下一个例子定义了带有一个限定的名为 “initials” 的元素。可接受的值是大写字母 A - Z 其中的三个:

1
2
3
4
5
6
7
<xsd:element name="initials">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:pattern value="[A-Z][A-Z][A-Z]"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>

下一个例子也定义了带有一个限定的名为 “initials” 的元素。可接受的值是大写或小写字母 a - z 其中的三个:

1
2
3
4
5
6
7
<xsd:element name="initials">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:pattern value="[a-zA-Z][a-zA-Z][a-zA-Z]"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>

下一个例子定义了带有一个限定的名为 “choice 的元素。可接受的值是字母 x, y 或 z 中的一个:

1
2
3
4
5
6
7
<xsd:element name="choice">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:pattern value="[xyz]"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>

下一个例子定义了带有一个限定的名为 “prodid” 的元素。可接受的值是五个阿拉伯数字的一个序列,且每个数字的范围是 0-9:

1
2
3
4
5
6
7
<xsd:element name="prodid">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:pattern value="[0-9][0-9][0-9][0-9][0-9]"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>
对一系列值的其他限定

下面的例子定义了带有一个限定的名为 “letter” 的元素。可接受的值是 a - z 中零个或多个字母:

1
2
3
4
5
6
7
<xsd:element name="letter">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:pattern value="([a-z])*"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>

下面的例子定义了带有一个限定的名为 “letter” 的元素。可接受的值是一对或多对字母,每对字母由一个小写字母后跟一个大写字母组成。举个例子,”sToP”将会通过这种模式的验证,但是 “Stop”、”STOP” 或者 “stop” 无法通过验证:

1
2
3
4
5
6
7
<xsd:element name="letter">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:pattern value="([a-z][A-Z])+"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>

下面的例子定义了带有一个限定的名为 “gender” 的元素。可接受的值是 male 或者 female:

1
2
3
4
5
6
7
<xsd:element name="gender">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:pattern value="male|female"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>

下面的例子定义了带有一个限定的名为 “password” 的元素。可接受的值是由 8 个字符组成的一行字符,这些字符必须是大写或小写字母 a - z 亦或数字 0 - 9:

1
2
3
4
5
6
7
<xsd:element name="password">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:pattern value="[a-zA-Z0-9]{8}"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>
对空白字符的限定(whiteSpace)

如需规定对空白字符(whitespace characters)的处理方式,我们需要使用 whiteSpace 限定。

下面的例子定义了带有一个限定的名为 “address” 的元素。这个 whiteSpace 限定被设置为 “preserve”,这意味着 XML 处理器不会移除任何空白字符:

1
2
3
4
5
6
7
<xsd:element name="address">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:whiteSpace value="preserve"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>

这个例子也定义了带有一个限定的名为 “address” 的元素。这个 whiteSpace 限定被设置为 “replace”,这意味着 XML 处理器将移除所有空白字符(换行、回车、空格以及制表符):

1
2
3
4
5
6
7
<xsd:element name="address">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:whiteSpace value="replace"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>

这个例子也定义了带有一个限定的名为 “address” 的元素。这个 whiteSpace 限定被设置为 “collapse”,这意味着 XML 处理器将移除所有空白字符(换行、回车、空格以及制表符会被替换为空格,开头和结尾的空格会被移除,而多个连续的空格会被缩减为一个单一的空格):

1
2
3
4
5
6
7
<xsd:element name="address">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:whiteSpace value="collapse"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>
对长度的限定(length/minLength/maxLength)

如需限制元素中值的长度,我们需要使用 length、maxLength 以及 minLength 限定。

本例定义了带有一个限定且名为 “password” 的元素。其值必须精确到 8 个字符:

1
2
3
4
5
6
7
<xsd:element name="password">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:length value="8"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>

这个例子也定义了带有一个限定的名为 “password” 的元素。其值最小为 5 个字符,最大为 8 个字符:

1
2
3
4
5
6
7
8
<xsd:element name="password">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:minLength value="5"/>
<xsd:maxLength value="8"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>
数据类型的限定
限定 描述
enumeration 定义可接受值的一个列表
fractionDigits 定义所允许的最大的小数位数。必须大于等于0。
length 定义所允许的字符或者列表项目的精确数目。必须大于或等于0。
maxExclusive 定义数值的上限。所允许的值必须小于此值。
maxInclusive 定义数值的上限。所允许的值必须小于或等于此值。
maxLength 定义所允许的字符或者列表项目的最大数目。必须大于或等于0。
minExclusive 定义数值的下限。所允许的值必需大于此值。
minInclusive 定义数值的下限。所允许的值必需大于或等于此值。
minLength 定义所允许的字符或者列表项目的最小数目。必须大于或等于0。
pattern 定义可接受的字符的精确序列。
totalDigits 定义所允许的阿拉伯数字的精确位数。必须大于0。
whiteSpace 定义空白字符(换行、回车、空格以及制表符)的处理方式。

复合元素(complexType)

复合元素指包含其他元素及/或属性的 XML 元素。

  • 有四种类型的复合元素:

    • 空元素

      1
      <product pid="1345"/>
      1
      2
      3
      4
      5
      <xsd:element name="product" type="prodtype"/>

      <xsd:complexType name="prodtype">
      <xsd:attribute name="pid" type="xsd:positiveInteger"/>
      </xsd:complexType>
    • 包含其他元素的元素

      1
      2
      3
      4
      <employee>
      <firstname>John</firstname>
      <lastname>Smith</lastname>
      </employee>
      1
      2
      3
      4
      5
      6
      7
      8
      <xsd:element name="employee" type="employeetype"/>

      <xsd:complexType name="employeetype">
      <xsd:sequence>
      <xsd:element name="firstname" type="xsd:string"/>
      <xsd:element name="lastname" type="xsd:string"/>
      </xsd:sequence>
      </xsd:complexType>
    • 仅包含文本的元素

      1
      <food type="dessert">Ice cream</food>
      1
      2
      3
      4
      5
      6
      7
      8
      9
      <xsd:element name="food" type="foodtype"/>

      <xsd:complexType name="foodtype">
      <xsd:simpleContent>
      <xsd:extension base="xs:string">
      <xsd:attribute name="dessert" type="xsd:string" />
      </xsd:extension>
      </xsd:simpleContent>
      </xsd:complexType>
    • 包含元素和文本的元素

      1
      2
      3
      <description>
      It happened on <date>03.03.99</date>
      </description>
      1
      2
      3
      4
      5
      6
      7
      <xsd:element name="description" type="descriptiontype"/>

      <xsd:complexType name="descriptiontype" mixed="true">
      <xsd:sequence>
      <xsd:element name="date" type="xs:date"/>
      </xsd:sequence>
      </xsd:complexType>

指示器

有七种指示器:

Order 指示器:

  • All 规定子元素可以按照任意顺序出现,且每个子元素必须只出现一次
  • Choice 指示器规定可出现某个子元素或者可出现另外一个子元素(非此即彼)
  • Sequence 规定子元素必须按照特定的顺序出现

Occurrence 指示器:

  • maxOccurs 指示器可规定某个元素可出现的最大次数
  • minOccurs 指示器可规定某个元素能够出现的最小次数

Group 指示器:

  • Group name 定义相关的数批元素
  • attributeGroup name

元素

元素使我们有能力通过未被 schema 规定的元素来拓展 XML 文档

1
2
3
4
5
6
7
8
9
<xsd:element name="person">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="firstname" type="xsd:string"/>
<xsd:element name="lastname" type="xsd:string"/>
<xsd:any minOccurs="0"/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>

假如有另一个xsd定义了chidren节点,我们就可以通过any把children加入person节点之下

1
2
3
4
5
6
7
8
9
10
11
12
<person>
<firstname>Hege</firstname>
<lastname>Refsnes</lastname>
<children>
<childname>Cecilie</childname>
</children>
</person>

<person>
<firstname>Stale</firstname>
<lastname>Refsnes</lastname>
</person>

元素

元素使我们有能力通过未被 schema 规定的属性来扩展 XML 文档

构造Bean

META-INF下增加spring.schemas

如:http://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd

继承NamespaceHandlerSupport实现处理器

image

继承NamespaceHandlerSupport后需要实现init方法

如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}

}

这样实现了:

1
2
3
4
5
6
7
8
9
10
* dubbo:application->ApplicationConfig
* dubbo:module->ModuleConfig
* dubbo:registry->RegistryConfig
* dubbo:monitor->MonitorConfig
* dubbo:provider->ProviderConfig
* dubbo:consumer->ConsumerConfig
* dubbo:protocol->ProtocolConfig
* dubbo:service->ServiceBean
* dubbo:reference->ReferenceBean
* dubbo:annotation ->使用继承了AbstractSingleBeanDefinitionParser的解析器AnnotationBeanDefinitionParser

实现BeanDefinitionParser接口实现多个解析器

1
2
3
4
5
public interface BeanDefinitionParser {

BeanDefinition parse(Element element, ParserContext parserContext);

}

BeanDefinitionParser只有一个接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class LightningXCacheBeanParser implements BeanDefinitionParser {

private final Class<?> beanClass;

public LightningXCacheBeanParser(Class<?> beanClass) {
this.beanClass = beanClass;
}

@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClass(beanClass);
beanDefinition.setLazyInit(false);
String application = element.getAttribute("application");
beanDefinition.getPropertyValues().addPropertyValue("application", application);

//注册到spring容器
parserContext.getRegistry().registerBeanDefinition(beanClass.getName(), beanDefinition);
return beanDefinition;
}
}

META-INF下增加spring.handlers

如:http://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler

参考:

http://www.runoob.com/schema/schema-tutorial.html

平常的使用过程中我们经常需要使用日志打印,有些信息比如线程信息、时间戳、日志级别等都可以用一些默认规则很容易的使用。但是如果是一些自定义的信息可能就需要拓展一下了。

Logback

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
package chapters.introduction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HelloWorld1 {

public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger("chapters.introduction.HelloWorld1");
logger.debug("Hello world.");

}
}

20:49:07.962 [main] DEBUG chapters.introduction.HelloWorld1 - Hello world.

配置

1
2
3
4
5
6
7
8
9
10
11
12
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="debug">
<appender-ref ref="STDOUT" />
</root>
</configuration>
1
2
3
4
5
6
7
8
9
10
11
12
package chapters.configuration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Foo {
static final Logger logger = LoggerFactory.getLogger(Foo.class);

public void doIt() {
logger.debug("Did it again!");
}
}

16:06:09.031 [main] INFO chapters.configuration.MyApp1 - Entering application.
16:06:09.046 [main] DEBUG chapters.configuration.Foo - Did it again!
16:06:09.046 [main] INFO chapters.configuration.MyApp1 - Exiting application.

通过配置规则*%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n*来输出响应信息

logback主要由三块组成: Logger, Appender and Layout

Appender负责把日志事件的任务写入

1
2
3
4
5
public interface Appender<E> extends LifeCycle, ContextAware, FilterAttachable {
public String getName();
public void setName(String name);
void doAppend(E event);
}

encoder将事件转换为一个字节数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package ch.qos.logback.core.encoder;

public interface Encoder<E> extends ContextAware, LifeCycle {

/**
* This method is called when the owning appender starts or whenever output
* needs to be directed to a new OutputStream, for instance as a result of a
* rollover.
*/
void init(OutputStream os) throws IOException;

/**
* Encode and write an event to the appropriate {@link OutputStream}.
* Implementations are free to defer writing out of the encoded event and
* instead write in batches.
*/
void doEncode(E event) throws IOException;


/**
* This method is called prior to the closing of the underling
* {@link OutputStream}. Implementations MUST not close the underlying
* {@link OutputStream} which is the responsibility of the owning appender.
*/
void close() throws IOException;
}

Layout负责将传入的事件转换为一个字符串

1
2
3
4
5
6
7
8
9
public interface Layout<E> extends ContextAware, LifeCycle {

String doLayout(E event);
String getFileHeader();
String getPresentationHeader();
String getFileFooter();
String getPresentationFooter();
String getContentType();
}

完全自定义layout

LayoutBase

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="chapters.layouts.MySampleLayout2">
<prefix>MyPrefix</prefix>
<printThreadName>false</printThreadName>
</layout>
</encoder>
</appender>

<root level="DEBUG">
<appender-ref ref="STDOUT" />
</root>
</configuration>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package chapters.layouts;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.LayoutBase;

public class MySampleLayout2 extends LayoutBase<ILoggingEvent> {

String prefix = null;
boolean printThreadName = true;

public void setPrefix(String prefix) {
this.prefix = prefix;
}

public void setPrintThreadName(boolean printThreadName) {
this.printThreadName = printThreadName;
}

public String doLayout(ILoggingEvent event) {
StringBuffer sbuf = new StringBuffer(128);
if (prefix != null) {
sbuf.append(prefix + ": ");
}
sbuf.append(event.getTimeStamp() - event.getLoggerContextVO().getBirthTime());
sbuf.append(" ");
sbuf.append(event.getLevel());
if (printThreadName) {
sbuf.append(" [");
sbuf.append(event.getThreadName());
sbuf.append("] ");
} else {
sbuf.append(" ");
}
sbuf.append(event.getLoggerName());
sbuf.append(" - ");
sbuf.append(event.getFormattedMessage());
sbuf.append(LINE_SEP);
return sbuf.toString();
}
}

PatternLayout

Logback classic附带一个叫做PatternLayout灵活的布局。所有Layout,PatternLayout负责日志事件并返回一个字符串。这个字符串可以通过调整PatternLayout定制的转换模式。

你可以在转换模式里面插入任意文字,格式都是以百分号开头,括号括起来的。比如[%thread]

创建一个自定义转换说明符
  1. 继承ClassicConverter
1
2
3
4
5
6
7
8
9
10
public class MySampleConverter extends ClassicConverter {

long start = System.nanoTime();

@Override
public String convert(ILoggingEvent event) {
long nowInNanos = System.nanoTime();
return Long.toString(nowInNanos-start);
}
}
  1. 声明转换文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<configuration>

<conversionRule conversionWord="nanos"
converterClass="chapters.layouts.MySampleConverter" />

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%-6nanos [%thread] - %msg%n</pattern>
</encoder>
</appender>

<root level="DEBUG">
<appender-ref ref="STDOUT" />
</root>
</configuration>

4868695 [main] DEBUG - Everything’s going well
5758748 [main] ERROR - maybe not quite…

另外一种方式

  1. 继承PatternLayout,默认转换说明符号多设置一条
1
2
3
4
5
public class TraceIdPatternLogbackLayout extends PatternLayout {
static {
defaultConverterMap.put("traceId", LogbackPatternConverter.class.getName());
}
}
  1. 继承ClassicConverter
1
2
3
4
5
6
public class LogbackPatternConverter extends ClassicConverter {
@Override
public String convert(ILoggingEvent iLoggingEvent) {
return Strings.isNullOrEmpty(TraceUtil.getTraceId()) ? "N/A" : TraceUtil.getTraceId();
}
}
  1. 配置自定义layout配置文件
1
2
3
4
5
6
7
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="com.absurd.logback.TraceIdPatternLogbackLayout">
<pattern>[%d{MM-dd HH:mm:ss.SSS}] [%traceId] [%thread] %-5level %logger[%M] - %msg%n</pattern>
</layout>
</encoder>
</appender>

Log4j

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import com.foo.Bar;

import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;

public class MyApp {

// Define a static logger variable so that it references the
// Logger instance named "MyApp".
static Logger logger = Logger.getLogger(MyApp.class);

public static void main(String[] args) {

// 打印到控制台
BasicConfigurator.configure();

logger.info("Entering application.");
Bar bar = new Bar();
bar.doIt();
logger.info("Exiting application.");
}
}

0 [main] INFO MyApp - Entering application.
36 [main] DEBUG com.foo.Bar - Did it again!
51 [main] INFO MyApp - Exiting application.

手动设置配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import com.foo.Bar;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class MyApp {

static Logger logger = Logger.getLogger(MyApp.class.getName());

public static void main(String[] args) {


// BasicConfigurator replaced with PropertyConfigurator.
PropertyConfigurator.configure(args[0]);

logger.info("Entering application.");
Bar bar = new Bar();
bar.doIt();
logger.info("Exiting application.");
}
}

屏蔽日志

1
2
3
4
5
6
7
8
9
log4j.rootLogger=DEBUG, A1
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout

# Print the date in ISO 8601 format
log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n

# Print only messages of level WARN or above in the package com.foo.
log4j.logger.com.foo=WARN

2000-09-07 14:07:41,508 [main] INFO MyApp - Entering application.
2000-09-07 14:07:41,529 [main] INFO MyApp - Exiting application.

多个appender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
log4j.rootLogger=debug, stdout, R

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

# Pattern to output the caller's file name and line number.
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n

log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=example.log

log4j.appender.R.MaxFileSize=100KB
# Keep one backup file
log4j.appender.R.MaxBackupIndex=1

log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n

INFO [main] (MyApp2.java:12) - Entering application.
DEBUG [main] (Bar.java:8) - Doing it again!
INFO [main] (MyApp2.java:15) - Exiting application.

自定义

log4j-1.x

1
2
3
4
5
6
public class TraceIdPatternConverter  extends PatternConverter {
@Override
protected String convert(LoggingEvent loggingEvent) {
return Strings.isNullOrEmpty(TraceUtil.getTraceId()) ? "N/A" : TraceUtil.getTraceId();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TraceIdPatternParser extends PatternParser {

public TraceIdPatternParser(String pattern) {
super(pattern);
}

@Override
protected void finalizeConverter(char c) {
if ('T' == c) {
addConverter(new TraceIdPatternConverter());
} else {
super.finalizeConverter(c);
}
}
}
1
2
3
4
5
6
public class TraceIdPatternLayout extends PatternLayout {
@Override
protected PatternParser createPatternParser(String pattern) {
return new TraceIdPatternParser(pattern);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
log4j.rootLogger=debug, stdout, R

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=com.absurd.log4j.TraceIdPatternLayout

# Pattern to output the caller's file name and line number.
log4j.appender.stdout.layout.ConversionPattern=%5p [%T] [%t] (%F:%L) - %m%n

log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=trace.log

log4j.appender.R.MaxFileSize=100KB
# Keep one backup file
log4j.appender.R.MaxBackupIndex=1

log4j.appender.R.layout=com.absurd.log4j.TraceIdPatternLayout
log4j.appender.R.layout.ConversionPattern=%p [%T] %t %c - %m%n

INFO [1524824013684_bOcv][main] (TraceTest.java:44) - Entering application.
INFO [1524824013684_bOcv][main] (TraceTest.java:45) - Exiting application.

log4j-2.x

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Plugin(name = "TraceIdConverter", category = "Converter")
@ConverterKeys({"traceId"})
public class TraceIdConverter extends LogEventPatternConverter {

protected TraceIdConverter(String name, String style) {
super(name, style);
}

public static TraceIdConverter newInstance(String[] options) {
return new TraceIdConverter("traceId", "traceId");
}

@Override
public void format(LogEvent event, StringBuilder toAppendTo) {
Log4j2OutputAppender.append(toAppendTo);
}
}
1
2
3
4
5
6
public class Log4j2OutputAppender {

public static void append(StringBuilder toAppendTo) {
toAppendTo.append(Strings.isNullOrEmpty(TraceUtil.getTraceId()) ? "N/A" : TraceUtil.getTraceId());
}
}
1
2
3
4
5
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d [%traceId] %-5p %c{1}:%L - %m%n"/>
</Console>
</Appenders>

线程模型

如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识,则直接在 IO 线程上处理更快,因为减少了线程池调度。

但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。

如果用 IO 线程处理事件,又在事件处理过程中发起新的 IO 请求,比如在连接事件中发起登录请求,会报“可能引发死锁”异常,但不会真死锁。

image

因此,需要通过不同的派发策略和不同的线程池配置的组合来应对不同的场景:

1
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

Dispatcher

  • all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等(如果线程池不可用了,就使用共享线程池)。
  • direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
  • message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • execution 连接断开事件请求响应消息派发到线程池,其它心跳等消息,直接在 IO 线程上执行。
  • connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池(如果请求响应消息的线程池不可用了,就使用共享线程池)。

ThreadPool

  • fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
  • cached 缓存线程池,空闲一分钟自动删除,需要时重建。
  • limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
  • eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)

上面这个eager是小伙伴时无两最近提交的,可以忽略。

Read more »

CAP

数据一致性(consistency)

如果系统对一个写操作返回成功,那么之后的读请求都必须读到这个新数据;如果返回失败,那么所有读操作都不能读到这个数据,对调用者而言数据具有强一致性(strong consistency) (又叫原子性 atomic、线性一致性 linearizable consistency)

服务可用性(availability)

所有读写请求在一定时间内得到响应,可终止、不会一直等待

分区容错性(partition-tolerance)

在网络分区的情况下,被分隔的节点仍能正常对外服务

根据定理,分布式系统只能满足三项中的两项而不可能满足全部三项[4]。理解CAP理论的最简单方式是想象两个节点分处分区两侧。允许至少一个节点更新状态会导致数据不一致,即丧失了C性质。如果为了保证数据一致性,将分区一侧的节点设置为不可用,那么又丧失了A性质。除非两个节点可以互相通信,才能既保证C又保证A,这又会导致丧失P性质。

Read more »