ShardingSphere原理分析和实战总结

it2022-05-09  28

架构原理 

官方文档:https://shardingsphere.apache.org/document/legacy/4.x/document/cn/overview/

核心概念

逻辑表

水平拆分的数据库(表)的相同逻辑和数据结构表的总称。例:订单数据根据主键尾数拆分为10张表,分别是t_order_0到t_order_9,他们的逻辑表名为t_order。

真实表

在分片的数据库中真实存在的物理表。即上个示例中的t_order_0到t_order_9。

数据节点

数据分片的最小单元。由数据源名称和数据表组成,例:ds_0.t_order_0。

绑定表

指分片规则一致的主表和子表。例如:t_order表和t_order_item表,均按照order_id分片,则此两张表互为绑定表关系。绑定表之间的多表关联查询不会出现笛卡尔积关联,关联查询效率将大大提升。举例说明,如果SQL为:

SELECT i.* FROM t_order o JOIN t_order_item i ON o.order_id=i.order_id WHERE o.order_id in (10, 11);

在不配置绑定表关系时,假设分片键order_id将数值10路由至第0片,将数值11路由至第1片,那么路由后的SQL应该为4条,它们呈现为笛卡尔积:

SELECT i.* FROM t_order_0 o JOIN t_order_item_0 i ON o.order_id=i.order_id WHERE o.order_id in (10, 11); SELECT i.* FROM t_order_0 o JOIN t_order_item_1 i ON o.order_id=i.order_id WHERE o.order_id in (10, 11); SELECT i.* FROM t_order_1 o JOIN t_order_item_0 i ON o.order_id=i.order_id WHERE o.order_id in (10, 11); SELECT i.* FROM t_order_1 o JOIN t_order_item_1 i ON o.order_id=i.order_id WHERE o.order_id in (10, 11);

在配置绑定表关系后,路由的SQL应该为2条:

SELECT i.* FROM t_order_0 o JOIN t_order_item_0 i ON o.order_id=i.order_id WHERE o.order_id in (10, 11); SELECT i.* FROM t_order_1 o JOIN t_order_item_1 i ON o.order_id=i.order_id WHERE o.order_id in (10, 11);

其中t_order在FROM的最左侧,ShardingSphere将会以它作为整个绑定表的主表。 所有路由计算将会只使用主表的策略,那么t_order_item表的分片计算将会使用t_order的条件。故绑定表之间的分区键要完全相同。

分片策略

ShardingSphere认为对于分片策略存有两种维度:

数据源分片策略(DatabaseShardingStrategy):数据被分配的目标数据源表分片策略(TableShardingStrategy):数据被分配的目标表

两种分片策略API完全相同,但是表分片策略是依赖于数据源分片策略的(即:先分库然后才有分表)

 

分片算法

Sharding-JDBC提供了5种分片策略。由于分片算法和业务实现紧密相关,因此Sharding-JDBC并未提供内置分片算法,而是通过分片策略将各种场景提炼出来,提供更高层级的抽象,并提供接口让应用开发者自行实现分片算法。

精确分片算法

对应PreciseShardingAlgorithm,用于处理使用单一键作为分片键的=与IN进行分片的场景。需要配合StandardShardingStrategy使用。

范围分片算法

对应RangeShardingAlgorithm,用于处理使用单一键作为分片键的BETWEEN AND、>、<、>=、<=进行分片的场景。需要配合StandardShardingStrategy使用。

复合分片算法

对应ComplexKeysShardingAlgorithm,用于处理使用多键作为分片键进行分片的场景,包含多个分片键的逻辑较复杂,需要应用开发者自行处理其中的复杂度。需要配合ComplexShardingStrategy使用。

Hint分片算法

对应HintShardingAlgorithm,用于处理使用Hint行分片的场景。需要配合HintShardingStrategy使用。

分页原理

select * from ORDER_XX where orderId = ? limit 0 ,10

如果查询条件没有orderId, 那么最终执行的sql就是3条(假设每页10条):

select * from ORDER_00 where create_tm >= ? and create_tm <= ? limit 0 ,10 ; select * from ORDER_01 where create_tm >= ? and create_tm <= ? limit 0 ,10 ; select * from ORDER_02 where create_tm >= ? and create_tm <= ? limit 0 ,10 ;

