# 其他

\[TOC]

## Quartz

* 分为三个部分：e
  * Job\&Detial(任务)：定时任务的执行方法，与Trigger配套的
  * Trigger(触发器)：规定什么时候触发，与Job\&Detail配套的
  * Scheduler(调度器)：单例，把Trigger丢里面由调度器调度，只需要一个Scheduler，配置不同的Trigger；可以理解成类似线程池的东西
* 原理：ScheduledThreadPoolExecutor线程池 + 通过Object类的wait()和notify()或者Condition类的await()\signal()进行等待和唤醒、锁保证线程安全 来进行调度

  Scheduler有两个调度线程：regular Scheduler Thread（执行常规调度）和Misfire Scheduler Thread（执行错失的任务），Regular Thread 轮询所有Trigger，如果有将要触发的Trigger（用wait和notifyAll实现），则从任务线程池中获取一个空闲线程，然后执行与改Trigger关联的job；Misfire Thraed则是扫描所有的trigger，查看是否有错失的，如果有的话，根据一定的策略进行处理
* 默认是并发的，即如果当前任务没有完成，会自动开一个任务执行
* 注意在分布式集群的情况下，多台机子有相同的定时任务，会出错，此时通过共享数据库的方式实现

  Quartz的解决方案：

  quartz集群分为水平集群和垂直集群，水平集群即将定时任务节点部署在不同的服务器，其最大的问题就是时钟同步问题，若时钟不能同步，则会导致集群中各个节点状态紊乱，造成不可预知的后果；垂直集群则是集群各节点部署在同一台服务器，时钟同步自然不是问题，但存在单点故障问题，服务器宕机会严重影响服务的可用性

  在各个节点会上报任务，存到数据库中，执行时会从数据库中取出触发器来执行，如果触发器的名称和执行时间相同，则只有一个节点去执行此任务。

  如果此节点执行失败，则此任务则会被分派到另一节点执行，中途也会自动检查失效的定时调度，发现不成功的，其他节点立马接过来继续完成定时任务。Quartz有11个定时任务调度表

参考

