限流器的四种实现方法

knoci 发布于 2025-02-07 89 次阅读


关于限流和限流器

​ 限流(Rate Limiting)是一种控制资源使用率的机制,通常用于防止系统过载和滥用。

​ 限流器(Rate Limiter)是一种实现限流的组件或服务。

以下是限流器的一些主要用途和原因: 展开 / 收起
81ea9992faabfd618afa1453cb28d4a


固定窗口限流器

这种限流器一时间为周期,用一个计数器配合定时器,限制周期内访问的次数。

4f56e37260c0de9cc877e8d17bdb590

type FixedWindowRateLimiter struct {
    windowSize time.Duration
    limit      uint64
    counter    uint64
    ticker     *time.Ticker
    stop       chan struct{}
    status     bool
}

func NewFixedWindowRateLimiter(windowSize time.Duration, limit uint64) *FixedWindowRateLimiter {
    now := uint64(time.Now().UnixNano())
    return &FixedWindowRateLimiter{
       windowSize: windowSize,
       limit:      limit,
       start:      now,
       stop:       make(chan struct{}),
       status:     false,
    }
}
  • windowSize限流器周期
  • limit最大访问次数限制
  • counter计数器
  • ticker计时器
  • stop关闭信号管道
  • status限流器状态

启动和关闭:

​ Start启动定时器用go协程处理周期更新和收到关闭信号,Close向关闭信号管道发送关闭信号。

func (fwrl *FixedWindowRateLimiter) Start() {
    fwrl.ticker = time.NewTicker(fwrl.windowSize)
    fwrl.status = true
    go func() {
        for {
            select {
            case <-fwrl.ticker.C:
                atomic.StoreUint64(&fwrl.counter, 0)
            case <-fwrl.stop:
                fwrl.ticker.Stop()
                fwrl.status = false
                return
            }
        }
    }()
}

func (fwrl *FixedWindowRateLimiter) Close() {
    close(fwrl.stop)
}

实现的方法:

// 请求一次访问
func (fwrl *FixedWindowRateLimiter) Do() bool {
    if !fwrl.status {
        return false
    }
    currentCounter := atomic.LoadUint64(&fwrl.counter)
    if currentCounter >= fwrl.limit {
        return false
    }
    atomic.AddUint64(&fwrl.counter, 1)
    return true
}

// 剩余可访问次数
func (fwrl *FixedWindowRateLimiter) Check() uint64 {
    if !fwrl.status {
        return 0
    }
    return fwrl.limit - atomic.LoadUint64(&fwrl.counter)
}

// 更新并重启限流器
func (fwrl *FixedWindowRateLimiter) Update(windowSize time.Duration, limit uint64) {
    fwrl.windowSize = windowSize
    fwrl.limit = limit
    if fwrl.status {
        fwrl.ticker.Stop()
        fwrl.Start()
    }
}

关于原子操作:

​ 在代码中使用了atomic包的原子操作,目的是为了保证高并发读写下限流器的数据准确性。atomic包的实现在sre/internal/runtime/atomic,本质上使用汇编语言保证了操作的原子性。

​ 例如swapUint64函数中,调用.Xchg64的代码如下:

TEXT ·Xchg64(SB), NOSPLIT, $0-24
    MOVD    ptr+0(FP), R0
    MOVD    new+8(FP), R1
#ifndef GOARM64_LSE
    MOVBU   internal∕cpu·ARM64+const_offsetARM64HasATOMICS(SB), R4
    CBZ     R4, load_store_loop
#endif
    SWPALD  R1, (R0), R2
    MOVD    R2, ret+16(FP)
    RET
#ifndef GOARM64_LSE
load_store_loop:
    LDAXR   (R0), R2
    STLXR   R1, (R0), R3
    CBNZ    R3, load_store_loop
    MOVD    R2, ret+16(FP)
    RET
#endif

计数器限流的严重问题:
这个算法虽然简单,但是有一个十分致命的问题,那就是临界问题,我们看下图:

img

​ 从上图中我们可以看到,假设有一个恶意用户,他在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在 1秒里面,瞬间发送了200个请求。

​ 我们刚才规定的是1分钟最多100个请求(规划的吞吐量),也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求, 可以瞬间超过我们的速率限制。

​ 用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。


滑动窗口限流器

跟固定窗口限流器差不多,只不过粒度更小了。

ed2ba998132a08adbb203869895441c

type SlidingWindowRateLimiter struct {
    windowSize time.Duration
    subWindow  time.Duration
    limit      uint64
    counters   []uint64
    start      uint64
    ticker     *time.Ticker
    subticker  *time.Ticker
    stop       chan struct{}
    status     bool
}

