其他

不好归类的算在这一part

[TOC]

Quartz

  • 分为三个部分:e

    • Job&Detial(任务):定时任务的执行方法,与Trigger配套的

    • Trigger(触发器):规定什么时候触发,与Job&Detail配套的

    • Scheduler(调度器):单例,把Trigger丢里面由调度器调度,只需要一个Scheduler,配置不同的Trigger;可以理解成类似线程池的东西

  • 原理:ScheduledThreadPoolExecutor线程池 + 通过Object类的wait()和notify()或者Condition类的await()\signal()进行等待和唤醒、锁保证线程安全 来进行调度

    Scheduler有两个调度线程:regular Scheduler Thread(执行常规调度)和Misfire Scheduler Thread(执行错失的任务),Regular Thread 轮询所有Trigger,如果有将要触发的Trigger(用wait和notifyAll实现),则从任务线程池中获取一个空闲线程,然后执行与改Trigger关联的job;Misfire Thraed则是扫描所有的trigger,查看是否有错失的,如果有的话,根据一定的策略进行处理

  • 默认是并发的,即如果当前任务没有完成,会自动开一个任务执行

  • 注意在分布式集群的情况下,多台机子有相同的定时任务,会出错,此时通过共享数据库的方式实现

    Quartz的解决方案:

    quartz集群分为水平集群和垂直集群,水平集群即将定时任务节点部署在不同的服务器,其最大的问题就是时钟同步问题,若时钟不能同步,则会导致集群中各个节点状态紊乱,造成不可预知的后果;垂直集群则是集群各节点部署在同一台服务器,时钟同步自然不是问题,但存在单点故障问题,服务器宕机会严重影响服务的可用性

    在各个节点会上报任务,存到数据库中,执行时会从数据库中取出触发器来执行,如果触发器的名称和执行时间相同,则只有一个节点去执行此任务。

    如果此节点执行失败,则此任务则会被分派到另一节点执行,中途也会自动检查失效的定时调度,发现不成功的,其他节点立马接过来继续完成定时任务。Quartz有11个定时任务调度表

参考

Quartz原理解密

深入解读Quartz的原理

Quartz 2.2 的实现原理和运行过程

其他定时器

  • Timer:这是java自带的java.util.Timer类,这个类允许你调度一个java.util.TimerTask任务。使用这种方式可以让你的程序按照某一个频度执行,但不能在指定时间运行。一般用的较少。单线程,任务一多会阻塞;一个任务出异常其他任务都受影响;受系统时间影响

  • ScheduledExecutorService:也jdk自带的一个类;是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响。线程池+延时队列DelayedQueue(数组、最小堆, 最近要执行的任务放在堆顶) 实现,如果堆顶任务时间未到就阻塞(通过自旋+condition.await\signal实现)。不受系统时间影响

  • Spring 中的 @Schedule 注解

参考:Java 定时任务实现原理详解

Java优先级队列DelayedWorkQueue原理分析

CORS

浏览器的同源政策的同源指的是:协议相同、域名相同、端口相同,如果非同源,有三种行为会受到限制:Cookie、LocalStorage和IndexDB无法读取;DOM无法获得、AJAX请求不能发送。

前后端分离的场景下,由于浏览器的同源策略,导致浏览器内的请求不同的源的后端是会失败,常见的解决跨域方法是使用CORS,现在常见的web框架都支持CORS,开启即可。

解决跨域的方法除了CORS,还有jsonp,不过已经很少使用了,jsonp本质是利用浏览器允许加载不同源的js文件即标签等,将跨域请求标签里,返回一段可执行的js代码,其中包含了请求结果,通常是json格式,前端通过返回的js代码执行回调获取结果。

详情见 跨域资源共享 CORS 详解

对于跨域产生的问题,如CSRF跨域请求攻击的解决方案,可参考:美团:如何防止csrf

  • 首先HTTP是无状态的,因此需要通过session、cookie来达到记录用户状态的目的。

  • 传统的session、cookie:session存用户信息,保存在服务端中,cookie里存session对应的sessionId,保存在客户端中,用于找到对应的session,每次请求都会带上该cookie来表示此用户。

    如果客户端禁用掉cookie,可以使用LocalStorage存储,每次请求都以参数的形式传递;

  • cookie可设置长时间保持,session有效时间通常比较短,安全性上来将,cookie存储在客户端,容易被窃取,session存储在服务段,相对安全;

  • 由于现在实例的部署不可能只部署一个,一般都是集群部署,因此session不可以只存在一个实例的内存中,因此引入Redis来存用户的登录信息

  • 现在一般使用 token + Redis来实现 cookie - session 机制,本质上差不多,前端的cookie更多的是存token的信息而已,token也可以存在LocalStorage或sessionStorage中,发送请求时一般是把token的值放在请求头中,而不会把cookie发给后端,这样可以避免当用户禁用cookie导致功能不可用,还有CSRF问题。

