一. 介绍

  • 接口模块也是去暴露给外界信息

image-chtg.png

image-ueme.pngimage-skiq.png

  • bank-api:支付回调接口。

二. 项目基础架构搭建

1. 微服务架构模式

2. 框架选型

①Dubbo

②SpringCloud

③SpringCloudAlibaba

3. 基础环境安装

4. docker介绍

5. 项目搭建

三. 高并发场景中的用户服务中台设计分析

1. 用户中台bg

2. 用户中台场景分析

3. 远程调用: HTTP还是RPC?

4. RPC产品选型比对

5. Dubbo

①dubbo引入

②dubbo深入

四. 用户中台实现

1. 用户数据存储分析

2. 分库分表

3. 搭建数据库

  • 假定1亿数据量,可以分100张表,每张100w数据量,性能足够高。

DELIMITER $$ 
 
CREATE 
 PROCEDURE douyu_live_user.create_t_user_100() 
 BEGIN 
 
 DECLARE i INT; 
 DECLARE table_name VARCHAR(30); 
 DECLARE table_pre VARCHAR(30); 
 DECLARE sql_text VARCHAR(3000); 
 DECLARE table_body VARCHAR(2000); 
 SET i=0; 
 SET table_name=''; 
 
 SET sql_text=''; 
 SET table_body = '(
 user_id bigint NOT NULL DEFAULT -1 COMMENT \'用户 id\',
 nick_name varchar(35) DEFAULT NULL COMMENT \'昵称\',
 avatar varchar(255) DEFAULT NULL COMMENT \'头像\',
 true_name varchar(20) DEFAULT NULL COMMENT \'真实姓名\',
 sex tinyint(1) DEFAULT NULL COMMENT \'性别 0 男,1 女\',
 born_date datetime DEFAULT NULL COMMENT \'出生时间\',
 work_city int(9) DEFAULT NULL COMMENT \'工作地\',
 born_city int(9) DEFAULT NULL COMMENT \'出生地\',
 create_time datetime DEFAULT CURRENT_TIMESTAMP,
 update_time datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP,
 PRIMARY KEY (user_id)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_bin;';
 WHILE i<100 DO 
 IF i<10 THEN
 SET table_name = CONCAT('t_user_0',i);
 ELSE
 SET table_name = CONCAT('t_user_',i);
 END IF;
 
 SET sql_text=CONCAT('CREATE TABLE ',table_name, 
table_body); 
 SELECT sql_text; 
 SET @sql_text=sql_text; 
 PREPARE stmt FROM @sql_text; 
 EXECUTE stmt; 
 DEALLOCATE PREPARE stmt; 
 SET i=i+1; 
 END WHILE; 
 
 END$$ 
 
DELIMITER ;

4. ShardingSphere介绍

①介绍

  • ShardingJdc对原生jdbc封装,同时兼容ORM框架,myatis可以继续使用,感觉不到底层jdbc替换为了Shardingjdbc

image-legm.png

②ShardingJdbc路由

③ShardingJdbc归并

image-ukby.png

④实现

5. 基于docker搭建mysql主从架构

①主从架构

  • 4-8PDF

DELIMITER $$ 
 
CREATE 
 PROCEDURE douyu_live_user.create_t_user_100() 
 BEGIN 
 
 DECLARE i INT; 
 DECLARE table_name VARCHAR(30); 
 DECLARE table_pre VARCHAR(30); 
 DECLARE sql_text VARCHAR(3000); 
 DECLARE table_body VARCHAR(2000); 
 SET i=0; 
 SET table_name=''; 
 
 SET sql_text=''; 
 SET table_body = '(
 user_id bigint NOT NULL DEFAULT -1 COMMENT \'用户 id\',
 nick_name varchar(35) DEFAULT NULL COMMENT \'昵称\',
 avatar varchar(255) DEFAULT NULL COMMENT \'头像\',
 true_name varchar(20) DEFAULT NULL COMMENT \'真实姓名\',
 sex tinyint(1) DEFAULT NULL COMMENT \'性别 0 男,1 女\',
 born_date datetime DEFAULT NULL COMMENT \'出生时间\',
 work_city int(9) DEFAULT NULL COMMENT \'工作地\',
 born_city int(9) DEFAULT NULL COMMENT \'出生地\',
 create_time datetime DEFAULT CURRENT_TIMESTAMP,
 update_time datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP,
 PRIMARY KEY (user_id)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_bin;';
 WHILE i<100 DO 
 IF i<10 THEN
 SET table_name = CONCAT('t_user_0',i);
 ELSE
 SET table_name = CONCAT('t_user_',i);
 END IF;
 
 SET sql_text=CONCAT('CREATE TABLE ',table_name, 
table_body); 
 SELECT sql_text; 
 SET @sql_text=sql_text; 
 PREPARE stmt FROM @sql_text; 
 EXECUTE stmt; 
 DEALLOCATE PREPARE stmt; 
 SET i=i+1; 
 END WHILE; 
 
 END$$ 
 
DELIMITER ;
docker run --name=mysql-master-1 \
--privileged=true \
-p 3306:3306 \
-v /usr/local/mysql/master1/data/:/var/lib/mysql \
-v /usr/local/mysql/master1/conf/my.cnf:/etc/mysql/my.cnf \
-v /usr/local/mysql/master1/mysql-files/:/var/lib/mysql-files/ \
-e MYSQL_ROOT_PASSWORD=123456 \
-d mysql:8.0.28 --lower_case_table_names=1


docker run --name=mysql-slave-1 \
--privileged=true \
-p 3307:3306 \
-v /usr/local/mysql/slave1/data/:/var/lib/mysql \
-v /usr/local/mysql/slave1/conf/my.cnf:/etc/mysql/my.cnf \
-v /usr/local/mysql/slave1/mysql-files/:/var/lib/mysql-files/ \
-e MYSQL_ROOT_PASSWORD=123456 \
-d mysql:8.0.28 --lower_case_table_names=1

# 主数据库创建用户 slave 并授权
# 创建用户,设置主从同步的账户名
create user 'douyu-slave'@'%' identified with mysql_native_password by '123456';

# 授权
grant replication slave on *.* to 'douyu-slave'@'%';

# 刷新权限
flush privileges;

# 查询 server_id 值
show variables like 'server_id';

# 也可临时(重启后失效)指定 server_id 的值(主从数据库的 server_id 不能
相同)
set global server_id = 1;

# 查询 Master 状态,并记录 File 和 Position 的值,这两个值用于和下边的从数
据库中的 change 那条 sql 中
的 master_log_file,master_log_pos 参数对齐使用
show master status;
show binlog events;

# 重置下 master 的 binlog 位点
reset master;



# 进入从数据库
# 注意:执行完此步骤后退出主数据库,防止再次操作导致 File 和 Position 的值
发生变化# 验证 slave 用户是否可用
# 查询 server_id 值
show variables like 'server_id';
# 也可临时(重启后失效)指定 server_id 的值(主从数据库的 server_id 不能
相同)
set global server_id = 2;
# 若之前设置过同步,请先重置
stop slave;
reset slave;

# 若出现错误,则停止同步,重置后再次启动
stop slave;
reset slave;
start slave;

# 设置主数据库
change master to master_host='172.17.0.2',master_port=3306,master_user='douyu-slave',master_password='123456',master_log_file='mysql-bin.000001',master_log_pos=157;

# 开始同步
start slave;

# 查询 Slave 状态
show slave status;


reset master;

# 若出现错误,则停止同步,重置后再次启动
stop slave;
reset slave;

# 设置主数据库
change master to master_host='172.17.0.2',master_port=3306,master_user='douyu-slave',master_password='123456',master_log_file='mysql-bin.000001',master_log_pos=157;

# 开始同步
start slave;

# 查询 Slave 状态
show slave status;

②读写分离

  • 4-8PDF

6. 分布式缓存redis引入

①引入

  • 4-10PDF

image-mzqi.png

  • KeyBuilder 的设计:设计一个统一的 Key 管理类,希望每个 Redis 的 key 都能有统一的前缀进行管理和匹配。

  • keyBuilder类需要写到META-INF/spring,实现提前提前加载到应用上下文(桶之前,把引用写到那个提前加载的文件)。但是若有多个KeyBuilder类,会将用不到的类加载。使用RedisKeyLoadMatch实现,只加载匹配用到的类的Key生成器,详见代码注解。

//批量查询用户+redis优化

 public Map<Long, UserDTO> batchQueryUserInfo(List<Long> userIdList) {

        if(CollectionUtils.isEmpty(userIdList)) {
            return Maps.newHashMap();
        }

        userIdList=userIdList.stream().
                filter(id-> id>10000).//假定id>1w
                collect(Collectors.toList());

        if(CollectionUtils.isEmpty(userIdList)){
            return Maps.newHashMap();
        }

        //redis优化
        //循环单个查询判断是否在redis,性能差
        //这里使用redisTemplate.opsForValue().multiGet();
        List<String> keyList=new ArrayList<>();
        userIdList.forEach(userId->{//转化为redis的Key
            keyList.add(userProviderCacheKeyBuilder.buildUserInfoKey(userId));
        });//multiGet查询不到key,会赛一个Null到集合,所以这里我们判空一下
        List<UserDTO> userDTOList=redisTemplate.opsForValue().multiGet(keyList)
                .stream().filter(x->x!=null).collect(Collectors.toList());

        //判断是否所有要查询的用户都在缓存
        if(!CollectionUtils.isEmpty(userDTOList)&&userDTOList.size()==userIdList.size()){
            return userDTOList.stream().
                    collect(Collectors.toMap(UserDTO::getUserId, userDTO->userDTO));
        }

        List<Long> userIdInCacheList=userDTOList.stream().//取出实际在redis的user的Id
                map(UserDTO::getUserId).collect(Collectors.toList());
        List<Long> userIdNotInCacheList=userIdList.stream()//不在缓存的user id集合
                .filter(x->!userIdInCacheList.contains(x))
                .collect(Collectors.toList());


//        直接走Mysql, 性能不太好
//        底层使用union all实现,若有100张分表,性能差
//        userMapper.selectBatchIds(userIdList);

        //我们本地开启多线程,不同表id分成不同组,交给
        //多线程去查询,最终归并返回即可
        Map<Long, List<Long>> userIdMap=
                userIdNotInCacheList.stream().//片键分组
                        collect(Collectors.groupingBy(userId->userId%100));
        List<UserDTO> dbQueryResult=new CopyOnWriteArrayList<>();//使用线程安全容器
        userIdMap.values().parallelStream()
                        .forEach(queryUserIdList->{
                            dbQueryResult.addAll(ConvertBeanUtils.convertList(
                                    userMapper.selectBatchIds(queryUserIdList), UserDTO.class
                            ));
                        });

        //当前业务下:可能有多个用户重复进入,所以可以塞入缓存优化
        if(!CollectionUtils.isEmpty(dbQueryResult)){
            Map<String, UserDTO> saveCacheMap = dbQueryResult.stream()
                    .collect(Collectors.toMap(userDTO -> userProviderCacheKeyBuilder
                            .buildUserInfoKey(userDTO.getUserId()), x -> x));
            redisTemplate.opsForValue().multiSet(saveCacheMap);
        }
        
        userDTOList.addAll(dbQueryResult);
        return userDTOList.stream().
                collect(Collectors.toMap(UserDTO::getUserId, userDTO->userDTO));
    }