func NewSlidingWindowRateLimiter(windowSize, subWindow time.Duration, limit uint64) (*SlidingWindowRateLimiter, error) {
    if windowSize <= subWindow {
        return nil, errors.New("wrong size")
    }
    numSubWindows := int(windowSize / subWindow)
    return &SlidingWindowRateLimiter{
        windowSize: windowSize,
        subWindow:  subWindow,
        limit:      limit,
        counters:   make([]uint64, numSubWindows),
        start:      uint64(time.Now().UnixNano()),
        stop:       make(chan struct{}),
    }, nil
}
  • windowSize限流器周期
  • subWindow滑动窗口周期
  • limit最大访问次数限制
  • counters滑动窗口计数器切片
  • start周期开启时间
  • ticker周期计时器
  • subticker窗口计时器
  • stop关闭信号管道
  • status限流器状态

启动和关闭:

​ 滑窗是小粒度的固定窗口,计算index并且重置计数

func (swrl *SlidingWindowRateLimiter) Start() {
    swrl.ticker = time.NewTicker(swrl.windowSize)
    swrl.subticker = time.NewTicker(swrl.subWindow)
    swrl.status = true
    go func() {
        for {
            select {
            case <-swrl.subticker.C:
                now := uint64(time.Now().UnixNano())
                index := int((now - swrl.start) / uint64(swrl.subWindow))
                atomic.StoreUint64(&swrl.counters[index%len(swrl.counters)], 0)
            case <-swrl.ticker.C:
                swrl.start = uint64(time.Now().UnixNano())
            case <-swrl.stop:
                swrl.ticker.Stop()
                swrl.status = false
                return
            }
        }
    }()
}

func (swrl *SlidingWindowRateLimiter) Close() {
    close(swrl.stop)
}

其他方法:

​ 这里Do和Check都是要计算周期内每个滑窗总和

func (swrl *SlidingWindowRateLimiter) Do() bool {
    if !fwrl.status {
        return false
    }
    now := uint64(time.Now().UnixNano())
    startIndex := int((now - swrl.start) / uint64(swrl.subWindow))
    endIndex := int((now - swrl.start + uint64(swrl.windowSize)) / uint64(swrl.subWindow))
    // 计算周期内每个滑窗总和是否大于limit
    sum := uint64(0)
    for i := startIndex; i <= endIndex; i++ {
        index := i
        if index >= len(swrl.counters) {
            index -= len(swrl.counters)
        }
        sum += atomic.LoadUint64(&swrl.counters[index])
    }
    if sum >= swrl.limit {
        return false
    }
    // 增加当前子窗口的计数
    currentIndex := int((now - swrl.start) / uint64(swrl.subWindow))
    atomic.AddUint64(&swrl.counters[currentIndex], 1)
    return true
}

func (swrl *SlidingWindowRateLimiter) Check() uint64 {
    if !fwrl.status {
        return 0
    }
    now := uint64(time.Now().UnixNano())
    startIndex := int((now - swrl.start) / uint64(swrl.subWindow))
    endIndex := int((now - swrl.start + uint64(swrl.windowSize)) / uint64(swrl.subWindow))
    sum := uint64(0)
    for i := startIndex; i <= endIndex; i++ {
        index := i
        if index >= len(swrl.counters) {
            index -= len(swrl.counters)
        }
        sum += atomic.LoadUint64(&swrl.counters[index])
    }
    return swrl.limit - sum
}

func (swrl *SlidingWindowRateLimiter) Update(windowSize time.Duration, subWindow time.Duration, limit uint64) {
    swrl.windowSize = windowSize
    swrl.limit = limit
    swrl.subWindow = subWindow
    if swrl.status {
        swrl.ticker.Stop()
        numSubWindows := int(windowSize / swrl.subWindow)
        swrl.counters = make([]uint64, numSubWindows)
        swrl.Start()
    }
}

漏桶限流器

漏桶,可以想象成一个木桶下面钻了一个小孔,把水倒进来就是请求访问,水漏出去就是允许请求。

理论上小孔流出水的速率是不变的,也就是允许请求的速率是不变的,这就是漏桶的特点,你可以随便倒水,但是流出水速率恒定,实现按了平稳的访问速率。

a93dc412009e69880b771bf38aac777

type LeakyBucket struct {
    capacity  uint
    remaining uint
    ticker    *time.Ticker
    reset     time.Time
    rate      time.Duration
    mutex     sync.Mutex
    stop       chan struct{}
    status    bool
}

func NewLeakyBucket(capacity uint, rate time.Duration) (*LeakyBucket, error) {
    return &LeakyBucket{
        capacity:   capacity,
        remaining:  capacity,
        rate:       rate,
        status:     false,
    }
}
  • capacity桶的容量
  • remaining剩余的容量
  • ticker计时器
  • reset重置的时间
  • rate漏桶的速率
  • mutes互斥锁
  • stop关闭信号管道
  • status桶的状态

