diff --git a/blob/service/api/v1/v1.go b/blob/service/api/v1/v1.go index 0dcad8fac..aa647aba0 100644 --- a/blob/service/api/v1/v1.go +++ b/blob/service/api/v1/v1.go @@ -280,12 +280,11 @@ func (r *Router) GetDeviceLogsContent(res rest.ResponseWriter, req *rest.Request } content, err := blobClient.GetDeviceLogsContent(req.Context(), *deviceLogMetadata.ID) - if err != nil { - responder.Error(http.StatusInternalServerError, err) + if responder.RespondIfError(err) { return } if content == nil || content.Body == nil { - responder.Error(http.StatusNotFound, request.ErrorResourceNotFoundWithID(deviceLogID)) + responder.RespondIfError(request.ErrorResourceNotFoundWithID(deviceLogID)) return } defer content.Body.Close() diff --git a/blob/service/api/v1/v1_test.go b/blob/service/api/v1/v1_test.go index 2124d7d0c..b296235fe 100644 --- a/blob/service/api/v1/v1_test.go +++ b/blob/service/api/v1/v1_test.go @@ -521,6 +521,20 @@ var _ = Describe("V1", func() { errorsTest.ExpectErrorJSON(request.ErrorResourceNotFoundWithID(id), res.WriteInputs[0]) }) + It("responds with not found error when the client returns a blob but the blob has no content", func() { + // This has happened when blob creation succeeded but the request context canceled before the contents finished uploading to S3 so there is a "stray" blob w/o content + deviceLogsBlob := blobTest.RandomDeviceLogsBlob() + client.GetDeviceLogsBlobOutputs = []blobTest.GetDeviceLogsBlobOutput{{Blob: deviceLogsBlob}} + client.GetDeviceLogsContentOutputs = []blobTest.GetDeviceLogsContentOutput{{Error: request.ErrorResourceNotFoundWithID(*deviceLogsBlob.ID)}} + res.WriteOutputs = []testRest.WriteOutput{{BytesWritten: 0, Error: nil}} + + handlerFunc(res, req) + Expect(res.WriteHeaderInputs).To(Equal([]int{http.StatusNotFound})) + Expect(res.HeaderOutput).To(Equal(&http.Header{"Content-Type": []string{"application/json; charset=utf-8"}})) + Expect(res.WriteInputs).To(HaveLen(1)) + errorsTest.ExpectErrorJSON(request.ErrorResourceNotFoundWithID(*deviceLogsBlob.ID), res.WriteInputs[0]) + }) + It("responds successfully with headers", func() { deviceLogsBlob := blobTest.RandomDeviceLogsBlob() content := blob.NewDeviceLogsContent() @@ -607,6 +621,19 @@ var _ = Describe("V1", func() { errorsTest.ExpectErrorJSON(request.ErrorResourceNotFoundWithID(id), res.WriteInputs[0]) }) + It("responds with not found error when the client returns a blob but the blob has no content", func() { + deviceLogsBlob := blobTest.RandomDeviceLogsBlob() + deviceLogsBlob.UserID = pointer.FromString(userID) + client.GetDeviceLogsBlobOutputs = []blobTest.GetDeviceLogsBlobOutput{{Blob: deviceLogsBlob}} + client.GetDeviceLogsContentOutputs = []blobTest.GetDeviceLogsContentOutput{{Error: request.ErrorResourceNotFoundWithID(*deviceLogsBlob.ID)}} + res.WriteOutputs = []testRest.WriteOutput{{BytesWritten: 0, Error: nil}} + + handlerFunc(res, req) + Expect(res.WriteHeaderInputs).To(Equal([]int{http.StatusNotFound})) + Expect(res.HeaderOutput).To(Equal(&http.Header{"Content-Type": []string{"application/json; charset=utf-8"}})) + Expect(res.WriteInputs).To(HaveLen(1)) + errorsTest.ExpectErrorJSON(request.ErrorResourceNotFoundWithID(*deviceLogsBlob.ID), res.WriteInputs[0]) + }) It("responds successfully with headers for user's own logs content", func() { deviceLogsBlob := blobTest.RandomDeviceLogsBlob() deviceLogsBlob.UserID = pointer.FromString(userID) diff --git a/blob/service/client/client.go b/blob/service/client/client.go index a5b6bf516..4aeac6bd2 100644 --- a/blob/service/client/client.go +++ b/blob/service/client/client.go @@ -5,6 +5,7 @@ import ( "crypto/md5" "encoding/base64" "io" + "time" "github.com/tidepool-org/platform/blob" blobStoreStructured "github.com/tidepool-org/platform/blob/store/structured" @@ -19,6 +20,11 @@ import ( structureValidator "github.com/tidepool-org/platform/structure/validator" ) +const ( + // arbritrary timeout value for cleanup operations such as deleting from S3 or Repo + defaultCleanupTimeout = time.Second * 5 +) + type Provider interface { BlobStructuredStore() blobStoreStructured.Store BlobUnstructuredStore() blobStoreUnstructured.Store @@ -70,31 +76,40 @@ func (c *Client) Create(ctx context.Context, userID string, content *blob.Conten options.MediaType = content.MediaType err = c.BlobUnstructuredStore().Put(ctx, userID, *result.ID, io.TeeReader(io.TeeReader(io.LimitReader(content.Body, blob.SizeMaximum+1), hasher), sizer), options) if err != nil { - if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { - logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content") - } + alwaysDo(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { + logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content") + } + return nil + }) return nil, err } size := sizer.Size if size > blob.SizeMaximum { - if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { - logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size") - } - if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { - logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size") - } + alwaysDo(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { + logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size") + } + if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { + logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size") + } + return nil + }) return nil, request.ErrorResourceTooLarge() } digestMD5 := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) if content.DigestMD5 != nil && *content.DigestMD5 != digestMD5 { - if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { - logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest") - } - if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { - logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest") - } + alwaysDo(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { + logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest") + } + if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { + logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest") + } + return nil + }) return nil, errors.WithSource(request.ErrorDigestsNotEqual(*content.DigestMD5, digestMD5), structure.NewPointerSource().WithReference("digestMD5")) } @@ -129,31 +144,41 @@ func (c *Client) CreateDeviceLogs(ctx context.Context, userID string, content *b options.MediaType = content.MediaType err = c.DeviceLogsUnstructuredStore().Put(ctx, userID, *result.ID, io.TeeReader(io.TeeReader(io.LimitReader(content.Body, blob.SizeMaximum+1), hasher), sizer), options) if err != nil { - if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { - logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content") - } + alwaysDo(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { + logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content") + } + return nil + }) return nil, err } size := sizer.Size if size > blob.SizeMaximum { - if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { - logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size") - } - if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { - logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size") - } + alwaysDo(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { + logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size") + } + if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { + logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size") + } + return nil + }) return nil, request.ErrorResourceTooLarge() } digestMD5 := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) if content.DigestMD5 != nil && *content.DigestMD5 != digestMD5 { - if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { - logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest") - } - if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { - logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest") - } + alwaysDo(ctx, defaultCleanupTimeout, func(ctx context.Context) error { + if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil { + logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest") + } + if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil { + logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest") + } + return nil + }) + return nil, errors.WithSource(request.ErrorDigestsNotEqual(*content.DigestMD5, digestMD5), structure.NewPointerSource().WithReference("digestMD5")) } @@ -287,3 +312,13 @@ func (s *SizeWriter) Write(bites []byte) (int, error) { s.Size += length return length, nil } + +// alwaysDo performs an action given a context even if the context is +// canceled or timed out. This is used if we have any cleanup functions that we +// still want to perform and passing the parent context would time out any +// child contexts. +func alwaysDo(ctx context.Context, timeout time.Duration, fn func(ctx context.Context) error) error { + newContext, cancel := context.WithTimeout(context.WithoutCancel(ctx), timeout) + defer cancel() + return fn(newContext) +}