黑马点评:image-20220621124100702

image-20220621144511008

image-20220621152828453

手机或者 app 端发起请求,请求我们的 nginx 服务器,nginx 基于七层模型走的事 HTTP 协议,可以实现基于 Lua 直接绕开 tomcat 访问 redis,也可以作为静态资源服务器,轻松扛下上万并发, 负载均衡到下游 tomcat 服务器,打散流量,我们都知道一台 4 核 8G 的 tomcat,在优化和处理简单业务的加持下,大不了就处理 1000 左右的并发, 经过 nginx 的负载均衡分流后,利用集群支撑起整个项目,同时 nginx 在部署了前端项目后,更是可以做到动静分离,进一步降低 tomcat 服务的压力,这些功能都得靠 nginx 起作用,所以 nginx 是整个项目中重要的一环。

在 tomcat 支撑起并发流量后,我们如果让 tomcat 直接去访问 Mysql,根据经验 Mysql 企业级服务器只要上点并发,一般是 16 或 32 核心 cpu,32 或 64G 内存,像企业级 mysql 加上固态硬盘能够支撑的并发,大概就是 4000 起~7000 左右,上万并发, 瞬间就会让 Mysql 服务器的 cpu,硬盘全部打满,容易崩溃,所以我们在高并发场景下,会选择使用 mysql 集群,同时为了进一步降低 Mysql 的压力,同时增加访问的性能,我们也会加入 Redis,同时使用 Redis 集群使得 Redis 对外提供更好的服务。

短信功能:

发送验证码功能:

具体逻辑上文已经分析,我们仅仅只需要按照提示的逻辑写出代码即可。

  • 发送验证码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

@Override
public Result sendCode(String phone, HttpSession session) {
// 1.校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
// 2.如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
// 3.符合,生成验证码
String code = RandomUtil.randomNumbers(6);

// 4.保存验证码到 session
session.setAttribute("code",code);
// 5.发送验证码
log.debug("发送短信验证码成功,验证码:{}", code);
// 返回ok
return Result.ok();
}
  • 登录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1.校验手机号
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
// 2.如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
// 3.校验验证码
Object cacheCode = session.getAttribute("code");
String code = loginForm.getCode();
if(cacheCode == null || !cacheCode.toString().equals(code)){
//3.不一致,报错
return Result.fail("验证码错误");
}
//一致,根据手机号查询用户
User user = query().eq("phone", phone).one();

//5.判断用户是否存在
if(user == null){
//不存在,则创建
user = createUserWithPhone(phone);
}
//7.保存用户信息到session中
session.setAttribute("user",user);

return Result.ok();
}

1.4、实现登录拦截功能

温馨小贴士:tomcat 的运行原理

image-20220925162915735

当用户发起请求时,会访问我们像 tomcat 注册的端口,任何程序想要运行,都需要有一个线程对当前端口号进行监听,tomcat 也不例外,当监听线程知道用户想要和 tomcat 连接连接时,那会由监听线程创建 socket 连接,socket 都是成对出现的,用户通过 socket 像互相传递数据,当 tomcat 端的 socket 接受到数据后,此时监听线程会从 tomcat 的线程池中取出一个线程执行用户请求,在我们的服务部署到 tomcat 后,线程会找到用户想要访问的工程,然后用这个线程转发到工程中的 controller,service,dao 中,并且访问对应的 DB,在用户执行完请求后,再统一返回,再找到 tomcat 端的 socket,再将数据写回到用户端的 socket,完成请求和响应

通过以上讲解,我们可以得知 每个用户其实对应都是去找 tomcat 线程池中的一个线程来完成工作的, 使用完成后再进行回收,既然每个请求都是独立的,所以在每个用户去访问我们的工程时,我们可以使用 threadlocal 来做到线程隔离,每个线程操作自己的一份数据

基于 Session 实现登录

image-20220621155657382

  1. 服务端 session 是用户第一次访问应用时,服务器就会创建的对象,代表用户的一次会话过程,可以用来存放数据。服务器为每一个 session 都分配一个唯一的 sessionid,以保证每个用户都有一个不同的 session 对象。
  2. 服务器在创建完 session 后,会把 sessionid 通过 cookie 返回给用户所在的浏览器,这样当用户第二次及以后向服务器发送请求的时候,就会通过 cookie 把 sessionid 传回给服务器,以便服务器能够根据 sessionid 找到与该用户对应的 session 对象。
  3. session 通常有失效时间的设定,比如 2 个小时。当失效时间到,服务器会销毁之前的 session,并创建新的 session 返回给用户。但是只要用户在失效时间内,有发送新的请求给服务器,通常服务器都会把他对应的 session 的失效时间根据当前的请求时间再延长 2 个小时。
  4. session 在一开始并不具备会话管理的作用。它只有在用户登录认证成功之后,并且往 sesssion 对象里面放入了用户登录成功的凭证,才能用来管理会话。管理会话的逻辑也很简单,只要拿到用户的 session 对象,看它里面有没有登录成功的凭证,就能判断这个用户是否已经登录。当用户主动退出的时候,会把它的 session 对象里的登录凭证清掉。所以在用户登录前或退出后或者 session 对象失效时,肯定都是拿不到需要的登录凭证的。

image-20220621170347348

登录校验:

用户请求进入拦截器中,从 session 中获取用户的信息,如果用户存在,将用户的信息存放到 TreadLocal 本地线程中,如果不存在,就进行拦截。

session 共享问题

核心思路分析:

每个 tomcat 中都有一份属于自己的 session, 假设用户第一次访问第一台 tomcat,并且把自己的信息存放到第一台服务器的 session 中,但是第二次这个用户访问到了第二台 tomcat,那么在第二台服务器上,肯定没有第一台服务器存放的 session,所以此时 整个登录拦截功能就会出现问题,我们能如何解决这个问题呢?

早期的方案是 session 拷贝,就是说虽然每个 tomcat 上都有不同的 session,但是每当任意一台服务器的 session 修改时,都会同步给其他的 Tomcat 服务器的 session,这样的话,就可以实现 session 的共享了

但是这种方案具有两个大的问题:

1、每台服务器中都有完整的一份 session 数据,服务器压力过大。

2、session 拷贝数据时,可能会出现延迟

所以咱们后来采用的方案都是基于 redis 来完成,我们把 session 换成 redis,redis 数据本身就是共享的,就可以避免 session 共享的问题了

基于 Redis 实现共享 session 登录

集群的 session 共享问题

使用浏览器进行登录的时候, 会创建 sessionId 写到用户的浏览器的 cookie 中,每次请求都会进行携带。

当多台 tomact 部署时,会出现数据共享不一致的问题。

session 的替代方案应该满足:

  • 数据共享
  • 内存存储
  • key、value 机构

当注册完成后,用户去登录会去校验用户提交的手机号和验证码,是否一致,如果一致,则根据手机号查询用户信息,不存在则新建,最后将用户数据保存到 redis,并且生成 token 作为 redis 的 key,当我们校验用户是否登录时,会去携带着 token 进行访问,从 redis 中取出 token 对应的 value,判断是否存在这个数据,如果没有则拦截,如果存在则将其保存到 threadLocal 中,并且放行。

image-20220621171045770

用户发送验证码,服务器以手机号为 key 存储验证,等下次提交验证码时,判断验证码是否正确,判断用户是否存在,如果存在,走登录操作,生成 token 存储信息到 redis 中,并返回前端,以后每次请求携带改 token ,如果不存在,走注册功能。

存储格式选择:

  • 使用 String 结构,以 JSON 字符串来进行保存,比较直观

image-20220621170117032

  • Hash 结构可以将对象中的每个独立的字段进行独立的存储,可以针对单个字段做 CRUD, 并且内存占用少

登录拦截器优化:

当用户进行频繁访问的时候时,因为拦截的用户是需要指定的 url,为了使用的友好性,需要在频繁使用时,更新 token. 那么问题来了,用户如果没有访问被拦截的 url, 但是访问了其他的 url, 如何进行更新 token 呢。

因此,再拦截的基础上再加上一个拦截器,只要访问了就进行 token 更新,第二个拦截其中只需要查询该线程中有没有该条数据。

image-20220621182031208

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1.校验手机号
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
// 2.如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
// 3.从redis获取验证码并校验
String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
String code = loginForm.getCode();
if (cacheCode == null || !cacheCode.equals(code)) {
// 不一致,报错
return Result.fail("验证码错误");
}

// 4.一致,根据手机号查询用户 select * from tb_user where phone = ?
User user = query().eq("phone", phone).one();

// 5.判断用户是否存在
if (user == null) {
// 6.不存在,创建新用户并保存
user = createUserWithPhone(phone);
}

// 7.保存用户信息到 redis中
// 7.1.随机生成token,作为登录令牌
String token = UUID.randomUUID().toString(true);
// 7.2.将User对象转为HashMap存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
// 7.3.存储
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
// 7.4.设置token有效期
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);

// 8.返回token
return Result.ok(token);
}

拦截器配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

