1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
| @Getter public final class MasterSlaveStatement extends AbstractStatementAdapter { private final MasterSlaveConnection connection; private final MasterSlaveRouter masterSlaveRouter; private final int resultSetType; private final int resultSetConcurrency; private final int resultSetHoldability; private final Collection<Statement> routedStatements = new LinkedList<>(); public MasterSlaveStatement(final MasterSlaveConnection connection) { this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); } public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency) { this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT); } public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) { super(Statement.class); this.connection = connection; masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule()); this.resultSetType = resultSetType; this.resultSetConcurrency = resultSetConcurrency; this.resultSetHoldability = resultSetHoldability; } @Override public ResultSet executeQuery(final String sql) throws SQLException { SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); Collection<String> dataSourceNames = masterSlaveRouter.route(sqlStatement.getType()); Preconditions.checkState(1 == dataSourceNames.size(), "Cannot support executeQuery for DML or DDL"); Statement statement = connection.getConnection(dataSourceNames.iterator().next()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); return statement.executeQuery(sql); } @Override public int executeUpdate(final String sql) throws SQLException { int result = 0; SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); for (String each : masterSlaveRouter.route(sqlStatement.getType())) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result += statement.executeUpdate(sql); } return result; } @Override public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException { int result = 0; SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); for (String each : masterSlaveRouter.route(sqlStatement.getType())) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result += statement.executeUpdate(sql, autoGeneratedKeys); } return result; } @Override public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException { int result = 0; SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); for (String each : masterSlaveRouter.route(sqlStatement.getType())) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result += statement.executeUpdate(sql, columnIndexes); } return result; } @Override public int executeUpdate(final String sql, final String[] columnNames) throws SQLException { int result = 0; SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); for (String each : masterSlaveRouter.route(sqlStatement.getType())) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result += statement.executeUpdate(sql, columnNames); } return result; } @Override public boolean execute(final String sql) throws SQLException { boolean result = false; SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); for (String each : masterSlaveRouter.route(sqlStatement.getType())) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result = statement.execute(sql); } return result; } @Override public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException { boolean result = false; SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); for (String each : masterSlaveRouter.route(sqlStatement.getType())) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result = statement.execute(sql, autoGeneratedKeys); } return result; } @Override public boolean execute(final String sql, final int[] columnIndexes) throws SQLException { boolean result = false; SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); for (String each : masterSlaveRouter.route(sqlStatement.getType())) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result = statement.execute(sql, columnIndexes); } return result; } @Override public boolean execute(final String sql, final String[] columnNames) throws SQLException { boolean result = false; SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); for (String each : masterSlaveRouter.route(sqlStatement.getType())) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result = statement.execute(sql, columnNames); } return result; } @Override public ResultSet getGeneratedKeys() throws SQLException { Preconditions.checkState(1 == routedStatements.size()); return routedStatements.iterator().next().getGeneratedKeys(); } @Override public ResultSet getResultSet() throws SQLException { Preconditions.checkState(1 == routedStatements.size()); return routedStatements.iterator().next().getResultSet(); } }
|