直播弹幕系统(三) 您所在的位置:网站首页 抖音直播间观看人数是累计的还是实时的 直播弹幕系统(三)

直播弹幕系统(三)

2024-06-04 18:37| 来源: 网络整理| 查看: 265

直播弹幕系统(三)- 直播在线人数统计 前言一. 在线人数统计功能实现1.1 Redis整合1.2 在线人数更新1.3 演示

前言

上一篇文章整合RabbitMQ进行消息广播和异步处理 写完了消息的广播、削峰、异步处理业务逻辑等操作。完成了实时共享功能。

不过写到后面发现在线人数统计这块的功能还没实现,因此在这篇补上。

备注:

目前的WebSocket写法实现只是一种方案,显然并不是最优解,也可能有更好的写法。后期准备尝试用Netty来平移替换。 一. 在线人数统计功能实现

基本思路:

既然我们使用一种分布式架构,并且使用本地缓存去存储WebSocket的链接信息。那么对于分布式下的数值统计,最好的就是使用第三方库或者中间件去存储数据。这样分布式下的每个服务都可以通过第三方库去进行数据交互和共享。那么很简单。建立WebSocket的时候,同一个直播间下的在线人数+1。断开的时候则-1。对于Redis存储的数据结构,可以使用Hash。1000个直播间可以共用一个hashKey,但是每个键值对却又不同。和1000个String类型的数据存储相比,内存上要节省的多。Redis - String内存开销问题以及基本/扩展数据类型的使用 1.1 Redis整合

1.首先毫无疑问我们需要添加一个pom依赖:

org.springframework.boot spring-boot-starter-data-redis 2.6.7

2.在上一章节中的bulletcommon包下创建共通函数。第一个,SpringBean的工具类SpringBeanUtil:

