[TOC]
Quartz
分为三个部分:e
Job&Detial(任务):定时任务的执行方法,与Trigger配套的
Trigger(触发器):规定什么时候触发,与Job&Detail配套的
Scheduler(调度器):单例,把Trigger丢里面由调度器调度,只需要一个Scheduler,配置不同的Trigger;可以理解成类似线程池的东西
原理:ScheduledThreadPoolExecutor线程池 + 通过Object类的wait()和notify()或者Condition类的await()\signal()进行等待和唤醒、锁保证线程安全 来进行调度
Scheduler有两个调度线程:regular Scheduler Thread(执行常规调度)和Misfire Scheduler Thread(执行错失的任务),Regular Thread 轮询所有Trigger,如果有将要触发的Trigger(用wait和notifyAll实现),则从任务线程池中获取一个空闲线程,然后执行与改Trigger关联的job;Misfire Thraed则是扫描所有的trigger,查看是否有错失的,如果有的话,根据一定的策略进行处理
默认是并发的,即如果当前任务没有完成,会自动开一个任务执行
注意在分布式集群的情况下,多台机子有相同的定时任务,会出错,此时通过共享数据库的方式实现
Quartz的解决方案:
quartz集群分为水平集群和垂直集群,水平集群即将定时任务节点部署在不同的服务器,其最大的问题就是时钟同步问题,若时钟不能同步,则会导致集群中各个节点状态紊乱,造成不可预知的后果;垂直集群则是集群各节点部署在同一台服务器,时钟同步自然不是问题,但存在单点故障问题,服务器宕机会严重影响服务的可用性
在各个节点会上报任务,存到数据库中,执行时会从数据库中取出触发器来执行,如果触发器的名称和执行时间相同,则只有一个节点去执行此任务。
如果此节点执行失败,则此任务则会被分派到另一节点执行,中途也会自动检查失效的定时调度,发现不成功的,其他节点立马接过来继续完成定时任务。Quartz有11个定时任务调度表
参考
Quartz原理解密
深入解读Quartz的原理
Quartz 2.2 的实现原理和运行过程
其他定时器
Timer:这是java自带的java.util.Timer类,这个类允许你调度一个java.util.TimerTask任务。使用这种方式可以让你的程序按照某一个频度执行,但不能在指定时间运行。一般用的较少。单线程,任务一多会阻塞;一个任务出异常其他任务都受影响;受系统时间影响
ScheduledExecutorService:也jdk自带的一个类;是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响。线程池+延时队列DelayedQueue(数组、最小堆, 最近要执行的任务放在堆顶) 实现,如果堆顶任务时间未到就阻塞(通过自旋+condition.await\signal实现)。不受系统时间影响
参考:Java 定时任务实现原理详解
Java优先级队列DelayedWorkQueue原理分析
CORS
浏览器的同源政策的同源指的是:协议相同、域名相同、端口相同,如果非同源,有三种行为会受到限制:Cookie、LocalStorage和IndexDB无法读取;DOM无法获得、AJAX请求不能发送。
前后端分离的场景下,由于浏览器的同源策略,导致浏览器内的请求不同的源的后端是会失败,常见的解决跨域方法是使用CORS,现在常见的web框架都支持CORS,开启即可。
解决跨域的方法除了CORS,还有jsonp,不过已经很少使用了,jsonp本质是利用浏览器允许加载不同源的js文件即标签等,将跨域请求标签里,返回一段可执行的js代码,其中包含了请求结果,通常是json格式,前端通过返回的js代码执行回调获取结果。
详情见 跨域资源共享 CORS 详解
对于跨域产生的问题,如CSRF跨域请求攻击的解决方案,可参考:美团:如何防止csrf
session和cookie
首先HTTP是无状态的,因此需要通过session、cookie来达到记录用户状态的目的。
传统的session、cookie:session存用户信息,保存在服务端中,cookie里存session对应的sessionId,保存在客户端中,用于找到对应的session,每次请求都会带上该cookie来表示此用户。
如果客户端禁用掉cookie,可以使用LocalStorage存储,每次请求都以参数的形式传递;
cookie可设置长时间保持,session有效时间通常比较短,安全性上来将,cookie存储在客户端,容易被窃取,session存储在服务段,相对安全;
由于现在实例的部署不可能只部署一个,一般都是集群部署,因此session不可以只存在一个实例的内存中,因此引入Redis来存用户的登录信息
现在一般使用 token + Redis来实现 cookie - session 机制,本质上差不多,前端的cookie更多的是存token的信息而已,token也可以存在LocalStorage或sessionStorage中,发送请求时一般是把token的值放在请求头中,而不会把cookie发给后端,这样可以避免当用户禁用cookie导致功能不可用,还有CSRF问题。
JWT
JWT = JSON WEB TOKEN
原理
JWT实际上是一个token(令牌),分为三部分:Header(头部)、Payload(负载)、Signature(签名)。
Header(头部) :两部分组成,记录令牌类型和JWT的签名算法,一般是HMACSHA256。
Payload(负载): 记录用户登录信息(官方规范默认是不加密的,分为官方字段和私有字段)。
Signature(签名) :记录将 Header、Payload和服务端的密钥组合起来,使用Header(头部)里规定的方式加密。
比如header里保存的加密方式是HMACSHA256,签名 Signature = HMACSHA256(base64URL(header) + "." + base64URL(payload) + "." + 保存在后端的密钥)
最后的JWT = base64URL(Header) + "." + base64URL(Payload) + "." + Signature
,后端收到该JWT后验证该签名是否正确,来判断JWT里的用户信息是否可靠。
base64 :64指的是A-Z,a-z,0-9,+,/,将待转换的字符串转成二进制流,每3个8位转成4个6位,6位的二进制数转成十进制,根据码表找到对应的字符,以=号做后缀,凑齐位数
一般是为了解决一些字符编码的问题,将非ASCII字符转化为ASCII字符,还有就是可以对数据做简单加密,base64URL在base64的基础上增加对一些符号的编解码,比如把"-"替换成"+",使得它可以出现在url中。
HMACSHA256 :摘要算法,一般用于验证签名是否一致
使用
可以存储在浏览器的本地缓存localStorage或者cookie中,发送请求的时候放在cookie里,或者放在请求头中
JWT的目的是让服务器不保存任何session数据,让后端变成无状态的,因此没办法主动废弃某个token,一旦签发了JWT,在到期之前就会始终有效,如果想要实现这种功能,必然需要在后端保存JWT,就违背了JWT的设计初衷了。
要让JWT实现 续签 和 主动过期功能,必定需要在后端保存JWT
jwt主动过期问题,使用黑名单即可;分成两点,客户端要求失效,服务端记录token到黑名单;用户重置密码,服务端记录uid-time键值对,在此之前的token全部失效;客户端把保存的jwt删掉是没用的,此时的jwt依然有效,只是客户端没记录而已
jwt续签问题,一种解决方式是jwt中存储过期时间,服务端设置刷新时间,请求时判断是否在过期时间或刷新时间,在刷新时间内进行token刷新,失效token记入黑名单;
而黑名单过大问题,可以采用记录UID-刷新时间方式解决,判断jwt签发时间,jwt签发时间小于UID-刷新时间的记为失效
个人认为JWT的生成方式本身是有一套规范的,在实际使用过程中也可以对他进行改动,本质上还是一个签名校验而已,一般会对JWT进行魔改,比如使用Header(头部)里的加密方式加密Signature(签名),Signature(签名)加密Header(头部) 和Payload(负载) 这两部分,服务器里的私钥解密Payload(负载),得到需要的登录信息,不通过简单的base64URL编码,不对外暴露,签名算法或者签名里的密钥的方式可以改成其他等。
JWT参考:JWT 超详细分析
CAS模型 - SSO(单点登录)
可参考:CAS实现单点登录SSO执行原理探究 ,讲得算是比较明白,这里是总结基于CAS模式改的单点登录模式
第一次访问时,由于没有访问的token,会引导至登录
再次访问Web-1时,由于前端已存了token,直接使用token进行请求即可
已登录Web-1时去访问Web-2,会通过后端认证中心实现单点登录
这里在总结一下关于GrantTicket和ServiceTicket,跟CAS模型中提到的TGT、ST、PGT这些东西是类似的,本质是作为验证的票据,图中的GrantTicket、ServiceTicket、token含义如下
GrantTicket:全局会话票据,保存在登录页,通过GrantTicket才能换取ServiceTicket;
ServiceTicket表示访问资源的一次性票据,根据ServiceTicket换取token,换取后失效;
token:登录凭证
GT、ST和token都是保存在Redis中的,他们在Redis中的存储结构如下
Copy key:TOKEN_${Token的值}
value:
{
"createTime": 1565961654807,
"accountId": "123",
// 用户其他信息
"grantTicket": ${GrantTicket的值} // token关联GT,用于注销时实现全局注销
}
key:GRANT_TICKET_${GrantTicket的值}
value:
{
"createTime": 1565961654807,
"accountId": "123",
}
key:SERVICE_TICKET_${ServiceTicket的值}
value:
{
"createTime": 1565961654807,
"grantTicket": ${GrantTicket的值} // ST关联GT,用于判断该ST是否有效,换取token后删除
}
// token与grantTicket的记录,注销时,根据token中关联的GT,找到所有与之关联的token,进行删除,这里推荐使用Redis的scan命令进行分段查询,原因是Redis是单线程的,如果数据量太大使用keys命令遍历太久,阻塞Redis接收其他命令
key:{grantTicket}-{token}
value:无
基于OAuth2.0的第三方登录
可参考:理解OAuth 2.0 ,这样基本就入门了,这里是总结项目中如何接入,一般在集成facebook和google的第三方登录也是类似的流程机制,这里只用到了access_token,对于refresh_token,是用来延长access_token的过期时间的,减少短时间内的重复登录,这里就没有涉及到了
为什么要后端要根据code + clientId + secret换成access_token,再根据access_token换用户个人信息?
为什么后端不直接code + clientId + secret换用户个人信息呢?
主要还是为了安全,防止中间人攻击
重定向的参数是带在url里的,是直接暴露在客户端的,如果直接返回access_token就不安全,因此才多了code这一层,为了降低code被拦截泄漏后的风险,code的过期时间一般都很短,且是一次性的;
另外就是后端对于外部的请求都是不信任的,因此接收到的参数(code)首先还要配合凭证去验证其合法性,对于验证通过后获得的access_token也有更多的操作空间,由后端持有,不会暴露出去
像上图那种登录方案,后端只需要用户个人信息换完token就算完事了,所以看起来好像直接使用code + clientId + secret换用户个人信息就行,但是如果此时需要再获取用户的其他信息,就没有没办法再用code去换了,只能要求用户再次登录,此时如果有access_token就显得多么重要了
压测
总结一下做过的压测,压测工具jmetter,利用jmette可以多线程并发请求和可以实时查看简易报告的能力
先对被压测服务的接口针对不同场景编写压测用例,设定好TPS的起始和目标值,作为压测计划
画压测机器部署关系图,部署压测环境
对于被压测的服务,一般会mock掉与该服务相关关联的服务,比如该服务还连了数据库,该接口请求依赖一些独立部署的中间件,或者依赖其他服务,则会对这些相关的依赖用桩来代替,用于维持通信,以减少这些额外服务的影响。
一般一台机器只部署一个服务,特别是被压测服务,此外还要注意被压测服务所在的机器上网络设置相关的参数,比如TCP最大连接数、回收策略之类的设置
编写压测脚本,压测脚本越简单越好,尽量让压测工具不影响被压测服务,脚本最重要的几个设置 : 发起请求时的并发线程数、响应的断言、TPS数,其他那些花里胡哨的输出树状图,饼图啊那些都不用配了,用最简单的报告输出即可
部署完后,将脚本配置放到jmeter的机器上,启动压测
Copy nohup java -jar bin/ApacheJMeter.jar -n -t jmetter脚本路径/config.jmx > test.out &
输出到当前目录下的test.out文件里,这里启动是使用默认参数启动,如果对jmetter的JVM设置有要求,也可以在启动时指定JVM参数,如
Copy nohup java -server -XX:+HeapDumpOnOutOfMemoryError -Xms512m -Xmx512m -XX:+UseG1GC -XX:MaxGCPauseMillis=250 -XX:G1ReservePercent=20 -Djava.security.egd=file:/dev/urandom -jar bin/ApacheJMeter.jar -n -t jmetter脚本路径/config.jmx > test.out &
压测开启后可以打开test.out文件查看压测报告
一般是按照TPS从小往大压,小的TPS压,在正常延时的情况下可以先判断程序是否有问题,比如内存泄漏,内存溢出,没问题了再逐步往大了压。如果先从大往小压,延时又上不去,此时判断不了是程序内部问题还是过大的TPS导致。压测时间一般最少压一天
一般有如下几个点要注意,这些点到时也要输出到压测报告上
观察TPS是否符合预期、延时是否达到预期且稳定、错误率要为0。当程序正常时降低RT的手段 :减少不必要的日志输出、业务逻辑算法是否还有优化空间,是否有IO占用或者频繁序列化反序列化、内部队列是否阻塞
fgc,ygc不要太频繁,一般来说fgc 一小时要小于3~4次 ;ygc一分钟要小于3~4次为佳 。
注意jmetter端的CPU是否过高或波动很大,避免影响压测结论
如果cpu过高,如果连续达到90以上,基本上是内存泄漏导致了频繁的fgc;磁盘的占用情况,注意生成的日志是否把磁盘占满了
使用 jstat -gcutil [pid] [时间间隔,每几秒打印] [打印次数]
查看GC情况
当被压测端的gc不正常时,应尽量保存事发环境
1、收集内存使用基本情况统计:jmap -heap [pid] > [文件名,如heap.log]
2、收集线程堆栈运行信息:jstack [pid] > [文件名,如stack.log]
3、收集内存详细使用信息,生成dump内存快照:jmap -dump:format=b,file=[文件名,如heap.dump] [pid]
一般使用eclipse mat工具进行内存快照的分析,排查出内存泄漏的问题。
mat的使用参见:Eclipse MAT内存分析工具
一般压测脚本的模板:
Copy <? xml version = "1.0" encoding = "UTF-8" ?>
< jmeterTestPlan version = "1.2" properties = "3.2" jmeter = "3.2 r1790748" >
< hashTree >
< TestPlan guiclass = "TestPlanGui" testclass = "TestPlan" testname = "测试计划" enabled = "true" >
<!-- 一般写压测计划中的序号+名称 -->
< stringProp name = "TestPlan.comments" ></ stringProp >
< boolProp name = "TestPlan.functional_mode" >false</ boolProp >
< boolProp name = "TestPlan.serialize_threadgroups" >false</ boolProp >
< elementProp name = "TestPlan.user_defined_variables" elementType = "Arguments" guiclass = "ArgumentsPanel" testclass = "Arguments" testname = "用户定义的变量" enabled = "true" >
< collectionProp name = "Arguments.arguments" />
</ elementProp >
< stringProp name = "TestPlan.user_define_classpath" ></ stringProp >
</ TestPlan >
< hashTree >
< ThreadGroup guiclass = "ThreadGroupGui" testclass = "ThreadGroup" testname = "Thread Group" enabled = "true" >
< stringProp name = "ThreadGroup.on_sample_error" >continue</ stringProp >
< elementProp name = "ThreadGroup.main_controller" elementType = "LoopController" guiclass = "LoopControlPanel" testclass = "LoopController" testname = "循环控制器" enabled = "true" >
< boolProp name = "LoopController.continue_forever" >false</ boolProp >
< intProp name = "LoopController.loops" >-1</ intProp >
</ elementProp >
< stringProp name = "ThreadGroup.num_threads" >500</ stringProp > <!-- 发起请求时的并发线程数,这里设置为500个并发线程,表示使用这么多的线程数来达到下面设置的TPS数 -->
< stringProp name = "ThreadGroup.ramp_time" >8</ stringProp >
< longProp name = "ThreadGroup.start_time" >1509332694000</ longProp >
< longProp name = "ThreadGroup.end_time" >1509332694000</ longProp >
< boolProp name = "ThreadGroup.scheduler" >false</ boolProp >
< stringProp name = "ThreadGroup.duration" ></ stringProp >
< stringProp name = "ThreadGroup.delay" ></ stringProp >
</ ThreadGroup >
< hashTree >
< HTTPSamplerProxy guiclass = "HttpTestSampleGui" testclass = "HTTPSamplerProxy" testname = "click http request" enabled = "true" >
< elementProp name = "HTTPsampler.Arguments" elementType = "Arguments" guiclass = "HTTPArgumentsPanel" testclass = "Arguments" testname = "用户定义的变量" enabled = "true" >
< collectionProp name = "Arguments.arguments" />
</ elementProp >
< stringProp name = "HTTPSampler.domain" >192.168.1.123</ stringProp > <!-- 此处为被压测服务的host -->
< stringProp name = "HTTPSampler.port" >12345</ stringProp > <!-- 此处为被压测服务的port -->
< stringProp name = "HTTPSampler.protocol" >http</ stringProp >
< stringProp name = "HTTPSampler.contentEncoding" ></ stringProp >
< stringProp name = "HTTPSampler.path" >${__StringFromFile(/home/urls.log,,,)}</ stringProp > <!-- 发起的http请求uri从文件读取,文件路径 -->
< stringProp name = "HTTPSampler.method" >GET</ stringProp >
< boolProp name = "HTTPSampler.follow_redirects" >false</ boolProp >
< boolProp name = "HTTPSampler.auto_redirects" >false</ boolProp >
< boolProp name = "HTTPSampler.use_keepalive" >true</ boolProp >
< boolProp name = "HTTPSampler.DO_MULTIPART_POST" >false</ boolProp >
< stringProp name = "HTTPSampler.embedded_url_re" ></ stringProp >
< stringProp name = "HTTPSampler.implementation" >Java</ stringProp >
< stringProp name = "HTTPSampler.connect_timeout" ></ stringProp >
< stringProp name = "HTTPSampler.response_timeout" ></ stringProp >
</ HTTPSamplerProxy >
< hashTree />
< ResponseAssertion guiclass = "AssertionGui" testclass = "ResponseAssertion" testname = "Response Assertion" enabled = "true" >
< collectionProp name = "Asserion.test_strings" >
< stringProp name = "49586" >200</ stringProp > <!-- http请求的响应断言,要求返回的http code为200才判定为成功 -->
</ collectionProp >
< stringProp name = "Assertion.test_field" >Assertion.response_code</ stringProp >
< boolProp name = "Assertion.assume_success" >false</ boolProp >
< intProp name = "Assertion.test_type" >8</ intProp >
</ ResponseAssertion >
< hashTree />
< ConstantThroughputTimer guiclass = "TestBeanGUI" testclass = "ConstantThroughputTimer" testname = "Constant Throughput Timer" enabled = "true" >
< intProp name = "calcMode" >1</ intProp >
< doubleProp >
< name >throughput</ name >
< value >30000.0</ value > <!-- 1分钟内发起的请求数,换算为tps为500 -->
< savedValue >0.0</ savedValue >
</ doubleProp >
</ ConstantThroughputTimer >
< hashTree />
</ hashTree >
</ hashTree >
< WorkBench guiclass = "WorkBenchGui" testclass = "WorkBench" testname = "工作台" enabled = "true" >
< boolProp name = "WorkBench.save" >true</ boolProp >
</ WorkBench >
< hashTree />
</ hashTree >
</ jmeterTestPlan >
调优
参考:https://tech.meituan.com/2016/12/02/performance-tunning.html
布隆过滤器
本质上是基于hash的概率性数据结构,是一个很长的二进制数组,主要用于判断元素可能存在集合中,或者一定不在集合中。
原理
有一个长度为m的bit数组,初始每个bit都是0,另外还有k个hash函数;
当加入一个元素时,先调用k个hash函数得到k个结果,将这k个结果与bit数组长度取模得到k个数组下标,将这k个数组下标对应的值置为 1;
查询元素时,同样经过上面步骤的计算,最终得到k个数组下标,判断这些下标对应的值是否为1,如果为1,说明元素可能存在,如果有一个不为1,说明元素一定不存在,返回结果;
误判率计算
涉及到3个重要的参数:
布隆过滤器中,一个元素插入后,某个bit为0的概率是(1 − 1/m)^k
n元素插入后,某个bit为0的概率是(1 − 1/m)^(n*k)
false positive的概率是(1−(1−1/m)^n*k)^k
因为需要的是k
个不同的bit被设置成1,概率是大约是(1−e^(−k*n/m))^k
实现
可以基于redis实现,但这里只给出go版本的实现,支持并发安全
Copy const (
mod7 = 1 << 3 - 1
bitPerByte = 8
)
type Filter struct {
lock * sync . RWMutex
concurrent bool
// 长度之所以要取2的指数是因为要将取模操作优化成与操作, % 等于 &(2^n-1)
m uint64 // bit array of m bits, m will be ceiling to power of 2
n uint64 // number of inserted elements
log2m uint64 // log_2 of m
k uint64 // the number of hash function
keys [] byte // byte array to store hash value
}
func New (size uint64 , k uint64 , race bool ) * Filter {
log2 := uint64 (math. Ceil (math. Log2 ( float64 (size))))
filter := & Filter {
m: 1 << log2,
log2m: log2,
k: k,
keys: make ([] byte , 1 << log2),
concurrent: race,
}
if filter.concurrent {
filter.lock = & sync . RWMutex {}
}
return filter
}
func (f * Filter ) Add (data [] byte ) * Filter {
if f.concurrent {
f.lock. Lock ()
defer f.lock. Unlock ()
}
h := baseHash (data)
for i := uint64 ( 0 ); i < f.k; i ++ {
loc := location (h, i)
slot, mod := f. location (loc)
f.keys[slot] |= 1 << mod
}
f.n ++
return f
}
// location returns the bit position in byte array
// & (f.m - 1) is the quick way for mod operation
func (f * Filter ) location (h uint64 ) ( uint64 , uint64 ) {
slot := (h / bitPerByte) & (f.m - 1 )
mod := h & mod7
return slot, mod
}
// baseHash returns the murmur3 128-bit hash
func baseHash (data [] byte ) [] uint64 {
a1 := [] byte { 1 } // to grab another bit of data
hasher := murmur3. New128 ()
hasher. Write (data) // #nosec
v1, v2 := hasher. Sum128 ()
hasher. Write (a1) // #nosec
v3, v4 := hasher. Sum128 ()
return [] uint64 {
v1, v2, v3, v4,
}
}
淘汰算法
LRU
java实现,非线程安全
Copy // 注意每一次对节点有操作对需要同时操作 nodeMap和lruQueue
// LRU本质是利用 hashMap 和 双向链表 实现
public class LRUCache {
private Map < String , Node > nodeMap;
private DoubleLinkedList lruQueue;
private int size = 0 ;
public LRUCache () {
this ( 3 );
}
public LRUCache ( int size) {
this . size = size;
this . nodeMap = new HashMap <>();
this . lruQueue = new DoubleLinkedList() ;
}
public String get ( String key) {
Node n = nodeMap . get (key);
if (n == null ) {
return null ;
}
// 获取后直接放到到最前的位置
put( n . key , n . value ) ;
return n . value ;
}
public void put ( String key , String value) {
Node newNode = new Node(key , value) ;
// 如果包含,则放到最前
if ( nodeMap . containsKey (key)) {
lruQueue . remove ( nodeMap . get (key));
lruQueue . addFirst (newNode);
// 记得更新map
nodeMap . put (key , newNode);
} else {
// 如果满了,则移除最后一个
if (size <= lruQueue . size ()) {
Node last = lruQueue . removeLast ();
nodeMap . remove ( last . key );
}
nodeMap . put (key , newNode);
lruQueue . addFirst (newNode);
}
}
private class Node {
private String key;
private String value;
private Node pre;
private Node next;
public Node ( String key , String value) {
this . key = key;
this . value = value;
}
}
// 封装双向链表方法,构建时要注意前后节点指向和空指针问题
private class DoubleLinkedList {
private Node head;
private Node tail;
private int count = 0 ;
public void addFirst ( Node n) {
if (head == null ) {
tail = n;
}
count ++ ;
n . next = head;
if (head != null ) {
head . pre = n;
}
head = n;
}
public Node removeLast () {
if (count == 0 ) {
return null ;
}
Node result = tail;
if ( tail . pre != null ) {
tail . pre . next = null ;
} else {
head = null ;
tail = null ;
}
count -- ;
return result;
}
public void remove ( Node n) {
if (count == 0 ) {
return ;
}
count -- ;
if ( n . pre != null ) {
n . pre . next = n . next ;
} else {
head = n . next ;
}
if ( n . next != null ) {
n . next . pre = n . pre ;
} else {
tail = n . pre ;
}
}
public int size () {
return count;
}
}
}
golang实现,非并发安全,下面这种写法是平铺了双向链表,链表的长度通过map的长度计算得到
Copy type LRUCache struct {
nodeMap map [ string ] * Node
head * Node
tail * Node
cap int
}
type Node struct {
key string
val string
next * Node
pre * Node
}
func NewLRUCache (capacity int ) LRUCache {
return LRUCache {nodeMap: make ( map [ string ] * Node ), cap: capacity}
}
func (lru * LRUCache ) Get (key string ) string {
if existNode, exist := lru.nodeMap[key]; exist {
lru. remove (existNode)
lru. addFirst (existNode)
return existNode.val
}
return ""
}
func (lru * LRUCache ) Put (key string , val string ) {
if existNode, exist := lru.nodeMap[key]; exist {
existNode.val = val
lru. remove (existNode)
lru. addFirst (existNode)
return
} else {
newNode := & node {key: key, val: val}
lru.nodeMap[key] = newNode
lru. addFirst (newNode)
}
if len (lru.nodeMap) > lru.cap {
// 一定要先delete,如果先remove,会导致找不回tail的key进行delete,
delete (lru.nodeMap, lru.tail.key)
lru. remove (lru.tail)
}
}
func (lru * LRUCache ) addFirst (n * Node ) {
n.pre = nil
n.next = lru.head
if lru.head != nil {
lru.head.pre = n
}
lru.head = n
if lru.tail == nil {
lru.tail = n
lru.tail.next = nil
}
}
func (lru * LRUCache ) remove (n * Node ) {
// n的next和pre要置为null,防止内存泄漏
if n == lru.head {
lru.head = n.next
if n.next != nil {
n.next.pre = nil
}
n.next = nil
return
}
if n == lru.tail {
lru.tail = n.pre
n.pre.next = nil
n.pre = nil
return
}
n.pre.next = n.next
n.next.pre = n.pre
}
关于并发安全,最简单的实现就是在调用Get和Put方法时加读写锁,但是这种做法锁的粒度比较大,每次会锁住整个底层的双向链表和map,导致在高并发情况下吞吐量不高,优化的思路就是对map分片,通过分片上锁来减小锁的粒度,然后再双向链表节点的操作上进行优化。
Copy func New (capacity int ) LRUCache {
shards := make ( map [ string ] * LRUCacheShard , 256 )
for i := 0 ; i < 256 ; i ++ {
shards[fmt. Sprintf ( " %02x " , i)] = & LRUCacheShard {
Cap: capacity,
Keys: make ( map [ int ] * list . Element ),
List: list. New (),
}
}
return LRUCache {
shards: shards,
}
}
func (c * LRUCache ) Get (key int ) int {
shard := c. GetShard (key)
shard. RLock ()
defer shard. RUnlock ()
……
}
func (c * LRUCache ) Put (key int , value int ) {
shard := c. GetShard (key)
shard. Lock ()
defer shard. Unlock ()
……
}
func (c * LRUCache ) GetShard (key int ) (shard * LRUCacheShard ) {
hasher := sha1. New ()
hasher. Write ([] byte (key))
shardKey := fmt. Sprintf ( " %x " , hasher. Sum ( nil ))[ 0 : 2 ]
return c.shards[shardKey]
}
其中,在Get方法中,如果存在,还需要修改key所在节点的位置,直接调put即可,当然这种方式的粒度还是比较大,再次优化的思路是对map的操作还是得上锁,但对双向链表的操作无需上锁,双向链表移动节点和删除节点可以同时操作,可以通过两个channel实现,参考
LFU
java实现,非线程安全
Copy /**
* 频次最少使用
* 设定容量,每次get key会修改使用次数和使用时间,当满容量时,移除次数最少的那个
* 如果有多个key的使用次数一样,则移除使用时间最旧的那个
*/
public class LFUCache {
Map < String , String > keyValMap;
Map < String , Integer > key2FreqMap; // key和使用频率的映射
// 用LinkHashSet来模拟使用时间,使用LinkHashSet还有一个目的是便于根据key进行删除
Map < Integer , LinkedHashSet < String >> freq2KeysMap; // 使用频率和key的映射
int minFreq;
int cap;
public LFUCache ( int cap) {
keyValMap = new HashMap <>();
key2FreqMap = new HashMap <>();
freq2KeysMap = new HashMap <>();
this . cap = cap;
this . minFreq = 0 ;
}
public String get ( String key) {
if ( ! keyValMap . containsKey (key)) {
return "" ;
}
increaseFreq(key) ;
return keyValMap . get (key);
}
public void put ( String key , String val) {
if ( this . cap <= 0 ) {
return ;
}
if ( keyValMap . containsKey (key)) {
keyValMap . put (key , val);
increaseFreq(key) ;
return ;
}
if ( this . cap <= keyValMap . size ()) {
removeMinFreqKey() ;
}
keyValMap . put (key , val);
key2FreqMap . put (key , 1 );
freq2KeysMap . putIfAbsent ( 1 , new LinkedHashSet <>());
freq2KeysMap . get ( 1 ) . add (key);
this . minFreq = 1 ;
}
void increaseFreq ( String key) {
int freq = key2FreqMap . get (key);
key2FreqMap . put (key , freq + 1 );
freq2KeysMap . get (freq) . remove (key);
freq2KeysMap . putIfAbsent (freq + 1 , new LinkedHashSet <>());
freq2KeysMap . get (freq + 1 ) . add (key);
if ( freq2KeysMap . get (freq) . isEmpty ()) {
freq2KeysMap . remove (freq);
if (freq == this . minFreq ) {
this . minFreq ++ ;
}
}
}
void removeMinFreqKey () {
LinkedHashSet < String > keys = freq2KeysMap . get ( this . minFreq );
String delKey = keys . iterator () . next ();
keys . remove (delKey);
if ( keys . isEmpty ()) {
freq2KeysMap . remove ( this . minFreq );
// 这里无需更新 minFreq 的值,因为该方法是在插入新key时使用,此时minFreq一定是1
}
keyValMap . remove (delKey);
key2FreqMap . remove (delKey);
}
}
golang实现,非并发安全,利用 优先级队列(最小堆) + map实现,使用的是go自带的heap数据结构,通过Item数组结构实现
Copy import "container/heap"
type LFUNode struct {
key string
val string
freq int // 用于优先级,key的使用频率
count int // 用于当freq相同时的比较淘汰,总的获取次数,类似时间戳的概念
index int // 最小堆中元素的下标,用于重建最小堆.
}
type PriorityQueue [] * LFUNode
func (pq PriorityQueue ) Len () int { return len (pq) }
func (pq PriorityQueue ) Less (i, j int ) bool {
// 优先根据使用频率排列,相等时才使用count,从小到大排序
if pq[i].freq == pq[j].freq {
return pq[i].count < pq[j].count
}
return pq[i].freq < pq[j].freq
}
func (pq PriorityQueue ) Swap (i, j int ) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq * PriorityQueue ) Push (x interface {}) {
n := len ( * pq)
node := x.( * LFUNode )
node.index = n
* pq = append ( * pq, node)
}
func (pq * PriorityQueue ) Pop () interface {} {
old := * pq
n := len (old)
node := old[n - 1 ]
old[n - 1 ] = nil // 防止内存泄露
node.index = - 1 // pop时重置下标保证安全
* pq = old[ 0 : n - 1 ]
return node
}
// 更新最小堆里的元素
func (pq * PriorityQueue ) update (node * LFUNode , value string , frequency int , count int ) {
node.val = value
node.count = count
node.freq = frequency
heap. Fix (pq, node.index)
}
// ==========================================
type LFUCache struct {
cap int
pq PriorityQueue
nodeMap map [ string ] * LFUNode
counter int
}
func NewLFUCache (capacity int ) LFUCache {
return LFUCache {
pq: PriorityQueue {},
nodeMap: make ( map [ string ] * LFUNode , capacity),
cap: capacity,
}
}
func (lfu * LFUCache ) Get (key string ) string {
if lfu.cap == 0 {
return ""
}
if node, ok := lfu.nodeMap[key]; ok {
lfu.counter ++
lfu.pq. update (node, node.val, node.freq + 1 , lfu.counter)
return node.val
}
return ""
}
func (lfu * LFUCache ) Put (key string , value string ) {
if lfu.cap == 0 {
return
}
lfu.counter ++
// 如果存在,增加 frequency,再调整堆
if node, ok := lfu.nodeMap[key]; ok {
lfu.pq. update (node, value, node.freq + 1 , lfu.counter)
return
}
// 如果不存在且缓存满了,需要删除。在 hashmap 和 pq 中删除。
if len (lfu.pq) == lfu.cap {
node := heap. Pop ( & lfu.pq).( * LFUNode )
delete (lfu.nodeMap, node.key)
}
// 新建结点,在 hashmap 和 pq 中添加。
node := & LFUNode {
val: value,
key: key,
count: lfu.counter,
}
heap. Push ( & lfu.pq, node)
lfu.nodeMap[key] = node
}
另一个版本实现,利用go本身提供的双向链表,但是由于go没泛型,在类型推断方面导致整体性能不会很高
Copy type LFUCache2 struct {
nodeMap map [ string ] * list . Element
freq2NodeList map [ int ] * list . List
cap int
min int
}
type node struct {
key string
value string
freq int
}
func NewLFUCache2 (capacity int ) LFUCache2 {
return LFUCache2 {nodeMap: make ( map [ string ] * list . Element ),
freq2NodeList: make ( map [ int ] * list . List ),
cap: capacity,
min: 0 ,
}
}
func (lfu * LFUCache2 ) Get (key string ) string {
value, ok := lfu.nodeMap[key]
if ! ok {
return ""
}
currentNode := value.Value.( * node )
lfu.freq2NodeList[currentNode.freq]. Remove (value)
currentNode.freq ++
if _, ok := lfu.freq2NodeList[currentNode.freq]; ! ok {
lfu.freq2NodeList[currentNode.freq] = list. New ()
}
newList := lfu.freq2NodeList[currentNode.freq]
newNode := newList. PushFront (currentNode)
lfu.nodeMap[key] = newNode
if currentNode.freq - 1 == lfu.min && lfu.freq2NodeList[currentNode.freq - 1 ]. Len () == 0 {
lfu.min ++
}
return currentNode.value
}
func (lfu * LFUCache2 ) Put (key string , value string ) {
if lfu.cap == 0 {
return
}
// 如果存在,更新访问次数
if currentValue, ok := lfu.nodeMap[key]; ok {
currentNode := currentValue.Value.( * node )
currentNode.value = value
lfu. Get (key)
return
}
// 如果不存在且缓存满了,需要删除
if lfu.cap == len (lfu.nodeMap) {
currentList := lfu.freq2NodeList[lfu.min]
backNode := currentList. Back ()
delete (lfu.nodeMap, backNode.Value.( * node ).key)
currentList. Remove (backNode)
}
// 新建结点,插入到 2 个 map 中
lfu.min = 1
currentNode := & node {
key: key,
value: value,
freq: 1 ,
}
if _, ok := lfu.freq2NodeList[ 1 ]; ! ok {
lfu.freq2NodeList[ 1 ] = list. New ()
}
newList := lfu.freq2NodeList[ 1 ]
newNode := newList. PushFront (currentNode)
lfu.nodeMap[key] = newNode
}
参考
布隆过滤器原理及golang实现
LRU / LFU 的青铜与王者