sharding-jdbc源码解析-读写分离(二)

调用流程

sequencediagram1

执行SQL时,先通过词法分析判断出SQL的类型,再据此决定走主库还是从库。

MasterSlaveDataSource

从MasterSlaveDataSourceFactory.createDataSource(createDataSourceMap(), masterSlaveRuleConfig)入手,我们可以发现读写分离实现的关键在于MasterSlaveDataSource这个数据源的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
 * Data source that groups one master data source with its slave data sources
 * and hands out {@link MasterSlaveConnection} instances.
 */
@Getter
public class MasterSlaveDataSource extends AbstractDataSourceAdapter {

    // Mapping from data source name to data source.
    private Map<String, DataSource> dataSourceMap;

    private MasterSlaveRule masterSlaveRule;

    /**
     * Create a master-slave data source.
     *
     * @param dataSourceMap data source map, keyed by data source name
     * @param masterSlaveRuleConfig master-slave rule configuration
     * @param configMap entries copied into the shared master-slave config map when not empty
     * @throws SQLException declared by the constructor signature
     */
    public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Map<String, Object> configMap) throws SQLException {
        super(getAllDataSources(dataSourceMap, masterSlaveRuleConfig.getMasterDataSourceName(), masterSlaveRuleConfig.getSlaveDataSourceNames()));
        this.dataSourceMap = dataSourceMap;
        this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig);
        if (configMap.isEmpty()) {
            // Nothing to publish to the shared config context.
            return;
        }
        ConfigMapContext.getInstance().getMasterSlaveConfig().putAll(configMap);
    }

    /**
     * Collect the master data source followed by every slave data source, in iteration order of the slave names.
     *
     * @param dataSourceMap data source map, keyed by data source name
     * @param masterDataSourceName name of the master data source
     * @param slaveDataSourceNames names of the slave data sources
     * @return the looked-up data sources, master first
     */
    private static Collection<DataSource> getAllDataSources(final Map<String, DataSource> dataSourceMap, final String masterDataSourceName, final Collection<String> slaveDataSourceNames) {
        Collection<DataSource> dataSources = new LinkedList<>();
        dataSources.add(dataSourceMap.get(masterDataSourceName));
        for (String slaveName : slaveDataSourceNames) {
            dataSources.add(dataSourceMap.get(slaveName));
        }
        return dataSources;
    }

    /**
     * Get map of all actual data source name and all actual data sources.
     *
     * <p>A new map is built on every call from the names held by the current master-slave rule.</p>
     *
     * @return map of all actual data source name and all actual data sources
     */
    public Map<String, DataSource> getAllDataSources() {
        // Sized for every slave plus the master, with load factor 1.
        Map<String, DataSource> allDataSources = new HashMap<>(masterSlaveRule.getSlaveDataSourceNames().size() + 1, 1);
        String masterName = masterSlaveRule.getMasterDataSourceName();
        allDataSources.put(masterName, dataSourceMap.get(masterName));
        for (String slaveName : masterSlaveRule.getSlaveDataSourceNames()) {
            allDataSources.put(slaveName, dataSourceMap.get(slaveName));
        }
        return allDataSources;
    }

    /**
     * Renew master-slave data source.
     *
     * <p>Replaces both the data source map and the rule held by this instance.</p>
     *
     * @param dataSourceMap data source map
     * @param masterSlaveRuleConfig new master-slave rule configuration
     */
    public void renew(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig) {
        this.dataSourceMap = dataSourceMap;
        this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig);
    }

    /**
     * Create a new connection bound to this data source.
     *
     * @return a new master-slave connection
     */
    @Override
    public MasterSlaveConnection getConnection() {
        return new MasterSlaveConnection(this);
    }
}

MasterSlaveConnection

MasterSlaveConnection本身基本没什么可看的,主要逻辑都在后面的两个类里。

MasterSlaveStatement

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/**
 * Statement for read-write splitting.
 *
 * <p>Each execution judges the SQL type, asks the {@link MasterSlaveRouter} for the target data source
 * names, creates one underlying {@link Statement} per target and delegates to it. The underlying
 * statements of the latest execution are kept in {@code routedStatements}.</p>
 *
 * <p>Before every execution the statements left over from the previous execution are closed and
 * dropped. Without that, re-using this statement would leak the earlier underlying statements and make
 * {@link #getGeneratedKeys()} / {@link #getResultSet()} fail their single-statement check.</p>
 */
@Getter
public final class MasterSlaveStatement extends AbstractStatementAdapter {

    private final MasterSlaveConnection connection;

