// Algorithm found at: https://rsync.samba.org/tech_report/tech_report.html
// Code in this file is inspired by: https://github.com/jbreiding/rsync-go
//
// Definitions
//
// Source: The final content.
// Target: The content to be made into the final content.
// Signature: The sequence of hashes used to identify the content.
//
// An illustrative end-to-end sketch of the three phases (signature of the
// target, delta from the source, apply to the target) is in the
// example_round_trip function below.
package rsync

import (
	"bytes"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"slices"
	"strconv"

	"github.com/zeebo/xxh3"
)

// If no BlockSize is specified in the rsync instance, this value is used.
const DefaultBlockSize = 1024 * 6

// Internal constant used in the rolling checksum.
const _M = 1 << 16

// Operation Types.
type OpType byte // enum

const (
	OpBlock OpType = iota
	OpData
	OpHash
	OpBlockRange
)

// xxh3_128 adapts xxh3.Hasher to the hash.Hash interface, serializing the
// 128 bit sum in big-endian order (high 64 bits first).
type xxh3_128 struct {
	xxh3.Hasher
}

func (self *xxh3_128) Sum(b []byte) []byte {
	s := self.Sum128()
	pos := len(b)
	limit := pos + 16
	if limit > cap(b) {
		var x [16]byte
		b = append(b, x[:]...)
	} else {
		b = b[:limit]
	}
	binary.BigEndian.PutUint64(b[pos:], s.Hi)
	binary.BigEndian.PutUint64(b[pos+8:], s.Lo)
	return b
}

func new_xxh3_64() hash.Hash64 {
	ans := xxh3.New()
	ans.Reset()
	return ans
}

func new_xxh3_128() hash.Hash {
	ans := new(xxh3_128)
	ans.Reset()
	return ans
}

// Instruction to mutate target to align to source.
type Operation struct {
	Type          OpType
	BlockIndex    uint64
	BlockIndexEnd uint64
	Data          []byte
}

func (self Operation) String() string {
	ans := "{" + self.Type.String() + " "
	switch self.Type {
	case OpBlock:
		ans += strconv.FormatUint(self.BlockIndex, 10)
	case OpBlockRange:
		ans += strconv.FormatUint(self.BlockIndex, 10) + " to " + strconv.FormatUint(self.BlockIndexEnd, 10)
	case OpData:
		ans += strconv.Itoa(len(self.Data))
	case OpHash:
		ans += hex.EncodeToString(self.Data)
	}
	return ans + "}"
}

var bin = binary.LittleEndian

// SerializeSize returns the number of bytes Serialize will write for this
// operation: a 1 byte opcode followed by a type dependent payload.
func (self Operation) SerializeSize() int {
	switch self.Type {
	case OpBlock:
		return 9 // opcode + 8 byte block index
	case OpBlockRange:
		return 13 // opcode + 8 byte start index + 4 byte count to end
	case OpHash:
		return 3 + len(self.Data) // opcode + 2 byte length + hash
	case OpData:
		return 5 + len(self.Data) // opcode + 4 byte length + payload
	}
	return -1
}

// Serialize writes the operation into ans, which must be at least
// SerializeSize() bytes long.
func (self Operation) Serialize(ans []byte) {
	switch self.Type {
	case OpBlock:
		bin.PutUint64(ans[1:], self.BlockIndex)
	case OpBlockRange:
		bin.PutUint64(ans[1:], self.BlockIndex)
		bin.PutUint32(ans[9:], uint32(self.BlockIndexEnd-self.BlockIndex))
	case OpHash:
		bin.PutUint16(ans[1:], uint16(len(self.Data)))
		copy(ans[3:], self.Data)
	case OpData:
		bin.PutUint32(ans[1:], uint32(len(self.Data)))
		copy(ans[5:], self.Data)
	}
	ans[0] = byte(self.Type)
}

// Unserialize decodes a single operation from data, returning the number of
// bytes consumed. For OpHash and OpData, the Data slice aliases into data
// and is only valid for as long as data is.
func (self *Operation) Unserialize(data []byte) (n int, err error) {
	if len(data) < 1 {
		return -1, io.ErrShortBuffer
	}
	switch OpType(data[0]) {
	case OpBlock:
		n = 9
		if len(data) < n {
			return -1, io.ErrShortBuffer
		}
		self.BlockIndex = bin.Uint64(data[1:])
		self.Data = nil
	case OpBlockRange:
		n = 13
		if len(data) < n {
			return -1, io.ErrShortBuffer
		}
		self.BlockIndex = bin.Uint64(data[1:])
		self.BlockIndexEnd = self.BlockIndex + uint64(bin.Uint32(data[9:]))
		self.Data = nil
	case OpHash:
		n = 3
		if len(data) < n {
			return -1, io.ErrShortBuffer
		}
		sz := int(bin.Uint16(data[1:]))
		n += sz
		if len(data) < n {
			return -1, io.ErrShortBuffer
		}
		self.Data = data[3:n]
	case OpData:
		n = 5
		if len(data) < n {
			return -1, io.ErrShortBuffer
		}
		sz := int(bin.Uint32(data[1:]))
		n += sz
		if len(data) < n {
			return -1, io.ErrShortBuffer
		}
		self.Data = data[5:n]
	default:
		return 0, fmt.Errorf("record has unknown operation type: %d", data[0])
	}
	self.Type = OpType(data[0])
	return
}
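
// example_operation_roundtrip is an illustrative sketch (added for
// exposition, not used by the package): it round-trips an OpBlockRange
// through Serialize/Unserialize. The 13 byte wire form is a 1 byte opcode,
// the 8 byte start index, and a 4 byte count from start to end.
func example_operation_roundtrip() (Operation, error) {
	op := Operation{Type: OpBlockRange, BlockIndex: 3, BlockIndexEnd: 7}
	buf := make([]byte, op.SerializeSize()) // 13 bytes for OpBlockRange
	op.Serialize(buf)
	var decoded Operation
	_, err := decoded.Unserialize(buf)
	// decoded is now equal to op
	return decoded, err
}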

// Signature hash item generated from target.
type BlockHash struct {
	Index      uint64
	WeakHash   uint32
	StrongHash uint64
}

// Serialized size of a BlockHash: 8 byte index + 4 byte weak hash + 8 byte
// strong hash.
const BlockHashSize = 20

// Put the serialization of this BlockHash into output, which must be at
// least BlockHashSize bytes long.
func (self BlockHash) Serialize(output []byte) {
	bin.PutUint64(output, self.Index)
	bin.PutUint32(output[8:], self.WeakHash)
	bin.PutUint64(output[12:], self.StrongHash)
}

func (self *BlockHash) Unserialize(data []byte) (err error) {
	if len(data) < BlockHashSize {
		return fmt.Errorf("record too small to be a BlockHash: %d < %d", len(data), BlockHashSize)
	}
	self.Index = bin.Uint64(data)
	self.WeakHash = bin.Uint32(data[8:])
	self.StrongHash = bin.Uint64(data[12:])
	return
}

// Properties to use while working with the rsync algorithm.
// A single rsync instance must not be used concurrently, as it contains
// internal buffers and hash state.
type rsync struct {
	BlockSize int

	// This must be non-nil before using any functions
	hasher                  hash.Hash64
	hasher_constructor      func() hash.Hash64
	checksummer_constructor func() hash.Hash
	checksummer             hash.Hash
	checksum_done           bool
	buffer                  []byte
}

func (r *rsync) SetHasher(c func() hash.Hash64) {
	r.hasher_constructor = c
	r.hasher = c()
}

func (r *rsync) SetChecksummer(c func() hash.Hash) {
	r.checksummer_constructor = c
	r.checksummer = c()
}
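
// new_rsync_example is an illustrative sketch (added for exposition; the
// name is hypothetical, not part of the original API): wire up an rsync
// value with the xxh3 based hashers defined above before using its methods.
func new_rsync_example() *rsync {
	r := &rsync{BlockSize: DefaultBlockSize}
	r.SetHasher(new_xxh3_64)       // per-block strong hash
	r.SetChecksummer(new_xxh3_128) // whole-file checksum
	return r
}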

// If the target length is known, the number of hashes in the signature can
// be determined ahead of time: ceil(targetLength / BlockSize). For example,
// a 10000 byte target with the default 6144 byte block size yields 2 hashes.
func (r *rsync) BlockHashCount(targetLength int64) (count int64) {
	bs := int64(r.BlockSize)
	count = targetLength / bs
	if targetLength%bs != 0 {
		count++
	}
	return
}

type signature_iterator struct {
	hasher hash.Hash64
	buffer []byte
	src    io.Reader
	rc     rolling_checksum
	index  uint64
}

// next returns the BlockHash for the next block of input. ans is valid iff
// err == nil; iteration ends with io.EOF.
func (self *signature_iterator) next() (ans BlockHash, err error) {
	n, err := io.ReadAtLeast(self.src, self.buffer, cap(self.buffer))
	switch err {
	case io.ErrUnexpectedEOF, io.EOF, nil:
		err = nil
	default:
		return
	}
	if n == 0 {
		return ans, io.EOF
	}
	b := self.buffer[:n]
	self.hasher.Reset()
	self.hasher.Write(b)
	ans = BlockHash{Index: self.index, WeakHash: self.rc.full(b), StrongHash: self.hasher.Sum64()}
	self.index++
	return
}

// Calculate the signature of target, block by block.
func (r *rsync) CreateSignatureIterator(target io.Reader) func() (BlockHash, error) {
	return (&signature_iterator{
		hasher: r.hasher_constructor(), buffer: make([]byte, r.BlockSize), src: target,
	}).next
}
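
// example_collect_signature is an illustrative sketch (added for
// exposition): drive the signature iterator until io.EOF to collect the
// complete signature of target.
func example_collect_signature(r *rsync, target io.Reader) ([]BlockHash, error) {
	next := r.CreateSignatureIterator(target)
	var signature []BlockHash
	for {
		bh, err := next()
		if err == io.EOF {
			return signature, nil
		}
		if err != nil {
			return nil, err
		}
		signature = append(signature, bh)
	}
}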

// Apply a single delta operation to the target, writing the result to
// output. The final operation in a delta is OpHash, which verifies the
// checksum of all output written so far.
func (r *rsync) ApplyDelta(output io.Writer, target io.ReadSeeker, op Operation) error {
	var err error
	var n int
	var block []byte

	r.set_buffer_to_size(r.BlockSize)
	buffer := r.buffer
	if r.checksummer == nil {
		r.checksummer = r.checksummer_constructor()
	}

	write := func(b []byte) (err error) {
		if _, err = r.checksummer.Write(b); err == nil {
			_, err = output.Write(b)
		}
		return err
	}
	write_block := func(op Operation) (err error) {
		if _, err = target.Seek(int64(r.BlockSize*int(op.BlockIndex)), io.SeekStart); err != nil {
			return err
		}
		// tolerate a short read, as the last block in the target may be short
		if n, err = io.ReadAtLeast(target, buffer, r.BlockSize); err != nil && err != io.ErrUnexpectedEOF {
			return err
		}
		block = buffer[:n]
		return write(block)
	}

	switch op.Type {
	case OpBlockRange:
		for i := op.BlockIndex; i <= op.BlockIndexEnd; i++ {
			if err = write_block(Operation{Type: OpBlock, BlockIndex: i}); err != nil {
				return err
			}
		}
	case OpBlock:
		return write_block(op)
	case OpData:
		return write(op.Data)
	case OpHash:
		actual := r.checksummer.Sum(nil)
		if !bytes.Equal(actual, op.Data) {
			return fmt.Errorf("Failed to verify overall file checksum actual: %s != expected: %s. This usually happens if some data was corrupted in transit or one of the involved files was altered while the transfer was in progress.", hex.EncodeToString(actual), hex.EncodeToString(op.Data))
		}
		r.checksum_done = true
	}
	return nil
}
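
// example_apply_delta is an illustrative sketch (added for exposition):
// apply a stream of operations to target, writing the reconstructed source
// to output. A well formed delta ends with an OpHash operation, which sets
// checksum_done after verifying everything written.
func example_apply_delta(r *rsync, output io.Writer, target io.ReadSeeker, ops []Operation) error {
	for _, op := range ops {
		if err := r.ApplyDelta(output, target, op); err != nil {
			return err
		}
	}
	if !r.checksum_done {
		return fmt.Errorf("delta stream did not end with a verifying OpHash operation")
	}
	return nil
}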

func (r *rsync) set_buffer_to_size(sz int) {
	if cap(r.buffer) < sz {
		r.buffer = make([]byte, sz)
	} else {
		r.buffer = r.buffer[:sz]
	}
}

// see https://rsync.samba.org/tech_report/node3.html
type rolling_checksum struct {
	alpha, beta, val, l           uint32
	first_byte_of_previous_window uint32
}

// full computes the checksum of data from scratch, priming the state for
// subsequent calls to add_one_byte.
func (self *rolling_checksum) full(data []byte) uint32 {
	var alpha, beta uint32
	self.l = uint32(len(data)) // actually should be len(data) - 1 but the equations always use l+1
	for i, b := range data {
		alpha += uint32(b)
		beta += (self.l - uint32(i)) * uint32(b)
	}
	self.first_byte_of_previous_window = uint32(data[0])
	self.alpha = alpha % _M
	self.beta = beta % _M
	self.val = self.alpha + _M*self.beta
	return self.val
}

// add_one_byte rolls the window one byte to the right: first_byte is the
// first byte of the new window and last_byte is the byte that just entered
// it.
func (self *rolling_checksum) add_one_byte(first_byte, last_byte byte) {
	self.alpha = (self.alpha - self.first_byte_of_previous_window + uint32(last_byte)) % _M
	self.beta = (self.beta - (self.l)*self.first_byte_of_previous_window + self.alpha) % _M
	self.val = self.alpha + _M*self.beta
	self.first_byte_of_previous_window = uint32(first_byte)
}
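
// example_rolling_property is an illustrative sketch (added for exposition):
// sliding the window one byte to the right with add_one_byte must produce
// the same checksum as recomputing full() over the shifted window.
func example_rolling_property(data []byte, window int) bool {
	if window < 1 || len(data) < window+1 {
		return false
	}
	var rolled rolling_checksum
	rolled.full(data[:window])
	// the new window is data[1 : window+1]: pass its first and last bytes
	rolled.add_one_byte(data[1], data[window])
	var fresh rolling_checksum
	return rolled.val == fresh.full(data[1:window+1])
}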

type diff struct {
	buffer       []byte
	op_write_buf [32]byte
	// A single weak hash may correlate with many unique strong hashes.
	hash_lookup map[uint32][]BlockHash
	source      io.Reader
	hasher      hash.Hash64
	checksummer hash.Hash
	output      io.Writer

	window, data      struct{ pos, sz int }
	block_size        int
	finished, written bool
	rc                rolling_checksum

	pending_op *Operation
}

func (self *diff) Next() (err error) {
	return self.pump_till_op_written()
}

func (self *diff) hash(b []byte) uint64 {
	self.hasher.Reset()
	self.hasher.Write(b)
	return self.hasher.Sum64()
}

// Combine consecutive OpBlock operations into a single OpBlockRange. To do
// this, store the previous non-data operation and determine if it can be
// extended: for example, blocks 4, 5 and 6 coalesce into {OpBlockRange 4 to 6}.
func (self *diff) send_pending() (err error) {
	if self.pending_op != nil {
		err = self.send_op(self.pending_op)
		self.pending_op = nil
	}
	return
}

func (self *diff) enqueue(op Operation) (err error) {
	switch op.Type {
	case OpBlock:
		if self.pending_op != nil {
			switch self.pending_op.Type {
			case OpBlock:
				if self.pending_op.BlockIndex+1 == op.BlockIndex {
					self.pending_op = &Operation{
						Type:          OpBlockRange,
						BlockIndex:    self.pending_op.BlockIndex,
						BlockIndexEnd: op.BlockIndex,
					}
					return
				}
			case OpBlockRange:
				if self.pending_op.BlockIndexEnd+1 == op.BlockIndex {
					self.pending_op.BlockIndexEnd = op.BlockIndex
					return
				}
			}
			if err = self.send_pending(); err != nil {
				return err
			}
		}
		self.pending_op = &op
	case OpHash:
		if err = self.send_pending(); err != nil {
			return
		}
		if err = self.send_op(&op); err != nil {
			return
		}
	}
	return
}

// send_op serializes op into the fixed write buffer and writes it to output.
func (self *diff) send_op(op *Operation) error {
	b := self.op_write_buf[:op.SerializeSize()]
	op.Serialize(b)
	self.written = true
	_, err := self.output.Write(b)
	return err
}

// send_data writes any accumulated literal data as an OpData operation: a 5
// byte header followed by the payload, in two separate writes.
func (self *diff) send_data() error {
	if self.data.sz > 0 {
		if err := self.send_pending(); err != nil {
			return err
		}
		self.written = true
		data := self.buffer[self.data.pos : self.data.pos+self.data.sz]
		var buf [5]byte
		bin.PutUint32(buf[1:], uint32(len(data)))
		buf[0] = byte(OpData)
		if _, err := self.output.Write(buf[:]); err != nil {
			return err
		}
		if _, err := self.output.Write(data); err != nil {
			return err
		}
		self.data.pos += self.data.sz
		self.data.sz = 0
	}
	return nil
}

// pump_till_op_written reads input until at least one operation has been
// written to output, returning io.EOF once the diff is finished.
func (self *diff) pump_till_op_written() error {
	self.written = false
	for !self.finished && !self.written {
		if err := self.read_next(); err != nil {
			return err
		}
	}
	if self.finished {
		if err := self.send_pending(); err != nil {
			return err
		}
		return io.EOF
	}
	return nil
}

// ensure_idx_valid makes sure self.buffer contains a byte at idx, reading
// more input as needed. ok is false when the source is exhausted before idx.
func (self *diff) ensure_idx_valid(idx int) (ok bool, err error) {
	if idx < len(self.buffer) {
		return true, nil
	}
	if idx >= cap(self.buffer) {
		// need to wrap the buffer, so send off any data present behind the window
		if err = self.send_data(); err != nil {
			return
		}
		// copy the window and any data present after it to the start of the buffer
		distance_from_window_pos := idx - self.window.pos
		amt_to_copy := len(self.buffer) - self.window.pos
		copy(self.buffer, self.buffer[self.window.pos:self.window.pos+amt_to_copy])
		self.buffer = self.buffer[:amt_to_copy]
		self.window.pos = 0
		self.data.pos = 0
		return self.ensure_idx_valid(distance_from_window_pos)
	}
	extra := idx - len(self.buffer) + 1
	var n int
	n, err = io.ReadAtLeast(self.source, self.buffer[len(self.buffer):cap(self.buffer)], extra)
	block := self.buffer[len(self.buffer):][:n]
	switch err {
	case nil:
		ok = true
		self.buffer = self.buffer[:len(self.buffer)+n]
		self.checksummer.Write(block)
	case io.ErrUnexpectedEOF, io.EOF:
		err = nil
		self.buffer = self.buffer[:len(self.buffer)+n]
		self.checksummer.Write(block)
	}
	return
}

// finish_up flushes any remaining literal data and emits the final OpHash
// operation carrying the whole-file checksum.
func (self *diff) finish_up() (err error) {
	if err = self.send_data(); err != nil {
		return
	}
	self.data.pos = self.window.pos
	self.data.sz = len(self.buffer) - self.window.pos
	if err = self.send_data(); err != nil {
		return
	}
	if err = self.enqueue(Operation{Type: OpHash, Data: self.checksummer.Sum(nil)}); err != nil {
		return
	}
	self.finished = true
	return
}

// See https://rsync.samba.org/tech_report/node4.html for the design of this algorithm
func (self *diff) read_next() (err error) {
	if self.window.sz > 0 {
		// roll the existing window forward by one byte
		if ok, err := self.ensure_idx_valid(self.window.pos + self.window.sz); !ok {
			if err != nil {
				return err
			}
			return self.finish_up()
		}
		self.window.pos++
		self.data.sz++
		self.rc.add_one_byte(self.buffer[self.window.pos], self.buffer[self.window.pos+self.window.sz-1])
	} else {
		// no window yet, start a block sized one at the current position
		if ok, err := self.ensure_idx_valid(self.window.pos + self.block_size - 1); !ok {
			if err != nil {
				return err
			}
			return self.finish_up()
		}
		self.window.sz = self.block_size
		self.rc.full(self.buffer[self.window.pos : self.window.pos+self.window.sz])
	}
	found_hash := false
	var block_index uint64
	// only compute the strong hash when the weak hash has a matching bucket
	if hh, ok := self.hash_lookup[self.rc.val]; ok {
		block_index, found_hash = find_hash(hh, self.hash(self.buffer[self.window.pos:self.window.pos+self.window.sz]))
	}
	if found_hash {
		if err = self.send_data(); err != nil {
			return
		}
		if err = self.enqueue(Operation{Type: OpBlock, BlockIndex: block_index}); err != nil {
			return
		}
		self.window.pos += self.window.sz
		self.data.pos = self.window.pos
		self.window.sz = 0
	}
	return nil
}

// OperationWriter collects the serialized operations written to it back
// into a slice of Operation values. It relies on the diff writer's call
// pattern: OpData is written as a 5 byte header followed by a separate
// write of the payload; all other operations arrive in a single write.
type OperationWriter struct {
	Operations     []Operation
	expecting_data bool
}

func (self *OperationWriter) Write(p []byte) (n int, err error) {
	if self.expecting_data {
		self.expecting_data = false
		self.Operations = append(self.Operations, Operation{Type: OpData, Data: slices.Clone(p)})
	} else {
		switch OpType(p[0]) {
		case OpData:
			self.expecting_data = true
		case OpBlock, OpBlockRange, OpHash:
			op := Operation{}
			if n, err = op.Unserialize(p); err != nil {
				return 0, err
			} else if n < len(p) {
				return 0, io.ErrShortWrite
			}
			self.Operations = append(self.Operations, op)
		default:
			return 0, fmt.Errorf("Unknown OpType: %d", p[0])
		}
	}
	// report the full write as consumed, as the io.Writer contract requires
	return len(p), nil
}

// CreateDelta collects all delta operations for source against signature
// into a slice, using an OperationWriter to capture the serialized stream.
func (r *rsync) CreateDelta(source io.Reader, signature []BlockHash) ([]Operation, error) {
	w := OperationWriter{}
	it := r.CreateDiff(source, signature, &w)
	for {
		if err := it(); err != nil {
			if err == io.EOF {
				return w.Operations, nil
			}
			return nil, err
		}
	}
}

// The diff buffer holds this multiple of the block size.
const DataSizeMultiple int = 8

// CreateDiff returns an iterator that, on each call, writes at least one
// serialized operation to output, returning io.EOF once the diff is done.
func (r *rsync) CreateDiff(source io.Reader, signature []BlockHash, output io.Writer) func() error {
	ans := &diff{
		block_size: r.BlockSize, buffer: make([]byte, 0, (r.BlockSize * DataSizeMultiple)),
		hash_lookup: make(map[uint32][]BlockHash, len(signature)),
		source: source, hasher: r.hasher_constructor(),
		checksummer: r.checksummer_constructor(), output: output,
	}
	for _, h := range signature {
		key := h.WeakHash
		ans.hash_lookup[key] = append(ans.hash_lookup[key], h)
	}
	return ans.Next
}
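
// example_round_trip is an illustrative end-to-end sketch (added for
// exposition): compute the signature of target, build a delta from source
// against it, then patch target into a copy of source. It assumes r was set
// up with hasher and checksummer constructors, as in new_rsync_example.
func example_round_trip(r *rsync, source io.Reader, target io.ReadSeeker, patched io.Writer) error {
	signature, err := example_collect_signature(r, target)
	if err != nil {
		return err
	}
	ops, err := r.CreateDelta(source, signature)
	if err != nil {
		return err
	}
	for _, op := range ops {
		if err = r.ApplyDelta(patched, target, op); err != nil {
			return err
		}
	}
	return nil
}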

// hash computes the strong hash, used to disambiguate blocks whose weak
// checksums collide.
func (r *rsync) hash(v []byte) uint64 {
	r.hasher.Reset()
	r.hasher.Write(v)
	return r.hasher.Sum64()
}

func (r *rsync) HashSize() int      { return r.hasher.Size() }
func (r *rsync) HashBlockSize() int { return r.hasher.BlockSize() }
func (r *rsync) HasHasher() bool    { return r.hasher != nil }

// Searches for a given strong hash among all strong hashes in this bucket.
func find_hash(hh []BlockHash, hv uint64) (uint64, bool) {
	for _, block := range hh {
		if block.StrongHash == hv {
			return block.Index, true
		}
	}
	return 0, false
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}