// Algorithm found at: https://rsync.samba.org/tech_report/tech_report.html
// Code in this file is inspired by: https://github.com/jbreiding/rsync-go
//
// Definitions
//
// Source: The final content.
// Target: The existing content that is to be made into the final content.
// Signature: The sequence of block hashes used to identify the target's content.
package rsync
import (
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"hash"
"io"
"slices"
"strconv"
"github.com/zeebo/xxh3"
)
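// example_round_trip is an illustrative sketch of how the pieces below fit
// together for an in-memory transfer. It is not used by the algorithm itself:
// real transfers stream these steps over a connection, and the sender and
// receiver would each use their own rsync instance. The receiver hashes its
// target, the sender computes a delta of source against that signature, and
// the receiver applies the delta to its target to reconstruct source. The
// final OpHash operation makes ApplyDelta verify the whole-file checksum.
func example_round_trip(source, target []byte) ([]byte, error) {
	r := &rsync{BlockSize: DefaultBlockSize}
	r.SetHasher(new_xxh3_64)
	r.SetChecksummer(new_xxh3_128)
	// Receiver side: signature of the existing target.
	var signature []BlockHash
	next := r.CreateSignatureIterator(bytes.NewReader(target))
	for {
		bh, err := next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
		signature = append(signature, bh)
	}
	// Sender side: operations that transform target into source.
	ops, err := r.CreateDelta(bytes.NewReader(source), signature)
	if err != nil {
		return nil, err
	}
	// Receiver side: apply the operations in order to reconstruct source.
	var reconstructed bytes.Buffer
	tgt := bytes.NewReader(target)
	for _, op := range ops {
		if err := r.ApplyDelta(&reconstructed, tgt, op); err != nil {
			return nil, err
		}
	}
	return reconstructed.Bytes(), nil
}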
// If no BlockSize is specified in the rsync instance, this value is used.
const DefaultBlockSize = 1024 * 6
// Modulus used by the rolling checksum (M = 2^16 in the rsync technical report).
const _M = 1 << 16
// Operation Types.
type OpType byte // enum
const (
OpBlock OpType = iota
OpData
OpHash
OpBlockRange
)
// xxh3_128 wraps xxh3.Hasher so that Sum appends the full 128-bit digest.
type xxh3_128 struct {
xxh3.Hasher
}
func (self *xxh3_128) Sum(b []byte) []byte {
s := self.Sum128()
pos := len(b)
limit := pos + 16
if limit > cap(b) {
var x [16]byte
b = append(b, x[:]...)
} else {
b = b[:limit]
}
binary.BigEndian.PutUint64(b[pos:], s.Hi)
binary.BigEndian.PutUint64(b[pos+8:], s.Lo)
return b
}
// new_xxh3_64 constructs the strong per-block hasher (suitable for SetHasher).
func new_xxh3_64() hash.Hash64 {
ans := xxh3.New()
ans.Reset()
return ans
}
// new_xxh3_128 constructs the whole-file checksummer (suitable for SetChecksummer).
func new_xxh3_128() hash.Hash {
ans := new(xxh3_128)
ans.Reset()
return ans
}
// Instruction to mutate target to align to source.
type Operation struct {
Type OpType
BlockIndex uint64
BlockIndexEnd uint64
Data []byte
}
func (self Operation) String() string {
ans := "{" + self.Type.String() + " "
switch self.Type {
case OpBlock:
ans += strconv.FormatUint(self.BlockIndex, 10)
case OpBlockRange:
ans += strconv.FormatUint(self.BlockIndex, 10) + " to " + strconv.FormatUint(self.BlockIndexEnd, 10)
case OpData:
ans += strconv.Itoa(len(self.Data))
case OpHash:
ans += hex.EncodeToString(self.Data)
}
return ans + "}"
}
var bin = binary.LittleEndian
func (self Operation) SerializeSize() int {
switch self.Type {
case OpBlock:
return 9
case OpBlockRange:
return 13
case OpHash:
return 3 + len(self.Data)
case OpData:
return 5 + len(self.Data)
}
return -1
}
func (self Operation) Serialize(ans []byte) {
switch self.Type {
case OpBlock:
bin.PutUint64(ans[1:], self.BlockIndex)
case OpBlockRange:
bin.PutUint64(ans[1:], self.BlockIndex)
bin.PutUint32(ans[9:], uint32(self.BlockIndexEnd-self.BlockIndex))
case OpHash:
bin.PutUint16(ans[1:], uint16(len(self.Data)))
copy(ans[3:], self.Data)
case OpData:
bin.PutUint32(ans[1:], uint32(len(self.Data)))
copy(ans[5:], self.Data)
}
ans[0] = byte(self.Type)
}
func (self *Operation) Unserialize(data []byte) (n int, err error) {
if len(data) < 1 {
return -1, io.ErrShortBuffer
}
switch OpType(data[0]) {
case OpBlock:
n = 9
if len(data) < n {
return -1, io.ErrShortBuffer
}
self.BlockIndex = bin.Uint64(data[1:])
self.Data = nil
case OpBlockRange:
n = 13
if len(data) < n {
return -1, io.ErrShortBuffer
}
self.BlockIndex = bin.Uint64(data[1:])
self.BlockIndexEnd = self.BlockIndex + uint64(bin.Uint32(data[9:]))
self.Data = nil
case OpHash:
n = 3
if len(data) < n {
return -1, io.ErrShortBuffer
}
sz := int(bin.Uint16(data[1:]))
n += sz
if len(data) < n {
return -1, io.ErrShortBuffer
}
self.Data = data[3:n]
case OpData:
n = 5
if len(data) < n {
return -1, io.ErrShortBuffer
}
sz := int(bin.Uint32(data[1:]))
n += sz
if len(data) < n {
return -1, io.ErrShortBuffer
}
self.Data = data[5:n]
default:
return 0, fmt.Errorf("record has unknown operation type: %d", data[0])
}
self.Type = OpType(data[0])
return
}
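// example_operation_roundtrip is an illustrative sketch (not used by the
// algorithm itself): an Operation survives a Serialize/Unserialize round
// trip, with SerializeSize giving the exact number of bytes required.
func example_operation_roundtrip() (Operation, error) {
	op := Operation{Type: OpBlockRange, BlockIndex: 3, BlockIndexEnd: 7}
	buf := make([]byte, op.SerializeSize())
	op.Serialize(buf)
	var decoded Operation
	if _, err := decoded.Unserialize(buf); err != nil {
		return Operation{}, err
	}
	// decoded now has the same Type, BlockIndex and BlockIndexEnd as op.
	return decoded, nil
}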
// Signature hash item generated from target.
type BlockHash struct {
Index uint64
WeakHash uint32
StrongHash uint64
}
const BlockHashSize = 20
// Put the serialization of this BlockHash to output
func (self BlockHash) Serialize(output []byte) {
bin.PutUint64(output, self.Index)
bin.PutUint32(output[8:], self.WeakHash)
bin.PutUint64(output[12:], self.StrongHash)
}
func (self *BlockHash) Unserialize(data []byte) (err error) {
	if len(data) < BlockHashSize {
		return fmt.Errorf("record too small to be a BlockHash: %d < %d", len(data), BlockHashSize)
	}
}
self.Index = bin.Uint64(data)
self.WeakHash = bin.Uint32(data[8:])
self.StrongHash = bin.Uint64(data[12:])
return
}
// Properties to use while working with the rsync algorithm.
// A single rsync instance must not be used concurrently, as it carries
// internal buffers and hash state.
type rsync struct {
BlockSize int
	// This must be non-nil before any of the methods below are used.
hasher hash.Hash64
hasher_constructor func() hash.Hash64
checksummer_constructor func() hash.Hash
checksummer hash.Hash
checksum_done bool
buffer []byte
}
func (r *rsync) SetHasher(c func() hash.Hash64) {
r.hasher_constructor = c
r.hasher = c()
}
func (r *rsync) SetChecksummer(c func() hash.Hash) {
r.checksummer_constructor = c
r.checksummer = c()
}
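// example_setup is an illustrative sketch (not used by the algorithm itself):
// both the hasher and the checksummer must be installed before signatures or
// deltas are created. The block size check mirrors the DefaultBlockSize
// fallback described above; this sketch applies it explicitly rather than
// relying on other code to do so.
func example_setup(block_size int) *rsync {
	if block_size <= 0 {
		block_size = DefaultBlockSize
	}
	r := &rsync{BlockSize: block_size}
	r.SetHasher(new_xxh3_64)       // strong per-block hash
	r.SetChecksummer(new_xxh3_128) // whole-file checksum
	return r
}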
// If the target length is known, the number of hashes in the
// signature can be determined ahead of time.
func (r *rsync) BlockHashCount(targetLength int64) (count int64) {
bs := int64(r.BlockSize)
count = targetLength / bs
if targetLength%bs != 0 {
count++
}
return
}
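// For example, with the default 6 KiB block size a 100 KiB (102400 byte)
// target needs 102400/6144 = 16 full blocks plus one short final block,
// so BlockHashCount(102400) == 17.
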
type signature_iterator struct {
hasher hash.Hash64
buffer []byte
src io.Reader
rc rolling_checksum
index uint64
}
// ans is valid iff err == nil
func (self *signature_iterator) next() (ans BlockHash, err error) {
n, err := io.ReadAtLeast(self.src, self.buffer, cap(self.buffer))
switch err {
case io.ErrUnexpectedEOF, io.EOF, nil:
err = nil
default:
return
}
if n == 0 {
return ans, io.EOF
}
b := self.buffer[:n]
self.hasher.Reset()
self.hasher.Write(b)
ans = BlockHash{Index: self.index, WeakHash: self.rc.full(b), StrongHash: self.hasher.Sum64()}
self.index++
return
}
// Calculate the signature of target. The returned function yields one
// BlockHash per call and io.EOF once the target is exhausted.
func (r *rsync) CreateSignatureIterator(target io.Reader) func() (BlockHash, error) {
return (&signature_iterator{
hasher: r.hasher_constructor(), buffer: make([]byte, r.BlockSize), src: target,
}).next
}
// Apply a single delta operation to the target, writing the resulting bytes to output.
func (r *rsync) ApplyDelta(output io.Writer, target io.ReadSeeker, op Operation) error {
var err error
var n int
var block []byte
r.set_buffer_to_size(r.BlockSize)
buffer := r.buffer
if r.checksummer == nil {
r.checksummer = r.checksummer_constructor()
}
write := func(b []byte) (err error) {
if _, err = r.checksummer.Write(b); err == nil {
_, err = output.Write(b)
}
return err
}
write_block := func(op Operation) (err error) {
		if _, err = target.Seek(int64(r.BlockSize)*int64(op.BlockIndex), io.SeekStart); err != nil {
return err
}
if n, err = io.ReadAtLeast(target, buffer, r.BlockSize); err != nil && err != io.ErrUnexpectedEOF {
return err
}
block = buffer[:n]
return write(block)
}
switch op.Type {
case OpBlockRange:
for i := op.BlockIndex; i <= op.BlockIndexEnd; i++ {
if err = write_block(Operation{
Type: OpBlock,
BlockIndex: i,
}); err != nil {
return err
}
}
case OpBlock:
return write_block(op)
case OpData:
return write(op.Data)
case OpHash:
actual := r.checksummer.Sum(nil)
if !bytes.Equal(actual, op.Data) {
return fmt.Errorf("Failed to verify overall file checksum actual: %s != expected: %s. This usually happens if some data was corrupted in transit or one of the involved files was altered while the transfer was in progress.", hex.EncodeToString(actual), hex.EncodeToString(op.Data))
}
r.checksum_done = true
}
return nil
}
func (r *rsync) set_buffer_to_size(sz int) {
if cap(r.buffer) < sz {
r.buffer = make([]byte, sz)
} else {
r.buffer = r.buffer[:sz]
}
}
// Rolling (weak) checksum; see https://rsync.samba.org/tech_report/node3.html
type rolling_checksum struct {
alpha, beta, val, l uint32
first_byte_of_previous_window uint32
}
func (self *rolling_checksum) full(data []byte) uint32 {
var alpha, beta uint32
	self.l = uint32(len(data)) // the tech report's l is the index of the last byte, but its formulas only ever need l+1 (the window length), so store len(data) directly
for i, b := range data {
alpha += uint32(b)
beta += (self.l - uint32(i)) * uint32(b)
}
self.first_byte_of_previous_window = uint32(data[0])
self.alpha = alpha % _M
self.beta = beta % _M
self.val = self.alpha + _M*self.beta
return self.val
}
func (self *rolling_checksum) add_one_byte(first_byte, last_byte byte) {
self.alpha = (self.alpha - self.first_byte_of_previous_window + uint32(last_byte)) % _M
self.beta = (self.beta - (self.l)*self.first_byte_of_previous_window + self.alpha) % _M
self.val = self.alpha + _M*self.beta
self.first_byte_of_previous_window = uint32(first_byte)
}
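// rolling_checksum_self_check is an illustrative sketch (not used by the
// algorithm itself): sliding the window one byte to the right with
// add_one_byte must produce the same value as recomputing the checksum of
// the shifted window from scratch. This equivalence is what makes the
// rolling update in read_next valid.
func rolling_checksum_self_check(data []byte, block_size int) bool {
	if block_size < 1 || len(data) < block_size+1 {
		return false
	}
	var rolling, fresh rolling_checksum
	rolling.full(data[:block_size])
	// The new window is data[1 : block_size+1]: pass its first and last bytes.
	rolling.add_one_byte(data[1], data[block_size])
	return rolling.val == fresh.full(data[1:block_size+1])
}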
type diff struct {
buffer []byte
op_write_buf [32]byte
	// A single weak hash may map to multiple blocks with distinct strong hashes.
hash_lookup map[uint32][]BlockHash
source io.Reader
hasher hash.Hash64
checksummer hash.Hash
output io.Writer
window, data struct{ pos, sz int }
block_size int
finished, written bool
rc rolling_checksum
pending_op *Operation
}
func (self *diff) Next() (err error) {
return self.pump_till_op_written()
}
func (self *diff) hash(b []byte) uint64 {
self.hasher.Reset()
self.hasher.Write(b)
return self.hasher.Sum64()
}
// Combine consecutive OpBlock operations into an OpBlockRange. To do this,
// store the previous non-data operation and check whether it can be extended
// (see example_block_coalescing below).
func (self *diff) send_pending() (err error) {
if self.pending_op != nil {
err = self.send_op(self.pending_op)
self.pending_op = nil
}
return
}
func (self *diff) enqueue(op Operation) (err error) {
switch op.Type {
case OpBlock:
if self.pending_op != nil {
switch self.pending_op.Type {
case OpBlock:
if self.pending_op.BlockIndex+1 == op.BlockIndex {
self.pending_op = &Operation{
Type: OpBlockRange,
BlockIndex: self.pending_op.BlockIndex,
BlockIndexEnd: op.BlockIndex,
}
return
}
case OpBlockRange:
if self.pending_op.BlockIndexEnd+1 == op.BlockIndex {
self.pending_op.BlockIndexEnd = op.BlockIndex
return
}
}
if err = self.send_pending(); err != nil {
return err
}
}
self.pending_op = &op
case OpHash:
if err = self.send_pending(); err != nil {
return
}
if err = self.send_op(&op); err != nil {
return
}
}
return
}
func (self *diff) send_op(op *Operation) error {
b := self.op_write_buf[:op.SerializeSize()]
op.Serialize(b)
self.written = true
_, err := self.output.Write(b)
return err
}
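// example_block_coalescing is an illustrative sketch (not used by the
// algorithm itself): three adjacent OpBlock operations are merged by
// enqueue and flushed by send_pending as a single OpBlockRange covering
// blocks 0 to 2. An OperationWriter (defined below) collects the result.
func example_block_coalescing() ([]Operation, error) {
	w := &OperationWriter{}
	d := &diff{output: w}
	for _, idx := range []uint64{0, 1, 2} {
		if err := d.enqueue(Operation{Type: OpBlock, BlockIndex: idx}); err != nil {
			return nil, err
		}
	}
	if err := d.send_pending(); err != nil {
		return nil, err
	}
	return w.Operations, nil // a single {OpBlockRange 0 to 2}
}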
func (self *diff) send_data() error {
if self.data.sz > 0 {
if err := self.send_pending(); err != nil {
return err
}
self.written = true
data := self.buffer[self.data.pos : self.data.pos+self.data.sz]
var buf [5]byte
bin.PutUint32(buf[1:], uint32(len(data)))
buf[0] = byte(OpData)
if _, err := self.output.Write(buf[:]); err != nil {
return err
}
if _, err := self.output.Write(data); err != nil {
return err
}
self.data.pos += self.data.sz
self.data.sz = 0
}
return nil
}
func (self *diff) pump_till_op_written() error {
self.written = false
for !self.finished && !self.written {
if err := self.read_next(); err != nil {
return err
}
}
if self.finished {
if err := self.send_pending(); err != nil {
return err
}
return io.EOF
}
return nil
}
func (self *diff) ensure_idx_valid(idx int) (ok bool, err error) {
if idx < len(self.buffer) {
return true, nil
}
if idx >= cap(self.buffer) {
// need to wrap the buffer, so send off any data present behind the window
if err = self.send_data(); err != nil {
return
}
// copy the window and any data present after it to the start of the buffer
distance_from_window_pos := idx - self.window.pos
amt_to_copy := len(self.buffer) - self.window.pos
copy(self.buffer, self.buffer[self.window.pos:self.window.pos+amt_to_copy])
self.buffer = self.buffer[:amt_to_copy]
self.window.pos = 0
self.data.pos = 0
return self.ensure_idx_valid(distance_from_window_pos)
}
extra := idx - len(self.buffer) + 1
var n int
n, err = io.ReadAtLeast(self.source, self.buffer[len(self.buffer):cap(self.buffer)], extra)
block := self.buffer[len(self.buffer):][:n]
switch err {
case nil:
ok = true
self.buffer = self.buffer[:len(self.buffer)+n]
self.checksummer.Write(block)
case io.ErrUnexpectedEOF, io.EOF:
err = nil
self.buffer = self.buffer[:len(self.buffer)+n]
self.checksummer.Write(block)
}
return
}
func (self *diff) finish_up() (err error) {
if err = self.send_data(); err != nil {
return
}
self.data.pos = self.window.pos
self.data.sz = len(self.buffer) - self.window.pos
if err = self.send_data(); err != nil {
return
}
	if err = self.enqueue(Operation{Type: OpHash, Data: self.checksummer.Sum(nil)}); err != nil {
		return
	}
self.finished = true
return
}
// See https://rsync.samba.org/tech_report/node4.html for the design of this algorithm
func (self *diff) read_next() (err error) {
if self.window.sz > 0 {
if ok, err := self.ensure_idx_valid(self.window.pos + self.window.sz); !ok {
if err != nil {
return err
}
return self.finish_up()
}
self.window.pos++
self.data.sz++
self.rc.add_one_byte(self.buffer[self.window.pos], self.buffer[self.window.pos+self.window.sz-1])
} else {
if ok, err := self.ensure_idx_valid(self.window.pos + self.block_size - 1); !ok {
if err != nil {
return err
}
return self.finish_up()
}
self.window.sz = self.block_size
self.rc.full(self.buffer[self.window.pos : self.window.pos+self.window.sz])
}
found_hash := false
var block_index uint64
if hh, ok := self.hash_lookup[self.rc.val]; ok {
block_index, found_hash = find_hash(hh, self.hash(self.buffer[self.window.pos:self.window.pos+self.window.sz]))
}
if found_hash {
if err = self.send_data(); err != nil {
return
}
		if err = self.enqueue(Operation{Type: OpBlock, BlockIndex: block_index}); err != nil {
			return
		}
self.window.pos += self.window.sz
self.data.pos = self.window.pos
self.window.sz = 0
}
return nil
}
type OperationWriter struct {
Operations []Operation
expecting_data bool
}
func (self *OperationWriter) Write(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}
	if self.expecting_data {
self.expecting_data = false
self.Operations = append(self.Operations, Operation{Type: OpData, Data: slices.Clone(p)})
} else {
switch OpType(p[0]) {
case OpData:
self.expecting_data = true
case OpBlock, OpBlockRange, OpHash:
op := Operation{}
if n, err = op.Unserialize(p); err != nil {
return 0, err
} else if n < len(p) {
return 0, io.ErrShortWrite
}
self.Operations = append(self.Operations, op)
default:
return 0, fmt.Errorf("Unknown OpType: %d", p[0])
}
}
	// Report the whole slice as consumed to satisfy the io.Writer contract.
	return len(p), nil
}
// CreateDelta runs the diff to completion, collecting the operations needed
// to transform the target (described by signature) into source.
func (r *rsync) CreateDelta(source io.Reader, signature []BlockHash) ([]Operation, error) {
w := OperationWriter{}
it := r.CreateDiff(source, signature, &w)
for {
if err := it(); err != nil {
if err == io.EOF {
return w.Operations, nil
}
return nil, err
}
}
}
// The diff buffer holds up to this many blocks; when it fills up, pending
// literal data is flushed as an OpData operation and the buffer is rewound.
const DataSizeMultiple int = 8
// CreateDiff returns an iterator that writes serialized delta operations to
// output; each call writes until at least one operation has been emitted (or
// the source is exhausted), and returns io.EOF when there is nothing left to do.
func (r *rsync) CreateDiff(source io.Reader, signature []BlockHash, output io.Writer) func() error {
ans := &diff{
block_size: r.BlockSize, buffer: make([]byte, 0, (r.BlockSize * DataSizeMultiple)),
hash_lookup: make(map[uint32][]BlockHash, len(signature)),
source: source, hasher: r.hasher_constructor(),
checksummer: r.checksummer_constructor(), output: output,
}
for _, h := range signature {
key := h.WeakHash
ans.hash_lookup[key] = append(ans.hash_lookup[key], h)
}
return ans.Next
}
// Hash a set of bytes with the strong (non-rolling) hash, which identifies
// content far more uniquely than the weak rolling checksum.
func (r *rsync) hash(v []byte) uint64 {
r.hasher.Reset()
r.hasher.Write(v)
return r.hasher.Sum64()
}
func (r *rsync) HashSize() int { return r.hasher.Size() }
func (r *rsync) HashBlockSize() int { return r.hasher.BlockSize() }
func (r *rsync) HasHasher() bool { return r.hasher != nil }
// Searches for a given strong hash among all strong hashes in this bucket.
func find_hash(hh []BlockHash, hv uint64) (uint64, bool) {
for _, block := range hh {
if block.StrongHash == hv {
return block.Index, true
}
}
return 0, false
}
func min(a, b int) int {
if a < b {
return a
}
return b
}