JWT

JWT = JSON WEB TOKEN

原理

JWT实际上是一个token(令牌),分为三部分:Header(头部)、Payload(负载)、Signature(签名)。

Header(头部) :两部分组成,记录令牌类型和JWT的签名算法,一般是HMACSHA256。

Payload(负载): 记录用户登录信息(官方规范默认是不加密的,分为官方字段和私有字段)。

Signature(签名) :记录将 Header、Payload和服务端的密钥组合起来,使用Header(头部)里规定的方式加密。

比如header里保存的加密方式是HMACSHA256,签名 Signature = HMACSHA256(base64URL(header) + "." + base64URL(payload) + "." + 保存在后端的密钥)

最后的JWT = base64URL(Header) + "." + base64URL(Payload) + "." + Signature,后端收到该JWT后验证该签名是否正确,来判断JWT里的用户信息是否可靠。

base64:64指的是A-Z,a-z,0-9,+,/,将待转换的字符串转成二进制流,每3个8位转成4个6位,6位的二进制数转成十进制,根据码表找到对应的字符,以=号做后缀,凑齐位数

一般是为了解决一些字符编码的问题,将非ASCII字符转化为ASCII字符,还有就是可以对数据做简单加密,base64URL在base64的基础上增加对一些符号的编解码,比如把"-"替换成"+",使得它可以出现在url中。

HMACSHA256:摘要算法,一般用于验证签名是否一致

使用

可以存储在浏览器的本地缓存localStorage或者cookie中,发送请求的时候放在cookie里,或者放在请求头中

  • JWT的目的是让服务器不保存任何session数据,让后端变成无状态的,因此没办法主动废弃某个token,一旦签发了JWT,在到期之前就会始终有效,如果想要实现这种功能,必然需要在后端保存JWT,就违背了JWT的设计初衷了。

  • 要让JWT实现 续签 和 主动过期功能,必定需要在后端保存JWT

    • jwt主动过期问题,使用黑名单即可;分成两点,客户端要求失效,服务端记录token到黑名单;用户重置密码,服务端记录uid-time键值对,在此之前的token全部失效;客户端把保存的jwt删掉是没用的,此时的jwt依然有效,只是客户端没记录而已

    • jwt续签问题,一种解决方式是jwt中存储过期时间,服务端设置刷新时间,请求时判断是否在过期时间或刷新时间,在刷新时间内进行token刷新,失效token记入黑名单;

    • 而黑名单过大问题,可以采用记录UID-刷新时间方式解决,判断jwt签发时间,jwt签发时间小于UID-刷新时间的记为失效

  • 个人认为JWT的生成方式本身是有一套规范的,在实际使用过程中也可以对他进行改动,本质上还是一个签名校验而已,一般会对JWT进行魔改,比如使用Header(头部)里的加密方式加密Signature(签名),Signature(签名)加密Header(头部) 和Payload(负载) 这两部分,服务器里的私钥解密Payload(负载),得到需要的登录信息,不通过简单的base64URL编码,不对外暴露,签名算法或者签名里的密钥的方式可以改成其他等。

JWT参考:JWT 超详细分析

CAS模型 - SSO(单点登录)

可参考:CAS实现单点登录SSO执行原理探究,讲得算是比较明白,这里是总结基于CAS模式改的单点登录模式

  • 第一次访问时,由于没有访问的token,会引导至登录

  • 再次访问Web-1时,由于前端已存了token,直接使用token进行请求即可

  • 已登录Web-1时去访问Web-2,会通过后端认证中心实现单点登录

这里在总结一下关于GrantTicket和ServiceTicket,跟CAS模型中提到的TGT、ST、PGT这些东西是类似的,本质是作为验证的票据,图中的GrantTicket、ServiceTicket、token含义如下

GrantTicket:全局会话票据,保存在登录页,通过GrantTicket才能换取ServiceTicket;

ServiceTicket表示访问资源的一次性票据,根据ServiceTicket换取token,换取后失效;

token:登录凭证

GT、ST和token都是保存在Redis中的,他们在Redis中的存储结构如下