注意在有多个分表的情况下, 每个表都取前10条数据出来(一共30条), 然后再排序取前10条, 这样的逻辑是不对的. sharding-jdbc给了个例子, 如果下图:

图中的例子中,想要取得两个表中共同的按照分数排序的第2条和第3条数据,应该是95和90。 由于执行的SQL只能从每个表中获取第2条和第3条数据,即从t_score_0表中获取的是90和80;从t_score_0表中获取的是85和75。 因此进行结果归并时,只能从获取的90,80,85和75之中进行归并,那么结果归并无论怎么实现,都不可能获得正确的结果.

那怎么办呢?

sharding-jdbc的做法就改写我们的sql, 先查出来所有的数据, 再做归并排序

例如查询第2页时

原sql是: select * from ORDER_00 where create_tm >= ? and create_tm <= ? limit 10 ,10 ; select * from ORDER_01 where create_tm >= ? and create_tm <= ? limit 10 ,10 ; select * from ORDER_02 where create_tm >= ? and create_tm <= ? limit 10 ,10 ; 会被改写成: select * from ORDER_00 where create_tm >= ? and create_tm <= ? limit 0 ,20 ; select * from ORDER_01 where create_tm >= ? and create_tm <= ? limit 0 ,20 ; select * from ORDER_02 where create_tm >= ? and create_tm <= ? limit 0 ,20 ;

查询第3页时

原sql是: select * from ORDER_00 where create_tm >= ? and create_tm <= ? limit 20 ,10 ; select * from ORDER_01 where create_tm >= ? and create_tm <= ? limit 20 ,10 ; select * from ORDER_02 where create_tm >= ? and create_tm <= ? limit 20 ,10 ; 会被改写成: select * from ORDER_00 where create_tm >= ? and create_tm <= ? limit 0 ,30 ; select * from ORDER_01 where create_tm >= ? and create_tm <= ? limit 0 ,30 ; select * from ORDER_02 where create_tm >= ? and create_tm <= ? limit 0 ,30 ;

当然, 大家肯定会觉得这样处理性能会很差, 其实事实上也的确是, 不过sharing-jdbc是在这个基础上做了优化的,就是上面提到的"归并",大概的逻辑就是先查出所有页的数据, 然后通过流式处理跳过前面的页,只取最终需要的页,最终达到分页的目的

配置实战 

Sharding-JDBC重写了mybatis里配置sql的路由规则,只要按照常规的方法在xml文件里书写sql就可以自动路由的对应的表,但是查询时最好要指定分片键,否则会出现扫描所有表的情况,在数据量很庞大的情况下不可取。

引入依赖

<!--mybatis--> <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-jdbc-spring-boot-starter</artifactId> <version>${sharding.jdbc.version}</version> </dependency> <dependency> <groupId>tk.mybatis</groupId> <artifactId>mapper-spring-boot-starter</artifactId> <version>${tk.mybatis.version}</version> </dependency>

数据源配置 (按照月份进行分表)