@Configuration
public class MvcConfig implements WebMvcConfigurer {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public void addInterceptors(InterceptorRegistry registry) {
// 登录拦截器
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/shop/**",
"/voucher/**",
"/shop-type/**",
"/upload/**",
"/blog/hot",
"/user/code",
"/user/login"
).order(1);
// token刷新的拦截器越小优先级越高
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
}
}

登录拦截:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1.判断是否需要拦截(ThreadLocal中是否有用户)
if (UserHolder.getUser() == null) {
// 没有,需要拦截,设置状态码
response.setStatus(401);
// 拦截
return false;
}
// 有用户,则放行
return true;
}

更新 tokren 拦截:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52


// 获取对象
public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1.获取请求头中的token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)) {
return true;
}
// 2.基于TOKEN获取redis中的用户
String key = LOGIN_USER_KEY + token;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
// 3.判断用户是否存在
if (userMap.isEmpty()) {
return true;
}
// 5.将查询到的hash数据转为UserDTO
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 6.存在,保存用户信息到 ThreadLocal
UserHolder.saveUser(userDTO);
// 7.刷新token有效期
stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
// 8.放行
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户
UserHolder.removeUser();
}
JAVA
public class UserHolder {
private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();

public static void saveUser(UserDTO user){
tl.set(user);
}

public static UserDTO getUser(){
return tl.get();
}

public static void removeUser(){
tl.remove();
}
}

缓存

什么是缓存:

缓存就是数据交换的缓冲区,是存储数据的临时地方,一般读写性能较高。

优点

  • 降低后端的负载
  • 提高读写的效率,降低响应的时间

缺点:

  • 数据一致性问题

  • 代码维护成本

  • 运维成本

  • 如何使用缓存

    实际开发中,会构筑多级缓存来使系统运行速度进一步提升,例如:本地缓存与 redis 中的缓存并发使用

    浏览器缓存:主要是存在于浏览器端的缓存

    应用层缓存:可以分为 tomcat 本地缓存,比如之前提到的 map,或者是使用 redis 作为缓存

    数据库缓存:在数据库中有一片空间是 buffer pool,增改查数据都会先加载到 mysql 的缓存中

    CPU 缓存:当代计算机最大的问题是 cpu 性能提升了,但内存读写速度没有跟上,所以为了适应当下的情况,增加了 cpu 的 L1,L2,L3 级的缓存

    image-20220925202803555

image-20220623215916658

image-20220623215926951

hutool的作用是使用JSONUtil类将数据库查到的List集合转化成JSON数组,存到redis中

1
2
3
4
5
6
7
8
//List转Json,maps是List类型的参数
String json = JSONUtil.toJsonStr(maps);
System.out.println("这是json字符串: "+json);

//Json转List
JSONArray objects =JSONUtil.parseArray(json);
List<Map> maps1 = JSONUtil.toList(objects, Map.class);
System.out.println("这是list集合: "+maps1);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Resource
private StringRedisTemplate stringRedisTemplate;
@GetMapping("list")
public Result queryTypeList() {

String typeJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_TYPE_KEY);
if (StrUtil.isBlank(typeJson)){
List<ShopType> typeList = typeService
.query().orderByAsc("sort").list();
if (typeList==null){
return Result.fail("未有查询到数据");
}
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_TYPE_KEY, JSONUtil.toJsonStr(typeList));
return Result.ok(typeList);
}
return Result.ok(JSONUtil.toList(JSONUtil.parseArray(typeJson),ShopType.class));
}

缓存更新策略:

3种常用的缓存读写策略

https://blog.csdn.net/m0_61802230/article/details/124109238?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-124109238-blog-110834880.pc_relevant_antiscanv2&spm=1001.2101.3001.4242.1&utm_relevant_index=3

image-20220625142620668

ps:

  • 低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存
  • 高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存

主动更新策略:

Cache Aside Pattern(旁路缓存模式)

由缓存的调用者,在更新数据库的同时更新缓存。

  • 写 :先更新 DB,然后直接删除 cache
  • 读:从 cache 中读取数据,读取到就直接返回;cache中读取不到的话,就从 DB 中读取数据返回,再把数据放到 cache 中

那么在写数据的过程中,可以先删除 cache ,后更新 DB 么? 答案是不可以

Read/Write Through Pattern(读写穿透)

缓存与数据库整合成一个服务,由服务来维护一致性,无序关心缓存一致性的问题

Read/Write Through Pattern 中服务端把 cache 视为主要数据存储,从中读取数据并将数据写入其中。cache 服务负责将此数据读取和写入 DB,从而减轻了应用程序的职责。一般在平时开发过程中很少使用。

写(Write Through):

  • 先查 cache,cache 中不存在,直接更新 DB。cache 中存在,则先更新 cache,然后 cache 服务自己更新 DB(同步更新 cache 和 DB)

读(Read Through):

  • 从 cache 中读取数据,读取到就直接返回 。读取不到的话,先从 DB 加载,写入到 cache 后返回响应。

在 Cache-Aside Pattern 下,发生读请求的时候,如果 cache 中不存在对应的数据,是由客户端自己负责把数据写入 cache,而 Read-Through Pattern 则是 cache 服务自己来写入缓存的,这对客户端是透明的。

Write Behind Pattern(异步缓存写入)

调用者只需要操作缓存,由其他的线程异步将数据库持久化到数据库中。

Read/Write Through 是同步更新 cache 和 DB,而 Write Behind Caching 则是只更新缓存,不直接更新 DB,而是改为异步批量的方式来更新 DB。

Write Behind Pattern 下 DB 的写性能非常高,非常适合一些数据经常变化又对数据一致性要求没那么高的场景,比如浏览量、点赞量。

image-20220625222346548

操作缓存和数据库三个问题:

1、删除缓存还是更新缓存

  • 更新缓存: 每次更新操作都更新缓存,无效的写操作较多。假设是100次更新操作,需要更新100次数据库,那么这就是写多读少的情况。
  • 删除缓存: 更新数据库的同时让缓存失败,查询的时候再更新缓存。所以每次更新完后,不进行缓存更新。

2、如何保证缓存与数据库的操作同时成功或者失败?

  • 单体系统,将缓存与数据库操作的同时成功或者失败

  • 分布式系统,利用TCC等分布式事务方案

3、先删除缓存还是先操作数据库

  • 先删除缓存,再操作数据库
  • 先操作数据库,再删除缓存

第一中操作:

:red_car::不推荐使用

两个并发的读写操作:

1、一个写操作过来,把缓存删除了

2、在写操作还没由有删除数据库的时候,一个读的请求又进来了,发现没有命中缓存,就去数据库把老的数据取了出来

3、读操作把老数据放在了缓存中

这样,数据库中的数据和缓存的数据就不一致了

image-20220625151110886

第二种:

先修改数据库,在进行删除缓存

假如数据库缓存中无数据。

1、一个读操作进来了,发现没有缓存,去数据库中读数据,这个时候因为某种原因卡了,没有及时把数据放入缓存中。

2、写操作进行来,修改数据库,删除了缓存

3、读操作恢复,把老数据写进了缓存。

image-20220625151702314

:carrot::(写:写完之后删除缓存,读:读完之后添加缓存)

这样就是造成了数据的不一致性,但是,这个概率出现的非常低,因为这需要在没有缓存的情况下,有读写的并发操作,在一般情况下,写数据库的操作要比读数据库操作慢得多,在这种情况下,还要保证读操作写缓存晚于写操作删除缓存才会出现这个问题,所以这个问题应该可以忽略不计。

解决方式

:cake: (第三种) 延迟双删:

可以看到修改数据库存在的问题,就是写读数据的保存操作,再写数据之后完成的。

:rabbit: :延迟双删就是先删除缓存,后修改数据库,最后延迟一定的时间,再次删除缓存

image-20220625153338642

ps:dancer:第一次删除缓存相当于检测下缓存服务是否可用,网络是否有问题,第二次延迟一定时间,再次删除缓存,是因为要保证读的请求在写的请求之前完成。

(第四种)内存队列:

  • 修改数据库、删除缓存这两个操作耦合在了一起,没有很好的做到单一职责;
  • 如果写操作比较频繁,可能会对Redis造成一定的压力;
  • 如果删除缓存失败,该怎么办?

第四种方式出现了:内存队列删除缓存:写操作只是修改数据库,然后把数据的Id放在内存队列里面,后台会有一个线程消费内存队列里面的数据,删除缓存,如果缓存删除失败,可以重试多次。

image-20220625154249565

这样,就把修改数据库和删除缓存两个操作解耦了,如果删除缓存失败,也可以多次尝试。由于后台有一个线程去消费内存队列去删除缓存,不是直接删除缓存,所以修改数据库和删除缓存之间产生了一定的延迟,这延迟应该可以保证读操作已经执行完毕了。
但是这么做也有不好的地方:

  • 程序复杂度成倍上升,需要维护线程、队列以及消费者;
  • 如果写操作非常频繁,队列的数据比较多,可能消费会比较慢,修改数据库后,间隔了一定的时间,缓存才被删除。

第三方队列:

如RabbitMQ,Kafka

image-20220625154434197

(1)更新数据库数据;
(2)缓存因为种种问题删除失败
(3)将需要删除的key发送至消息队列
(4)自己消费消息,获得需要删除的key
(5)继续重试删除操作,直到成功

对业务线代码造成大量的侵入。于是有了方案二,在方案二中,启动一个订阅程序去订阅数据库的binlog,获得需要操作的数据。在应用程序中,另起一段程序,获得这个订阅程序传来的信息,进行删除缓存操作。

image-20220625154541212

流程如下图所示:
(1)更新数据库数据
(2)数据库会将操作信息写入binlog日志当中
(3)订阅程序提取出所需要的数据以及key
(4)另起一段非业务代码,获得该信息
(5)尝试删除缓存操作,发现删除失败
(6)将这些信息发送至消息队列
(7)重新从消息队列中获得该数据,重试操作。

ps:camera_flash:

上述的订阅binlog程序在mysql中有现成的中间件叫canal,可以完成订阅binlog日志的功能。另外,重试机制,采用的是消息队列的方式。如果对一致性要求不是很高,直接在程序中另起一个线程,每隔一段时间去重试即可。

总结

首先,我们要明确一点,缓存不是更新,而应该是删除。

删除缓存有两种方式:

  1. 先删除缓存,再更新数据库。解决方案是使用延迟双删
  2. 先更新数据库,再删除缓存。解决方案是消息队列或者其他 binlog 同步,引入消息队列会带来更多的问题,并不推荐直接使用

针对缓存一致性要求不是很高的场景,那么只通过设置超时时间就可以了。

其实,如果不是很高的并发,无论你选择先删缓存还是后删缓存的方式,都几乎很少能产生这种问题,但是在高并发下,你应该知道怎么解决问题。

缓存穿透:

客户端请求的数据在缓存中和数据库中都不存在,这样的缓存永远都不会生效,这些请求永远都不会打到数据库。

解决方式:

缓存空对象:

image-20220625160442043

缓存空对象

优点:实现简单,维护方便

缺点:

  • 额外的内存消耗(可以添加过期时间)
  • 可能造成短期的不一致(当我们给这条数据添加了值,但是用户会查询到空值(可以插入更新这条数据,或者将过期时间设置足够短))

缓存空对象思路分析:当我们客户端访问不存在的数据时,先请求 redis,但是此时 redis 中没有数据,此时会访问到数据库,但是数据库中也没有数据,这个数据穿透了缓存,直击数据库,我们都知道数据库能够承载的并发不如 redis 这么高,如果大量的请求同时过来访问这种不存在的数据,这些请求就都会访问到数据库,简单的解决方案就是哪怕这个数据在数据库中也不存在,我们也把这个数据存入到 redis 中去,这样,下次用户过来访问这个不存在的数据,那么在 redis 中也能找到这个数据就不会进入到缓存了

布隆过滤器:

image-20220625160957802

  • 优点:

内存占用较少,没有多余key

  • 缺点:

实现复杂

存在误判可能

ps: 布隆过滤器:

https://zhuanlan.zhihu.com/p/433689454

当一个元素加入布隆过滤器中的时候,会进行如下操作:

  1. 使用布隆过滤器中的哈希函数对元素值进行计算,得到哈希值(有几个哈希函数得到几个哈希值)。
  2. 根据得到的哈希值,在位数组中把对应下标的值置为 1。

当我们需要判断一个元素是否存在于布隆过滤器的时候,会进行如下操作:

  1. 对给定元素再次进行相同的哈希计算;
  2. 得到值之后判断位数组中的每个元素是否都为 1,如果值都为 1,那么说明这个值在布隆过滤器中,如果存在一个值不为 1,说明该元素不在布隆过滤器中。

举个简单的例子:

image-20220625161706814

如果我们需要判断某个字符串是否在布隆过滤器中时,只需要对给定字符串再次进行相同的哈希计算,得到值之后判断位数组中的每个元素是否都为 1,如果值都为 1,那么说明这个值在布隆过滤器中,如果存在一个值不为 1,说明该元素不在布隆过滤器中。

应用:

如判断一个数字是否存在于包含大量数字的数字集中(数字集很大,5 亿以上!)、 防止缓存穿透(判断请求的数据是否有效避免直接绕过缓存请求数据库)等等、邮箱的垃圾邮件过滤、黑名单功能等等。

去重:比如爬给定网址的时候对已经爬取过的 URL 去重。

实现

利用 Google 开源的 Guava 中自带的布隆过滤器

自己实现的目的主要是为了让自己搞懂布隆过滤器的原理,Guava 中布隆过滤器的实现算是比较权威的,所以实际项目中我们不需要手动实现一个布隆过滤器。

首先我们需要在项目中引入 Guava 的依赖:

1
2
3
4
5
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>

实际使用如下:

我们创建了一个最多存放 最多 1500 个整数的布隆过滤器,并且我们可以容忍误判的概率为百分之(0.01)

1
2
3
4
5
6
7
8
9
10
11
12
13
// 创建布隆过滤器对象
BloomFilter<Integer> filter = BloomFilter.create(
Funnels.integerFunnel(),
1500,
0.01);
// 判断指定元素是否存在
System.out.println(filter.mightContain(1));
System.out.println(filter.mightContain(2));
// 将元素添加进布隆过滤器
filter.put(1);
filter.put(2);
System.out.println(filter.mightContain(1));
System.out.println(filter.mightContain(2));

在我们的示例中,当 mightContain() 方法返回 true 时,我们可以 99%确定该元素在过滤器中,当过滤器返回 false 时,我们可以 100%确定该元素不存在于过滤器中。

Guava 提供的布隆过滤器的实现还是很不错的(想要详细了解的可以看一下它的源码实现),但是它有一个重大的缺陷就是只能单机使用(另外,容量扩展也不容易),而现在互联网一般都是分布式的场景。为了解决这个问题,我们就需要用到 Redis 中的布隆过滤器了。

总结

缓存穿透产生的原因是什么?

用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力\

缓存穿透的解决方案有哪些?

  • 缓存null值

  • 布隆过滤

  • 增强id的复杂度,避免被猜测id规律

  • 做好数据的基础格式校验

  • 加强用户权限校验

  • 做好热点参数的限流

image-20220625163613788

image-20220625163623737

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    @Override
public Result queryById(Long id) {
// 1.从redis中查询缓存数据
// 判断是否存在
String shopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
if (StrUtil.isNotBlank(shopJson)) {
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
Shop byId = getById(id);
if (shopJson!=null) {
return Result.fail("店铺的信息不存在");

}
if (byId == null) {
// 如果都不存在,设置空值防止穿透

stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);

// 返回错误信息
return Result.fail("店铺不存在");
}
// 存在 写入redis
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(byId), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shopJson);
}

缓存雪崩:

缓存雪崩指在同一时间内大量的key同时失效或者Redis服务宕机,导致大量的请求到达数据库,导致数据库连接不够或者数据库处理不过来,从而导致整个系统不可用。

解决方案:

  • 给不同的Key的TTL 添加随机值

  • 利用redis集群提高服务的可用性

  • 给缓存业务添加降级限流的策略

  • 给业务添加多级缓存(多个部分建立缓存,浏览器,jvm等)

交错失效时间:这种方法时间比较简单粗暴,既然在同一时间失效会造成请求过多雪崩,那我们错开不同的失效时间即可从一定长度上避免这种问题,在缓存进行失效时间设置的时候,从某个适当的值域中随机一个时间作为失效时间即可。

image-20220625164203763

缓存击穿:

缓存击穿实际上是缓存雪崩的一个特例也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

击穿与雪崩的区别即在于击穿是对于某一特定的热点数据来说,而雪崩是全部数据。

image-20220625165431054

ps:calendar: 某个热点key失效后,多个线程都在查询数据库并且重建缓存数据。

解决方式:

  • 使用互斥锁
  • 逻辑过期

使用互斥锁

解决方案一:使用锁来解决:

因为锁能实现互斥性。假设线程过来,只能一个人一个人的来访问数据库,从而避免对于数据库访问压力过大,但这也会影响查询的性能,因为此时会让查询的性能从并行变成了串行,我们可以采用 tryLock 方法 + 双重检测 (double check) 来解决这样的问题。

假设现在线程 1 过来访问,他查询缓存没有命中,但是此时他获得到了锁的资源,那么线程 1 就会一个人去执行逻辑,假设现在线程 2 过来,线程 2 在执行过程中,并没有获得到锁,那么线程 2 就可以进行到休眠,直到线程 1 把锁释放后,线程 2 获得到锁,然后再来执行逻辑,此时就能够从缓存中拿到数据了。

ps:apple: :当某个大key失效后,必须想获取锁,其余的线程都在等待中,假如某个请求200ms,那么就是其他所有线程都必须在等待中。

image-20220625165800508

逻辑过期:

解决方案二:逻辑过期方案

方案分析:我们之所以会出现这个缓存击穿问题,主要原因是在于我们对 key 设置了过期时间,假设我们不设置过期时间,其实就不会有缓存击穿的问题,但是不设置过期时间,这样数据不就一直占用我们内存了吗,我们可以采用逻辑过期方案。

我们把过期时间设置在 redis 的 value 中,注意:这个过期时间并不会直接作用于 redis,而是我们后续通过逻辑去处理。假设线程 1 去查询缓存,然后从 value 中判断出来当前的数据已经过期了,此时线程 1 去获得互斥锁,那么其他线程会进行阻塞,获得了锁的线程他会开启一个 线程去进行 以前的重构数据的逻辑,直到新开的线程完成这个逻辑后,才释放锁, 而线程 1 直接进行返回,假设现在线程 3 过来访问,由于线程线程 2 持有着锁,所以线程 3 无法获得锁,线程 3 也直接返回数据,只有等到新开的线程 2 把重建数据构建完后,其他线程才能走返回正确的数据。

这种方案巧妙在于,异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据。

image-20220625170219873

认为设置一个过期时间,默认为没有过期

image-20220625171720659

线程一查询缓存,判断是否过期,如果已经过期了,那么在查询数据之前,获取互斥锁,然后异步查询,将查询出来的数据返回前端,再来一个线程,查询缓存过期获取锁失败,那么将过期是时间返回,异步任务结束之后,就进行释放锁。再来一个线程时,发现没有过期,就进行查询。

总结

image-20220625172151652

互斥锁相较于逻辑过期,因为需要加锁,保证了数据的一致性,但是线程需要等待,影响性能,会出现死锁的问题,如果你在业务中需要查询多个缓存,你拿到一把锁,去获取另外数据里面的锁,就会产生相互等待的情况。

逻辑过期不需要等待,性能较好,但是会有数据不一致性,但是提升了系统的可用性。

应用

基于互斥锁方式解决缓存击穿问题

核心思路:相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是 进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询

如果获取到了锁的线程,再去进行查询,查询后将数据写入 redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿

修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题

image-20220625172924149

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98

public Shop queryWithMutex(Long id) {
// 1.从redis中查询缓存数据
// 判断是否存在
String shopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
if (StrUtil.isNotBlank(shopJson)) {
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return shop;
}
// 防止缓存穿透,如果数据库以及缓存中都没有
if (shopJson != null) {
// 返回错误信息
return null;
}
String lockKey = "lock:shop:" + id;
Shop shop;
// 尝试获取锁
try {
boolean isLock = tryLock(lockKey);
if (!isLock) {
// 失败,休眠后继续递归重试
Thread.sleep(50);
queryWithMutex(id);
}

shop = getById(id);
// 模拟重建的延时
Thread.sleep(200);

if (shop == null) {
// 如果都不存在,设置空值防止穿透
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return null;
}
// 存在
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
// 释放互斥锁
unlock(lockKey);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 释放互斥锁
unlock(lockKey);
}
// 返回
return shop ;
}


/**
* 查询
*/
public Shop queryWithPassThrough(Long id) {
// 1.从redis中查询缓存数据
// 判断是否存在
String shopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
if (StrUtil.isNotBlank(shopJson)) {
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return shop;
}
Shop byId = getById(id);
// 判断命中是否是空值
if (shopJson != null) {
return null;

}
if (byId == null) {
// 如果都不存在,设置空值防止穿透

stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);

// 返回错误信息
return null;
}
// 存在 写入redis
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(byId), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
return byId;
}

/**
* 释放锁
* @param key
*/
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
/**
* 获取锁
*
* @return
*/
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
// 会进行拆箱,如果flag==null,会进行空指针异常
return BooleanUtil.isTrue(flag);
}

