本文共 2204 字,大约阅读时间需要 7 分钟。
对于大型缓存系统,存在着以下两种情况会是系统无法做到高可用。第一种情况,对于新系统上线,redis中可能没有缓存数据,此时如果大量请求涌入,则会压垮DB是系统无法正常使用;第二种情况,可能系统运行过程中redis的数据全部丢失了,即使开启了持久化也无法恢复,那么也会出现上述描述的异常情况。因此,可以采用缓存预热来解决以上问题
nginx+lua将访问流量上报到kafka中
结合之前的业务代码,在这里通过Openresty安装组件来使lua整合kafka,让通过nginx的流量发送至kafka中,具体核心实现:cd /usr/localwget https://github.com/doujiang24/lua-resty-kafka/archive/master.zipyum install -y unzipunzip lua-resty-kafka-master.zipcp -rf /usr/local/lua-resty-kafka-master/lib/resty /usr/hello/lualibnginx -s reload//在lua脚本中添加:local cjson = require("cjson") local producer = require("resty.kafka.producer") local broker_list = { { host = "192.168.1.107", port = 9092 }, { host = "192.168.1.104", port = 9092 }, { host = "192.168.1.105", port = 9092 }}local log_json = {} log_json["headers"] = ngx.req.get_headers() log_json["uri_args"] = ngx.req.get_uri_args() log_json["body"] = ngx.req.read_body() log_json["http_version"] = ngx.req.http_version() log_json["method"] =ngx.req.get_method() log_json["raw_reader"] = ngx.req.raw_header() log_json["body_data"] = ngx.req.get_body_data() local message = cjson.encode(log_json); local productId = ngx.req.get_uri_args()["productId"]local async_producer = producer:new(broker_list, { producer_type = "async" }) local ok, err = async_producer:send("access-log", productId, message) if not ok then ngx.log(ngx.ERR, "kafka send err:", err) return end
需要添加被发送的topic,然后测试是否成功发送:
bin/kafka-topics.sh --zookeeper 192.168.1.107:2181,192.107.1.104:2181,192.168.1.105:2181 --topic access-log --replication-factor 1 --partitions 1 --createbin/kafka-console-consumer.sh --zookeeper 192.168.1.107:2181,192.168.1.104:2181,192.168.1.105:2181 --topic access-log --from-beginning
storm从kafka中消费数据,实时统计出每个商品的访问次数,访问次数基于LRU内存数据结构(LRUMap)的存储方案
同时每个storm task启动的时候,基于zk分布式锁(因为多个task可能并发执行),将自己所在的taskid写入zk同一个节点中,最终多个task保存的结果应该是111,222,222这种形式
每个storm task负责完成部分热数据统计,每隔一段时间就遍历这个LRUMap,去维护一个前n的商品的id相关的list,并且将这个list写入到基于上述该task对应taskid的zk中,类似于111-[1,2,3]…
在缓存服务中,添加缓存预热的逻辑代码。首先获取zk中保存的storm对应的taskid分布式锁(上述第3步中的taskid),如果没有获取到说明已经有其他缓存服务正在预热处理该taskid(前提是部署多个缓存服务),则直接抛错获取下一个taskid对应的分布式锁,如果获取到锁,则再获取上述taskid对应的分布式锁(上述第3步中的taskid),来处理列表中的每个商品id对应的缓存至redis和本地Ehcache中,处理完之后修改该分布式锁状态来标识已经预热过。最终释放双重分布式锁
转载地址:http://uhrgi.baihongyu.com/