②缓存的过期时间

  • 4-13

  • 过期时间过短:很快过期导致数据库压力大;

  • 过长:缓存占用内存过大

//单个设置过期时间
redisTemplate.opsForValue().set(key,userDTO, 30, TimeUnit.MINUTES);

//多用户设置过期时间
//批量 key 设置过期时间,建议批量的 key 的过期时间不要设置相同,随机性多一些。
    /**
     * 创建随机的过期时间 用于 redis 设置 key 过期
     */
    private int createRandomExpireTime() {
        int time= ThreadLocalRandom.current().nextInt(100);
        return time + 30 * 60;
    }


//管道批量传输命令,减少网络IO开销
redisTemplate.opsForValue().multiSet(saveCacheMap);

            //管道批量传输命令,减少网络IO开销
            redisTemplate.executePipelined(new SessionCallback<Object>() {
                @Override
                public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
                    for(String redisKey: saveCacheMap.keySet()) {
                        operations.expire((K) redisKey, createRandomExpireTime(), TimeUnit.SECONDS);
                    }

                    return null;
                }
            });

7. 缓存和数据库一致性问题

  • 引入消息队列4-15,解决缓存和数据库一致性问题。

        
//生产
userMapper.updateById(ConvertBeanUtils.convert(userDTO, UserPO.class));

        //写时删除缓存,并使用消息队列延迟双删,保证数据一致性
        String key= userProviderCacheKeyBuilder.buildUserInfoKey(userDTO.getUserId());
        redisTemplate.delete(key);//第一次删除
        try {
            Message message=new Message();
            message.setBody(JSON.toJSONString(userDTO).getBytes());
            message.setTopic("user-update-cache");//设置主题,消费者那的
            message.setDelayTimeLevel(1);//延迟级别,1代表1s发送
            mqProducer.send(message);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
//消费
String msgStr = new String(msgs.get(0).getBody());
                    UserDTO userDTO = JSON.parseObject(msgStr, UserDTO.class);
                    if (userDTO == null || userDTO.getUserId() == null) {
                        LOGGER.error("用户 id 为空,参数异常,内容: {}", msgStr);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    //延迟消息的回调,处理相关的缓存二次删除

                    redisTemplate.delete(userProviderCacheKeyBuilder.buildUserInfoKey(
                            userDTO.getUserId()));
                    LOGGER.error("延迟删除处理,userDTO is {}", userDTO);

8. 分布式id的生成

  • step过低导致同步慢获取不出来id, 过高导致浪费

  • vesion是为了乐观锁,防止多个线程更新字段,由于Id发号器服务器不会太多,所以不用redis分布式锁这种高性能锁。

  • 实现4-17PDF

  • ConcurrentHashMap 是 Java 提供的一个线程安全的哈希表实现,主要用于在多线程环境中高效地存储和访问键值对。

  • 初始化->使用>75->刷新,异步更新保证效率

有序id生成
/**
     * 在内存中记录的当前有序id的值
        AtomicLong 保证多线程下,自增时线程安全
     */
    private AtomicLong currentNum;



//键值对:分布式策略id主键:对应的生成对象
    内存:private static Map<Integer, LocalSeqIdBO> localSeqIdBOMap=new ConcurrentHashMap<>();


因为当前类是被容器托管,所以在类初始化时对map也初始化,所以可利用InitializingBean的回调接口去初始化map
InitializingBean 是 Spring 框架中的一个接口,用于在 Bean 初始化完成后执行某些操作。它的主要用途是在 Spring 容器完全初始化 Bean 之后,执行自定义的初始化逻辑。
@Service
public class IdGenerateServiceImpl implements IdGenerateService, InitializingBean {

    private static final Logger LOGGER=LoggerFactory.getLogger(IdGenerateServiceImpl.class);



&&&&&&&&&&&&

@Service
public class IdGenerateServiceImpl implements IdGenerateService, InitializingBean {

    @Resource
    private IdGenerateMapper idGenerateMapper;

    private static final Logger LOGGER=LoggerFactory.getLogger(IdGenerateServiceImpl.class);

    //键值对:分布式策略id主键:对应的生成对象
    private static Map<Integer, LocalSeqIdBO> localSeqIdBOMap=new ConcurrentHashMap<>();

    //更新id段预值
    private static final float UPDATE_RATE=0.75f;

    //异步线程池
    private static ThreadPoolExecutor threadPoolExecutor
            =new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadFactory() {//重写以便,重写名字方便debug
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread=new Thread(r);
                    thread.setName("id-generate-thread-"+ThreadLocalRandom.current().nextInt(1000));
                    return thread;
                }
            });
//    Semaphore 是 Java 并发包中的一个类,位于 java.util.concurrent 包中,
//    用于控制对共享资源的访问。它通过维护一组许可证来实现对资源的限制,允许一定数量的线程同时访问资源。
    //不同id生成策略的id对应不同限流器
    private static Map<Integer, Semaphore> semaphoreMap=new ConcurrentHashMap<>();

    /**
     * 刷新本地有序id段
     * //异步线程池
     * private static ThreadPoolExecutor threadPoolExecutor
     * @param localSeqIdBO
     */
    private void refreshLocalSeqIdBO(LocalSeqIdBO localSeqIdBO){
        long setp=localSeqIdBO.getNextThreshold()-localSeqIdBO.getCurrentStart();
        long used=localSeqIdBO.getCurrentNum().get()-localSeqIdBO.getCurrentStart();
        if(used>setp*UPDATE_RATE){//若已经使用的id数大于总数的0.75
            //若同步执行,涉及网络IO,性能较慢,可以异步执行
            //若更新较慢,可能有大量请求进入这里的异步更新这块
            //所以需要进行限制,所以一旦有一个任务执行,其他就不要再提交异步任务了
            //private static Map<Integer, Semaphore> semaphoreMap

            Semaphore semaphore=semaphoreMap.get(localSeqIdBO.getId());
            if(semaphore==null) {
                LOGGER.error("semaphore is null, id  is {}", localSeqIdBO.getId());
                return;
            }
            boolean acquireStatus=semaphore.tryAcquire();//尝试进入临界区
            if(acquireStatus){//成功,则进入异步任务
                LOGGER.info("开始尝试进行本地id段的同步操作");

                threadPoolExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        IdGeneratePO idGeneratePO=idGenerateMapper.selectById(localSeqIdBO.getId());
                        tryUpdateMySQLRecord(idGeneratePO);
                        semaphoreMap.get(localSeqIdBO.getId()).release();//任务执行完后释放
                        LOGGER.info("本地id段的同步完成");
                    }
                });
            }
        }
    }

    /**
     * 得到有序id
     * 此处的id是t_id_generate_config中的主键id, 表示不同的id生成策略
     * @param id
     * @return
     */
    @Override
    public Long getSeqId(Integer id) {
        if (id == null) {
            LOGGER.error("[getSeqId] id is error, id is {}", id);
            return null;
        }

        LocalSeqIdBO localSeqIdBO=localSeqIdBOMap.get(id);//当请求到达时,取得生成的id对象
        if(localSeqIdBO==null){
            LOGGER.error("[getSeqId] localSeqIdBO is null, id is {}", id);
        }

        /**
         * 当当前id段用完
         * 尾值用不到
         * 10000 10100
         * 10050 10150
         *
         * 10050 10150
         * 10100 10200
         *
         * update sql
         * 网络io
         * update sql重试,select 几次网络io
         * 更新内存map
         *
         * 网络阻塞会导致上游调用方崩溃
         */

        /**
         * 设置更新预值,id快用完时就去更新id段
         * private static final float UPDATE_RATE=0.75f;
         */

        this.refreshLocalSeqIdBO(localSeqIdBO);//异步刷新本地id段

        //本地当前id字段用完就不要生成id, 防止占用别的id段
        if(localSeqIdBO.getCurrentNum().get()>= localSeqIdBO.getNextThreshold()) {
            /**
             * 这里直接失败,而不是采用同步刷新
             * Dubbo采用业务线程池去处理远程调用
             * 若采用同步刷新,会有大量请求堆积在这里
             *      1. 占用dubbo业务线程池,影响别的业务获取分布式id(可能是别的生成id策略)
             *          1id要堵塞,2id不需要堵塞(直接内存取),却因1id而堵塞
             */
            LOGGER.error("[getSeqId] localSeqIdBO is over limit, id is {}", id);
            return null;
        }

        long returnId=localSeqIdBO.getCurrentNum().getAndIncrement();
        return returnId;
    }

    /**
     * 更新mysql里面的分布式id的配置信息,占用相应id段
     * 若同步执行,涉及网络IO,性能较慢,可以异步执行
     * @param idGeneratePO
     */
    private void tryUpdateMySQLRecord(IdGeneratePO idGeneratePO) {
        long currentStart=idGeneratePO.getCurrentStart();//当前开始位置
        long nextThreshold=idGeneratePO.getNextThreshold();//结束位置
        long currentNum=currentStart;//本地当前值
        int updateResult=idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());

        if(updateResult>0) {//更新成功,不需要查
            LocalSeqIdBO localSeqIdBO=new LocalSeqIdBO();
            AtomicLong atomicLong=new AtomicLong(currentNum);
            localSeqIdBO.setId(idGeneratePO.getId());
            localSeqIdBO.setCurrentNum(atomicLong);//最开始当前值=起始值
            localSeqIdBO.setCurrentStart(currentStart);//设置起始值
            localSeqIdBO.setNextThreshold(nextThreshold);//设置终止值
            localSeqIdBOMap.put(localSeqIdBO.getId(), localSeqIdBO);
            return;
        }

        //重试机制
        for(int i=0;i<3;++i) {//若更新失败,因为可能有别的服务器也在更新
            //再去读最新的IdGeneratePO
            idGeneratePO=idGenerateMapper.selectById(idGeneratePO.getId());
            updateResult=idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
            if(updateResult>0) {//若更新成功

                LocalSeqIdBO localSeqIdBO=new LocalSeqIdBO();
                AtomicLong atomicLong=new AtomicLong(idGeneratePO.getCurrentStart());
                localSeqIdBO.setId(idGeneratePO.getId());
                localSeqIdBO.setCurrentNum(atomicLong);//最开始当前值=起始值
                localSeqIdBO.setCurrentStart(idGeneratePO.getCurrentStart());//设置起始值
                localSeqIdBO.setNextThreshold(idGeneratePO.getNextThreshold());//设置终止值
                localSeqIdBOMap.put(localSeqIdBO.getId(), localSeqIdBO);//更新
                return;
            }
        }

        //若3次还没更新成功,抛出异常
        throw new RuntimeException("表id段占用失败,竞争激烈, id is "+idGeneratePO.getId());
    }

    //bean初始化时会回调到这里
    @Override
    public void afterPropertiesSet() throws Exception {
        List<IdGeneratePO> idGeneratePOList=idGenerateMapper.selectAll();
        for(IdGeneratePO idGeneratePO:idGeneratePOList){
            tryUpdateMySQLRecord(idGeneratePO);
            //初始化限流器,每次只能有一个异步线程去更新数据库
            semaphoreMap.put(idGeneratePO.getId(), new Semaphore(1));
        }
    }
}
//无序id生成
//无序id map
    private static Map<Integer, LocalUnSeqIdBO> localUnSeqIdBOMap