key:TOKEN_${Token的值}
value:
{
    "createTime": 1565961654807,
    "accountId": "123",
    // 用户其他信息
    "grantTicket": ${GrantTicket的值}  // token关联GT,用于注销时实现全局注销
}

key:GRANT_TICKET_${GrantTicket的值}
value:
{
    "createTime": 1565961654807,
    "accountId": "123",
}

key:SERVICE_TICKET_${ServiceTicket的值}
value:
{
    "createTime": 1565961654807,
    "grantTicket": ${GrantTicket的值} // ST关联GT,用于判断该ST是否有效,换取token后删除
}

// token与grantTicket的记录,注销时,根据token中关联的GT,找到所有与之关联的token,进行删除,这里推荐使用Redis的scan命令进行分段查询,原因是Redis是单线程的,如果数据量太大使用keys命令遍历太久,阻塞Redis接收其他命令
key:{grantTicket}-{token}
value:无

基于OAuth2.0的第三方登录

可参考:理解OAuth 2.0,这样基本就入门了,这里是总结项目中如何接入,一般在集成facebook和google的第三方登录也是类似的流程机制,这里只用到了access_token,对于refresh_token,是用来延长access_token的过期时间的,减少短时间内的重复登录,这里就没有涉及到了

为什么要后端要根据code + clientId + secret换成access_token,再根据access_token换用户个人信息?

为什么后端不直接code + clientId + secret换用户个人信息呢?

主要还是为了安全,防止中间人攻击

  • 重定向的参数是带在url里的,是直接暴露在客户端的,如果直接返回access_token就不安全,因此才多了code这一层,为了降低code被拦截泄漏后的风险,code的过期时间一般都很短,且是一次性的;

  • 另外就是后端对于外部的请求都是不信任的,因此接收到的参数(code)首先还要配合凭证去验证其合法性,对于验证通过后获得的access_token也有更多的操作空间,由后端持有,不会暴露出去

    像上图那种登录方案,后端只需要用户个人信息换完token就算完事了,所以看起来好像直接使用code + clientId + secret换用户个人信息就行,但是如果此时需要再获取用户的其他信息,就没有没办法再用code去换了,只能要求用户再次登录,此时如果有access_token就显得多么重要了

压测

总结一下做过的压测,压测工具jmetter,利用jmette可以多线程并发请求和可以实时查看简易报告的能力

  1. 先对被压测服务的接口针对不同场景编写压测用例,设定好TPS的起始和目标值,作为压测计划

  2. 画压测机器部署关系图,部署压测环境

    • 对于被压测的服务,一般会mock掉与该服务相关关联的服务,比如该服务还连了数据库,该接口请求依赖一些独立部署的中间件,或者依赖其他服务,则会对这些相关的依赖用桩来代替,用于维持通信,以减少这些额外服务的影响。

    • 一般一台机器只部署一个服务,特别是被压测服务,此外还要注意被压测服务所在的机器上网络设置相关的参数,比如TCP最大连接数、回收策略之类的设置

  3. 编写压测脚本,压测脚本越简单越好,尽量让压测工具不影响被压测服务,脚本最重要的几个设置: 发起请求时的并发线程数、响应的断言、TPS数,其他那些花里胡哨的输出树状图,饼图啊那些都不用配了,用最简单的报告输出即可

  4. 部署完后,将脚本配置放到jmeter的机器上,启动压测

    nohup java -jar bin/ApacheJMeter.jar -n -t jmetter脚本路径/config.jmx > test.out &

    输出到当前目录下的test.out文件里,这里启动是使用默认参数启动,如果对jmetter的JVM设置有要求,也可以在启动时指定JVM参数,如

    nohup java -server -XX:+HeapDumpOnOutOfMemoryError -Xms512m -Xmx512m -XX:+UseG1GC -XX:MaxGCPauseMillis=250 -XX:G1ReservePercent=20 -Djava.security.egd=file:/dev/urandom -jar bin/ApacheJMeter.jar -n -t jmetter脚本路径/config.jmx > test.out &

    压测开启后可以打开test.out文件查看压测报告

  5. 一般是按照TPS从小往大压,小的TPS压,在正常延时的情况下可以先判断程序是否有问题,比如内存泄漏,内存溢出,没问题了再逐步往大了压。如果先从大往小压,延时又上不去,此时判断不了是程序内部问题还是过大的TPS导致。压测时间一般最少压一天

  6. 输出压测报告

一般有如下几个点要注意,这些点到时也要输出到压测报告上

监控点
说明

jmetter端的TPS、延时、错误率