基于逻辑过期方式解决缓存击穿的问题

ps:dog: 根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿

image-20220625234139832

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

// 开启一个线程池
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

public Shop queryWithLogicalExpire(Long id) {
String key = CACHE_SHOP_KEY + id;
//
String shopJson = stringRedisTemplate.opsForValue().get(key);

if (StrUtil.isBlank(shopJson)) {
return null;
}
// 命中,需要先把json反序列化
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
// 判断是否过期
if (expireTime.isAfter(LocalDateTime.now())) {
// 没有过期,直接返回店铺信息
return shop;
}
// 已经过期,需要缓存重建
// 获取互斥锁
String localKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(localKey);
if (isLock) {
// 异步更新数据
CACHE_REBUILD_EXECUTOR.submit(() -> {
// 重建缓存
this.saveShop2Redis(id, 20L);
// 释放锁
unlock(localKey);
});
}
// 返回过期的商铺信息
return shop;
}

缓存工具的封装

基于StringRedisTemplate封装一个缓存工具类,满足以下需求:

  • 方法一:将任意的Java对象序列化为Json并存储在string类型的key中,并且可以设置TTL过期的时间。
  • 方法二:将任意的java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存的击穿的问题
  • 方法三: 根据指定的key查询缓存,并反序列化为指定的类型,利用缓存空值得方式缓存穿透问题
  • 方法四: 根据指定得key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题。

缓存穿透封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public <R, ID> R queryWithPassThrough(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(json)) {
// 3.存在,直接返回
return JSONUtil.toBean(json, type);
}
// 判断命中的是否是空值
if (json != null) {
// 返回一个错误信息
return null;
}

// 4.不存在,根据id查询数据库
//不确定sql 逻辑,函数表达式是传进来得。
R r = dbFallback.apply(id);
// 5.不存在,返回错误
if (r == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return null;
}
// 6.存在,写入redis
this.set(key, r, time, unit);
return r;
}

逻辑过期封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);  
public <R, ID> R queryWithLogicalExpire(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isBlank(json)) {
// 3.存在,直接返回
return null;
}
// 4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if (expireTime.isAfter(LocalDateTime.now())) {
// 5.1.未过期,直接返回店铺信息
return r;
}
// 5.2.已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2.判断是否获取锁成功
if (isLock) {
// 6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 查询数据库
R newR = dbFallback.apply(id);
// 重建缓存
this.setWithLogicalExpire(key, newR, time, unit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁
unlock(lockKey);
}
});
}
// 6.4.返回过期的商铺信息
return r;
}
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}

private void unlock(String key) {
stringRedisTemplate.delete(key);
}

使用:

1
2
Shop shop = cacheClient
.queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);

优惠券秒杀

全局唯一ID

每个店铺都可以发布优惠券:

订单时间表:

image-20220627110137070

当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:

  • id的规律性太明显受
  • 单表数据量的限制

image-20220627104817322

image-20220628010219978

场景分析:如果我们的 id 具有太明显的规则,用户或者说商业对手很容易猜测出来我们的一些敏感信息,比如商城在一天时间内,卖出了多少单,这明显不合适。

场景分析二:随着我们商城规模越来越大,mysql 的单表的容量不宜超过 500W,数据量过大之后,我们要进行拆库拆表,但拆分表了之后,他们从逻辑上讲他们是同一张表,所以他们的 id 是不能一样的, 于是乎我们需要保证 id 的唯一性。

全局 ID 生成器,是一种分布式系统用来生成全局唯一的工具。

image-20220627105338077

全局唯一性:不能出现重复的ID号,既然是唯一标识,这是最基本的要求;

趋势递增:在MySQL InnoDB引擎中使用的是聚集索引,由于多数RDBMS使用B-tree的数据结构来存储索引数据,在主键的选择上面我们应该尽量使用有序的主键保证写入性能;

  • 单调递增:保证下一个ID一定大于上一个ID,例如事务版本号、IM增量消息、排序等特殊需求;
  • 信息安全:如果ID是连续的,恶意用户的扒取工作就非常容易做了,直接按照顺序下载指定URL即可;如果是订单号就更危险了,竞对可以直接知道我们一天的单量。所以在一些应用场景下,会需要ID无规则、不规则;

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

实现方式:

image-20220628105925073

Redis自增ID策略:

这主要依赖于Redis是单线程的,所以也可以用生成全局唯一的ID。可以用Redis的原子操作 INCR和INCRBY来实现

  • 每天一个key,方便统计订单量
  • ID构造是 时间戳 + 计数器

image-20220627105811956

ID的组成部分:

符号位:1bit,永远为0

时间戳:31bit,以秒为单位,可以使用69年

序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    
public long nextId(String keyPrefix) {
// 1.生成时间戳,31位数据,当前时间减去初始时间
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;

// 2.生成序列号
// 2.1.获取当前日期,精确到天,得到日期的天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 2.2.自增长
// key的构建,只有当每一天的数据进行拼接 ,每一天的数据放在一起,redis 的自增长的值每天都有上线,不能使用同一个key
// 不会出现空指针,如果为空,会自动创建 ,redis自增长,方便统计
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);

// 3.拼接并返回
// 32向左移动32位,或位运算
return timestamp << COUNT_BITS | count;
}

测试效率:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    @Resource
private ShopServiceImpl shopService;

private RedisIdWorker redisIdWorker;


private ExecutorService es = Executors.newFixedThreadPool(500);

@Test
void testSaveShop() {
shopService.saveShop2Redis(1L, 10L);
}


@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
Runnable task = () -> {
for (int i = 0; i < 100; i++) {
long order = redisIdWorker.nextId("order");
System.out.println("order = " + order);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
// 提交三百次
for (int i = 0; i < 300; i++) {
es.submit(task);
}
latch.await();
long end = System.currentTimeMillis();
System.out.println("time" + (end - begin));

}

image-20220628105520737

生成过期时间的key:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//
// 思路:日期(yyyyMMddHHmmss)+redis原子生成的数字(不足6位前面补0)
//
// 类似:20191206221953000001
/**
* 获取有过期时间的自增长ID
* @param key
* @param expireTime
* @return
*/
public long generate(String key, Date expireTime) {
RedisAtomicLong counter = new RedisAtomicLong(key, stringRedisTemplate.getConnectionFactory());
Long expire = counter.getExpire();
if(expire==-1){
counter.expireAt(expireTime);
}
return counter.incrementAndGet();
}
public String generateOrderId() {
//生成id为当前日期(yyMMddHHmmss)+6位(从000000开始不足位数补0)
LocalDateTime now = LocalDateTime.now();
String orderIdPrefix = getOrderIdPrefix(now);//生成yyyyMMddHHmmss
// 不足的前面补零
String orderId = orderIdPrefix+String.format("%1$06d", generate(orderIdPrefix,getExpireAtTime(now)));
return orderId;

}
public static String getOrderIdPrefix(LocalDateTime now){
return now.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"));
}

public Date getExpireAtTime(LocalDateTime now){
ZoneId zoneId = ZoneId.systemDefault();
LocalDateTime localDateTime = now.plusSeconds(20);
ZonedDateTime zdt = localDateTime.atZone(zoneId);
Date date = Date.from(zdt.toInstant());
return date;
}

注意:在Redis集群情况下,同样和Mysql一样需要设置不同的增长步长,同时key一定要设置有效期。可以使用Redis集群来获取跟高的吞吐量。
假如一个集群中有5台Redis。可以初始化每台Redis的值分别是1,2,3,4,5,然后步都是5.
各个redis生成的ID为:
A: 1,6,11,16,21
B:2,7,12,17,22
C:3,8,13,18,23
D:4,9,14,19,24
E:5,10,15,20,25

测试类

知识小贴士:关于 countdownlatch

countdownlatch 名为信号枪:主要的作用是同步协调在多线程的等待于唤醒问题

我们如果没有 CountDownLatch ,那么由于程序是异步的,当异步程序没有执行完时,主线程就已经执行完了,然后我们期望的是分线程全部走完之后,主线程再走,所以我们此时需要使用到 CountDownLatch

CountDownLatch 中有两个最重要的方法

1、countDown

2、await

await 方法 是阻塞方法,我们担心分线程没有执行完时,main 线程就先执行,所以使用 await 可以让 main 线程阻塞,那么什么时候 main 线程不再阻塞呢?当 CountDownLatch 内部维护的 变量变为 0 时,就不再阻塞,直接放行,那么什么时候 CountDownLatch 维护的变量变为 0 呢,我们只需要调用一次 countDown ,内部变量就减少 1,我们让分线程和变量绑定, 执行完一个分线程就减少一个变量,当分线程全部走完,CountDownLatch 维护的变量就是 0,此时 await 就不再阻塞,统计出来的时间也就是所有分线程执行完后的时间。

优点:

不依赖于数据库,灵活方便,且性能优于数据库。

数字ID天然排序并自增,对分页或者需要排序的结果很有帮助。

缺点:

占用带宽,每次要向redis进行请求。

如果系统中没有Redis,还需要引入新的组件,增加系统复杂度。

需要编码和配置的工作量比较大。

强依赖redis,redis出问题业务不可用。

5种全局ID生成方式、优缺点及改进方案 - 知乎 (zhihu.com)

(66条消息) 分布式全局唯一ID的实现_Archie_java的博客-CSDN博客_分布式全局唯一id实现方案

实现优惠券秒杀下单

image-20220628215150945

添加优惠卷:

image-20220628215445858

实现优惠卷id:

下单是否开始或结束,如果尚未开始或已经结束无法下单

库存是否充足,不足则无法下单

image-20220628215812241

基本逻辑就是先判断秒杀是否开启,如果开启那么判断判断库存是否充足,如果库存充足,那么就进行扣减库存,扣减成功后,创建订单。

京东商品库存的扣除过程,如何防止超卖? (qq.com)

超卖问题

image-20220709160833160

悲观锁: 超卖问题是典型的多线程安全问题,针对解决方案就是加锁:

悲观锁:

认为线程一定会先获取锁,确保线程串行执行

  • 例如: Synchronized、Lock都是属于悲观锁

乐观锁:

认为线程安全问题不一定会发生,因此不加锁在,只是在更新数据的时候去判断有没有其他线程对数据做了修改。

  • 如果没有修改则认为是安全的,自己才更新数据

  • 如果已经被其他的线程修改说明发生了安全问题

    版本号:

    加上一个version字段,有一个版本号,每一次查询字段的时候,修复该的时候,是否存在改本,如果不存在,那么就是修改失败。

    image-20220709161957227

CAS法:

利用库存去代替版本号,更新的值是否与查询的值一样。

image-20220709170221801

image-20220709170256751

如果使用==去判断,就是当库存充足的时候,会引起失效,那么只需要将库存设置为>0扣减即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
    /**
* 返回orderId
* @param voucher
* @return
*/
@Override
@Transactional
public Result seckillVoucher(Long voucher) {
// 查询优惠卷
SeckillVoucher v = seckillVoucherService.getById(voucher);

// 判断秒杀是否开始
if (v.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始");
}
// 秒杀已经结束
if (v.getEndTime().isBefore(LocalDateTime.now())) {
// 库存不足
return Result.fail("秒杀已经结束");
}
if (v.getStock()<1){
return Result.fail("库存不足");
}

// 扣减库存,使用乐观锁控制多线程访问问题,如果使用版本号进行控制,失败了太低,所以改成>0
boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucher).gt("stock",0).update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}
// 6创建订单,用户的id
long orderId = redisIdWorker.nextId("order");
Long id = UserHolder.getUser().getId();
// 代金卷id
VoucherOrder voucherOrder = new VoucherOrder().setId(orderId).setUserId(id).setVoucherId(voucher);