//BO对象
public class LocalUnSeqIdBO {

    private int id;
    /**
     * 提前将无序的id存放在这条队列中
     */
    private ConcurrentLinkedQueue<Long> idQueue;
    /**
     * 当前id段的开始值
     */
    private Long currentStart;
    /**
     * 当前id段的结束值
     */
    private Long nextThreshold;


&&&&&&&&&&&&&&&&&&

@Service
public class IdGenerateServiceImpl implements IdGenerateService, InitializingBean {

    @Resource
    private IdGenerateMapper idGenerateMapper;

    private static final Logger LOGGER=LoggerFactory.getLogger(IdGenerateServiceImpl.class);

    //键值对:分布式策略id主键:对应的生成对象,有序id Map
    private static Map<Integer, LocalSeqIdBO> localSeqIdBOMap=new ConcurrentHashMap<>();
    //无序id map
    private static Map<Integer, LocalUnSeqIdBO> localUnSeqIdBOMap=new ConcurrentHashMap<>();

    //更新id段预值
    private static final float UPDATE_RATE=0.75f;

    private static final int SEQ_ID=1;//表示有序Id

    //异步线程池
    private static ThreadPoolExecutor threadPoolExecutor
            =new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadFactory() {//重写以便,重写名字方便debug
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread=new Thread(r);
                    thread.setName("id-generate-thread-"+ThreadLocalRandom.current().nextInt(1000));
                    return thread;
                }
            });
//    Semaphore 是 Java 并发包中的一个类,位于 java.util.concurrent 包中,
//    用于控制对共享资源的访问。它通过维护一组许可证来实现对资源的限制,允许一定数量的线程同时访问资源。
    //不同id生成策略的id对应不同限流器
    private static Map<Integer, Semaphore> semaphoreMap=new ConcurrentHashMap<>();

    /**
     * 得到有序id
     * 此处的id是t_id_generate_config中的主键id, 表示不同的id生成策略
     * @param id
     * @return
     */
    @Override
    public Long getSeqId(Integer id) {
        if (id == null) {
            LOGGER.error("[getSeqId] id is error, id is {}", id);
            return null;
        }

        LocalSeqIdBO localSeqIdBO=localSeqIdBOMap.get(id);//当请求到达时,取得生成的id对象
        if(localSeqIdBO==null){
            LOGGER.error("[getSeqId] localSeqIdBO is null, id is {}", id);
            return null;
        }

        /**
         * 当当前id段用完
         * 尾值用不到
         * 10000 10100
         * 10050 10150
         *
         * 10050 10150
         * 10100 10200
         *
         * update sql
         * 网络io
         * update sql重试,select 几次网络io
         * 更新内存map
         *
         * 网络阻塞会导致上游调用方崩溃
         */

        /**
         * 设置更新预值,id快用完时就去更新id段
         * private static final float UPDATE_RATE=0.75f;
         */

        this.refreshLocalSeqIdBO(localSeqIdBO);//异步刷新本地id段

//        //本地当前id字段用完就不要生成id, 防止占用别的id段
//        if(localSeqIdBO.getCurrentNum().get()>= localSeqIdBO.getNextThreshold()) {
//            /**
//             * 这里直接失败,而不是采用同步刷新
//             * Dubbo采用业务线程池去处理远程调用
//             * 若采用同步刷新,会有大量请求堆积在这里
//             *      1. 占用dubbo业务线程池,影响别的业务获取分布式id(可能是别的生成id策略)
//             *          1id要堵塞,2id不需要堵塞(直接内存取),却因1id而堵塞
//             */
//            LOGGER.error("[getSeqId] localSeqIdBO is over limit, id is {}", id);
//            return null;
//        }
//
//        long returnId=localSeqIdBO.getCurrentNum().getAndIncrement();

        long returnId=localSeqIdBO.getCurrentNum().getAndIncrement();
        if(returnId>= localSeqIdBO.getNextThreshold()) {//即使一直加,也不会被返回,保证线程安全
            LOGGER.error("[getSeqId] localSeqIdBO is over limit, id is {}", id);
            return null;
        }
        return returnId;
    }

    /**
     * 刷新本地有序id段
     * //异步线程池
     * private static ThreadPoolExecutor threadPoolExecutor
     * @param localSeqIdBO
     */
    private void refreshLocalSeqIdBO(LocalSeqIdBO localSeqIdBO){
        long setp=localSeqIdBO.getNextThreshold()-localSeqIdBO.getCurrentStart();
        long used=localSeqIdBO.getCurrentNum().get()-localSeqIdBO.getCurrentStart();
        if(used>setp*UPDATE_RATE){//若已经使用的id数大于总数的0.75
            //若同步执行,涉及网络IO,性能较慢,可以异步执行
            //若更新较慢,可能有大量请求进入这里的异步更新这块
            //所以需要进行限制,所以一旦有一个任务执行,其他就不要再提交异步任务了
            //private static Map<Integer, Semaphore> semaphoreMap

            Semaphore semaphore=semaphoreMap.get(localSeqIdBO.getId());
            if(semaphore==null) {
                LOGGER.error("semaphore is null, id  is {}", localSeqIdBO.getId());
                return;
            }
            boolean acquireStatus=semaphore.tryAcquire();//尝试进入临界区
            if(acquireStatus){//成功,则进入异步任务
                LOGGER.info("开始尝试进行本地有序id段的同步操作");

                threadPoolExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            IdGeneratePO idGeneratePO=idGenerateMapper.selectById(localSeqIdBO.getId());
                            tryUpdateMySQLRecord(idGeneratePO);
                        }catch (Exception e) {
                            LOGGER.error("[refreshLocalSeqIdBO] error is ", e);
                        }finally {
                            semaphoreMap.get(localSeqIdBO.getId()).release();//任务执行完后释放
                            LOGGER.info("本地有序id段的同步完成, id is {}", localSeqIdBO.getId());
                        }
                    }
                });
            }
        }
    }

    /**
     * 刷新本地无序id段
     * @param localUnSeqIdBO
     */
    private void refreshLocalUnSeqIdBO(LocalUnSeqIdBO localUnSeqIdBO) {
        long begin=localUnSeqIdBO.getCurrentStart();
        long end=localUnSeqIdBO.getNextThreshold();
        long remainSize=localUnSeqIdBO.getIdQueue().size();//剩余id数
        if((end-begin)*0.25>remainSize) {//已用75

            Semaphore semaphore=semaphoreMap.get(localUnSeqIdBO.getId());
            if(semaphore==null) {
                LOGGER.error("semaphore is null, id  is {}", localUnSeqIdBO.getId());
                return;
            }
            boolean acquireStatus=semaphore.tryAcquire();//尝试进入临界区
            if(acquireStatus){//成功,则进入异步任务
                LOGGER.info("开始尝试进行本地无序id段的同步操作");
                threadPoolExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            IdGeneratePO idGeneratePO = idGenerateMapper.selectById(localUnSeqIdBO.getId());
                            tryUpdateMySQLRecord(idGeneratePO);
                        }catch (Exception e) {
                            LOGGER.error("[refreshLocalUnSeqIdBO] error is ", e);
                        } finally {//确保不会因为tryUpdateMySQLRecord的异常,而无法释放
                            semaphoreMap.get(localUnSeqIdBO.getId()).release();//任务执行完后释放
                            LOGGER.info("本地无序id段同步完成,id is {}", localUnSeqIdBO.getId());
                        }
                    }
                });
            }
        }
    }

    @Override
    public Long getUnSeqId(Integer id) {
        if (id == null) {
            LOGGER.error("[getUnSeqId] id is error, id is {}", id);
            return null;
        }

        LocalUnSeqIdBO localUnSeqIdBO=localUnSeqIdBOMap.get(id);//当请求到达时,取得生成的id对象
        if(localUnSeqIdBO==null){
            LOGGER.error("[getUnSeqId] localUnSeqIdBO is null, id is {}", id);
            return null;
        }


        this.refreshLocalUnSeqIdBO(localUnSeqIdBO);
        Long returnId=localUnSeqIdBO.getIdQueue().poll();//队列就是线程安全的队列
        if(returnId==null) {
            LOGGER.error("[getUnSeqId] returnId is null, id is {}", id);
            return null;
        }
        return returnId;
    }

    /**
     * 封装本地BO对象
     * 将本地id对象放入map,并进行初始化
     * @param idGeneratePO
     */
    private void localIdBOHandler(IdGeneratePO idGeneratePO) {
        long currentStart=idGeneratePO.getCurrentStart();//当前开始位置
        long nextThreshold=idGeneratePO.getNextThreshold();//结束位置
        long currentNum=currentStart;//本地当前值

        if(idGeneratePO.getIsSeq()==SEQ_ID) {//若PO是有序id
            LocalSeqIdBO localSeqIdBO=new LocalSeqIdBO();
            AtomicLong atomicLong=new AtomicLong(currentNum);
            localSeqIdBO.setId(idGeneratePO.getId());
            localSeqIdBO.setCurrentNum(atomicLong);//最开始当前值=起始值
            localSeqIdBO.setCurrentStart(currentStart);//设置起始值
            localSeqIdBO.setNextThreshold(nextThreshold);//设置终止值
            localSeqIdBOMap.put(localSeqIdBO.getId(), localSeqIdBO);
        } else {//是无序id
            LocalUnSeqIdBO localUnSeqIdBO=new LocalUnSeqIdBO();
            localUnSeqIdBO.setCurrentStart(currentStart);
            localUnSeqIdBO.setNextThreshold(nextThreshold);
            localUnSeqIdBO.setId(idGeneratePO.getId());

            //提前生成start到end的数字,并打乱放入队列
            Long begin=localUnSeqIdBO.getCurrentStart();
            Long end=localUnSeqIdBO.getNextThreshold();
            List<Long> idList=new ArrayList<>();
            for(long i=begin;i<end;++i)
                idList.add(i);
            Collections.shuffle(idList);//重洗idList
            ConcurrentLinkedQueue<Long> idQueue=new ConcurrentLinkedQueue<>();
            idQueue.addAll(idList);
            localUnSeqIdBO.setIdQueue(idQueue);
            localUnSeqIdBOMap.put(localUnSeqIdBO.getId(), localUnSeqIdBO);
        }
    }

    /**
     * 更新mysql里面的分布式id的配置信息,占用相应id段
     * 若同步执行,涉及网络IO,性能较慢,可以异步执行
     * @param idGeneratePO
     */
    private void tryUpdateMySQLRecord(IdGeneratePO idGeneratePO) {

        int updateResult=idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
        if(updateResult>0) {//更新成功,不需要查
            localIdBOHandler(idGeneratePO);
            return;
        }

        //重试机制
        for(int i=0;i<3;++i) {//若更新失败,因为可能有别的服务器也在更新
            //再去读最新的IdGeneratePO
            idGeneratePO=idGenerateMapper.selectById(idGeneratePO.getId());
            updateResult=idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
            if(updateResult>0) {//若更新成功
                localIdBOHandler(idGeneratePO);
                return;
            }
        }

        //若3次还没更新成功,抛出异常
        throw new RuntimeException("表id段占用失败,竞争激烈, id is "+idGeneratePO.getId());
    }

    //bean初始化时会回调到这里
    @Override
    public void afterPropertiesSet() throws Exception {
        List<IdGeneratePO> idGeneratePOList=idGenerateMapper.selectAll();
        for(IdGeneratePO idGeneratePO:idGeneratePOList){
            tryUpdateMySQLRecord(idGeneratePO);
            //初始化限流器,每次只能有一个异步线程去更新数据库
            semaphoreMap.put(idGeneratePO.getId(), new Semaphore(1));
        }
    }
}
  • 后序优化:如何写一些接口,动态更新id生成策略的上限等值,并同步到内存?