spring: shardingsphere: datasource: names: two-project two-project: driver-class-name: com.mysql.jdbc.Driver filters: stat,wall,log4j poolPreparedStatements: true stat-view-servlet: enabled: true reset-enable: false url-pattern: /druid/* testOnBorrow: false testOnReturn: false testWhileIdle: true type: com.alibaba.druid.pool.DruidDataSource url: jdbc:mysql://127.0.0.1:3306/sharding-jdbc?useSSL=false&characterEncoding=utf-8&serverTimezone=GMT+8&allowPublicKeyRetrieval=true username: root password: 123456.. validationQuery: SELECT 1 FROM DUAL web-stat-filter: enabled: true exclusions: '*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*' props: sql: show: false sharding: tables: order: actual-data-nodes: two-project.t_order_2020_0$->{8..9},two-project.t_order_2020_$->{10..12},two-project.t_order_$->{2021..2099}_0$->{1..9},two-project.t_order_$->{2021..2099}_$->{10..12} table-strategy: standard: sharding-column: id precise-algorithm-class-name: com.baoquan.sharding.TimeShardingTableAlgorithm range-algorithm-class-name: com.baoquan.sharding.TimeRangeShardingAlgorithm

mapper文件 

<insert id="createOrder" parameterType="com.baoquan.dataobject.po.Order"> insert into `order` (id,user_id, order_id,create_time) values (#{id},#{userId}, #{orderId}, #{createTime}) </insert>

 分片算法

package com.baoquan.sharding; import com.baoquan.constants.CommonContants; import com.baoquan.util.ParaseShardingKeyTool; import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm; import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue; import java.util.Collection; public class TimeShardingTableAlgorithm implements PreciseShardingAlgorithm<Long> { @Override public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) { String logicTable = new StringBuffer().append(shardingValue.getLogicTableName()) .append(CommonContants.SPLIT).append(ParaseShardingKeyTool.getYearAndMonth((shardingValue.getValue().toString()))).toString(); //获取实际表名 String targetTable = null; for (String availableTargetName : availableTargetNames) { if (availableTargetName.contains(logicTable)) { targetTable = availableTargetName; } } return targetTable; } } package com.baoquan.sharding; import com.google.common.collect.Range; import com.baoquan.constants.CommonContants; import com.baoquan.util.ParaseShardingKeyTool; import org.apache.shardingsphere.api.sharding.standard.RangeShardingAlgorithm; import org.apache.shardingsphere.api.sharding.standard.RangeShardingValue; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.ChronoField; import java.util.Collection; import java.util.LinkedHashSet; public class TimeRangeShardingAlgorithm implements RangeShardingAlgorithm<String> { private DateTimeFormatter dateformat = DateTimeFormatter.ofPattern(CommonContants.yyyy_MM); private DateTimeFormatter DATE_FORMAT = new DateTimeFormatterBuilder() .appendPattern(CommonContants.yyyy_MM) .parseDefaulting(ChronoField.DAY_OF_MONTH, 1) .toFormatter(); @Override public Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<String> shardingValue) { Collection<String> result = new LinkedHashSet<String>(); Range<String> shardingKey = shardingValue.getValueRange(); String startShardingKey = shardingKey.lowerEndpoint(); String endShardingKey = shardingKey.upperEndpoint(); String startTimeString = ParaseShardingKeyTool.getYearAndMonth(startShardingKey); String endTimeString = ParaseShardingKeyTool.getYearAndMonth(endShardingKey); LocalDate startLocalDate = LocalDate.parse(startTimeString, DATE_FORMAT); LocalDate endLocalDate = LocalDate.parse(endTimeString, DATE_FORMAT); while (startLocalDate.compareTo(endLocalDate) != 0) { StringBuffer tableName = new StringBuffer(); tableName.append(shardingValue.getLogicTableName()) .append(CommonContants.SPLIT).append(dateformat.format(startLocalDate)); result.add(tableName.toString()); startLocalDate = startLocalDate.plusMonths(1); } result.add(shardingValue.getLogicTableName() + CommonContants.SPLIT + dateformat.format(startLocalDate)); //获取实际表名 Collection<String> result2 = new LinkedHashSet<String>(); for (String availableTargetName : availableTargetNames) { for (String logicTable : result) { if (availableTargetName.contains(logicTable)) { result2.add(availableTargetName); } } } return result2; } }

调用代码 

public void createOrder(Order order) { order.setId(Long.valueOf(keyGenerator.generateKey().toString())); order.setCreateTime(new Date()); orderMapper.createOrder(order); }

定时任务(每月执行一次)

CREATE DEFINER=`root`@`localhost` PROCEDURE `proc_create_order`() BEGIN declare str_date varchar(16); SET str_date = date_format(date_add(NOW(), interval 1 MONTH),"%Y%m"); SET @sqlcmd1 = CONCAT('CREATE TABLE t_order_',str_date ," ( `id` bigint(32) NOT NULL, `order_id` int(11) NOT NULL, `user_id` int(11) NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;"); PREPARE p1 FROM @sqlcmd1; EXECUTE p1; DEALLOCATE PREPARE p1; END

效果图 

 

插入数据时基于雪花算法生成唯一主键

import com.google.common.base.Preconditions; import lombok.Getter; import lombok.Setter; import lombok.SneakyThrows; import org.apache.shardingsphere.core.strategy.keygen.TimeService; import org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; import org.springframework.util.unit.DataUnit; import java.time.*; import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAdjusters; import java.util.Calendar; import java.util.Date; import java.util.Properties; import static java.time.temporal.TemporalAdjusters.firstDayOfMonth; @Component public final class KeyGenerator implements ShardingKeyGenerator { public KeyGenerator() { } public static final long EPOCH; private static final long SEQUENCE_BITS = 12L; private static final long WORKER_ID_BITS = 10L; private static final long SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1; private static final long WORKER_ID_LEFT_SHIFT_BITS = SEQUENCE_BITS; private static final long TIMESTAMP_LEFT_SHIFT_BITS = WORKER_ID_LEFT_SHIFT_BITS + WORKER_ID_BITS; private static final long WORKER_ID_MAX_VALUE = 1L << WORKER_ID_BITS; private static final long WORKER_ID = 0; private static final int MAX_TOLERATE_TIME_DIFFERENCE_MILLISECONDS = 1; @Setter private static TimeService timeService = new TimeService(); @Getter @Setter private Properties properties = new Properties(); private byte sequenceOffset; private long sequence; private long lastMilliseconds; @Value("${keyGenerator.snowflake.worker.id}") private String workerId; @Value("${max.tolerate.time.difference.milliseconds}}") private String maxTolerateTime; static { Calendar calendar = Calendar.getInstance(); calendar.set(2016, Calendar.NOVEMBER, 1); calendar.set(Calendar.HOUR_OF_DAY, 0); calendar.set(Calendar.MINUTE, 0); calendar.set(Calendar.SECOND, 0); calendar.set(Calendar.MILLISECOND, 0); EPOCH = calendar.getTimeInMillis(); } @Override public String getType() { return "THISSNOWFLAKEY"; } @Override public synchronized Comparable<?> generateKey() { long currentMilliseconds = timeService.getCurrentMillis(); if (waitTolerateTimeDifferenceIfNeed(currentMilliseconds)) { currentMilliseconds = timeService.getCurrentMillis(); } if (lastMilliseconds == currentMilliseconds) { if (0L == (sequence = (sequence + 1) & SEQUENCE_MASK)) { currentMilliseconds = waitUntilNextTime(currentMilliseconds); } } else { vibrateSequenceOffset(); sequence = sequenceOffset; } lastMilliseconds = currentMilliseconds; return ((currentMilliseconds - EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS) | (getWorkerId() << WORKER_ID_LEFT_SHIFT_BITS) | sequence; } public synchronized long generatorKey() { return this.generateKeyByMillis(timeService.getCurrentMillis()).longValue(); } public synchronized Number generateKeyByMillis(long currentMilliseconds) { if (lastMilliseconds == currentMilliseconds) { if (0L == (sequence = (sequence + 1) & SEQUENCE_MASK)) { currentMilliseconds = waitUntilNextTime(currentMilliseconds); } } else { vibrateSequenceOffset(); sequence = sequenceOffset; } lastMilliseconds = currentMilliseconds; return ((currentMilliseconds - EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS) | (this.getWorkerId() << WORKER_ID_LEFT_SHIFT_BITS) | sequence; } @SneakyThrows private boolean waitTolerateTimeDifferenceIfNeed(final long currentMilliseconds) { if (lastMilliseconds <= currentMilliseconds) { return false; } long timeDifferenceMilliseconds = lastMilliseconds - currentMilliseconds; Preconditions.checkState(timeDifferenceMilliseconds < getMaxTolerateTimeDifferenceMilliseconds(), "Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds", lastMilliseconds, currentMilliseconds); Thread.sleep(timeDifferenceMilliseconds); return true; } private long getWorkerId() { // long result = Long.valueOf(properties.getProperty("worker.id", String.valueOf(GenericTool.getWorkId()))); long result = Long.valueOf(workerId); Preconditions.checkArgument(result >= 0L && result < WORKER_ID_MAX_VALUE); return result; } private int getMaxTolerateTimeDifferenceMilliseconds() { // return Integer.valueOf(properties.getProperty("max.tolerate.time.difference.milliseconds", // String.valueOf(MAX_TOLERATE_TIME_DIFFERENCE_MILLISECONDS))); return Integer.valueOf(maxTolerateTime); } private long waitUntilNextTime(final long lastTime) { long result = timeService.getCurrentMillis(); while (result <= lastTime) { result = timeService.getCurrentMillis(); } return result; } private void vibrateSequenceOffset() { sequenceOffset = (byte) (~sequenceOffset & 1); } }

如何查询

1.询时能指定分片键的要尽量指定分片键,sharding-jdbc会自动帮你定位到对应的分表。

2.单分表查询,要指定分片键的,自己手动指定具体的分表先过滤出符合条件的数据,再union all其他分表符合条件的数据。union all的分表最好不要超过2张,即查询时间跨度在两个月内。

<select id="selectListByProductSelective" parameterType="hashmap" resultMap="ResultMapWithBLOBs"> select <include refid="Base_Column_List"/> , <include refid="Blob_Column_List"/> from ${startTable} <include refid="selectWhere"/> <if test="endTable != null "> union all select <include refid="Base_Column_List"/> , <include refid="Blob_Column_List"/> from ${endTable} <include refid="selectWhere"/> </if> <if test="pageStart != null and pageEnd!=null "> limit #{pageStart},#{pageEnd} </if> </select>

3.多分表关联查询时一定要单独指定每张分表的分片键的值(避免扫描多余的表)。

spring.shardingsphere.datasource.names=test spring.shardingsphere.datasource.test.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.test.filters=stat,wall,log4j spring.shardingsphere.datasource.test.stat-view-servlet.enabled=true spring.shardingsphere.datasource.test.stat-view-servlet.reset-enable=false spring.shardingsphere.datasource.test.stat-view-servlet.url-pattern=/druid/* spring.shardingsphere.datasource.test.testOnBorrow=false spring.shardingsphere.datasource.test.testOnReturn=false spring.shardingsphere.datasource.testn.testWhileIdle=true spring.shardingsphere.datasource.test.maxActive=20 spring.shardingsphere.datasource.test.maxWait=20000 spring.shardingsphere.datasource.testn.initial-size=1 spring.shardingsphere.datasource.test.min-idle=2 spring.shardingsphere.datasource.test.timeBetweenEvictionRunsMillis=60000 spring.shardingsphere.datasource.test.minEvictableIdleTimeMillis=300000 spring.shardingsphere.datasource.test.removeAbandoned=true spring.shardingsphere.datasource.test.removeAbandonedTimeout=180 spring.shardingsphere.datasource.test.maxPoolPreparedStatementPerConnectionSize=50 spring.shardingsphere.datasource.test.poolPreparedStatements=true spring.shardingsphere.datasource.test.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.test.url=jdbc:mysql://xxx:3306/database?useSSL=false&characterEncoding=utf-8&serverTimezone=GMT+8&allowPublicKeyRetrieval=true spring.shardingsphere.datasource.test.username=xxx spring.shardingsphere.datasource.test.password=xxx spring.shardingsphere.datasource.test.validationQuery=SELECT 1 FROM DUAL spring.shardingsphere.datasource.test.web-stat-filter.enabled=true spring.shardingsphere.datasource.test.web-stat-filter.exclusions=*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/* spring.shardingsphere.props.sql.show=false spring.shardingsphere.sharding.tables.defendant.actual-data-nodes = test.defendant_$->{1..5} spring.shardingsphere.sharding.tables.defendant.table-strategy.inline.sharding-column = case_id spring.shardingsphere.sharding.tables.defendant.table-strategy.inline.algorithm-expression=defendant_$->{case_id%5 + 1} spring.shardingsphere.sharding.tables.defendant_business.actual-data-nodes = test.defendant_business_$->{1..5} spring.shardingsphere.sharding.tables.defendant_business.table-strategy.inline.sharding-column = case_id spring.shardingsphere.sharding.tables.defendant_business.table-strategy.inline.algorithm-expression=defendant_business_$->{case_id%5 + 1}

case_id是defendant和defendant_business的分片键查询语句如下:

SELECT a.*, b.* FROM  defendant  a LEFT JOIN  defendant_business b ON a.case_id = b.case_id AND a.case_id = 1 AND b.case_id = 1

 


最新回复(0)