Store failed requests in db for later retries

This commit is contained in:
Melody Becker 2025-06-13 13:43:27 +02:00
parent d86ad370df
commit 1c216e415d
Signed by: mstar
SSH key fingerprint: SHA256:9VAo09aaVNTWKzPW7Hq2LW+ox9OdwmTSHRoD4mlz1yI
8 changed files with 1292 additions and 250 deletions

View file

@ -2,12 +2,17 @@ package webshared
import (
"bytes"
"cmp"
"crypto/sha256"
"crypto/x509"
"encoding/base64"
"fmt"
"io"
"net/http"
"net/url"
"slices"
"strconv"
"strings"
"time"
"github.com/rs/zerolog/log"
@ -15,6 +20,7 @@ import (
"git.mstar.dev/mstar/linstrom/config"
"git.mstar.dev/mstar/linstrom/shared"
"git.mstar.dev/mstar/linstrom/storage-new/dbgen"
"git.mstar.dev/mstar/linstrom/storage-new/models"
)
@ -41,13 +47,22 @@ func RequestSigned(
body []byte,
actor *models.User,
) (response *http.Response, wasRfc9421 bool, err error) {
res, err := RequestSignedRFC9421(method, target, body, actor)
if err == nil && res.StatusCode >= 200 && res.StatusCode < 400 {
return res, true, nil
if method == "POST" {
storedRequestId, err := prePostRequest(target, body, actor)
if err != nil {
return nil, false, err
}
defer postPostRequest(response, err, *storedRequestId)
}
wasRfc9421 = true
response, err = RequestSignedRFC9421(method, target, body, actor)
if err == nil && response.StatusCode < 400 {
return
}
wasRfc9421 = false
log.Debug().Str("target", target).Msg("RFC9421 signed request failed, trying cavage signed")
res, err = RequestSignedCavage(method, target, body, actor)
return res, false, err
response, err = RequestSignedCavage(method, target, body, actor)
return
}
// Perform a request, signing it as specified in RFC 9421
@ -155,3 +170,130 @@ func applyBodyHash(headers http.Header, body []byte) error {
headers.Set("Digest", header)
return nil
}
// Runs before a signed outbound request.
// If the request is POST, stores it in the db as not processed yet.
// This is to ensure data consistency
func prePostRequest(
method, target string,
body []byte,
actor *models.User,
) (*uint64, error) {
targetUrl, err := url.Parse(target)
if err != nil {
return nil, err
}
server, err := dbgen.RemoteServer.Where(dbgen.RemoteServer.Domain.Eq(targetUrl.Hostname())).
First()
if err != nil {
return nil, err
}
fr := dbgen.FailedOutboundRequest
now := time.Now()
data := models.FailedOutboundRequest{
TargetServer: server,
TargetServerId: server.ID,
ActingUserId: actor.ID,
ActingUser: actor,
NrOfAttempts: 1,
RawData: body,
FirstAttempt: now,
LastAttempt: now,
LastFailureReason: string(models.RequestFailureNotAttemptedYet),
}
err = fr.Create(&data)
if err != nil {
return nil, err
}
return &data.Id, nil
}
// Updates the db request based on the results of the request
func postPostRequest(resp *http.Response, reqErr error, dbId uint64) {
fr := dbgen.FailedOutboundRequest
failureReason := "generic"
update := true // Flag to tell defer func to not update since request info has been deleted
defer func() {
if !update {
return
}
_, err := fr.Where(fr.Id.Eq(dbId)).UpdateColumn(fr.LastFailureReason, failureReason)
if err != nil {
log.Error().
Err(err).
Str("reason", failureReason).
Uint64("request-id", dbId).
Msg("Failed to update failure reason")
}
}()
if reqErr != nil {
failureReason = string(models.RequestFailureRequestError)
return
}
// Only check response data after handling response error
// Response could be nil otherwise, causing a panic (or an extra, useless check)
// If response status is ok (< 400) delete entry in db to not process it again
if resp.StatusCode < 400 {
update = false
fr.Where(fr.Id.Eq(dbId)).Delete()
return
}
if resp.StatusCode == 429 {
// Always prefer the rate limit headers as defined by https://www.ietf.org/archive/id/draft-polli-ratelimit-headers-02.html
limit := cmp.Or(
resp.Header.Get("RateLimit-Limit"),
resp.Header.Get("X-RateLimit-Limit"),
resp.Header.Get("X-Rate-Limit-Limit"),
)
remaining := cmp.Or(
resp.Header.Get("RateLimit-Remaining"),
resp.Header.Get("X-RateLimit-Remaining"),
resp.Header.Get("X-Rate-Limit-Remaining"),
)
reset := cmp.Or(
resp.Header.Get("RateLimit-Reset"),
resp.Header.Get("X-RateLimit-Reset"),
resp.Header.Get("X-Rate-Limit-Reset"),
)
if cmp.Or(limit, remaining, reset) == "" {
failureReason = string(models.RequestFailureRateLimitedNoInfo)
return
} else {
limit = strings.Split(limit, ",")[0]
limit = strings.Split(limit, ";")[0]
if limit == "" {
limit = "-1"
}
limitNum, err := strconv.Atoi(limit)
if err != nil {
failureReason = string(models.RequestFailureRateLimitedNoInfo)
return
}
if remaining == "" {
remaining = "-1"
}
remainingNum, err := strconv.Atoi(remaining)
if err != nil {
failureReason = string(models.RequestFailureRateLimitedNoInfo)
return
}
if reset == "" {
reset = "-1"
}
resetNum, err := strconv.Atoi(reset)
if err != nil {
failureReason = string(models.RequestFailureRateLimitedNoInfo)
return
}
failureReason = fmt.Sprintf(string(models.RequestFailureRateLimitTemplate), limitNum, remainingNum, resetNum)
}
return
} else {
if resp.StatusCode >= 500 {
failureReason = string(models.RequestFailureInternalError)
} else {
failureReason = string(models.RequestFailureRejected)
}
}
}