mirror of
https://github.com/psviderski/unregistry.git
synced 2025-12-14 20:35:57 +01:00
192 lines
5.5 KiB
Go
192 lines
5.5 KiB
Go
package containerd
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/sirupsen/logrus"
|
|
|
|
"github.com/containerd/containerd/v2/client"
|
|
"github.com/containerd/containerd/v2/core/content"
|
|
"github.com/containerd/containerd/v2/core/leases"
|
|
"github.com/containerd/errdefs"
|
|
"github.com/distribution/distribution/v3"
|
|
"github.com/distribution/reference"
|
|
)
|
|
|
|
const leaseExpiration = 1 * time.Hour
|
|
|
|
// blobWriter is a resumable blob uploader to the containerd content store.
|
|
// Implements distribution.BlobWriter.
|
|
type blobWriter struct {
|
|
client *client.Client
|
|
repo reference.Named
|
|
id string
|
|
|
|
// lease is a containerd lease for writer that prevents garbage collection of the content. It's intentionally not
|
|
// deleted on successful blob commit to keep it while the registry is uploading other blobs and manifests and
|
|
// creating an image referencing them. Otherwise, the blob would be garbage collected immediately after lease is
|
|
// deleted if the blob is not referenced by an image.
|
|
// In the worst case, the lease and unreferenced blob will be garbage collected after leaseExpiration.
|
|
lease leases.Lease
|
|
writer content.Writer
|
|
// size is the total number of bytes written to writer.
|
|
size int64
|
|
log *logrus.Entry
|
|
}
|
|
|
|
func newBlobWriter(
|
|
ctx context.Context, client *client.Client, repo reference.Named, id string,
|
|
) (distribution.BlobWriter, error) {
|
|
if id == "" {
|
|
id = uuid.NewString()
|
|
}
|
|
|
|
// Create a containerd lease to prevent garbage collection.
|
|
opts := []leases.Opt{
|
|
leases.WithRandomID(),
|
|
leases.WithExpiration(leaseExpiration),
|
|
}
|
|
lease, err := client.LeasesService().Create(ctx, opts...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create containerd lease: %w", err)
|
|
}
|
|
|
|
// Open a containerd content writer with the lease.
|
|
ctx = leases.WithLease(ctx, lease.ID)
|
|
writer, err := client.ContentStore().Writer(ctx, content.WithRef("upload-"+id))
|
|
if err != nil {
|
|
_ = client.LeasesService().Delete(ctx, lease)
|
|
return nil, fmt.Errorf("create containerd content writer: %w", err)
|
|
}
|
|
|
|
// Get the status of the writer to get the written offset (size) if the writer was resumed.
|
|
status, err := writer.Status()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get containerd content writer status: %w", err)
|
|
}
|
|
|
|
log := logrus.WithFields(
|
|
logrus.Fields{
|
|
"writer.id": id,
|
|
"repo": repo.Name(),
|
|
},
|
|
)
|
|
log.WithField("size", status.Offset).Debug("Created new containerd blob writer.")
|
|
|
|
return &blobWriter{
|
|
client: client,
|
|
repo: repo,
|
|
id: id,
|
|
lease: lease,
|
|
writer: writer,
|
|
size: status.Offset,
|
|
log: log,
|
|
}, nil
|
|
}
|
|
|
|
// ID returns the identifier for this blob upload.
|
|
func (bw *blobWriter) ID() string {
|
|
return bw.id
|
|
}
|
|
|
|
// StartedAt returns the time the upload started.
|
|
func (bw *blobWriter) StartedAt() time.Time {
|
|
return bw.lease.CreatedAt
|
|
}
|
|
|
|
// Size returns the number of bytes written to the containerd blob writer.
|
|
func (bw *blobWriter) Size() int64 {
|
|
return bw.size
|
|
}
|
|
|
|
// ReadFrom reads from the provided reader and writes to the containerd blob writer.
|
|
func (bw *blobWriter) ReadFrom(r io.Reader) (int64, error) {
|
|
n, err := io.Copy(bw.writer, r)
|
|
bw.size += n
|
|
|
|
log := bw.log.WithField("size", n)
|
|
if err != nil {
|
|
err = fmt.Errorf("copy data to containerd blob writer: %w", err)
|
|
log = log.WithError(err)
|
|
}
|
|
log.Debug("Copied data to containerd blob writer.")
|
|
|
|
return n, err
|
|
}
|
|
|
|
// Write writes data to the containerd blob writer.
|
|
func (bw *blobWriter) Write(data []byte) (int, error) {
|
|
n, err := bw.writer.Write(data)
|
|
bw.size += int64(n)
|
|
|
|
log := bw.log.WithField("size", n)
|
|
if err != nil {
|
|
err = fmt.Errorf("write data to containerd blob writer: %w", err)
|
|
log = log.WithError(err)
|
|
}
|
|
log.Debug("Wrote data to containerd blob writer.")
|
|
|
|
return n, err
|
|
}
|
|
|
|
// Commit finalizes the blob upload.
|
|
func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
|
|
log := bw.log.WithFields(
|
|
logrus.Fields{
|
|
"digest": desc.Digest,
|
|
"mediatype": desc.MediaType,
|
|
"size": bw.size,
|
|
},
|
|
)
|
|
|
|
log.Debug("Committing blob to containerd content store.")
|
|
// The caller may not provide a size in the descriptor if it doesn't know it so we use the calculated size from
|
|
// the writer.
|
|
if err := bw.writer.Commit(ctx, bw.size, desc.Digest); err != nil {
|
|
// The writer didn't create a new blob so we don't need to keep the lease.
|
|
_ = bw.client.LeasesService().Delete(ctx, bw.lease)
|
|
|
|
if errdefs.IsAlreadyExists(err) {
|
|
log.Debug("Blob already exists in containerd content store.")
|
|
} else {
|
|
return distribution.Descriptor{}, fmt.Errorf("commit blob to containerd content store: %w", err)
|
|
}
|
|
} else {
|
|
log.Debug("Successfully committed blob to containerd content store.")
|
|
}
|
|
|
|
if desc.Size == 0 {
|
|
desc.Size = bw.size
|
|
}
|
|
if desc.MediaType == "" {
|
|
// Not sure if this is needed but the default registry blob writer assigns this.
|
|
desc.MediaType = "application/octet-stream"
|
|
}
|
|
|
|
return desc, nil
|
|
}
|
|
|
|
// Cancel cancels the blob upload by deleting the containerd lease.
|
|
func (bw *blobWriter) Cancel(ctx context.Context) error {
|
|
bw.log.Debug("Canceling upload: deleting containerd lease.")
|
|
return bw.client.LeasesService().Delete(ctx, bw.lease)
|
|
}
|
|
|
|
// Close closes the containerd blob writer.
|
|
func (bw *blobWriter) Close() error {
|
|
bw.log.Debug("Closing containerd blob writer.")
|
|
err := bw.writer.Close()
|
|
|
|
if bw.size == 0 {
|
|
// It's safe to delete the lease if no data was written to the writer. Deletion is idempotent.
|
|
err = errors.Join(bw.client.LeasesService().Delete(context.Background(), bw.lease))
|
|
}
|
|
|
|
return err
|
|
}
|