观察TPS是否符合预期、延时是否达到预期且稳定、错误率要为0。当程序正常时降低RT的手段:减少不必要的日志输出、业务逻辑算法是否还有优化空间,是否有IO占用或者频繁序列化反序列化、内部队列是否阻塞

被压测服务的gc

fgc,ygc不要太频繁,一般来说fgc 一小时要小于3~4次ygc一分钟要小于3~4次为佳

jmetter端的CPU、内存使用率等

注意jmetter端的CPU是否过高或波动很大,避免影响压测结论

被压测服务端的CPU、磁盘、内存使用率等

如果cpu过高,如果连续达到90以上,基本上是内存泄漏导致了频繁的fgc;磁盘的占用情况,注意生成的日志是否把磁盘占满了

使用 jstat -gcutil [pid] [时间间隔,每几秒打印] [打印次数]查看GC情况

当被压测端的gc不正常时,应尽量保存事发环境

​ 1、收集内存使用基本情况统计:jmap -heap [pid] > [文件名,如heap.log]

​ 2、收集线程堆栈运行信息:jstack [pid] > [文件名,如stack.log]

​ 3、收集内存详细使用信息,生成dump内存快照:jmap -dump:format=b,file=[文件名,如heap.dump] [pid]

一般使用eclipse mat工具进行内存快照的分析,排查出内存泄漏的问题。

mat的使用参见:Eclipse MAT内存分析工具

一般压测脚本的模板:

<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="3.2" jmeter="3.2 r1790748">
  <hashTree>
    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="测试计划" enabled="true">
        <!-- 一般写压测计划中的序号+名称 -->
      <stringProp name="TestPlan.comments"></stringProp>
      <boolProp name="TestPlan.functional_mode">false</boolProp>
      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
        <collectionProp name="Arguments.arguments"/>
      </elementProp>
      <stringProp name="TestPlan.user_define_classpath"></stringProp>
    </TestPlan>
    <hashTree>
      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Thread Group" enabled="true">
        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="循环控制器" enabled="true">
          <boolProp name="LoopController.continue_forever">false</boolProp>
          <intProp name="LoopController.loops">-1</intProp>
        </elementProp>
        <stringProp name="ThreadGroup.num_threads">500</stringProp>                                             <!-- 发起请求时的并发线程数,这里设置为500个并发线程,表示使用这么多的线程数来达到下面设置的TPS数 -->
        <stringProp name="ThreadGroup.ramp_time">8</stringProp>
        <longProp name="ThreadGroup.start_time">1509332694000</longProp>
        <longProp name="ThreadGroup.end_time">1509332694000</longProp>
        <boolProp name="ThreadGroup.scheduler">false</boolProp>
        <stringProp name="ThreadGroup.duration"></stringProp>
        <stringProp name="ThreadGroup.delay"></stringProp>
      </ThreadGroup>
      <hashTree>
        <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="click http request" enabled="true">
          <elementProp name="HTTPsampler.Arguments" elementType="Arguments" guiclass="HTTPArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
            <collectionProp name="Arguments.arguments"/>
          </elementProp>
          <stringProp name="HTTPSampler.domain">192.168.1.123</stringProp>         <!-- 此处为被压测服务的host -->
          <stringProp name="HTTPSampler.port">12345</stringProp>                    <!-- 此处为被压测服务的port -->
          <stringProp name="HTTPSampler.protocol">http</stringProp>
          <stringProp name="HTTPSampler.contentEncoding"></stringProp>
          <stringProp name="HTTPSampler.path">${__StringFromFile(/home/urls.log,,,)}</stringProp>  <!-- 发起的http请求uri从文件读取,文件路径 -->
          <stringProp name="HTTPSampler.method">GET</stringProp>
          <boolProp name="HTTPSampler.follow_redirects">false</boolProp>
          <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
          <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
          <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
          <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
          <stringProp name="HTTPSampler.implementation">Java</stringProp>
          <stringProp name="HTTPSampler.connect_timeout"></stringProp>
          <stringProp name="HTTPSampler.response_timeout"></stringProp>
        </HTTPSamplerProxy>
        <hashTree/>
        <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
          <collectionProp name="Asserion.test_strings">
            <stringProp name="49586">200</stringProp>                                       <!-- http请求的响应断言,要求返回的http code为200才判定为成功 -->
          </collectionProp>
          <stringProp name="Assertion.test_field">Assertion.response_code</stringProp>
          <boolProp name="Assertion.assume_success">false</boolProp>
          <intProp name="Assertion.test_type">8</intProp>
        </ResponseAssertion>
        <hashTree/>
        <ConstantThroughputTimer guiclass="TestBeanGUI" testclass="ConstantThroughputTimer" testname="Constant Throughput Timer" enabled="true">
          <intProp name="calcMode">1</intProp>
          <doubleProp>
            <name>throughput</name>
            <value>30000.0</value>          <!-- 1分钟内发起的请求数,换算为tps为500 -->
            <savedValue>0.0</savedValue>
          </doubleProp>
        </ConstantThroughputTimer>
        <hashTree/>
      </hashTree>
    </hashTree>
    <WorkBench guiclass="WorkBenchGui" testclass="WorkBench" testname="工作台" enabled="true">
      <boolProp name="WorkBench.save">true</boolProp>
    </WorkBench>
    <hashTree/>
  </hashTree>