五. 用户标签实战

1. 介绍

2. 常见实现思路

3. 实现

  • PDF5-4~5-6

4. 优化

①优化sql次数

优化了sql次数

只能允许第一次设置成功
${fieldName} & #{tag}=0表示还没有被设置
@Update("update t_user_tag set ${fieldName}=${fieldName} | #{tag} where user_id=#{userId} and ${fieldName} & #{tag}=0")
    int setTag(Long userId, String fieldName, long tag);

    /**
     * 使用先取反在与的思路来取消标签,只能允许第一次删除成功
     * @param userId
     * @param fieldName
     * @param tag
     * @return
     */
${fieldName} & #{tag}=#{tag}表示用户确实包含这个,才允许撤销
    @Update("update t_user_tag set ${fieldName}=${fieldName} &~ #{tag} where user_id=#{userId} and ${fieldName} & #{tag}=#{tag}")
    int cancelTag(Long userId, String fieldName, long tag);

②用户标签记录不存在的问题,以及多线程插入问题

设置标签优化   
 @Override
    public boolean setTag(Long userId, UserTagsEnum userTagsEnum) {
        //尝试update true就直接返回成功
        //设置了标签,没有记录(两种失败场景)
        //select is null查询记录为空,insert则插入,再更新update
        boolean updateStatus=userTagMapper.setTag(userId, userTagsEnum.getFieldName(), userTagsEnum.getTag())>0;
        if(updateStatus) return true;//更新成功直接返回

        /**
         * A B线程同时都执行到这里
         * 由于userId唯一,多次执行
         * 会导致失败
         *
         * 此外多个线程都打入mysql, 效率低
         * 所以多线程下,若用户记录不存在
         * 我们只允许一个线程去执行插入操作
         *
         * 标签服务可能启动多个,导致了分布式下多进程问题
         */
        //失败则验证记录是否存在
        UserTagPO userTagPO=userTagMapper.selectById(userId);
        if(userTagPO!=null){//表明设置重复了
            return false;
        }

        //两条指令,不具备原子性
//        redisTemplate.opsForValue().setIfAbsent();//若key已经存在就设置失败
//        redisTemplate.expire();//过期时间
        String setNxKey = cacheKeyBuilder.buildTagLockKey(userId);
        //替换为一条指令
        String setNxResult=redisTemplate.execute(new RedisCallback<String>() {

            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer keySerializer = redisTemplate.getKeySerializer();
                RedisSerializer valueSerializer = redisTemplate.getValueSerializer();
                return (String) connection.execute("set", keySerializer.serialize(setNxKey),
                        valueSerializer.serialize("-1"),
                        "NX".getBytes(StandardCharsets.UTF_8),//键不存在时设置
                        "EX".getBytes(StandardCharsets.UTF_8),//过期时间
                        "3".getBytes(StandardCharsets.UTF_8));
            }
        });
        //
        if (!"OK".equals(setNxResult)) {//若已经有别的线程在插入了,直接返回失败即可
            return false;
        }

        //无记录则插入
        userTagPO=new UserTagPO();
        userTagPO.setUserId(userId);
        userTagMapper.insert(userTagPO);
        updateStatus=userTagMapper.//再次更新
                setTag(userId, userTagsEnum.getFieldName(), userTagsEnum.getTag())>0;

        redisTemplate.delete(setNxKey);//设置成功则删除缓存里的key
        return updateStatus;
    }
  • 缓存清理同样使用延迟双删,并新增了删除缓存类,缓存枚举码,消息队列主题常量类

六. 用户中台应用实现

1. 介绍

2. SpringBoot 应用的 Docker 容器 化部署

  • PDF6-2

  • 服务日志规范化

  • PDF6-4

3. 引入nacos配置中心

  • PDF6-6

4. 基于SPI机制修改ShardingJDBC底层,实现nacos配置数据源

  • pdf6-7

  • org.idea.douyu.live.framework.datasource.starter.config.NacosDriverURLProvider

  • application 的url nacos:

5. 引入网关

  • pdf6-10

  • 新建模块douyu-live-gateway

6. 引入compose容器部署

7. 用户中台压力测试

  • pdf6-11

8. UI界面设计讲解

  • Pixso免费设计页面

9. 登录注册流程--短信验证码实现

  • pdf6-15

  • 生成验证码,4位,6位(取它),有效期(30s,60s),同一个手机号不能重发,redis去存储验证码

  • 发生短信需要先发送http请求到第三方,第三方再去发送短信到用户,然后将结果返回,比较耗时,用异步去发。

10. 登录注册流程--手机号登录注册

  • pdf6-16

  • 缓存击穿,空值缓存:当缓存没有查到数据,但是数据库也没有查到数据,此时我们new一个新的空值对象放入缓存,以便下次再查询时,直接在缓存层返回结果,不用去数据库查询。

  • 对手机号这些信息进行加密和解密转换(凡是涉及到DB的操作)。

  • 注意无序id时,common数据库,类型改成0。

11. 前后端联调

  • pdf6-19

  • 统一返回对象WebResponseVO

  • 后端设置cookie属性

Cookie cookie = new Cookie("dytk", userLoginDTO.getToken());
        //设置顶级域名,接着我们的二级域名中进行 web 页面的访问,后续请求就会
        //携带上它了
        cookie.setDomain("127.0.0.1");
        cookie.setPath("/");
        cookie.setMaxAge(30 * 24 * 3600);
        response.setHeader("Access-Control-Allow-Credentials",
                "true");
        response.addCookie(cookie);

//app.qiyu.live.com/html/xx.html 前台
        //app.qiyu.live.com/live/api/userLogin/login 后端
        //设置顶级域名qiyu.live.com,接着我们的二级域名app.qiyu.live.com
        // 中进行 web 页面的访问,后续请求就会
        //携带上它了

12. 接口鉴权模块的开发

  • 账户服务的职责:负责对外界请求的 token 进行校验,需要频繁访问 redis(原来userProvider接口并发就大,再加上token校验并发也大)

  • 所以使用单独微服务去承载token访问的压力

  • PDF6-21

  • 所有token操作都由此模块来负责

13. 网关服务引入鉴权+白名单功能

  • pdf6-22

// gateway --(header)--> springboot-web(interceptor-->get header)
        //将userId封装到请求头,下游不需要再根据cookie去取userId
        ServerHttpRequest.Builder builder = request.mutate();
        builder.header(GatewayHeaderEnum.USER_LOGIN_ID.getName(), String.valueOf(userId));

14. 下游服务提取gateway塞入的userId

  • 定义org.douyu.live.web.starter.context.QiyuUserInfoInterceptor拦截器去拦截

  • /实现父子线程之间的线程本地变量传递

七. IM系统实现

1. 介绍

2. IM系统发展过程

3. 直播下的IM架构设计

4. WebSocket和Tcp长连接

①WebSocket长连接

②TCP长连接

③对比

  • 为什么开发Web浏览器页面很难使用原生Tcp,而用WebSocket。而客户端App应用可以用原生Tcp

5. 网络的三种IO模型介绍

①介绍

②BIO实现

③NIO实现

④基于Selector的高性能NIO实现

  • select():当没有请求CPU会让出?Linux基于EpollSelectorImpl去实现

⑤AIO实现

6. IM核心实现

  • im-core-server编解码器实现

  • im-core-server核心处理handler的实现,继承SimplyHandler来处理不同的code用不同的类

  • ImHandlerFactory消息工厂,做消息的处理和分发ImHandlerFactoryImpl

  • ChannelHandlerContextCache去将ctx和用户userId对应起来,以便异步推送。ChannelHandlerContextCache.put(userId, ctx);//记录userId对应的ctx