    private final MasterSlaveRouter masterSlaveRouter;

    private final int resultSetType;

    private final int resultSetConcurrency;

    private final int resultSetHoldability;

    // Underlying statements created by the most recent execution.
    private final Collection<Statement> routedStatements = new LinkedList<>();

    /**
     * Create a statement with forward-only, read-only result sets that are held over commit.
     *
     * @param connection master-slave connection
     */
    public MasterSlaveStatement(final MasterSlaveConnection connection) {
        this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }

    /**
     * Create a statement whose result sets are held over commit.
     *
     * @param connection master-slave connection
     * @param resultSetType result set type
     * @param resultSetConcurrency result set concurrency
     */
    public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency) {
        this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }

    /**
     * Create a statement.
     *
     * @param connection master-slave connection
     * @param resultSetType result set type
     * @param resultSetConcurrency result set concurrency
     * @param resultSetHoldability result set holdability
     */
    public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
        super(Statement.class);
        this.connection = connection;
        masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule());
        this.resultSetType = resultSetType;
        this.resultSetConcurrency = resultSetConcurrency;
        this.resultSetHoldability = resultSetHoldability;
    }

    /**
     * Execute a query on the single routed data source.
     *
     * @param sql SQL to execute
     * @return result set of the underlying statement
     * @throws SQLException if the underlying statement fails
     * @throws IllegalStateException if routing does not yield exactly one data source
     */
    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        clearPrevious();
        SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
        Collection<String> dataSourceNames = masterSlaveRouter.route(sqlStatement.getType());
        Preconditions.checkState(1 == dataSourceNames.size(), "Cannot support executeQuery for DML or DDL");
        Statement statement = connection.getConnection(dataSourceNames.iterator().next()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
        routedStatements.add(statement);
        return statement.executeQuery(sql);
    }

    /**
     * Execute an update on every routed data source.
     *
     * @param sql SQL to execute
     * @return sum of the update counts of all underlying statements
     * @throws SQLException if an underlying statement fails
     */
    @Override
    public int executeUpdate(final String sql) throws SQLException {
        clearPrevious();
        int result = 0;
        SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
        for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result += statement.executeUpdate(sql);
        }
        return result;
    }

    /**
     * Execute an update on every routed data source.
     *
     * @param sql SQL to execute
     * @param autoGeneratedKeys flag passed through to the underlying statement
     * @return sum of the update counts of all underlying statements
     * @throws SQLException if an underlying statement fails
     */
    @Override
    public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
        clearPrevious();
        int result = 0;
        SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
        for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result += statement.executeUpdate(sql, autoGeneratedKeys);
        }
        return result;
    }

    /**
     * Execute an update on every routed data source.
     *
     * @param sql SQL to execute
     * @param columnIndexes column indexes passed through to the underlying statement
     * @return sum of the update counts of all underlying statements
     * @throws SQLException if an underlying statement fails
     */
    @Override
    public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException {
        clearPrevious();
        int result = 0;
        SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
        for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result += statement.executeUpdate(sql, columnIndexes);
        }
        return result;
    }

    /**
     * Execute an update on every routed data source.
     *
     * @param sql SQL to execute
     * @param columnNames column names passed through to the underlying statement
     * @return sum of the update counts of all underlying statements
     * @throws SQLException if an underlying statement fails
     */
    @Override
    public int executeUpdate(final String sql, final String[] columnNames) throws SQLException {
        clearPrevious();
        int result = 0;
        SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
        for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result += statement.executeUpdate(sql, columnNames);
        }
        return result;
    }

    /**
     * Execute SQL on every routed data source.
     *
     * @param sql SQL to execute
     * @return result of the last underlying {@code execute} call, or {@code false} if nothing was routed
     * @throws SQLException if an underlying statement fails
     */
    @Override
    public boolean execute(final String sql) throws SQLException {
        clearPrevious();
        boolean result = false;
        // Judge which kind of SQLStatement this SQL is; its type drives the routing.
        SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
        for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result = statement.execute(sql);
        }
        return result;
    }

    /**
     * Execute SQL on every routed data source.
     *
     * @param sql SQL to execute
     * @param autoGeneratedKeys flag passed through to the underlying statement
     * @return result of the last underlying {@code execute} call, or {@code false} if nothing was routed
     * @throws SQLException if an underlying statement fails
     */
    @Override
    public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException {
        clearPrevious();
        boolean result = false;
        SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
        for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result = statement.execute(sql, autoGeneratedKeys);
        }
        return result;
    }

    /**
     * Execute SQL on every routed data source.
     *
     * @param sql SQL to execute
     * @param columnIndexes column indexes passed through to the underlying statement
     * @return result of the last underlying {@code execute} call, or {@code false} if nothing was routed
     * @throws SQLException if an underlying statement fails
     */
    @Override
    public boolean execute(final String sql, final int[] columnIndexes) throws SQLException {
        clearPrevious();
        boolean result = false;
        SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
        for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result = statement.execute(sql, columnIndexes);
        }
        return result;
    }

    /**
     * Execute SQL on every routed data source.
     *
     * @param sql SQL to execute
     * @param columnNames column names passed through to the underlying statement
     * @return result of the last underlying {@code execute} call, or {@code false} if nothing was routed
     * @throws SQLException if an underlying statement fails
     */
    @Override
    public boolean execute(final String sql, final String[] columnNames) throws SQLException {
        clearPrevious();
        boolean result = false;
        SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
        for (String each : masterSlaveRouter.route(sqlStatement.getType())) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result = statement.execute(sql, columnNames);
        }
        return result;
    }

    /**
     * Get the generated keys of the single underlying statement of the latest execution.
     *
     * @return generated keys result set
     * @throws SQLException if the underlying statement fails
     * @throws IllegalStateException if the latest execution did not create exactly one underlying statement
     */
    @Override
    public ResultSet getGeneratedKeys() throws SQLException {
        Preconditions.checkState(1 == routedStatements.size());
        return routedStatements.iterator().next().getGeneratedKeys();
    }

    /**
     * Get the current result set of the single underlying statement of the latest execution.
     *
     * @return result set
     * @throws SQLException if the underlying statement fails
     * @throws IllegalStateException if the latest execution did not create exactly one underlying statement
     */
    @Override
    public ResultSet getResultSet() throws SQLException {
        Preconditions.checkState(1 == routedStatements.size());
        return routedStatements.iterator().next().getResultSet();
    }

    /**
     * Close and forget the underlying statements of the previous execution.
     *
     * <p>The list is cleared only after every close succeeded, so a failed close can be retried by the
     * next execution; closing an already-closed JDBC statement is a no-op.</p>
     *
     * @throws SQLException if closing an underlying statement fails
     */
    private void clearPrevious() throws SQLException {
        for (Statement each : routedStatements) {
            each.close();
        }
        routedStatements.clear();
    }
}