// 代金卷id
this.save(voucherOrder);

// 返回订单的id
return Result.ok(orderId);
}

修改代码方案一、

VoucherOrderServiceImpl 在扣减库存时,改为:

1
2
3
4
JAVA
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1") //set stock = stock -1
.eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update(); //where id = ? and stock = ?

以上逻辑的核心含义是:只要我扣减库存时的库存和之前我查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全的,但是以上这种方式通过测试发现会有很多失败的情况,失败的原因在于:在使用乐观锁过程中假设 100 个线程同时都拿到了 100 的库存,然后大家一起去进行扣减,但是 100 个人中只有 1 个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败

修改代码方案二、

之前的方式要修改前后都保持一致,但是这样我们分析过,成功的概率太低,所以我们的乐观锁需要变一下,改成 stock 大于 0 即可

1
2
3
4

boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update().gt("stock",0); //where id = ? and stock > 0

知识小扩展:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

/**
* 返回orderId
* @param voucher
* @return
*/
@Override
@Transactional
public Result seckillVoucher(Long voucher) {
// 查询优惠卷
SeckillVoucher v = seckillVoucherService.getById(voucher);

// 判断秒杀是否开始
if (v.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始");
}
// 秒杀已经结束
if (v.getEndTime().isBefore(LocalDateTime.now())) {
// 库存不足
return Result.fail("秒杀已经结束");
}
if (v.getStock()<1){
return Result.fail("库存不足");
}

// 扣减库存,使用乐观锁控制多线程访问问题,如果使用版本号进行控制,失败了太低,所以改成>0
boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucher).gt("stock",0).update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}
// 6创建订单,用户的id
long orderId = redisIdWorker.nextId("order");
Long id = UserHolder.getUser().getId();
// 代金卷id
VoucherOrder voucherOrder = new VoucherOrder().setId(orderId).setUserId(id).setVoucherId(voucher);

// 代金卷id
this.save(voucherOrder);

// 返回订单的id
return Result.ok(orderId);
}

超卖这样的线程安全问题,解决方案有哪些?

悲观锁:添加同步锁,让线程串行执行

  • 优点:简单粗暴缺点:性能一般

乐观锁:不加锁,在更新时判断是否有其它线程在修改

  • 优点:性能好
  • 缺点:存在成功率低的问题

三种方式对比:

1)update product set stock=stock-1 where id =1 and stock>0
2)update product set stock=stock-1 where stock=#{原先查询的库存} and id = 1 and stock>0
3)update product set stock=stock-1,versioin =version+1 where id = 1 and stock>0 and version=#{原先查询的版本号}

前置知识:
什么是 ABA 问题:例如,有个用户同时进入领卷接口,A,B,C 进入时库存还有 10 张,A 领取后应该只有 9 张,但 B 给库存增加了一张,这样库存就还有 10 张,此时虽然库存是发生了变化的,但是 C 仍然领取成功了,这就是一个 ABA 问题。

答案:核⼼是解决超卖的问题,就是防⽌库存为负数

⽅案⼀:id 是主键索引的前提下,如果每次只是减少 1 个库存,则可以采⽤上⾯的⽅式,只做数据安全校验,可以有效减库存,性能更⾼,避免⼤量⽆⽤ sql,只要有库存就也可以操作成功,如果该 sql 语句中不加入条件 stock>0 的条件,则在高并发下就会产生库存扣减超卖,即库存将变成负数.
适用场景:⾼并发场景下的取号器,优惠券发放扣减库存等

⽅案⼆:使⽤业务⾃身的条件做为乐观锁,但是存在 ABA 问题,对⽐⽅案三的好处是不⽤增加 version 版本字段。
场景如果只是扣减库存且不在意 ABA 问题时,则可以采⽤上面的⽅式,但业务性能相对⽅案⼀就差了点,因为库存变动后 sql 就会无效。

⽅案三:增加版本号主要是为了解决 ABA 问题,数据读取后,更新前数据被别⼈篡改过,version 只能做递增。
场景:商品秒杀、优惠券⽅法,需要记录库存操作前后的业务

一人一单

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

现在的问题在于:

优惠卷是为了引流,但是目前的情况是,一个人可以无限制的抢这个优惠卷,所以我们应当增加一层逻辑,让一个用户只能下一个单,而不是让一个用户下多个单

具体操作逻辑如下:

比如时间是否充足,如果时间充足,则进一步判断库存是否足够,然后再根据优惠卷 id 和用户 id 查询是否已经下过这个订单,如果下过这个订单,则不再下单,否则进行下单

image-20220709171445853

如果使用多个线程之间,会存在一个人下多个单子的问题

image-20220709175920688

image-20220709175942269

VoucherOrderServiceImpl

初步代码:增加一人一单逻辑

1
2
3
4
JAVA
VoucherOrderServiceImpl

**初步代码:增加一人一单逻辑**

image-20220709205402834

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
// 5.一人一单逻辑
// 5.1.用户id
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

//6,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}
//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);

voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

return Result.ok(orderId);

}

存在问题:现在的问题还是和之前一样,并发过来,查询数据库,都不存在订单,所以我们还是需要加锁,但是乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作

注意:在这里提到了非常多的问题,我们需要慢慢的来思考,首先我们的初始方案是封装了一个 createVoucherOrder 方法,同时为了确保他线程安全,在方法上添加了一把 synchronized 锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {

Long userId = UserHolder.getUser().getId();
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 7.返回订单id
return Result.ok(orderId);
}

,但是这样添加锁,锁的粒度太粗了,在使用锁过程中,控制锁粒度 是一个非常重要的事情,因为如果锁的粒度太大,会导致每个线程进来都会锁住,所以我们需要去控制锁的粒度,以下这段代码需要修改为:
intern () 这个方法是从常量池中拿到数据,如果我们直接使用 userId.toString () 他拿到的对象实际上是不同的对象,new 出来的对象,我们使用锁必须保证锁必须是同一把,所以我们需要使用 intern () 方法

需要对用户进行加锁,那么怎么进行加锁呢?第一个是对整个方法使用悲观锁 Synchronized 进行加锁,但是会出现效率低下,对所有用户线程都是一把锁,因此我们抽取出来对用户进行加锁,如下图所示:

image-20220709185050047

但是存在一个问题是,方法事务的提交是在锁释放之后进行提交,就会存在一个并发问题,就是一个线程获取锁之后,还没有提交事务,那么另一个线程是查询不到该用户的数据因为还没有进行事务的提交。所以需要进行改进:

因为直接调用,并不是作用于 Spring 的代理对象,Spring 事务会失效,所以

在该 Service 类中使用 AopContext.currentProxy () 获取代理对象。

image-20220709190720284

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
JAVA
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableAspectJAutoProxy(exposeProxy = true )
public class HmDianPingApplication {

public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}

}

最终代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {


@Resource
private ISeckillVoucherService seckillVoucherService;

@Resource
RedisIdWorker redisIdWorker;

/**
* 返回orderId
* @param voucher
* @return
*/
@Override

public Result seckillVoucher(Long voucher) {
// 查询优惠卷
SeckillVoucher v = seckillVoucherService.getById(voucher);

// 判断秒杀是否开始
if (v.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始");
}
// 秒杀已经结束
if (v.getEndTime().isBefore(LocalDateTime.now())) {
// 库存不足
return Result.fail("秒杀已经结束");
}
if (v.getStock()<1){
return Result.fail("库存不足");
}
// 一人一单

// 一人一单,存在并发问题,如果多个人来操作数据,会出现
Long userId = UserHolder.getUser().getId();
synchronized ((userId.toString().intern())){
//获取事务对象
//在事务提交后获取锁
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder( v.getVoucherId());
}
}

/**
* 对同一个用户进行加锁处理
* @param voucherId
* @return
*/
@Transactional
public Result createVoucherOrder(Long voucherId){
Long userId = UserHolder.getUser().getId();

Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count>0){
// 用户已经购买
return Result.fail("用户已经购买过一次");
}
// 扣减库存,使用乐观锁控制多线程访问问题,如果使用版本号进行控制,失败了太低,所以改成>0
boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucherId).gt("stock",0).update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}
// 6创建订单,用户的id
long orderId = redisIdWorker.nextId("order");
Long id = UserHolder.getUser().getId();
// 代金卷id
VoucherOrder voucherOrder = new VoucherOrder().setId(orderId).setUserId(id).setVoucherId(voucherId);

// 代金卷id
this.save(voucherOrder);

// 返回订单的id
return Result.ok(orderId);
}

image-20220709191119643

image-20220709191134743

image-20220709191144277

分布式锁

场景:

在个中,我们使用到一人一单中,通过加锁可以解决在单机情况下一人一单的问题,但是集群模式就不行了。

配置测试环境:

1、我们将服务启动两份,端口分别为8081和8082:

image-20220709191711151

2、然后修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡:

image-20220709191741033

现在,用户请求会在这两个节点上负载均衡,再次测试下是否存在线程安全问题。

image-20220709192717832

image-20220709192733572

image-20220709205444103

由于现在我们部署了多个 tomcat,每个 tomcat 都有一个属于自己的 jvm,那么假设在服务器 A 的 tomcat 内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器 B 的 tomcat 内部,又有两个线程,但是他们的锁对象写的虽然和服务器 A 一样,但是锁对象却不是同一个,所以线程 3 和线程 4 可以实现互斥,但是却无法和线程 1 和线程 2 实现互斥,这就是 集群环境下,syn 锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。

分布式锁:

image-20220709205949569

那么分布式锁他应该满足一些什么样的条件呢?

  • 可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思
  • 互斥:互斥是分布式锁的最基本的条件,使得程序串行执行
  • 高可用:程序不易崩溃,时时刻刻都保证较高的可用性
  • 高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能
  • 安全性:安全也是程序中必不可少的一环

常见实现方式:

实现多进程之间的互斥,常见三种:

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路

实现多进程之间的互斥,常见三种:

常见的分布式锁有三种

  • Mysql:mysql 本身就带有锁机制,但是由于 mysql 性能本身一般,所以采用分布式锁的情况下,其实使用 mysql 作为分布式锁比较少见
  • Redis:redis 作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用 redis 或者 zookeeper 作为分布式锁,利用 setnx 这个方法,如果插入 key 成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁
  • Zookeeper:zookeeper 也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解 zookeeper 的原理和分布式锁的实现,所以不过多阐述

image-20220709210708904

基于Redis的分布式锁:

实现

实现分布式锁时需要实现两个方法:

获取锁:

互斥:确保只能有一个线程获取锁

#添加锁,利用setnx的互斥性

setnx lock thread1

#添加过期时间,避免服务宕机引起死锁

EXPIRE lock 10

互斥:确保只能有一个线程获取锁