//作为二次开发的属性,ctx登录时记录userId这一属性
ctx.attr(ImContextAttr.USER_ID).set(userId);
//todo
//作为二次开发的属性
ImContextUtils.setUserId(ctx, userId);

ImServerCoreHandler
//它表示通道(Channel)处于不活动状态。这通常意味着连接已关闭或失去,
    // 可能是由于客户端主动断开连接、网络故障或服务器关闭等原因。
    //执行该事件
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    }

7. IM心跳功能实现

  • 心跳用于检测,当前用户是否在线状态

  • 心跳包record记录(暂时性的数据),redis存储心跳记录

  • zSet集合存储心跳记录,基于userId取模从而得到key(防止大key), key(userId)-score(心跳时间)

  • douyu-live-im-core-server:hertbeat:999:zset

8. IM业务消息通信

jsonObject.put("userId",userId);//发送方
jsonObject.put("objectId", 1001101L);//接收方是谁
// 记录每个用户连接的im服务器地址,然后根据im服务器的连接地址去做具体机器的调用
// 基于mq广播思路去做,可能会有消息风暴发生,100台im机器,99%的mq消息都是无效的,
// 加入一个叫路由层的设计,router中转的设计,router就是一个dubbo的rpc层
// A--》B im-core-server -> msg-provider(持久化) -> im-core-server -> 通知到b
上面图的在线路由模式
im-core-server-interfaces暴露接口给router层
  • router用rpc给别的调用,其实高并发下用mq更合适。SPI机制,DubboSPI要想生效,必须在META-INF.dubbo.internal中配置

/**
 * 基于Cluster去做spi扩展,实现根据rpc上下文来选择具体请求的机器
 *
 */
public class ImRouterCluster implements Cluster {

    @Override//dubbo在做集群筛选时会有jion方法的回调
    public <T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain) throws RpcException {
        return new ImRouterClusterInvoker<>(directory);
    }
}

public class ImRouterClusterInvoker<T> extends AbstractClusterInvoker<T>//dubbo在做集群节点筛选时,按照特定的规则去筛选

//获取到指定的rpc服务提供者的所有地址信息
List<Invoker<T>> invokers = list(invocation);//服务提供者会被封装成一个个invokers对象

@Override
    protected Result doInvoke(Invocation invocation, List list, LoadBalance loadbalance) throws RpcException {
        checkWhetherDestroyed();
        String ip = (String) RpcContext.getContext().get("ip");
        if (StringUtils.isEmpty(ip)) {
            throw new RuntimeException("ip can not be null!");
        }
        //获取到指定的rpc服务提供者的所有地址信息
        List<Invoker<T>> invokers = list(invocation);//服务提供者会被封装成一个个invokers对象
        Invoker<T> matchInvoker = invokers.stream().filter(invoker -> {
            //拿到我们服务提供者的暴露地址(ip:端口的格式)
            String serverIp = invoker.getUrl().getHost() + ":" + invoker.getUrl().getPort();
            return serverIp.equals(ip);
        }).findFirst().orElse(null);
        if (matchInvoker == null) {
            throw new RuntimeException("ip is invalid");
        }
        return matchInvoker.invoke(invocation);
    }


boostrap.yml
dubbo: #指定消费时使用imRouter扩展类
  consumer:
    cluster: imRouter
  • im-core-server接受客户端长链接,后台数据推送也都依赖im-core-server和客户端的链接通道(里面只做简单的数据接受和推送:高性能高可用)

  • im-provider去对提供一些im服务:判断用户是否在线,对im扩展的业务接口放在im-provider里

  • im-router:路由层,转发消息请求,业务请求发到我这边,我做转发到对应B长连接的IM-core服务器

9. IM router模块后续完善,用户路由信息绑定

//去将ctx和用户userId对应起来
public class ChannelHandlerContextCache {

    /**
     * 当前的im服务启动的时候,对外暴露的ip和端口
     * nacos上注册的ip和端口
     */
    private static String SERVER_IP_ADDRESS = "";

    //当前userId对应ctx
    private static Map<Long, ChannelHandlerContext> channelHandlerContextMap = new HashMap<>();
///////////
NettyImServerStarter启动im core 服务器时,我们在内存记录下当前服务器的ip和port
ChannelHandlerContextCache.setServerIpAddress(SelfStartIp+":"+SelfStartPort);

LoginMsgHandler用户登录成功时,记录下userId连接当前服务器的channel
ChannelHandlerContextCache.put(userId, ctx);//记录userId对应的ctx,以及channel对应的userId appId
            //todo
            //作为二次开发的属性
ImContextUtils.setUserId(ctx, userId);
ImContextUtils.setAppId(ctx, appId);

并且在redis中记录userId对应的im-core服务器的地址和端口
//在redis中记录用户链接的im core服务器地址
//这里不用frame的redis key生成模块,是因为router模块也要查找userId对应的imcore服务器,而router的应用名称和imcore名称不同
//所以采用自定义的key
package org.douyu.live.im.core.server.interfaces.constants;

public class ImCoreServerConstants {