</jmeterTestPlan>

调优

参考:https://tech.meituan.com/2016/12/02/performance-tunning.html

布隆过滤器

本质上是基于hash的概率性数据结构,是一个很长的二进制数组,主要用于判断元素可能存在集合中,或者一定不在集合中。

原理

  1. 有一个长度为m的bit数组,初始每个bit都是0,另外还有k个hash函数;

  2. 当加入一个元素时,先调用k个hash函数得到k个结果,将这k个结果与bit数组长度取模得到k个数组下标,将这k个数组下标对应的值置为 1;

  3. 查询元素时,同样经过上面步骤的计算,最终得到k个数组下标,判断这些下标对应的值是否为1,如果为1,说明元素可能存在,如果有一个不为1,说明元素一定不存在,返回结果;

误判率计算

涉及到3个重要的参数:

  1. m表示bit数组的长度

  2. k表示散列函数的个数

  3. n表示插入的元素个数

布隆过滤器中,一个元素插入后,某个bit为0的概率是(1 − 1/m)^k

n元素插入后,某个bit为0的概率是(1 − 1/m)^(n*k)

false positive的概率是(1−(1−1/m)^n*k)^k

因为需要的是k个不同的bit被设置成1,概率是大约是(1−e^(−k*n/m))^k

实现

可以基于redis实现,但这里只给出go版本的实现,支持并发安全

const (
	mod7       = 1<<3 - 1
	bitPerByte = 8
)

type Filter struct {
	lock       *sync.RWMutex
	concurrent bool
	// 长度之所以要取2的指数是因为要将取模操作优化成与操作, % 等于 &(2^n-1)
	m     uint64 // bit array of m bits, m will be ceiling to power of 2
	n     uint64 // number of inserted elements
	log2m uint64 // log_2 of m
	k     uint64 // the number of hash function
	keys  []byte // byte array to store hash value
}

func New(size uint64, k uint64, race bool) *Filter {
	log2 := uint64(math.Ceil(math.Log2(float64(size))))
	filter := &Filter{
		m:          1 << log2,
		log2m:      log2,
		k:          k,
		keys:       make([]byte, 1<<log2),
		concurrent: race,
	}
	if filter.concurrent {
		filter.lock = &sync.RWMutex{}
	}
	return filter
}

func (f *Filter) Add(data []byte) *Filter {
	if f.concurrent {
		f.lock.Lock()
		defer f.lock.Unlock()
	}
	h := baseHash(data)
	for i := uint64(0); i < f.k; i++ {
		loc := location(h, i)
		slot, mod := f.location(loc)
		f.keys[slot] |= 1 << mod
	}
	f.n++
	return f
}

// location returns the bit position in byte array
// & (f.m - 1) is the quick way for mod operation
func (f *Filter) location(h uint64) (uint64, uint64) {
	slot := (h / bitPerByte) & (f.m - 1)
	mod := h & mod7
	return slot, mod
}

// baseHash returns the murmur3 128-bit hash
func baseHash(data []byte) []uint64 {
	a1 := []byte{1} // to grab another bit of data
	hasher := murmur3.New128()
	hasher.Write(data) // #nosec
	v1, v2 := hasher.Sum128()
	hasher.Write(a1) // #nosec
	v3, v4 := hasher.Sum128()
	return []uint64{
		v1, v2, v3, v4,
	}
}

淘汰算法

LRU

java实现,非线程安全

// 注意每一次对节点有操作对需要同时操作 nodeMap和lruQueue
// LRU本质是利用 hashMap 和 双向链表 实现
public class LRUCache {

  private Map<String, Node> nodeMap;
  private DoubleLinkedList lruQueue;
  private int size = 0;

  public LRUCache() {
    this(3);
  }