释放锁:

  • 手动释放

  • 超时释放(如果加锁之后,锁不能够进行释放,所以我们添加一个超时的时间

    #释放锁

    DEL key

image-20220709211215789

以上的逻辑还有缺陷:如果在setnx和expire之间的服务器进程突然挂掉,可能是人为因为机器掉电或者人为造成,就会导致expire得不到执行,也会造成死锁。

根源在于 setnx与expire是两条指令而不是原子指令,expire是依赖setnx进行执行的,不能够使用事务进行解决,事务里面没有if-else语句。

在redis2.8中,加入了扩展指令,使得setnx与expire能够一起进行执行。

http://qiniu.littlekim.top/image/image-20220927120938304.png

#添加锁,NX是互斥,EX是超时时间

SET lock thread1 NX EX 10

创建接口(Lock ):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ILock {

/**
* 尝试获取锁
* @param timeoutSex 锁持有的超时时间,过期后自动释放
* @return true 代表获取锁成功;false 代表获取锁失败
*/
boolean tryLock(long timeoutSex);

/**
* 释放锁
*/
void unlock();
}

定义实现类(SimpleRedisLock):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.hmdp.utils;

import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;

public class SimpleRedisLock implements ILock{

private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX ="lock:";

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

/**
*
* @param timeoutSex 锁持有的超时时间,过期后自动释放
* @return
*/
@Override
public boolean tryLock(long timeoutSex) {
// 获取线程标识
final long threadId = Thread.currentThread().getId();
// 去尝试获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSex, TimeUnit.SECONDS);
// 或自动拆箱可能为空值,避免出现空指针异常
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}

修改实现类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
    /**
* 返回orderId
* @param voucher
* @return
*/
@Override
public Result seckillVoucher(Long voucher) {
// 查询优惠卷
SeckillVoucher v = seckillVoucherService.getById(voucher);

// 判断秒杀是否开始
if (v.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始");
}
// 秒杀已经结束
if (v.getEndTime().isBefore(LocalDateTime.now())) {
// 库存不足
return Result.fail("秒杀已经结束");
}
if (v.getStock()<1){
return Result.fail("库存不足");
}
// 一人一单

// 一人一单,存在并发问题,如果多个人来操作数据,会出现
Long userId = UserHolder.getUser().getId();
// 创建锁对象
SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
boolean isLock = simpleRedisLock.tryLock(1200);
if (!isLock) {
// 获取锁失败
return Result.fail("不允许重复下单");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder( v.getVoucherId());
}finally {
// 释放锁
simpleRedisLock.unlock();
}

结果测试:

image-20220709220105476

image-20220709215957523

image-20220709215906342

可以看到上图中redis中获取了锁,并记录了线程名字,此时我们可以看到两个进程中只有一个获取成功。

存在问题(分布式锁误删):

image-20220709222430739

图中 线程1 中获取锁,由于业务阻塞时间太长 ,导致超时释放锁,此时 线程2 尝试获取锁,

拿到锁后执行自己的业务。

image-20220709222502291

在线程2执行自己的业务时,线程1执行完毕,释放了线程2的锁。

image-20220709222513489

此时线程三尝试去获取锁,并开始执行自己的业务,所以 此时出现了线程并发的问题,两个线程在同时执行。

解决方式:

需要在获取锁的时候,判断是否该线程的锁,如果不是该线程的锁,不进行锁释放。

image-20220709223030815

流程优化:

image-20220709223319353

在存储时,存入线程标识,释放锁时,判断是否是在该线程内。

需求:修改之前的分布式锁实现,满足:

在获取锁时存入线程标示(可以用UUID表示)

在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致

  • 如果一致则释放锁
  • 如果不一致则不释放锁
实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;


import java.util.concurrent.TimeUnit;

public class SimpleRedisLock implements ILock {

private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
// 去掉UUID的横线
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

/**
* @param timeoutSex 锁持有的超时时间,过期后自动释放
* @return
*/
@Override
public boolean tryLock(long timeoutSex) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 去尝试获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSex, TimeUnit.SECONDS);
// 或自动拆箱可能为空值,避免出现空指针异常
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 两个线程标识符是否是一致的
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if (threadId.equals(id)) {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}

PostMan进行测试

image-20220709224939204

image-20220709225115455

image-20220709225209626

释放锁问题(接上)

在判断锁是否一样时,判断和删除不是一个原子性的操作。如果阻塞在判断操作,会引起超时释放

线程 1 现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程 2 进来,但是线程 1 他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程 1 的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生,

image-20220709225849663

image-20220709225619627

image-20220709225722475

使用lua脚本优化:

释放锁的业务流程是这样的:

获取锁中的线程标示

判断是否与指定的标示(当前线程标示)一致

如果一致则释放锁(删除)

如果不一致则什么都不做

如果用Lua脚本来表示则是这样的:

需求:基于Lua脚本实现分布式锁的释放锁逻辑提示:RedisTemplate调用Lua脚本的API如下:

image-20220712234656452

image-20220712235016993

1
2
3
4
5
6
7
8
9
10
11
12
13
-- 获取锁中的线程标识 get key
--local id = redis.call('get',KEYS[1])
-- 比较线程标识与锁的标志是否一致
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致

if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

修改代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;


import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class SimpleRedisLock implements ILock {

private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
// 去掉UUID的横线
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
// 初始化脚本调用

static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

/**
* @param timeoutSex 锁持有的超时时间,过期后自动释放
* @return
*/
@Override
public boolean tryLock(long timeoutSex) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 去尝试获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSex, TimeUnit.SECONDS);
// 或自动拆箱可能为空值,避免出现空指针异常
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId());
}


// @Override
// public void unlock() {
// String threadId = ID_PREFIX + Thread.currentThread().getId();
//// 两个线程标识符是否是一致的
// String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// if (threadId.equals(id)) {
// stringRedisTemplate.delete(KEY_PREFIX + name);
// }
// }

}

image-20220712235711810

image-20220712235804060

总结:

基于Redis的分布式锁实现思路:

利用set nx ex获取锁,并设置过期时间,保存线程标示

释放锁时先判断线程标示是否与自己一致,一致则删除锁

特性:

  • 利用set nx满足互斥性
  • 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
  • 利用Redis集群保证高可用和高并发特性

到目前为止,还有可以升级的空间,但是对于生产环境,已经是一个相对安全的操作。

小总结:

基于 Redis 的分布式锁实现思路:

  • 利用 set nx ex 获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁
    • 特性:
      • 利用 set nx 满足互斥性
      • 利用 set ex 保证故障时锁依然能释放,避免死锁,提高安全性
      • 利用 Redis 集群保证高可用和高并发特性

笔者总结:我们一路走来,利用添加过期时间,防止死锁问题的发生,但是有了过期时间之后,可能出现误删别人锁的问题,这个问题我们开始是利用删之前 通过拿锁,比锁,删锁这个逻辑来解决的,也就是删之前判断一下当前这把锁是否是属于自己的,但是现在还有原子性问题,也就是我们没法保证拿锁比锁删锁是一个原子性的动作,最后通过 lua 表达式来解决这个问题

但是目前还剩下一个问题锁不住,什么是锁不住呢,你想一想,如果当过期时间到了之后,我们可以给他续期一下,比如续个 30s,就好像是网吧上网, 网费到了之后,然后说,来,网管,再给我来 10 块的,是不是后边的问题都不会发生了,那么续期问题怎么解决呢,可以依赖于我们接下来要学习 redission 啦

测试逻辑:

第一个线程进来,得到了锁,手动删除锁,模拟锁超时了,其他线程会执行 lua 来抢锁,当第一天线程利用 lua 删除锁时,lua 能保证他不能删除他的锁,第二个线程删除锁时,利用 lua 同样可以保证不会删除别人的锁,同时还能保证原子性。

分布式锁Redisson(接上优化):

setnx实现的分布式锁

基于setnx实现的分布式锁存在下面的问题:

  • 不可重入

同一个线程无法多次获取同一把锁

  • 不可重试

获取锁只尝试一次就返回false,没有重试机制

  • 超时释放

锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患

  • 主从一致性

如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的锁数据,则会出现锁实现

Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

image-20220927122040018

image-20220713103143534

官网地址: https://redisson.orgGitHub

地址: https://github.com/redisson/redisson

使用:
1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置Redisson客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.hmdp.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {
/**
* retemplate相关配置
*
* @param factory
* @return
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {

RedisTemplate<String, Object> template = new RedisTemplate<>();
// 配置连接工厂
template.setConnectionFactory(factory);

//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);

ObjectMapper om = new ObjectMapper();
// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);

// 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
// om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); // 保留这行会报错:Unexpected token (VALUE_STRING)
jacksonSeial.setObjectMapper(om);

// 值采用json序列化
template.setValueSerializer(jacksonSeial);
//使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());

// 设置hash key 和value序列化模式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(jacksonSeial);
template.afterPropertiesSet();

return template;
}

@Bean
public RedissonClient redissonClient() { // 配置类
Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
// config.useSingleServer().setAddress("redis://120.26.160.122:6379").setPassword("");
config.useSingleServer().setAddress("redis://120.26.160.122:6379").setPassword("");

// 创建客户端
return Redisson.create(config);
}
}
使用Redission分布式锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

@Resource
private RedissonClient redissonClient;

@Test
void testRedisson() throws InterruptedException {
// 获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
// 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
// 判断释放获取成功
if(isLock){
try {
System.out.println("执行业务");
}finally {
// 释放锁
lock.unlock();
} } }



Redisson的可重入锁:

可重入锁

https://blog.csdn.net/w8y56f/article/details/89554060

在 Lock 锁中,他是借助于底层的一个 voaltile 的一个 state 变量来记录重入的状态的,比如当前没有人持有这把锁,那么 state=0,假如有人持有这把锁,那么 state=1,如果持有这把锁的人再次持有这把锁,那么 state 就会 + 1 ,如果是对于 synchronized 而言,他在 c 语言代码中会有一个 count,原理和 state 类似,也是重入一次就加一,释放一次就 - 1 ,直到减少成 0 时,表示当前这把锁没有被人持有。

在 redission 中,我们的也支持支持可重入锁

在分布式锁中,他采用 hash 结构用来存储锁,其中大 key 表示表示这把锁是否存在,用小 key 表示当前这把锁被哪个线程持有,所以接下来我们一起分析一下当前的这个 lua 表达式

这个地方一共有 3 个参数

KEYS [1] : 锁名称

ARGV [1]: 锁失效时间

ARGV [2]: id + “:” + threadId; 锁的小 key

exists: 判断数据是否存在 name:是 lock 是否存在,如果 ==0,就表示当前这把锁不存在

redis.call (‘hset’, KEYS [1], ARGV [2], 1); 此时他就开始往 redis 里边去写数据 ,写成一个 hash 结构

Lock{

id + “:” + threadId : 1

}

如果当前这把锁存在,则第一个条件不满足,再判断

redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1

此时需要通过大 key + 小 key 判断当前这把锁是否是属于自己的,如果是自己的,则进行

redis.call(‘hincrby’, KEYS[1], ARGV[2], 1)

将当前这个锁的 value 进行 + 1 ,redis.call (‘pexpire’, KEYS [1], ARGV [1]); 然后再对其设置过期时间,如果以上两个条件都不满足,则表示当前这把锁抢锁失败,最后返回 pttl,即为当前这把锁的失效时间

如果小伙帮们看了前边的源码, 你会发现他会去判断当前这个方法的返回值是否为 null,如果是 null,则对应则前两个 if 对应的条件,退出抢锁逻辑,如果返回的不是 null,即走了第三个分支,在源码处会进行 while (true) 的自旋抢锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 创建锁对象 
RLock lock = redissonClient.getLock("lock");
@Test
void method1() {
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败,1");
return;
}
try {
log.info("获取锁成功,1");
method2();
} finally {
log.info("释放锁,1");
lock.unlock();
}
}

void method2() {
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败, 2");
return;
}
try {
log.info("获取锁成功,2");
} finally {
log.info("释放锁,2");
lock.unlock();
}
}

使用Hash结构,再加锁就将Value加上1,释放锁就将锁减去1

image-20220713112337285image-20220713112247387

使用lua脚本实现如上逻辑:

获取锁的Lua脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1];
-- 线程唯一标识
local releaseTime = ARGV[2];
-- 锁的自动释放时间
-- 判断是否存在
if(redis.call('exists', key) == 0) then
-- 不存在, 获取锁
redis.call('hset', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1;
-- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then
-- 不存在, 获取锁,重入次数+1
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1;
-- 返回结果
end;
return 0;
-- 代码走到这里,说明获取锁的不是自己,获取锁失败

释放锁的lua脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
local key = KEYS[1];
-- 锁的key

local threadId = ARGV[1];

-- 线程唯一标识
local releaseTime = ARGV[2];
-- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0) then
return nil;
-- 如果已经不是自己,则直接返回
end;
-- 是自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断是否重入次数是否已经为0
if (count > 0) then
-- 大于0说明不能释放锁,重置有效期然后返回
redis.call('EXPIRE', key, releaseTime);
return nil;
else
-- 等于0说明可以释放锁,直接删除
redis.call('DEL', key);
return nil;

解决了不可重入、不可重试、超时释放问题,但是还未有主从一致性问题;

Redisson分布式锁主从一致性问题:

image-20220713121215731

主节点负责一些写的操作,而从节点,负责读数据。主从之间存在数据的延迟,当应用进程向redis发来一个写操作,写入锁,这个时候 Redis主节点宕机了,哨兵会监听集群状态, 集群的会重新选择一个Master,这样锁就丢失了,这样客户再来访问时,就拿不到锁。

image-20220713121918741

给每一个节点都加上锁,客户端必须依次获取所有的锁 ,必须依次保存了这个锁才获取成功。如果某一结点出问题了,进行主从同步时,还未有进行主从同步,如果其他线程过来获取锁,是不能够成功的。因为其他结点上还存在锁。可以保证高可用性。

配置连接:

image-20220713122724147

总结image-20220713122823502

1)不可重入Redis分布式锁:

原理:利用setnx的互斥性;利用ex避免死锁;

释放锁时判断线程标示缺陷:不可重入、无法重试、锁超时失效

2)可重入的Redis分布式锁:

原理:利用hash结构,记录线程标示和重入次数;

利用watchDog延续锁时间;

利用信号量控制锁重试等待缺陷:

redis宕机引起锁失效问题

3)Redisson的multiLock:

原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功

缺陷:运维成本高、实现复杂

Redis优化秒杀

Redis消息队列实现异步秒杀

发布探店笔记

1.1 需求分析

探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个:

  • tb_blog:探店笔记表,包含笔记中标题、文字、图片等
  • tb_blog_comments:其他用户对探店笔记的评价

image-20220724093931571

在这里插入图片描述

修改文件上传路径:

image-20220724092424525

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.hmdp.controller;

import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.StrUtil;
import com.hmdp.dto.Result;
import com.hmdp.utils.SystemConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

import java.io.File;
import java.io.IOException;
import java.util.UUID;

@Slf4j
@RestController
@RequestMapping("upload")
public class UploadController {

@PostMapping("blog")
public Result uploadImage(@RequestParam("file") MultipartFile image) {
try {
// 获取原始文件名称
String originalFilename = image.getOriginalFilename();
// 生成新文件名
String fileName = createNewFileName(originalFilename);
// 保存文件
image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));
// 返回结果
log.debug("文件上传成功,{}", fileName);
return Result.ok(fileName);
} catch (IOException e) {
throw new RuntimeException("文件上传失败", e);
}
}

@GetMapping("/blog/delete")
public Result deleteBlogImg(@RequestParam("name") String filename) {
File file = new File(SystemConstants.IMAGE_UPLOAD_DIR, filename);
if (file.isDirectory()) {
return Result.fail("错误的文件名称");
}
FileUtil.del(file);
return Result.ok();
}

private String createNewFileName(String originalFilename) {
// 获取后缀
String suffix = StrUtil.subAfter(originalFilename, ".", true);
// 生成目录
String name = UUID.randomUUID().toString();
int hash = name.hashCode();
int d1 = hash & 0xF;
int d2 = (hash >> 4) & 0xF;
// 判断目录是否存在
File dir = new File(SystemConstants.IMAGE_UPLOAD_DIR, StrUtil.format("/blogs/{}/{}", d1, d2));
if (!dir.exists()) {
dir.mkdirs();
}
// 生成文件名
return StrUtil.format("/blogs/{}/{}/{}.{}", d1, d2, name, suffix);
}
}

1
2
3
4
5
6
7
8
9
package com.hmdp.utils;

public class SystemConstants {
public static final String IMAGE_UPLOAD_DIR = "D:\\demandanalysis\\redis\\dianping\\nginx-1.18.0\\html\\hmdp\\imgs\\blogs";
public static final String USER_NICK_NAME_PREFIX = "user_";
public static final int DEFAULT_PAGE_SIZE = 5;
public static final int MAX_PAGE_SIZE = 10;
}

2.1:实现查看发布探店笔记的接口

需求:点击首页的探店笔记,会进入详情页面,实现该页面的查询接口:

image-20220724152957850

代码实现:

Bolg实体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_blog")
public class Blog implements Serializable {

private static final long serialVersionUID = 1L;

/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 商户id
*/
private Long shopId;
/**
* 用户id
*/
private Long userId;
/**
* 用户图标
*/
@TableField(exist = false)
private String icon;
/**
* 用户姓名
*/
@TableField(exist = false)
private String name;
/**
* 是否点赞过了
*/
@TableField(exist = false)
private Boolean isLike;

/**
* 标题
*/
private String title;

/**
* 探店的照片,最多9张,多张以","隔开
*/
private String images;

/**
* 探店的文字描述
*/
private String content;

/**
* 点赞数量
*/
private Integer liked;

/**
* 评论数量
*/
private Integer comments;

/**
* 创建时间
*/
private LocalDateTime createTime;

/**
* 更新时间
*/
private LocalDateTime updateTime;


}

