From d1d4795f061093853ce2c798181f7ddd3704d381 Mon Sep 17 00:00:00 2001 From: lostlevels Date: Fri, 1 Nov 2024 13:35:15 -0700 Subject: [PATCH 1/3] [BACK-3245] Respond with appropriate error code when unable to retrieve contents of a device log. --- blob/service/api/v1/v1.go | 4 ++-- blob/service/api/v1/v1_test.go | 27 +++++++++++++++++++++++++++ request/responder.go | 4 ++++ 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/blob/service/api/v1/v1.go b/blob/service/api/v1/v1.go index 0dcad8fac6..a894116f67 100644 --- a/blob/service/api/v1/v1.go +++ b/blob/service/api/v1/v1.go @@ -281,11 +281,11 @@ func (r *Router) GetDeviceLogsContent(res rest.ResponseWriter, req *rest.Request content, err := blobClient.GetDeviceLogsContent(req.Context(), *deviceLogMetadata.ID) if err != nil { - responder.Error(http.StatusInternalServerError, err) + responder.CodedError(err) return } if content == nil || content.Body == nil { - responder.Error(http.StatusNotFound, request.ErrorResourceNotFoundWithID(deviceLogID)) + responder.CodedError(request.ErrorResourceNotFoundWithID(deviceLogID)) return } defer content.Body.Close() diff --git a/blob/service/api/v1/v1_test.go b/blob/service/api/v1/v1_test.go index 2124d7d0c6..b296235fe8 100644 --- a/blob/service/api/v1/v1_test.go +++ b/blob/service/api/v1/v1_test.go @@ -521,6 +521,20 @@ var _ = Describe("V1", func() { errorsTest.ExpectErrorJSON(request.ErrorResourceNotFoundWithID(id), res.WriteInputs[0]) }) + It("responds with not found error when the client returns a blob but the blob has no content", func() { + // This has happened when blob creation succeeded but the request context canceled before the contents finished uploading to S3 so there is a "stray" blob w/o content + deviceLogsBlob := blobTest.RandomDeviceLogsBlob() + client.GetDeviceLogsBlobOutputs = []blobTest.GetDeviceLogsBlobOutput{{Blob: deviceLogsBlob}} + client.GetDeviceLogsContentOutputs = []blobTest.GetDeviceLogsContentOutput{{Error: request.ErrorResourceNotFoundWithID(*deviceLogsBlob.ID)}} + res.WriteOutputs = []testRest.WriteOutput{{BytesWritten: 0, Error: nil}} + + handlerFunc(res, req) + Expect(res.WriteHeaderInputs).To(Equal([]int{http.StatusNotFound})) + Expect(res.HeaderOutput).To(Equal(&http.Header{"Content-Type": []string{"application/json; charset=utf-8"}})) + Expect(res.WriteInputs).To(HaveLen(1)) + errorsTest.ExpectErrorJSON(request.ErrorResourceNotFoundWithID(*deviceLogsBlob.ID), res.WriteInputs[0]) + }) + It("responds successfully with headers", func() { deviceLogsBlob := blobTest.RandomDeviceLogsBlob() content := blob.NewDeviceLogsContent() @@ -607,6 +621,19 @@ var _ = Describe("V1", func() { errorsTest.ExpectErrorJSON(request.ErrorResourceNotFoundWithID(id), res.WriteInputs[0]) }) + It("responds with not found error when the client returns a blob but the blob has no content", func() { + deviceLogsBlob := blobTest.RandomDeviceLogsBlob() + deviceLogsBlob.UserID = pointer.FromString(userID) + client.GetDeviceLogsBlobOutputs = []blobTest.GetDeviceLogsBlobOutput{{Blob: deviceLogsBlob}} + client.GetDeviceLogsContentOutputs = []blobTest.GetDeviceLogsContentOutput{{Error: request.ErrorResourceNotFoundWithID(*deviceLogsBlob.ID)}} + res.WriteOutputs = []testRest.WriteOutput{{BytesWritten: 0, Error: nil}} + + handlerFunc(res, req) + Expect(res.WriteHeaderInputs).To(Equal([]int{http.StatusNotFound})) + Expect(res.HeaderOutput).To(Equal(&http.Header{"Content-Type": []string{"application/json; charset=utf-8"}})) + Expect(res.WriteInputs).To(HaveLen(1)) + errorsTest.ExpectErrorJSON(request.ErrorResourceNotFoundWithID(*deviceLogsBlob.ID), res.WriteInputs[0]) + }) It("responds successfully with headers for user's own logs content", func() { deviceLogsBlob := blobTest.RandomDeviceLogsBlob() deviceLogsBlob.UserID = pointer.FromString(userID) diff --git a/request/responder.go b/request/responder.go index 708673437a..db5089e936 100644 --- a/request/responder.go +++ b/request/responder.go @@ -119,6 +119,10 @@ func (r *Responder) Error(statusCode int, err error, mutators ...ResponseMutator } } +func (r *Responder) CodedError(err error, mutators ...ResponseMutator) { + r.Error(StatusCodeForError(err), err, mutators...) +} + func (r *Responder) InternalServerError(err error, mutators ...ResponseMutator) { if err == nil { err = ErrorInternalServerError(errors.New("error is missing")) From e809c542e5b1dd78cdc016a2fae9356cf7cb8179 Mon Sep 17 00:00:00 2001 From: lostlevels Date: Wed, 6 Nov 2024 17:53:05 -0800 Subject: [PATCH 2/3] Cleanup even if context is canceled. --- blob/service/client/client.go | 104 ++++++++++++++++++++++++---------- 1 file changed, 74 insertions(+), 30 deletions(-) diff --git a/blob/service/client/client.go b/blob/service/client/client.go index a5b6bf516f..610290a8ef 100644 --- a/blob/service/client/client.go +++ b/blob/service/client/client.go @@ -4,7 +4,9 @@ import ( "context" "crypto/md5" "encoding/base64" + stdErrs "errors" "io" + "time" "github.com/tidepool-org/platform/blob" blobStoreStructured "github.com/tidepool-org/platform/blob/store/structured" @@ -19,6 +21,11 @@ import ( structureValidator "github.com/tidepool-org/platform/structure/validator" ) +const ( + // arbritrary timeout value for cleanup operations such as deleting from S3 or Repo + defaultCleanupTimeout = time.Second * 5 +) + type Provider interface { BlobStructuredStore() blobStoreStructured.Store BlobUnstructuredStore() blobStoreUnstructured.Store @@ -70,31 +77,40 @@ func (c *Client) Create(ctx context.Context, userID string, content *blob.Conten options.MediaType = content.MediaType err = c.BlobUnstructuredStore().Put(ctx, userID, *result.ID, io.TeeReader(io.TeeReader(io.LimitReader(content.Body, blob.SizeMaximum+1), hasher), sizer), options) if err != nil { - if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { - logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content") - } + doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { + logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content") + } + return nil + }) return nil, err } size := sizer.Size if size > blob.SizeMaximum { - if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { - logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size") - } - if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { - logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size") - } + doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { + logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size") + } + if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { + logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size") + } + return nil + }) return nil, request.ErrorResourceTooLarge() } digestMD5 := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) if content.DigestMD5 != nil && *content.DigestMD5 != digestMD5 { - if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { - logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest") - } - if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { - logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest") - } + doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { + logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest") + } + if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { + logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest") + } + return nil + }) return nil, errors.WithSource(request.ErrorDigestsNotEqual(*content.DigestMD5, digestMD5), structure.NewPointerSource().WithReference("digestMD5")) } @@ -129,31 +145,41 @@ func (c *Client) CreateDeviceLogs(ctx context.Context, userID string, content *b options.MediaType = content.MediaType err = c.DeviceLogsUnstructuredStore().Put(ctx, userID, *result.ID, io.TeeReader(io.TeeReader(io.LimitReader(content.Body, blob.SizeMaximum+1), hasher), sizer), options) if err != nil { - if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { - logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content") - } + doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { + logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content") + } + return nil + }) return nil, err } size := sizer.Size if size > blob.SizeMaximum { - if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { - logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size") - } - if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { - logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size") - } + doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { + logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size") + } + if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { + logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size") + } + return nil + }) return nil, request.ErrorResourceTooLarge() } digestMD5 := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) if content.DigestMD5 != nil && *content.DigestMD5 != digestMD5 { - if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { - logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest") - } - if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { - logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest") - } + doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { + logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest") + } + if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { + logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest") + } + return nil + }) + return nil, errors.WithSource(request.ErrorDigestsNotEqual(*content.DigestMD5, digestMD5), structure.NewPointerSource().WithReference("digestMD5")) } @@ -287,3 +313,21 @@ func (s *SizeWriter) Write(bites []byte) (int, error) { s.Size += length return length, nil } + +// doDoneCtx performs an action given a context even if the context is +// canceled or timed out. This is used if we have any cleanup functions that we +// still want to perform and passing the parent context would time out any +// child contexts. +func doDoneCtx(ctx context.Context, timeout time.Duration, fn func(ctx context.Context) error) error { + newContext := ctx + var cancel context.CancelFunc + select { + case <-ctx.Done(): + if stdErrs.Is(ctx.Err(), context.Canceled) || stdErrs.Is(ctx.Err(), context.DeadlineExceeded) { + newContext, cancel = context.WithTimeout(context.WithoutCancel(ctx), timeout) + defer cancel() + } + default: + } + return fn(newContext) +} From 6ae0d1b6416411eb47bf417a38e5658ef3f2048b Mon Sep 17 00:00:00 2001 From: lostlevels Date: Fri, 7 Feb 2025 12:30:55 -0800 Subject: [PATCH 3/3] Updates from PR comments. --- blob/service/api/v1/v1.go | 5 ++--- blob/service/client/client.go | 29 ++++++++++------------------- request/responder.go | 4 ---- 3 files changed, 12 insertions(+), 26 deletions(-) diff --git a/blob/service/api/v1/v1.go b/blob/service/api/v1/v1.go index a894116f67..aa647aba05 100644 --- a/blob/service/api/v1/v1.go +++ b/blob/service/api/v1/v1.go @@ -280,12 +280,11 @@ func (r *Router) GetDeviceLogsContent(res rest.ResponseWriter, req *rest.Request } content, err := blobClient.GetDeviceLogsContent(req.Context(), *deviceLogMetadata.ID) - if err != nil { - responder.CodedError(err) + if responder.RespondIfError(err) { return } if content == nil || content.Body == nil { - responder.CodedError(request.ErrorResourceNotFoundWithID(deviceLogID)) + responder.RespondIfError(request.ErrorResourceNotFoundWithID(deviceLogID)) return } defer content.Body.Close() diff --git a/blob/service/client/client.go b/blob/service/client/client.go index 610290a8ef..4aeac6bd2b 100644 --- a/blob/service/client/client.go +++ b/blob/service/client/client.go @@ -4,7 +4,6 @@ import ( "context" "crypto/md5" "encoding/base64" - stdErrs "errors" "io" "time" @@ -77,7 +76,7 @@ func (c *Client) Create(ctx context.Context, userID string, content *blob.Conten options.MediaType = content.MediaType err = c.BlobUnstructuredStore().Put(ctx, userID, *result.ID, io.TeeReader(io.TeeReader(io.LimitReader(content.Body, blob.SizeMaximum+1), hasher), sizer), options) if err != nil { - doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + alwaysDo(ctx, defaultCleanupTimeout, func(ctx context.Context) error { if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content") } @@ -88,7 +87,7 @@ func (c *Client) Create(ctx context.Context, userID string, content *blob.Conten size := sizer.Size if size > blob.SizeMaximum { - doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + alwaysDo(ctx, defaultCleanupTimeout, func(ctx context.Context) error { if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size") } @@ -102,7 +101,7 @@ func (c *Client) Create(ctx context.Context, userID string, content *blob.Conten digestMD5 := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) if content.DigestMD5 != nil && *content.DigestMD5 != digestMD5 { - doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + alwaysDo(ctx, defaultCleanupTimeout, func(ctx context.Context) error { if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest") } @@ -145,7 +144,7 @@ func (c *Client) CreateDeviceLogs(ctx context.Context, userID string, content *b options.MediaType = content.MediaType err = c.DeviceLogsUnstructuredStore().Put(ctx, userID, *result.ID, io.TeeReader(io.TeeReader(io.LimitReader(content.Body, blob.SizeMaximum+1), hasher), sizer), options) if err != nil { - doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + alwaysDo(ctx, defaultCleanupTimeout, func(ctx context.Context) error { if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content") } @@ -156,7 +155,7 @@ func (c *Client) CreateDeviceLogs(ctx context.Context, userID string, content *b size := sizer.Size if size > blob.SizeMaximum { - doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + alwaysDo(ctx, defaultCleanupTimeout, func(ctx context.Context) error { if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size") } @@ -170,7 +169,7 @@ func (c *Client) CreateDeviceLogs(ctx context.Context, userID string, content *b digestMD5 := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) if content.DigestMD5 != nil && *content.DigestMD5 != digestMD5 { - doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + alwaysDo(ctx, defaultCleanupTimeout, func(ctx context.Context) error { if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest") } @@ -314,20 +313,12 @@ func (s *SizeWriter) Write(bites []byte) (int, error) { return length, nil } -// doDoneCtx performs an action given a context even if the context is +// alwaysDo performs an action given a context even if the context is // canceled or timed out. This is used if we have any cleanup functions that we // still want to perform and passing the parent context would time out any // child contexts. -func doDoneCtx(ctx context.Context, timeout time.Duration, fn func(ctx context.Context) error) error { - newContext := ctx - var cancel context.CancelFunc - select { - case <-ctx.Done(): - if stdErrs.Is(ctx.Err(), context.Canceled) || stdErrs.Is(ctx.Err(), context.DeadlineExceeded) { - newContext, cancel = context.WithTimeout(context.WithoutCancel(ctx), timeout) - defer cancel() - } - default: - } +func alwaysDo(ctx context.Context, timeout time.Duration, fn func(ctx context.Context) error) error { + newContext, cancel := context.WithTimeout(context.WithoutCancel(ctx), timeout) + defer cancel() return fn(newContext) } diff --git a/request/responder.go b/request/responder.go index db5089e936..708673437a 100644 --- a/request/responder.go +++ b/request/responder.go @@ -119,10 +119,6 @@ func (r *Responder) Error(statusCode int, err error, mutators ...ResponseMutator } } -func (r *Responder) CodedError(err error, mutators ...ResponseMutator) { - r.Error(StatusCodeForError(err), err, mutators...) -} - func (r *Responder) InternalServerError(err error, mutators ...ResponseMutator) { if err == nil { err = ErrorInternalServerError(errors.New("error is missing"))