MasterSlaveRouter(4月份新加的)

根据sql类型判断主从

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Router that picks the data source name for a SQL type according to a master-slave rule.
 */
@RequiredArgsConstructor
public final class MasterSlaveRouter {

    private final MasterSlaveRule masterSlaveRule;

    /**
     * Route Master slave.
     *
     * <p>When the master must be used, the master is marked as visited and its name is returned;
     * otherwise the name chosen by the rule's load-balance algorithm is returned.</p>
     *
     * @param sqlType SQL type
     * @return data source name
     */
    // TODO for multiple masters may return more than one data source
    public Collection<String> route(final SQLType sqlType) {
        if (!isMasterRoute(sqlType)) {
            // The algorithm receives the rule name, the master name and a copy of the slave names.
            String balancedName = masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
                    masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames()));
            return Collections.singletonList(balancedName);
        }
        MasterVisitedManager.setMasterVisited();
        return Collections.singletonList(masterSlaveRule.getMasterDataSourceName());
    }

    /**
     * Decide whether the master data source must be used.
     *
     * @param sqlType SQL type
     * @return true if the SQL type is not DQL, the master was already visited, or master-route-only is set
     */
    private boolean isMasterRoute(final SQLType sqlType) {
        // 1. Anything other than a query (non-SELECT) goes to the master.
        if (SQLType.DQL != sqlType) {
            return true;
        }
        // 2. The master was already visited (original note: the flag is kept in a ThreadLocal).
        if (MasterVisitedManager.isMasterVisited()) {
            return true;
        }
        // 3. Master-route-only was forced through the hint manager.
        return HintManagerHolder.isMasterRouteOnly();
    }
}

SQLJudgeEngine

通过词法分析判断出SQL的类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
 * Engine that decides the kind of a SQL statement by scanning its tokens.
 */
@RequiredArgsConstructor
public final class SQLJudgeEngine {

    private final String sql;

