Files

192 lines
5.5 KiB
Go

package containerd
import (
"context"
"errors"
"fmt"
"io"
"time"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/leases"
"github.com/containerd/errdefs"
"github.com/distribution/distribution/v3"
"github.com/distribution/reference"
)
const leaseExpiration = 1 * time.Hour
// blobWriter is a resumable blob uploader to the containerd content store.
// Implements distribution.BlobWriter.
type blobWriter struct {
client *client.Client
repo reference.Named
id string
// lease is a containerd lease for writer that prevents garbage collection of the content. It's intentionally not
// deleted on successful blob commit to keep it while the registry is uploading other blobs and manifests and
// creating an image referencing them. Otherwise, the blob would be garbage collected immediately after lease is
// deleted if the blob is not referenced by an image.
// In the worst case, the lease and unreferenced blob will be garbage collected after leaseExpiration.
lease leases.Lease
writer content.Writer
// size is the total number of bytes written to writer.
size int64
log *logrus.Entry
}
func newBlobWriter(
ctx context.Context, client *client.Client, repo reference.Named, id string,
) (distribution.BlobWriter, error) {
if id == "" {
id = uuid.NewString()
}
// Create a containerd lease to prevent garbage collection.
opts := []leases.Opt{
leases.WithRandomID(),
leases.WithExpiration(leaseExpiration),
}
lease, err := client.LeasesService().Create(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("create containerd lease: %w", err)
}
// Open a containerd content writer with the lease.
ctx = leases.WithLease(ctx, lease.ID)
writer, err := client.ContentStore().Writer(ctx, content.WithRef("upload-"+id))
if err != nil {
_ = client.LeasesService().Delete(ctx, lease)
return nil, fmt.Errorf("create containerd content writer: %w", err)
}
// Get the status of the writer to get the written offset (size) if the writer was resumed.
status, err := writer.Status()
if err != nil {
return nil, fmt.Errorf("get containerd content writer status: %w", err)
}
log := logrus.WithFields(
logrus.Fields{
"writer.id": id,
"repo": repo.Name(),
},
)
log.WithField("size", status.Offset).Debug("Created new containerd blob writer.")
return &blobWriter{
client: client,
repo: repo,
id: id,
lease: lease,
writer: writer,
size: status.Offset,
log: log,
}, nil
}
// ID returns the identifier for this blob upload.
func (bw *blobWriter) ID() string {
return bw.id
}
// StartedAt returns the time the upload started.
func (bw *blobWriter) StartedAt() time.Time {
return bw.lease.CreatedAt
}
// Size returns the number of bytes written to the containerd blob writer.
func (bw *blobWriter) Size() int64 {
return bw.size
}
// ReadFrom reads from the provided reader and writes to the containerd blob writer.
func (bw *blobWriter) ReadFrom(r io.Reader) (int64, error) {
n, err := io.Copy(bw.writer, r)
bw.size += n
log := bw.log.WithField("size", n)
if err != nil {
err = fmt.Errorf("copy data to containerd blob writer: %w", err)
log = log.WithError(err)
}
log.Debug("Copied data to containerd blob writer.")
return n, err
}
// Write writes data to the containerd blob writer.
func (bw *blobWriter) Write(data []byte) (int, error) {
n, err := bw.writer.Write(data)
bw.size += int64(n)
log := bw.log.WithField("size", n)
if err != nil {
err = fmt.Errorf("write data to containerd blob writer: %w", err)
log = log.WithError(err)
}
log.Debug("Wrote data to containerd blob writer.")
return n, err
}
// Commit finalizes the blob upload.
func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
log := bw.log.WithFields(
logrus.Fields{
"digest": desc.Digest,
"mediatype": desc.MediaType,
"size": bw.size,
},
)
log.Debug("Committing blob to containerd content store.")
// The caller may not provide a size in the descriptor if it doesn't know it so we use the calculated size from
// the writer.
if err := bw.writer.Commit(ctx, bw.size, desc.Digest); err != nil {
// The writer didn't create a new blob so we don't need to keep the lease.
_ = bw.client.LeasesService().Delete(ctx, bw.lease)
if errdefs.IsAlreadyExists(err) {
log.Debug("Blob already exists in containerd content store.")
} else {
return distribution.Descriptor{}, fmt.Errorf("commit blob to containerd content store: %w", err)
}
} else {
log.Debug("Successfully committed blob to containerd content store.")
}
if desc.Size == 0 {
desc.Size = bw.size
}
if desc.MediaType == "" {
// Not sure if this is needed but the default registry blob writer assigns this.
desc.MediaType = "application/octet-stream"
}
return desc, nil
}
// Cancel cancels the blob upload by deleting the containerd lease.
func (bw *blobWriter) Cancel(ctx context.Context) error {
bw.log.Debug("Canceling upload: deleting containerd lease.")
return bw.client.LeasesService().Delete(ctx, bw.lease)
}
// Close closes the containerd blob writer.
func (bw *blobWriter) Close() error {
bw.log.Debug("Closing containerd blob writer.")
err := bw.writer.Close()
if bw.size == 0 {
// It's safe to delete the lease if no data was written to the writer. Deletion is idempotent.
err = errors.Join(bw.client.LeasesService().Delete(context.Background(), bw.lease))
}
return err
}