  public LRUCache(int size) {
    this.size = size;
    this.nodeMap = new HashMap<>();
    this.lruQueue = new DoubleLinkedList();
  }


  public String get(String key) {
    Node n = nodeMap.get(key);
    if (n == null) {
      return null;
    }
    // 获取后直接放到到最前的位置
    put(n.key, n.value);
    return n.value;
  }

  public void put(String key, String value) {
    Node newNode = new Node(key, value);
    // 如果包含,则放到最前
    if (nodeMap.containsKey(key)) {
      lruQueue.remove(nodeMap.get(key));
      lruQueue.addFirst(newNode);
      // 记得更新map
      nodeMap.put(key, newNode);
    } else {
      // 如果满了,则移除最后一个
      if (size <= lruQueue.size()) {
        Node last = lruQueue.removeLast();
        nodeMap.remove(last.key);
      }
      nodeMap.put(key, newNode);
      lruQueue.addFirst(newNode);
    }
  }

  private class Node {
    private String key;
    private String value;
    private Node pre;
    private Node next;

    public Node(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  // 封装双向链表方法,构建时要注意前后节点指向和空指针问题
  private class DoubleLinkedList {
    private Node head;
    private Node tail;
    private int count = 0;

    public void addFirst(Node n) {
      if (head == null) {
        tail = n;
      }
      count ++;
      n.next = head;
      if (head != null) {
        head.pre = n;
      }
      head = n;
    }

    public Node removeLast() {
      if (count == 0) {
        return null;
      }
      Node result = tail;
      if (tail.pre != null) {
        tail.pre.next = null;
      } else {
        head = null;
        tail = null;
      }
      count --;
      return result;
    }

    public void remove(Node n) {
      if (count == 0) {
        return ;
      }
      count --;
      if (n.pre != null) {
        n.pre.next = n.next;
      } else {
        head = n.next;
      }
      if (n.next != null) {
        n.next.pre = n.pre;
      } else {
        tail = n.pre;
      }
    }

    public int size() {
      return count;
    }
  }
}

golang实现,非并发安全,下面这种写法是平铺了双向链表,链表的长度通过map的长度计算得到

type LRUCache struct {
	nodeMap  map[string]*Node
	head     *Node
	tail     *Node
	cap      int
}

type Node struct {
	key   string
	val   string
	next  *Node
	pre   *Node
}

func NewLRUCache(capacity int) LRUCache {
	return LRUCache{nodeMap: make(map[string]*Node), cap: capacity}
}

func (lru *LRUCache) Get(key string) string {
    if existNode, exist := lru.nodeMap[key]; exist {
        lru.remove(existNode)
        lru.addFirst(existNode)
        return existNode.val
    }
    return ""
}

func (lru *LRUCache) Put(key string, val string) {
    if existNode, exist := lru.nodeMap[key]; exist {
        existNode.val = val
        lru.remove(existNode)
        lru.addFirst(existNode)
        return
    } else {
        newNode := &node{key: key, val: val}
        lru.nodeMap[key] = newNode
        lru.addFirst(newNode)
    }
    if len(lru.nodeMap) > lru.cap {
        // 一定要先delete,如果先remove,会导致找不回tail的key进行delete,
        delete(lru.nodeMap, lru.tail.key)
        lru.remove(lru.tail)
    }
}

func (lru *LRUCache) addFirst(n *Node) {
	n.pre = nil
    n.next = lru.head
    if lru.head != nil {
        lru.head.pre = n
    }
    lru.head = n
    if lru.tail == nil {
        lru.tail = n
        lru.tail.next = nil
    }
}

func (lru *LRUCache) remove(n *Node) {
    // n的next和pre要置为null,防止内存泄漏
    if n == lru.head {
        lru.head = n.next
        if n.next != nil {
            n.next.pre = nil
        }
        n.next = nil
        return
    }
    if n == lru.tail {
        lru.tail = n.pre
        n.pre.next = nil
        n.pre = nil
        return
    }
    n.pre.next = n.next
    n.next.pre = n.pre
}

关于并发安全,最简单的实现就是在调用Get和Put方法时加读写锁,但是这种做法锁的粒度比较大,每次会锁住整个底层的双向链表和map,导致在高并发情况下吞吐量不高,优化的思路就是对map分片,通过分片上锁来减小锁的粒度,然后再双向链表节点的操作上进行优化。

func New(capacity int) LRUCache {
	shards := make(map[string]*LRUCacheShard, 256)
	for i := 0; i < 256; i++ {
		shards[fmt.Sprintf("%02x", i)] = &LRUCacheShard{
			Cap:  capacity,
			Keys: make(map[int]*list.Element),
			List: list.New(),
		}
	}
	return LRUCache{
		shards: shards,
	}
}

func (c *LRUCache) Get(key int) int {
	shard := c.GetShard(key)
	shard.RLock()
	defer shard.RUnlock()
	……
}

func (c *LRUCache) Put(key int, value int) {
  	shard := c.GetShard(key)
	shard.Lock()
	defer shard.Unlock()
	……
}

func (c *LRUCache) GetShard(key int) (shard *LRUCacheShard) {
  hasher := sha1.New()
  hasher.Write([]byte(key))
  shardKey :=  fmt.Sprintf("%x", hasher.Sum(nil))[0:2]
  return c.shards[shardKey]
}

其中,在Get方法中,如果存在,还需要修改key所在节点的位置,直接调put即可,当然这种方式的粒度还是比较大,再次优化的思路是对map的操作还是得上锁,但对双向链表的操作无需上锁,双向链表移动节点和删除节点可以同时操作,可以通过两个channel实现,参考

LFU

java实现,非线程安全

/**
 * 频次最少使用
 * 设定容量,每次get key会修改使用次数和使用时间,当满容量时,移除次数最少的那个
 * 如果有多个key的使用次数一样,则移除使用时间最旧的那个
 */
public class LFUCache {