    /**
     * Judge SQL type only.
     *
     * <p>Tokens are read one by one from a lexer engine created for MySQL. The first keyword token
     * accepted by one of the DQL/DML/DDL/TCL/DAL checks decides the returned statement.</p>
     *
     * @return SQL statement
     * @throws SQLParsingException if the end token is reached before any such keyword
     */
    public SQLStatement judge() {
        LexerEngine lexerEngine = LexerEngineFactory.newInstance(DatabaseType.MySQL, sql);
        // Advance to the first token, then move one token forward after each unsuccessful round.
        for (lexerEngine.nextToken(); ; lexerEngine.nextToken()) {
            TokenType currentType = lexerEngine.getCurrentToken().getType();
            if (currentType instanceof Keyword) {
                // SELECT -> SelectStatement
                if (isDQL(currentType)) {
                    return getDQLStatement();
                }
                // INSERT, UPDATE, DELETE -> InsertStatement or DMLStatement
                if (isDML(currentType)) {
                    return getDMLStatement(currentType);
                }
                // CREATE, ALTER, DROP, TRUNCATE -> DDLStatement
                if (isDDL(currentType)) {
                    return getDDLStatement();
                }
                // SET, COMMIT, ROLLBACK, SAVEPOINT, BEGIN -> TCLStatement
                if (isTCL(currentType)) {
                    return getTCLStatement();
                }
                // USE, DESC, DESCRIBE, SHOW -> UseStatement, DescribeStatement or a Show*Statement
                if (isDAL(currentType)) {
                    return getDALStatement(currentType, lexerEngine);
                }
            }
            // Identity with Assist.END already implies the token type is an Assist.
            if (Assist.END == currentType) {
                throw new SQLParsingException("Unsupported SQL statement: [%s]", sql);
            }
        }
    }

    /**
     * Check whether the token type is one of USE, DESC, DESCRIBE or SHOW.
     *
     * @param tokenType token type
     * @return true if the token type starts a DAL statement
     */
    private boolean isDAL(final TokenType tokenType) {
        if (DefaultKeyword.USE == tokenType || DefaultKeyword.DESC == tokenType) {
            return true;
        }
        return MySQLKeyword.DESCRIBE == tokenType || MySQLKeyword.SHOW == tokenType;
    }

    /**
     * Create the DAL statement for the given keyword.
     *
     * @param tokenType keyword token type
     * @param lexerEngine lexer engine, used to look at the token following SHOW
     * @return use statement, describe statement, or the result of the SHOW dispatch
     */
    private SQLStatement getDALStatement(final TokenType tokenType, final LexerEngine lexerEngine) {
        if (DefaultKeyword.USE == tokenType) {
            return new UseStatement();
        }
        boolean describe = DefaultKeyword.DESC == tokenType || MySQLKeyword.DESCRIBE == tokenType;
        return describe ? new DescribeStatement() : getShowStatement(lexerEngine);
    }

    /**
     * Create the SHOW statement according to the next token.
     *
     * @param lexerEngine lexer engine; it is advanced by one token
     * @return show databases/tables/columns statement, or show-other statement for anything else
     */
    private SQLStatement getShowStatement(final LexerEngine lexerEngine) {
        lexerEngine.nextToken();
        TokenType showTarget = lexerEngine.getCurrentToken().getType();
        if (MySQLKeyword.DATABASES == showTarget) {
            return new ShowDatabasesStatement();
        }
        if (MySQLKeyword.TABLES == showTarget) {
            return new ShowTablesStatement();
        }
        if (MySQLKeyword.COLUMNS == showTarget) {
            return new ShowColumnsStatement();
        }
        return new ShowOtherStatement();
    }
}

LexerEngine

LexerEngineFactory会根据数据库来选择适合的分词器

mysql对应MySQLLexer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Factory that creates a {@link LexerEngine} backed by the lexer matching a database type.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class LexerEngineFactory {

    /**
     * Create lexical analysis engine instance.
     *
     * <p>H2 shares the MySQL lexer; Oracle, SQLServer and PostgreSQL each have their own.</p>
     *
     * @param dbType database type
     * @param sql SQL
     * @return lexical analysis engine instance
     * @throws UnsupportedOperationException if no lexer exists for the database type
     */
    public static LexerEngine newInstance(final DatabaseType dbType, final String sql) {
        switch (dbType) {
            case PostgreSQL:
                return new LexerEngine(new PostgreSQLLexer(sql));
            case SQLServer:
                return new LexerEngine(new SQLServerLexer(sql));
            case Oracle:
                return new LexerEngine(new OracleLexer(sql));
            case MySQL:
            case H2:
                return new LexerEngine(new MySQLLexer(sql));
            default:
                String message = String.format("Cannot support database [%s].", dbType);
                throw new UnsupportedOperationException(message);
        }
    }
}

关于词法分析,下一章单独分析