[TOC]
Quartz
分为三个部分:e
Job&Detial(任务):定时任务的执行方法,与Trigger配套的
Trigger(触发器):规定什么时候触发,与Job&Detail配套的
Scheduler(调度器):单例,把Trigger丢里面由调度器调度,只需要一个Scheduler,配置不同的Trigger;可以理解成类似线程池的东西
原理:ScheduledThreadPoolExecutor线程池 + 通过Object类的wait()和notify()或者Condition类的await()\signal()进行等待和唤醒、锁保证线程安全 来进行调度
Scheduler有两个调度线程:regular Scheduler Thread(执行常规调度)和Misfire Scheduler Thread(执行错失的任务),Regular Thread 轮询所有Trigger,如果有将要触发的Trigger(用wait和notifyAll实现),则从任务线程池中获取一个空闲线程,然后执行与改Trigger关联的job;Misfire Thraed则是扫描所有的trigger,查看是否有错失的,如果有的话,根据一定的策略进行处理
默认是并发的,即如果当前任务没有完成,会自动开一个任务执行
注意在分布式集群的情况下,多台机子有相同的定时任务,会出错,此时通过共享数据库的方式实现
Quartz的解决方案:
quartz集群分为水平集群和垂直集群,水平集群即将定时任务节点部署在不同的服务器,其最大的问题就是时钟同步问题,若时钟不能同步,则会导致集群中各个节点状态紊乱,造成不可预知的后果;垂直集群则是集群各节点部署在同一台服务器,时钟同步自然不是问题,但存在单点故障问题,服务器宕机会严重影响服务的可用性
在各个节点会上报任务,存到数据库中,执行时会从数据库中取出触发器来执行,如果触发器的名称和执行时间相同,则只有一个节点去执行此任务。
如果此节点执行失败,则此任务则会被分派到另一节点执行,中途也会自动检查失效的定时调度,发现不成功的,其他节点立马接过来继续完成定时任务。Quartz有11个定时任务调度表
参考
Quartz原理解密
深入解读Quartz的原理
Quartz 2.2 的实现原理和运行过程
其他定时器
Timer:这是java自带的java.util.Timer类,这个类允许你调度一个java.util.TimerTask任务。使用这种方式可以让你的程序按照某一个频度执行,但不能在指定时间运行。一般用的较少。单线程,任务一多会阻塞;一个任务出异常其他任务都受影响;受系统时间影响
ScheduledExecutorService:也jdk自带的一个类;是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响。线程池+延时队列DelayedQueue(数组、最小堆, 最近要执行的任务放在堆顶) 实现,如果堆顶任务时间未到就阻塞(通过自旋+condition.await\signal实现)。不受系统时间影响
参考:Java 定时任务实现原理详解
Java优先级队列DelayedWorkQueue原理分析
CORS
浏览器的同源政策的同源指的是:协议相同、域名相同、端口相同,如果非同源,有三种行为会受到限制:Cookie、LocalStorage和IndexDB无法读取;DOM无法获得、AJAX请求不能发送。
前后端分离的场景下,由于浏览器的同源策略,导致浏览器内的请求不同的源的后端是会失败,常见的解决跨域方法是使用CORS,现在常见的web框架都支持CORS,开启即可。
解决跨域的方法除了CORS,还有jsonp,不过已经很少使用了,jsonp本质是利用浏览器允许加载不同源的js文件即标签等,将跨域请求标签里,返回一段可执行的js代码,其中包含了请求结果,通常是json格式,前端通过返回的js代码执行回调获取结果。
详情见 跨域资源共享 CORS 详解
对于跨域产生的问题,如CSRF跨域请求攻击的解决方案,可参考:美团:如何防止csrf
session和cookie
首先HTTP是无状态的,因此需要通过session、cookie来达到记录用户状态的目的。
传统的session、cookie:session存用户信息,保存在服务端中,cookie里存session对应的sessionId,保存在客户端中,用于找到对应的session,每次请求都会带上该cookie来表示此用户。
如果客户端禁用掉cookie,可以使用LocalStorage存储,每次请求都以参数的形式传递;
cookie可设置长时间保持,session有效时间通常比较短,安全性上来将,cookie存储在客户端,容易被窃取,session存储在服务段,相对安全;
由于现在实例的部署不可能只部署一个,一般都是集群部署,因此session不可以只存在一个实例的内存中,因此引入Redis来存用户的登录信息
现在一般使用 token + Redis来实现 cookie - session 机制,本质上差不多,前端的cookie更多的是存token的信息而已,token也可以存在LocalStorage或sessionStorage中,发送请求时一般是把token的值放在请求头中,而不会把cookie发给后端,这样可以避免当用户禁用cookie导致功能不可用,还有CSRF问题。
JWT
JWT = JSON WEB TOKEN
原理
JWT实际上是一个token(令牌),分为三部分:Header(头部)、Payload(负载)、Signature(签名)。
Header(头部) :两部分组成,记录令牌类型和JWT的签名算法,一般是HMACSHA256。
Payload(负载): 记录用户登录信息(官方规范默认是不加密的,分为官方字段和私有字段)。
Signature(签名) :记录将 Header、Payload和服务端的密钥组合起来,使用Header(头部)里规定的方式加密。
比如header里保存的加密方式是HMACSHA256,签名 Signature = HMACSHA256(base64URL(header) + "." + base64URL(payload) + "." + 保存在后端的密钥)
最后的JWT = base64URL(Header) + "." + base64URL(Payload) + "." + Signature
,后端收到该JWT后验证该签名是否正确,来判断JWT里的用户信息是否可靠。
base64 :64指的是A-Z,a-z,0-9,+,/,将待转换的字符串转成二进制流,每3个8位转成4个6位,6位的二进制数转成十进制,根据码表找到对应的字符,以=号做后缀,凑齐位数
一般是为了解决一些字符编码的问题,将非ASCII字符转化为ASCII字符,还有就是可以对数据做简单加密,base64URL在base64的基础上增加对一些符号的编解码,比如把"-"替换成"+",使得它可以出现在url中。
HMACSHA256 :摘要算法,一般用于验证签名是否一致
使用
可以存储在浏览器的本地缓存localStorage或者cookie中,发送请求的时候放在cookie里,或者放在请求头中
JWT的目的是让服务器不保存任何session数据,让后端变成无状态的,因此没办法主动废弃某个token,一旦签发了JWT,在到期之前就会始终有效,如果想要实现这种功能,必然需要在后端保存JWT,就违背了JWT的设计初衷了。
要让JWT实现 续签 和 主动过期功能,必定需要在后端保存JWT
jwt主动过期问题,使用黑名单即可;分成两点,客户端要求失效,服务端记录token到黑名单;用户重置密码,服务端记录uid-time键值对,在此之前的token全部失效;客户端把保存的jwt删掉是没用的,此时的jwt依然有效,只是客户端没记录而已
jwt续签问题,一种解决方式是jwt中存储过期时间,服务端设置刷新时间,请求时判断是否在过期时间或刷新时间,在刷新时间内进行token刷新,失效token记入黑名单;
而黑名单过大问题,可以采用记录UID-刷新时间方式解决,判断jwt签发时间,jwt签发时间小于UID-刷新时间的记为失效
个人认为JWT的生成方式本身是有一套规范的,在实际使用过程中也可以对他进行改动,本质上还是一个签名校验而已,一般会对JWT进行魔改,比如使用Header(头部)里的加密方式加密Signature(签名),Signature(签名)加密Header(头部) 和Payload(负载) 这两部分,服务器里的私钥解密Payload(负载),得到需要的登录信息,不通过简单的base64URL编码,不对外暴露,签名算法或者签名里的密钥的方式可以改成其他等。
JWT参考:JWT 超详细分析
CAS模型 - SSO(单点登录)
可参考:CAS实现单点登录SSO执行原理探究 ,讲得算是比较明白,这里是总结基于CAS模式改的单点登录模式
第一次访问时,由于没有访问的token,会引导至登录
再次访问Web-1时,由于前端已存了token,直接使用token进行请求即可
已登录Web-1时去访问Web-2,会通过后端认证中心实现单点登录
这里在总结一下关于GrantTicket和ServiceTicket,跟CAS模型中提到的TGT、ST、PGT这些东西是类似的,本质是作为验证的票据,图中的GrantTicket、ServiceTicket、token含义如下
GrantTicket:全局会话票据,保存在登录页,通过GrantTicket才能换取ServiceTicket;
ServiceTicket表示访问资源的一次性票据,根据ServiceTicket换取token,换取后失效;
token:登录凭证
GT、ST和token都是保存在Redis中的,他们在Redis中的存储结构如下
Copy key:TOKEN_${Token的值}
value:
{
"createTime": 1565961654807,
"accountId": "123",
// 用户其他信息
"grantTicket": ${GrantTicket的值} // token关联GT,用于注销时实现全局注销
}
key:GRANT_TICKET_${GrantTicket的值}
value:
{
"createTime": 1565961654807,
"accountId": "123",
}
key:SERVICE_TICKET_${ServiceTicket的值}
value:
{
"createTime": 1565961654807,
"grantTicket": ${GrantTicket的值} // ST关联GT,用于判断该ST是否有效,换取token后删除
}
// token与grantTicket的记录,注销时,根据token中关联的GT,找到所有与之关联的token,进行删除,这里推荐使用Redis的scan命令进行分段查询,原因是Redis是单线程的,如果数据量太大使用keys命令遍历太久,阻塞Redis接收其他命令
key:{grantTicket}-{token}
value:无
基于OAuth2.0的第三方登录
可参考:理解OAuth 2.0 ,这样基本就入门了,这里是总结项目中如何接入,一般在集成facebook和google的第三方登录也是类似的流程机制,这里只用到了access_token,对于refresh_token,是用来延长access_token的过期时间的,减少短时间内的重复登录,这里就没有涉及到了
为什么要后端要根据code + clientId + secret换成access_token,再根据access_token换用户个人信息?
为什么后端不直接code + clientId + secret换用户个人信息呢?
主要还是为了安全,防止中间人攻击
重定向的参数是带在url里的,是直接暴露在客户端的,如果直接返回access_token就不安全,因此才多了code这一层,为了降低code被拦截泄漏后的风险,code的过期时间一般都很短,且是一次性的;
另外就是后端对于外部的请求都是不信任的,因此接收到的参数(code)首先还要配合凭证去验证其合法性,对于验证通过后获得的access_token也有更多的操作空间,由后端持有,不会暴露出去
像上图那种登录方案,后端只需要用户个人信息换完token就算完事了,所以看起来好像直接使用code + clientId + secret换用户个人信息就行,但是如果此时需要再获取用户的其他信息,就没有没办法再用code去换了,只能要求用户再次登录,此时如果有access_token就显得多么重要了
压测
总结一下做过的压测,压测工具jmetter,利用jmette可以多线程并发请求和可以实时查看简易报告的能力
先对被压测服务的接口针对不同场景编写压测用例,设定好TPS的起始和目标值,作为压测计划
画压测机器部署关系图,部署压测环境
对于被压测的服务,一般会mock掉与该服务相关关联的服务,比如该服务还连了数据库,该接口请求依赖一些独立部署的中间件,或者依赖其他服务,则会对这些相关的依赖用桩来代替,用于维持通信,以减少这些额外服务的影响。
一般一台机器只部署一个服务,特别是被压测服务,此外还要注意被压测服务所在的机器上网络设置相关的参数,比如TCP最大连接数、回收策略之类的设置
编写压测脚本,压测脚本越简单越好,尽量让压测工具不影响被压测服务,脚本最重要的几个设置 : 发起请求时的并发线程数、响应的断言、TPS数,其他那些花里胡哨的输出树状图,饼图啊那些都不用配了,用最简单的报告输出即可
部署完后,将脚本配置放到jmeter的机器上,启动压测
Copy nohup java -jar bin/ApacheJMeter.jar -n -t jmetter脚本路径/config.jmx > test.out &
输出到当前目录下的test.out文件里,这里启动是使用默认参数启动,如果对jmetter的JVM设置有要求,也可以在启动时指定JVM参数,如
Copy nohup java -server -XX:+HeapDumpOnOutOfMemoryError -Xms512m -Xmx512m -XX:+UseG1GC -XX:MaxGCPauseMillis=250 -XX:G1ReservePercent=20 -Djava.security.egd=file:/dev/urandom -jar bin/ApacheJMeter.jar -n -t jmetter脚本路径/config.jmx > test.out &
压测开启后可以打开test.out文件查看压测报告
一般是按照TPS从小往大压,小的TPS压,在正常延时的情况下可以先判断程序是否有问题,比如内存泄漏,内存溢出,没问题了再逐步往大了压。如果先从大往小压,延时又上不去,此时判断不了是程序内部问题还是过大的TPS导致。压测时间一般最少压一天
一般有如下几个点要注意,这些点到时也要输出到压测报告上
观察TPS是否符合预期、延时是否达到预期且稳定、错误率要为0。当程序正常时降低RT的手段 :减少不必要的日志输出、业务逻辑算法是否还有优化空间,是否有IO占用或者频繁序列化反序列化、内部队列是否阻塞
fgc,ygc不要太频繁,一般来说fgc 一小时要小于3~4次 ;ygc一分钟要小于3~4次为佳 。
注意jmetter端的CPU是否过高或波动很大,避免影响压测结论
如果cpu过高,如果连续达到90以上,基本上是内存泄漏导致了频繁的fgc;磁盘的占用情况,注意生成的日志是否把磁盘占满了
使用 jstat -gcutil [pid] [时间间隔,每几秒打印] [打印次数]
查看GC情况
当被压测端的gc不正常时,应尽量保存事发环境
1、收集内存使用基本情况统计:jmap -heap [pid] > [文件名,如heap.log]
2、收集线程堆栈运行信息:jstack [pid] > [文件名,如stack.log]
3、收集内存详细使用信息,生成dump内存快照:jmap -dump:format=b,file=[文件名,如heap.dump] [pid]
一般使用eclipse mat工具进行内存快照的分析,排查出内存泄漏的问题。
mat的使用参见:Eclipse MAT内存分析工具
一般压测脚本的模板:
Copy <?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="3.2" jmeter="3.2 r1790748">
<hashTree>
<TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="测试计划" enabled="true">
<!-- 一般写压测计划中的序号+名称 -->
<stringProp name="TestPlan.comments"></stringProp>
<boolProp name="TestPlan.functional_mode">false</boolProp>
<boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
<elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
<collectionProp name="Arguments.arguments"/>
</elementProp>
<stringProp name="TestPlan.user_define_classpath"></stringProp>
</TestPlan>
<hashTree>
<ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Thread Group" enabled="true">
<stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
<elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="循环控制器" enabled="true">
<boolProp name="LoopController.continue_forever">false</boolProp>
<intProp name="LoopController.loops">-1</intProp>
</elementProp>
<stringProp name="ThreadGroup.num_threads">500</stringProp> <!-- 发起请求时的并发线程数,这里设置为500个并发线程,表示使用这么多的线程数来达到下面设置的TPS数 -->
<stringProp name="ThreadGroup.ramp_time">8</stringProp>
<longProp name="ThreadGroup.start_time">1509332694000</longProp>
<longProp name="ThreadGroup.end_time">1509332694000</longProp>
<boolProp name="ThreadGroup.scheduler">false</boolProp>
<stringProp name="ThreadGroup.duration"></stringProp>
<stringProp name="ThreadGroup.delay"></stringProp>
</ThreadGroup>
<hashTree>
<HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="click http request" enabled="true">
<elementProp name="HTTPsampler.Arguments" elementType="Arguments" guiclass="HTTPArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
<collectionProp name="Arguments.arguments"/>
</elementProp>
<stringProp name="HTTPSampler.domain">192.168.1.123</stringProp> <!-- 此处为被压测服务的host -->
<stringProp name="HTTPSampler.port">12345</stringProp> <!-- 此处为被压测服务的port -->
<stringProp name="HTTPSampler.protocol">http</stringProp>
<stringProp name="HTTPSampler.contentEncoding"></stringProp>
<stringProp name="HTTPSampler.path">${__StringFromFile(/home/urls.log,,,)}</stringProp> <!-- 发起的http请求uri从文件读取,文件路径 -->
<stringProp name="HTTPSampler.method">GET</stringProp>
<boolProp name="HTTPSampler.follow_redirects">false</boolProp>
<boolProp name="HTTPSampler.auto_redirects">false</boolProp>
<boolProp name="HTTPSampler.use_keepalive">true</boolProp>
<boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
<stringProp name="HTTPSampler.embedded_url_re"></stringProp>
<stringProp name="HTTPSampler.implementation">Java</stringProp>
<stringProp name="HTTPSampler.connect_timeout"></stringProp>
<stringProp name="HTTPSampler.response_timeout"></stringProp>
</HTTPSamplerProxy>
<hashTree/>
<ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
<collectionProp name="Asserion.test_strings">
<stringProp name="49586">200</stringProp> <!-- http请求的响应断言,要求返回的http code为200才判定为成功 -->
</collectionProp>
<stringProp name="Assertion.test_field">Assertion.response_code</stringProp>
<boolProp name="Assertion.assume_success">false</boolProp>
<intProp name="Assertion.test_type">8</intProp>
</ResponseAssertion>
<hashTree/>
<ConstantThroughputTimer guiclass="TestBeanGUI" testclass="ConstantThroughputTimer" testname="Constant Throughput Timer" enabled="true">
<intProp name="calcMode">1</intProp>
<doubleProp>
<name>throughput</name>
<value>30000.0</value> <!-- 1分钟内发起的请求数,换算为tps为500 -->
<savedValue>0.0</savedValue>
</doubleProp>
</ConstantThroughputTimer>
<hashTree/>
</hashTree>
</hashTree>
<WorkBench guiclass="WorkBenchGui" testclass="WorkBench" testname="工作台" enabled="true">
<boolProp name="WorkBench.save">true</boolProp>
</WorkBench>
<hashTree/>
</hashTree>
</jmeterTestPlan>
调优
参考:https://tech.meituan.com/2016/12/02/performance-tunning.html
布隆过滤器
本质上是基于hash的概率性数据结构,是一个很长的二进制数组,主要用于判断元素可能存在集合中,或者一定不在集合中。
原理
有一个长度为m的bit数组,初始每个bit都是0,另外还有k个hash函数;
当加入一个元素时,先调用k个hash函数得到k个结果,将这k个结果与bit数组长度取模得到k个数组下标,将这k个数组下标对应的值置为 1;
查询元素时,同样经过上面步骤的计算,最终得到k个数组下标,判断这些下标对应的值是否为1,如果为1,说明元素可能存在,如果有一个不为1,说明元素一定不存在,返回结果;
误判率计算
涉及到3个重要的参数:
布隆过滤器中,一个元素插入后,某个bit为0的概率是(1 − 1/m)^k
n元素插入后,某个bit为0的概率是(1 − 1/m)^(n*k)
false positive的概率是(1−(1−1/m)^n*k)^k
因为需要的是k
个不同的bit被设置成1,概率是大约是(1−e^(−k*n/m))^k
实现
可以基于redis实现,但这里只给出go版本的实现,支持并发安全
Copy const (
mod7 = 1<<3 - 1
bitPerByte = 8
)
type Filter struct {
lock *sync.RWMutex
concurrent bool
// 长度之所以要取2的指数是因为要将取模操作优化成与操作, % 等于 &(2^n-1)
m uint64 // bit array of m bits, m will be ceiling to power of 2
n uint64 // number of inserted elements
log2m uint64 // log_2 of m
k uint64 // the number of hash function
keys []byte // byte array to store hash value
}
func New(size uint64, k uint64, race bool) *Filter {
log2 := uint64(math.Ceil(math.Log2(float64(size))))
filter := &Filter{
m: 1 << log2,
log2m: log2,
k: k,
keys: make([]byte, 1<<log2),
concurrent: race,
}
if filter.concurrent {
filter.lock = &sync.RWMutex{}
}
return filter
}
func (f *Filter) Add(data []byte) *Filter {
if f.concurrent {
f.lock.Lock()
defer f.lock.Unlock()
}
h := baseHash(data)
for i := uint64(0); i < f.k; i++ {
loc := location(h, i)
slot, mod := f.location(loc)
f.keys[slot] |= 1 << mod
}
f.n++
return f
}
// location returns the bit position in byte array
// & (f.m - 1) is the quick way for mod operation
func (f *Filter) location(h uint64) (uint64, uint64) {
slot := (h / bitPerByte) & (f.m - 1)
mod := h & mod7
return slot, mod
}
// baseHash returns the murmur3 128-bit hash
func baseHash(data []byte) []uint64 {
a1 := []byte{1} // to grab another bit of data
hasher := murmur3.New128()
hasher.Write(data) // #nosec
v1, v2 := hasher.Sum128()
hasher.Write(a1) // #nosec
v3, v4 := hasher.Sum128()
return []uint64{
v1, v2, v3, v4,
}
}
淘汰算法
LRU
java实现,非线程安全
Copy // 注意每一次对节点有操作对需要同时操作 nodeMap和lruQueue
// LRU本质是利用 hashMap 和 双向链表 实现
public class LRUCache {
private Map<String, Node> nodeMap;
private DoubleLinkedList lruQueue;
private int size = 0;
public LRUCache() {
this(3);
}
public LRUCache(int size) {
this.size = size;
this.nodeMap = new HashMap<>();
this.lruQueue = new DoubleLinkedList();
}
public String get(String key) {
Node n = nodeMap.get(key);
if (n == null) {
return null;
}
// 获取后直接放到到最前的位置
put(n.key, n.value);
return n.value;
}
public void put(String key, String value) {
Node newNode = new Node(key, value);
// 如果包含,则放到最前
if (nodeMap.containsKey(key)) {
lruQueue.remove(nodeMap.get(key));
lruQueue.addFirst(newNode);
// 记得更新map
nodeMap.put(key, newNode);
} else {
// 如果满了,则移除最后一个
if (size <= lruQueue.size()) {
Node last = lruQueue.removeLast();
nodeMap.remove(last.key);
}
nodeMap.put(key, newNode);
lruQueue.addFirst(newNode);
}
}
private class Node {
private String key;
private String value;
private Node pre;
private Node next;
public Node(String key, String value) {
this.key = key;
this.value = value;
}
}
// 封装双向链表方法,构建时要注意前后节点指向和空指针问题
private class DoubleLinkedList {
private Node head;
private Node tail;
private int count = 0;
public void addFirst(Node n) {
if (head == null) {
tail = n;
}
count ++;
n.next = head;
if (head != null) {
head.pre = n;
}
head = n;
}
public Node removeLast() {
if (count == 0) {
return null;
}
Node result = tail;
if (tail.pre != null) {
tail.pre.next = null;
} else {
head = null;
tail = null;
}
count --;
return result;
}
public void remove(Node n) {
if (count == 0) {
return ;
}
count --;
if (n.pre != null) {
n.pre.next = n.next;
} else {
head = n.next;
}
if (n.next != null) {
n.next.pre = n.pre;
} else {
tail = n.pre;
}
}
public int size() {
return count;
}
}
}
golang实现,非并发安全,下面这种写法是平铺了双向链表,链表的长度通过map的长度计算得到
Copy type LRUCache struct {
nodeMap map[string]*Node
head *Node
tail *Node
cap int
}
type Node struct {
key string
val string
next *Node
pre *Node
}
func NewLRUCache(capacity int) LRUCache {
return LRUCache{nodeMap: make(map[string]*Node), cap: capacity}
}
func (lru *LRUCache) Get(key string) string {
if existNode, exist := lru.nodeMap[key]; exist {
lru.remove(existNode)
lru.addFirst(existNode)
return existNode.val
}
return ""
}
func (lru *LRUCache) Put(key string, val string) {
if existNode, exist := lru.nodeMap[key]; exist {
existNode.val = val
lru.remove(existNode)
lru.addFirst(existNode)
return
} else {
newNode := &node{key: key, val: val}
lru.nodeMap[key] = newNode
lru.addFirst(newNode)
}
if len(lru.nodeMap) > lru.cap {
// 一定要先delete,如果先remove,会导致找不回tail的key进行delete,
delete(lru.nodeMap, lru.tail.key)
lru.remove(lru.tail)
}
}
func (lru *LRUCache) addFirst(n *Node) {
n.pre = nil
n.next = lru.head
if lru.head != nil {
lru.head.pre = n
}
lru.head = n
if lru.tail == nil {
lru.tail = n
lru.tail.next = nil
}
}
func (lru *LRUCache) remove(n *Node) {
// n的next和pre要置为null,防止内存泄漏
if n == lru.head {
lru.head = n.next
if n.next != nil {
n.next.pre = nil
}
n.next = nil
return
}
if n == lru.tail {
lru.tail = n.pre
n.pre.next = nil
n.pre = nil
return
}
n.pre.next = n.next
n.next.pre = n.pre
}
关于并发安全,最简单的实现就是在调用Get和Put方法时加读写锁,但是这种做法锁的粒度比较大,每次会锁住整个底层的双向链表和map,导致在高并发情况下吞吐量不高,优化的思路就是对map分片,通过分片上锁来减小锁的粒度,然后再双向链表节点的操作上进行优化。
Copy func New(capacity int) LRUCache {
shards := make(map[string]*LRUCacheShard, 256)
for i := 0; i < 256; i++ {
shards[fmt.Sprintf("%02x", i)] = &LRUCacheShard{
Cap: capacity,
Keys: make(map[int]*list.Element),
List: list.New(),
}
}
return LRUCache{
shards: shards,
}
}
func (c *LRUCache) Get(key int) int {
shard := c.GetShard(key)
shard.RLock()
defer shard.RUnlock()
……
}
func (c *LRUCache) Put(key int, value int) {
shard := c.GetShard(key)
shard.Lock()
defer shard.Unlock()
……
}
func (c *LRUCache) GetShard(key int) (shard *LRUCacheShard) {
hasher := sha1.New()
hasher.Write([]byte(key))
shardKey := fmt.Sprintf("%x", hasher.Sum(nil))[0:2]
return c.shards[shardKey]
}
其中,在Get方法中,如果存在,还需要修改key所在节点的位置,直接调put即可,当然这种方式的粒度还是比较大,再次优化的思路是对map的操作还是得上锁,但对双向链表的操作无需上锁,双向链表移动节点和删除节点可以同时操作,可以通过两个channel实现,参考
LFU
java实现,非线程安全
Copy /**
* 频次最少使用
* 设定容量,每次get key会修改使用次数和使用时间,当满容量时,移除次数最少的那个
* 如果有多个key的使用次数一样,则移除使用时间最旧的那个
*/
public class LFUCache {
Map<String, String> keyValMap;
Map<String, Integer> key2FreqMap; // key和使用频率的映射
// 用LinkHashSet来模拟使用时间,使用LinkHashSet还有一个目的是便于根据key进行删除
Map<Integer, LinkedHashSet<String>> freq2KeysMap; // 使用频率和key的映射
int minFreq;
int cap;
public LFUCache(int cap) {
keyValMap = new HashMap<>();
key2FreqMap = new HashMap<>();
freq2KeysMap = new HashMap<>();
this.cap = cap;
this.minFreq = 0;
}
public String get(String key) {
if (!keyValMap.containsKey(key)) {
return "";
}
increaseFreq(key);
return keyValMap.get(key);
}
public void put(String key, String val) {
if (this.cap <= 0) {
return;
}
if (keyValMap.containsKey(key)) {
keyValMap.put(key, val);
increaseFreq(key);
return;
}
if (this.cap <= keyValMap.size()) {
removeMinFreqKey();
}
keyValMap.put(key, val);
key2FreqMap.put(key, 1);
freq2KeysMap.putIfAbsent(1, new LinkedHashSet<>());
freq2KeysMap.get(1).add(key);
this.minFreq = 1;
}
void increaseFreq(String key) {
int freq = key2FreqMap.get(key);
key2FreqMap.put(key, freq+1);
freq2KeysMap.get(freq).remove(key);
freq2KeysMap.putIfAbsent(freq + 1, new LinkedHashSet<>());
freq2KeysMap.get(freq + 1).add(key);
if (freq2KeysMap.get(freq).isEmpty()) {
freq2KeysMap.remove(freq);
if (freq == this.minFreq) {
this.minFreq++;
}
}
}
void removeMinFreqKey() {
LinkedHashSet<String> keys = freq2KeysMap.get(this.minFreq);
String delKey = keys.iterator().next();
keys.remove(delKey);
if (keys.isEmpty()) {
freq2KeysMap.remove(this.minFreq);
// 这里无需更新 minFreq 的值,因为该方法是在插入新key时使用,此时minFreq一定是1
}
keyValMap.remove(delKey);
key2FreqMap.remove(delKey);
}
}
golang实现,非并发安全,利用 优先级队列(最小堆) + map实现,使用的是go自带的heap数据结构,通过Item数组结构实现
Copy import "container/heap"
type LFUNode struct {
key string
val string
freq int // 用于优先级,key的使用频率
count int // 用于当freq相同时的比较淘汰,总的获取次数,类似时间戳的概念
index int // 最小堆中元素的下标,用于重建最小堆.
}
type PriorityQueue []*LFUNode
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
// 优先根据使用频率排列,相等时才使用count,从小到大排序
if pq[i].freq == pq[j].freq {
return pq[i].count < pq[j].count
}
return pq[i].freq < pq[j].freq
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *PriorityQueue) Push(x interface{}) {
n := len(*pq)
node := x.(*LFUNode)
node.index = n
*pq = append(*pq, node)
}
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
node := old[n-1]
old[n-1] = nil // 防止内存泄露
node.index = -1 // pop时重置下标保证安全
*pq = old[0 : n-1]
return node
}
// 更新最小堆里的元素
func (pq *PriorityQueue) update(node *LFUNode, value string, frequency int, count int) {
node.val = value
node.count = count
node.freq = frequency
heap.Fix(pq, node.index)
}
// ==========================================
type LFUCache struct {
cap int
pq PriorityQueue
nodeMap map[string]*LFUNode
counter int
}
func NewLFUCache(capacity int) LFUCache {
return LFUCache{
pq: PriorityQueue{},
nodeMap: make(map[string]*LFUNode, capacity),
cap: capacity,
}
}
func (lfu *LFUCache) Get(key string) string {
if lfu.cap == 0 {
return ""
}
if node, ok := lfu.nodeMap[key]; ok {
lfu.counter++
lfu.pq.update(node, node.val, node.freq+1, lfu.counter)
return node.val
}
return ""
}
func (lfu *LFUCache) Put(key string, value string) {
if lfu.cap == 0 {
return
}
lfu.counter++
// 如果存在,增加 frequency,再调整堆
if node, ok := lfu.nodeMap[key]; ok {
lfu.pq.update(node, value, node.freq + 1, lfu.counter)
return
}
// 如果不存在且缓存满了,需要删除。在 hashmap 和 pq 中删除。
if len(lfu.pq) == lfu.cap {
node := heap.Pop(&lfu.pq).(*LFUNode)
delete(lfu.nodeMap, node.key)
}
// 新建结点,在 hashmap 和 pq 中添加。
node := &LFUNode{
val: value,
key: key,
count: lfu.counter,
}
heap.Push(&lfu.pq, node)
lfu.nodeMap[key] = node
}
另一个版本实现,利用go本身提供的双向链表,但是由于go没泛型,在类型推断方面导致整体性能不会很高
Copy type LFUCache2 struct {
nodeMap map[string]*list.Element
freq2NodeList map[int]*list.List
cap int
min int
}
type node struct {
key string
value string
freq int
}
func NewLFUCache2(capacity int) LFUCache2 {
return LFUCache2{nodeMap: make(map[string]*list.Element),
freq2NodeList: make(map[int]*list.List),
cap: capacity,
min: 0,
}
}
func (lfu *LFUCache2) Get(key string) string {
value, ok := lfu.nodeMap[key]
if !ok {
return ""
}
currentNode := value.Value.(*node)
lfu.freq2NodeList[currentNode.freq].Remove(value)
currentNode.freq++
if _, ok := lfu.freq2NodeList[currentNode.freq]; !ok {
lfu.freq2NodeList[currentNode.freq] = list.New()
}
newList := lfu.freq2NodeList[currentNode.freq]
newNode := newList.PushFront(currentNode)
lfu.nodeMap[key] = newNode
if currentNode.freq-1 == lfu.min && lfu.freq2NodeList[currentNode.freq-1].Len() == 0 {
lfu.min++
}
return currentNode.value
}
func (lfu *LFUCache2) Put(key string, value string) {
if lfu.cap == 0 {
return
}
// 如果存在,更新访问次数
if currentValue, ok := lfu.nodeMap[key]; ok {
currentNode := currentValue.Value.(*node)
currentNode.value = value
lfu.Get(key)
return
}
// 如果不存在且缓存满了,需要删除
if lfu.cap == len(lfu.nodeMap) {
currentList := lfu.freq2NodeList[lfu.min]
backNode := currentList.Back()
delete(lfu.nodeMap, backNode.Value.(*node).key)
currentList.Remove(backNode)
}
// 新建结点,插入到 2 个 map 中
lfu.min = 1
currentNode := &node{
key: key,
value: value,
freq: 1,
}
if _, ok := lfu.freq2NodeList[1]; !ok {
lfu.freq2NodeList[1] = list.New()
}
newList := lfu.freq2NodeList[1]
newNode := newList.PushFront(currentNode)
lfu.nodeMap[key] = newNode
}
参考
布隆过滤器原理及golang实现
LRU / LFU 的青铜与王者