Added goals, a cache, and a function to fetch any remote object
The cache is for storage and also includes pooled encoders and decoders. The goals are things to eventually add to Linstrom.
parent 2977f09245, commit 8709238859
8 changed files with 907 additions and 16 deletions
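
To show how the pieces added in this commit fit together, here is a minimal sketch of typed set/get helpers on the new Cache type. These helpers are hypothetical and not part of the diff; they only assume the EncoderPool, DecoderPool, and chain cache defined below, plus gocache's Set/Get on the chain (and an imported "context").

// Hypothetical helpers inside package cache; not part of this commit.
func (c *Cache) setAny(ctx context.Context, key string, value any) error {
    data, err := c.encoders.Encode(value) // gob-encode through the pool
    if err != nil {
        return err
    }
    return c.cache.Set(ctx, key, data) // store the raw bytes in the chain cache
}

func (c *Cache) getAny(ctx context.Context, key string, target any) error {
    data, err := c.cache.Get(ctx, key)
    if err != nil {
        return err
    }
    return c.decoders.Decode(data, target) // gob-decode through the pool
}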
storage/cache/cache.go (vendored, new file, 63 additions)
@@ -0,0 +1,63 @@
package cache

import (
    "time"

    "github.com/dgraph-io/ristretto"
    "github.com/eko/gocache/lib/v4/cache"
    "github.com/eko/gocache/lib/v4/store"
    redis_store "github.com/eko/gocache/store/redis/v4"
    ristretto_store "github.com/eko/gocache/store/ristretto/v4"
    "github.com/redis/go-redis/v9"
    "github.com/rs/zerolog/log"

    "gitlab.com/mstarongitlab/linstrom/config"
)

type Cache struct {
    cache    *cache.ChainCache[[]byte]
    decoders *DecoderPool
    encoders *EncoderPool
}

func NewCache() (*Cache, error) {
    // ristretto is an in-memory cache
    ristrettoCache, err := ristretto.NewCache(&ristretto.Config{
        // The *10 is a recommendation from ristretto
        NumCounters: config.GlobalConfig.Storage.MaxInMemoryCacheSize * 10,
        MaxCost:     config.GlobalConfig.Storage.MaxInMemoryCacheSize,
        BufferItems: 64, // Same here
    })
    if err != nil {
        return nil, err
    }
    ristrettoStore := ristretto_store.NewRistretto(
        ristrettoCache,
        store.WithExpiration(time.Second*10),
    )

    var cacheManager *cache.ChainCache[[]byte]
    if config.GlobalConfig.Storage.UseRedis {
        if config.GlobalConfig.Storage.RedisUrl == nil {
            log.Fatal().
                Msg("Told to use redis in addition to in-memory store, but no redis url provided!")
        }
        opts, err := redis.ParseURL(*config.GlobalConfig.Storage.RedisUrl)
        if err != nil {
            return nil, err
        }
        redisClient := redis.NewClient(opts)
        redisStore := redis_store.NewRedis(redisClient, store.WithExpiration(time.Minute))
        cacheManager = cache.NewChain(
            cache.New[[]byte](ristrettoStore),
            cache.New[[]byte](redisStore),
        )
    } else {
        cacheManager = cache.NewChain(cache.New[[]byte](ristrettoStore))
    }

    return &Cache{
        cache:    cacheManager,
        decoders: NewDecoderPool(),
        encoders: NewEncoderPool(),
    }, nil
}
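
NewCache reads several fields from config.GlobalConfig.Storage, but the config package is not part of the section shown here. For orientation, this is roughly the shape those accesses imply; the field names come from the code above, while the types are inferred and therefore assumptions. The chain itself is consulted in order: the ristretto layer (10 second expiry) first, then the optional redis layer (1 minute expiry).

// Assumed shape of the storage config used by NewCache; not part of this diff.
type StorageConfig struct {
    MaxInMemoryCacheSize int64   // used as ristretto's MaxCost, and *10 as NumCounters
    UseRedis             bool    // adds the redis layer to the chain cache when true
    RedisUrl             *string // must be non-nil when UseRedis is true
}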
storage/cache/coderPools.go (vendored, new file, 162 additions)
@@ -0,0 +1,162 @@
package cache

import (
    "io"
    "sync"
)

type EncoderPool struct {
    encoders []*gobEncoder
    lock     sync.RWMutex
}

func NewEncoderPool() *EncoderPool {
    return &EncoderPool{
        encoders: []*gobEncoder{},
        lock:     sync.RWMutex{},
    }
}

// Encode some value with gob
func (p *EncoderPool) Encode(raw any) ([]byte, error) {
    var encoder *gobEncoder
    // First try to find an available encoder.
    // A read-only lock on the pool is fine here: the per-encoder mutexes are atomic,
    // so no two goroutines can lock the same encoder at the same time.
    // The loser of such a race simply keeps looking for another available one.
    p.lock.RLock()
    for _, v := range p.encoders {
        // If we can lock one, it's available
        if v.TryLock() {
            // Keep the reference, then break
            encoder = v
            break
        }
    }
    p.lock.RUnlock()
    // Didn't find an available encoder, so create a new one and add it to the pool
    if encoder == nil {
        encoder = p.expand()
    }
    // Ensure we free the encoder at the end
    defer encoder.Unlock()
    // Clear the buffer to avoid funky output from previous operations
    encoder.Buffer.Reset()
    if err := encoder.Encoder.Encode(raw); err != nil {
        return nil, err
    }
    data, err := io.ReadAll(encoder.Buffer)
    if err != nil {
        return nil, err
    }
    return data, nil
}

// expand grows the pool of available encoders by one and returns a reference to the new one.
// The new encoder is already locked and ready for use.
func (p *EncoderPool) expand() *gobEncoder {
    enc := newEncoder()
    // Lock everything. First the pool fully, since we need to overwrite the encoders slice,
    p.lock.Lock()
    // and then the new encoder, so that it is reserved for the caller
    // and they don't have to search for it again.
    enc.Lock()
    p.encoders = append(p.encoders, &enc)
    p.lock.Unlock()
    return &enc
}

// Prune drops all encoders not currently in use from the pool
func (p *EncoderPool) Prune() {
    stillActiveEncoders := []*gobEncoder{}
    p.lock.Lock()
    for _, v := range p.encoders {
        if !v.TryLock() {
            // Can't lock it, so the encoder is in use; keep it
            stillActiveEncoders = append(stillActiveEncoders, v)
            continue
        }
        // If we reach here, the encoder was available (not locked), so unlock it again and move on
        v.Unlock()
    }
    // Overwrite the list of encoders to only contain the ones found to still be in use
    p.encoders = stillActiveEncoders
    p.lock.Unlock()
}

type DecoderPool struct {
    decoders []*gobDecoder
    lock     sync.RWMutex
}

func NewDecoderPool() *DecoderPool {
    return &DecoderPool{
        decoders: []*gobDecoder{},
        lock:     sync.RWMutex{},
    }
}

// Decode some value with gob
func (p *DecoderPool) Decode(raw []byte, target any) error {
    var decoder *gobDecoder
    // First try to find an available decoder.
    // A read-only lock on the pool is fine here: the per-decoder mutexes are atomic,
    // so no two goroutines can lock the same decoder at the same time.
    // The loser of such a race simply keeps looking for another available one.
    p.lock.RLock()
    for _, v := range p.decoders {
        // If we can lock one, it's available
        if v.TryLock() {
            // Keep the reference, then break
            decoder = v
            break
        }
    }
    p.lock.RUnlock()
    // Didn't find an available decoder, so create a new one and add it to the pool
    if decoder == nil {
        decoder = p.expand()
    }
    // Ensure we free the decoder at the end
    defer decoder.Unlock()
    // Clear the buffer to avoid funky leftovers from previous operations
    decoder.Buffer.Reset()
    // Write the raw data to the buffer, then decode it.
    // The write always succeeds (or panics).
    _, _ = decoder.Buffer.Write(raw)
    err := decoder.Decoder.Decode(target)
    if err != nil {
        return err
    }
    return nil
}

// expand grows the pool of available decoders by one and returns a reference to the new one.
// The new decoder is already locked and ready for use.
func (p *DecoderPool) expand() *gobDecoder {
    dec := newDecoder()
    // Lock everything. First the pool fully, since we need to overwrite the decoders slice,
    p.lock.Lock()
    // and then the new decoder, so that it is reserved for the caller
    // and they don't have to search for it again.
    dec.Lock()
    p.decoders = append(p.decoders, &dec)
    p.lock.Unlock()
    return &dec
}

// Prune drops all decoders not currently in use from the pool
func (p *DecoderPool) Prune() {
    stillActiveDecoders := []*gobDecoder{}
    p.lock.Lock()
    for _, v := range p.decoders {
        if !v.TryLock() {
            // Can't lock it, so the decoder is in use; keep it
            stillActiveDecoders = append(stillActiveDecoders, v)
            continue
        }
        // If we reach here, the decoder was available (not locked), so unlock it again and move on
        v.Unlock()
    }
    // Overwrite the list of decoders to only contain the ones found to still be in use
    p.decoders = stillActiveDecoders
    p.lock.Unlock()
}
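
For reference, here is a minimal, single-goroutine round trip through the two pools; the Profile type is illustrative only and not part of the commit.

// Illustrative example; Profile is a made-up type.
type Profile struct {
    Name string
}

func roundTripExample() (Profile, error) {
    encoders := NewEncoderPool()
    decoders := NewDecoderPool()

    raw, err := encoders.Encode(Profile{Name: "example"})
    if err != nil {
        return Profile{}, err
    }

    var out Profile
    err = decoders.Decode(raw, &out)
    return out, err
}

One caveat worth keeping in mind with pooled gob coders: a gob encoder transmits a type's definition only once per encoder stream, so after the first Encode of a given type on a reused encoder, later payloads are not self-contained and decode cleanly only on a decoder that has already received that type's definition.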
storage/cache/lockedCoders.go (vendored, new file, 35 additions)
@@ -0,0 +1,35 @@
package cache

import (
    "bytes"
    "encoding/gob"
    "sync"
)

type gobEncoder struct {
    sync.Mutex
    Encoder *gob.Encoder
    Buffer  *bytes.Buffer
}

func newEncoder() gobEncoder {
    buf := bytes.Buffer{}
    return gobEncoder{
        Encoder: gob.NewEncoder(&buf),
        Buffer:  &buf,
    }
}

type gobDecoder struct {
    sync.Mutex
    Decoder *gob.Decoder
    Buffer  *bytes.Buffer
}

func newDecoder() gobDecoder {
    buf := bytes.Buffer{}
    return gobDecoder{
        Decoder: gob.NewDecoder(&buf),
        Buffer:  &buf,
    }
}
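
Two design details worth noting about these wrappers: embedding sync.Mutex makes each encoder or decoder double as its own lock, which is what lets the pools probe availability with TryLock (available since Go 1.18), and each constructor wires its gob coder to a private bytes.Buffer, which is why the pools only need to Reset that buffer between uses.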