UNSAFE MODULE; IMPORT FileAttr, MD5, MD5Digest, OSError, Pathname, RsyncBlock, RsyncBlockArraySort, Thread, UnixMisc, Ustat, Word, Wr; REVEAL T = Public BRANDED OBJECT buf: ADDRESS := NIL; size: CARDINAL := 0; start: UNTRACED REF CHAR := NIL; limit: UNTRACED REF CHAR := NIL; md5: TEXT := NIL; END; CONST CharOffset = 3; RsyncFile
CharOffset
is an arbitrary non-zero value which improves the quality
of the rolling checksum a little bit. It must be the same on the client
and the server.
PROCEDUREChooseBlockSize (fileSize: CARDINAL): CARDINAL =
Choose a good block size for the given file size.
We choose a block size based on several considerations. First, we want to keep the network pipeline full as well as we can. The server cannot do anything with a file until it has all the block information from the client. We would like to have all the block information for a file sitting in the server's receive buffer before the server finishes with the previous file. Otherwise the server will have to stop sending until it has read all of the block information from the network. To avoid this, we try to choose a block size such that all of the block information will fit into the network receive buffer on the server machine.
We clamp the block size within a range that we consider
reasonable
.
We would also like to have the final remainder
block be as
small as possible, since it will always have to be sent verbatim
by the server. After we have chosen a target block size, we
search in its neighborhood for a block size that will minimize
the remainder.
CONST MinBlockSize = 1024; MaxBlockSize = 16 * 1024; ReceiveBufferSize = 15 * 1024; (* A guess. *) BlockInfoSize = 26; (* It's 42 chars, but we assume decent compression. *) SearchRegion = 10; (* On each side of the first choice. *) MaxBlocks = ReceiveBufferSize DIV BlockInfoSize; VAR bestRemainder: CARDINAL; blockSize := fileSize DIV MaxBlocks; loSearch := blockSize - SearchRegion; hiSearch := blockSize + SearchRegion; BEGIN IF loSearch < MinBlockSize THEN loSearch := MinBlockSize; hiSearch := loSearch + 2*SearchRegion; ELSIF hiSearch > MaxBlockSize THEN hiSearch := MaxBlockSize; loSearch := hiSearch - 2*SearchRegion; END; bestRemainder := MaxBlockSize; (* Effectively infinity. *) FOR bs := loSearch TO hiSearch DO WITH rem = fileSize MOD bs DO IF rem < bestRemainder THEN bestRemainder := rem; blockSize := bs; END; END; END; RETURN blockSize; END ChooseBlockSize; PROCEDURE***************************************************************************Close (rf: T) RAISES {OSError.E} = BEGIN IF rf.buf # NIL THEN UnixMisc.Unmap(rf.buf, rf.size); rf.buf := NIL; END; rf.size := 0; rf.start := NIL; rf.limit := NIL; END Close; PROCEDUREGetMD5 (rf: T): TEXT = VAR md5 := MD5.New(); BEGIN TRY md5.updateRaw(rf.start, rf.size); FINALLY RETURN md5.finish(); END; END GetMD5; PROCEDUREOpen (p: Pathname.T; blockSize: CARDINAL := 0): T RAISES {OSError.E} = VAR rf: T; statbuf: Ustat.struct_stat; BEGIN rf := NEW(T); rf.buf := UnixMisc.MapFile(p, statbuf); rf.attr := FileAttr.FromStat(statbuf); rf.size := VAL(statbuf.st_size, INTEGER); rf.start := rf.buf; rf.limit := rf.start + rf.size; IF blockSize = 0 THEN blockSize := ChooseBlockSize(rf.size); END; rf.blockSize := blockSize; RETURN rf; END Open;
TYPE BlockIteratorRep = BlockIterator OBJECT rf: T; ptr: UNTRACED REF CHAR; blockNum: CARDINAL := 0; OVERRIDES next := BlockIteratorNext; END; PROCEDURE***************************************************************************IterateBlocks (rf: T): BlockIterator = BEGIN RETURN NEW(BlockIteratorRep, rf := rf, ptr := rf.start); END IterateBlocks; PROCEDUREBlockIteratorNext (bi: BlockIteratorRep; VAR block: RsyncBlock.T): BOOLEAN = VAR blockSize: CARDINAL; BEGIN IF bi.ptr >= bi.rf.limit THEN (* All done. *) RETURN FALSE; END; blockSize := MIN(bi.rf.limit - bi.ptr, bi.rf.blockSize); block.num := bi.blockNum; block.md5 := BlockMD5(bi.ptr, blockSize); block.rsum := BlockRsum(bi.ptr, blockSize); INC(bi.blockNum); INC(bi.ptr, blockSize); RETURN TRUE; END BlockIteratorNext;
PROCEDURE***************************************************************************BlockMD5 (ptr: UNTRACED REF CHAR; count: CARDINAL): MD5Digest.T = VAR digest: MD5Digest.T; md5 := MD5.New(); BEGIN TRY md5.updateRaw(ptr, count); FINALLY md5.finishRaw(digest); RETURN digest; END; END BlockMD5; PROCEDUREBlockRsum (ptr: UNTRACED REF CHAR; count: CARDINAL): Word.T = VAR limit := ptr + count; a, b: Word.T := 0; BEGIN (* This first loop is just an unrolled version of the second loop. *) WHILE ptr <= limit - 4 DO b := Word.Plus( b, Word.Plus( Word.Times(4, Word.Plus(a, ORD(ptr^))), Word.Plus( Word.Times(3, ORD(LOOPHOLE(ptr+1, UNTRACED REF CHAR)^)), Word.Plus( Word.Times(2, ORD(LOOPHOLE(ptr+2, UNTRACED REF CHAR)^)), Word.Plus( ORD(LOOPHOLE(ptr+3, UNTRACED REF CHAR)^), 10*CharOffset))))); a := Word.Plus( a, Word.Plus( ORD(ptr^), Word.Plus( ORD(LOOPHOLE(ptr+1, UNTRACED REF CHAR)^), Word.Plus( ORD(LOOPHOLE(ptr+2, UNTRACED REF CHAR)^), Word.Plus( ORD(LOOPHOLE(ptr+3, UNTRACED REF CHAR)^), 4*CharOffset))))); INC(ptr, 4); END; WHILE ptr < limit DO a := Word.Plus(a, ORD(ptr^)+CharOffset); b := Word.Plus(b, a); INC(ptr); END; RETURN Word.Or(Word.Shift(Word.And(b, 16_ffff), 16), Word.And(a, 16_ffff)); END BlockRsum;
CONST HashBits = 14; HashMask = Word.Not(Word.Shift(Word.Not(0), HashBits)); HashShift = (Word.Size - HashBits) DIV 2;To hash a rolling checksum, we take its middle
HashBits
bits.
TYPE HashTab = ARRAY [0..HashMask] OF INTEGER; DiffIteratorRep = DiffIterator OBJECT rf: T; ptr: UNTRACED REF CHAR; (* Start of checksummed block. *) rsum: Word.T; blocks: REF ARRAY OF RsyncBlock.T; seqVec: REF ARRAY OF CARDINAL := NIL; (* Block # => "blocks" index. *) hashTab: REF HashTab := NIL; OVERRIDES next := DiffIteratorNext; END; PROCEDUREDiffIteratorNext (di: DiffIteratorRep; wr: Wr.T; VAR blocks: BlockRange): BOOLEAN RAISES {Thread.Alerted, Wr.Failure} = VAR hash: Word.T; blockIndex := -1; blockNum: CARDINAL; a, b: Word.T; buff: ARRAY [0..8191] OF CHAR; buffCt: CARDINAL := 0; BEGIN IF di.ptr >= di.rf.limit THEN (* We have already sent everything. *) (* NIL out some pointers in an attempt to get their large amounts of memory collected earlier. *) di.blocks := NIL; di.seqVec := NIL; di.hashTab := NIL; RETURN FALSE; END; (* Scan forward, copying characters, until we find a matching block or we reach the end of the file. *) VAR limit := di.rf.limit; blockSize := di.rf.blockSize; hashTab := di.hashTab; rsum := di.rsum; ptr := di.ptr; BEGIN REPEAT (* See whether the client has a block matching our current one. *) IF limit - ptr >= blockSize AND hashTab # NIL THEN (* It's worth looking for a matching block. *) hash := Word.Extract(rsum, HashShift, HashBits); blockIndex := hashTab[hash]; IF blockIndex >= 0 THEN (* A block matches the hash. *) di.ptr := ptr; di.rsum := rsum; blockIndex := FindMatch(di, hash, blockIndex); IF blockIndex >= 0 THEN EXIT END; (* Found a match. *) END; END; (* No matching block at this position; output the character. *) IF buffCt >= NUMBER(buff) THEN (* Flush buffer. *) Wr.PutString(wr, buff); buffCt := 0; END; buff[buffCt] := ptr^; INC(buffCt); (* Update the checksum, if we're going to use it. *) IF limit - (ptr+1) >= blockSize AND hashTab # NIL THEN a := Word.And(rsum, 16_ffff); b := Word.And(Word.RightShift(rsum, 16), 16_ffff); (* Unchecksum the first character of the block. *) WITH ch = ORD(ptr^) + CharOffset DO a := Word.Minus(a, ch); b := Word.Minus(b, Word.Times(blockSize, ch)); END; (* Checksum the character just entering the block. *) WITH ch = ORD(LOOPHOLE(ptr+blockSize, UNTRACED REF CHAR)^) + CharOffset DO a := Word.Plus(a, ch); b := Word.Plus(b, a); END; rsum := Word.Or(Word.Shift(Word.And(b, 16_ffff), 16), Word.And(a, 16_ffff)); END; INC(ptr); (* Advance the block. *) UNTIL ptr >= limit; IF buffCt > 0 THEN (* Write out the remaining buffered characters. *) Wr.PutString(wr, SUBARRAY(buff, 0, buffCt)); buffCt := 0; END; IF blockIndex >= 0 THEN (* Found a matching block. *) blocks.start := di.blocks[blockIndex].num; blockNum := blocks.start; (* Scan forward through consecutive blocks until we find one that doesn't match. *) REPEAT (* Advance past the matching block. *) INC(blockNum); INC(ptr, blockSize); (* Bail out if there are no more complete blocks, or if we've just matched the last block of the client's file. *) IF limit - ptr < blockSize OR blockNum > LAST(di.seqVec^) THEN EXIT; END; blockIndex := di.seqVec[blockNum]; UNTIL NOT MD5Digest.Equal(di.blocks[blockIndex].md5, BlockMD5(ptr, blockSize)); IF limit - ptr >= blockSize THEN (* Compute rolling checksum. *) rsum := BlockRsum(ptr, blockSize); END; blocks.count := blockNum - blocks.start; ELSE blocks.start := 0; blocks.count := 0; END; di.ptr := ptr; di.rsum := rsum; RETURN TRUE; END; END DiffIteratorNext; PROCEDUREFindMatch (di: DiffIteratorRep; hash: Word.T; blockIndex: INTEGER): INTEGER =
Returns index of matching block, or -1 if no match.
VAR md5: MD5Digest.T; needMD5 := TRUE; BEGIN IF blockIndex >= 0 THEN REPEAT IF di.blocks[blockIndex].rsum = di.rsum THEN (* The full 32-bit checksums match too. Check the MD5s. *) IF needMD5 THEN md5 := BlockMD5(di.ptr, di.rf.blockSize); needMD5 := FALSE; END; IF MD5Digest.Equal(di.blocks[blockIndex].md5, md5) THEN (* Found. *) RETURN blockIndex; END; END; INC(blockIndex); UNTIL blockIndex > LAST(di.blocks^) OR Word.Extract(di.blocks[blockIndex].rsum, HashShift, HashBits) # hash; END; RETURN -1; END FindMatch; PROCEDUREHashCompare (READONLY a, b: RsyncBlock.T): [-1..1] = VAR aHash := Word.Extract(a.rsum, HashShift, HashBits); bHash := Word.Extract(b.rsum, HashShift, HashBits); BEGIN IF aHash < bHash THEN RETURN -1 END; IF aHash > bHash THEN RETURN 1 END; RETURN 0; END HashCompare; PROCEDUREIterateDiffs (rf: T; blocks: REF ARRAY OF RsyncBlock.T): DiffIterator = VAR di: DiffIteratorRep; hashTabIndex: CARDINAL; hash: Word.T; BEGIN di := NEW(DiffIteratorRep, rf := rf, ptr := rf.start, blocks := blocks); IF NUMBER(blocks^) > 0 AND rf.size >= rf.blockSize THEN RsyncBlockArraySort.Sort(di.blocks^, HashCompare); di.seqVec := NEW(REF ARRAY OF CARDINAL, NUMBER(di.blocks^)); di.hashTab := NEW(REF HashTab); hashTabIndex := 0; FOR blockIndex := FIRST(di.blocks^) TO LAST(di.blocks^) DO di.seqVec[di.blocks[blockIndex].num] := blockIndex; hash := Word.Extract(di.blocks[blockIndex].rsum, HashShift, HashBits); IF hash >= hashTabIndex THEN (* Not a repeat of previous hash *) FOR hti := hashTabIndex TO hash-1 DO di.hashTab[hti] := -1; END; di.hashTab[hash] := blockIndex; hashTabIndex := hash + 1; END; END; FOR hti := hashTabIndex TO LAST(di.hashTab^) DO di.hashTab[hti] := -1; END; di.rsum := BlockRsum(di.ptr, rf.blockSize); END; RETURN di; END IterateDiffs; BEGIN END RsyncFile.