# 其他

\[TOC]

## Quartz

* 分为三个部分：e
  * Job\&Detial(任务)：定时任务的执行方法，与Trigger配套的
  * Trigger(触发器)：规定什么时候触发，与Job\&Detail配套的
  * Scheduler(调度器)：单例，把Trigger丢里面由调度器调度，只需要一个Scheduler，配置不同的Trigger；可以理解成类似线程池的东西
* 原理：ScheduledThreadPoolExecutor线程池 + 通过Object类的wait()和notify()或者Condition类的await()\signal()进行等待和唤醒、锁保证线程安全 来进行调度

  Scheduler有两个调度线程：regular Scheduler Thread（执行常规调度）和Misfire Scheduler Thread（执行错失的任务），Regular Thread 轮询所有Trigger，如果有将要触发的Trigger（用wait和notifyAll实现），则从任务线程池中获取一个空闲线程，然后执行与改Trigger关联的job；Misfire Thraed则是扫描所有的trigger，查看是否有错失的，如果有的话，根据一定的策略进行处理
* 默认是并发的，即如果当前任务没有完成，会自动开一个任务执行
* 注意在分布式集群的情况下，多台机子有相同的定时任务，会出错，此时通过共享数据库的方式实现

  Quartz的解决方案：

  quartz集群分为水平集群和垂直集群，水平集群即将定时任务节点部署在不同的服务器，其最大的问题就是时钟同步问题，若时钟不能同步，则会导致集群中各个节点状态紊乱，造成不可预知的后果；垂直集群则是集群各节点部署在同一台服务器，时钟同步自然不是问题，但存在单点故障问题，服务器宕机会严重影响服务的可用性

  在各个节点会上报任务，存到数据库中，执行时会从数据库中取出触发器来执行，如果触发器的名称和执行时间相同，则只有一个节点去执行此任务。

  如果此节点执行失败，则此任务则会被分派到另一节点执行，中途也会自动检查失效的定时调度，发现不成功的，其他节点立马接过来继续完成定时任务。Quartz有11个定时任务调度表

参考