import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; @Component @Slf4j public class SpringBeanUtil implements ApplicationContextAware { private static ApplicationContext applicationContext; // 获取applicationContext public static ApplicationContext getApplicationContext() { return applicationContext; } // 通过name获取 Bean. public static Object getBean(String name) { return getApplicationContext().getBean(name); } // 通过class获取Bean. @SuppressWarnings("unchecked") public static T getBean(Class clazz) { try { char[] cs = clazz.getSimpleName().toCharArray(); cs[0] += 32;// 首字母大写到小写 return (T) getApplicationContext().getBean(String.valueOf(cs)); } catch (Exception e) { e.printStackTrace(); return null; } } // 通过name,以及Clazz返回指定的Bean public static T getBean(String name, Class clazz) { return getApplicationContext().getBean(name, clazz); } public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (SpringBeanUtil.applicationContext == null) { SpringBeanUtil.applicationContext = applicationContext; } log.info("\r\n----------加载applicationContext成功-----------------"); }

3.创建一个RedisUtil:StringRedisTemplate下有个操作:

opsForHash().increment(hashKey,key,value):可以给指定的hashKey下的key加上指定的value数值大小。那正好可以用给直播间在线人数的 ±1。 import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.springframework.data.redis.core.StringRedisTemplate; /** * @author Zong0915 * @date 2022/12/15 下午6:52 */ public class RedisUtil { public static StringRedisTemplate getStringRedisTemplate() { return SpringBeanUtil.getBean(StringRedisTemplate.class); } public static void increment(String hashKey, String key) { if (StringUtils.isBlank(hashKey) || StringUtils.isBlank(key)) { return; } StringRedisTemplate stringRedisTemplate = RedisUtil.getStringRedisTemplate(); stringRedisTemplate.opsForHash().increment(hashKey, key, 1); } public static void decrement(String hashKey, String key) { if (StringUtils.isBlank(hashKey) || StringUtils.isBlank(key)) { return; } StringRedisTemplate stringRedisTemplate = RedisUtil.getStringRedisTemplate(); stringRedisTemplate.opsForHash().increment(hashKey, key, -1); } public static Long get(String hashKey, String key) { if (StringUtils.isBlank(hashKey) || StringUtils.isBlank(key)) { return 0L; } StringRedisTemplate stringRedisTemplate = RedisUtil.getStringRedisTemplate(); String res = (String) stringRedisTemplate.opsForHash().get(hashKey, key); return NumberUtils.toLong(res); } }

4.我们给消息体添加两个参数:count和operateType

@Data public class OriginMessage { private String sessionId; private String userId; private String roomId; private String message; /** 直播间人数 */ private Long count; /** 1:链接初始化。2:发送弹幕 */ private Integer operateType; }

5.SocketConstants再添加一个常量,用于HashKey:

public static final String LIVE_COUNT_HASH_KEY = "LiveCount"; 1.2 在线人数更新

在线人数的更新涉及到两个地方:

WebSocket建立连接的时候。WebSocket断开连接的时候

修改BulletScreenServer类:重点在于@OnOpen和@OnClose修饰的函数。

package kz.service; import kz.cache.SocketCache; import kz.common.SocketConstants; import kz.entity.OriginMessage; import kz.producer.OriginMessageSender; import kz.util.RedisUtil; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.concurrent.atomic.AtomicLong; /** * @author Zong0915 * @date 2022/12/9 下午3:45 */ @Component @ServerEndpoint("/websocket/live/{roomId}/{userId}") @Slf4j @Getter public class BulletScreenServer { /** * 多例模式下的赋值方式 */ private static OriginMessageSender originMessageSender; /** * 多例模式下的赋值方式 */ @Autowired private void setOriginMessageSender(OriginMessageSender originMessageSender) { BulletScreenServer.originMessageSender = originMessageSender; } private static final AtomicLong count = new AtomicLong(0); private Session session; private String sessionId; private String userId; private String roomId; /** * 打开连接 * * @param session * @OnOpen 连接成功后会自动调用该方法 */ @OnOpen public void openConnection(Session session, @PathParam("roomId") String roomId, @PathParam("userId") String userId) { // 如果是游客观看视频,虽然有弹幕,但是没有用户信息,所以需要用try count.incrementAndGet(); log.info("*************WebSocket连接次数: {} *************", count.longValue()); this.userId = userId; this.roomId = roomId; // 保存session相关信息到本地 this.sessionId = session.getId(); this.session = session; // 在线人数+1 RedisUtil.increment(SocketConstants.LIVE_COUNT_HASH_KEY, roomId); SocketCache.put(sessionId, this); // 通知客户端,此时服务端和客户端之间成功建立连接,并且把在线人数传过去 originMessageSender.send(buildMessage("", 1)); } /** * 客户端刷新页面,或者关闭页面,服务端断开连接等等操作,都需要关闭连接 */ @OnClose public void closeConnection() { SocketCache.remove(sessionId); // 链接断开,人数-1,并通知给客户端 RedisUtil.decrement(SocketConstants.LIVE_COUNT_HASH_KEY, roomId); originMessageSender.send(buildMessage("", 1)); } /** * 客户端发送消息给服务端 * * @param message */ @OnMessage public void onMessage(String message) { if (StringUtils.isBlank(message)) { return; } // 将消息丢给MQ,业务上的处理什么也不管,交给弹幕业务来处理,并且达到削峰的目的 originMessageSender.send(buildMessage(message, 2)); } private OriginMessage buildMessage(String message, Integer operateType) { OriginMessage originMessage = new OriginMessage(); originMessage.setMessage(message); originMessage.setRoomId(roomId); originMessage.setSessionId(sessionId); originMessage.setUserId(userId); originMessage.setOperateType(operateType); originMessage.setCount(RedisUtil.get(SocketConstants.LIVE_COUNT_HASH_KEY, roomId)); return originMessage; } }

前端修改:完整的项目框架可以看这篇文章SpringBoot - WebSocket的使用和聊天室练习 。 这里就是修改index.tsx文件(区别不大)

import React, { useEffect, useState } from 'react'; import { Button, Row, Col, Input } from 'antd'; import { getValueByParam } from '../utils/pageHelper'; const ws = new WebSocket(`ws://localhost:80/websocket/live/${getValueByParam('roomId')}/${getValueByParam('userId')}`); const UserPage = () => { const [ message, setMessage ] = useState(''); const [ bulletList, setBulletList ] = useState([]); const [ onlineCount, setOnlineCount ] = useState(0); useEffect(() => { ws.onopen = () => { ws.onmessage = (msg: any) => { const entity: any = JSON.parse(msg.data); if (entity?.operateType === 2) { const arr :any = [ `用户[${entity.userId}]: ${entity.message}` ]; setBulletList((pre: any[]) => [].concat(...pre, ...arr)); } setOnlineCount(entity?.count ?? 0); }; }; ws.onclose = () => { console.log('断开连接'); }; }, []); const sendMsg = () => { ws?.send(message); }; return width: 2000, marginTop: 200 }}> event => setMessage(event.target.value)} /> { marginLeft: 100 }}> {'在线人数: ' + onlineCount} marginLeft: 10 }}> border: '1px solid', width: 500, height: 500 }}> {bulletList.map((item: string, index: number) => { return item} ; })} ; }; export default UserPage; 1.3 演示

动图如下: 在这里插入图片描述



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有