Adding queues and storage is now postgres only
This commit is contained in:
parent
7ec0ce5a45
commit
1c2472cc2c
11 changed files with 368 additions and 31 deletions
|
@ -48,12 +48,8 @@ type ConfigAdmin struct {
|
|||
}
|
||||
|
||||
type ConfigStorage struct {
|
||||
// Url to the database to use
|
||||
// If DbIsPostgres is either not set or false, the url is expected to be a path to a sqlite file
|
||||
// Otherwise, it's expected to be an url to a postgres server
|
||||
// Url to the postgres database. Must contain credentials and stuff
|
||||
DatabaseUrl string `toml:"database_url"`
|
||||
// Whether the target of the database url is a postgres server
|
||||
DbIsPostgres *bool `toml:"db_is_postgres,omitempty"`
|
||||
// Url to redis server. If empty, no redis is used
|
||||
RedisUrl *string `toml:"redis_url,omitempty"`
|
||||
// The maximum size of the in-memory cache in bytes
|
||||
|
@ -94,7 +90,6 @@ var defaultConfig Config = Config{
|
|||
},
|
||||
Storage: ConfigStorage{
|
||||
DatabaseUrl: "db.sqlite",
|
||||
DbIsPostgres: other.IntoPointer(false),
|
||||
RedisUrl: nil,
|
||||
MaxInMemoryCacheSize: 1e6, // 1 Megabyte
|
||||
},
|
||||
|
|
178
config/config.go.old
Normal file
178
config/config.go.old
Normal file
|
@ -0,0 +1,178 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
"github.com/rs/zerolog/log"
|
||||
"gitlab.com/mstarongitlab/goutils/other"
|
||||
)
|
||||
|
||||
type ConfigSSL struct {
|
||||
HandleSSL bool `toml:"handle_ssl"` // Whether Linstrom should handle SSL encryption itself
|
||||
// If Linstrom is to handle SSL, whether it should use LetsEncrypt for certificates
|
||||
UseLetsEncrypt *bool `toml:"use_lets_encrypt"`
|
||||
// Path to the certificate if Linstrom is to handle SSL while not using LetsEncrypt
|
||||
CertificateFile *string `toml:"certificate_file"`
|
||||
// Mail adress to use in case of using LetsEncrypt
|
||||
AdminMail *string `toml:"admin_mail"`
|
||||
}
|
||||
|
||||
type ConfigGeneral struct {
|
||||
Protocol string `toml:"protocol"` // The protocol with which to access the server publicly (http/https)
|
||||
// The subdomain under which the server lives (example: "linstrom" if the full domain is linstrom.example.com)
|
||||
Subdomain *string `toml:"subdomain"`
|
||||
// The root domain under which the server lives (example: "example.com" if the full domain is linstrom.example.com)
|
||||
Domain string `toml:"domain"`
|
||||
// The port on which the server runs on
|
||||
PrivatePort int `toml:"private_port"`
|
||||
// The port under which the public can reach the server (useful if running behind a reverse proxy)
|
||||
PublicPort *int `toml:"public_port"`
|
||||
// File to write structured logs to (structured being formatted as json)
|
||||
// If not set, Linstrom won't write structured logs
|
||||
StructuredLogFile *string
|
||||
}
|
||||
|
||||
type ConfigWebAuthn struct {
|
||||
DisplayName string `toml:"display_name"`
|
||||
HashingSecret string `toml:"hashing_secret"`
|
||||
}
|
||||
|
||||
type ConfigAdmin struct {
|
||||
// Name of the server's root admin account
|
||||
Username string `toml:"username"`
|
||||
// A one time password used to verify account access to the root admin
|
||||
// after a server has been created and before the account could be linked to a passkey
|
||||
FirstTimeSetupOTP string `toml:"first_time_setup_otp"`
|
||||
}
|
||||
|
||||
type ConfigStorage struct {
|
||||
// Url to the database to use
|
||||
// If DbIsPostgres is either not set or false, the url is expected to be a path to a sqlite file
|
||||
// Otherwise, it's expected to be an url to a postgres server
|
||||
DatabaseUrl string `toml:"database_url"`
|
||||
// Whether the target of the database url is a postgres server
|
||||
DbIsPostgres *bool `toml:"db_is_postgres,omitempty"`
|
||||
// Url to redis server. If empty, no redis is used
|
||||
RedisUrl *string `toml:"redis_url,omitempty"`
|
||||
// The maximum size of the in-memory cache in bytes
|
||||
MaxInMemoryCacheSize int64 `toml:"max_in_memory_cache_size"`
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
General ConfigGeneral `toml:"general"`
|
||||
SSL ConfigSSL `toml:"ssl"`
|
||||
Admin ConfigAdmin `toml:"admin"`
|
||||
Webauthn ConfigWebAuthn `toml:"webauthn"`
|
||||
Storage ConfigStorage `toml:"storage"`
|
||||
}
|
||||
|
||||
var GlobalConfig Config
|
||||
|
||||
var defaultConfig Config = Config{
|
||||
General: ConfigGeneral{
|
||||
Protocol: "http",
|
||||
Subdomain: nil,
|
||||
Domain: "localhost",
|
||||
PrivatePort: 8080,
|
||||
PublicPort: nil,
|
||||
},
|
||||
SSL: ConfigSSL{
|
||||
HandleSSL: false,
|
||||
UseLetsEncrypt: nil,
|
||||
CertificateFile: nil,
|
||||
AdminMail: nil,
|
||||
},
|
||||
Admin: ConfigAdmin{
|
||||
Username: "server-admin",
|
||||
FirstTimeSetupOTP: "Example otp password",
|
||||
},
|
||||
Webauthn: ConfigWebAuthn{
|
||||
DisplayName: "Linstrom",
|
||||
HashingSecret: "some super secure secret that should never be changed or else password storage breaks",
|
||||
},
|
||||
Storage: ConfigStorage{
|
||||
DatabaseUrl: "db.sqlite",
|
||||
DbIsPostgres: other.IntoPointer(false),
|
||||
RedisUrl: nil,
|
||||
MaxInMemoryCacheSize: 1e6, // 1 Megabyte
|
||||
},
|
||||
}
|
||||
|
||||
func (gc *ConfigGeneral) GetFullDomain() string {
|
||||
if gc.Subdomain != nil {
|
||||
return *gc.Subdomain + gc.Domain
|
||||
}
|
||||
return gc.Domain
|
||||
}
|
||||
|
||||
func (gc *ConfigGeneral) GetFullPublicUrl() string {
|
||||
str := gc.Protocol + gc.GetFullDomain()
|
||||
if gc.PublicPort != nil {
|
||||
str += fmt.Sprint(*gc.PublicPort)
|
||||
} else {
|
||||
str += fmt.Sprint(gc.PrivatePort)
|
||||
}
|
||||
return str
|
||||
}
|
||||
|
||||
func WriteDefaultConfig(toFile string) error {
|
||||
log.Trace().Caller().Send()
|
||||
log.Info().Str("config-file", toFile).Msg("Writing default config to file")
|
||||
file, err := os.Create(toFile)
|
||||
if err != nil {
|
||||
log.Error().
|
||||
Err(err).
|
||||
Str("config-file", toFile).
|
||||
Msg("Failed to create file for default config")
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
data, err := toml.Marshal(&defaultConfig)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to marshal default config to toml")
|
||||
return err
|
||||
}
|
||||
_, err = file.Write(data)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("config-file", toFile).Msg("Failed to write default config")
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info().Str("config-file", toFile).Msg("Wrote default config")
|
||||
return nil
|
||||
}
|
||||
|
||||
func ReadAndWriteToGlobal(fileName string) error {
|
||||
log.Trace().Caller().Send()
|
||||
log.Debug().Str("config-file", fileName).Msg("Attempting to read config file")
|
||||
data, err := os.ReadFile(fileName)
|
||||
if err != nil {
|
||||
log.Warn().
|
||||
Str("config-file", fileName).
|
||||
Err(err).
|
||||
Msg("Failed to read config file, attempting to write default config")
|
||||
err = WriteDefaultConfig(fileName)
|
||||
if err != nil {
|
||||
log.Error().
|
||||
Err(err).
|
||||
Str("config-file", fileName).
|
||||
Msg("Failed to create default config file")
|
||||
return err
|
||||
}
|
||||
GlobalConfig = defaultConfig
|
||||
return nil
|
||||
}
|
||||
config := Config{}
|
||||
log.Debug().Str("config-file", fileName).Msg("Read config file, attempting to unmarshal")
|
||||
err = toml.Unmarshal(data, &config)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Bytes("config-data-raw", data).Msg("Failed to unmarshal config file")
|
||||
return err
|
||||
}
|
||||
GlobalConfig = config
|
||||
log.Info().Str("config-file", fileName).Msg("Read and applied config file")
|
||||
return nil
|
||||
}
|
0
docker-compose.yml
Normal file
0
docker-compose.yml
Normal file
|
@ -1 +0,0 @@
|
|||
package outgoingeventqueue
|
4
queues/inboundEventQueue/queue.go
Normal file
4
queues/inboundEventQueue/queue.go
Normal file
|
@ -0,0 +1,4 @@
|
|||
// Job queue for all inbound events
|
||||
// Well, I say queue but it's more of a security measure adding the inbound job to the db as backup
|
||||
// in case processing fails for any reason
|
||||
package inboundeventqueue
|
5
queues/outgoingEventQueue/queue.go
Normal file
5
queues/outgoingEventQueue/queue.go
Normal file
|
@ -0,0 +1,5 @@
|
|||
// Queue for outbound events
|
||||
// Actual queue here since other servers can't be expected to go at the same speed as Linstrom (be it slower or faster)
|
||||
// This queue should enforce data consistency (new jobs are first stored in the db) and a fair processing speed for
|
||||
// other servers, depending on their processing speed
|
||||
package outgoingeventqueue
|
64
storage/gormLogger.go
Normal file
64
storage/gormLogger.go
Normal file
|
@ -0,0 +1,64 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"gorm.io/gorm/logger"
|
||||
)
|
||||
|
||||
type gormLogger struct {
|
||||
logger zerolog.Logger
|
||||
}
|
||||
|
||||
func newGormLogger(zerologger zerolog.Logger) *gormLogger {
|
||||
return &gormLogger{zerologger}
|
||||
}
|
||||
|
||||
func (g *gormLogger) LogMode(newLevel logger.LogLevel) logger.Interface {
|
||||
switch newLevel {
|
||||
case logger.Error:
|
||||
g.logger = g.logger.Level(zerolog.ErrorLevel)
|
||||
case logger.Warn:
|
||||
g.logger = g.logger.Level(zerolog.WarnLevel)
|
||||
case logger.Info:
|
||||
g.logger = g.logger.Level(zerolog.InfoLevel)
|
||||
case logger.Silent:
|
||||
g.logger = g.logger.Level(zerolog.Disabled)
|
||||
}
|
||||
return g
|
||||
}
|
||||
func (g *gormLogger) Info(ctx context.Context, format string, args ...interface{}) {
|
||||
g.logger.Info().Ctx(ctx).Msgf(format, args...)
|
||||
}
|
||||
func (g *gormLogger) Warn(ctx context.Context, format string, args ...interface{}) {
|
||||
g.logger.Warn().Ctx(ctx).Msgf(format, args...)
|
||||
}
|
||||
func (g *gormLogger) Error(ctx context.Context, format string, args ...interface{}) {
|
||||
g.logger.Error().Ctx(ctx).Msgf(format, args...)
|
||||
}
|
||||
|
||||
func (g *gormLogger) Trace(
|
||||
ctx context.Context,
|
||||
begin time.Time,
|
||||
fc func() (sql string, rowsAffected int64),
|
||||
err error,
|
||||
) {
|
||||
sql, rowsAffected := fc()
|
||||
g.logger.Trace().
|
||||
Ctx(ctx).
|
||||
Time("gorm-begin", begin).
|
||||
Err(err).
|
||||
Str("gorm-query", sql).
|
||||
Int64("gorm-rows-affected", rowsAffected).
|
||||
Send()
|
||||
}
|
||||
|
||||
func (g *gormLogger) OverwriteLoggingLevel(new zerolog.Level) {
|
||||
g.logger = g.logger.Level(new)
|
||||
}
|
||||
|
||||
func (g *gormLogger) OverwriteLogger(new zerolog.Logger) {
|
||||
g.logger = new
|
||||
}
|
36
storage/inboundJobs.go
Normal file
36
storage/inboundJobs.go
Normal file
|
@ -0,0 +1,36 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Auto-generate string names for the various constants
|
||||
//go:generate stringer -type InboundJobSource
|
||||
|
||||
type InboundJobSource uint8
|
||||
|
||||
// TODO: Adjust and expand these constants later, depending on sources
|
||||
const (
|
||||
InJobSourceAccInbox InboundJobSource = iota
|
||||
InJobSourceServerInbox
|
||||
InJobSourceApiMasto
|
||||
InJobSourceApiLinstrom
|
||||
)
|
||||
|
||||
// Store inbound jobs from api and ap in the db until they finished processing
|
||||
// Ensures data consistency in case the server is forced to restart unexpectedly
|
||||
// No DeletedAt field since don't want completed jobs to linger in the db for any longer than necessary
|
||||
type InboundJob struct {
|
||||
ID uint `gorm:"primarykey"`
|
||||
CreatedAt time.Time
|
||||
// Raw data, could be json or gob data, check source for how to interpret
|
||||
RawData []byte
|
||||
// Where this job is coming from. Important for figuring out how to decode the raw data and what to do with it
|
||||
Source InboundJobSource
|
||||
|
||||
// Section: Various data
|
||||
// TODO: Expand based on needs
|
||||
|
||||
// If from an inbox, include the owner id here
|
||||
InboxOwner *string
|
||||
}
|
13
storage/outboundJobs.go
Normal file
13
storage/outboundJobs.go
Normal file
|
@ -0,0 +1,13 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type OutboundJob struct {
|
||||
gorm.Model // Include full model. Gives ID, created and updated at timestamps as well as soft deletes
|
||||
// Read (and create) only values to ensure consistency
|
||||
TargetServer string `gorm:"->;<-:create"` // The url of the target server
|
||||
TargetPath string `gorm:"->;<-:create"` // The full path of api endpoint targeted
|
||||
Data []byte `gorm:"->;<-:create"` // The raw data to send
|
||||
}
|
|
@ -3,7 +3,7 @@ package storage
|
|||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/glebarez/sqlite"
|
||||
"github.com/rs/zerolog/log"
|
||||
"gitlab.com/mstarongitlab/linstrom/storage/cache"
|
||||
"gorm.io/driver/postgres"
|
||||
"gorm.io/gorm"
|
||||
|
@ -18,41 +18,26 @@ type Storage struct {
|
|||
|
||||
var ErrInvalidData = errors.New("invalid data")
|
||||
|
||||
// Build a new storage using sqlite as database backend
|
||||
func NewStorageSqlite(filePath string, cache *cache.Cache) (*Storage, error) {
|
||||
db, err := gorm.Open(sqlite.Open(filePath))
|
||||
func NewStorage(dbUrl string, cache *cache.Cache) (*Storage, error) {
|
||||
db, err := gorm.Open(postgres.Open(dbUrl), &gorm.Config{
|
||||
Logger: newGormLogger(log.Logger),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return storageFromEmptyDb(db, cache)
|
||||
}
|
||||
|
||||
func NewStoragePostgres(dbUrl string, cache *cache.Cache) (*Storage, error) {
|
||||
db, err := gorm.Open(postgres.Open(dbUrl))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return storageFromEmptyDb(db, cache)
|
||||
}
|
||||
|
||||
func storageFromEmptyDb(db *gorm.DB, cache *cache.Cache) (*Storage, error) {
|
||||
// AutoMigrate ensures the db is in a state where all the structs given here
|
||||
// have their own tables and relations setup. It also updates tables if necessary
|
||||
err := db.AutoMigrate(
|
||||
err = db.AutoMigrate(
|
||||
MediaMetadata{},
|
||||
Account{},
|
||||
RemoteServer{},
|
||||
Note{},
|
||||
Role{},
|
||||
PasskeySession{},
|
||||
InboundJob{},
|
||||
OutboundJob{},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// And finally, build the actual storage struct
|
||||
return &Storage{
|
||||
db: db,
|
||||
cache: cache,
|
||||
}, nil
|
||||
return &Storage{db, cache}, nil
|
||||
}
|
||||
|
|
58
storage/storage.go.old
Normal file
58
storage/storage.go.old
Normal file
|
@ -0,0 +1,58 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/glebarez/sqlite"
|
||||
"gitlab.com/mstarongitlab/linstrom/storage/cache"
|
||||
"gorm.io/driver/postgres"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// Storage is responsible for all database, cache and media related actions
|
||||
// and serves as the lowest layer of the cake
|
||||
type Storage struct {
|
||||
db *gorm.DB
|
||||
cache *cache.Cache
|
||||
}
|
||||
|
||||
var ErrInvalidData = errors.New("invalid data")
|
||||
|
||||
// Build a new storage using sqlite as database backend
|
||||
func NewStorageSqlite(filePath string, cache *cache.Cache) (*Storage, error) {
|
||||
db, err := gorm.Open(sqlite.Open(filePath))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return storageFromEmptyDb(db, cache)
|
||||
}
|
||||
|
||||
func NewStoragePostgres(dbUrl string, cache *cache.Cache) (*Storage, error) {
|
||||
db, err := gorm.Open(postgres.Open(dbUrl))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return storageFromEmptyDb(db, cache)
|
||||
}
|
||||
|
||||
func storageFromEmptyDb(db *gorm.DB, cache *cache.Cache) (*Storage, error) {
|
||||
// AutoMigrate ensures the db is in a state where all the structs given here
|
||||
// have their own tables and relations setup. It also updates tables if necessary
|
||||
err := db.AutoMigrate(
|
||||
MediaMetadata{},
|
||||
Account{},
|
||||
RemoteServer{},
|
||||
Note{},
|
||||
Role{},
|
||||
PasskeySession{},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// And finally, build the actual storage struct
|
||||
return &Storage{
|
||||
db: db,
|
||||
cache: cache,
|
||||
}, nil
|
||||
}
|
Loading…
Reference in a new issue