[Quartz原理解密](https://www.cnblogs.com/Dorae/p/9357180.html)

[深入解读Quartz的原理](https://blog.csdn.net/scgyus/article/details/79360316)

[Quartz 2.2 的实现原理和运行过程](https://blog.csdn.net/xlxxcc/article/details/52104463)

### 其他定时器

* Timer：这是java自带的java.util.Timer类，这个类允许你调度一个java.util.TimerTask任务。使用这种方式可以让你的程序按照某一个频度执行，但不能在指定时间运行。一般用的较少。单线程，任务一多会阻塞；一个任务出异常其他任务都受影响；受系统时间影响
* ScheduledExecutorService：也jdk自带的一个类；是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响。线程池+延时队列DelayedQueue(数组、最小堆, 最近要执行的任务放在堆顶) 实现，如果堆顶任务时间未到就阻塞（通过自旋+condition.await\signal实现）。不受系统时间影响
* Spring 中的 @Schedule 注解

参考：[Java 定时任务实现原理详解](https://blog.csdn.net/u013332124/article/details/79603943)

[Java优先级队列DelayedWorkQueue原理分析](https://www.jianshu.com/p/587901245c95)

## CORS

浏览器的同源政策的同源指的是：协议相同、域名相同、端口相同，如果非同源，有三种行为会受到限制：Cookie、LocalStorage和IndexDB无法读取；DOM无法获得、AJAX请求不能发送。

前后端分离的场景下，由于浏览器的同源策略，导致浏览器内的请求不同的源的后端是会失败，常见的解决跨域方法是使用CORS，现在常见的web框架都支持CORS，开启即可。

解决跨域的方法除了CORS，还有jsonp，不过已经很少使用了，jsonp本质是利用浏览器允许加载不同源的js文件即标签等，将跨域请求标签里，返回一段可执行的js代码，其中包含了请求结果，通常是json格式，前端通过返回的js代码执行回调获取结果。

详情见 [跨域资源共享 CORS 详解](http://www.ruanyifeng.com/blog/2016/04/cors.html)

对于跨域产生的问题，如CSRF跨域请求攻击的解决方案，可参考：[美团:如何防止csrf](https://tech.meituan.com/2018/10/11/fe-security-csrf.html)

## session和cookie

* 首先HTTP是无状态的，因此需要通过session、cookie来达到记录用户状态的目的。
* 传统的session、cookie：session存用户信息，保存在服务端中，cookie里存session对应的sessionId，保存在客户端中，用于找到对应的session，每次请求都会带上该cookie来表示此用户。

  如果客户端禁用掉cookie，可以使用LocalStorage存储，每次请求都以参数的形式传递；
* cookie可设置长时间保持，session有效时间通常比较短，安全性上来将，cookie存储在客户端，容易被窃取，session存储在服务段，相对安全；
* 由于现在实例的部署不可能只部署一个，一般都是集群部署，因此session不可以只存在一个实例的内存中，因此引入Redis来存用户的登录信息
* 现在一般使用 token + Redis来实现 cookie - session 机制，本质上差不多，前端的cookie更多的是存token的信息而已，token也可以存在LocalStorage或sessionStorage中，发送请求时一般是把token的值放在请求头中，而不会把cookie发给后端，这样可以避免当用户禁用cookie导致功能不可用，还有CSRF问题。

## JWT

JWT = JSON WEB TOKEN

### 原理

JWT实际上是一个token(令牌)，分为三部分：Header(头部)、Payload(负载)、Signature(签名)。

Header(头部) ：两部分组成，记录令牌类型和JWT的签名算法，一般是HMACSHA256。

Payload(负载)： 记录用户登录信息(官方规范默认是不加密的，分为官方字段和私有字段）。

Signature(签名) ：记录将 Header、Payload和服务端的密钥组合起来，使用Header(头部)里规定的方式加密。

比如header里保存的加密方式是HMACSHA256，`签名 Signature = HMACSHA256(base64URL(header) + "." + base64URL(payload) + "." + 保存在后端的密钥)`

最后的JWT = `base64URL(Header) + "." + base64URL(Payload) + "." + Signature`，后端收到该JWT后验证该签名是否正确，来判断JWT里的用户信息是否可靠。

**base64**：64指的是A-Z,a-z，0-9，+，/，将待转换的字符串转成二进制流，每3个8位转成4个6位，6位的二进制数转成十进制，根据码表找到对应的字符，以=号做后缀，凑齐位数

一般是为了解决一些字符编码的问题，将非ASCII字符转化为ASCII字符，还有就是可以对数据做简单加密，base64URL在base64的基础上增加对一些符号的编解码，比如把"-"替换成"+"，使得它可以出现在url中。

**HMACSHA256**：摘要算法，一般用于验证签名是否一致

### 使用

可以存储在浏览器的本地缓存localStorage或者cookie中，发送请求的时候放在cookie里，或者放在请求头中

* JWT的目的是让服务器不保存任何session数据，让后端变成无状态的，因此没办法主动废弃某个token，一旦签发了JWT，在到期之前就会始终有效，如果想要实现这种功能，必然需要在后端保存JWT，就违背了JWT的设计初衷了。
* 要让JWT实现 续签 和 主动过期功能，必定需要在后端保存JWT
  * jwt主动过期问题，使用黑名单即可；分成两点，客户端要求失效，服务端记录token到黑名单；用户重置密码，服务端记录uid-time键值对，在此之前的token全部失效；客户端把保存的jwt删掉是没用的，此时的jwt依然有效，只是客户端没记录而已
  * jwt续签问题，一种解决方式是jwt中存储过期时间，服务端设置刷新时间，请求时判断是否在过期时间或刷新时间，在刷新时间内进行token刷新，失效token记入黑名单；
  * 而黑名单过大问题，可以采用记录UID-刷新时间方式解决，判断jwt签发时间，jwt签发时间小于UID-刷新时间的记为失效
* 个人认为JWT的生成方式本身是有一套规范的，在实际使用过程中也可以对他进行改动，本质上还是一个签名校验而已，一般会对JWT进行魔改，比如使用Header(头部)里的加密方式加密Signature(签名)，Signature(签名)加密Header(头部) 和Payload(负载) 这两部分，服务器里的私钥解密Payload(负载)，得到需要的登录信息，不通过简单的base64URL编码，不对外暴露，签名算法或者签名里的密钥的方式可以改成其他等。

JWT参考：[JWT 超详细分析](https://learnku.com/articles/17883)

## CAS模型 - SSO(单点登录)

可参考：[CAS实现单点登录SSO执行原理探究](https://blog.csdn.net/javaloveiphone/article/details/52439613)，讲得算是比较明白，这里是总结基于CAS模式改的单点登录模式

* 第一次访问时，由于没有访问的token，会引导至登录

![第一次访问](https://github.com/Nixum/Java-Note/raw/master/picture/sso-first-access.png)

* 再次访问Web-1时，由于前端已存了token，直接使用token进行请求即可
* 已登录Web-1时去访问Web-2，会通过后端认证中心实现单点登录

![第二次访问](https://github.com/Nixum/Java-Note/raw/master/picture/sso-second-access.png)

这里在总结一下关于GrantTicket和ServiceTicket，跟CAS模型中提到的TGT、ST、PGT这些东西是类似的，本质是作为验证的票据，图中的GrantTicket、ServiceTicket、token含义如下

GrantTicket：全局会话票据，保存在登录页，通过GrantTicket才能换取ServiceTicket；

ServiceTicket表示访问资源的一次性票据，根据ServiceTicket换取token，换取后失效；

token：登录凭证

GT、ST和token都是保存在Redis中的，他们在Redis中的存储结构如下

```
key：TOKEN_${Token的值}
value:
{
    "createTime": 1565961654807,
    "accountId": "123",
    // 用户其他信息
    "grantTicket": ${GrantTicket的值}  // token关联GT，用于注销时实现全局注销
}

key：GRANT_TICKET_${GrantTicket的值}
value:
{
    "createTime": 1565961654807,
    "accountId": "123",
}

key：SERVICE_TICKET_${ServiceTicket的值}
value:
{
    "createTime": 1565961654807,
    "grantTicket": ${GrantTicket的值} // ST关联GT，用于判断该ST是否有效，换取token后删除
}

// token与grantTicket的记录，注销时，根据token中关联的GT，找到所有与之关联的token，进行删除，这里推荐使用Redis的scan命令进行分段查询，原因是Redis是单线程的，如果数据量太大使用keys命令遍历太久，阻塞Redis接收其他命令
key：{grantTicket}-{token}
value：无
```

## 基于OAuth2.0的第三方登录

可参考：[理解OAuth 2.0](https://www.ruanyifeng.com/blog/2014/05/oauth_2_0.html)，这样基本就入门了，这里是总结项目中如何接入，一般在集成facebook和google的第三方登录也是类似的流程机制，这里只用到了access\_token，对于refresh\_token，是用来延长access\_token的过期时间的，减少短时间内的重复登录，这里就没有涉及到了

![基于OAuth2的第三方登录](https://github.com/Nixum/Java-Note/raw/master/picture/%E5%9F%BA%E4%BA%8Eoauth2%E7%9A%84%E7%AC%AC%E4%B8%89%E6%96%B9%E7%99%BB%E5%BD%95%E6%B5%81%E7%A8%8B.png)

为什么要后端要根据code + clientId + secret换成access\_token，再根据access\_token换用户个人信息？

为什么后端不直接code + clientId + secret换用户个人信息呢？

主要还是为了安全，防止中间人攻击

* 重定向的参数是带在url里的，是直接暴露在客户端的，如果直接返回access\_token就不安全，因此才多了code这一层，为了降低code被拦截泄漏后的风险，code的过期时间一般都很短，且是一次性的；
* 另外就是后端对于外部的请求都是不信任的，因此接收到的参数(code)首先还要配合凭证去验证其合法性，对于验证通过后获得的access\_token也有更多的操作空间，由后端持有，不会暴露出去

  像上图那种登录方案，后端只需要用户个人信息换完token就算完事了，所以看起来好像直接使用code + clientId + secret换用户个人信息就行，但是如果此时需要再获取用户的其他信息，就没有没办法再用code去换了，只能要求用户再次登录，此时如果有access\_token就显得多么重要了

## 压测

总结一下做过的压测，压测工具jmetter，利用jmette可以多线程并发请求和可以实时查看简易报告的能力

1. 先对被压测服务的接口针对不同场景编写压测用例，设定好TPS的起始和目标值，作为压测计划
2. 画压测机器部署关系图，部署压测环境
   * 对于被压测的服务，一般会mock掉与该服务相关关联的服务，比如该服务还连了数据库，该接口请求依赖一些独立部署的中间件，或者依赖其他服务，则会对这些相关的依赖用桩来代替，用于维持通信，以减少这些额外服务的影响。
   * 一般一台机器只部署一个服务，特别是被压测服务，此外还要注意被压测服务所在的机器上网络设置相关的参数，比如TCP最大连接数、回收策略之类的设置
3. 编写压测脚本，压测脚本越简单越好，尽量让压测工具不影响被压测服务，**脚本最重要的几个设置**： 发起请求时的并发线程数、响应的断言、TPS数，其他那些花里胡哨的输出树状图，饼图啊那些都不用配了，用最简单的报告输出即可
4. 部署完后，将脚本配置放到jmeter的机器上，启动压测

   ```
   nohup java -jar bin/ApacheJMeter.jar -n -t jmetter脚本路径/config.jmx > test.out &
   ```

   输出到当前目录下的test.out文件里，这里启动是使用默认参数启动，如果对jmetter的JVM设置有要求，也可以在启动时指定JVM参数，如

   ```
   nohup java -server -XX:+HeapDumpOnOutOfMemoryError -Xms512m -Xmx512m -XX:+UseG1GC -XX:MaxGCPauseMillis=250 -XX:G1ReservePercent=20 -Djava.security.egd=file:/dev/urandom -jar bin/ApacheJMeter.jar -n -t jmetter脚本路径/config.jmx > test.out &
   ```

   压测开启后可以打开test.out文件查看压测报告
5. 一般是按照TPS从小往大压，小的TPS压，在正常延时的情况下可以先判断程序是否有问题，比如内存泄漏，内存溢出，没问题了再逐步往大了压。如果先从大往小压，延时又上不去，此时判断不了是程序内部问题还是过大的TPS导致。压测时间一般最少压一天
6. 输出压测报告

一般有如下几个点要注意，这些点到时也要输出到压测报告上

| 监控点                  | 说明                                                                                                     |
| -------------------- | ------------------------------------------------------------------------------------------------------ |
| jmetter端的TPS、延时、错误率  | 观察TPS是否符合预期、延时是否达到预期且稳定、错误率要为0。**当程序正常时降低RT的手段**：减少不必要的日志输出、业务逻辑算法是否还有优化空间，是否有IO占用或者频繁序列化反序列化、内部队列是否阻塞 |
| 被压测服务的gc             | fgc，ygc不要太频繁，一般来说**fgc 一小时要小于3\~4次**；**ygc一分钟要小于3\~4次为佳**。                                             |
| jmetter端的CPU、内存使用率等  | 注意jmetter端的CPU是否过高或波动很大，避免影响压测结论                                                                       |
| 被压测服务端的CPU、磁盘、内存使用率等 | 如果cpu过高，如果连续达到90以上，基本上是内存泄漏导致了频繁的fgc；磁盘的占用情况，注意生成的日志是否把磁盘占满了                                           |

使用 `jstat -gcutil [pid] [时间间隔，每几秒打印] [打印次数]`查看GC情况

当被压测端的gc不正常时，应尽量保存事发环境

​ 1、收集内存使用基本情况统计：`jmap -heap [pid] > [文件名，如heap.log]`

​ 2、收集线程堆栈运行信息：`jstack [pid] > [文件名，如stack.log]`

​ 3、收集内存详细使用信息，生成dump内存快照：`jmap -dump:format=b,file=[文件名，如heap.dump] [pid]`

一般使用eclipse mat工具进行内存快照的分析，排查出内存泄漏的问题。

mat的使用参见：[Eclipse MAT内存分析工具](https://www.cnblogs.com/yueshutong/p/9824772.html)

**一般压测脚本的模板：**

```xml
<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="3.2" jmeter="3.2 r1790748">
  <hashTree>
    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="测试计划" enabled="true">
        <!-- 一般写压测计划中的序号+名称 -->
      <stringProp name="TestPlan.comments"></stringProp>
      <boolProp name="TestPlan.functional_mode">false</boolProp>
      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
        <collectionProp name="Arguments.arguments"/>
      </elementProp>
      <stringProp name="TestPlan.user_define_classpath"></stringProp>
    </TestPlan>
    <hashTree>
      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Thread Group" enabled="true">
        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="循环控制器" enabled="true">
          <boolProp name="LoopController.continue_forever">false</boolProp>
          <intProp name="LoopController.loops">-1</intProp>
        </elementProp>
        <stringProp name="ThreadGroup.num_threads">500</stringProp>                                             <!-- 发起请求时的并发线程数，这里设置为500个并发线程，表示使用这么多的线程数来达到下面设置的TPS数 -->
        <stringProp name="ThreadGroup.ramp_time">8</stringProp>
        <longProp name="ThreadGroup.start_time">1509332694000</longProp>
        <longProp name="ThreadGroup.end_time">1509332694000</longProp>
        <boolProp name="ThreadGroup.scheduler">false</boolProp>
        <stringProp name="ThreadGroup.duration"></stringProp>
        <stringProp name="ThreadGroup.delay"></stringProp>
      </ThreadGroup>
      <hashTree>
        <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="click http request" enabled="true">
          <elementProp name="HTTPsampler.Arguments" elementType="Arguments" guiclass="HTTPArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
            <collectionProp name="Arguments.arguments"/>
          </elementProp>
          <stringProp name="HTTPSampler.domain">192.168.1.123</stringProp>         <!-- 此处为被压测服务的host -->
          <stringProp name="HTTPSampler.port">12345</stringProp>                    <!-- 此处为被压测服务的port -->
          <stringProp name="HTTPSampler.protocol">http</stringProp>
          <stringProp name="HTTPSampler.contentEncoding"></stringProp>
          <stringProp name="HTTPSampler.path">${__StringFromFile(/home/urls.log,,,)}</stringProp>  <!-- 发起的http请求uri从文件读取，文件路径 -->
          <stringProp name="HTTPSampler.method">GET</stringProp>
          <boolProp name="HTTPSampler.follow_redirects">false</boolProp>
          <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
          <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
          <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
          <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
          <stringProp name="HTTPSampler.implementation">Java</stringProp>
          <stringProp name="HTTPSampler.connect_timeout"></stringProp>
          <stringProp name="HTTPSampler.response_timeout"></stringProp>
        </HTTPSamplerProxy>
        <hashTree/>
        <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
          <collectionProp name="Asserion.test_strings">
            <stringProp name="49586">200</stringProp>                                       <!-- http请求的响应断言，要求返回的http code为200才判定为成功 -->
          </collectionProp>
          <stringProp name="Assertion.test_field">Assertion.response_code</stringProp>
          <boolProp name="Assertion.assume_success">false</boolProp>
          <intProp name="Assertion.test_type">8</intProp>
        </ResponseAssertion>
        <hashTree/>
        <ConstantThroughputTimer guiclass="TestBeanGUI" testclass="ConstantThroughputTimer" testname="Constant Throughput Timer" enabled="true">
          <intProp name="calcMode">1</intProp>
          <doubleProp>
            <name>throughput</name>
            <value>30000.0</value>          <!-- 1分钟内发起的请求数，换算为tps为500 -->
            <savedValue>0.0</savedValue>
          </doubleProp>
        </ConstantThroughputTimer>
        <hashTree/>
      </hashTree>
    </hashTree>
    <WorkBench guiclass="WorkBenchGui" testclass="WorkBench" testname="工作台" enabled="true">
      <boolProp name="WorkBench.save">true</boolProp>
    </WorkBench>
    <hashTree/>
  </hashTree>
</jmeterTestPlan>
```

## 调优

参考：<https://tech.meituan.com/2016/12/02/performance-tunning.html>

## 布隆过滤器

本质上是基于hash的概率性数据结构，是一个很长的二进制数组，主要用于判断元素可能存在集合中，或者一定不在集合中。

### 原理

1. 有一个长度为m的bit数组，初始每个bit都是0，另外还有k个hash函数；
2. 当加入一个元素时，先调用k个hash函数得到k个结果，将这k个结果与bit数组长度取模得到k个数组下标，将这k个数组下标对应的值置为 1；
3. 查询元素时，同样经过上面步骤的计算，最终得到k个数组下标，判断这些下标对应的值是否为1，如果为1，说明元素可能存在，如果有一个不为1，说明元素一定不存在，返回结果；

### 误判率计算

> 涉及到3个重要的参数：
>
> 1. `m`表示bit数组的长度
> 2. `k`表示散列函数的个数
> 3. `n`表示插入的元素个数
>
> 布隆过滤器中，一个元素插入后，某个bit为0的概率是`(1 − 1/m)^k`
>
> n元素插入后，某个bit为0的概率是`(1 − 1/m)^(n*k)`
>
> false positive的概率是`(1−(1−1/m)^n*k)^k`
>
> 因为需要的是`k`个不同的bit被设置成1，概率是大约是`(1−e^(−k*n/m))^k`

### 实现

可以基于redis实现，但这里只给出go版本的实现，支持并发安全

```go
const (
	mod7       = 1<<3 - 1
	bitPerByte = 8
)

type Filter struct {
	lock       *sync.RWMutex
	concurrent bool
	// 长度之所以要取2的指数是因为要将取模操作优化成与操作， % 等于 &(2^n-1)
	m     uint64 // bit array of m bits, m will be ceiling to power of 2
	n     uint64 // number of inserted elements
	log2m uint64 // log_2 of m
	k     uint64 // the number of hash function
	keys  []byte // byte array to store hash value
}

func New(size uint64, k uint64, race bool) *Filter {
	log2 := uint64(math.Ceil(math.Log2(float64(size))))
	filter := &Filter{
		m:          1 << log2,
		log2m:      log2,
		k:          k,
		keys:       make([]byte, 1<<log2),
		concurrent: race,
	}
	if filter.concurrent {
		filter.lock = &sync.RWMutex{}
	}
	return filter
}

func (f *Filter) Add(data []byte) *Filter {
	if f.concurrent {
		f.lock.Lock()
		defer f.lock.Unlock()
	}
	h := baseHash(data)
	for i := uint64(0); i < f.k; i++ {
		loc := location(h, i)
		slot, mod := f.location(loc)
		f.keys[slot] |= 1 << mod
	}
	f.n++
	return f
}

// location returns the bit position in byte array
// & (f.m - 1) is the quick way for mod operation
func (f *Filter) location(h uint64) (uint64, uint64) {
	slot := (h / bitPerByte) & (f.m - 1)
	mod := h & mod7
	return slot, mod
}

// baseHash returns the murmur3 128-bit hash
func baseHash(data []byte) []uint64 {
	a1 := []byte{1} // to grab another bit of data
	hasher := murmur3.New128()
	hasher.Write(data) // #nosec
	v1, v2 := hasher.Sum128()
	hasher.Write(a1) // #nosec
	v3, v4 := hasher.Sum128()
	return []uint64{
		v1, v2, v3, v4,
	}
}
```

## 淘汰算法

### LRU

java实现，非线程安全

```java
// 注意每一次对节点有操作对需要同时操作 nodeMap和lruQueue
// LRU本质是利用 hashMap 和 双向链表 实现
public class LRUCache {

  private Map<String, Node> nodeMap;
  private DoubleLinkedList lruQueue;
  private int size = 0;

  public LRUCache() {
    this(3);
  }

  public LRUCache(int size) {
    this.size = size;
    this.nodeMap = new HashMap<>();
    this.lruQueue = new DoubleLinkedList();
  }


  public String get(String key) {
    Node n = nodeMap.get(key);
    if (n == null) {
      return null;
    }
    // 获取后直接放到到最前的位置
    put(n.key, n.value);
    return n.value;
  }

  public void put(String key, String value) {
    Node newNode = new Node(key, value);
    // 如果包含，则放到最前
    if (nodeMap.containsKey(key)) {
      lruQueue.remove(nodeMap.get(key));
      lruQueue.addFirst(newNode);
      // 记得更新map
      nodeMap.put(key, newNode);
    } else {
      // 如果满了，则移除最后一个
      if (size <= lruQueue.size()) {
        Node last = lruQueue.removeLast();
        nodeMap.remove(last.key);
      }
      nodeMap.put(key, newNode);
      lruQueue.addFirst(newNode);
    }
  }

  private class Node {
    private String key;
    private String value;
    private Node pre;
    private Node next;

    public Node(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  // 封装双向链表方法，构建时要注意前后节点指向和空指针问题
  private class DoubleLinkedList {
    private Node head;
    private Node tail;
    private int count = 0;

    public void addFirst(Node n) {
      if (head == null) {
        tail = n;
      }
      count ++;
      n.next = head;
      if (head != null) {
        head.pre = n;
      }
      head = n;
    }

    public Node removeLast() {
      if (count == 0) {
        return null;
      }
      Node result = tail;
      if (tail.pre != null) {
        tail.pre.next = null;
      } else {
        head = null;
        tail = null;
      }
      count --;
      return result;
    }

    public void remove(Node n) {
      if (count == 0) {
        return ;
      }
      count --;
      if (n.pre != null) {
        n.pre.next = n.next;
      } else {
        head = n.next;
      }
      if (n.next != null) {
        n.next.pre = n.pre;
      } else {
        tail = n.pre;
      }
    }

    public int size() {
      return count;
    }
  }
}
```

golang实现，非并发安全，下面这种写法是平铺了双向链表，链表的长度通过map的长度计算得到

```go
type LRUCache struct {
	nodeMap  map[string]*Node
	head     *Node
	tail     *Node
	cap      int
}

type Node struct {
	key   string
	val   string
	next  *Node
	pre   *Node
}

func NewLRUCache(capacity int) LRUCache {
	return LRUCache{nodeMap: make(map[string]*Node), cap: capacity}
}

func (lru *LRUCache) Get(key string) string {
    if existNode, exist := lru.nodeMap[key]; exist {
        lru.remove(existNode)
        lru.addFirst(existNode)
        return existNode.val
    }
    return ""
}

func (lru *LRUCache) Put(key string, val string) {
    if existNode, exist := lru.nodeMap[key]; exist {
        existNode.val = val
        lru.remove(existNode)
        lru.addFirst(existNode)
        return
    } else {
        newNode := &node{key: key, val: val}
        lru.nodeMap[key] = newNode
        lru.addFirst(newNode)
    }
    if len(lru.nodeMap) > lru.cap {
        // 一定要先delete，如果先remove，会导致找不回tail的key进行delete，
        delete(lru.nodeMap, lru.tail.key)
        lru.remove(lru.tail)
    }
}

func (lru *LRUCache) addFirst(n *Node) {
	n.pre = nil
    n.next = lru.head
    if lru.head != nil {
        lru.head.pre = n
    }
    lru.head = n
    if lru.tail == nil {
        lru.tail = n
        lru.tail.next = nil
    }
}

func (lru *LRUCache) remove(n *Node) {
    // n的next和pre要置为null，防止内存泄漏
    if n == lru.head {
        lru.head = n.next
        if n.next != nil {
            n.next.pre = nil
        }
        n.next = nil
        return
    }
    if n == lru.tail {
        lru.tail = n.pre
        n.pre.next = nil
        n.pre = nil
        return
    }
    n.pre.next = n.next
    n.next.pre = n.pre
}
```

关于并发安全，最简单的实现就是在调用Get和Put方法时加读写锁，但是这种做法锁的粒度比较大，每次会锁住整个底层的双向链表和map，导致在高并发情况下吞吐量不高，优化的思路就是对map分片，通过分片上锁来减小锁的粒度，然后再双向链表节点的操作上进行优化。

```go
func New(capacity int) LRUCache {
	shards := make(map[string]*LRUCacheShard, 256)
	for i := 0; i < 256; i++ {
		shards[fmt.Sprintf("%02x", i)] = &LRUCacheShard{
			Cap:  capacity,
			Keys: make(map[int]*list.Element),
			List: list.New(),
		}
	}
	return LRUCache{
		shards: shards,
	}
}

func (c *LRUCache) Get(key int) int {
	shard := c.GetShard(key)
	shard.RLock()
	defer shard.RUnlock()
	……
}

func (c *LRUCache) Put(key int, value int) {
  	shard := c.GetShard(key)
	shard.Lock()
	defer shard.Unlock()
	……
}

func (c *LRUCache) GetShard(key int) (shard *LRUCacheShard) {
  hasher := sha1.New()
  hasher.Write([]byte(key))
  shardKey :=  fmt.Sprintf("%x", hasher.Sum(nil))[0:2]
  return c.shards[shardKey]
}
```

其中，在Get方法中，如果存在，还需要修改key所在节点的位置，直接调put即可，当然这种方式的粒度还是比较大，再次优化的思路是对map的操作还是得上锁，但对双向链表的操作无需上锁，双向链表移动节点和删除节点可以同时操作，可以通过两个channel实现，[参考](https://github.com/halfrost/LeetCode-Go/blob/master/template/CLRUCache.go)

### LFU

java实现，非线程安全

```java
/**
 * 频次最少使用
 * 设定容量，每次get key会修改使用次数和使用时间，当满容量时，移除次数最少的那个
 * 如果有多个key的使用次数一样，则移除使用时间最旧的那个
 */
public class LFUCache {

    Map<String, String> keyValMap;
    Map<String, Integer> key2FreqMap; // key和使用频率的映射
    // 用LinkHashSet来模拟使用时间，使用LinkHashSet还有一个目的是便于根据key进行删除
    Map<Integer, LinkedHashSet<String>> freq2KeysMap; // 使用频率和key的映射
    int minFreq;
    int cap;

    public LFUCache(int cap) {
        keyValMap = new HashMap<>();
        key2FreqMap = new HashMap<>();
        freq2KeysMap = new HashMap<>();
        this.cap = cap;
        this.minFreq = 0;
    }

    public String get(String key) {
        if (!keyValMap.containsKey(key)) {
            return "";
        }
        increaseFreq(key);
        return keyValMap.get(key);
    }

    public void put(String key, String val) {
        if (this.cap <= 0) {
            return;
        }
        if (keyValMap.containsKey(key)) {
            keyValMap.put(key, val);
            increaseFreq(key);
            return;
        }
        if (this.cap <= keyValMap.size()) {
            removeMinFreqKey();
        }
        keyValMap.put(key, val);
        key2FreqMap.put(key, 1);
        freq2KeysMap.putIfAbsent(1, new LinkedHashSet<>());
        freq2KeysMap.get(1).add(key);
        this.minFreq = 1;
    }

    void increaseFreq(String key) {
        int freq = key2FreqMap.get(key);
        key2FreqMap.put(key, freq+1);
        freq2KeysMap.get(freq).remove(key);
        freq2KeysMap.putIfAbsent(freq + 1, new LinkedHashSet<>());
        freq2KeysMap.get(freq + 1).add(key);
        if (freq2KeysMap.get(freq).isEmpty()) {
            freq2KeysMap.remove(freq);
            if (freq == this.minFreq) {
                this.minFreq++;
            }
        }
    }

    void removeMinFreqKey() {
        LinkedHashSet<String> keys = freq2KeysMap.get(this.minFreq);
        String delKey = keys.iterator().next();
        keys.remove(delKey);
        if (keys.isEmpty()) {
            freq2KeysMap.remove(this.minFreq);
            // 这里无需更新 minFreq 的值，因为该方法是在插入新key时使用，此时minFreq一定是1
        }
        keyValMap.remove(delKey);
        key2FreqMap.remove(delKey);
    }
}
```

golang实现，非并发安全，利用 优先级队列（最小堆） + map实现，使用的是go自带的heap数据结构，通过Item数组结构实现

```go
import "container/heap"

type LFUNode struct {
  	key   string
	val   string
	freq  int // 用于优先级，key的使用频率
	count int // 用于当freq相同时的比较淘汰，总的获取次数，类似时间戳的概念
	index int // 最小堆中元素的下标，用于重建最小堆.
}

type PriorityQueue []*LFUNode

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
	// 优先根据使用频率排列，相等时才使用count，从小到大排序
	if pq[i].freq == pq[j].freq {
		return pq[i].count < pq[j].count
	}
	return pq[i].freq < pq[j].freq
}

func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

func (pq *PriorityQueue) Push(x interface{}) {
	n := len(*pq)
	node := x.(*LFUNode)
	node.index = n
	*pq = append(*pq, node)
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	node := old[n-1]
	old[n-1] = nil  // 防止内存泄露
	node.index = -1 // pop时重置下标保证安全
	*pq = old[0 : n-1]
	return node
}

// 更新最小堆里的元素
func (pq *PriorityQueue) update(node *LFUNode, value string, frequency int, count int) {
	node.val = value
	node.count = count
	node.freq = frequency
	heap.Fix(pq, node.index)
}

// ==========================================
type LFUCache struct {
	cap      int
	pq       PriorityQueue
	nodeMap  map[string]*LFUNode
	counter  int
}

func NewLFUCache(capacity int) LFUCache {
	return LFUCache{
		pq:       PriorityQueue{},
		nodeMap:  make(map[string]*LFUNode, capacity),
		cap:      capacity,
	}
}

func (lfu *LFUCache) Get(key string) string {
	if lfu.cap == 0 {
		return ""
	}
	if node, ok := lfu.nodeMap[key]; ok {
		lfu.counter++
		lfu.pq.update(node, node.val, node.freq+1, lfu.counter)
		return node.val
	}
	return ""
}

func (lfu *LFUCache) Put(key string, value string) {
	if lfu.cap == 0 {
		return
	}
	lfu.counter++
	// 如果存在，增加 frequency，再调整堆
	if node, ok := lfu.nodeMap[key]; ok {
		lfu.pq.update(node, value, node.freq + 1, lfu.counter)
		return
	}
	// 如果不存在且缓存满了，需要删除。在 hashmap 和 pq 中删除。
	if len(lfu.pq) == lfu.cap {
		node := heap.Pop(&lfu.pq).(*LFUNode)
		delete(lfu.nodeMap, node.key)
	}
	// 新建结点，在 hashmap 和 pq 中添加。
	node := &LFUNode{
		val:   value,
		key:   key,
		count: lfu.counter,
	}
	heap.Push(&lfu.pq, node)
	lfu.nodeMap[key] = node
}
```

另一个版本实现，利用go本身提供的双向链表，但是由于go没泛型，在类型推断方面导致整体性能不会很高

```go
type LFUCache2 struct {
	nodeMap       map[string]*list.Element
	freq2NodeList map[int]*list.List
	cap           int
	min           int
}

type node struct {
	key       string
	value     string
	freq      int
}

func NewLFUCache2(capacity int) LFUCache2 {
	return LFUCache2{nodeMap: make(map[string]*list.Element),
		freq2NodeList:    make(map[int]*list.List),
		cap:      capacity,
		min:      0,
	}
}

func (lfu *LFUCache2) Get(key string) string {
	value, ok := lfu.nodeMap[key]
	if !ok {
		return ""
	}
	currentNode := value.Value.(*node)
	lfu.freq2NodeList[currentNode.freq].Remove(value)
	currentNode.freq++
	if _, ok := lfu.freq2NodeList[currentNode.freq]; !ok {
		lfu.freq2NodeList[currentNode.freq] = list.New()
	}
	newList := lfu.freq2NodeList[currentNode.freq]
	newNode := newList.PushFront(currentNode)
	lfu.nodeMap[key] = newNode
	if currentNode.freq-1 == lfu.min && lfu.freq2NodeList[currentNode.freq-1].Len() == 0 {
		lfu.min++
	}
	return currentNode.value
}

func (lfu *LFUCache2) Put(key string, value string) {
	if lfu.cap == 0 {
		return
	}
	// 如果存在，更新访问次数
	if currentValue, ok := lfu.nodeMap[key]; ok {
		currentNode := currentValue.Value.(*node)
		currentNode.value = value
		lfu.Get(key)
		return
	}
	// 如果不存在且缓存满了，需要删除
	if lfu.cap == len(lfu.nodeMap) {
		currentList := lfu.freq2NodeList[lfu.min]
		backNode := currentList.Back()
		delete(lfu.nodeMap, backNode.Value.(*node).key)
		currentList.Remove(backNode)
	}
	// 新建结点，插入到 2 个 map 中
	lfu.min = 1
	currentNode := &node{
		key:       key,
		value:     value,
		freq:      1,
	}
	if _, ok := lfu.freq2NodeList[1]; !ok {
		lfu.freq2NodeList[1] = list.New()
	}
	newList := lfu.freq2NodeList[1]
	newNode := newList.PushFront(currentNode)
	lfu.nodeMap[key] = newNode
}
```

## 参考

[布隆过滤器原理及golang实现](https://liangyaopei.github.io/2020/07/29/bloom-filter/)

[LRU / LFU 的青铜与王者](https://halfrost.com/lru_lfu_interview/)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://nixum.gitbook.io/note/qi-ta.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