BlogController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

/**
* 查询热门博客数据
* @param current
* @return
*/

@GetMapping("/hot")
public Result queryHotBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
// 根据用户查询
Page<Blog> page = blogService.query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(blog ->{
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
});
return Result.ok(records);
}

/**
* 通过id查询博客数据
* @param id
* @return
*/
@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id")Long id){
return blogService.queryBlogById(id);
}

UserServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    @Resource
IUserService userService;

@Override
public Result queryBlogById(Long id) {
// 查询blog
Blog blog = getById(id);

if (blog == null) {
return Result.fail("笔记不存在");
}
// 2查询blog有关的用户
queryBlogUser(blog);
return Result.ok(blog);
}

效果:

image-20220724160308047

点赞:

需求分析:

在首页的探店笔记排行榜和探店图文详情页面都有点赞的功能:

image-20220724160528696

代码实现:

1
2
3
4
5
6
7
8
9

@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
// 修改点赞数量
blogService.update()
.setSql("liked = liked + 1").eq("id", id).update();
return Result.ok();
}

image-20220724160727388

代码修改:

  • 同一个用户只能点赞一次,再次点击则取消点赞
  • 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)

需要一个集合去记录所有点赞过的用户,同时一个用户只能点赞一次,要求用户 id 不能重复,即集合中元素唯一,而 Redis 中 set 集合满足这种需求。

步骤:

给Blog类中添加一个isLike字段,标示是否被当前用户点赞

修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1

修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值给isLike字段

修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_blog")
public class Blog implements Serializable {

private static final long serialVersionUID = 1L;

/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 商户id
*/
private Long shopId;
/**
* 用户id
*/
private Long userId;
/**
* 用户图标
*/
@TableField(exist = false)
private String icon;
/**
* 用户姓名
*/
@TableField(exist = false)
private String name;
/**
* 是否点赞过了
*/
@TableField(exist = false)
private Boolean isLike;

/**
* 标题
*/
private String title;

/**
* 探店的照片,最多9张,多张以","隔开
*/
private String images;

/**
* 探店的文字描述
*/
private String content;

/**
* 点赞数量
*/
private Integer liked;

/**
* 评论数量
*/
private Integer comments;

/**
* 创建时间
*/
private LocalDateTime createTime;

/**
* 更新时间
*/
private LocalDateTime updateTime;
}


BlogController 修改 likeBlog 方法(点赞方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@RestController
@RequestMapping("/blog")
public class BlogController {

@Resource
private IBlogService blogService;

@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
return blogService.likeBlog(id);
}

@GetMapping("/hot")
public Result queryHotBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
return blogService.queryHotBlog(current);
}

@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") String id){
return blogService.queryBlogById(id);
}
}


修改 IBlogService 类,增加 likeBlog 方法

1
2
3
4
5
6
7
8
9
public interface IBlogService extends IService<Blog> {

Result queryBlogById(String id);

Result queryHotBlog(Integer current);

Result likeBlog(Long id);
}

修改 BlogServiceImpl 类,实现 likeBlog 方法,修改查询笔记逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {

@Autowired
private IUserService userService;

@Autowired
private StringRedisTemplate stringRedisTemplate;

@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(blog -> {
this.queryBlogUser(blog);
this.isBlogLiked(blog);
});
return Result.ok(records);
}

@Override
public Result likeBlog(Long id) {
// 1、获取登录用户
UserDTO user = UserHolder.getUser();
// 2、判断当前登录用户是否已经点赞
Boolean isMember = stringRedisTemplate.opsForSet().isMember(RedisConstants.BLOG_LIKED_KEY + id, user.getId().toString());
if(BooleanUtil.isFalse(isMember)) {
// 3、如果未点赞,可以点赞
// 3.1、数据库点赞数 +1
boolean isSuccess = update().setSql("liked = liked+1").eq("id", id).update();
// 3.2、保存用户到 Redis 的 set 集合
if(isSuccess){
stringRedisTemplate.opsForSet().add(RedisConstants.BLOG_LIKED_KEY + id, user.getId().toString());
}
} else {
// 4、如果已点赞,取消点赞
// 4.1、数据库点赞数 -1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
// 4.2、把用户从 Redis 的 set 集合移除
if(isSuccess){
stringRedisTemplate.opsForSet().remove(RedisConstants.BLOG_LIKED_KEY + id, user.getId().toString());
}
}
return Result.ok();
}


private void queryBlogUser(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}

@Override
public Result queryBlogById(String id) {
Blog blog = getById(id);

if(blog == null){
return Result.fail("笔记不存在!");
}

queryBlogUser(blog);
// 查询 Blog 是否被点赞
isBlogLiked(blog);

return Result.ok(blog);
}

private void isBlogLiked(Blog blog) {
Long userId = blog.getUserId();
String key = RedisConstants.BLOG_LIKED_KEY + blog.getId();
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
blog.setIsLike(BooleanUtil.isTrue(isMember));
}
}


image-20220724164337381

点赞排行榜:

在探店笔记的详情页面,应该把给该笔记点赞的人显示出来,比如最早点赞的 TOP5,形成点赞排行榜:

image-20220724173938032

按照点赞时间先后排序,返回Top5的用户

image-20220724174013708

set 集合中的元素是无序的,点赞排行榜需要对点赞时间进行排序,这里 set 集合并不满足需求。

通过 ZSCORE 命令获取 SortedSet 中存储的元素的相关的 SCORE 值。
通过 ZRANGE 命令获取指定范围内的元素。

BlogController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@RestController
@RequestMapping("/blog")
public class BlogController {

@Resource
private IBlogService blogService;

@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
return blogService.likeBlog(id);
}

@GetMapping("/hot")
public Result queryHotBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
return blogService.queryHotBlog(current);
}

@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") String id){
return blogService.queryBlogById(id);
}

@GetMapping("/likes/{id}")
public Result queryBlogLikes(@PathVariable("id") String id) {
return blogService.queryBlogLikes(id);
}
}

IBlogService:

1
2
3
4
5
6
7
8
9
10
11
public interface IBlogService extends IService<Blog> {

Result queryBlogById(String id);

Result queryHotBlog(Integer current);

Result likeBlog(Long id);

Result queryBlogLikes(String id);
}

BlogServiceImpl:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {

@Autowired
private IUserService userService;

@Autowired
private StringRedisTemplate stringRedisTemplate;

@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(blog -> {
this.queryBlogUser(blog);
this.isBlogLiked(blog);
});
return Result.ok(records);
}

@Override
public Result likeBlog(Long id) {
// 1、获取登录用户
UserDTO user = UserHolder.getUser();
// 2、判断当前登录用户是否已经点赞
Double score = stringRedisTemplate.opsForZSet().score(RedisConstants.BLOG_LIKED_KEY + id, user.getId().toString());
if(score == null) {
// 3、如果未点赞,可以点赞
// 3.1、数据库点赞数 +1
boolean isSuccess = update().setSql("liked = liked+1").eq("id", id).update();
// 3.2、保存用户到 Redis 的 set 集合
if(isSuccess){
// 时间作为 key 的 score
stringRedisTemplate.opsForZSet().add(RedisConstants.BLOG_LIKED_KEY + id, user.getId().toString(), System.currentTimeMillis());
}
} else {
// 4、如果已点赞,取消点赞
// 4.1、数据库点赞数 -1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
// 4.2、把用户从 Redis 的 set 集合移除
if(isSuccess){
stringRedisTemplate.opsForZSet().remove(RedisConstants.BLOG_LIKED_KEY + id, user.getId().toString());
}
}
return Result.ok();
}

@Override
public Result queryBlogLikes(String id) {
String key = RedisConstants.BLOG_LIKED_KEY + id;
// 查询 top5 的点赞用户
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if(top5 == null){
return Result.ok(Collections.emptyList());
}
// 解析出其中的用户id
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
String join = StrUtil.join(",", ids);
// 根据用户id查询用户,如果使用in,不会按数组进行排序
List<UserDTO> userDTOS = userService.query().in("id", ids).last("order by field(id, "+join+")").list()
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());

return Result.ok(userDTOS);
}


private void queryBlogUser(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}

@Override
public Result queryBlogById(String id) {
Blog blog = getById(id);

if(blog == null){
return Result.fail("笔记不存在!");
}

queryBlogUser(blog);
// 查询 Blog 是否被点赞
isBlogLiked(blog);

return Result.ok(blog);
}

private void isBlogLiked(Blog blog) {
UserDTO user = UserHolder.getUser();
if(user == null){
return;
}
Long userId = user.getId();
String key = RedisConstants.BLOG_LIKED_KEY + blog.getId();
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
blog.setIsLike(score != null);
}
}

image-20220724174453312

好友关注:

关注和取关:

在探店图文的详情页中,可以关注发布笔记的作者:

image-20220724174709585

需求:基于该表数据结构,实现两个接口:
① 关注和取关接口
② 判断是否关注的接口

关注是 User 之间的关系,是博主与粉丝的关系,数据库中有一张 tb_follow 表来标识:

img

注意:这里需要把主键修改为自增长

img

FollowController:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RestController
@RequestMapping("/follow")
public class FollowController {

@Autowired
private IFollowService followService;

@PutMapping("/{id}/{isFollow}")
public Result follow(@PathVariable("id") Long followUserId, @PathVariable("isFollow") Boolean isFollow){
return followService.follow(followUserId, isFollow);
}


@PutMapping("/or/not/{id}")
public Result isFollow(@PathVariable("id") Long followUserId){
return followService.isFollow(followUserId);
}

}

IFollowService:

1
2
3
4
5
6
7
8
public interface IFollowService extends IService<Follow> {

Result follow(Long followUserId, Boolean isFollow);

Result isFollow(Long followUserId);
}


FollowServiceImpl:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
@Override
public Result follow(Long followUserId, Boolean isFollow) {
// 获取当前登录用户
Long userId = UserHolder.getUser().getId();

// 判断是否关注
if (isFollow){
// 关注,新增
Follow follow = new Follow();
follow.setFollowUserId(followUserId);
follow.setUserId(userId);
// INSERT INTO tb_follow ( user_id, follow_user_id ) VALUES ( ?, ? )
save(follow);
}else {
// DELETE FROM tb_follow WHERE (follow_user_id = ? AND user_id = ?)
remove(new QueryWrapper<Follow>().eq("follow_user_id",followUserId).eq("user_id",userId));
}
return Result.ok();
}


@Override
public Result isFollow(Long followUserId) {
// 获取当前登录用户
Long userId = UserHolder.getUser().getId();
Integer count = query().eq("follow_user_id", followUserId).eq("user_id", userId).count();
return Result.ok(count > 0);
}
}

共同关注:

点击博主头像,可以进入博主首页:

image-20220724180555897

博主个人首页依赖两个接口:
① UserController 根据 id 查询 User 信息

1
2
3
4
5
6
7
8
9
10
11
@GetMapping("/{id}")
public Result queryUserById(@PathVariable("id") Long userId){
// 查询详情
User user = userService.getById(userId);
if(user == null){
return Result.ok();
}
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
return Result.ok(userDTO);
}