    Map<String, String> keyValMap;
    Map<String, Integer> key2FreqMap; // key和使用频率的映射
    // 用LinkHashSet来模拟使用时间,使用LinkHashSet还有一个目的是便于根据key进行删除
    Map<Integer, LinkedHashSet<String>> freq2KeysMap; // 使用频率和key的映射
    int minFreq;
    int cap;

    public LFUCache(int cap) {
        keyValMap = new HashMap<>();
        key2FreqMap = new HashMap<>();
        freq2KeysMap = new HashMap<>();
        this.cap = cap;
        this.minFreq = 0;
    }

    public String get(String key) {
        if (!keyValMap.containsKey(key)) {
            return "";
        }
        increaseFreq(key);
        return keyValMap.get(key);
    }

    public void put(String key, String val) {
        if (this.cap <= 0) {
            return;
        }
        if (keyValMap.containsKey(key)) {
            keyValMap.put(key, val);
            increaseFreq(key);
            return;
        }
        if (this.cap <= keyValMap.size()) {
            removeMinFreqKey();
        }
        keyValMap.put(key, val);
        key2FreqMap.put(key, 1);
        freq2KeysMap.putIfAbsent(1, new LinkedHashSet<>());
        freq2KeysMap.get(1).add(key);
        this.minFreq = 1;
    }

    void increaseFreq(String key) {
        int freq = key2FreqMap.get(key);
        key2FreqMap.put(key, freq+1);
        freq2KeysMap.get(freq).remove(key);
        freq2KeysMap.putIfAbsent(freq + 1, new LinkedHashSet<>());
        freq2KeysMap.get(freq + 1).add(key);
        if (freq2KeysMap.get(freq).isEmpty()) {
            freq2KeysMap.remove(freq);
            if (freq == this.minFreq) {
                this.minFreq++;
            }
        }
    }

