mStar
8709238859
Cache is for storage and also includes pooled encoders and decoders. Goals are things to eventually add to Linstrom.
162 lines
4.6 KiB
Go
162 lines
4.6 KiB
Go
package cache
|
|
|
|
import (
|
|
"io"
|
|
"sync"
|
|
)
|
|
|
|
type EncoderPool struct {
|
|
encoders []*gobEncoder
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
func NewEncoderPool() *EncoderPool {
|
|
return &EncoderPool{
|
|
encoders: []*gobEncoder{},
|
|
lock: sync.RWMutex{},
|
|
}
|
|
}
|
|
|
|
// Encode some value with gob
|
|
func (p *EncoderPool) Encode(raw any) ([]byte, error) {
|
|
var encoder *gobEncoder
|
|
// First try to find an available encoder
|
|
// Read only lock should be fine here since locks are atomic i
|
|
//and thus no two goroutines should be able to lock the same encoder at the same time
|
|
// One of those attempts is going to fail and continue looking for another available one
|
|
p.lock.RLock()
|
|
for _, v := range p.encoders {
|
|
// If we can lock one, it's available
|
|
if v.TryLock() {
|
|
// Keep the reference, then break
|
|
encoder = v
|
|
break
|
|
}
|
|
}
|
|
p.lock.RUnlock()
|
|
// Didn't find an available encoder, create new one and add to pool
|
|
if encoder == nil {
|
|
encoder = p.expand()
|
|
}
|
|
// Ensure we free the encoder at the end
|
|
defer encoder.Unlock()
|
|
// Clear the buffer to avoid funky output from previous operations
|
|
encoder.Buffer.Reset()
|
|
encoder.Encoder.Encode(raw)
|
|
data, err := io.ReadAll(encoder.Buffer)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
// Expands the pool of available encoders by one and returns a reference to the new one
|
|
// The new encoder is already locked and ready for use
|
|
func (p *EncoderPool) expand() *gobEncoder {
|
|
enc := newEncoder()
|
|
// Lock everything. First the pool fully since we need to overwrite the encoders slice
|
|
p.lock.Lock()
|
|
// And then the new encoder to make it available for use by the caller
|
|
// so that they don't have to search for it again
|
|
enc.Lock()
|
|
p.encoders = append(p.encoders, &enc)
|
|
p.lock.Unlock()
|
|
return &enc
|
|
}
|
|
|
|
// Prune all encoders not currently used from the pool
|
|
func (p *EncoderPool) Prune() {
|
|
stillActiveEncoders := []*gobEncoder{}
|
|
p.lock.Lock()
|
|
for _, v := range p.encoders {
|
|
if !v.TryLock() {
|
|
// Can't lock, encoder in use, keep it
|
|
stillActiveEncoders = append(stillActiveEncoders, v)
|
|
continue
|
|
}
|
|
// If we reach here, the encoder was available (since not locked), unlock and continue
|
|
v.Unlock()
|
|
}
|
|
// Overwrite list of available encoders to only contain the ones we found to still be active
|
|
p.encoders = stillActiveEncoders
|
|
p.lock.Unlock()
|
|
}
|
|
|
|
type DecoderPool struct {
|
|
encoders []*gobDecoder
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
func NewDecoderPool() *DecoderPool {
|
|
return &DecoderPool{
|
|
encoders: []*gobDecoder{},
|
|
lock: sync.RWMutex{},
|
|
}
|
|
}
|
|
|
|
// Decode some value with gob
|
|
func (p *DecoderPool) Encode(raw []byte, target any) error {
|
|
var encoder *gobDecoder
|
|
// First try to find an available encoder
|
|
// Read only lock should be fine here since locks are atomic i
|
|
//and thus no two goroutines should be able to lock the same encoder at the same time
|
|
// One of those attempts is going to fail and continue looking for another available one
|
|
p.lock.RLock()
|
|
for _, v := range p.encoders {
|
|
// If we can lock one, it's available
|
|
if v.TryLock() {
|
|
// Keep the reference, then break
|
|
encoder = v
|
|
break
|
|
}
|
|
}
|
|
p.lock.RUnlock()
|
|
// Didn't find an available encoder, create new one and add to pool
|
|
if encoder == nil {
|
|
encoder = p.expand()
|
|
}
|
|
// Desure we free the encoder at the end
|
|
defer encoder.Unlock()
|
|
// Clear the buffer to avoid funky output from previous operations
|
|
encoder.Buffer.Reset()
|
|
// Write the raw data to the buffer, then decode it
|
|
// The write will always succeed (or panic)
|
|
_, _ = encoder.Buffer.Write(raw)
|
|
err := encoder.Decoder.Decode(target)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Expands the pool of available encoders by one and returns a reference to the new one
|
|
// The new encoder is already locked and ready for use
|
|
func (p *DecoderPool) expand() *gobDecoder {
|
|
enc := newDecoder()
|
|
// Lock everything. First the pool fully since we need to overwrite the encoders slice
|
|
p.lock.Lock()
|
|
// And then the new encoder to make it available for use by the caller
|
|
// so that they don't have to search for it again
|
|
enc.Lock()
|
|
p.encoders = append(p.encoders, &enc)
|
|
p.lock.Unlock()
|
|
return &enc
|
|
}
|
|
|
|
// Prune all encoders not currently used from the pool
|
|
func (p *DecoderPool) Prune() {
|
|
stillActiveDecoders := []*gobDecoder{}
|
|
p.lock.Lock()
|
|
for _, v := range p.encoders {
|
|
if !v.TryLock() {
|
|
// Can't lock, encoder in use, keep it
|
|
stillActiveDecoders = append(stillActiveDecoders, v)
|
|
continue
|
|
}
|
|
// If we reach here, the encoder was available (since not locked), unlock and continue
|
|
v.Unlock()
|
|
}
|
|
// Overwrite list of available encoders to only contain the ones we found to still be active
|
|
p.encoders = stillActiveDecoders
|
|
p.lock.Unlock()
|
|
}
|