② BlogController 根据 id 查询博主的探店笔记

1
2
3
4
5
6
7
8
9
10
11
12
13
@GetMapping("/of/user")
public Result queryBlogByUserId(
@RequestParam(value = "current", defaultValue = "1") Integer current,
@RequestParam("id") Long id) {
// 根据用户查询
Page<Blog> page = blogService.query().
eq("user_id", id).
page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
return Result.ok(records);
}

image-20220724182334676

需求:利用Redis中恰当的数据结构,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同好友。

image-20220724182522248

image-20220724182654482

FollowController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@RequestMapping("/follow")
public class FollowController {

@Autowired
private IFollowService followService;

@PutMapping("/{id}/{isFollow}")
public Result follow(@PathVariable("id") Long followUserId, @PathVariable("isFollow") Boolean isFollow){
return followService.follow(followUserId, isFollow);
}


@PutMapping("/or/not/{id}")
public Result isFollow(@PathVariable("id") Long followUserId){
return followService.isFollow(followUserId);
}

@GetMapping("/common/{id}")
public Result followCommons(@PathVariable("id") Long id){
return followService.followCommons(id);
}

}


IFollowService

1
2
3
4
5
6
7
8
9
10
public interface IFollowService extends IService<Follow> {

Result follow(Long followUserId, Boolean isFollow);

Result isFollow(Long followUserId);

Result followCommons(Long id);
}


对follow方法进行改进,在向数据库中写入信息时,同时保存在redis中,在删除的时候,同时移除Redis中的相关数据,followCommons为查询共同关注的方法,使用set数据结构的特性进行查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.hmdp.service.impl;

import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.hmdp.dto.Result;
import com.hmdp.dto.UserDTO;
import com.hmdp.entity.Follow;
import com.hmdp.mapper.FollowMapper;
import com.hmdp.service.IFollowService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IUserService;
import com.hmdp.utils.UserHolder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* <p>
* 服务实现类
* </p>

*/
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {

@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private IUserService userService;

@Override
public Result follow(Long followUserId, Boolean isFollow) {
// 获取当前登录用户
Long userId = UserHolder.getUser().getId();
String key = "follows:" + userId;
// 判断是否关注
if (isFollow) {
// 关注,新增
Follow follow = new Follow();
follow.setFollowUserId(followUserId);
follow.setUserId(userId);
// INSERT INTO tb_follow ( user_id, follow_user_id ) VALUES ( ?, ? )
boolean save = save(follow);
if (save) {
stringRedisTemplate.opsForSet().add(key, followUserId.toString());
}
} else {
// DELETE FROM tb_follow WHERE (follow_user_id = ? AND user_id = ?)
boolean isSuccess = remove(new QueryWrapper<Follow>().eq("follow_user_id", followUserId).eq("user_id", userId));
if (isSuccess) {
stringRedisTemplate.opsForSet().remove(key, followUserId);
}
}
return Result.ok();
}


@Override
public Result isFollow(Long followUserId) {
// 获取当前登录用户
Long userId = UserHolder.getUser().getId();
Integer count = query().eq("follow_user_id", followUserId).eq("user_id", userId).count();
return Result.ok(count > 0);
}

/**
* 共同关注信息
* @param id
* @return
*/
@Override
public Result followCommons(Long id) {
// 获取当前登录用户
Long userId = UserHolder.getUser().getId();
String key = "follows:" + userId;
String key2 = "follows:" + id;
// 获取交集
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);

if (intersect == null) {
return Result.ok(Collections.emptyList());
}
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
List<UserDTO> collect = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
return Result.ok(collect);
}
}

关注推送:

Feed 流实现方案分析

关注推送也叫做 Feed 流,直译为投喂。为用户持续的提供”沉浸式“的体验,通过无限下拉刷新获取新的信息。

Feed 流产品有两种常见模式:

① Timeline:不做内容筛选,简单的按照内容发布时间排序,常用语好友或关注。例如朋友圈
优点:信息全面,不会有缺失,并且实现也相对简单
缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低

② 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容,推送用户感兴趣的信息来吸引用户
优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
缺点:如果算法不精准,可能起到反作用

本例中的个人页面,是基于关注的好友来做 Feed 流,因此采用 Timeline 的模式。该模式的实现方案有三种:
① 拉模式
② 推模式
③ 推拉结合

拉模式: 也叫做读扩散

假设有三个人,分别是张三、李四、王五,这三个人分别会在自己的账号发布自己的笔记或者视频,在这里我们统一称之为消息,这三个人发布的所有的消息都会被发送到发件箱中,发送到发件箱中的消息除了消息本身之外,还需要带有时间戳。粉丝赵六会有一个收件箱,平时收件箱是空的,只有他在读取消息时,才会把赵六关注的所有人的发件箱中的消息拉取到他的收件箱中,拉取到收件箱后,消息会按照携带的时间戳进行排序,然后赵六就可以读取消息了。

在这里插入图片描述

优点:节省内存空间。收件箱中的消息在读取完后就会被清空,下次需要读取的时候会重新从所有关注人的发件箱中拉取。消息只保存了一份。
缺点:每次读取消息都要重新拉取发件箱中的消息,然后再做排序,比较耗时。

推模式: 也叫做写扩散

假设现在有两个 up 主:张三、李四,有三个粉丝:粉丝1、粉丝2、粉丝3,粉丝1关注了张三,粉丝2和3都关注了张三和李四,如果张三此时想要发送消息,那么这条消息会直接推送到张三的所有粉丝的收件箱中,而李四发送的消息也会被推送到粉丝2和3的收件箱中,收件箱收到消息后会对所有的消息进行排序,粉丝在读取消息时,就已经是排序好的消息了。这样的一个好处就是延时低,不必每次读取时都需要重新拉取消息。但这种方式内存占用会比较高,up 主每次发消息都要同步所有的粉丝,如果粉丝数过多,超过几百万,就需要复制几百万份。

img

推拉结合模式: 也叫做读写混合,兼具推和拉两种模式的优点。

普通粉丝人数众多,但是活跃度较低,读取消息的频率也就低,可采用拉模式;
而活跃粉丝人数少,但是活跃度高,读取消息的频率高,可采用推模式。
大 V 发送消息时,会直接将消息推送到活跃粉丝的发件箱中,而针对于普通粉丝,消息会先发送到发件箱中,当普通粉丝读取消息时,会直接从发件箱中拉取。

在这里插入图片描述

也可以对在线用户采用推模式,离线用户拉模式。

在这里插入图片描述

基于推模式实现关注推送功能

需求分析

需求:
① 修改新增探店笔记的业务,在保存 Blog 到数据库的同时,推送到粉丝的收件箱
② 收件箱满足可以根据时间戳排序,必须用 Redis 的数据结构实现
③ 查询收件箱数据时,可以实现分页查询

需求分析:
Redis 中 List、SortedSet 两种数据结构均可以实现排序功能。List 要想实现按时间排序,可以按照插入顺序排。SortedSet 排序则是按照 score 进行排序,score 值中可以存储时间戳来实现按照时间排序。
Redis中的 List 可以按照角标进行查询,完全可以实现分页查询。而 SortedSet 虽然没有角标,但是排序完成后,会有一个排名机制,可以使用排名机制进行查询,也能实现分页查询。
那究竟应该选择哪种数据结构来分页查询功能呢?
由于Feed 流中的数据会不断更新,这就导致数据的角标也在不断变化,因此不能采用传统的分页模式。
来看下传统分页模式:
在这里插入图片描述

假设 t1 时刻,Feed 流中有 10 条消息,此时从 Feed 流中读取前 5 条消息,在 t2时刻向 Feed 流中插入了一条新消息,当在 t3 时刻再去读取后 5 条数据时,就会出现数据重复读取的问题。为了避免这种情况,可以采用滚动分页。所谓滚动分页,其实就是记录每一次查询的最后一条消息的下标,下一次查询时从该条消息开始查询。

在这里插入图片描述

假设 t1 时刻,Feed 流中有 10 条消息,此时从 Feed 流中读取前 5 条消息,在 t2时刻向 Feed 流中插入了一条新消息,当在 t3 时刻再去读取后 5 条数据时,查询会从记录的 lastId 开始,向后查询 5 条,这样也就不会出现查询重复的问题。

SortedSet 除了可以按照时间戳排序,还支持按照 score 值的范围进行查询,即按照时间戳范围进行查询。每次查询时,记录最小的时间戳,下次查询时,从比该时间戳还小的下一个时间戳开始查询。
为了保证发布的消息不会重复进入需要推送的粉丝的收件箱中,以及保证查询速度,使用 SortedSet 更优。

推送到粉丝收件箱:

修改 BlogController 中的 saveBlog 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
@RequestMapping("/blog")
public class BlogController {

@Resource
private IBlogService blogService;

@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
return blogService.saveBlog(blog);
}
}

IBlogService

1
2
3
4
public interface IBlogService extends IService<Blog> {
Result saveBlog(Blog blog);
}

BlogServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
@Autowired
private StringRedisTemplate stringRedisTemplate;

@Autowired
private IFollowService followService;

@Override
public Result saveBlog(Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
boolean isSuccess = save(blog);
if(!isSuccess){
return Result.ok();
}
// 查询笔记作者的所有粉丝 select * from tb_follow where follow_user_id=?
List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();

if(follows == null || follows.isEmpty()){
return Result.ok(blog.getId());
}

// 推送笔记给所有粉丝
for (Follow follow: follows) {
Long userId = follow.getUserId();
stringRedisTemplate.opsForZSet().add(RedisConstants.FEED_KEY + userId, blog.getId().toString(), System.currentTimeMillis());
}

// 返回id
return Result.ok(blog.getId());
}
}

实现关注推送页面的分页查询:

需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息

image-20220725160630801

实现滚动分页需要使用下面的命令:

1
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]

返回有序集 key 中, score **值介于 max 和 min 之间(默认包括等于 max 或 min )**的所有的成员。

有序集成员按 score 值递减(从大到小)的次序排列。

  • WITHSCORES 表示是否返回成员分数。
  • LIMIT 表示是否分页,如果带有 LIMIT,则后面必须带有offset、count,
  • offset表示偏移量(相对于max值而言),count 表示结果数量。

(78条消息) Redis有序集合命令ZREVRANGEBYSCORE详解与应用_大大的微笑的博客-CSDN博客_zrevrangebyscore

这里要注意是:一旦使用分页查询后,max 以及 offset 的值就是一个动态变化的值了。其中,max 值是上一次查询结果中的最小分数(即时间戳)。而 offset 的取值也与上一次查询结果中的最小分数有关,如果上一次查询结果中的最小分数值重复多次出现,offset的值就应该为最小分数重复出现的次数。
image-20220725163832871

image-20220725163049003

根据上图可以看出 offset 是需要动态获取。

定义滚动查询结果类 ScrollResult

1
2
3
4
5
6
7
@Data
public class ScrollResult {
private List<?> list;
private Long minTime;
private Integer offset;
}

BlogController

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
@RequestMapping("/blog")
public class BlogController {

@Resource
private IBlogService blogService;

@GetMapping("/of/follow")
public Result queryBlogOfFollow(@RequestParam("lastId") Long max, @RequestParam(value = "offset", defaultValue = "0") Integer offset){
return blogService.queryBlogOfFollow(max, offset);
}
}

IBlogService