[Quartz原理解密](https://www.cnblogs.com/Dorae/p/9357180.html)

[深入解读Quartz的原理](https://blog.csdn.net/scgyus/article/details/79360316)

[Quartz 2.2 的实现原理和运行过程](https://blog.csdn.net/xlxxcc/article/details/52104463)

### 其他定时器

* Timer：这是java自带的java.util.Timer类，这个类允许你调度一个java.util.TimerTask任务。使用这种方式可以让你的程序按照某一个频度执行，但不能在指定时间运行。一般用的较少。单线程，任务一多会阻塞；一个任务出异常其他任务都受影响；受系统时间影响
* ScheduledExecutorService：也jdk自带的一个类；是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响。线程池+延时队列DelayedQueue(数组、最小堆, 最近要执行的任务放在堆顶) 实现，如果堆顶任务时间未到就阻塞（通过自旋+condition.await\signal实现）。不受系统时间影响
* Spring 中的 @Schedule 注解

参考：[Java 定时任务实现原理详解](https://blog.csdn.net/u013332124/article/details/79603943)

[Java优先级队列DelayedWorkQueue原理分析](https://www.jianshu.com/p/587901245c95)

## CORS

浏览器的同源政策的同源指的是：协议相同、域名相同、端口相同，如果非同源，有三种行为会受到限制：Cookie、LocalStorage和IndexDB无法读取；DOM无法获得、AJAX请求不能发送。

前后端分离的场景下，由于浏览器的同源策略，导致浏览器内的请求不同的源的后端是会失败，常见的解决跨域方法是使用CORS，现在常见的web框架都支持CORS，开启即可。

解决跨域的方法除了CORS，还有jsonp，不过已经很少使用了，jsonp本质是利用浏览器允许加载不同源的js文件即标签等，将跨域请求标签里，返回一段可执行的js代码，其中包含了请求结果，通常是json格式，前端通过返回的js代码执行回调获取结果。

详情见 [跨域资源共享 CORS 详解](http://www.ruanyifeng.com/blog/2016/04/cors.html)

对于跨域产生的问题，如CSRF跨域请求攻击的解决方案，可参考：[美团:如何防止csrf](https://tech.meituan.com/2018/10/11/fe-security-csrf.html)

## session和cookie

* 首先HTTP是无状态的，因此需要通过session、cookie来达到记录用户状态的目的。
* 传统的session、cookie：session存用户信息，保存在服务端中，cookie里存session对应的sessionId，保存在客户端中，用于找到对应的session，每次请求都会带上该cookie来表示此用户。

  如果客户端禁用掉cookie，可以使用LocalStorage存储，每次请求都以参数的形式传递；
* cookie可设置长时间保持，session有效时间通常比较短，安全性上来将，cookie存储在客户端，容易被窃取，session存储在服务段，相对安全；
* 由于现在实例的部署不可能只部署一个，一般都是集群部署，因此session不可以只存在一个实例的内存中，因此引入Redis来存用户的登录信息
* 现在一般使用 token + Redis来实现 cookie - session 机制，本质上差不多，前端的cookie更多的是存token的信息而已，token也可以存在LocalStorage或sessionStorage中，发送请求时一般是把token的值放在请求头中，而不会把cookie发给后端，这样可以避免当用户禁用cookie导致功能不可用，还有CSRF问题。

## JWT

JWT = JSON WEB TOKEN

### 原理

JWT实际上是一个token(令牌)，分为三部分：Header(头部)、Payload(负载)、Signature(签名)。

Header(头部) ：两部分组成，记录令牌类型和JWT的签名算法，一般是HMACSHA256。

Payload(负载)： 记录用户登录信息(官方规范默认是不加密的，分为官方字段和私有字段）。

Signature(签名) ：记录将 Header、Payload和服务端的密钥组合起来，使用Header(头部)里规定的方式加密。

比如header里保存的加密方式是HMACSHA256，`签名 Signature = HMACSHA256(base64URL(header) + "." + base64URL(payload) + "." + 保存在后端的密钥)`

最后的JWT = `base64URL(Header) + "." + base64URL(Payload) + "." + Signature`，后端收到该JWT后验证该签名是否正确，来判断JWT里的用户信息是否可靠。

**base64**：64指的是A-Z,a-z，0-9，+，/，将待转换的字符串转成二进制流，每3个8位转成4个6位，6位的二进制数转成十进制，根据码表找到对应的字符，以=号做后缀，凑齐位数

一般是为了解决一些字符编码的问题，将非ASCII字符转化为ASCII字符，还有就是可以对数据做简单加密，base64URL在base64的基础上增加对一些符号的编解码，比如把"-"替换成"+"，使得它可以出现在url中。

**HMACSHA256**：摘要算法，一般用于验证签名是否一致

### 使用

可以存储在浏览器的本地缓存localStorage或者cookie中，发送请求的时候放在cookie里，或者放在请求头中

* JWT的目的是让服务器不保存任何session数据，让后端变成无状态的，因此没办法主动废弃某个token，一旦签发了JWT，在到期之前就会始终有效，如果想要实现这种功能，必然需要在后端保存JWT，就违背了JWT的设计初衷了。
* 要让JWT实现 续签 和 主动过期功能，必定需要在后端保存JWT
  * jwt主动过期问题，使用黑名单即可；分成两点，客户端要求失效，服务端记录token到黑名单；用户重置密码，服务端记录uid-time键值对，在此之前的token全部失效；客户端把保存的jwt删掉是没用的，此时的jwt依然有效，只是客户端没记录而已
  * jwt续签问题，一种解决方式是jwt中存储过期时间，服务端设置刷新时间，请求时判断是否在过期时间或刷新时间，在刷新时间内进行token刷新，失效token记入黑名单；
  * 而黑名单过大问题，可以采用记录UID-刷新时间方式解决，判断jwt签发时间，jwt签发时间小于UID-刷新时间的记为失效
* 个人认为JWT的生成方式本身是有一套规范的，在实际使用过程中也可以对他进行改动，本质上还是一个签名校验而已，一般会对JWT进行魔改，比如使用Header(头部)里的加密方式加密Signature(签名)，Signature(签名)加密Header(头部) 和Payload(负载) 这两部分，服务器里的私钥解密Payload(负载)，得到需要的登录信息，不通过简单的base64URL编码，不对外暴露，签名算法或者签名里的密钥的方式可以改成其他等。

JWT参考：[JWT 超详细分析](https://learnku.com/articles/17883)

## CAS模型 - SSO(单点登录)

可参考：[CAS实现单点登录SSO执行原理探究](https://blog.csdn.net/javaloveiphone/article/details/52439613)，讲得算是比较明白，这里是总结基于CAS模式改的单点登录模式

* 第一次访问时，由于没有访问的token，会引导至登录

![第一次访问](https://github.com/Nixum/Java-Note/raw/master/picture/sso-first-access.png)

* 再次访问Web-1时，由于前端已存了token，直接使用token进行请求即可
* 已登录Web-1时去访问Web-2，会通过后端认证中心实现单点登录

![第二次访问](https://github.com/Nixum/Java-Note/raw/master/picture/sso-second-access.png)

这里在总结一下关于GrantTicket和ServiceTicket，跟CAS模型中提到的TGT、ST、PGT这些东西是类似的，本质是作为验证的票据，图中的GrantTicket、ServiceTicket、token含义如下

GrantTicket：全局会话票据，保存在登录页，通过GrantTicket才能换取ServiceTicket；

ServiceTicket表示访问资源的一次性票据，根据ServiceTicket换取token，换取后失效；

token：登录凭证

GT、ST和token都是保存在Redis中的，他们在Redis中的存储结构如下

```
key：TOKEN_${Token的值}
value:
{
    "createTime": 1565961654807,
    "accountId": "123",
    // 用户其他信息
    "grantTicket": ${GrantTicket的值}  // token关联GT，用于注销时实现全局注销
}

key：GRANT_TICKET_${GrantTicket的值}
value:
{
    "createTime": 1565961654807,
    "accountId": "123",
}

key：SERVICE_TICKET_${ServiceTicket的值}
value:
{
    "createTime": 1565961654807,
    "grantTicket": ${GrantTicket的值} // ST关联GT，用于判断该ST是否有效，换取token后删除
}

// token与grantTicket的记录，注销时，根据token中关联的GT，找到所有与之关联的token，进行删除，这里推荐使用Redis的scan命令进行分段查询，原因是Redis是单线程的，如果数据量太大使用keys命令遍历太久，阻塞Redis接收其他命令
key：{grantTicket}-{token}
value：无
```

## 基于OAuth2.0的第三方登录

可参考：[理解OAuth 2.0](https://www.ruanyifeng.com/blog/2014/05/oauth_2_0.html)，这样基本就入门了，这里是总结项目中如何接入，一般在集成facebook和google的第三方登录也是类似的流程机制，这里只用到了access\_token，对于refresh\_token，是用来延长access\_token的过期时间的，减少短时间内的重复登录，这里就没有涉及到了

![基于OAuth2的第三方登录](https://github.com/Nixum/Java-Note/raw/master/picture/%E5%9F%BA%E4%BA%8Eoauth2%E7%9A%84%E7%AC%AC%E4%B8%89%E6%96%B9%E7%99%BB%E5%BD%95%E6%B5%81%E7%A8%8B.png)

为什么要后端要根据code + clientId + secret换成access\_token，再根据access\_token换用户个人信息？

为什么后端不直接code + clientId + secret换用户个人信息呢？

主要还是为了安全，防止中间人攻击

* 重定向的参数是带在url里的，是直接暴露在客户端的，如果直接返回access\_token就不安全，因此才多了code这一层，为了降低code被拦截泄漏后的风险，code的过期时间一般都很短，且是一次性的；
* 另外就是后端对于外部的请求都是不信任的，因此接收到的参数(code)首先还要配合凭证去验证其合法性，对于验证通过后获得的access\_token也有更多的操作空间，由后端持有，不会暴露出去

  像上图那种登录方案，后端只需要用户个人信息换完token就算完事了，所以看起来好像直接使用code + clientId + secret换用户个人信息就行，但是如果此时需要再获取用户的其他信息，就没有没办法再用code去换了，只能要求用户再次登录，此时如果有access\_token就显得多么重要了

## 压测

总结一下做过的压测，压测工具jmetter，利用jmette可以多线程并发请求和可以实时查看简易报告的能力

1. 先对被压测服务的接口针对不同场景编写压测用例，设定好TPS的起始和目标值，作为压测计划
2. 画压测机器部署关系图，部署压测环境
   * 对于被压测的服务，一般会mock掉与该服务相关关联的服务，比如该服务还连了数据库，该接口请求依赖一些独立部署的中间件，或者依赖其他服务，则会对这些相关的依赖用桩来代替，用于维持通信，以减少这些额外服务的影响。
   * 一般一台机器只部署一个服务，特别是被压测服务，此外还要注意被压测服务所在的机器上网络设置相关的参数，比如TCP最大连接数、回收策略之类的设置
3. 编写压测脚本，压测脚本越简单越好，尽量让压测工具不影响被压测服务，**脚本最重要的几个设置**： 发起请求时的并发线程数、响应的断言、TPS数，其他那些花里胡哨的输出树状图，饼图啊那些都不用配了，用最简单的报告输出即可
4. 部署完后，将脚本配置放到jmeter的机器上，启动压测

   ```
   nohup java -jar bin/ApacheJMeter.jar -n -t jmetter脚本路径/config.jmx > test.out &
   ```

   输出到当前目录下的test.out文件里，这里启动是使用默认参数启动，如果对jmetter的JVM设置有要求，也可以在启动时指定JVM参数，如

   ```
   nohup java -server -XX:+HeapDumpOnOutOfMemoryError -Xms512m -Xmx512m -XX:+UseG1GC -XX:MaxGCPauseMillis=250 -XX:G1ReservePercent=20 -Djava.security.egd=file:/dev/urandom -jar bin/ApacheJMeter.jar -n -t jmetter脚本路径/config.jmx > test.out &
   ```

   压测开启后可以打开test.out文件查看压测报告
5. 一般是按照TPS从小往大压，小的TPS压，在正常延时的情况下可以先判断程序是否有问题，比如内存泄漏，内存溢出，没问题了再逐步往大了压。如果先从大往小压，延时又上不去，此时判断不了是程序内部问题还是过大的TPS导致。压测时间一般最少压一天
6. 输出压测报告

一般有如下几个点要注意，这些点到时也要输出到压测报告上

| 监控点                  | 说明                                                                                                     |
| -------------------- | ------------------------------------------------------------------------------------------------------ |
| jmetter端的TPS、延时、错误率  | 观察TPS是否符合预期、延时是否达到预期且稳定、错误率要为0。**当程序正常时降低RT的手段**：减少不必要的日志输出、业务逻辑算法是否还有优化空间，是否有IO占用或者频繁序列化反序列化、内部队列是否阻塞 |
| 被压测服务的gc             | fgc，ygc不要太频繁，一般来说**fgc 一小时要小于3\~4次**；**ygc一分钟要小于3\~4次为佳**。                                             |
| jmetter端的CPU、内存使用率等  | 注意jmetter端的CPU是否过高或波动很大，避免影响压测结论                                                                       |
| 被压测服务端的CPU、磁盘、内存使用率等 | 如果cpu过高，如果连续达到90以上，基本上是内存泄漏导致了频繁的fgc；磁盘的占用情况，注意生成的日志是否把磁盘占满了                                           |

使用 `jstat -gcutil [pid] [时间间隔，每几秒打印] [打印次数]`查看GC情况

当被压测端的gc不正常时，应尽量保存事发环境

​ 1、收集内存使用基本情况统计：`jmap -heap [pid] > [文件名，如heap.log]`

​ 2、收集线程堆栈运行信息：`jstack [pid] > [文件名，如stack.log]`

​ 3、收集内存详细使用信息，生成dump内存快照：`jmap -dump:format=b,file=[文件名，如heap.dump] [pid]`

一般使用eclipse mat工具进行内存快照的分析，排查出内存泄漏的问题。

mat的使用参见：[Eclipse MAT内存分析工具](https://www.cnblogs.com/yueshutong/p/9824772.html)

**一般压测脚本的模板：**

```xml
<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="3.2" jmeter="3.2 r1790748">
  <hashTree>
    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="测试计划" enabled="true">
        <!-- 一般写压测计划中的序号+名称 -->
      <stringProp name="TestPlan.comments"></stringProp>
      <boolProp name="TestPlan.functional_mode">false</boolProp>
      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
        <collectionProp name="Arguments.arguments"/>
      </elementProp>
      <stringProp name="TestPlan.user_define_classpath"></stringProp>
    </TestPlan>
    <hashTree>
      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Thread Group" enabled="true">
        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="循环控制器" enabled="true">
          <boolProp name="LoopController.continue_forever">false</boolProp>
          <intProp name="LoopController.loops">-1</intProp>
        </elementProp>
        <stringProp name="ThreadGroup.num_threads">500</stringProp>                                             <!-- 发起请求时的并发线程数，这里设置为500个并发线程，表示使用这么多的线程数来达到下面设置的TPS数 -->
        <stringProp name="ThreadGroup.ramp_time">8</stringProp>
        <longProp name="ThreadGroup.start_time">1509332694000</longProp>
        <longProp name="ThreadGroup.end_time">1509332694000</longProp>
        <boolProp name="ThreadGroup.scheduler">false</boolProp>
        <stringProp name="ThreadGroup.duration"></stringProp>
        <stringProp name="ThreadGroup.delay"></stringProp>
      </ThreadGroup>
      <hashTree>
        <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="click http request" enabled="true">
          <elementProp name="HTTPsampler.Arguments" elementType="Arguments" guiclass="HTTPArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
            <collectionProp name="Arguments.arguments"/>
          </elementProp>
          <stringProp name="HTTPSampler.domain">192.168.1.123</stringProp>         <!-- 此处为被压测服务的host -->
          <stringProp name="HTTPSampler.port">12345</stringProp>                    <!-- 此处为被压测服务的port -->
          <stringProp name="HTTPSampler.protocol">http</stringProp>
          <stringProp name="HTTPSampler.contentEncoding"></stringProp>
          <stringProp name="HTTPSampler.path">${__StringFromFile(/home/urls.log,,,)}</stringProp>  <!-- 发起的http请求uri从文件读取，文件路径 -->
          <stringProp name="HTTPSampler.method">GET</stringProp>
          <boolProp name="HTTPSampler.follow_redirects">false</boolProp>
          <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
          <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
          <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
          <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
          <stringProp name="HTTPSampler.implementation">Java</stringProp>
          <stringProp name="HTTPSampler.connect_timeout"></stringProp>
          <stringProp name="HTTPSampler.response_timeout"></stringProp>
        </HTTPSamplerProxy>
        <hashTree/>
        <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
          <collectionProp name="Asserion.test_strings">
            <stringProp name="49586">200</stringProp>                                       <!-- http请求的响应断言，要求返回的http code为200才判定为成功 -->
          </collectionProp>
          <stringProp name="Assertion.test_field">Assertion.response_code</stringProp>
          <boolProp name="Assertion.assume_success">false</boolProp>
          <intProp name="Assertion.test_type">8</intProp>
        </ResponseAssertion>
        <hashTree/>
        <ConstantThroughputTimer guiclass="TestBeanGUI" testclass="ConstantThroughputTimer" testname="Constant Throughput Timer" enabled="true">
          <intProp name="calcMode">1</intProp>
          <doubleProp>
            <name>throughput</name>
            <value>30000.0</value>          <!-- 1分钟内发起的请求数，换算为tps为500 -->
            <savedValue>0.0</savedValue>
          </doubleProp>
        </ConstantThroughputTimer>
        <hashTree/>
      </hashTree>
    </hashTree>
    <WorkBench guiclass="WorkBenchGui" testclass="WorkBench" testname="工作台" enabled="true">
      <boolProp name="WorkBench.save">true</boolProp>
    </WorkBench>
    <hashTree/>
  </hashTree>
</jmeterTestPlan>
```

## 调优

参考：<https://tech.meituan.com/2016/12/02/performance-tunning.html>

## 布隆过滤器

本质上是基于hash的概率性数据结构，是一个很长的二进制数组，主要用于判断元素可能存在集合中，或者一定不在集合中。

### 原理

1. 有一个长度为m的bit数组，初始每个bit都是0，另外还有k个hash函数；
2. 当加入一个元素时，先调用k个hash函数得到k个结果，将这k个结果与bit数组长度取模得到k个数组下标，将这k个数组下标对应的值置为 1；
3. 查询元素时，同样经过上面步骤的计算，最终得到k个数组下标，判断这些下标对应的值是否为1，如果为1，说明元素可能存在，如果有一个不为1，说明元素一定不存在，返回结果；

### 误判率计算

> 涉及到3个重要的参数：
>
> 1. `m`表示bit数组的长度
> 2. `k`表示散列函数的个数
> 3. `n`表示插入的元素个数
>
> 布隆过滤器中，一个元素插入后，某个bit为0的概率是`(1 − 1/m)^k`
>
> n元素插入后，某个bit为0的概率是`(1 − 1/m)^(n*k)`
>
> false positive的概率是`(1−(1−1/m)^n*k)^k`
>
> 因为需要的是`k`个不同的bit被设置成1，概率是大约是`(1−e^(−k*n/m))^k`

### 实现

可以基于redis实现，但这里只给出go版本的实现，支持并发安全

```go
const (
	mod7       = 1<<3 - 1
	bitPerByte = 8
)

type Filter struct {
	lock       *sync.RWMutex
	concurrent bool
	// 长度之所以要取2的指数是因为要将取模操作优化成与操作， % 等于 &(2^n-1)
	m     uint64 // bit array of m bits, m will be ceiling to power of 2
	n     uint64 // number of inserted elements
	log2m uint64 // log_2 of m
	k     uint64 // the number of hash function
	keys  []byte // byte array to store hash value
}

func New(size uint64, k uint64, race bool) *Filter {
	log2 := uint64(math.Ceil(math.Log2(float64(size))))
	filter := &Filter{
		m:          1 << log2,
		log2m:      log2,
		k:          k,
		keys:       make([]byte, 1<<log2),
		concurrent: race,
	}
	if filter.concurrent {
		filter.lock = &sync.RWMutex{}
	}
	return filter
}

func (f *Filter) Add(data []byte) *Filter {
	if f.concurrent {
		f.lock.Lock()
		defer f.lock.Unlock()
	}
	h := baseHash(data)
	for i := uint64(0); i < f.k; i++ {
		loc := location(h, i)
		slot, mod := f.location(loc)
		f.keys[slot] |= 1 << mod
	}
	f.n++
	return f
}

// location returns the bit position in byte array
// & (f.m - 1) is the quick way for mod operation
func (f *Filter) location(h uint64) (uint64, uint64) {
	slot := (h / bitPerByte) & (f.m - 1)
	mod := h & mod7
	return slot, mod
}

// baseHash returns the murmur3 128-bit hash
func baseHash(data []byte) []uint64 {
	a1 := []byte{1} // to grab another bit of data
	hasher := murmur3.New128()
	hasher.Write(data) // #nosec
	v1, v2 := hasher.Sum128()
	hasher.Write(a1) // #nosec
	v3, v4 := hasher.Sum128()
	return []uint64{
		v1, v2, v3, v4,
	}
}
```

## 淘汰算法

### LRU

java实现，非线程安全

```java
// 注意每一次对节点有操作对需要同时操作 nodeMap和lruQueue
// LRU本质是利用 hashMap 和 双向链表 实现
public class LRUCache {

  private Map<String, Node> nodeMap;
  private DoubleLinkedList lruQueue;
  private int size = 0;

  public LRUCache() {
    this(3);
  }

  public LRUCache(int size) {
    this.size = size;
    this.nodeMap = new HashMap<>();
    this.lruQueue = new DoubleLinkedList();
  }


  public String get(String key) {
    Node n = nodeMap.get(key);
    if (n == null) {
      return null;
    }
    // 获取后直接放到到最前的位置
    put(n.key, n.value);
    return n.value;
  }

  public void put(String key, String value) {
    Node newNode = new Node(key, value);
    // 如果包含，则放到最前
    if (nodeMap.containsKey(key)) {
      lruQueue.remove(nodeMap.get(key));
      lruQueue.addFirst(newNode);
      // 记得更新map
      nodeMap.put(key, newNode);
    } else {
      // 如果满了，则移除最后一个
      if (size <= lruQueue.size()) {
        Node last = lruQueue.removeLast();
        nodeMap.remove(last.key);
      }
      nodeMap.put(key, newNode);
      lruQueue.addFirst(newNode);
    }
  }

  private class Node {
    private String key;
    private String value;
    private Node pre;
    private Node next;

    public Node(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  // 封装双向链表方法，构建时要注意前后节点指向和空指针问题
  private class DoubleLinkedList {
    private Node head;
    private Node tail;
    private int count = 0;

    public void addFirst(Node n) {
      if (head == null) {
        tail = n;
      }
      count ++;
      n.next = head;
      if (head != null) {
        head.pre = n;
      }
      head = n;
    }

    public Node removeLast() {
      if (count == 0) {
        return null;
      }
      Node result = tail;
      if (tail.pre != null) {
        tail.pre.next = null;
      } else {
        head = null;
        tail = null;
      }
      count --;
      return result;
    }

    public void remove(Node n) {
      if (count == 0) {
        return ;
      }
      count --;
      if (n.pre != null) {
        n.pre.next = n.next;
      } else {
        head = n.next;
      }
      if (n.next != null) {
        n.next.pre = n.pre;
      } else {
        tail = n.pre;
      }
    }

    public int size() {
      return count;
    }
  }
}
```

golang实现，非并发安全，下面这种写法是平铺了双向链表，链表的长度通过map的长度计算得到

```go
type LRUCache struct {
	nodeMap  map[string]*Node
	head     *Node
	tail     *Node
	cap      int
}

type Node struct {
	key   string
	val   string
	next  *Node
	pre   *Node
}

func NewLRUCache(capacity int) LRUCache {
	return LRUCache{nodeMap: make(map[string]*Node), cap: capacity}
}

func (lru *LRUCache) Get(key string) string {
    if existNode, exist := lru.nodeMap[key]; exist {
        lru.remove(existNode)
        lru.addFirst(existNode)
        return existNode.val
    }
    return ""
}

func (lru *LRUCache) Put(key string, val string) {
    if existNode, exist := lru.nodeMap[key]; exist {
        existNode.val = val
        lru.remove(existNode)
        lru.addFirst(existNode)
        return
    } else {
        newNode := &node{key: key, val: val}
        lru.nodeMap[key] = newNode
        lru.addFirst(newNode)
    }
    if len(lru.nodeMap) > lru.cap {
        // 一定要先delete，如果先remove，会导致找不回tail的key进行delete，
        delete(lru.nodeMap, lru.tail.key)
        lru.remove(lru.tail)
    }
}

func (lru *LRUCache) addFirst(n *Node) {
	n.pre = nil
    n.next = lru.head
    if lru.head != nil {
        lru.head.pre = n
    }
    lru.head = n
    if lru.tail == nil {
        lru.tail = n
        lru.tail.next = nil
    }
}

func (lru *LRUCache) remove(n *Node) {
    // n的next和pre要置为null，防止内存泄漏
    if n == lru.head {
        lru.head = n.next
        if n.next != nil {
            n.next.pre = nil
        }
        n.next = nil
        return
    }
    if n == lru.tail {
        lru.tail = n.pre
        n.pre.next = nil
        n.pre = nil
        return
    }
    n.pre.next = n.next
    n.next.pre = n.pre
}
```

关于并发安全，最简单的实现就是在调用Get和Put方法时加读写锁，但是这种做法锁的粒度比较大，每次会锁住整个底层的双向链表和map，导致在高并发情况下吞吐量不高，优化的思路就是对map分片，通过分片上锁来减小锁的粒度，然后再双向链表节点的操作上进行优化。

```go
func New(capacity int) LRUCache {
	shards := make(map[string]*LRUCacheShard, 256)
	for i := 0; i < 256; i++ {
		shards[fmt.Sprintf("%02x", i)] = &LRUCacheShard{
			Cap:  capacity,
			Keys: make(map[int]*list.Element),
			List: list.New(),
		}
	}
	return LRUCache{
		shards: shards,
	}
}

func (c *LRUCache) Get(key int) int {
	shard := c.GetShard(key)
	shard.RLock()
	defer shard.RUnlock()
	……
}

func (c *LRUCache) Put(key int, value int) {
  	shard := c.GetShard(key)
	shard.Lock()
	defer shard.Unlock()
	……
}

func (c *LRUCache) GetShard(key int) (shard *LRUCacheShard) {
  hasher := sha1.New()
  hasher.Write([]byte(key))
  shardKey :=  fmt.Sprintf("%x", hasher.Sum(nil))[0:2]
  return c.shards[shardKey]
}
```

其中，在Get方法中，如果存在，还需要修改key所在节点的位置，直接调put即可，当然这种方式的粒度还是比较大，再次优化的思路是对map的操作还是得上锁，但对双向链表的操作无需上锁，双向链表移动节点和删除节点可以同时操作，可以通过两个channel实现，[参考](https://github.com/halfrost/LeetCode-Go/blob/master/template/CLRUCache.go)

### LFU

java实现，非线程安全

```java
/**
 * 频次最少使用
 * 设定容量，每次get key会修改使用次数和使用时间，当满容量时，移除次数最少的那个
 * 如果有多个key的使用次数一样，则移除使用时间最旧的那个
 */
public class LFUCache {

    Map<String, String> keyValMap;
    Map<String, Integer> key2FreqMap; // key和使用频率的映射
    // 用LinkHashSet来模拟使用时间，使用LinkHashSet还有一个目的是便于根据key进行删除
    Map<Integer, LinkedHashSet<String>> freq2KeysMap; // 使用频率和key的映射
    int minFreq;
    int cap;

    public LFUCache(int cap) {
        keyValMap = new HashMap<>();
        key2FreqMap = new HashMap<>();
        freq2KeysMap = new HashMap<>();
        this.cap = cap;
        this.minFreq = 0;
    }

    public String get(String key) {
        if (!keyValMap.containsKey(key)) {
            return "";
        }
        increaseFreq(key);
        return keyValMap.get(key);
    }

    public void put(String key, String val) {
        if (this.cap <= 0) {
            return;
        }
        if (keyValMap.containsKey(key)) {
            keyValMap.put(key, val);
            increaseFreq(key);
            return;
        }
        if (this.cap <= keyValMap.size()) {
            removeMinFreqKey();
        }
        keyValMap.put(key, val);
        key2FreqMap.put(key, 1);
        freq2KeysMap.putIfAbsent(1, new LinkedHashSet<>());
        freq2KeysMap.get(1).add(key);
        this.minFreq = 1;
    }

    void increaseFreq(String key) {
        int freq = key2FreqMap.get(key);
        key2FreqMap.put(key, freq+1);
        freq2KeysMap.get(freq).remove(key);
        freq2KeysMap.putIfAbsent(freq + 1, new LinkedHashSet<>());
        freq2KeysMap.get(freq + 1).add(key);
        if (freq2KeysMap.get(freq).isEmpty()) {
            freq2KeysMap.remove(freq);
            if (freq == this.minFreq) {
                this.minFreq++;
            }
        }
    }

    void removeMinFreqKey() {
        LinkedHashSet<String> keys = freq2KeysMap.get(this.minFreq);
        String delKey = keys.iterator().next();
        keys.remove(delKey);
        if (keys.isEmpty()) {
            freq2KeysMap.remove(this.minFreq);
            // 这里无需更新 minFreq 的值，因为该方法是在插入新key时使用，此时minFreq一定是1
        }
        keyValMap.remove(delKey);
        key2FreqMap.remove(delKey);
    }
}
```

golang实现，非并发安全，利用 优先级队列（最小堆） + map实现，使用的是go自带的heap数据结构，通过Item数组结构实现

```go
import "container/heap"

type LFUNode struct {
  	key   string
	val   string
	freq  int // 用于优先级，key的使用频率
	count int // 用于当freq相同时的比较淘汰，总的获取次数，类似时间戳的概念
	index int // 最小堆中元素的下标，用于重建最小堆.
}

type PriorityQueue []*LFUNode

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
	// 优先根据使用频率排列，相等时才使用count，从小到大排序
	if pq[i].freq == pq[j].freq {
		return pq[i].count < pq[j].count
	}
	return pq[i].freq < pq[j].freq
}

func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

func (pq *PriorityQueue) Push(x interface{}) {
	n := len(*pq)
	node := x.(*LFUNode)
	node.index = n
	*pq = append(*pq, node)
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	node := old[n-1]
	old[n-1] = nil  // 防止内存泄露
	node.index = -1 // pop时重置下标保证安全
	*pq = old[0 : n-1]
	return node
}

// 更新最小堆里的元素
func (pq *PriorityQueue) update(node *LFUNode, value string, frequency int, count int) {
	node.val = value
	node.count = count
	node.freq = frequency
	heap.Fix(pq, node.index)
}

// ==========================================
type LFUCache struct {
	cap      int
	pq       PriorityQueue
	nodeMap  map[string]*LFUNode
	counter  int
}

func NewLFUCache(capacity int) LFUCache {
	return LFUCache{
		pq:       PriorityQueue{},
		nodeMap:  make(map[string]*LFUNode, capacity),
		cap:      capacity,
	}
}

func (lfu *LFUCache) Get(key string) string {
	if lfu.cap == 0 {
		return ""
	}
	if node, ok := lfu.nodeMap[key]; ok {
		lfu.counter++
		lfu.pq.update(node, node.val, node.freq+1, lfu.counter)
		return node.val
	}
	return ""
}

func (lfu *LFUCache) Put(key string, value string) {
	if lfu.cap == 0 {
		return
	}
	lfu.counter++
	// 如果存在，增加 frequency，再调整堆
	if node, ok := lfu.nodeMap[key]; ok {
		lfu.pq.update(node, value, node.freq + 1, lfu.counter)
		return
	}
	// 如果不存在且缓存满了，需要删除。在 hashmap 和 pq 中删除。
	if len(lfu.pq) == lfu.cap {
		node := heap.Pop(&lfu.pq).(*LFUNode)
		delete(lfu.nodeMap, node.key)
	}
	// 新建结点，在 hashmap 和 pq 中添加。
	node := &LFUNode{
		val:   value,
		key:   key,
		count: lfu.counter,
	}
	heap.Push(&lfu.pq, node)
	lfu.nodeMap[key] = node
}
```

另一个版本实现，利用go本身提供的双向链表，但是由于go没泛型，在类型推断方面导致整体性能不会很高

```go
type LFUCache2 struct {
	nodeMap       map[string]*list.Element
	freq2NodeList map[int]*list.List
	cap           int
	min           int
}

type node struct {
	key       string
	value     string
	freq      int
}

func NewLFUCache2(capacity int) LFUCache2 {
	return LFUCache2{nodeMap: make(map[string]*list.Element),
		freq2NodeList:    make(map[int]*list.List),
		cap:      capacity,
		min:      0,
	}
}

func (lfu *LFUCache2) Get(key string) string {
	value, ok := lfu.nodeMap[key]
	if !ok {
		return ""
	}
	currentNode := value.Value.(*node)
	lfu.freq2NodeList[currentNode.freq].Remove(value)
	currentNode.freq++
	if _, ok := lfu.freq2NodeList[currentNode.freq]; !ok {
		lfu.freq2NodeList[currentNode.freq] = list.New()
	}
	newList := lfu.freq2NodeList[currentNode.freq]
	newNode := newList.PushFront(currentNode)
	lfu.nodeMap[key] = newNode
	if currentNode.freq-1 == lfu.min && lfu.freq2NodeList[currentNode.freq-1].Len() == 0 {
		lfu.min++
	}
	return currentNode.value
}

func (lfu *LFUCache2) Put(key string, value string) {
	if lfu.cap == 0 {
		return
	}
	// 如果存在，更新访问次数
	if currentValue, ok := lfu.nodeMap[key]; ok {
		currentNode := currentValue.Value.(*node)
		currentNode.value = value
		lfu.Get(key)
		return
	}
	// 如果不存在且缓存满了，需要删除
	if lfu.cap == len(lfu.nodeMap) {
		currentList := lfu.freq2NodeList[lfu.min]
		backNode := currentList.Back()
		delete(lfu.nodeMap, backNode.Value.(*node).key)
		currentList.Remove(backNode)
	}
	// 新建结点，插入到 2 个 map 中
	lfu.min = 1
	currentNode := &node{
		key:       key,
		value:     value,
		freq:      1,
	}
	if _, ok := lfu.freq2NodeList[1]; !ok {
		lfu.freq2NodeList[1] = list.New()
	}
	newList := lfu.freq2NodeList[1]
	newNode := newList.PushFront(currentNode)
	lfu.nodeMap[key] = newNode
}
```

## 参考

[布隆过滤器原理及golang实现](https://liangyaopei.github.io/2020/07/29/bloom-filter/)

[LRU / LFU 的青铜与王者](https://halfrost.com/lru_lfu_interview/)
