Files
limit/limit.go

279 lines
7.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Copyright 2025 肖其顿
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package limit 提供了一个高性能、并发安全的动态速率限制器。
// 它使用分片锁来减少高并发下的锁竞争,并能自动清理长期未使用的限制器。
package limit
import (
"hash"
"hash/fnv"
"sync"
"sync/atomic"
"time"
"golang.org/x/time/rate"
)
// defaultShardCount 是默认的分片数量设为2的幂可以优化哈希计算。
const defaultShardCount = 32
// Config 定义了 Limiter 的可配置项。
type Config struct {
// ShardCount 指定分片数量必须是2的幂。如果为0或无效值则使用默认值32。
ShardCount int
// GCInterval 指定GC周期即检查并清理过期限制器的间隔。如果为0则使用默认值10分钟。
GCInterval time.Duration
// Expiration 指定过期时间即限制器在最后一次使用后能存活多久。如果为0则使用默认值30分钟。
Expiration time.Duration
}
// Limiter 是一个高性能、分片实现的动态速率限制器。
// 它的实例在并发使用时是安全的。
type Limiter struct {
// 存储所有分片
shards []*shard
// 配置信息
config Config
// 标记限制器是否已停止
stopped atomic.Bool
// 确保Stop方法只执行一次
stopOnce sync.Once
}
// New 使用默认配置创建一个新的 Limiter 实例。
func New() *Limiter {
return NewWithConfig(Config{})
}
// NewWithConfig 根据提供的配置创建一个新的 Limiter 实例。
func NewWithConfig(config Config) *Limiter {
// 如果未设置,则使用默认值
if config.ShardCount == 0 {
config.ShardCount = defaultShardCount
}
if config.GCInterval == 0 {
config.GCInterval = 10 * time.Minute
}
if config.Expiration == 0 {
config.Expiration = 30 * time.Minute
}
// 确保分片数量是2的幂以便进行高效的位运算
if config.ShardCount <= 0 || (config.ShardCount&(config.ShardCount-1)) != 0 {
config.ShardCount = defaultShardCount
}
l := &Limiter{
shards: make([]*shard, config.ShardCount),
config: config,
}
// 初始化所有分片
for i := 0; i < config.ShardCount; i++ {
l.shards[i] = newShard(config.GCInterval, config.Expiration)
}
return l
}
// Get 获取或创建一个与指定键关联的速率限制器。
// 如果限制器已存在,它会根据传入的 r (速率) 和 b (并发数) 更新其配置。
// 如果 Limiter 实例已被 Stop 方法关闭,此方法将返回 nil。
func (l *Limiter) Get(k string, r rate.Limit, b int) *rate.Limiter {
// 快速路径检查,避免在已停止时进行哈希和查找
if l.stopped.Load() {
return nil
}
// 定位到具体分片进行操作
return l.getShard(k).get(k, r, b)
}
// Del 手动移除一个与指定键关联的速率限制器。
// 如果 Limiter 实例已被 Stop 方法关闭,此方法不执行任何操作。
func (l *Limiter) Del(k string) {
// 快速路径检查
if l.stopped.Load() {
return
}
// 定位到具体分片进行操作
l.getShard(k).del(k)
}
// Stop 停止 Limiter 的所有后台清理任务,并释放相关资源。
// 此方法对于并发调用是安全的,并且可以被多次调用。
func (l *Limiter) Stop() {
l.stopOnce.Do(func() {
l.stopped.Store(true)
for _, s := range l.shards {
s.stop()
}
})
}
// getShard 根据key的哈希值获取对应的分片。
func (l *Limiter) getShard(key string) *shard {
hasher := fnvHasherPool.Get().(hash.Hash32)
defer func() {
hasher.Reset()
fnvHasherPool.Put(hasher)
}()
_, _ = hasher.Write([]byte(key)) // FNV-1a never returns an error.
// 使用位运算代替取模,提高效率
return l.shards[hasher.Sum32()&(uint32(l.config.ShardCount)-1)]
}
// shard 代表 Limiter 的一个分片,它包含独立的锁和数据,以减少全局锁竞争。
type shard struct {
mutex sync.Mutex
stopCh chan struct{}
limiter map[string]*session
stopOnce sync.Once
waitGroup sync.WaitGroup
}
// newShard 创建一个新的分片实例并启动其gc任务。
func newShard(gcInterval, expiration time.Duration) *shard {
s := &shard{
// mutex 会被自动初始化为其零值(未锁定状态)
stopCh: make(chan struct{}),
limiter: make(map[string]*session),
}
s.waitGroup.Add(1)
go s.gc(gcInterval, expiration)
return s
}
// gc 定期清理分片中过期的限制器。
func (s *shard) gc(interval, expiration time.Duration) {
defer s.waitGroup.Done()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
// 优先检查停止信号,确保能快速响应
select {
case <-s.stopCh:
return
default:
}
select {
case <-ticker.C:
s.mutex.Lock()
// 再次检查分片是否已停止,防止在等待锁期间被停止
if s.limiter == nil {
s.mutex.Unlock()
return
}
for k, v := range s.limiter {
// 清理过期的限制器
if time.Since(v.lastGet) > expiration {
// 将 session 对象放回池中前,重置其状态
v.limiter = nil
v.lastGet = time.Time{}
sessionPool.Put(v)
delete(s.limiter, k)
}
}
s.mutex.Unlock()
case <-s.stopCh:
// 收到停止信号退出goroutine
return
}
}
}
// get 获取或创建一个新的速率限制器,如果已存在则更新其配置。
func (s *shard) get(k string, r rate.Limit, b int) *rate.Limiter {
s.mutex.Lock()
defer s.mutex.Unlock()
// 检查分片是否已停止
if s.limiter == nil {
return nil
}
sess, ok := s.limiter[k]
if !ok {
// 从池中获取 session 对象
sess = sessionPool.Get().(*session)
sess.limiter = rate.NewLimiter(r, b)
s.limiter[k] = sess
} else {
// 如果已存在,则更新其速率和并发数
sess.limiter.SetLimit(r)
sess.limiter.SetBurst(b)
}
sess.lastGet = time.Now()
return sess.limiter
}
// del 从分片中移除一个键的速率限制器。
func (s *shard) del(k string) {
s.mutex.Lock()
defer s.mutex.Unlock()
// 检查分片是否已停止
if s.limiter == nil {
return
}
if sess, ok := s.limiter[k]; ok {
// 将 session 对象放回池中前,重置其状态
sess.limiter = nil
sess.lastGet = time.Time{}
sessionPool.Put(sess)
delete(s.limiter, k)
}
}
// stop 停止分片的gc任务并同步等待其完成后再清理资源。
func (s *shard) stop() {
// 使用 sync.Once 确保 channel 只被关闭一次,彻底避免并发风险
s.stopOnce.Do(func() {
close(s.stopCh)
})
// 等待 gc goroutine 完全退出
s.waitGroup.Wait()
// 锁定并进行最终的资源清理
// 因为 gc 已经退出,所以此时只有 Get/Del 会竞争锁
s.mutex.Lock()
defer s.mutex.Unlock()
// 检查是否已被清理,防止重复操作
if s.limiter == nil {
return
}
// 将所有 session 对象放回对象池
for _, sess := range s.limiter {
sess.limiter = nil
sess.lastGet = time.Time{}
sessionPool.Put(sess)
}
// 清理map释放内存并作为停止标记
s.limiter = nil
}
// session 存储每个键的速率限制器实例和最后访问时间。
type session struct {
// 最后一次访问时间
lastGet time.Time
// 速率限制器
limiter *rate.Limiter
}
// sessionPool 使用 sync.Pool 来复用 session 对象,以减少 GC 压力。
var sessionPool = sync.Pool{
New: func() interface{} {
return new(session)
},
}
// fnvHasherPool 使用 sync.Pool 来复用 FNV-1a 哈希对象,以减少高并发下的内存分配。
var fnvHasherPool = sync.Pool{
New: func() interface{} {
return fnv.New32a()
},
}