一. 介绍
接口模块也是去暴露给外界信息
bank-api:支付回调接口。
二. 项目基础架构搭建
1. 微服务架构模式
2. 框架选型
①Dubbo
②SpringCloud
③SpringCloudAlibaba
3. 基础环境安装
4. docker介绍
5. 项目搭建
三. 高并发场景中的用户服务中台设计分析
1. 用户中台bg
2. 用户中台场景分析
3. 远程调用: HTTP还是RPC?
4. RPC产品选型比对
5. Dubbo
①dubbo引入
②dubbo深入
四. 用户中台实现
1. 用户数据存储分析
2. 分库分表
3. 搭建数据库
假定1亿数据量,可以分100张表,每张100w数据量,性能足够高。
DELIMITER $$
CREATE
PROCEDURE douyu_live_user.create_t_user_100()
BEGIN
DECLARE i INT;
DECLARE table_name VARCHAR(30);
DECLARE table_pre VARCHAR(30);
DECLARE sql_text VARCHAR(3000);
DECLARE table_body VARCHAR(2000);
SET i=0;
SET table_name='';
SET sql_text='';
SET table_body = '(
user_id bigint NOT NULL DEFAULT -1 COMMENT \'用户 id\',
nick_name varchar(35) DEFAULT NULL COMMENT \'昵称\',
avatar varchar(255) DEFAULT NULL COMMENT \'头像\',
true_name varchar(20) DEFAULT NULL COMMENT \'真实姓名\',
sex tinyint(1) DEFAULT NULL COMMENT \'性别 0 男,1 女\',
born_date datetime DEFAULT NULL COMMENT \'出生时间\',
work_city int(9) DEFAULT NULL COMMENT \'工作地\',
born_city int(9) DEFAULT NULL COMMENT \'出生地\',
create_time datetime DEFAULT CURRENT_TIMESTAMP,
update_time datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
PRIMARY KEY (user_id)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_bin;';
WHILE i<100 DO
IF i<10 THEN
SET table_name = CONCAT('t_user_0',i);
ELSE
SET table_name = CONCAT('t_user_',i);
END IF;
SET sql_text=CONCAT('CREATE TABLE ',table_name,
table_body);
SELECT sql_text;
SET @sql_text=sql_text;
PREPARE stmt FROM @sql_text;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET i=i+1;
END WHILE;
END$$
DELIMITER ;
4. ShardingSphere介绍
①介绍
ShardingJdc对原生jdbc封装,同时兼容ORM框架,myatis可以继续使用,感觉不到底层jdbc替换为了Shardingjdbc
②ShardingJdbc路由
③ShardingJdbc归并
④实现
5. 基于docker搭建mysql主从架构
①主从架构
4-8PDF
DELIMITER $$
CREATE
PROCEDURE douyu_live_user.create_t_user_100()
BEGIN
DECLARE i INT;
DECLARE table_name VARCHAR(30);
DECLARE table_pre VARCHAR(30);
DECLARE sql_text VARCHAR(3000);
DECLARE table_body VARCHAR(2000);
SET i=0;
SET table_name='';
SET sql_text='';
SET table_body = '(
user_id bigint NOT NULL DEFAULT -1 COMMENT \'用户 id\',
nick_name varchar(35) DEFAULT NULL COMMENT \'昵称\',
avatar varchar(255) DEFAULT NULL COMMENT \'头像\',
true_name varchar(20) DEFAULT NULL COMMENT \'真实姓名\',
sex tinyint(1) DEFAULT NULL COMMENT \'性别 0 男,1 女\',
born_date datetime DEFAULT NULL COMMENT \'出生时间\',
work_city int(9) DEFAULT NULL COMMENT \'工作地\',
born_city int(9) DEFAULT NULL COMMENT \'出生地\',
create_time datetime DEFAULT CURRENT_TIMESTAMP,
update_time datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
PRIMARY KEY (user_id)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_bin;';
WHILE i<100 DO
IF i<10 THEN
SET table_name = CONCAT('t_user_0',i);
ELSE
SET table_name = CONCAT('t_user_',i);
END IF;
SET sql_text=CONCAT('CREATE TABLE ',table_name,
table_body);
SELECT sql_text;
SET @sql_text=sql_text;
PREPARE stmt FROM @sql_text;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET i=i+1;
END WHILE;
END$$
DELIMITER ;
docker run --name=mysql-master-1 \
--privileged=true \
-p 3306:3306 \
-v /usr/local/mysql/master1/data/:/var/lib/mysql \
-v /usr/local/mysql/master1/conf/my.cnf:/etc/mysql/my.cnf \
-v /usr/local/mysql/master1/mysql-files/:/var/lib/mysql-files/ \
-e MYSQL_ROOT_PASSWORD=123456 \
-d mysql:8.0.28 --lower_case_table_names=1
docker run --name=mysql-slave-1 \
--privileged=true \
-p 3307:3306 \
-v /usr/local/mysql/slave1/data/:/var/lib/mysql \
-v /usr/local/mysql/slave1/conf/my.cnf:/etc/mysql/my.cnf \
-v /usr/local/mysql/slave1/mysql-files/:/var/lib/mysql-files/ \
-e MYSQL_ROOT_PASSWORD=123456 \
-d mysql:8.0.28 --lower_case_table_names=1
# 主数据库创建用户 slave 并授权
# 创建用户,设置主从同步的账户名
create user 'douyu-slave'@'%' identified with mysql_native_password by '123456';
# 授权
grant replication slave on *.* to 'douyu-slave'@'%';
# 刷新权限
flush privileges;
# 查询 server_id 值
show variables like 'server_id';
# 也可临时(重启后失效)指定 server_id 的值(主从数据库的 server_id 不能
相同)
set global server_id = 1;
# 查询 Master 状态,并记录 File 和 Position 的值,这两个值用于和下边的从数
据库中的 change 那条 sql 中
的 master_log_file,master_log_pos 参数对齐使用
show master status;
show binlog events;
# 重置下 master 的 binlog 位点
reset master;
# 进入从数据库
# 注意:执行完此步骤后退出主数据库,防止再次操作导致 File 和 Position 的值
发生变化# 验证 slave 用户是否可用
# 查询 server_id 值
show variables like 'server_id';
# 也可临时(重启后失效)指定 server_id 的值(主从数据库的 server_id 不能
相同)
set global server_id = 2;
# 若之前设置过同步,请先重置
stop slave;
reset slave;
# 若出现错误,则停止同步,重置后再次启动
stop slave;
reset slave;
start slave;
# 设置主数据库
change master to master_host='172.17.0.2',master_port=3306,master_user='douyu-slave',master_password='123456',master_log_file='mysql-bin.000001',master_log_pos=157;
# 开始同步
start slave;
# 查询 Slave 状态
show slave status;
reset master;
# 若出现错误,则停止同步,重置后再次启动
stop slave;
reset slave;
# 设置主数据库
change master to master_host='172.17.0.2',master_port=3306,master_user='douyu-slave',master_password='123456',master_log_file='mysql-bin.000001',master_log_pos=157;
# 开始同步
start slave;
# 查询 Slave 状态
show slave status;
②读写分离
4-8PDF
6. 分布式缓存redis引入
①引入
4-10PDF
KeyBuilder 的设计:设计一个统一的 Key 管理类,希望每个 Redis 的 key 都能有统一的前缀进行管理和匹配。
keyBuilder类需要写到META-INF/spring,实现提前提前加载到应用上下文(桶之前,把引用写到那个提前加载的文件)。但是若有多个KeyBuilder类,会将用不到的类加载。使用RedisKeyLoadMatch实现,只加载匹配用到的类的Key生成器,详见代码注解。
//批量查询用户+redis优化
public Map<Long, UserDTO> batchQueryUserInfo(List<Long> userIdList) {
if(CollectionUtils.isEmpty(userIdList)) {
return Maps.newHashMap();
}
userIdList=userIdList.stream().
filter(id-> id>10000).//假定id>1w
collect(Collectors.toList());
if(CollectionUtils.isEmpty(userIdList)){
return Maps.newHashMap();
}
//redis优化
//循环单个查询判断是否在redis,性能差
//这里使用redisTemplate.opsForValue().multiGet();
List<String> keyList=new ArrayList<>();
userIdList.forEach(userId->{//转化为redis的Key
keyList.add(userProviderCacheKeyBuilder.buildUserInfoKey(userId));
});//multiGet查询不到key,会赛一个Null到集合,所以这里我们判空一下
List<UserDTO> userDTOList=redisTemplate.opsForValue().multiGet(keyList)
.stream().filter(x->x!=null).collect(Collectors.toList());
//判断是否所有要查询的用户都在缓存
if(!CollectionUtils.isEmpty(userDTOList)&&userDTOList.size()==userIdList.size()){
return userDTOList.stream().
collect(Collectors.toMap(UserDTO::getUserId, userDTO->userDTO));
}
List<Long> userIdInCacheList=userDTOList.stream().//取出实际在redis的user的Id
map(UserDTO::getUserId).collect(Collectors.toList());
List<Long> userIdNotInCacheList=userIdList.stream()//不在缓存的user id集合
.filter(x->!userIdInCacheList.contains(x))
.collect(Collectors.toList());
// 直接走Mysql, 性能不太好
// 底层使用union all实现,若有100张分表,性能差
// userMapper.selectBatchIds(userIdList);
//我们本地开启多线程,不同表id分成不同组,交给
//多线程去查询,最终归并返回即可
Map<Long, List<Long>> userIdMap=
userIdNotInCacheList.stream().//片键分组
collect(Collectors.groupingBy(userId->userId%100));
List<UserDTO> dbQueryResult=new CopyOnWriteArrayList<>();//使用线程安全容器
userIdMap.values().parallelStream()
.forEach(queryUserIdList->{
dbQueryResult.addAll(ConvertBeanUtils.convertList(
userMapper.selectBatchIds(queryUserIdList), UserDTO.class
));
});
//当前业务下:可能有多个用户重复进入,所以可以塞入缓存优化
if(!CollectionUtils.isEmpty(dbQueryResult)){
Map<String, UserDTO> saveCacheMap = dbQueryResult.stream()
.collect(Collectors.toMap(userDTO -> userProviderCacheKeyBuilder
.buildUserInfoKey(userDTO.getUserId()), x -> x));
redisTemplate.opsForValue().multiSet(saveCacheMap);
}
userDTOList.addAll(dbQueryResult);
return userDTOList.stream().
collect(Collectors.toMap(UserDTO::getUserId, userDTO->userDTO));
}
②缓存的过期时间
4-13
过期时间过短:很快过期导致数据库压力大;
过长:缓存占用内存过大
//单个设置过期时间
redisTemplate.opsForValue().set(key,userDTO, 30, TimeUnit.MINUTES);
//多用户设置过期时间
//批量 key 设置过期时间,建议批量的 key 的过期时间不要设置相同,随机性多一些。
/**
* 创建随机的过期时间 用于 redis 设置 key 过期
*/
private int createRandomExpireTime() {
int time= ThreadLocalRandom.current().nextInt(100);
return time + 30 * 60;
}
//管道批量传输命令,减少网络IO开销
redisTemplate.opsForValue().multiSet(saveCacheMap);
//管道批量传输命令,减少网络IO开销
redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
for(String redisKey: saveCacheMap.keySet()) {
operations.expire((K) redisKey, createRandomExpireTime(), TimeUnit.SECONDS);
}
return null;
}
});
7. 缓存和数据库一致性问题
引入消息队列4-15,解决缓存和数据库一致性问题。
//生产
userMapper.updateById(ConvertBeanUtils.convert(userDTO, UserPO.class));
//写时删除缓存,并使用消息队列延迟双删,保证数据一致性
String key= userProviderCacheKeyBuilder.buildUserInfoKey(userDTO.getUserId());
redisTemplate.delete(key);//第一次删除
try {
Message message=new Message();
message.setBody(JSON.toJSONString(userDTO).getBytes());
message.setTopic("user-update-cache");//设置主题,消费者那的
message.setDelayTimeLevel(1);//延迟级别,1代表1s发送
mqProducer.send(message);
} catch (Exception e) {
throw new RuntimeException(e);
}
//消费
String msgStr = new String(msgs.get(0).getBody());
UserDTO userDTO = JSON.parseObject(msgStr, UserDTO.class);
if (userDTO == null || userDTO.getUserId() == null) {
LOGGER.error("用户 id 为空,参数异常,内容: {}", msgStr);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//延迟消息的回调,处理相关的缓存二次删除
redisTemplate.delete(userProviderCacheKeyBuilder.buildUserInfoKey(
userDTO.getUserId()));
LOGGER.error("延迟删除处理,userDTO is {}", userDTO);
8. 分布式id的生成
step过低导致同步慢获取不出来id, 过高导致浪费
vesion是为了乐观锁,防止多个线程更新字段,由于Id发号器服务器不会太多,所以不用redis分布式锁这种高性能锁。
实现4-17PDF
ConcurrentHashMap
是 Java 提供的一个线程安全的哈希表实现,主要用于在多线程环境中高效地存储和访问键值对。初始化->使用>75->刷新,异步更新保证效率
有序id生成
/**
* 在内存中记录的当前有序id的值
AtomicLong 保证多线程下,自增时线程安全
*/
private AtomicLong currentNum;
//键值对:分布式策略id主键:对应的生成对象
内存:private static Map<Integer, LocalSeqIdBO> localSeqIdBOMap=new ConcurrentHashMap<>();
因为当前类是被容器托管,所以在类初始化时对map也初始化,所以可利用InitializingBean的回调接口去初始化map
InitializingBean 是 Spring 框架中的一个接口,用于在 Bean 初始化完成后执行某些操作。它的主要用途是在 Spring 容器完全初始化 Bean 之后,执行自定义的初始化逻辑。
@Service
public class IdGenerateServiceImpl implements IdGenerateService, InitializingBean {
private static final Logger LOGGER=LoggerFactory.getLogger(IdGenerateServiceImpl.class);
&&&&&&&&&&&&
@Service
public class IdGenerateServiceImpl implements IdGenerateService, InitializingBean {
@Resource
private IdGenerateMapper idGenerateMapper;
private static final Logger LOGGER=LoggerFactory.getLogger(IdGenerateServiceImpl.class);
//键值对:分布式策略id主键:对应的生成对象
private static Map<Integer, LocalSeqIdBO> localSeqIdBOMap=new ConcurrentHashMap<>();
//更新id段预值
private static final float UPDATE_RATE=0.75f;
//异步线程池
private static ThreadPoolExecutor threadPoolExecutor
=new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadFactory() {//重写以便,重写名字方便debug
@Override
public Thread newThread(Runnable r) {
Thread thread=new Thread(r);
thread.setName("id-generate-thread-"+ThreadLocalRandom.current().nextInt(1000));
return thread;
}
});
// Semaphore 是 Java 并发包中的一个类,位于 java.util.concurrent 包中,
// 用于控制对共享资源的访问。它通过维护一组许可证来实现对资源的限制,允许一定数量的线程同时访问资源。
//不同id生成策略的id对应不同限流器
private static Map<Integer, Semaphore> semaphoreMap=new ConcurrentHashMap<>();
/**
* 刷新本地有序id段
* //异步线程池
* private static ThreadPoolExecutor threadPoolExecutor
* @param localSeqIdBO
*/
private void refreshLocalSeqIdBO(LocalSeqIdBO localSeqIdBO){
long setp=localSeqIdBO.getNextThreshold()-localSeqIdBO.getCurrentStart();
long used=localSeqIdBO.getCurrentNum().get()-localSeqIdBO.getCurrentStart();
if(used>setp*UPDATE_RATE){//若已经使用的id数大于总数的0.75
//若同步执行,涉及网络IO,性能较慢,可以异步执行
//若更新较慢,可能有大量请求进入这里的异步更新这块
//所以需要进行限制,所以一旦有一个任务执行,其他就不要再提交异步任务了
//private static Map<Integer, Semaphore> semaphoreMap
Semaphore semaphore=semaphoreMap.get(localSeqIdBO.getId());
if(semaphore==null) {
LOGGER.error("semaphore is null, id is {}", localSeqIdBO.getId());
return;
}
boolean acquireStatus=semaphore.tryAcquire();//尝试进入临界区
if(acquireStatus){//成功,则进入异步任务
LOGGER.info("开始尝试进行本地id段的同步操作");
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
IdGeneratePO idGeneratePO=idGenerateMapper.selectById(localSeqIdBO.getId());
tryUpdateMySQLRecord(idGeneratePO);
semaphoreMap.get(localSeqIdBO.getId()).release();//任务执行完后释放
LOGGER.info("本地id段的同步完成");
}
});
}
}
}
/**
* 得到有序id
* 此处的id是t_id_generate_config中的主键id, 表示不同的id生成策略
* @param id
* @return
*/
@Override
public Long getSeqId(Integer id) {
if (id == null) {
LOGGER.error("[getSeqId] id is error, id is {}", id);
return null;
}
LocalSeqIdBO localSeqIdBO=localSeqIdBOMap.get(id);//当请求到达时,取得生成的id对象
if(localSeqIdBO==null){
LOGGER.error("[getSeqId] localSeqIdBO is null, id is {}", id);
}
/**
* 当当前id段用完
* 尾值用不到
* 10000 10100
* 10050 10150
*
* 10050 10150
* 10100 10200
*
* update sql
* 网络io
* update sql重试,select 几次网络io
* 更新内存map
*
* 网络阻塞会导致上游调用方崩溃
*/
/**
* 设置更新预值,id快用完时就去更新id段
* private static final float UPDATE_RATE=0.75f;
*/
this.refreshLocalSeqIdBO(localSeqIdBO);//异步刷新本地id段
//本地当前id字段用完就不要生成id, 防止占用别的id段
if(localSeqIdBO.getCurrentNum().get()>= localSeqIdBO.getNextThreshold()) {
/**
* 这里直接失败,而不是采用同步刷新
* Dubbo采用业务线程池去处理远程调用
* 若采用同步刷新,会有大量请求堆积在这里
* 1. 占用dubbo业务线程池,影响别的业务获取分布式id(可能是别的生成id策略)
* 1id要堵塞,2id不需要堵塞(直接内存取),却因1id而堵塞
*/
LOGGER.error("[getSeqId] localSeqIdBO is over limit, id is {}", id);
return null;
}
long returnId=localSeqIdBO.getCurrentNum().getAndIncrement();
return returnId;
}
/**
* 更新mysql里面的分布式id的配置信息,占用相应id段
* 若同步执行,涉及网络IO,性能较慢,可以异步执行
* @param idGeneratePO
*/
private void tryUpdateMySQLRecord(IdGeneratePO idGeneratePO) {
long currentStart=idGeneratePO.getCurrentStart();//当前开始位置
long nextThreshold=idGeneratePO.getNextThreshold();//结束位置
long currentNum=currentStart;//本地当前值
int updateResult=idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
if(updateResult>0) {//更新成功,不需要查
LocalSeqIdBO localSeqIdBO=new LocalSeqIdBO();
AtomicLong atomicLong=new AtomicLong(currentNum);
localSeqIdBO.setId(idGeneratePO.getId());
localSeqIdBO.setCurrentNum(atomicLong);//最开始当前值=起始值
localSeqIdBO.setCurrentStart(currentStart);//设置起始值
localSeqIdBO.setNextThreshold(nextThreshold);//设置终止值
localSeqIdBOMap.put(localSeqIdBO.getId(), localSeqIdBO);
return;
}
//重试机制
for(int i=0;i<3;++i) {//若更新失败,因为可能有别的服务器也在更新
//再去读最新的IdGeneratePO
idGeneratePO=idGenerateMapper.selectById(idGeneratePO.getId());
updateResult=idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
if(updateResult>0) {//若更新成功
LocalSeqIdBO localSeqIdBO=new LocalSeqIdBO();
AtomicLong atomicLong=new AtomicLong(idGeneratePO.getCurrentStart());
localSeqIdBO.setId(idGeneratePO.getId());
localSeqIdBO.setCurrentNum(atomicLong);//最开始当前值=起始值
localSeqIdBO.setCurrentStart(idGeneratePO.getCurrentStart());//设置起始值
localSeqIdBO.setNextThreshold(idGeneratePO.getNextThreshold());//设置终止值
localSeqIdBOMap.put(localSeqIdBO.getId(), localSeqIdBO);//更新
return;
}
}
//若3次还没更新成功,抛出异常
throw new RuntimeException("表id段占用失败,竞争激烈, id is "+idGeneratePO.getId());
}
//bean初始化时会回调到这里
@Override
public void afterPropertiesSet() throws Exception {
List<IdGeneratePO> idGeneratePOList=idGenerateMapper.selectAll();
for(IdGeneratePO idGeneratePO:idGeneratePOList){
tryUpdateMySQLRecord(idGeneratePO);
//初始化限流器,每次只能有一个异步线程去更新数据库
semaphoreMap.put(idGeneratePO.getId(), new Semaphore(1));
}
}
}
//无序id生成
//无序id map
private static Map<Integer, LocalUnSeqIdBO> localUnSeqIdBOMap
//BO对象
public class LocalUnSeqIdBO {
private int id;
/**
* 提前将无序的id存放在这条队列中
*/
private ConcurrentLinkedQueue<Long> idQueue;
/**
* 当前id段的开始值
*/
private Long currentStart;
/**
* 当前id段的结束值
*/
private Long nextThreshold;
&&&&&&&&&&&&&&&&&&
@Service
public class IdGenerateServiceImpl implements IdGenerateService, InitializingBean {
@Resource
private IdGenerateMapper idGenerateMapper;
private static final Logger LOGGER=LoggerFactory.getLogger(IdGenerateServiceImpl.class);
//键值对:分布式策略id主键:对应的生成对象,有序id Map
private static Map<Integer, LocalSeqIdBO> localSeqIdBOMap=new ConcurrentHashMap<>();
//无序id map
private static Map<Integer, LocalUnSeqIdBO> localUnSeqIdBOMap=new ConcurrentHashMap<>();
//更新id段预值
private static final float UPDATE_RATE=0.75f;
private static final int SEQ_ID=1;//表示有序Id
//异步线程池
private static ThreadPoolExecutor threadPoolExecutor
=new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadFactory() {//重写以便,重写名字方便debug
@Override
public Thread newThread(Runnable r) {
Thread thread=new Thread(r);
thread.setName("id-generate-thread-"+ThreadLocalRandom.current().nextInt(1000));
return thread;
}
});
// Semaphore 是 Java 并发包中的一个类,位于 java.util.concurrent 包中,
// 用于控制对共享资源的访问。它通过维护一组许可证来实现对资源的限制,允许一定数量的线程同时访问资源。
//不同id生成策略的id对应不同限流器
private static Map<Integer, Semaphore> semaphoreMap=new ConcurrentHashMap<>();
/**
* 得到有序id
* 此处的id是t_id_generate_config中的主键id, 表示不同的id生成策略
* @param id
* @return
*/
@Override
public Long getSeqId(Integer id) {
if (id == null) {
LOGGER.error("[getSeqId] id is error, id is {}", id);
return null;
}
LocalSeqIdBO localSeqIdBO=localSeqIdBOMap.get(id);//当请求到达时,取得生成的id对象
if(localSeqIdBO==null){
LOGGER.error("[getSeqId] localSeqIdBO is null, id is {}", id);
return null;
}
/**
* 当当前id段用完
* 尾值用不到
* 10000 10100
* 10050 10150
*
* 10050 10150
* 10100 10200
*
* update sql
* 网络io
* update sql重试,select 几次网络io
* 更新内存map
*
* 网络阻塞会导致上游调用方崩溃
*/
/**
* 设置更新预值,id快用完时就去更新id段
* private static final float UPDATE_RATE=0.75f;
*/
this.refreshLocalSeqIdBO(localSeqIdBO);//异步刷新本地id段
// //本地当前id字段用完就不要生成id, 防止占用别的id段
// if(localSeqIdBO.getCurrentNum().get()>= localSeqIdBO.getNextThreshold()) {
// /**
// * 这里直接失败,而不是采用同步刷新
// * Dubbo采用业务线程池去处理远程调用
// * 若采用同步刷新,会有大量请求堆积在这里
// * 1. 占用dubbo业务线程池,影响别的业务获取分布式id(可能是别的生成id策略)
// * 1id要堵塞,2id不需要堵塞(直接内存取),却因1id而堵塞
// */
// LOGGER.error("[getSeqId] localSeqIdBO is over limit, id is {}", id);
// return null;
// }
//
// long returnId=localSeqIdBO.getCurrentNum().getAndIncrement();
long returnId=localSeqIdBO.getCurrentNum().getAndIncrement();
if(returnId>= localSeqIdBO.getNextThreshold()) {//即使一直加,也不会被返回,保证线程安全
LOGGER.error("[getSeqId] localSeqIdBO is over limit, id is {}", id);
return null;
}
return returnId;
}
/**
* 刷新本地有序id段
* //异步线程池
* private static ThreadPoolExecutor threadPoolExecutor
* @param localSeqIdBO
*/
private void refreshLocalSeqIdBO(LocalSeqIdBO localSeqIdBO){
long setp=localSeqIdBO.getNextThreshold()-localSeqIdBO.getCurrentStart();
long used=localSeqIdBO.getCurrentNum().get()-localSeqIdBO.getCurrentStart();
if(used>setp*UPDATE_RATE){//若已经使用的id数大于总数的0.75
//若同步执行,涉及网络IO,性能较慢,可以异步执行
//若更新较慢,可能有大量请求进入这里的异步更新这块
//所以需要进行限制,所以一旦有一个任务执行,其他就不要再提交异步任务了
//private static Map<Integer, Semaphore> semaphoreMap
Semaphore semaphore=semaphoreMap.get(localSeqIdBO.getId());
if(semaphore==null) {
LOGGER.error("semaphore is null, id is {}", localSeqIdBO.getId());
return;
}
boolean acquireStatus=semaphore.tryAcquire();//尝试进入临界区
if(acquireStatus){//成功,则进入异步任务
LOGGER.info("开始尝试进行本地有序id段的同步操作");
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
IdGeneratePO idGeneratePO=idGenerateMapper.selectById(localSeqIdBO.getId());
tryUpdateMySQLRecord(idGeneratePO);
}catch (Exception e) {
LOGGER.error("[refreshLocalSeqIdBO] error is ", e);
}finally {
semaphoreMap.get(localSeqIdBO.getId()).release();//任务执行完后释放
LOGGER.info("本地有序id段的同步完成, id is {}", localSeqIdBO.getId());
}
}
});
}
}
}
/**
* 刷新本地无序id段
* @param localUnSeqIdBO
*/
private void refreshLocalUnSeqIdBO(LocalUnSeqIdBO localUnSeqIdBO) {
long begin=localUnSeqIdBO.getCurrentStart();
long end=localUnSeqIdBO.getNextThreshold();
long remainSize=localUnSeqIdBO.getIdQueue().size();//剩余id数
if((end-begin)*0.25>remainSize) {//已用75
Semaphore semaphore=semaphoreMap.get(localUnSeqIdBO.getId());
if(semaphore==null) {
LOGGER.error("semaphore is null, id is {}", localUnSeqIdBO.getId());
return;
}
boolean acquireStatus=semaphore.tryAcquire();//尝试进入临界区
if(acquireStatus){//成功,则进入异步任务
LOGGER.info("开始尝试进行本地无序id段的同步操作");
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
IdGeneratePO idGeneratePO = idGenerateMapper.selectById(localUnSeqIdBO.getId());
tryUpdateMySQLRecord(idGeneratePO);
}catch (Exception e) {
LOGGER.error("[refreshLocalUnSeqIdBO] error is ", e);
} finally {//确保不会因为tryUpdateMySQLRecord的异常,而无法释放
semaphoreMap.get(localUnSeqIdBO.getId()).release();//任务执行完后释放
LOGGER.info("本地无序id段同步完成,id is {}", localUnSeqIdBO.getId());
}
}
});
}
}
}
@Override
public Long getUnSeqId(Integer id) {
if (id == null) {
LOGGER.error("[getUnSeqId] id is error, id is {}", id);
return null;
}
LocalUnSeqIdBO localUnSeqIdBO=localUnSeqIdBOMap.get(id);//当请求到达时,取得生成的id对象
if(localUnSeqIdBO==null){
LOGGER.error("[getUnSeqId] localUnSeqIdBO is null, id is {}", id);
return null;
}
this.refreshLocalUnSeqIdBO(localUnSeqIdBO);
Long returnId=localUnSeqIdBO.getIdQueue().poll();//队列就是线程安全的队列
if(returnId==null) {
LOGGER.error("[getUnSeqId] returnId is null, id is {}", id);
return null;
}
return returnId;
}
/**
* 封装本地BO对象
* 将本地id对象放入map,并进行初始化
* @param idGeneratePO
*/
private void localIdBOHandler(IdGeneratePO idGeneratePO) {
long currentStart=idGeneratePO.getCurrentStart();//当前开始位置
long nextThreshold=idGeneratePO.getNextThreshold();//结束位置
long currentNum=currentStart;//本地当前值
if(idGeneratePO.getIsSeq()==SEQ_ID) {//若PO是有序id
LocalSeqIdBO localSeqIdBO=new LocalSeqIdBO();
AtomicLong atomicLong=new AtomicLong(currentNum);
localSeqIdBO.setId(idGeneratePO.getId());
localSeqIdBO.setCurrentNum(atomicLong);//最开始当前值=起始值
localSeqIdBO.setCurrentStart(currentStart);//设置起始值
localSeqIdBO.setNextThreshold(nextThreshold);//设置终止值
localSeqIdBOMap.put(localSeqIdBO.getId(), localSeqIdBO);
} else {//是无序id
LocalUnSeqIdBO localUnSeqIdBO=new LocalUnSeqIdBO();
localUnSeqIdBO.setCurrentStart(currentStart);
localUnSeqIdBO.setNextThreshold(nextThreshold);
localUnSeqIdBO.setId(idGeneratePO.getId());
//提前生成start到end的数字,并打乱放入队列
Long begin=localUnSeqIdBO.getCurrentStart();
Long end=localUnSeqIdBO.getNextThreshold();
List<Long> idList=new ArrayList<>();
for(long i=begin;i<end;++i)
idList.add(i);
Collections.shuffle(idList);//重洗idList
ConcurrentLinkedQueue<Long> idQueue=new ConcurrentLinkedQueue<>();
idQueue.addAll(idList);
localUnSeqIdBO.setIdQueue(idQueue);
localUnSeqIdBOMap.put(localUnSeqIdBO.getId(), localUnSeqIdBO);
}
}
/**
* 更新mysql里面的分布式id的配置信息,占用相应id段
* 若同步执行,涉及网络IO,性能较慢,可以异步执行
* @param idGeneratePO
*/
private void tryUpdateMySQLRecord(IdGeneratePO idGeneratePO) {
int updateResult=idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
if(updateResult>0) {//更新成功,不需要查
localIdBOHandler(idGeneratePO);
return;
}
//重试机制
for(int i=0;i<3;++i) {//若更新失败,因为可能有别的服务器也在更新
//再去读最新的IdGeneratePO
idGeneratePO=idGenerateMapper.selectById(idGeneratePO.getId());
updateResult=idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
if(updateResult>0) {//若更新成功
localIdBOHandler(idGeneratePO);
return;
}
}
//若3次还没更新成功,抛出异常
throw new RuntimeException("表id段占用失败,竞争激烈, id is "+idGeneratePO.getId());
}
//bean初始化时会回调到这里
@Override
public void afterPropertiesSet() throws Exception {
List<IdGeneratePO> idGeneratePOList=idGenerateMapper.selectAll();
for(IdGeneratePO idGeneratePO:idGeneratePOList){
tryUpdateMySQLRecord(idGeneratePO);
//初始化限流器,每次只能有一个异步线程去更新数据库
semaphoreMap.put(idGeneratePO.getId(), new Semaphore(1));
}
}
}
后序优化:如何写一些接口,动态更新id生成策略的上限等值,并同步到内存?
五. 用户标签实战
1. 介绍
2. 常见实现思路
3. 实现
PDF5-4~5-6
4. 优化
①优化sql次数
优化了sql次数
只能允许第一次设置成功
${fieldName} & #{tag}=0表示还没有被设置
@Update("update t_user_tag set ${fieldName}=${fieldName} | #{tag} where user_id=#{userId} and ${fieldName} & #{tag}=0")
int setTag(Long userId, String fieldName, long tag);
/**
* 使用先取反在与的思路来取消标签,只能允许第一次删除成功
* @param userId
* @param fieldName
* @param tag
* @return
*/
${fieldName} & #{tag}=#{tag}表示用户确实包含这个,才允许撤销
@Update("update t_user_tag set ${fieldName}=${fieldName} &~ #{tag} where user_id=#{userId} and ${fieldName} & #{tag}=#{tag}")
int cancelTag(Long userId, String fieldName, long tag);
②用户标签记录不存在的问题,以及多线程插入问题
设置标签优化
@Override
public boolean setTag(Long userId, UserTagsEnum userTagsEnum) {
//尝试update true就直接返回成功
//设置了标签,没有记录(两种失败场景)
//select is null查询记录为空,insert则插入,再更新update
boolean updateStatus=userTagMapper.setTag(userId, userTagsEnum.getFieldName(), userTagsEnum.getTag())>0;
if(updateStatus) return true;//更新成功直接返回
/**
* A B线程同时都执行到这里
* 由于userId唯一,多次执行
* 会导致失败
*
* 此外多个线程都打入mysql, 效率低
* 所以多线程下,若用户记录不存在
* 我们只允许一个线程去执行插入操作
*
* 标签服务可能启动多个,导致了分布式下多进程问题
*/
//失败则验证记录是否存在
UserTagPO userTagPO=userTagMapper.selectById(userId);
if(userTagPO!=null){//表明设置重复了
return false;
}
//两条指令,不具备原子性
// redisTemplate.opsForValue().setIfAbsent();//若key已经存在就设置失败
// redisTemplate.expire();//过期时间
String setNxKey = cacheKeyBuilder.buildTagLockKey(userId);
//替换为一条指令
String setNxResult=redisTemplate.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
RedisSerializer keySerializer = redisTemplate.getKeySerializer();
RedisSerializer valueSerializer = redisTemplate.getValueSerializer();
return (String) connection.execute("set", keySerializer.serialize(setNxKey),
valueSerializer.serialize("-1"),
"NX".getBytes(StandardCharsets.UTF_8),//键不存在时设置
"EX".getBytes(StandardCharsets.UTF_8),//过期时间
"3".getBytes(StandardCharsets.UTF_8));
}
});
//
if (!"OK".equals(setNxResult)) {//若已经有别的线程在插入了,直接返回失败即可
return false;
}
//无记录则插入
userTagPO=new UserTagPO();
userTagPO.setUserId(userId);
userTagMapper.insert(userTagPO);
updateStatus=userTagMapper.//再次更新
setTag(userId, userTagsEnum.getFieldName(), userTagsEnum.getTag())>0;
redisTemplate.delete(setNxKey);//设置成功则删除缓存里的key
return updateStatus;
}
缓存清理同样使用延迟双删,并新增了删除缓存类,缓存枚举码,消息队列主题常量类
六. 用户中台应用实现
1. 介绍
2. SpringBoot 应用的 Docker 容器 化部署
PDF6-2
服务日志规范化
PDF6-4
3. 引入nacos配置中心
PDF6-6
4. 基于SPI机制修改ShardingJDBC底层,实现nacos配置数据源
pdf6-7
org.idea.douyu.live.framework.datasource.starter.config.NacosDriverURLProvider
application 的url nacos:
5. 引入网关
pdf6-10
新建模块douyu-live-gateway
6. 引入compose容器部署
7. 用户中台压力测试
pdf6-11
8. UI界面设计讲解
Pixso免费设计页面
9. 登录注册流程--短信验证码实现
pdf6-15
生成验证码,4位,6位(取它),有效期(30s,60s),同一个手机号不能重发,redis去存储验证码
发生短信需要先发送http请求到第三方,第三方再去发送短信到用户,然后将结果返回,比较耗时,用异步去发。
10. 登录注册流程--手机号登录注册
pdf6-16
缓存击穿,空值缓存:当缓存没有查到数据,但是数据库也没有查到数据,此时我们new一个新的空值对象放入缓存,以便下次再查询时,直接在缓存层返回结果,不用去数据库查询。
对手机号这些信息进行加密和解密转换(凡是涉及到DB的操作)。
注意无序id时,common数据库,类型改成0。
11. 前后端联调
pdf6-19
统一返回对象WebResponseVO
后端设置cookie属性
Cookie cookie = new Cookie("dytk", userLoginDTO.getToken());
//设置顶级域名,接着我们的二级域名中进行 web 页面的访问,后续请求就会
//携带上它了
cookie.setDomain("127.0.0.1");
cookie.setPath("/");
cookie.setMaxAge(30 * 24 * 3600);
response.setHeader("Access-Control-Allow-Credentials",
"true");
response.addCookie(cookie);
//app.qiyu.live.com/html/xx.html 前台
//app.qiyu.live.com/live/api/userLogin/login 后端
//设置顶级域名qiyu.live.com,接着我们的二级域名app.qiyu.live.com
// 中进行 web 页面的访问,后续请求就会
//携带上它了
12. 接口鉴权模块的开发
账户服务的职责:负责对外界请求的 token 进行校验,需要频繁访问 redis(原来userProvider接口并发就大,再加上token校验并发也大)
所以使用单独微服务去承载token访问的压力
PDF6-21
所有token操作都由此模块来负责
13. 网关服务引入鉴权+白名单功能
pdf6-22
// gateway --(header)--> springboot-web(interceptor-->get header)
//将userId封装到请求头,下游不需要再根据cookie去取userId
ServerHttpRequest.Builder builder = request.mutate();
builder.header(GatewayHeaderEnum.USER_LOGIN_ID.getName(), String.valueOf(userId));
14. 下游服务提取gateway塞入的userId
定义org.douyu.live.web.starter.context.QiyuUserInfoInterceptor拦截器去拦截
/实现父子线程之间的线程本地变量传递
七. IM系统实现
1. 介绍
2. IM系统发展过程
3. 直播下的IM架构设计
4. WebSocket和Tcp长连接
①WebSocket长连接
②TCP长连接
③对比
为什么开发Web浏览器页面很难使用原生Tcp,而用WebSocket。而客户端App应用可以用原生Tcp
5. 网络的三种IO模型介绍
①介绍
②BIO实现
③NIO实现
④基于Selector的高性能NIO实现
select():当没有请求CPU会让出?Linux基于EpollSelectorImpl去实现
⑤AIO实现
6. IM核心实现
im-core-server编解码器实现
im-core-server核心处理handler的实现,继承SimplyHandler来处理不同的code用不同的类
ImHandlerFactory消息工厂,做消息的处理和分发ImHandlerFactoryImpl
ChannelHandlerContextCache去将ctx和用户userId对应起来,以便异步推送。ChannelHandlerContextCache.put(userId, ctx);//记录userId对应的ctx
//作为二次开发的属性,ctx登录时记录userId这一属性
ctx.attr(ImContextAttr.USER_ID).set(userId);
//todo
//作为二次开发的属性
ImContextUtils.setUserId(ctx, userId);
ImServerCoreHandler
//它表示通道(Channel)处于不活动状态。这通常意味着连接已关闭或失去,
// 可能是由于客户端主动断开连接、网络故障或服务器关闭等原因。
//执行该事件
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
7. IM心跳功能实现
心跳用于检测,当前用户是否在线状态
心跳包record记录(暂时性的数据),redis存储心跳记录
zSet集合存储心跳记录,基于userId取模从而得到key(防止大key), key(userId)-score(心跳时间)
douyu-live-im-core-server:hertbeat:999:zset
8. IM业务消息通信
jsonObject.put("userId",userId);//发送方
jsonObject.put("objectId", 1001101L);//接收方是谁
// 记录每个用户连接的im服务器地址,然后根据im服务器的连接地址去做具体机器的调用
// 基于mq广播思路去做,可能会有消息风暴发生,100台im机器,99%的mq消息都是无效的,
// 加入一个叫路由层的设计,router中转的设计,router就是一个dubbo的rpc层
// A--》B im-core-server -> msg-provider(持久化) -> im-core-server -> 通知到b
上面图的在线路由模式
im-core-server-interfaces暴露接口给router层
router用rpc给别的调用,其实高并发下用mq更合适。SPI机制,DubboSPI要想生效,必须在META-INF.dubbo.internal中配置
/**
* 基于Cluster去做spi扩展,实现根据rpc上下文来选择具体请求的机器
*
*/
public class ImRouterCluster implements Cluster {
@Override//dubbo在做集群筛选时会有jion方法的回调
public <T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain) throws RpcException {
return new ImRouterClusterInvoker<>(directory);
}
}
public class ImRouterClusterInvoker<T> extends AbstractClusterInvoker<T>//dubbo在做集群节点筛选时,按照特定的规则去筛选
//获取到指定的rpc服务提供者的所有地址信息
List<Invoker<T>> invokers = list(invocation);//服务提供者会被封装成一个个invokers对象
@Override
protected Result doInvoke(Invocation invocation, List list, LoadBalance loadbalance) throws RpcException {
checkWhetherDestroyed();
String ip = (String) RpcContext.getContext().get("ip");
if (StringUtils.isEmpty(ip)) {
throw new RuntimeException("ip can not be null!");
}
//获取到指定的rpc服务提供者的所有地址信息
List<Invoker<T>> invokers = list(invocation);//服务提供者会被封装成一个个invokers对象
Invoker<T> matchInvoker = invokers.stream().filter(invoker -> {
//拿到我们服务提供者的暴露地址(ip:端口的格式)
String serverIp = invoker.getUrl().getHost() + ":" + invoker.getUrl().getPort();
return serverIp.equals(ip);
}).findFirst().orElse(null);
if (matchInvoker == null) {
throw new RuntimeException("ip is invalid");
}
return matchInvoker.invoke(invocation);
}
boostrap.yml
dubbo: #指定消费时使用imRouter扩展类
consumer:
cluster: imRouter
im-core-server接受客户端长链接,后台数据推送也都依赖im-core-server和客户端的链接通道(里面只做简单的数据接受和推送:高性能高可用)
im-provider去对提供一些im服务:判断用户是否在线,对im扩展的业务接口放在im-provider里
im-router:路由层,转发消息请求,业务请求发到我这边,我做转发到对应B长连接的IM-core服务器
9. IM router模块后续完善,用户路由信息绑定
//去将ctx和用户userId对应起来
public class ChannelHandlerContextCache {
/**
* 当前的im服务启动的时候,对外暴露的ip和端口
* nacos上注册的ip和端口
*/
private static String SERVER_IP_ADDRESS = "";
//当前userId对应ctx
private static Map<Long, ChannelHandlerContext> channelHandlerContextMap = new HashMap<>();
///////////
NettyImServerStarter启动im core 服务器时,我们在内存记录下当前服务器的ip和port
ChannelHandlerContextCache.setServerIpAddress(SelfStartIp+":"+SelfStartPort);
LoginMsgHandler用户登录成功时,记录下userId连接当前服务器的channel
ChannelHandlerContextCache.put(userId, ctx);//记录userId对应的ctx,以及channel对应的userId appId
//todo
//作为二次开发的属性
ImContextUtils.setUserId(ctx, userId);
ImContextUtils.setAppId(ctx, appId);
并且在redis中记录userId对应的im-core服务器的地址和端口
//在redis中记录用户链接的im core服务器地址
//这里不用frame的redis key生成模块,是因为router模块也要查找userId对应的imcore服务器,而router的应用名称和imcore名称不同
//所以采用自定义的key
package org.douyu.live.im.core.server.interfaces.constants;
public class ImCoreServerConstants {
public static String IM_BIND_IP_KEY = "douyu:live:im:bind:ip:";
}
stringRedisTemplate.opsForValue().
set(ImCoreServerConstants.IM_BIND_IP_KEY + appId + ":" + userId,
ChannelHandlerContextCache.getServerIpAddress(),//过期时间是60s
ImConstants.DEFAULT_HEART_BEAT_GAP*2, TimeUnit.SECONDS);
//心跳包收到后,记得延长时间,防止过期
//延长用户之前保存的ip绑定记录时间HeartBeatImMsgHandler
stringRedisTemplate.expire(ImCoreServerConstants.IM_BIND_IP_KEY + appId + ":" + userId,
ImConstants.DEFAULT_HEART_BEAT_GAP * 2, TimeUnit.SECONDS);
//登出后以及非正常登出记得清楚缓存LogoutMsgHandler
//清除userId绑定im core ip的缓存
stringRedisTemplate.delete(ImCoreServerConstants.IM_BIND_IP_KEY + appId + ":" + userId);
ImRouterServiceImpl路由转发
public boolean sendMsg(ImMsgBody imMsgBody) {
//假设我们有100个userid -> 10台im服务器上 100个ip做分类 -> 最终ip的数量一定是<=10
String bindAddress = stringRedisTemplate.
opsForValue().
get(ImCoreServerConstants.IM_BIND_IP_KEY + imMsgBody.getAppId() + ":" + imMsgBody.getUserId());
if (StringUtils.isEmpty(bindAddress)) {
return false;
}
bindAddress = bindAddress.substring(0,bindAddress.indexOf(":"));
RpcContext.getContext().set("ip", bindAddress);//得到dubbo上下文,并且注入ip地址, 在clusterSPI中筛选imcore服务器
//将消息路由到接受userId对应的im core服务器,再由imcore服务器去将消息发给B接受方
routerHandlerRpc.sendMsg(imMsgBody);
return true;
}
Long userId = imMsgBody.getUserId();
ChannelHandlerContext ctx = ChannelHandlerContextCache.get(userId);
if (ctx != null) {//极端情况:router层是用户还在线,当到达im core层时用户可能下线,在这里判断
10. 用户在线检测功能实现(im-provider)
//当前userId对应ctx
private static Map<Long, ChannelHandlerContext> channelHandlerContextMap = new HashMap<>();
最准确的方式是去im-core服务查userId对应ctx,但是provider需要先知道userId对应的服务器(通过router层),链路比较长。
redisTemplate.hasKey(ImCoreServerConstants.IM_BIND_IP_KEY + appId + ":" + userId);
断线时也会自动删除
所以可以根据userId绑定的IP去判断,不需要太准确(极端情况:两次心跳包因为网络原因没发送过来,所以会导致缓存被删除)
public class ImOnlineServiceImpl implements ImOnlineService {
@Resource
private RedisTemplate<String,Object> redisTemplate;
@Override
public boolean isOnline(long userId, int appId) {
//不通过rpc掉im-core,减轻其压力
return redisTemplate.hasKey(ImCoreServerConstants.IM_BIND_IP_KEY + appId + ":" + userId);
}
11. IM-消息ACK确认机制
客户端收到业务消息,回写ack消息给im-core服务器表示收到消息
收到ack确认包后,进行删除map kv操作【AckImMsgHandler】
什么时候存储map:在im-core服务将消息推回给客户端时【package org.douyu.live.im.core.server.service.impl; RouterHandlerServiceImpl】 以及发送延迟消息sendDelayMsg
延迟消息会在【package org.douyu.live.im.core.server.consumer; ImAckConsumer】进行消费,判断是否收到ack, 是否需要重发消息`
@Override
public void onReceive(ImMsgBody imMsgBody) {
//需要进行消息通知的userid
if(sendMsgToClient(imMsgBody)) {
//当im服务器推送了消息给到客户端,然后我们需要记录下ack
msgAckCheckService.recordMsgAck(imMsgBody, 1);
msgAckCheckService.sendDelayMsg(imMsgBody);
}
}
public String buildImAckMapKey(Long userId,Integer appId) {//因为map主要存储没有收到消息的,量不大,100个map即可
return super.getPrefix() + IM_ACK_MAP + super.getSplitItem() + appId + super.getSplitItem() + userId % 100;//对100取模
redisTemplate.opsForHash().put(key, imMsgBody.getMsgId(), times);//值是消息重发的次数
sendDelayMsg
//等级1 -> 1s,等级2 -> 5s
message.setDelayTimeLevel(2);
12. rocketMq原理
①介绍
RocketMQ 的双主双从架构
②架构
③消息发送流程
/**
* 生产者发送消息给Broker, Broker内存在CommitLog,顺序记录所有发来的消息,同时将该消息的地址写入
* ConsumerQueue队列,若指定了ConsumerQueue,则会写入指定的ConsumerQueue队列中去maxOffset位置
*
*
* 每个topic消息话题对应多个ConsumerQueue队列,ConsumerQueue中不存储实际消息
* 而是存储消息的实际位置的地址(索引)
*
* 每个ConsumerQueue只能分给一个消费者消费, 多出来的消费者处于空闲状态不能消费ConsumerQueue
*
* ConsumerQueue队列里面是如何记录消费的情况呢?
* 写入指定的ConsumerQueue队列中去maxOffset位置,每次消费时会拉取一批消息
* 即实线消息,consumerOffset最初可能和minOffset在同一位置,拉取三个消息后
* 移动刀最新未消费的消息(第4个位置)。当拉取到本地消费的消息消费完了,会返回一个
* 类似ack一样的确认机制给到broker, minOffset主要记录最新一条收到ack确认的消息的位置的后面一个位置
*
* 若消费者挂了,没有返回ack,重启后从[minOffset, consumerOffset)之间的消息会被重复消费
*
* 消费位点就是minOffset、consumerOffset、maxOffset这些
*/
④消息持久化机制
⑤高性能写入
13. 小结
八. IM系统接入直播平台
1. 介绍
2. 直播开关播实现
给用户打上可以开播的标签,表示可以开直播
douyu_live_living数据库:t_living_room(在线表)和t_living_room_record(离线表)两个数据库表。
直播业务模块。
开播即王开播表插入数据,关播即从开播表删除,再往关播表插入数据。
只有当token里的用户id和当前执行关播的主播id一样时才允许关播
@Transactional(rollbackFor = Exception.class) public boolean closeLiving 关播需要删除开播,新增关播,两条sql,用事务注解保证一致性。
3. 直播列表查询
list
MybatisPageConfig要进行分页配置,mybatisplus3.5x后。
前端滑动去实现:living_room_list列表:listLivingRoom、initLoad方法
4. 直播间加载优化
一般直播平台主播不会太多(推拉流等第三方费用,还要判断直播间整体收益情况),而且直播间变化频率(开关播)不会太高,典型的读多写少场景,可以将直播间列表用redis作缓存。redis的List集合。
每隔一段时间RefreshLivingRoomListJob类中的refreshDBToRedis函数将直播间列表刷到缓存中去,schedulePool 1s 执行一次事务,run由于分布式部署节点(多个),可能有多个进程去执行,所以要使用分布式锁去加锁,保证只有一个进程去执行
redisTemplate.opsForValue().setIfAbsent(cacheKey, 1, 1, TimeUnit.SECONDS);
是 Spring Data Redis 中的一种操作,用于在 Redis 中设置一个键值对,只有在该键不存在时才会设置成功。利用redis实现分布式锁。
list函数
//若直播间是按开播顺序存储,则只需做range操作
List<Object> resultList = redisTemplate.opsForList().range(cacheKey, (page - 1) * pageSize, (page * pageSize));
****
private void refreshDBToRedis(Integer type) {
String cacheKey = cacheKeyBuilder.buildLivingRoomList(type);
List<LivingRoomRespDTO> resultList = livingRoomService.listAllLivingRoomFromDB(type);
if(CollectionUtils.isEmpty(resultList)) {
redisTemplate.delete(cacheKey);
return;
}
String tempListName = cacheKey + "_temp";
//按照查询出来的顺序,一个个地塞入到list集合中
for (LivingRoomRespDTO livingRoomRespDTO : resultList) {
redisTemplate.opsForList().rightPush(tempListName, livingRoomRespDTO);
}
//高并发下对redis的操作优化
//正在访问的list集合,直接操作原有集合先del -> 再leftPush,会有阻塞
//直接修改重命名这个list,不要直接对原先的list进行修改,减少阻塞影响 cow
redisTemplate.rename(tempListName, cacheKey);
redisTemplate.delete(tempListName);
}
queryByRoomId也去缓存优化,因为其查询量比较大
做空值缓存,防止缓存击穿。有人开播则移除空值缓存。关播则移除当前直播的缓存
todo: 点进已经关闭的直播间,前端无法响应的问题!
5. 前端接入IM服务器
浏览器通常不支持原生tcp,通常使用webSocket去实现。
webSocket建立链接有一个握手环节:ws的握手连接处理器WsSharkHandler
ImController:将Im服务器地址和token和userId返回给前端。ImCoreServer自身会上报地址和端口到nacos, 所以可以从nacos拉取:DiscoveryClient去提供一个api去拉取(buildImServerAddress)
6. 直播间在线用户记录维护
一个用户发一条消息,直播间所有用户都能看到。
当用户进入直播间时将userId和roomId关联起来。进入直播间会触发ws imcore的登录。不是tcp的登录。且发送mq sendLoginMQ去处理关联【LivingRoomOfflineConsumer】
当用户离开直播间时,userId移除集合列表。离开直播间会触发ws imcore的登出
支持根据roomId查询出批量的userId。【LivingRoomServiceImpl】queryUserIdByRoomId支持根据roomId查询出批量的userId(set)存储,3000个人,元素非常多,O(n)影响redis性能采用scan思路分批查询(0,100)(101, 200);redisTemplate.opsForSet().scan(cacheKey, ScanOptions.scanOptions().match("*").count(100).build())
但是操作2 3其实不用暴露接口,可以基于IM服务上下线,监听用户上下线。用户登录发送MQ消息:loginSuccessHandler方法里调用sendLoginMQ方法。用户下线发送MQ消息:logoutHandler->sendLogoutMQ
7. 用户上下线访问记录
在用户在直播间发送消息时需要推送到roomId关联的所有userId(queryUserIdByRoomId),属于biz消息,会在msgProvider中得到处理。远程调用router层进行群发batchSendMsg。
router层batchSendMsg可以直接调用router层sendMsg要发送的用户数次,//假设我们有100个immsgbody,调用100次im-core-server 2ms,200ms。循环rpc远程调用性能消耗大不建议。
优化在router层://假设我们有100个userid -> 10台im服务器上 100个ip做分类 -> 最终ip的数量一定是<=10(rpc次数)。batchSendMsg方法redis批量查询,批量rpc优化调用!!!
Imcore层的batchSendMsg批量发消息可以循环调用单个,因为本层已经在内存中了,无需rpc远程调用。
九. 礼物系统实现
1. 介绍
2. 礼物相关表结构
3. 礼物服务核心功能设计
4. svga礼物特效
前端living_room.js:initSvga设置一个面板展示效果、playGiftSvga
礼物流水表,可能记录会超过mysql单表上限100w~200w->根据时间做归档(1年)。t_gift_record_2022
十. 公共组件优化
1. 介绍
2. 断言组件实现(基于全局异常捕获器)
webStarter层:org.douyu.live.web.starter.error.GlobalExceptionHandler:@ControllerAdvice对Controller层有拦截作用
ErrorAssert断言产生异常->QiyuBaseError接口规范->BizBaseErrorEnum->注入错误信息QiyuErrorException->被捕获GlobalExceptionHandler
只要GlobalExceptionHandler被扫的到,其中定义的捕获器就会生效
每个模块定义自己的error包继承QiyuBaseError,当前模块会产生的错误
@ControllerAdvice
可以捕获控制器产生的异常,也可以捕获在服务层等其他组件中未处理的异常,只要这些异常最终通过控制器的请求处理流程传播到控制器。这样可以实现统一的异常处理机制,提高代码的可维护性。GlobalExceptionHandler加入SPI文件里才能生效或scan扫描到该包-->加载到容器上下文当中。
3. 限流组件实现(对controller层限制请求频率)
自定义注解+滑动窗口算法去实现。
org.douyu.live.web.starter.config.RequestLimit注解
org.douyu.live.web.starter.context.RequestLimitInterceptor拦截器(对于重复请求,要有专门的拦截器去处理)细节详见该类,并在org.douyu.live.web.starter.config.WebConfig定义bean
十一. 钱包系统和支付中台
1. 介绍
2. 礼物打赏流程
3. 钱包体系搭建
douyu_live_bank模块,平台内部主要使用虚拟币代替实际货币。
账户增加不一定是通过充值增加,也可能是礼物分成(total_charge不变)。
账户余额就是鱼币
4. 送礼流程完善
org.douyu.live.bank.provider.service.impl.QiyuCurrencyAccountServiceImpl:consume送礼。所有优化详见类
//todo 流水记录? t_qiyu_currency_trade表记录流水, num记录增加或扣减值(正负数),type记录流水类型
//大并发请求场景,1000个直播间,500人,50w人在线,20%的人送礼,10w人在线触发送礼行为,
//DB扛不住
//1.MySQL换成写入性能相对较高的数据库
//2.我们能不能从业务上去进行优化,用户送礼都在直播间,大家都连接上了im服务器,router层扩容(50台),im-core-server层(100台),RocketMQ削峰,
// 消费端也可以水平扩容
//3.我们客户端发起送礼行为的时候,同步校验(校验账户余额是否足够,余额放入到redis中),
//4.拦截下大部分的请求,如果余额不足,(接口还得做防止重复点击,客户端也要防重复)
//5.同步送礼接口,只完成简单的余额校验,发送mq,在mq的异步操作里面,完成二次余额校验,余额扣减,礼物发送
//6.如果余额不足,是不是可以利用im,反向通知发送方
// todo 性能问题
增加和扣减,先在redis中更新,再异步更新数据库,而不是直接同步更新数据库,性能更高
/**
* 专门给送礼业务调用的扣减库存逻辑
*
* @param accountTradeReqDTO
*/
AccountTradeRespDTO consumeForSendGift(AccountTradeReqDTO accountTradeReqDTO);
getBalance方法
只查询余额,redis缓存优化
@Override
public void decr(long userId, int num) {
//扣减余额
String cacheKey = cacheKeyBuilder.buildUserBalance(userId);
if (redisTemplate.hasKey(cacheKey)) {
//基于redis的扣减操作
redisTemplate.opsForValue().decrement(cacheKey, num);
redisTemplate.expire(cacheKey, 5, TimeUnit.MINUTES);
}
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
//分布式架构下,cap理论,可用性和性能,(强一致性),柔弱的一致性处理
//在异步线程池中完成数据库层的扣减和流水记录插入操作,带有事务
consumeDecrDBHandler(userId, num);
}
});
}
5. 使用mq提升送礼接口性能
生产者:org.douyu.live.api.service.impl.GiftServiceImpl MQ削峰
@Override
public boolean send(GiftReqVO giftReqVO) {
int giftId = giftReqVO.getGiftId();
//map集合,判断本地是否有对象,如果有就返回,如果没有就rpc调用,同时注入到本地map中
GiftConfigDTO giftConfigDTO = giftConfigDTOCache.get(giftId, id -> giftConfigRpc.getByGiftId(giftId));
ErrorAssert.isNotNull(giftConfigDTO, ApiErrorEnum.GIFT_CONFIG_ERROR);
ErrorAssert.isTure(!giftReqVO.getReceiverId().equals(giftReqVO.getSenderUserId()), ApiErrorEnum.NOT_SEND_TO_YOURSELF);
SendGiftMq sendGiftMq = new SendGiftMq();
sendGiftMq.setUserId(QiyuRequestContext.getUserId());
sendGiftMq.setGiftId(giftId);
sendGiftMq.setRoomId(giftReqVO.getRoomId());
sendGiftMq.setReceiverId(giftReqVO.getReceiverId());
sendGiftMq.setUrl(giftConfigDTO.getSvgaUrl());
sendGiftMq.setType(giftReqVO.getType());
sendGiftMq.setPrice(giftConfigDTO.getPrice());
//避免重复消费***********************************加uuid字段
sendGiftMq.setUuid(UUID.randomUUID().toString());
Message message = new Message();
message.setTopic(GiftProviderTopicNames.SEND_GIFT);
message.setBody(JSON.toJSONBytes(sendGiftMq));
try {
SendResult sendResult = mqProducer.send(message);
LOGGER.info("【api.service.impl.GiftServiceImpl】[send] send result is {}", sendResult);
} catch (Exception e) {
LOGGER.info("【api.service.impl.GiftServiceImpl】[send] send result is error:", e);
}
return true;
// int giftId = giftReqVO.getGiftId();
// GiftConfigDTO giftConfigDTO = giftConfigRpc.getByGiftId(giftId);
// ErrorAssert.isNotNull(giftConfigDTO, ApiErrorEnum.GIFT_CONFIG_ERROR);
// ErrorAssert.isTure(!giftReqVO.getReceiverId().equals(giftReqVO.getSenderUserId()), ApiErrorEnum.NOT_SEND_TO_YOURSELF);
// AccountTradeReqDTO reqDTO=new AccountTradeReqDTO();
// reqDTO.setUserId(QiyuRequestContext.getUserId());
// reqDTO.setNum(giftConfigDTO.getPrice());
// AccountTradeRespDTO tradeRespDTO= qiyuCurrencyAccountRpc.consumeForSendGift(reqDTO);//吞吐量较小,同步调用也可以
// ErrorAssert.isTure(tradeRespDTO!=null&&tradeRespDTO.isSuccess(), ApiErrorEnum.SEND_GIFT_ERROR);
}
消费者:gift服务和Bank服务【org.douyu.live.gift.provider.consumer.SendGiftConsumer】
SendGiftMq sendGiftMq = JSON.parseObject(new String(msg.getBody()), SendGiftMq.class);
String mqConsumerKey = cacheKeyBuilder.buildGiftConsumeKey(sendGiftMq.getUuid());
boolean lockStatus = redisTemplate.opsForValue().setIfAbsent(mqConsumerKey, -1, 5, TimeUnit.MINUTES);
if (!lockStatus) {
//代表曾经消费过**************
continue;
}
AccountTradeReqDTO tradeReqDTO = new AccountTradeReqDTO();
tradeReqDTO.setUserId(sendGiftMq.getUserId());
tradeReqDTO.setNum(sendGiftMq.getPrice());//送礼账户变更
AccountTradeRespDTO tradeRespDTO = qiyuCurrencyAccountRpc.consumeForSendGift(tradeReqDTO);
ImMsgBody imMsgBody = new ImMsgBody();
imMsgBody.setAppId(AppIdEnum.QIYU_LIVE_BIZ.getCode());
if(tradeRespDTO.isSuccess()) {//账户余额扣减成功,触发礼物特效功能
imMsgBody.setBizCode(ImMsgBizCodeEnum.LIVING_ROOM_SEND_GIFT_SUCCESS.getCode());
imMsgBody.setUserId(sendGiftMq.getReceiverId());//消息接收方为接受礼物的人
} else {//利用im将发送失败的消息告知用户
imMsgBody.setBizCode(ImMsgBizCodeEnum.LIVING_ROOM_SEND_GIFT_FAIL.getCode());
imMsgBody.setUserId(sendGiftMq.getUserId());//消息接收方为送礼物的人
}
routerRpc.sendMsg(imMsgBody);//发送送礼结果消息给客户端
一次拉取10条消息,若消费完第九条时异常,消费成功的消息不会返回给broker。重启服务时有可能重复消费。
发送端应该加唯一标识uuid,消费端根据uuid生成缓存key存入redis, 判断该消息是否消费过
加了缓存标记消费,但是下面实际扣费没有调用,即送礼没有成功怎么办(后面再发消息去消费也会因为已经标记而不能重新送礼)?没有影响,因为钱没扣,只需提示前台系统故障即可。
前端websocketOnMessage去处理收到的im送礼成功的消息
6. 前后端对接svga特效实现
连续点击礼物,前端会显示送礼成功,即使余额不足
直播间关闭,前端不会提示直播间已经关闭
IM异步通知送礼失败
todo:org.douyu.live.gift.provider.consumer.SendGiftConsumer 送礼全直播间可见
7. bank服务构建
送礼物(用户的账户需要有一定余额)。后台一个接口模拟支付(用户选择对应产品就算支付成功)
通过一个接口,返回可以购买的产品列表,映射我们的每个虚拟商品。这个表记录真实rmb售价和支付的二维码url链接
8. 模拟支付的实现
// 1.申请调用第三方支付接口(签名-》支付宝/微信)(生成一条支付中状态的订单)
// 2.生成一个(特定的支付页)二维码 (输入账户密码,支付)(第三方平台完成账户余额变更)模拟支付
// 3.发送回调请求 -》业务方
// 要求(可以接收不同平台v、z的回调数据)
// 可以根据业务标识去回调不同的业务服务(直播充值、别的充值服务)(自定义参数组成中,塞入一个业务code(消息队列的话题),根据业务code去回调不同的业务服务(生产消息))
@PostMapping("/payProduct")
public WebResponseVO payProduct(PayProductReqVO payProductReqVO) {
return WebResponseVO.success(bankService.payProduct(payProductReqVO));
}
支付流程:点击按钮,会有一个待支付状态,模拟支付就是模拟的调用第三方的服务,第三方完成后回调bank-api, 然后根据业务码生成消息给别的服务消费(回调别的服务)
bankprovider以及下游服务完成后再通知第三方支付完成。
//从接口友好性来说,只需传一个产品Id就能查,则只传产品id
//复用Redis list,但要传type值
//不用type, 但多存一个redis对象✔
//更新时List
@Override
public PayProductDTO getByProductId(Integer productId) {给支付时产品校验用,安全性的要求
/**
* 支付来源 (直播间,个人中心,聊天页面,第三方宣传页面,广告弹窗引导)
* @see org.douyu.live.bank.constants.PaySourceEnum 转换率的计算
*/
private Integer paySource;
@Override
public PayProductRespVO payProduct(PayProductReqVO payProductReqVO) {
//参数校验
ErrorAssert.isTure(payProductReqVO != null && payProductReqVO.getProductId() != null && payProductReqVO.getPaySource() != null, BizBaseErrorEnum.PARAM_ERROR);
ErrorAssert.isNotNull(PaySourceEnum.find(payProductReqVO.getPaySource()), BizBaseErrorEnum.PARAM_ERROR);
PayProductDTO payProductDTO = payProductRpc.getByProductId(payProductReqVO.getProductId());
ErrorAssert.isNotNull(payProductDTO, BizBaseErrorEnum.PARAM_ERROR);
//插入一条订单,待支付状态
PayOrderDTO payOrderDTO = new PayOrderDTO();
payOrderDTO.setProductId(payProductReqVO.getProductId());
payOrderDTO.setUserId(QiyuRequestContext.getUserId());
payOrderDTO.setSource(payProductReqVO.getPaySource());
payOrderDTO.setPayChannel(payProductReqVO.getPayChannel());
String orderId = payOrderRpc.insertOne(payOrderDTO);//返回orderId更具备业务属性
//跳转第三方支付页面输入完点击后才是支付中状态
//更新订单为支付中状态
payOrderRpc.updateOrderStatus(orderId, OrderStatusEnum.PAYING.getCode());
PayProductRespVO payProductRespVO = new PayProductRespVO();
payProductRespVO.setOrderId(orderId);
// //todo 远程http请求 resttemplate-》支付回调接口
// JSONObject jsonObject = new JSONObject();
// jsonObject.put("orderId", orderId);
// jsonObject.put("userId", QiyuRequestContext.getUserId());
// jsonObject.put("bizCode", 10001);
// HashMap<String,String> paramMap = new HashMap<>();
// paramMap.put("param",jsonObject.toJSONString());
// ResponseEntity<String> resultEntity = restTemplate.postForEntity("http://localhost:8201/live/bank/payNotify/wxNotify?param={param}", null, String.class,paramMap);
// System.out.println(resultEntity.getBody());
//当用户支付完后,会进入支付状态轮询(后台返回支付id给前端,前端每个几秒去轮询)(其他下单页面)
//但在直播间中,会直接通过IM消息通知前端是支付成功还是失败
return payProductRespVO;
}
org.douyu.live.bank.provider.service.impl.PayOrderServiceImpl
@Override
public boolean payNotify(PayOrderDTO payOrderDTO) {
// PayOrderPO payOrderPO = this.queryByOrderId(payOrderDTO.getOrderId());
// if (payOrderPO == null) {
// LOGGER.error("error payOrderPO, payOrderDTO is {}", payOrderDTO);
// return false;
// }
// PayTopicPO payTopicPO = payTopicService.getByCode(payOrderDTO.getBizCode());
// if (payTopicPO == null || StringUtils.isEmpty(payTopicPO.getTopic())) {
// LOGGER.error("error payTopicPO, payOrderDTO is {}", payOrderDTO);
// return false;
// }
// this.payNotifyHandler(payOrderPO);
// //假设 支付成功后,要发送消息通知 -》 msg-provider
// //假设 支付成功后,要修改用户的vip经验值
// //发mq
// //中台服务,支付的对接方 10几种服务,pay-notify-topic。若所有服务都监听这个主题消息,会导致消息复用高。比如A发起支付
//B C D服务都监听这个消息,收到消息后还得判断一下这个消息需不需要处理。
//支付加个业务码bizCode, 不同业务码对应不同消息主题。这样收到消息的服务一定需要处理该消息
//定义一张pay_topic表,bizCode映射主题
// Message message = new Message();
// message.setTopic(payTopicPO.getTopic());
// message.setBody(JSON.toJSONBytes(payOrderPO));
// SendResult sendResult = null;
// try {
// sendResult = mqProducer.send(message);
// LOGGER.info("[payNotify] sendResult is {} ", sendResult);
// } catch (Exception e) {
// LOGGER.error("[payNotify] sendResult is {}, error is ", sendResult, e);
// }
// return true;
可在message消费信息,通知直播间余额信息做变动
十二. 直播PK
1. 介绍
2. PK流程介绍
每次一个人送礼,会影响到所有人的进度条变化。
3. 群发默认+pk效果实现
org.douyu.live.gift.provider.consumer.SendGiftConsumer
sendImMsgSingleton、pkImMsgSend、batchSendImMsg
pk进度条的实现
//pk类型的送礼 要通知什么给直播间的用户
//url 礼物特效全直播间可见
//todo 进度条全直播间可见
// 1000,进度条长度一共是1000,每个礼物对于进度条的影响就是一个数值(500(A):500(B),550:450)
// 直播pk进度是不是以roomId为维度,存入redis string,(redis中)送礼(A)incr,送礼给(B)就是decr。A顶点为1000,B顶点为0。增加和减少以礼物的数值为基准
还需要记录下参与pk的用户有谁:pkUserId是主播,pkObject可能是多个人。前端如何表示连上线的这个动作?后台提供一个上线pk接口,前端有观众想要上线pk时调用这个接口(LivingRoom /onlinePk),且将其userId记录到缓存中。同时LivingRoom还要提供一个查询当前pk的用户的人是谁的rpc接口,给礼物消息消费这调用查询。以及查出当前直播间信息(得到主播userId)
boolean tryOnline = redisTemplate//将直播间pk的人记入缓存
.opsForValue()
.setIfAbsent(cacheKey, livingRoomReqDTO.getPkObjId(), 30, TimeUnit.HOURS);
if (tryOnline) {//多个人同时点击可能有问题,所以这里只有无缓存才能设置objPk
respDTO.setOnlineStatus(false);响应字段默认是false, 只有连上才是true。多人连线,连不上的会产生异常然后捕获,返回前端显示连接失败
-
由于后台是多线程发送消息给前台,再加上网络原因, pkNum的值无法保证顺序 A->556 B->554 C->557实际顺序和到达前台的顺序可能不一致。
(顺序问题)第一种解决方案:设置缓存LIVING_PK_SEND_SEQ,每次pkNum变化前,先另Seq增加得到送序号。前台将顺序缓存到本地,当新的pkNum到达时,判断其Seq是否比本地的要大,若大则是最新值,否则是旧值。
第二种解决方案:将上面两条redis incr使用lua脚本封装起来,一条命令执行完成,保证两条自增指令是原子性
-
用户下线怎么监听?不可能因为网络不好断线一直占着位置不下来。
这两个上下线,缓存的是roomId对应的userId的集合。roomId映射的直播间对象是另一个缓存。
boolean trySetToRedis = redisTemplate
.opsForValue()//加锁,防止冗余数据,3s内只有一个能从数据库插入缓存
.setIfAbsent(cacheKeyBuilder.buildGiftListLockCacheKey(),1,3,TimeUnit.SECONDS);
if(trySetToRedis) {
redisTemplate.opsForList().leftPushAll(cacheKey, resultList.toArray());
//大部分情况下,一个直播间的有效时间大概就是60min以上
redisTemplate.expire(cacheKey, 60, TimeUnit.MINUTES);
}
在送礼成功和买币成功后都调用listPayProduct()函数,以更新余额
pk送礼物时要指定送给谁,主播还是连线pk的人
4. 连线功能的前后联调
onLinePk
pk进度条Lua脚本优化 SendConsumer中
礼物:数据量不多,读多写少,可以缓存到本地内存。caffine组件api.GiftServiceImpl,会导致数据不一致性,比如新增或下架礼物
十三, 红包雨功能+抢红包
1. 介绍
2. 红包雨实现
以主播Id作为配置,特权行为,吸引人气,直播间id是变化的不方便记录。配置了主播才有特权开启红包雨。
红包是一种特殊的礼物,也放在送礼模块。
3. 红包雨底层实现逻辑
package org.douyu.live.common.interfaces.utils;
/**
* 将List拆分为小的List,用于Redis中list的存入,避免 Redis的输入输出缓冲区 和 网络 发生堵塞
*/
public class ListUtils {
/**
* 二倍均值法:
* 创建红包雨的每个红包金额数据
*/
private List<Integer> createRedPacketPriceList(Integer totalPrice, Integer totalCount) {
List<Integer> redPacketPriceList = new ArrayList<>();
for (int i = 0; i < totalCount; i++) {
if (i + 1 == totalCount) {
// 如果是最后一个红包
redPacketPriceList.add(totalPrice);
break;
}
int maxLimit = (totalPrice / (totalCount - i)) * 2;// 最大限额为平均值的两倍
int currentPrice = ThreadLocalRandom.current().nextInt(1, maxLimit);
totalPrice -= currentPrice;
redPacketPriceList.add(currentPrice);
}
return redPacketPriceList;
}
// 将红包数据拆分为子集合进行插入到Redis,避免 Redis输入输出缓冲区 被填满,影响其他命令
List<List<Integer>> splitPriceList = ListUtils.splistList(priceList, 100);
什么时候将缓存中记录的领取金额、数量同步到数据库?主播关播时(关博基于MQ和IM心跳服务无需额外处理、这三个数据没有那么重要)
红包配置code如何给前端?主播在进入直播间时,通过anchorConfig接口去返回
主播开始红包雨,要将[prepareRedPacket]生成的数据广播到所有用户[startRedPacket]
领取红包[getRedPacket],同一个红包用户可能点击多次
对上述三个接口进行防重限制
按钮 第一次点击prepareRedPacket->生成好红包数据变成startRedPacket
prepareRedPacket, 防止startRedPacket重复开始红包雨
redisTemplate.opsForValue().set(cacheKeyBuilder.buildRedPacketPrepareSuccess(redPacketConfigPO.getConfigCode()), 1, 1L, TimeUnit.DAYS);
public Boolean startRedPacket(RedPacketConfigReqDTO reqDTO) {
String code = reqDTO.getRedPacketConfigCode();
// 红包没有准备好,则返回false, 防止重复点击开始红包雨按钮
if (Boolean.FALSE.equals(redisTemplate.hasKey(cacheKeyBuilder.buildRedPacketPrepareSuccess(code)))) {
return false;
}
receiveRedPacket:从redis统计->多一层mq。因为会涉及到rpc调用账户变更,比较耗时
十四. 直播带货
1. 介绍
2. 业务梳理
正常是新开一个微服务,但由于本地内存占用问题,直接放礼物模块下了。
进行大图和缩略图的区分。
3. 下单逻辑
// 购物车接口的开发:
// 用户进入直播间,查看到商品列表
// 用户查看商品详情
// 用户把感兴趣的商品,加入待支付的购物车中(购物车的概念)->
//购物车的基本存储结构(按照直播间为维度去设计购物车),直播间的购物车是独立的,不会存在数据跨直播间存在的情况
// 购物车的添加,移除
// 购物车的内容展示
// 购物车的清空
购物车和直播间一样的生命周期直播结束,购物车直接清空了,不是一个需要永久持久化的东西。所以可以用缓存去存购物车。
redis数据结构:
//1个用户 多个商品
//读取素哟有商品的数据
//每个商品都有数量(目前限定为1,目前业务场景中没有体现)
//string(对象,对象里面关联商品的数据信息)
//set/list 读取方便,移除/加减麻烦
//map(k,v):key是skuId,value是商品的数量,hIncr可以直接操作map里面的值
redisTemplate.opsForHash().put(cacheKey, String.valueOf(shopCarReqDTO.getSkuId()), 1);
4. 库存扣减
// 购物车以及塞满了,下边的逻辑是怎样的?
// 预下单,(手机产品100台,库存的预锁定操作)(生成一条支付订单,预先扣减库存)
// 如果下单成功(库存就正常扣减了)(修改订单状态,支付超时,库存要回滚)
// 如果到达一定时间限制没有下单(100台手机,100台库存锁定,不支付,支付倒计时,库存回滚,订单状态会变成支付超时状态)
生成待支付订单时会扣减库存,否则支付时在扣减库存,万一库存不足,对用户体验不友好。
没有分布式调度器,只能本地定时任务去做【org.douyu.live.gift.provider.config.RefreshStockNumConfig】
5. 订单存储
//订单超时概念,多少分钟后订单关闭
//1. 定时任务,sql查询时间<某个限定时间的, 要指定索引,数据量很大时,扫描表耗时
//2. redis过期回调,key过期后,有一个回调通知到订阅方:回调并不高可靠,可能会丢失
//3. mq:延迟消息,时间轮去做.将扣减库存的消息,利用mq发送出去,在延迟回调处校验是否支付
6. 前后联调
商品信息被缓存通过进页面时mounted时调用方法queryShopInfo
商品库存信息被缓存通过开启直播时发送mq消息(后续秒杀时是对缓存中的库存扣减),然后this.sendStartingLivingRoomMq(livingRoomPO);
礼物提供者去消费上面消息(StartingLivingRoomConsumer)
todo:扣除版本3;14-17