1
2
3
4
public interface IBlogService extends IService<Blog> {
Result queryBlogOfFollow(Long max, Integer offset);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

@Override
public Result queryBlogOfFollow(Long max, Integer offset) {


UserDTO user = UserHolder.getUser();
// 查询收件箱 参数1:key 参数2:最小分数 参数3:最大分数 参数4:偏移量 参数5:每次取几条
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(RedisConstants.FEED_KEY + user.getId(), 0, max, offset, 2);
if(typedTuples == null) {
return Result.ok(Collections.emptyList());
}
int count =1;
long minTime = 0;
List<Long> ids = new ArrayList<>(typedTuples.size());
// 在遍历的同时查找出不同的数据

for(ZSetOperations.TypedTuple<String> typedTuple:typedTuples){//5 4 4 2 2
// 获取用户的id
ids.add(Long.valueOf(typedTuple.getValue()));
// 获取分数(时间戳)
long time = typedTuple.getScore().longValue();
// 如果与最小值相同,那么判断是否是同的数据据
if (time ==minTime){
count++;
}else {
// 防止连续相同的数据产生,所以count需要进行重置
minTime = time;
count =1;
}


}
String join = StrUtil.join(",", ids);
// 根据 id 查询 blog,防止查询无序
List<Blog> blogs = query().in("id", ids).last("order by Field(id," + join + ")").list();
// 封装结果
ScrollResult scrollResult = new ScrollResult();
scrollResult.setList(blogs);
scrollResult.setMinTime(minTime);
scrollResult.setOffset(count);
return Result.ok(scrollResult);
}

image-20220725172649177

附近商户:

GEO 数据结构的基本用法

GEO 就是 Geolocation 的简写形式,代表地理坐标。Redis 在 3.2 版本中加入了对 GEO 的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:
GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
GEODIST:计算指定的两个点之间的距离并返回
GEOHASH:将指定 member 的坐标转为 hash 字符串形式并返回
GEOPOS:返回指定 member 的坐标
GEORADIUS:指定圆心、半径,找到该圆内包含的所有 member,并按照与圆心之间的距离排序后返回。6.2 以后已废弃
GEOSEARCH:在指定范围内搜索 member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2 新功能
GEOSEARCHSTORE:与 GEOSEARCH 功能一致,不过可以把结果存储到一个指定的 key。6.2 新功能

1.2 练习 Redis 的 GEO 功能
需求:
1、添加下面几条数据:

北京南站(116.378248 39.865275)
北京站(116.42803 39.903738)
北京西站(116.322287 39.893729)
2、计算北京西站到北京站的距离
3、搜索天安门(116.397904 39.909005)附近 10km 内的所有火车站,并按照距离升序排序

GEOADD 命令

image-20220725190055022

添加北京南站、北京站、北京西站的坐标数据

image-20220725190144537

使用 RPM 查看存入的数据

image-20220725180008136

可以看出 GEO 底层是基于 SortedSet 实现的,Redis 将地理坐标转换成了 score 值存入到了 SortedSet 中。

image-20220725190235945

计算北京西站到北京站的距离

image-20220725180123902

返回使用 GEOADD 向 SortedSet 中添加的 member,这些 member 位于给定形状指定的区域的边界内。该命令对GEORADIUS命令进行了扩展,除了支持在圆形区域内搜索外,它还支持在矩形区域内搜索。

应使用此命令代替已弃用的GEORADIUS和GEORADIUSBYMEMBER命令。

查询的中心点由以下强制选项之一提供:

FROMMEMBER: 使用给定的且存在于SortedSet中的 member 的位置。
FROMLONLAT:使用给定的 longitude 和 latitude 位置。

查询的形状由以下强制选项之一提供:
BYRADIUS: 类似GEORADIUS,根据给定的圆形区域内搜索 radius。
BYBOX:在轴对齐的矩形内搜索,由 height 和 width 确定。

该命令可以选择使用以下选项返回附加信息:
WITHDIST: 返回匹配项到指定中心点的距离。返回的距离单位与半径或高度和宽度参数指定的单位相同。
WITHCOORD: 返回匹配项的经度和纬度。
WITHHASH:以 52 位无符号整数的形式返回项目的原始 geohash 编码排序集分数。

默认情况下,匹配项未排序返回。要对它们进行排序,请使用以下两个选项之一:
ASC:相对于中心点,从最近到最远对返回的项目进行排序。
DESC:相对于中心点,从最远到最近对返回的项目进行排序。

默认返回所有匹配项。如果想要将结果限制为前 N 个匹配项,可以使用COUNT选项。使用ANY选项时,只要找到足够的匹配项,该命令就会返回。

搜索天安门(116.397904 39.909005)附近 10km 内的所有火车站,并按照距离升序排序
image-20220725190333516

GEOPOS 和 GEOHASH 命令:

image-20220725190402084

导入店铺数据到 GEO

image-20220725190439007

按照商户类型做分组,类型相同的商户作为同一组,以 typeId 作为 key 存入同一个 GEO 集合中。

image-20220725190532503

HmDianPingApplicationTests

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@SpringBootTest
class HmDianPingApplicationTests {

@Autowired
private ShopServiceImpl shopService;

@Autowired
private StringRedisTemplate stringRedisTemplate;

@Test
public void loadShopData(){
// 1、查询店铺信息
List<Shop> list = shopService.list();
// 2、把店铺分组,按照 typeId 分组,typeId 一致的放到一个集合中
Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
// 3、分批完成写入 Redis
for (Map.Entry<Long, List<Shop>> longListEntry : map.entrySet()) {
Long typeId = longListEntry.getKey();
List<Shop> value = longListEntry.getValue();
List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
for (Shop shop : value) {
locations.add(new RedisGeoCommands.GeoLocation<>(
shop.getId().toString(),
new Point(shop.getX(), shop.getY())
));
}
stringRedisTemplate.opsForGeo().add(RedisConstants.SHOP_GEO_KEY + typeId, locations);
}

}
}

实现附近商户功能

SpringDataRedis 的 2.3.9 版本并不支持 Redis6.2 提供的 GEOSEARCH 命令,因此我们需要提示版本,修改自己的 POM 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-data-redis</artifactId>
<groupId>org.springframework.data</groupId>
</exclusion>
<exclusion>
<artifactId>lettuce-core</artifactId>
<groupId>io.lettuce</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.6.RELEASE</version>
</dependency>

ShopController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@RestController
@RequestMapping("/shop")
public class ShopController {

@Resource
public IShopService shopService;

/**
* 根据商铺类型分页查询商铺信息
* @param typeId 商铺类型
* @param current 页码
* @return 商铺列表
*/
@GetMapping("/of/type")
public Result queryShopByType(
@RequestParam("typeId") Integer typeId,
@RequestParam(value = "current", defaultValue = "1") Integer current,
@RequestParam(value = "x", required = false) Double x,
@RequestParam(value = "y", required = false) Double y
) {
return shopService.queryShopByType(typeId, current, x, y);
}
}

IShopService

1
2
3
4
public interface IShopService extends IService<Shop> {
Result queryShopByType(Integer typeId, Integer current, Double x, Double y);
}

ShopServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

@Autowired
private StringRedisTemplate stringRedisTemplate;

@Override
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
// 判断是否需要根据坐标查询
if(x == null || y == null){
// 根据类型分页查询
Page<Shop> page = query()
.eq("type_id", typeId)
.page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
// 返回数据
return Result.ok(page.getRecords());
}
// 计算分页参数
int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
int end = current * SystemConstants.DEFAULT_PAGE_SIZE;

// 查询 Redis,按照距离排序、分页。
GeoResults<RedisGeoCommands.GeoLocation<String>> search = stringRedisTemplate.opsForGeo().
search(RedisConstants.SHOP_GEO_KEY + typeId,
GeoReference.fromCoordinate(x, y),
new Distance(5000),
RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));

if(search == null){
return Result.ok(Collections.emptyList());
}

// 查询 Redis,按照距离排序、分页
List<GeoResult<RedisGeoCommands.GeoLocation<String>>> content = search.getContent();
if(from >= content.size()){
return Result.ok(Collections.emptyList());
}

List<Long> ids = new ArrayList<>(content.size());
Map<String, Distance> distanceMap = new HashMap<>(content.size());
// 截取 from ~ end 的部分
content.stream().skip(from).forEach(result -> {
// 获取店铺 id
String shopIdStr = result.getContent().getName();
ids.add(Long.valueOf(shopIdStr));
// 获取距离
Distance distance = result.getDistance();
distanceMap.put(shopIdStr, distance);
});
String join = StrUtil.join(",", ids);
// 根据 id 查询 shop
List<Shop> shopList = query().in("id", ids).last("order by Field(" + join + ")").list();

for (Shop shop : shopList) {
shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
}

return Result.ok(shopList);
}
}

用户签到:

BitMap用法:

假如我们用一张表来存储用户签到信息,其结构应该如下:

image-20220725200831921

假如有1000万用户,平均每人每年签到次数为10次,则这张表一年的数据量为 1亿条每签到一次需要使用(8 + 8 + 1 + 1 + 3 + 1)共22 字节的内存,一个月则最多需要600多字节

改进方案:
我们按月来统计用户签到信息,签到记录为 1,未签到则记录 0

image-20220725201042371

把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)。

Redis中是利用string类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是 2^32个bit位。

BitMap 的操作命令有:

  • SETBIT:向指定位置(offset)存入一个 0 或 1
  • GETBIT:获取指定位置(offset)的 bit 值
  • BITCOUNT:统计 BitMap 中值为 1 的 bit 位的数量
  • BITFIELD:操作(查询、修改、自增)BitMap 中的 bit 数组中的指定位置(offset)的值。关于BITFIELD 命令的详细介绍可以看下这篇文章bitfield 命令
  • BITFIELD_RO:获取 BitMap 中的 bit 数组,并以十进制形式返回
  • BITOP:将多个 BitMap 的结果做位运算(与、或、异或)
  • BITPOS:查找 bit 数组中指定范围内第一个 0 或 1 出现的位置

image-20220725203326133

在 RPM 中查看存入的数据:

image-20220725203352155

注意展示时要以 binary 格式展示

获取第三天的签到数据:

image-20220725203419633

获取总的签到次数:

image-20220725203447342

使用 BITFIELD 命令获取数据:从 0 号位开始获取两个 bit 位

image-20220725203522122

type表示类型参数:当被设置的二进制位范围值为整数时,用户可以在类型参数的前面添加 i 来表示有符号整数,或者使用 u 来表示无符号整数。比如说,我们可以使用 u8 来表示 8 位长的无符号整数,也可以使用 i16 来表示 16 位长的有符号整数。

查找 bit 数组中指定范围内第一个 0 或 1 出现的位置:

image-20220725203622929

签到功能

需求:实现签到接口,将当前用户当天签到信息保存到 Redis 中

image-20220725203802426

提示:因为BitMap底层是基于String数据结构,因此其操作也都封装在字符串相关操作中了

image-20220725203817091

UserController

1
2
3
4
5
6
7
8
9
10
@Slf4j
@RestController
@RequestMapping("/user")
public class UserController {
@PostMapping("/sign")
public Result sign(){
return userService.sign();
}
}

IUserService

1
2
3
4
public interface IUserService extends IService<User> {
Result sign();
}

UserServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    @Override
public Result sign() {

Long userId = UserHolder.getUser().getId();
LocalDateTime now = LocalDateTime.now();
// 构造月份
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
// 拼接 key
String key = USER_SIGN_KEY + userId + keySuffix;
// 获取今天是本月的第几天 从 1-31
int dayOfMonth = now.getDayOfMonth();
// 从 0开始,需要减1
Boolean aBoolean = stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);

return Result.ok();

}

image-20220725210415336

存入的数据总共 16 个bit,即数据在存储是字节的形式存储的,1字节=8 bit,不足 8 位的以 0 补足。

签到统计

问题 1: 什么叫做连续签到天数?

从最后一次签到开始向前统计,直到遇到第一次未签到位置,计算总的签到次数,就是连续签到天数。

image-20220725210919587

连续签到天数为4

image-20220725211013996

问题 2: 如何得到本月到今天为止的所有签到数据?

使用命令:BITFIELD key GET u[dayOfMonth] 0 从 0 开始查询,到 dayOfMonth 截止

问题 3: 如何从后向前遍历每个 bit 位?

与 1 做与运算,就能得到最后一个 bit 位。任何一个二进制数与 1 做与运算,结果就是这个数本身。随后右移一位,下一个 bit 位就成为了最后一个 bit 位,然后再与 1 做与运算,以此类推。

实现签到统计功能

需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数

image-20220725211423645

以命令形式修改签到数据:

image-20220725211920524

操作数据:

UserController

1
2
3
4
5

@GetMapping("/signCount")
public Result signCount(){
return userService.signCount();
}

IUserService

1
2
3
public interface IUserService extends IService<User> {
Result signCount();
}

UserServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
    @Override
public Result signCount() {
// 获取当前登录的用户
Long userId = UserHolder.getUser().getId();
// 获取当前日期
LocalDateTime now = LocalDateTime.now();
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
// 拼接 key
String key = USER_SIGN_KEY + userId + keySuffix;
// 获取今天是本月的第几天
// 当前日期为 2022年7月25号,故而 dayOfMonth = 25
int dayOfMonth = now.getDayOfMonth();
// 获取从 0 到 dayOfMonth 的签到结果 BITFIELD sign:1010:202207 GET u24 0
// 获取从 0 到 dayOfMonth 的签到结果
List<Long> result = stringRedisTemplate.opsForValue().bitField(key,
BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0));

if (result == null || result.isEmpty()) {
// 没有签到结果
return Result.ok(0);
}
Long num = result.get(0);
if (num == null || num == 0) {
return Result.ok(0);
}

int count = 0;

while (true) {
// 让这个数字与 1 做与运算,得到数字的最后一位 bit 位 判断这个 bit 位是否为0
// num & 1 做与运算,其中 1 的左边以 0 补齐
if ((num & 1) == 0) {
break;

} else {
// 如果不为 0,说明已签到,计数器 +1
count++;
}
// 把数字右移一位,抛弃最后一个 bit 位,继续下一个 bit 位
// >> :右移 最高位是0,左边补齐0;最高为是1,左边补齐1
// << :左移 左边最高位丢弃,右边补齐0
// >>>:无符号右移 无论最高位是0还是1,左边补齐0
// num >>>= 1 ————> num = num >>> 1
num >>>= 1;
}
return Result.ok(count);
}

image-20220725215642381

image-20220725211940480

image-20220725215251107

image-20220725215238900

UV统计:

一、HyperLogLog 用法
UV:全称 Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人、1 天内同一个用户多次访问该网站,只记录 1 次。
PV:全称 Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录一次 PV,用户多次打开页面,则记录多次 PV。往往用来衡量网站的流量。

UV 统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到Redis 中,数据量会非常恐怖。

HyperLogLog(HLL)是从 Loglog 算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。相关算法原理可以参考:HyperLogLog 算法的原理讲解以及 Redis 是如何应用它的 - 掘金 (juejin.cn)
Redis 中的 HLL 是基于 String 结构实现的,单个 HLL 的内存永远小于 16kb,内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于 0.81% 的误差。不过对于 UV 统计来说,这完全可以忽略。
image-20220725220750667

实现 UV 统计

我们直接利用单元测试,向 HyperLogLog 中添加 100 万条数据,看看内存占用和统计效果如何:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@org.junit.Test
public void testHyperLogLog() {
String[] values = new String[1000];
int j = 0;
for (int i = 0; i < 1000000; i++) {
j = i % 1000;
values[j] = "user_" + i;
if(j == 999){
// 发送到 Redis
stringRedisTemplate.opsForHyperLogLog().add("hl2", values);
}
}
// 统计数量
Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
System.out.println("count = " + count);
}


image-20220725222854645

Spring事务失效:

(69条消息) spring 事务失效的 12 种场景_hanjq_code的博客-CSDN博客