    void removeMinFreqKey() {
        LinkedHashSet<String> keys = freq2KeysMap.get(this.minFreq);
        String delKey = keys.iterator().next();
        keys.remove(delKey);
        if (keys.isEmpty()) {
            freq2KeysMap.remove(this.minFreq);
            // 这里无需更新 minFreq 的值,因为该方法是在插入新key时使用,此时minFreq一定是1
        }
        keyValMap.remove(delKey);
        key2FreqMap.remove(delKey);
    }
}

golang实现,非并发安全,利用 优先级队列(最小堆) + map实现,使用的是go自带的heap数据结构,通过Item数组结构实现

import "container/heap"

type LFUNode struct {
  	key   string
	val   string
	freq  int // 用于优先级,key的使用频率
	count int // 用于当freq相同时的比较淘汰,总的获取次数,类似时间戳的概念
	index int // 最小堆中元素的下标,用于重建最小堆.
}

type PriorityQueue []*LFUNode

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
	// 优先根据使用频率排列,相等时才使用count,从小到大排序
	if pq[i].freq == pq[j].freq {
		return pq[i].count < pq[j].count
	}
	return pq[i].freq < pq[j].freq
}

func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

func (pq *PriorityQueue) Push(x interface{}) {
	n := len(*pq)
	node := x.(*LFUNode)
	node.index = n
	*pq = append(*pq, node)
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	node := old[n-1]
	old[n-1] = nil  // 防止内存泄露
	node.index = -1 // pop时重置下标保证安全
	*pq = old[0 : n-1]
	return node
}

// 更新最小堆里的元素
func (pq *PriorityQueue) update(node *LFUNode, value string, frequency int, count int) {
	node.val = value
	node.count = count
	node.freq = frequency
	heap.Fix(pq, node.index)
}

// ==========================================
type LFUCache struct {
	cap      int
	pq       PriorityQueue
	nodeMap  map[string]*LFUNode
	counter  int
}

func NewLFUCache(capacity int) LFUCache {
	return LFUCache{
		pq:       PriorityQueue{},
		nodeMap:  make(map[string]*LFUNode, capacity),
		cap:      capacity,
	}
}

func (lfu *LFUCache) Get(key string) string {
	if lfu.cap == 0 {
		return ""
	}
	if node, ok := lfu.nodeMap[key]; ok {
		lfu.counter++
		lfu.pq.update(node, node.val, node.freq+1, lfu.counter)
		return node.val
	}
	return ""
}

func (lfu *LFUCache) Put(key string, value string) {
	if lfu.cap == 0 {
		return
	}
	lfu.counter++
	// 如果存在,增加 frequency,再调整堆
	if node, ok := lfu.nodeMap[key]; ok {
		lfu.pq.update(node, value, node.freq + 1, lfu.counter)
		return
	}
	// 如果不存在且缓存满了,需要删除。在 hashmap 和 pq 中删除。
	if len(lfu.pq) == lfu.cap {
		node := heap.Pop(&lfu.pq).(*LFUNode)
		delete(lfu.nodeMap, node.key)
	}
	// 新建结点,在 hashmap 和 pq 中添加。
	node := &LFUNode{
		val:   value,
		key:   key,
		count: lfu.counter,
	}
	heap.Push(&lfu.pq, node)
	lfu.nodeMap[key] = node
}

另一个版本实现,利用go本身提供的双向链表,但是由于go没泛型,在类型推断方面导致整体性能不会很高

type LFUCache2 struct {
	nodeMap       map[string]*list.Element
	freq2NodeList map[int]*list.List
	cap           int
	min           int
}

type node struct {
	key       string
	value     string
	freq      int
}

func NewLFUCache2(capacity int) LFUCache2 {
	return LFUCache2{nodeMap: make(map[string]*list.Element),
		freq2NodeList:    make(map[int]*list.List),
		cap:      capacity,
		min:      0,
	}
}

func (lfu *LFUCache2) Get(key string) string {
	value, ok := lfu.nodeMap[key]
	if !ok {
		return ""
	}
	currentNode := value.Value.(*node)
	lfu.freq2NodeList[currentNode.freq].Remove(value)
	currentNode.freq++
	if _, ok := lfu.freq2NodeList[currentNode.freq]; !ok {
		lfu.freq2NodeList[currentNode.freq] = list.New()
	}
	newList := lfu.freq2NodeList[currentNode.freq]
	newNode := newList.PushFront(currentNode)
	lfu.nodeMap[key] = newNode
	if currentNode.freq-1 == lfu.min && lfu.freq2NodeList[currentNode.freq-1].Len() == 0 {
		lfu.min++
	}
	return currentNode.value
}

func (lfu *LFUCache2) Put(key string, value string) {
	if lfu.cap == 0 {
		return
	}
	// 如果存在,更新访问次数
	if currentValue, ok := lfu.nodeMap[key]; ok {
		currentNode := currentValue.Value.(*node)
		currentNode.value = value
		lfu.Get(key)
		return
	}
	// 如果不存在且缓存满了,需要删除
	if lfu.cap == len(lfu.nodeMap) {
		currentList := lfu.freq2NodeList[lfu.min]
		backNode := currentList.Back()
		delete(lfu.nodeMap, backNode.Value.(*node).key)
		currentList.Remove(backNode)
	}
	// 新建结点,插入到 2 个 map 中
	lfu.min = 1
	currentNode := &node{
		key:       key,
		value:     value,
		freq:      1,
	}
	if _, ok := lfu.freq2NodeList[1]; !ok {
		lfu.freq2NodeList[1] = list.New()
	}
	newList := lfu.freq2NodeList[1]
	newNode := newList.PushFront(currentNode)
	lfu.nodeMap[key] = newNode
}

参考

布隆过滤器原理及golang实现

LRU / LFU 的青铜与王者

Last updated