linstrom/storage/cache/coderPools.go

165 lines
4.6 KiB
Go
Raw Permalink Normal View History

package cache
import (
"io"
"sync"
)
// EncoderPool is a pool of reusable gob encoders that grows on demand.
//
// Encode claims an idle encoder via TryLock (creating a new one when none is
// free) and Prune discards the encoders that are idle at the time it runs.
type EncoderPool struct {
// encoders holds every encoder currently owned by the pool.
encoders []*gobEncoder
// lock guards the encoders slice itself: it is read-locked while searching
// for an idle encoder and write-locked whenever the slice is replaced.
lock sync.RWMutex
}
// NewEncoderPool returns an empty EncoderPool that is ready for use.
// Encoders are only created once Encode needs them.
func NewEncoderPool() *EncoderPool {
	pool := new(EncoderPool)
	// The zero value of the RWMutex is already usable; only the slice is
	// initialised explicitly so that it starts out empty but non-nil.
	pool.encoders = []*gobEncoder{}
	return pool
}
// Encode some value with gob
func (p *EncoderPool) Encode(raw any) ([]byte, error) {
var encoder *gobEncoder
// First try to find an available encoder
// Read only lock should be fine here since locks are atomic i
//and thus no two goroutines should be able to lock the same encoder at the same time
// One of those attempts is going to fail and continue looking for another available one
p.lock.RLock()
for _, v := range p.encoders {
// If we can lock one, it's available
if v.TryLock() {
// Keep the reference, then break
encoder = v
break
}
}
p.lock.RUnlock()
// Didn't find an available encoder, create new one and add to pool
if encoder == nil {
encoder = p.expand()
}
// Ensure we free the encoder at the end
defer encoder.Unlock()
// Clear the buffer to avoid funky output from previous operations
encoder.Buffer.Reset()
2024-09-13 13:02:32 +00:00
if err := encoder.Encoder.Encode(raw); err != nil {
return nil, err
}
data, err := io.ReadAll(encoder.Buffer)
if err != nil {
return nil, err
}
return data, nil
}
// expand adds one freshly created encoder to the pool and returns it.
// The returned encoder is already locked on behalf of the caller, so it can be
// used immediately without searching the pool again; the caller must unlock it.
func (p *EncoderPool) expand() *gobEncoder {
	fresh := newEncoder()
	handle := &fresh

	// Claim the encoder before it becomes visible to other goroutines.
	handle.Lock()

	// The slice is being replaced, so the pool needs the full write lock.
	p.lock.Lock()
	defer p.lock.Unlock()
	p.encoders = append(p.encoders, handle)
	return handle
}
// Prune removes every encoder that is not in use at the time of the call.
// Encoders that are currently locked by an ongoing Encode stay in the pool.
func (p *EncoderPool) Prune() {
	p.lock.Lock()
	defer p.lock.Unlock()

	busy := make([]*gobEncoder, 0)
	for i := range p.encoders {
		candidate := p.encoders[i]
		if candidate.TryLock() {
			// The lock was free, so the encoder is idle: release it again and
			// leave it out of the new list.
			candidate.Unlock()
			continue
		}
		// Locking failed, which means somebody is using it right now.
		busy = append(busy, candidate)
	}

	// Only the encoders found to be in use remain part of the pool.
	p.encoders = busy
}
// DecoderPool is a pool of reusable gob decoders that grows on demand.
//
// Decode claims an idle decoder via TryLock (creating a new one when none is
// free) and Prune discards the decoders that are idle at the time it runs.
type DecoderPool struct {
// encoders holds every decoder currently owned by the pool. Despite its
// name, the elements are decoders.
encoders []*gobDecoder
// lock guards the encoders slice itself: it is read-locked while searching
// for an idle decoder and write-locked whenever the slice is replaced.
lock sync.RWMutex
}
// NewDecoderPool returns an empty DecoderPool that is ready for use.
// Decoders are only created once Decode needs them.
func NewDecoderPool() *DecoderPool {
	pool := new(DecoderPool)
	// The zero value of the RWMutex is already usable; only the slice is
	// initialised explicitly so that it starts out empty but non-nil.
	pool.encoders = []*gobDecoder{}
	return pool
}
// Decode some value with gob
2024-09-12 06:56:57 +00:00
func (p *DecoderPool) Decode(raw []byte, target any) error {
var encoder *gobDecoder
// First try to find an available encoder
// Read only lock should be fine here since locks are atomic i
//and thus no two goroutines should be able to lock the same encoder at the same time
// One of those attempts is going to fail and continue looking for another available one
p.lock.RLock()
for _, v := range p.encoders {
// If we can lock one, it's available
if v.TryLock() {
// Keep the reference, then break
encoder = v
break
}
}
p.lock.RUnlock()
// Didn't find an available encoder, create new one and add to pool
if encoder == nil {
encoder = p.expand()
}
// Desure we free the encoder at the end
defer encoder.Unlock()
// Clear the buffer to avoid funky output from previous operations
encoder.Buffer.Reset()
// Write the raw data to the buffer, then decode it
// The write will always succeed (or panic)
_, _ = encoder.Buffer.Write(raw)
err := encoder.Decoder.Decode(target)
if err != nil {
return err
}
return nil
}
// expand adds one freshly created decoder to the pool and returns it.
// The returned decoder is already locked on behalf of the caller, so it can be
// used immediately without searching the pool again; the caller must unlock it.
func (p *DecoderPool) expand() *gobDecoder {
	fresh := newDecoder()
	handle := &fresh

	// Claim the decoder before it becomes visible to other goroutines.
	handle.Lock()

	// The slice is being replaced, so the pool needs the full write lock.
	p.lock.Lock()
	defer p.lock.Unlock()
	p.encoders = append(p.encoders, handle)
	return handle
}
// Prune removes every decoder that is not in use at the time of the call.
// Decoders that are currently locked by an ongoing Decode stay in the pool.
func (p *DecoderPool) Prune() {
	p.lock.Lock()
	defer p.lock.Unlock()

	busy := make([]*gobDecoder, 0)
	for i := range p.encoders {
		candidate := p.encoders[i]
		if candidate.TryLock() {
			// The lock was free, so the decoder is idle: release it again and
			// leave it out of the new list.
			candidate.Unlock()
			continue
		}
		// Locking failed, which means somebody is using it right now.
		busy = append(busy, candidate)
	}

	// Only the decoders found to be in use remain part of the pool.
	p.encoders = busy
}