From 1c2472cc2c071fb1a4a054c13d401d1ef63569c4 Mon Sep 17 00:00:00 2001 From: mStar Date: Sun, 15 Sep 2024 12:24:36 +0200 Subject: [PATCH] Adding queues and storage is now postgres only --- config/config.go | 7 +- config/config.go.old | 178 +++++++++++++++++++++++++++++ docker-compose.yml | 0 outgoingEventQueue/queue.go | 1 - queues/inboundEventQueue/queue.go | 4 + queues/outgoingEventQueue/queue.go | 5 + storage/gormLogger.go | 64 +++++++++++ storage/inboundJobs.go | 36 ++++++ storage/outboundJobs.go | 13 +++ storage/storage.go | 33 ++---- storage/storage.go.old | 58 ++++++++++ 11 files changed, 368 insertions(+), 31 deletions(-) create mode 100644 config/config.go.old create mode 100644 docker-compose.yml delete mode 100644 outgoingEventQueue/queue.go create mode 100644 queues/inboundEventQueue/queue.go create mode 100644 queues/outgoingEventQueue/queue.go create mode 100644 storage/gormLogger.go create mode 100644 storage/inboundJobs.go create mode 100644 storage/outboundJobs.go create mode 100644 storage/storage.go.old diff --git a/config/config.go b/config/config.go index 19f633b..df04dba 100644 --- a/config/config.go +++ b/config/config.go @@ -48,12 +48,8 @@ type ConfigAdmin struct { } type ConfigStorage struct { - // Url to the database to use - // If DbIsPostgres is either not set or false, the url is expected to be a path to a sqlite file - // Otherwise, it's expected to be an url to a postgres server + // Url to the postgres database. Must contain credentials and stuff DatabaseUrl string `toml:"database_url"` - // Whether the target of the database url is a postgres server - DbIsPostgres *bool `toml:"db_is_postgres,omitempty"` // Url to redis server. If empty, no redis is used RedisUrl *string `toml:"redis_url,omitempty"` // The maximum size of the in-memory cache in bytes @@ -94,7 +90,6 @@ var defaultConfig Config = Config{ }, Storage: ConfigStorage{ DatabaseUrl: "db.sqlite", - DbIsPostgres: other.IntoPointer(false), RedisUrl: nil, MaxInMemoryCacheSize: 1e6, // 1 Megabyte }, diff --git a/config/config.go.old b/config/config.go.old new file mode 100644 index 0000000..19f633b --- /dev/null +++ b/config/config.go.old @@ -0,0 +1,178 @@ +package config + +import ( + "fmt" + "os" + + "github.com/BurntSushi/toml" + "github.com/rs/zerolog/log" + "gitlab.com/mstarongitlab/goutils/other" +) + +type ConfigSSL struct { + HandleSSL bool `toml:"handle_ssl"` // Whether Linstrom should handle SSL encryption itself + // If Linstrom is to handle SSL, whether it should use LetsEncrypt for certificates + UseLetsEncrypt *bool `toml:"use_lets_encrypt"` + // Path to the certificate if Linstrom is to handle SSL while not using LetsEncrypt + CertificateFile *string `toml:"certificate_file"` + // Mail adress to use in case of using LetsEncrypt + AdminMail *string `toml:"admin_mail"` +} + +type ConfigGeneral struct { + Protocol string `toml:"protocol"` // The protocol with which to access the server publicly (http/https) + // The subdomain under which the server lives (example: "linstrom" if the full domain is linstrom.example.com) + Subdomain *string `toml:"subdomain"` + // The root domain under which the server lives (example: "example.com" if the full domain is linstrom.example.com) + Domain string `toml:"domain"` + // The port on which the server runs on + PrivatePort int `toml:"private_port"` + // The port under which the public can reach the server (useful if running behind a reverse proxy) + PublicPort *int `toml:"public_port"` + // File to write structured logs to (structured being formatted as json) + // If not set, Linstrom won't write structured logs + StructuredLogFile *string +} + +type ConfigWebAuthn struct { + DisplayName string `toml:"display_name"` + HashingSecret string `toml:"hashing_secret"` +} + +type ConfigAdmin struct { + // Name of the server's root admin account + Username string `toml:"username"` + // A one time password used to verify account access to the root admin + // after a server has been created and before the account could be linked to a passkey + FirstTimeSetupOTP string `toml:"first_time_setup_otp"` +} + +type ConfigStorage struct { + // Url to the database to use + // If DbIsPostgres is either not set or false, the url is expected to be a path to a sqlite file + // Otherwise, it's expected to be an url to a postgres server + DatabaseUrl string `toml:"database_url"` + // Whether the target of the database url is a postgres server + DbIsPostgres *bool `toml:"db_is_postgres,omitempty"` + // Url to redis server. If empty, no redis is used + RedisUrl *string `toml:"redis_url,omitempty"` + // The maximum size of the in-memory cache in bytes + MaxInMemoryCacheSize int64 `toml:"max_in_memory_cache_size"` +} + +type Config struct { + General ConfigGeneral `toml:"general"` + SSL ConfigSSL `toml:"ssl"` + Admin ConfigAdmin `toml:"admin"` + Webauthn ConfigWebAuthn `toml:"webauthn"` + Storage ConfigStorage `toml:"storage"` +} + +var GlobalConfig Config + +var defaultConfig Config = Config{ + General: ConfigGeneral{ + Protocol: "http", + Subdomain: nil, + Domain: "localhost", + PrivatePort: 8080, + PublicPort: nil, + }, + SSL: ConfigSSL{ + HandleSSL: false, + UseLetsEncrypt: nil, + CertificateFile: nil, + AdminMail: nil, + }, + Admin: ConfigAdmin{ + Username: "server-admin", + FirstTimeSetupOTP: "Example otp password", + }, + Webauthn: ConfigWebAuthn{ + DisplayName: "Linstrom", + HashingSecret: "some super secure secret that should never be changed or else password storage breaks", + }, + Storage: ConfigStorage{ + DatabaseUrl: "db.sqlite", + DbIsPostgres: other.IntoPointer(false), + RedisUrl: nil, + MaxInMemoryCacheSize: 1e6, // 1 Megabyte + }, +} + +func (gc *ConfigGeneral) GetFullDomain() string { + if gc.Subdomain != nil { + return *gc.Subdomain + gc.Domain + } + return gc.Domain +} + +func (gc *ConfigGeneral) GetFullPublicUrl() string { + str := gc.Protocol + gc.GetFullDomain() + if gc.PublicPort != nil { + str += fmt.Sprint(*gc.PublicPort) + } else { + str += fmt.Sprint(gc.PrivatePort) + } + return str +} + +func WriteDefaultConfig(toFile string) error { + log.Trace().Caller().Send() + log.Info().Str("config-file", toFile).Msg("Writing default config to file") + file, err := os.Create(toFile) + if err != nil { + log.Error(). + Err(err). + Str("config-file", toFile). + Msg("Failed to create file for default config") + return err + } + defer file.Close() + + data, err := toml.Marshal(&defaultConfig) + if err != nil { + log.Error().Err(err).Msg("Failed to marshal default config to toml") + return err + } + _, err = file.Write(data) + if err != nil { + log.Error().Err(err).Str("config-file", toFile).Msg("Failed to write default config") + return err + } + + log.Info().Str("config-file", toFile).Msg("Wrote default config") + return nil +} + +func ReadAndWriteToGlobal(fileName string) error { + log.Trace().Caller().Send() + log.Debug().Str("config-file", fileName).Msg("Attempting to read config file") + data, err := os.ReadFile(fileName) + if err != nil { + log.Warn(). + Str("config-file", fileName). + Err(err). + Msg("Failed to read config file, attempting to write default config") + err = WriteDefaultConfig(fileName) + if err != nil { + log.Error(). + Err(err). + Str("config-file", fileName). + Msg("Failed to create default config file") + return err + } + GlobalConfig = defaultConfig + return nil + } + config := Config{} + log.Debug().Str("config-file", fileName).Msg("Read config file, attempting to unmarshal") + err = toml.Unmarshal(data, &config) + if err != nil { + log.Error().Err(err).Bytes("config-data-raw", data).Msg("Failed to unmarshal config file") + return err + } + GlobalConfig = config + log.Info().Str("config-file", fileName).Msg("Read and applied config file") + return nil +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..e69de29 diff --git a/outgoingEventQueue/queue.go b/outgoingEventQueue/queue.go deleted file mode 100644 index 12e524f..0000000 --- a/outgoingEventQueue/queue.go +++ /dev/null @@ -1 +0,0 @@ -package outgoingeventqueue diff --git a/queues/inboundEventQueue/queue.go b/queues/inboundEventQueue/queue.go new file mode 100644 index 0000000..ce3467a --- /dev/null +++ b/queues/inboundEventQueue/queue.go @@ -0,0 +1,4 @@ +// Job queue for all inbound events +// Well, I say queue but it's more of a security measure adding the inbound job to the db as backup +// in case processing fails for any reason +package inboundeventqueue diff --git a/queues/outgoingEventQueue/queue.go b/queues/outgoingEventQueue/queue.go new file mode 100644 index 0000000..87cdcce --- /dev/null +++ b/queues/outgoingEventQueue/queue.go @@ -0,0 +1,5 @@ +// Queue for outbound events +// Actual queue here since other servers can't be expected to go at the same speed as Linstrom (be it slower or faster) +// This queue should enforce data consistency (new jobs are first stored in the db) and a fair processing speed for +// other servers, depending on their processing speed +package outgoingeventqueue diff --git a/storage/gormLogger.go b/storage/gormLogger.go new file mode 100644 index 0000000..ed8f976 --- /dev/null +++ b/storage/gormLogger.go @@ -0,0 +1,64 @@ +package storage + +import ( + "context" + "time" + + "github.com/rs/zerolog" + "gorm.io/gorm/logger" +) + +type gormLogger struct { + logger zerolog.Logger +} + +func newGormLogger(zerologger zerolog.Logger) *gormLogger { + return &gormLogger{zerologger} +} + +func (g *gormLogger) LogMode(newLevel logger.LogLevel) logger.Interface { + switch newLevel { + case logger.Error: + g.logger = g.logger.Level(zerolog.ErrorLevel) + case logger.Warn: + g.logger = g.logger.Level(zerolog.WarnLevel) + case logger.Info: + g.logger = g.logger.Level(zerolog.InfoLevel) + case logger.Silent: + g.logger = g.logger.Level(zerolog.Disabled) + } + return g +} +func (g *gormLogger) Info(ctx context.Context, format string, args ...interface{}) { + g.logger.Info().Ctx(ctx).Msgf(format, args...) +} +func (g *gormLogger) Warn(ctx context.Context, format string, args ...interface{}) { + g.logger.Warn().Ctx(ctx).Msgf(format, args...) +} +func (g *gormLogger) Error(ctx context.Context, format string, args ...interface{}) { + g.logger.Error().Ctx(ctx).Msgf(format, args...) +} + +func (g *gormLogger) Trace( + ctx context.Context, + begin time.Time, + fc func() (sql string, rowsAffected int64), + err error, +) { + sql, rowsAffected := fc() + g.logger.Trace(). + Ctx(ctx). + Time("gorm-begin", begin). + Err(err). + Str("gorm-query", sql). + Int64("gorm-rows-affected", rowsAffected). + Send() +} + +func (g *gormLogger) OverwriteLoggingLevel(new zerolog.Level) { + g.logger = g.logger.Level(new) +} + +func (g *gormLogger) OverwriteLogger(new zerolog.Logger) { + g.logger = new +} diff --git a/storage/inboundJobs.go b/storage/inboundJobs.go new file mode 100644 index 0000000..d28447d --- /dev/null +++ b/storage/inboundJobs.go @@ -0,0 +1,36 @@ +package storage + +import ( + "time" +) + +// Auto-generate string names for the various constants +//go:generate stringer -type InboundJobSource + +type InboundJobSource uint8 + +// TODO: Adjust and expand these constants later, depending on sources +const ( + InJobSourceAccInbox InboundJobSource = iota + InJobSourceServerInbox + InJobSourceApiMasto + InJobSourceApiLinstrom +) + +// Store inbound jobs from api and ap in the db until they finished processing +// Ensures data consistency in case the server is forced to restart unexpectedly +// No DeletedAt field since don't want completed jobs to linger in the db for any longer than necessary +type InboundJob struct { + ID uint `gorm:"primarykey"` + CreatedAt time.Time + // Raw data, could be json or gob data, check source for how to interpret + RawData []byte + // Where this job is coming from. Important for figuring out how to decode the raw data and what to do with it + Source InboundJobSource + + // Section: Various data + // TODO: Expand based on needs + + // If from an inbox, include the owner id here + InboxOwner *string +} diff --git a/storage/outboundJobs.go b/storage/outboundJobs.go new file mode 100644 index 0000000..27bda9e --- /dev/null +++ b/storage/outboundJobs.go @@ -0,0 +1,13 @@ +package storage + +import ( + "gorm.io/gorm" +) + +type OutboundJob struct { + gorm.Model // Include full model. Gives ID, created and updated at timestamps as well as soft deletes + // Read (and create) only values to ensure consistency + TargetServer string `gorm:"->;<-:create"` // The url of the target server + TargetPath string `gorm:"->;<-:create"` // The full path of api endpoint targeted + Data []byte `gorm:"->;<-:create"` // The raw data to send +} diff --git a/storage/storage.go b/storage/storage.go index 92ce234..a90daab 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -3,7 +3,7 @@ package storage import ( "errors" - "github.com/glebarez/sqlite" + "github.com/rs/zerolog/log" "gitlab.com/mstarongitlab/linstrom/storage/cache" "gorm.io/driver/postgres" "gorm.io/gorm" @@ -18,41 +18,26 @@ type Storage struct { var ErrInvalidData = errors.New("invalid data") -// Build a new storage using sqlite as database backend -func NewStorageSqlite(filePath string, cache *cache.Cache) (*Storage, error) { - db, err := gorm.Open(sqlite.Open(filePath)) +func NewStorage(dbUrl string, cache *cache.Cache) (*Storage, error) { + db, err := gorm.Open(postgres.Open(dbUrl), &gorm.Config{ + Logger: newGormLogger(log.Logger), + }) if err != nil { return nil, err } - return storageFromEmptyDb(db, cache) -} - -func NewStoragePostgres(dbUrl string, cache *cache.Cache) (*Storage, error) { - db, err := gorm.Open(postgres.Open(dbUrl)) - if err != nil { - return nil, err - } - return storageFromEmptyDb(db, cache) -} - -func storageFromEmptyDb(db *gorm.DB, cache *cache.Cache) (*Storage, error) { - // AutoMigrate ensures the db is in a state where all the structs given here - // have their own tables and relations setup. It also updates tables if necessary - err := db.AutoMigrate( + err = db.AutoMigrate( MediaMetadata{}, Account{}, RemoteServer{}, Note{}, Role{}, PasskeySession{}, + InboundJob{}, + OutboundJob{}, ) if err != nil { return nil, err } - // And finally, build the actual storage struct - return &Storage{ - db: db, - cache: cache, - }, nil + return &Storage{db, cache}, nil } diff --git a/storage/storage.go.old b/storage/storage.go.old new file mode 100644 index 0000000..92ce234 --- /dev/null +++ b/storage/storage.go.old @@ -0,0 +1,58 @@ +package storage + +import ( + "errors" + + "github.com/glebarez/sqlite" + "gitlab.com/mstarongitlab/linstrom/storage/cache" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +// Storage is responsible for all database, cache and media related actions +// and serves as the lowest layer of the cake +type Storage struct { + db *gorm.DB + cache *cache.Cache +} + +var ErrInvalidData = errors.New("invalid data") + +// Build a new storage using sqlite as database backend +func NewStorageSqlite(filePath string, cache *cache.Cache) (*Storage, error) { + db, err := gorm.Open(sqlite.Open(filePath)) + if err != nil { + return nil, err + } + return storageFromEmptyDb(db, cache) +} + +func NewStoragePostgres(dbUrl string, cache *cache.Cache) (*Storage, error) { + db, err := gorm.Open(postgres.Open(dbUrl)) + if err != nil { + return nil, err + } + return storageFromEmptyDb(db, cache) +} + +func storageFromEmptyDb(db *gorm.DB, cache *cache.Cache) (*Storage, error) { + // AutoMigrate ensures the db is in a state where all the structs given here + // have their own tables and relations setup. It also updates tables if necessary + err := db.AutoMigrate( + MediaMetadata{}, + Account{}, + RemoteServer{}, + Note{}, + Role{}, + PasskeySession{}, + ) + if err != nil { + return nil, err + } + + // And finally, build the actual storage struct + return &Storage{ + db: db, + cache: cache, + }, nil +}