Skip to content

jwells131313/danaides

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

29 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

danaides GoDoc Go Report Card

Leaky Bucket Rate Limiter algorithm for streaming or chunked use cases

Streaming Leaky Bucket

This implementation of the leaky bucket algorithm is meant for a streaming use case.

The Limiter is the bucket and it has a desired data flow rate. Chunks of data can be added to the bucket by number and the Limiter will stream element data back to the user by telling them how many elements can be processed at this time or telling the user how much time they need to wait before asking the limiter for more data.

This rate limiter must be called at least once a second to be able to approach the desired rate.

Basic usage:

limiter := New(100) // 100 per second


for {
    // Get data from the source
    limiter.Add(200)

    for {
        took, delay := limiter.Take()
        if took == 0 && delay == 0 {
            // The limiter is empty, break out and get more data
            break
        }

        if took == 0 {
            time.Sleep(delay)
        } else {
            // Give took elements to your sink
        }
    }
}

Blocking Leaky Bucket

This implementation of the leaky bucket algorithm is meant for a chunked or block-based input stream. In a block-based stream the Take must return the exact numbers input into the Add. So if you Add the values 20, 30 and 15 the Take must return 20, 30 and 15 in order. This is to handle underlying protocols that require that full data packets be sent rather than classical streams.

The only difference from above is the use of the TakeByBlock Option when creating the limiter.

limiter := New(100, TakeByBlock()) // 100 per second using blocking algorithm


for {
    // Get data from the source
    limiter.Add(200)
    limiter.Add(10)
    limiter.Add(50)

    for {
        took, delay := limiter.Take()
        if took == 0 && delay == 0 {
            // The limiter is empty, break out and get more data
            break
        }

        if took == 0 {
            time.Sleep(delay)
        } else {
            // Give took elements to your sink
            // In this case the values 200, 10 and 50 will be returned
            // with possible sleeps in the middle in order to achieve
            // the required limit
        }
    }
}

Errata

Daniades