启动和关闭:

func (lb *LeakyBucket) Start() {
    lb.ticker = time.NewTicker(swrl.rate)
    lb.status = true
    lb.reset = time.Now().Add(rate)
    go func() {
        for {
            select {
            case <-lb.ticker.C:
                lb.mutex.lock()
                if lb.remaining < lb.capacity {
                    lb.remaining += 1
                }
                lb.mutex.unlock()
            case <-lb.stop:
                lb.ticker.Stop()
                lb.status = false
                return
            }
        }
    }()
}

func (lb *LeakyBucket) Close() {
    close(lb.stop)
}

其他方法:

// 返回容量
func (lb *LeakyBucket) Capacity() uint {
    return lb.capacity
}

// 桶里剩余容量
func (lb *LeakyBucket) Remaining() uint {
    return lb.remaining
}

// 重置桶
func (lb *LeakyBucket) Reset() time.Time {
    lb.remaining = lb.capacity
    // 更新reset时间为一个rate后,这样就不用加锁了
    lb.reset = time.Now().Add(rate)
    return lb.reset
}

// 往桶里加请求
func (lb *LeakyBucket) Add(amount uint) (bool, error) {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()
    // 时间在重置前那就重置后再取
    if time.Now().After(lb.reset) {
        lb.reset = time.Now().Add(lb.rate)
        lb.remaining = lb.capacity
    }
    if amount > lb.remaining {
        return false, errors.New("too many")
    }
    lb.remaining -= amount
    return true, nil
}

漏桶的问题:
漏桶的出水速度固定,也就是请求放行速度是固定的,因此漏桶不能有效应对突发流量,但是能起到平滑突发流量(整流)的作用。

​ 漏桶出口的速度固定,不能灵活的应对后端能力提升,比如,通过动态扩容,后端流量从1000QPS提升到1WQPS,漏桶没有办法。


令牌桶限流器

令牌桶算法以一个设定的速率产生令牌并放入令牌桶,每次用户请求都得申请令牌,如果令牌不足,则拒绝请求。
​ 令牌桶算法中新请求到来时会从桶里拿走一个令牌,如果桶内没有令牌可拿,就拒绝服务。当然,令牌的数量也是有上限的。令牌的数量与时间和发放速率强相关,时间流逝的时间越长,会不断往桶里加入越多的令牌,如果令牌发放的速度比申请速度快,令牌桶会放满令牌,直到令牌占满整个令牌桶。

a93dc412009e69880b771bf38aac777

type TookenBucket struct {
    capacity  uint
    tooken    uint
    ticker    *time.Ticker
    reset     time.Time
    rate      time.Duration
    mutex     sync.Mutex
    stop       chan struct{}
    status    bool
}

func NewTookenBucket(capacity uint, rate time.Duration) (*TookenBucket, error) {
    return &TookenBucket{
        capacity:   capacity,
        tooken:     0,
        rate:       rate,
        status:     false,
    }
}
  • capacity桶的容量
  • tooken剩余的tooken
  • ticker计时器
  • reset重置的时间
  • rate漏桶的速率
  • mutes互斥锁
  • stop关闭信号管道
  • status桶的状态

启动和关闭:

func (lb *TookenBucket) Start() {
    tb.ticker = time.NewTicker(swrl.rate)
    tb.status = true
    tb.reset = time.Now().Add(rate)
    go func() {
        for {
            select {
            case <-tb.ticker.C:
                tb.mutex.lock()
                if tb.tooken < lb.capacity {
                    tb.tooken += 1
                }
                tb.mutex.unlock()
            case <-tb.stop:
                tb.ticker.Stop()
                tb.status = false
                return
            }
        }
    }()
}

func (tb *TookenBucket) Close() {
    close(tb.stop)
}

其他方法:

// 返回容量
func (tb *TookenBucket) Capacity() uint {
    return tb.capacity
}

// 桶里tooken
func (tb *TookenBucket) Cheak() uint {
    return tb.tooken
}

// 重置桶
func (tb *TookenBucket) Reset() time.Time {
    tb.tooken = 0
    // 更新reset时间为一个rate后,这样就不用加锁了
    tb.reset = time.Now().Add(rate)
    return tb.reset
}

// 往桶里取令牌
func (tb *TookenBucket) Took(amount uint) (bool, error) {
    tb.mutex.Lock()
    defer tb.mutex.Unlock()
    // 时间在重置前那就重置后再取
    if time.Now().After(tb.reset) {
        tb.reset = time.Now().Add(tb.rate)
        tb.token = 0
        return false, nil
    }
    if amount > tb.tooken {
        return false, errors.New("too many")
    }
    tb.tooken -= amount
    return true, nil
}

总结

​ 以上就是四种限流方法实现,一般在高并发实战中,采用漏桶+令牌桶。