Work on failed outbound requests and move type migrators

- DB type migrators are now in separate file, in preparation for full
  custom sql migration statements
- Start work on handling failed outbound requests stored in the db
This commit is contained in:
Melody Becker 2025-07-03 13:57:30 +02:00
parent 81a01fbf8b
commit 7ac4c628b8
Signed by: mstar
SSH key fingerprint: SHA256:9VAo09aaVNTWKzPW7Hq2LW+ox9OdwmTSHRoD4mlz1yI
13 changed files with 372 additions and 145 deletions

View file

@ -54,6 +54,7 @@ func main() {
g.ApplyInterface(func(models.IAccessToken) {}, models.AccessToken{})
g.ApplyInterface(func(models.INote) {}, models.Note{})
g.ApplyInterface(func(models.IUserToUserRelation) {}, models.UserToUserRelation{})
g.ApplyInterface(func(models.IFailedOutboundRequest) {}, models.FailedOutboundRequest{})
log.Info().Msg("Extra features applied, starting generation")
g.Execute()

2
go.mod
View file

@ -80,7 +80,7 @@ require (
go.uber.org/mock v0.5.0 // indirect
golang.org/x/mod v0.24.0 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/sync v0.13.0 // indirect
golang.org/x/sync v0.15.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/tools v0.32.0 // indirect
gorm.io/datatypes v1.2.5 // indirect

2
go.sum
View file

@ -193,6 +193,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View file

@ -0,0 +1,30 @@
package cleaners
import (
"time"
"github.com/rs/zerolog/log"
"git.mstar.dev/mstar/linstrom/storage-new/dbgen"
)
const maxServerAge = time.Hour * 24 * 30 // One month
func init() {
cleanerBuilders = append(cleanerBuilders, buildKillDeadServers)
}
// Marks all servers where the last interaction time is older than maxServerAge
func tickKillDeadServers(now time.Time) {
_, err := dbgen.RemoteServer.Where(dbgen.RemoteServer.LastInteraction.Lt(now.Add(-maxServerAge)), dbgen.RemoteServer.IsSelf.Is(false)).
UpdateColumn(dbgen.RemoteServer.IsDead, true)
if err != nil {
log.Error().
Err(err).
Msg("Failed to mark servers without interaction for over a 30 days as dead")
}
}
func buildKillDeadServers() (onTick func(time.Time), name string, tickSpeed time.Duration) {
return tickKillDeadServers, "kill-dead-servers", time.Hour
}

View file

@ -0,0 +1,113 @@
package cleaners
import (
"net/http"
"strings"
"sync"
"time"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"
"gorm.io/gen"
"git.mstar.dev/mstar/linstrom/storage-new/dbgen"
"git.mstar.dev/mstar/linstrom/storage-new/models"
webshared "git.mstar.dev/mstar/linstrom/web/shared"
)
const maxFailedRequestsBeforeDeath = 10
var (
reqErrStrUnknownHost = "no such host"
)
func init() {
cleanerBuilders = append(cleanerBuilders, buildRetryRequests)
}
func TickRetryRequests(now time.Time) {
batchResults := []*models.FailedOutboundRequest{}
fo := dbgen.FailedOutboundRequest
idsToDelete := []uint64{}
var idsDeleteLock sync.Mutex
err := fo.Preload(fo.TargetServer, fo.ActingUser).
// Join with server data to exclude dead servers
LeftJoin(dbgen.RemoteServer, dbgen.RemoteServer.ID.EqCol(fo.TargetServerId)).
Where(dbgen.RemoteServer.IsDead.Is(false)).
Order(fo.Id.Asc()).
FindInBatches(&batchResults, 50, func(tx gen.Dao, batch int) error {
var g errgroup.Group
for _, failedRequest := range batchResults {
g.Go(func() (err error) {
defer func() {
failedRequest.NrOfAttempts += 1
err = dbgen.FailedOutboundRequest.Save(failedRequest)
if err != nil {
return
}
if failedRequest.NrOfAttempts >= maxFailedRequestsBeforeDeath {
_, err = dbgen.RemoteServer.Where(dbgen.RemoteServer.ID.Eq(failedRequest.TargetServerId)).
UpdateColumn(dbgen.RemoteServer.IsDead, true)
}
}()
var res *http.Response
res, _, err = webshared.RequestSigned(
"POST",
failedRequest.Target,
failedRequest.RawData,
failedRequest.ActingUser,
)
if err != nil {
failedRequest.NrOfAttempts += 1
errString := err.Error()
// FIXME: Use the actual error types instead of error string
// Using substring matching is awful and probably not reliable. Using error type is likely more reliable
if strings.Contains(errString, reqErrStrUnknownHost) {
failedRequest.LastFailureReason = string(
models.RequestFailureRequestError,
)
} else {
failedRequest.LastFailureReason = string(models.RequestFailureRequestError)
}
return
}
if res.StatusCode < 400 {
idsDeleteLock.Lock()
idsToDelete = append(idsToDelete, failedRequest.Id)
idsDeleteLock.Unlock()
// Defer func will always add one (to make the expected failure case easier)
// Sub one here to prevent a potential server kill if it was at maxFailedRequestsBeforeDeath-1 failed requests before
failedRequest.NrOfAttempts -= 1
return nil
}
switch res.StatusCode {
case http.StatusInternalServerError:
failedRequest.LastFailureReason = string(models.RequestFailureInternalError)
case http.StatusForbidden:
failedRequest.LastFailureReason = string(models.RequestFailureRejected)
case http.StatusTooManyRequests:
// TODO: Check Timeout headers and write apropriate message
}
return nil
})
}
return nil
})
if err != nil {
log.Error().Err(err).Msg("Failed to batch-process all failed outbound requests")
}
_, err = fo.Where(fo.Id.In(idsToDelete...)).Delete()
if err != nil {
log.Error().Err(err).Msg("Failed to batch-delete all successful retries")
}
err = dbgen.FailedOutboundRequest.KillServers(maxFailedRequestsBeforeDeath)
if err != nil {
log.Error().Err(err).Msg("Failed to kill all servers with too many failed requests")
}
}
func buildRetryRequests() (onTick func(time.Time), name string, tickSpeed time.Duration) {
return TickRetryRequests, "retry-requests", time.Hour
}

View file

@ -7,6 +7,7 @@ package dbgen
import (
"context"
"database/sql"
"strings"
"git.mstar.dev/mstar/linstrom/storage-new/models"
"gorm.io/gorm"
@ -607,6 +608,34 @@ type IFailedOutboundRequestDo interface {
Returning(value interface{}, columns ...string) IFailedOutboundRequestDo
UnderlyingDB() *gorm.DB
schema.Tabler
KillServers(maxAttempts uint32) (err error)
}
// KillServers marks all servers where a request has more than X failed attempts as dead
//
// WITH servers_to_kill AS (
//
// SELECT target_server_id
// FROM failed_outbound_requests
// WHERE nr_of_attempts > @maxAttempts
//
// )
// UPDATE remote_servers
// SET is_dead = true
// WHERE id IN (SELECT target_server_id FROM servers_to_kill)
func (f failedOutboundRequestDo) KillServers(maxAttempts uint32) (err error) {
var params []interface{}
var generateSQL strings.Builder
params = append(params, maxAttempts)
generateSQL.WriteString("WITH servers_to_kill AS ( SELECT target_server_id FROM failed_outbound_requests WHERE nr_of_attempts > ? ) UPDATE remote_servers SET is_dead = true WHERE id IN (SELECT target_server_id FROM servers_to_kill) ")
var executeSQL *gorm.DB
executeSQL = f.UnderlyingDB().Exec(generateSQL.String(), params...) // ignore_security_alert
err = executeSQL.Error
return
}
func (f failedOutboundRequestDo) Debug() IFailedOutboundRequestDo {

View file

@ -55,6 +55,8 @@ func newRole(db *gorm.DB, opts ...gen.DOOption) role {
_role.CanMentionOthers = field.NewBool(tableName, "can_mention_others")
_role.HasMentionCountLimit = field.NewBool(tableName, "has_mention_count_limit")
_role.MentionLimit = field.NewUint32(tableName, "mention_limit")
_role.MaxIndividualFileSize = field.NewUint64(tableName, "max_individual_file_size")
_role.MaxTotalFileSize = field.NewUint64(tableName, "max_total_file_size")
_role.AutoNsfwMedia = field.NewBool(tableName, "auto_nsfw_media")
_role.AutoCwPosts = field.NewBool(tableName, "auto_cw_posts")
_role.AutoCwPostsText = field.NewString(tableName, "auto_cw_posts_text")
@ -118,6 +120,8 @@ type role struct {
CanMentionOthers field.Bool
HasMentionCountLimit field.Bool
MentionLimit field.Uint32
MaxIndividualFileSize field.Uint64
MaxTotalFileSize field.Uint64
AutoNsfwMedia field.Bool
AutoCwPosts field.Bool
AutoCwPostsText field.String
@ -187,6 +191,8 @@ func (r *role) updateTableName(table string) *role {
r.CanMentionOthers = field.NewBool(table, "can_mention_others")
r.HasMentionCountLimit = field.NewBool(table, "has_mention_count_limit")
r.MentionLimit = field.NewUint32(table, "mention_limit")
r.MaxIndividualFileSize = field.NewUint64(table, "max_individual_file_size")
r.MaxTotalFileSize = field.NewUint64(table, "max_total_file_size")
r.AutoNsfwMedia = field.NewBool(table, "auto_nsfw_media")
r.AutoCwPosts = field.NewBool(table, "auto_cw_posts")
r.AutoCwPostsText = field.NewString(table, "auto_cw_posts_text")
@ -228,7 +234,7 @@ func (r *role) GetFieldByName(fieldName string) (field.OrderExpr, bool) {
}
func (r *role) fillFieldMap() {
r.fieldMap = make(map[string]field.Expr, 53)
r.fieldMap = make(map[string]field.Expr, 55)
r.fieldMap["id"] = r.ID
r.fieldMap["created_at"] = r.CreatedAt
r.fieldMap["updated_at"] = r.UpdatedAt
@ -257,6 +263,8 @@ func (r *role) fillFieldMap() {
r.fieldMap["can_mention_others"] = r.CanMentionOthers
r.fieldMap["has_mention_count_limit"] = r.HasMentionCountLimit
r.fieldMap["mention_limit"] = r.MentionLimit
r.fieldMap["max_individual_file_size"] = r.MaxIndividualFileSize
r.fieldMap["max_total_file_size"] = r.MaxTotalFileSize
r.fieldMap["auto_nsfw_media"] = r.AutoNsfwMedia
r.fieldMap["auto_cw_posts"] = r.AutoCwPosts
r.fieldMap["auto_cw_posts_text"] = r.AutoCwPostsText

View file

@ -39,6 +39,7 @@ func newServerMetadata(db *gorm.DB, opts ...gen.DOOption) serverMetadata {
_serverMetadata.LECSR = field.NewBytes(tableName, "lecsr")
_serverMetadata.LELastUpdate = field.NewField(tableName, "le_last_update")
_serverMetadata.LEUserPrivKey = field.NewBytes(tableName, "le_user_priv_key")
_serverMetadata.LastMigrationVersion = field.NewUint64(tableName, "last_migration_version")
_serverMetadata.fillFieldMap()
@ -61,6 +62,7 @@ type serverMetadata struct {
LECSR field.Bytes
LELastUpdate field.Field
LEUserPrivKey field.Bytes
LastMigrationVersion field.Uint64
fieldMap map[string]field.Expr
}
@ -89,6 +91,7 @@ func (s *serverMetadata) updateTableName(table string) *serverMetadata {
s.LECSR = field.NewBytes(table, "lecsr")
s.LELastUpdate = field.NewField(table, "le_last_update")
s.LEUserPrivKey = field.NewBytes(table, "le_user_priv_key")
s.LastMigrationVersion = field.NewUint64(table, "last_migration_version")
s.fillFieldMap()
@ -105,7 +108,7 @@ func (s *serverMetadata) GetFieldByName(fieldName string) (field.OrderExpr, bool
}
func (s *serverMetadata) fillFieldMap() {
s.fieldMap = make(map[string]field.Expr, 12)
s.fieldMap = make(map[string]field.Expr, 13)
s.fieldMap["id"] = s.Id
s.fieldMap["created_at"] = s.CreatedAt
s.fieldMap["updated_at"] = s.UpdatedAt
@ -118,6 +121,7 @@ func (s *serverMetadata) fillFieldMap() {
s.fieldMap["lecsr"] = s.LECSR
s.fieldMap["le_last_update"] = s.LELastUpdate
s.fieldMap["le_user_priv_key"] = s.LEUserPrivKey
s.fieldMap["last_migration_version"] = s.LastMigrationVersion
}
func (s serverMetadata) clone(db *gorm.DB) serverMetadata {

View file

@ -1,17 +1,16 @@
package storage
import (
"fmt"
"strings"
"git.mstar.dev/mstar/goutils/other"
"git.mstar.dev/mstar/goutils/sliceutils"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gorm.io/gorm"
"git.mstar.dev/mstar/linstrom/storage-new/models"
"git.mstar.dev/mstar/linstrom/storage-new/dbgen"
)
const CurrentMigrationVersion = 1
// Auto-migrate all tables and types used
func Migrate(db *gorm.DB) error {
// Shut up gorm's queries during automigrate by setting log level to info during migration
@ -38,7 +37,10 @@ func Migrate(db *gorm.DB) error {
return other.Error("storage", "Failed to create Activitystreams Activity type", err)
}
if err := createCollectionTarget(db); err != nil {
return other.Error("storage", "Failed t ocreate collections target type", err)
return other.Error("storage", "Failed to create collections target type", err)
}
if err := preTypeMigrations(db); err != nil {
return other.Error("storage", "Failed to run pre-type migrations", err)
}
if err := migrateTypes(db); err != nil {
return other.Error("storage", "Failed to automigrate data structs", err)
@ -46,124 +48,17 @@ func Migrate(db *gorm.DB) error {
return nil
}
// Returns the raw error created by gorm, with no wrapping
func migrateTypes(db *gorm.DB) error {
if err := db.AutoMigrate(models.AllTypes...); err != nil {
return err
}
return nil
}
// Ensure the being enum exists for the user
func createBeingType(db *gorm.DB) error {
return migrateEnum(
db,
"being_type",
sliceutils.Map(models.AllBeings, func(t models.BeingType) string { return string(t) }),
)
}
func createAccountRelationType(db *gorm.DB) error {
return migrateEnum(
db,
"relation_type",
sliceutils.Map(
models.AllRelations,
func(t models.RelationType) string { return string(t) },
),
)
}
func createAccountAuthMethodType(db *gorm.DB) error {
return migrateEnum(
db,
"auth_method_type",
sliceutils.Map(
models.AllAuthMethods,
func(t models.AuthenticationMethodType) string { return string(t) },
),
)
}
func createRemoteServerSoftwareType(db *gorm.DB) error {
return migrateEnum(
db,
"server_software_type",
sliceutils.Map(
models.AllServerSoftwareTypes,
func(t models.ServerSoftwareType) string { return string(t) },
),
)
}
func createActitiystreamsObjectType(db *gorm.DB) error {
return migrateEnum(
db,
"activitystreams_object_type",
sliceutils.Map(
models.AllActivitystreamsObjectTypes,
func(t models.ActivitystreamsObjectType) string { return string(t) },
),
)
}
func createActitiystreamsActivityType(db *gorm.DB) error {
return migrateEnum(
db,
"activitystreams_activity_type",
sliceutils.Map(
models.AllActivitystreamsActivityTypes,
func(t models.ActivitystreamsActivityType) string { return string(t) },
),
)
}
// func createActitiystreamsActivityTargetType(db *gorm.DB) error {
// return migrateEnum(
// db,
// "activitystreams_activity_target_type",
// sliceutils.Map(
// models.AllActivitystreamsActivityTargetTypes,
// func(t models.ActivitystreamsActivityTargetType) string { return string(t) },
// ),
// )
// }
func createCollectionTarget(db *gorm.DB) error {
return migrateEnum(
db,
"collection_target_type",
sliceutils.Map(
models.AllCollectionTargetTypes,
func(t models.CollectionTargetType) string { return string(t) },
),
)
}
// Helper function for ensuring the existence of an enum with the given values
func migrateEnum(db *gorm.DB, name string, values []string) error {
if err := db.Exec("DROP TYPE IF EXISTS " + name + " CASCADE;").Error; err != nil {
return other.Error(
"storage",
fmt.Sprintf("Failed to remove old type %s (if it exists)", name),
err,
)
}
queryBuilder := strings.Builder{}
queryBuilder.WriteString("CREATE TYPE ")
queryBuilder.WriteString(name)
queryBuilder.WriteString(" AS ENUM (")
blen := len(values)
for i, btype := range values {
queryBuilder.WriteString("'" + string(btype) + "'")
// Append comma everywhere except last entry
if i+1 < blen {
queryBuilder.WriteString(",")
}
}
queryBuilder.WriteString(");")
if err := db.Exec(queryBuilder.String()).Error; err != nil {
return err
func preTypeMigrations(db *gorm.DB) error {
genTmp := dbgen.Use(db)
meta, err := genTmp.
ServerMetadata.Select(dbgen.ServerMetadata.LastMigrationVersion).
First()
if err != nil {
return nil
}
if meta.LastMigrationVersion == CurrentMigrationVersion {
return nil
}
log.Error().Msg("custom schema migrations not implemented yet")
return nil
}

View file

@ -16,3 +16,17 @@ type FailedOutboundRequest struct {
TargetServer *RemoteServer // The remote server being targeted. Included to determine if a request still has a chance of success
TargetServerId uint // Id of the target remote server
}
type IFailedOutboundRequest interface {
// KillServers marks all servers where a request has more than X failed attempts as dead
//
// WITH servers_to_kill AS (
// SELECT target_server_id
// FROM failed_outbound_requests
// WHERE nr_of_attempts > @maxAttempts
// )
// UPDATE remote_servers
// SET is_dead = true
// WHERE id IN (SELECT target_server_id FROM servers_to_kill)
KillServers(maxAttempts uint32) error
}

View file

@ -12,6 +12,7 @@ type ServerMetadata struct {
UpdatedAt time.Time
// ---- Section TLS LetsEncrypt
LEDomain string
LECertUrl string
LECertStableUrl string
@ -21,4 +22,7 @@ type ServerMetadata struct {
LECSR []byte // Encrypted
LELastUpdate sql.NullTime
LEUserPrivKey []byte // Encrypted
// ---- Section Database
LastMigrationVersion uint64
}

View file

@ -0,0 +1,123 @@
package storage
import (
"fmt"
"strings"
"git.mstar.dev/mstar/goutils/other"
"git.mstar.dev/mstar/goutils/sliceutils"
"gorm.io/gorm"
"git.mstar.dev/mstar/linstrom/storage-new/models"
)
// Returns the raw error created by gorm, with no wrapping
func migrateTypes(db *gorm.DB) error {
if err := db.AutoMigrate(models.AllTypes...); err != nil {
return err
}
return nil
}
// Ensure the being enum exists for the user
func createBeingType(db *gorm.DB) error {
return migrateEnum(
db,
"being_type",
sliceutils.Map(models.AllBeings, func(t models.BeingType) string { return string(t) }),
)
}
func createAccountRelationType(db *gorm.DB) error {
return migrateEnum(
db,
"relation_type",
sliceutils.Map(
models.AllRelations,
func(t models.RelationType) string { return string(t) },
),
)
}
func createAccountAuthMethodType(db *gorm.DB) error {
return migrateEnum(
db,
"auth_method_type",
sliceutils.Map(
models.AllAuthMethods,
func(t models.AuthenticationMethodType) string { return string(t) },
),
)
}
func createRemoteServerSoftwareType(db *gorm.DB) error {
return migrateEnum(
db,
"server_software_type",
sliceutils.Map(
models.AllServerSoftwareTypes,
func(t models.ServerSoftwareType) string { return string(t) },
),
)
}
func createActitiystreamsObjectType(db *gorm.DB) error {
return migrateEnum(
db,
"activitystreams_object_type",
sliceutils.Map(
models.AllActivitystreamsObjectTypes,
func(t models.ActivitystreamsObjectType) string { return string(t) },
),
)
}
func createActitiystreamsActivityType(db *gorm.DB) error {
return migrateEnum(
db,
"activitystreams_activity_type",
sliceutils.Map(
models.AllActivitystreamsActivityTypes,
func(t models.ActivitystreamsActivityType) string { return string(t) },
),
)
}
func createCollectionTarget(db *gorm.DB) error {
return migrateEnum(
db,
"collection_target_type",
sliceutils.Map(
models.AllCollectionTargetTypes,
func(t models.CollectionTargetType) string { return string(t) },
),
)
}
// Helper function for ensuring the existence of an enum with the given values
func migrateEnum(db *gorm.DB, name string, values []string) error {
if err := db.Exec("DROP TYPE IF EXISTS " + name + " CASCADE;").Error; err != nil {
return other.Error(
"storage",
fmt.Sprintf("Failed to remove old type %s (if it exists)", name),
err,
)
}
queryBuilder := strings.Builder{}
queryBuilder.WriteString("CREATE TYPE ")
queryBuilder.WriteString(name)
queryBuilder.WriteString(" AS ENUM (")
blen := len(values)
for i, btype := range values {
queryBuilder.WriteString("'" + string(btype) + "'")
// Append comma everywhere except last entry
if i+1 < blen {
queryBuilder.WriteString(",")
}
}
queryBuilder.WriteString(");")
if err := db.Exec(queryBuilder.String()).Error; err != nil {
return err
}
return nil
}

View file

@ -6,6 +6,7 @@ import (
"errors"
"io"
"net/http"
"slices"
webutils "git.mstar.dev/mstar/goutils/http"
"git.mstar.dev/mstar/goutils/sliceutils"
@ -27,6 +28,7 @@ func postAs(w http.ResponseWriter, r *http.Request) {
Content string `json:"content"`
ContentWarning *string `json:"content_warning"`
ReplyTo *string `json:"reply_to"`
Mentions []string `json:"mentions"`
}
log := hlog.FromRequest(r)
dec := json.NewDecoder(r.Body)
@ -86,7 +88,9 @@ func postAs(w http.ResponseWriter, r *http.Request) {
return
}
}
mentions := webshared.MentionsFromContent(data.Content)
mentions := sliceutils.RemoveDuplicate(
slices.Concat(webshared.MentionsFromContent(data.Content), data.Mentions),
)
dbPings := []*models.NoteToPing{}
for _, mention := range mentions {
accId, err := activitypub.ImportRemoteAccountByHandle(mention)