    public static String IM_BIND_IP_KEY = "douyu:live:im:bind:ip:";
}
stringRedisTemplate.opsForValue().
                    set(ImCoreServerConstants.IM_BIND_IP_KEY + appId + ":" + userId,
                            ChannelHandlerContextCache.getServerIpAddress(),//过期时间是60s
                            ImConstants.DEFAULT_HEART_BEAT_GAP*2, TimeUnit.SECONDS);
//心跳包收到后,记得延长时间,防止过期

//延长用户之前保存的ip绑定记录时间HeartBeatImMsgHandler
        stringRedisTemplate.expire(ImCoreServerConstants.IM_BIND_IP_KEY + appId + ":" + userId, 
                ImConstants.DEFAULT_HEART_BEAT_GAP * 2, TimeUnit.SECONDS);

//登出后以及非正常登出记得清楚缓存LogoutMsgHandler
        //清除userId绑定im core ip的缓存
        stringRedisTemplate.delete(ImCoreServerConstants.IM_BIND_IP_KEY + appId + ":" + userId);
ImRouterServiceImpl路由转发
public boolean sendMsg(ImMsgBody imMsgBody) {
        //假设我们有100个userid -> 10台im服务器上 100个ip做分类 -> 最终ip的数量一定是<=10
        String bindAddress = stringRedisTemplate.
                opsForValue().
                get(ImCoreServerConstants.IM_BIND_IP_KEY + imMsgBody.getAppId() + ":" + imMsgBody.getUserId());
        if (StringUtils.isEmpty(bindAddress)) {
            return false;
        }
        bindAddress = bindAddress.substring(0,bindAddress.indexOf(":"));
        RpcContext.getContext().set("ip", bindAddress);//得到dubbo上下文,并且注入ip地址, 在clusterSPI中筛选imcore服务器

    //将消息路由到接受userId对应的im core服务器,再由imcore服务器去将消息发给B接受方
        routerHandlerRpc.sendMsg(imMsgBody);
        return true;
    }
        Long userId = imMsgBody.getUserId();
        ChannelHandlerContext ctx = ChannelHandlerContextCache.get(userId);
        if (ctx != null) {//极端情况:router层是用户还在线,当到达im core层时用户可能下线,在这里判断

10. 用户在线检测功能实现(im-provider)

//当前userId对应ctx
private static Map<Long, ChannelHandlerContext> channelHandlerContextMap = new HashMap<>();
最准确的方式是去im-core服务查userId对应ctx,但是provider需要先知道userId对应的服务器(通过router层),链路比较长。
redisTemplate.hasKey(ImCoreServerConstants.IM_BIND_IP_KEY + appId + ":" + userId);
断线时也会自动删除
所以可以根据userId绑定的IP去判断,不需要太准确(极端情况:两次心跳包因为网络原因没发送过来,所以会导致缓存被删除)

public class ImOnlineServiceImpl implements ImOnlineService {

    @Resource
    private RedisTemplate<String,Object> redisTemplate;

    @Override
    public boolean isOnline(long userId, int appId) {
        //不通过rpc掉im-core,减轻其压力
        return redisTemplate.hasKey(ImCoreServerConstants.IM_BIND_IP_KEY + appId + ":" + userId);
    }

11. IM-消息ACK确认机制

image-btje.pngimage-hipc.pngimage-eoya.png

  • 客户端收到业务消息,回写ack消息给im-core服务器表示收到消息

  • 收到ack确认包后,进行删除map kv操作【AckImMsgHandler】

  • 什么时候存储map:在im-core服务将消息推回给客户端时【package org.douyu.live.im.core.server.service.impl; RouterHandlerServiceImpl】 以及发送延迟消息sendDelayMsg

  • 延迟消息会在【package org.douyu.live.im.core.server.consumer; ImAckConsumer】进行消费,判断是否收到ack, 是否需要重发消息`

@Override
public void onReceive(ImMsgBody imMsgBody) {
    //需要进行消息通知的userid
    if(sendMsgToClient(imMsgBody)) {
        //当im服务器推送了消息给到客户端,然后我们需要记录下ack
        msgAckCheckService.recordMsgAck(imMsgBody, 1);
        msgAckCheckService.sendDelayMsg(imMsgBody);
    }
}

    public String buildImAckMapKey(Long userId,Integer appId) {//因为map主要存储没有收到消息的,量不大,100个map即可
        return super.getPrefix() + IM_ACK_MAP + super.getSplitItem() + appId + super.getSplitItem() + userId % 100;//对100取模

redisTemplate.opsForHash().put(key, imMsgBody.getMsgId(), times);//值是消息重发的次数

sendDelayMsg
//等级1 -> 1s,等级2 -> 5s
message.setDelayTimeLevel(2);

12. rocketMq原理

①介绍

  • RocketMQ 的双主双从架构

②架构

③消息发送流程

image-jjon.png

/**
 * 生产者发送消息给Broker, Broker内存在CommitLog,顺序记录所有发来的消息,同时将该消息的地址写入
 * ConsumerQueue队列,若指定了ConsumerQueue,则会写入指定的ConsumerQueue队列中去maxOffset位置
 *
 *
 * 每个topic消息话题对应多个ConsumerQueue队列,ConsumerQueue中不存储实际消息
 * 而是存储消息的实际位置的地址(索引)
 *
 * 每个ConsumerQueue只能分给一个消费者消费, 多出来的消费者处于空闲状态不能消费ConsumerQueue
 *
 * ConsumerQueue队列里面是如何记录消费的情况呢?
 * 写入指定的ConsumerQueue队列中去maxOffset位置,每次消费时会拉取一批消息
 * 即实线消息,consumerOffset最初可能和minOffset在同一位置,拉取三个消息后
 * 移动刀最新未消费的消息(第4个位置)。当拉取到本地消费的消息消费完了,会返回一个
 * 类似ack一样的确认机制给到broker, minOffset主要记录最新一条收到ack确认的消息的位置的后面一个位置
 *
 * 若消费者挂了,没有返回ack,重启后从[minOffset, consumerOffset)之间的消息会被重复消费
 *
 * 消费位点就是minOffset、consumerOffset、maxOffset这些
 */

④消息持久化机制

⑤高性能写入

13. 小结

八. IM系统接入直播平台

1. 介绍

2. 直播开关播实现

  • 给用户打上可以开播的标签,表示可以开直播

  • douyu_live_living数据库:t_living_room(在线表)和t_living_room_record(离线表)两个数据库表。

  • 直播业务模块。

  • 开播即王开播表插入数据,关播即从开播表删除,再往关播表插入数据。

  • 只有当token里的用户id和当前执行关播的主播id一样时才允许关播

  • @Transactional(rollbackFor = Exception.class) public boolean closeLiving 关播需要删除开播,新增关播,两条sql,用事务注解保证一致性。

3. 直播列表查询

  • list

  • MybatisPageConfig要进行分页配置,mybatisplus3.5x后。

  • 前端滑动去实现:living_room_list列表:listLivingRoom、initLoad方法

4. 直播间加载优化

  • 一般直播平台主播不会太多(推拉流等第三方费用,还要判断直播间整体收益情况),而且直播间变化频率(开关播)不会太高,典型的读多写少场景,可以将直播间列表用redis作缓存。redis的List集合。

  • 每隔一段时间RefreshLivingRoomListJob类中的refreshDBToRedis函数将直播间列表刷到缓存中去,schedulePool 1s 执行一次事务,run由于分布式部署节点(多个),可能有多个进程去执行,所以要使用分布式锁去加锁,保证只有一个进程去执行

  • redisTemplate.opsForValue().setIfAbsent(cacheKey, 1, 1, TimeUnit.SECONDS); 是 Spring Data Redis 中的一种操作,用于在 Redis 中设置一个键值对,只有在该键不存在时才会设置成功。利用redis实现分布式锁。

list函数
//若直播间是按开播顺序存储,则只需做range操作
List<Object> resultList = redisTemplate.opsForList().range(cacheKey, (page - 1) * pageSize, (page * pageSize));
****
    private void refreshDBToRedis(Integer type) {
        String cacheKey = cacheKeyBuilder.buildLivingRoomList(type);
        List<LivingRoomRespDTO> resultList = livingRoomService.listAllLivingRoomFromDB(type);
        if(CollectionUtils.isEmpty(resultList)) {
            redisTemplate.delete(cacheKey);
            return;
        }
        String tempListName = cacheKey + "_temp";
        //按照查询出来的顺序,一个个地塞入到list集合中
        for (LivingRoomRespDTO livingRoomRespDTO : resultList) {
            redisTemplate.opsForList().rightPush(tempListName, livingRoomRespDTO);
        }
        //高并发下对redis的操作优化
        //正在访问的list集合,直接操作原有集合先del -> 再leftPush,会有阻塞
        //直接修改重命名这个list,不要直接对原先的list进行修改,减少阻塞影响 cow
        redisTemplate.rename(tempListName, cacheKey);
        redisTemplate.delete(tempListName);
    }

  • queryByRoomId也去缓存优化,因为其查询量比较大

  • 做空值缓存,防止缓存击穿。有人开播则移除空值缓存。关播则移除当前直播的缓存

  • todo: 点进已经关闭的直播间,前端无法响应的问题!

5. 前端接入IM服务器

  • 浏览器通常不支持原生tcp,通常使用webSocket去实现。

  • webSocket建立链接有一个握手环节:ws的握手连接处理器WsSharkHandler

  • ImController:将Im服务器地址和token和userId返回给前端。ImCoreServer自身会上报地址和端口到nacos, 所以可以从nacos拉取:DiscoveryClient去提供一个api去拉取(buildImServerAddress)

6. 直播间在线用户记录维护

  • 一个用户发一条消息,直播间所有用户都能看到。

  • 当用户进入直播间时将userId和roomId关联起来。进入直播间会触发ws imcore的登录。不是tcp的登录。且发送mq sendLoginMQ去处理关联【LivingRoomOfflineConsumer】

  • 当用户离开直播间时,userId移除集合列表。离开直播间会触发ws imcore的登出

支持根据roomId查询出批量的userId。【LivingRoomServiceImpl】queryUserIdByRoomId支持根据roomId查询出批量的userId(set)存储,3000个人,元素非常多,O(n)影响redis性能采用scan思路分批查询(0,100)(101, 200);redisTemplate.opsForSet().scan(cacheKey, ScanOptions.scanOptions().match("*").count(100).build())

  • 但是操作2 3其实不用暴露接口,可以基于IM服务上下线,监听用户上下线。用户登录发送MQ消息:loginSuccessHandler方法里调用sendLoginMQ方法。用户下线发送MQ消息:logoutHandler->sendLogoutMQ

7. 用户上下线访问记录

  • 在用户在直播间发送消息时需要推送到roomId关联的所有userId(queryUserIdByRoomId),属于biz消息,会在msgProvider中得到处理。远程调用router层进行群发batchSendMsg。

  • router层batchSendMsg可以直接调用router层sendMsg要发送的用户数次,//假设我们有100个immsgbody,调用100次im-core-server 2ms,200ms。循环rpc远程调用性能消耗大不建议。

  • 优化在router层://假设我们有100个userid -> 10台im服务器上 100个ip做分类 -> 最终ip的数量一定是<=10(rpc次数)。batchSendMsg方法redis批量查询,批量rpc优化调用!!!

  • Imcore层的batchSendMsg批量发消息可以循环调用单个,因为本层已经在内存中了,无需rpc远程调用。

九. 礼物系统实现

1. 介绍

2. 礼物相关表结构

3. 礼物服务核心功能设计

4. svga礼物特效

  • 前端living_room.js:initSvga设置一个面板展示效果、playGiftSvga

  • 礼物流水表,可能记录会超过mysql单表上限100w~200w->根据时间做归档(1年)。t_gift_record_2022

十. 公共组件优化

1. 介绍

2. 断言组件实现(基于全局异常捕获器)

  • webStarter层:org.douyu.live.web.starter.error.GlobalExceptionHandler:@ControllerAdvice对Controller层有拦截作用

  • ErrorAssert断言产生异常->QiyuBaseError接口规范->BizBaseErrorEnum->注入错误信息QiyuErrorException->被捕获GlobalExceptionHandler

  • 只要GlobalExceptionHandler被扫的到,其中定义的捕获器就会生效

  • 每个模块定义自己的error包继承QiyuBaseError,当前模块会产生的错误

  • @ControllerAdvice 可以捕获控制器产生的异常,也可以捕获在服务层等其他组件中未处理的异常,只要这些异常最终通过控制器的请求处理流程传播到控制器。这样可以实现统一的异常处理机制,提高代码的可维护性。GlobalExceptionHandler加入SPI文件里才能生效或scan扫描到该包-->加载到容器上下文当中。

3. 限流组件实现(对controller层限制请求频率)

  • 自定义注解+滑动窗口算法去实现。

  • org.douyu.live.web.starter.config.RequestLimit注解

  • org.douyu.live.web.starter.context.RequestLimitInterceptor拦截器(对于重复请求,要有专门的拦截器去处理)细节详见该类,并在org.douyu.live.web.starter.config.WebConfig定义bean

十一. 钱包系统和支付中台

1. 介绍

2. 礼物打赏流程

3. 钱包体系搭建

  • douyu_live_bank模块,平台内部主要使用虚拟币代替实际货币。

  • 账户增加不一定是通过充值增加,也可能是礼物分成(total_charge不变)。

  • 账户余额就是鱼币

4. 送礼流程完善

  • org.douyu.live.bank.provider.service.impl.QiyuCurrencyAccountServiceImpl:consume送礼。所有优化详见类

//todo 流水记录? t_qiyu_currency_trade表记录流水, num记录增加或扣减值(正负数),type记录流水类型
        //大并发请求场景,1000个直播间,500人,50w人在线,20%的人送礼,10w人在线触发送礼行为,
        //DB扛不住
        //1.MySQL换成写入性能相对较高的数据库
        //2.我们能不能从业务上去进行优化,用户送礼都在直播间,大家都连接上了im服务器,router层扩容(50台),im-core-server层(100台),RocketMQ削峰,
        // 消费端也可以水平扩容
        //3.我们客户端发起送礼行为的时候,同步校验(校验账户余额是否足够,余额放入到redis中),
        //4.拦截下大部分的请求,如果余额不足,(接口还得做防止重复点击,客户端也要防重复)
        //5.同步送礼接口,只完成简单的余额校验,发送mq,在mq的异步操作里面,完成二次余额校验,余额扣减,礼物发送
        //6.如果余额不足,是不是可以利用im,反向通知发送方
        // todo 性能问题
增加和扣减,先在redis中更新,再异步更新数据库,而不是直接同步更新数据库,性能更高
/**
     * 专门给送礼业务调用的扣减库存逻辑
     *
     * @param accountTradeReqDTO
     */
    AccountTradeRespDTO consumeForSendGift(AccountTradeReqDTO accountTradeReqDTO);

getBalance方法
只查询余额,redis缓存优化

 @Override
    public void decr(long userId, int num) {
        //扣减余额
        String cacheKey = cacheKeyBuilder.buildUserBalance(userId);
        if (redisTemplate.hasKey(cacheKey)) {
            //基于redis的扣减操作
            redisTemplate.opsForValue().decrement(cacheKey, num);
            redisTemplate.expire(cacheKey, 5, TimeUnit.MINUTES);
        }
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                //分布式架构下,cap理论,可用性和性能,(强一致性),柔弱的一致性处理
                //在异步线程池中完成数据库层的扣减和流水记录插入操作,带有事务
                consumeDecrDBHandler(userId, num);
            }
        });
    }

5. 使用mq提升送礼接口性能

  • 生产者:org.douyu.live.api.service.impl.GiftServiceImpl MQ削峰

  @Override
    public boolean send(GiftReqVO giftReqVO) {
        int giftId = giftReqVO.getGiftId();
        //map集合,判断本地是否有对象,如果有就返回,如果没有就rpc调用,同时注入到本地map中
        GiftConfigDTO giftConfigDTO = giftConfigDTOCache.get(giftId, id -> giftConfigRpc.getByGiftId(giftId));
        ErrorAssert.isNotNull(giftConfigDTO, ApiErrorEnum.GIFT_CONFIG_ERROR);
        ErrorAssert.isTure(!giftReqVO.getReceiverId().equals(giftReqVO.getSenderUserId()), ApiErrorEnum.NOT_SEND_TO_YOURSELF);
        SendGiftMq sendGiftMq = new SendGiftMq();
        sendGiftMq.setUserId(QiyuRequestContext.getUserId());
        sendGiftMq.setGiftId(giftId);
        sendGiftMq.setRoomId(giftReqVO.getRoomId());
        sendGiftMq.setReceiverId(giftReqVO.getReceiverId());
        sendGiftMq.setUrl(giftConfigDTO.getSvgaUrl());
        sendGiftMq.setType(giftReqVO.getType());
        sendGiftMq.setPrice(giftConfigDTO.getPrice());
        //避免重复消费***********************************加uuid字段
        sendGiftMq.setUuid(UUID.randomUUID().toString());
        Message message = new Message();
        message.setTopic(GiftProviderTopicNames.SEND_GIFT);
        message.setBody(JSON.toJSONBytes(sendGiftMq));
        try {
            SendResult sendResult = mqProducer.send(message);
            LOGGER.info("【api.service.impl.GiftServiceImpl】[send] send result is {}", sendResult);
        } catch (Exception e) {
            LOGGER.info("【api.service.impl.GiftServiceImpl】[send] send result is error:", e);
        }
        return true;

//        int giftId = giftReqVO.getGiftId();
//        GiftConfigDTO giftConfigDTO = giftConfigRpc.getByGiftId(giftId);
//        ErrorAssert.isNotNull(giftConfigDTO, ApiErrorEnum.GIFT_CONFIG_ERROR);
//        ErrorAssert.isTure(!giftReqVO.getReceiverId().equals(giftReqVO.getSenderUserId()), ApiErrorEnum.NOT_SEND_TO_YOURSELF);
//        AccountTradeReqDTO reqDTO=new AccountTradeReqDTO();
//        reqDTO.setUserId(QiyuRequestContext.getUserId());
//        reqDTO.setNum(giftConfigDTO.getPrice());
//        AccountTradeRespDTO tradeRespDTO= qiyuCurrencyAccountRpc.consumeForSendGift(reqDTO);//吞吐量较小,同步调用也可以
//        ErrorAssert.isTure(tradeRespDTO!=null&&tradeRespDTO.isSuccess(), ApiErrorEnum.SEND_GIFT_ERROR);
    }
  • 消费者:gift服务和Bank服务【org.douyu.live.gift.provider.consumer.SendGiftConsumer】

               SendGiftMq sendGiftMq = JSON.parseObject(new String(msg.getBody()), SendGiftMq.class);
                String mqConsumerKey = cacheKeyBuilder.buildGiftConsumeKey(sendGiftMq.getUuid());
                boolean lockStatus = redisTemplate.opsForValue().setIfAbsent(mqConsumerKey, -1, 5, TimeUnit.MINUTES);
                if (!lockStatus) {
                    //代表曾经消费过**************
                    continue;
                }
                AccountTradeReqDTO tradeReqDTO = new AccountTradeReqDTO();
                tradeReqDTO.setUserId(sendGiftMq.getUserId());
                tradeReqDTO.setNum(sendGiftMq.getPrice());//送礼账户变更
                AccountTradeRespDTO tradeRespDTO = qiyuCurrencyAccountRpc.consumeForSendGift(tradeReqDTO);
                ImMsgBody imMsgBody = new ImMsgBody();
                imMsgBody.setAppId(AppIdEnum.QIYU_LIVE_BIZ.getCode());

                if(tradeRespDTO.isSuccess()) {//账户余额扣减成功,触发礼物特效功能
                    imMsgBody.setBizCode(ImMsgBizCodeEnum.LIVING_ROOM_SEND_GIFT_SUCCESS.getCode());
                    imMsgBody.setUserId(sendGiftMq.getReceiverId());//消息接收方为接受礼物的人
                } else {//利用im将发送失败的消息告知用户
                    imMsgBody.setBizCode(ImMsgBizCodeEnum.LIVING_ROOM_SEND_GIFT_FAIL.getCode());
                    imMsgBody.setUserId(sendGiftMq.getUserId());//消息接收方为送礼物的人
                }
                routerRpc.sendMsg(imMsgBody);//发送送礼结果消息给客户端
  • 一次拉取10条消息,若消费完第九条时异常,消费成功的消息不会返回给broker。重启服务时有可能重复消费。

  • 发送端应该加唯一标识uuid,消费端根据uuid生成缓存key存入redis, 判断该消息是否消费过

  • 加了缓存标记消费,但是下面实际扣费没有调用,即送礼没有成功怎么办(后面再发消息去消费也会因为已经标记而不能重新送礼)?没有影响,因为钱没扣,只需提示前台系统故障即可。

  • 前端websocketOnMessage去处理收到的im送礼成功的消息

6. 前后端对接svga特效实现

  • 连续点击礼物,前端会显示送礼成功,即使余额不足

  • 直播间关闭,前端不会提示直播间已经关闭

  • IM异步通知送礼失败

  • todo:org.douyu.live.gift.provider.consumer.SendGiftConsumer 送礼全直播间可见

7. bank服务构建

  • 送礼物(用户的账户需要有一定余额)。后台一个接口模拟支付(用户选择对应产品就算支付成功)

  • 通过一个接口,返回可以购买的产品列表,映射我们的每个虚拟商品。这个表记录真实rmb售价和支付的二维码url链接

8. 模拟支付的实现

    // 1.申请调用第三方支付接口(签名-》支付宝/微信)(生成一条支付中状态的订单)
    // 2.生成一个(特定的支付页)二维码 (输入账户密码,支付)(第三方平台完成账户余额变更)模拟支付
    // 3.发送回调请求 -》业务方
    // 要求(可以接收不同平台v、z的回调数据)
    // 可以根据业务标识去回调不同的业务服务(直播充值、别的充值服务)(自定义参数组成中,塞入一个业务code(消息队列的话题),根据业务code去回调不同的业务服务(生产消息))

    @PostMapping("/payProduct")
    public WebResponseVO payProduct(PayProductReqVO payProductReqVO) {
        return WebResponseVO.success(bankService.payProduct(payProductReqVO));
    }

支付流程:点击按钮,会有一个待支付状态,模拟支付就是模拟的调用第三方的服务,第三方完成后回调bank-api, 然后根据业务码生成消息给别的服务消费(回调别的服务)

bankprovider以及下游服务完成后再通知第三方支付完成。

//从接口友好性来说,只需传一个产品Id就能查,则只传产品id
//复用Redis list,但要传type值
//不用type, 但多存一个redis对象✔
//更新时List
@Override
public PayProductDTO getByProductId(Integer productId) {给支付时产品校验用,安全性的要求


/**
     * 支付来源 (直播间,个人中心,聊天页面,第三方宣传页面,广告弹窗引导)
     * @see org.douyu.live.bank.constants.PaySourceEnum 转换率的计算
     */
    private Integer paySource;

 @Override
    public PayProductRespVO payProduct(PayProductReqVO payProductReqVO) {
        //参数校验
        ErrorAssert.isTure(payProductReqVO != null && payProductReqVO.getProductId() != null && payProductReqVO.getPaySource() != null, BizBaseErrorEnum.PARAM_ERROR);
        ErrorAssert.isNotNull(PaySourceEnum.find(payProductReqVO.getPaySource()), BizBaseErrorEnum.PARAM_ERROR);
        PayProductDTO payProductDTO = payProductRpc.getByProductId(payProductReqVO.getProductId());
        ErrorAssert.isNotNull(payProductDTO, BizBaseErrorEnum.PARAM_ERROR);

        //插入一条订单,待支付状态
        PayOrderDTO payOrderDTO = new PayOrderDTO();
        payOrderDTO.setProductId(payProductReqVO.getProductId());
        payOrderDTO.setUserId(QiyuRequestContext.getUserId());
        payOrderDTO.setSource(payProductReqVO.getPaySource());
        payOrderDTO.setPayChannel(payProductReqVO.getPayChannel());
        String orderId = payOrderRpc.insertOne(payOrderDTO);//返回orderId更具备业务属性

        //跳转第三方支付页面输入完点击后才是支付中状态

        //更新订单为支付中状态
        payOrderRpc.updateOrderStatus(orderId, OrderStatusEnum.PAYING.getCode());
        PayProductRespVO payProductRespVO = new PayProductRespVO();
        payProductRespVO.setOrderId(orderId);

//        //todo 远程http请求 resttemplate-》支付回调接口
//        JSONObject jsonObject = new JSONObject();
//        jsonObject.put("orderId", orderId);
//        jsonObject.put("userId", QiyuRequestContext.getUserId());
//        jsonObject.put("bizCode", 10001);
//        HashMap<String,String> paramMap = new HashMap<>();
//        paramMap.put("param",jsonObject.toJSONString());
//        ResponseEntity<String> resultEntity = restTemplate.postForEntity("http://localhost:8201/live/bank/payNotify/wxNotify?param={param}", null, String.class,paramMap);
//        System.out.println(resultEntity.getBody());

        //当用户支付完后,会进入支付状态轮询(后台返回支付id给前端,前端每个几秒去轮询)(其他下单页面)
        //但在直播间中,会直接通过IM消息通知前端是支付成功还是失败
        return payProductRespVO;
    }
org.douyu.live.bank.provider.service.impl.PayOrderServiceImpl    
@Override
    public boolean payNotify(PayOrderDTO payOrderDTO) {
//        PayOrderPO payOrderPO = this.queryByOrderId(payOrderDTO.getOrderId());
//        if (payOrderPO == null) {
//            LOGGER.error("error payOrderPO, payOrderDTO is {}", payOrderDTO);
//            return false;
//        }
//        PayTopicPO payTopicPO = payTopicService.getByCode(payOrderDTO.getBizCode());
//        if (payTopicPO == null || StringUtils.isEmpty(payTopicPO.getTopic())) {
//            LOGGER.error("error payTopicPO, payOrderDTO is {}", payOrderDTO);
//            return false;
//        }
//        this.payNotifyHandler(payOrderPO);
//        //假设 支付成功后,要发送消息通知 -》 msg-provider
//        //假设 支付成功后,要修改用户的vip经验值
//        //发mq
//        //中台服务,支付的对接方 10几种服务,pay-notify-topic。若所有服务都监听这个主题消息,会导致消息复用高。比如A发起支付
          //B C D服务都监听这个消息,收到消息后还得判断一下这个消息需不需要处理。
          //支付加个业务码bizCode, 不同业务码对应不同消息主题。这样收到消息的服务一定需要处理该消息
          //定义一张pay_topic表,bizCode映射主题
//        Message message = new Message();
//        message.setTopic(payTopicPO.getTopic());
//        message.setBody(JSON.toJSONBytes(payOrderPO));
//        SendResult sendResult = null;
//        try {
//            sendResult = mqProducer.send(message);
//            LOGGER.info("[payNotify] sendResult is {} ", sendResult);
//        } catch (Exception e) {
//            LOGGER.error("[payNotify] sendResult is {}, error is ", sendResult, e);
//        }
//        return true;
  • 可在message消费信息,通知直播间余额信息做变动

十二. 直播PK

1. 介绍

2. PK流程介绍

  • 每次一个人送礼,会影响到所有人的进度条变化。

3. 群发默认+pk效果实现

  • org.douyu.live.gift.provider.consumer.SendGiftConsumer

  • sendImMsgSingleton、pkImMsgSend、batchSendImMsg

  • pk进度条的实现

//pk类型的送礼 要通知什么给直播间的用户
//url 礼物特效全直播间可见
//todo 进度条全直播间可见
// 1000,进度条长度一共是1000,每个礼物对于进度条的影响就是一个数值(500(A):500(B),550:450)
// 直播pk进度是不是以roomId为维度,存入redis string,(redis中)送礼(A)incr,送礼给(B)就是decr。A顶点为1000,B顶点为0。增加和减少以礼物的数值为基准
  • 还需要记录下参与pk的用户有谁:pkUserId是主播,pkObject可能是多个人。前端如何表示连上线的这个动作?后台提供一个上线pk接口,前端有观众想要上线pk时调用这个接口(LivingRoom /onlinePk),且将其userId记录到缓存中。同时LivingRoom还要提供一个查询当前pk的用户的人是谁的rpc接口,给礼物消息消费这调用查询。以及查出当前直播间信息(得到主播userId)

boolean tryOnline = redisTemplate//将直播间pk的人记入缓存
        .opsForValue()
        .setIfAbsent(cacheKey, livingRoomReqDTO.getPkObjId(), 30, TimeUnit.HOURS);
if (tryOnline) {//多个人同时点击可能有问题,所以这里只有无缓存才能设置objPk

respDTO.setOnlineStatus(false);响应字段默认是false, 只有连上才是true。多人连线,连不上的会产生异常然后捕获,返回前端显示连接失败
  • -

  • 由于后台是多线程发送消息给前台,再加上网络原因, pkNum的值无法保证顺序 A->556 B->554 C->557实际顺序和到达前台的顺序可能不一致。

  • (顺序问题)第一种解决方案:设置缓存LIVING_PK_SEND_SEQ,每次pkNum变化前,先另Seq增加得到送序号。前台将顺序缓存到本地,当新的pkNum到达时,判断其Seq是否比本地的要大,若大则是最新值,否则是旧值。

  • 第二种解决方案:将上面两条redis incr使用lua脚本封装起来,一条命令执行完成,保证两条自增指令是原子性

  • -

  • 用户下线怎么监听?不可能因为网络不好断线一直占着位置不下来。

  • 这两个上下线,缓存的是roomId对应的userId的集合。roomId映射的直播间对象是另一个缓存。

boolean trySetToRedis = redisTemplate
                    .opsForValue()//加锁,防止冗余数据,3s内只有一个能从数据库插入缓存
                    .setIfAbsent(cacheKeyBuilder.buildGiftListLockCacheKey(),1,3,TimeUnit.SECONDS);
            if(trySetToRedis) {
                redisTemplate.opsForList().leftPushAll(cacheKey, resultList.toArray());
                //大部分情况下,一个直播间的有效时间大概就是60min以上
                redisTemplate.expire(cacheKey, 60, TimeUnit.MINUTES);
            }
  • 在送礼成功和买币成功后都调用listPayProduct()函数,以更新余额

  • pk送礼物时要指定送给谁,主播还是连线pk的人

4. 连线功能的前后联调

  • onLinePk

  • pk进度条Lua脚本优化 SendConsumer中

  • 礼物:数据量不多,读多写少,可以缓存到本地内存。caffine组件api.GiftServiceImpl,会导致数据不一致性,比如新增或下架礼物

十三, 红包雨功能+抢红包

1. 介绍

2. 红包雨实现

  • 以主播Id作为配置,特权行为,吸引人气,直播间id是变化的不方便记录。配置了主播才有特权开启红包雨。

  • 红包是一种特殊的礼物,也放在送礼模块。

3. 红包雨底层实现逻辑

package org.douyu.live.common.interfaces.utils;

/**
 * 将List拆分为小的List,用于Redis中list的存入,避免 Redis的输入输出缓冲区 和 网络 发生堵塞
 */
public class ListUtils {
/**
 * 二倍均值法:
 * 创建红包雨的每个红包金额数据
 */
private List<Integer> createRedPacketPriceList(Integer totalPrice, Integer totalCount) {
    List<Integer> redPacketPriceList = new ArrayList<>();
    for (int i = 0; i < totalCount; i++) {
        if (i + 1 == totalCount) {
            // 如果是最后一个红包
            redPacketPriceList.add(totalPrice);
            break;
        }
        int maxLimit = (totalPrice / (totalCount - i)) * 2;// 最大限额为平均值的两倍
        int currentPrice = ThreadLocalRandom.current().nextInt(1, maxLimit);
        totalPrice -= currentPrice;
        redPacketPriceList.add(currentPrice);
    }
    return redPacketPriceList;
}
// 将红包数据拆分为子集合进行插入到Redis,避免 Redis输入输出缓冲区 被填满,影响其他命令
List<List<Integer>> splitPriceList = ListUtils.splistList(priceList, 100);

  • 什么时候将缓存中记录的领取金额、数量同步到数据库?主播关播时(关博基于MQ和IM心跳服务无需额外处理、这三个数据没有那么重要)

  • 红包配置code如何给前端?主播在进入直播间时,通过anchorConfig接口去返回

  • 主播开始红包雨,要将[prepareRedPacket]生成的数据广播到所有用户[startRedPacket]

  • 领取红包[getRedPacket],同一个红包用户可能点击多次

  • 对上述三个接口进行防重限制

  • 按钮 第一次点击prepareRedPacket->生成好红包数据变成startRedPacket

prepareRedPacket, 防止startRedPacket重复开始红包雨
redisTemplate.opsForValue().set(cacheKeyBuilder.buildRedPacketPrepareSuccess(redPacketConfigPO.getConfigCode()), 1, 1L, TimeUnit.DAYS);


    public Boolean startRedPacket(RedPacketConfigReqDTO reqDTO) {
        String code = reqDTO.getRedPacketConfigCode();
        // 红包没有准备好,则返回false, 防止重复点击开始红包雨按钮
        if (Boolean.FALSE.equals(redisTemplate.hasKey(cacheKeyBuilder.buildRedPacketPrepareSuccess(code)))) {
            return false;
        }
  • receiveRedPacket:从redis统计->多一层mq。因为会涉及到rpc调用账户变更,比较耗时

十四. 直播带货

1. 介绍

2. 业务梳理

  • 正常是新开一个微服务,但由于本地内存占用问题,直接放礼物模块下了。

  • 进行大图和缩略图的区分。

3. 下单逻辑

// 购物车接口的开发:
// 用户进入直播间,查看到商品列表
// 用户查看商品详情
// 用户把感兴趣的商品,加入待支付的购物车中(购物车的概念)-> 
//购物车的基本存储结构(按照直播间为维度去设计购物车),直播间的购物车是独立的,不会存在数据跨直播间存在的情况
// 购物车的添加,移除
// 购物车的内容展示
// 购物车的清空
购物车和直播间一样的生命周期直播结束,购物车直接清空了,不是一个需要永久持久化的东西。所以可以用缓存去存购物车。
redis数据结构:
//1个用户 多个商品
//读取素哟有商品的数据
//每个商品都有数量(目前限定为1,目前业务场景中没有体现)
//string(对象,对象里面关联商品的数据信息)
//set/list 读取方便,移除/加减麻烦
//map(k,v):key是skuId,value是商品的数量,hIncr可以直接操作map里面的值
redisTemplate.opsForHash().put(cacheKey, String.valueOf(shopCarReqDTO.getSkuId()), 1);

4. 库存扣减

// 购物车以及塞满了,下边的逻辑是怎样的?
// 预下单,(手机产品100台,库存的预锁定操作)(生成一条支付订单,预先扣减库存)
// 如果下单成功(库存就正常扣减了)(修改订单状态,支付超时,库存要回滚)
// 如果到达一定时间限制没有下单(100台手机,100台库存锁定,不支付,支付倒计时,库存回滚,订单状态会变成支付超时状态)

  • 生成待支付订单时会扣减库存,否则支付时在扣减库存,万一库存不足,对用户体验不友好。

  • 没有分布式调度器,只能本地定时任务去做【org.douyu.live.gift.provider.config.RefreshStockNumConfig】

5. 订单存储

//订单超时概念,多少分钟后订单关闭
//1. 定时任务,sql查询时间<某个限定时间的, 要指定索引,数据量很大时,扫描表耗时
//2. redis过期回调,key过期后,有一个回调通知到订阅方:回调并不高可靠,可能会丢失
//3. mq:延迟消息,时间轮去做.将扣减库存的消息,利用mq发送出去,在延迟回调处校验是否支付

6. 前后联调

  • 商品信息被缓存通过进页面时mounted时调用方法queryShopInfo

  • 商品库存信息被缓存通过开启直播时发送mq消息(后续秒杀时是对缓存中的库存扣减),然后this.sendStartingLivingRoomMq(livingRoomPO);

  • 礼物提供者去消费上面消息(StartingLivingRoomConsumer)

  • todo:扣除版本3;14-17