UNSAFE MODULEThe byte stream is divided into a sequence of fragments. Each fragment has a header that consists of 4 bytes of control information and 4 bytes of count, followed by a sequence of data bytes. The last fragment of each message has the; IMPORT ConnFD, Atom, AtomList; IMPORT Rd, Wr, MsgRd, MsgWr, RdClass, WrClass, Thread; IMPORT Swap; ConnMsgRW
word set it its header. Both
control word and count are transmitted in little-endian order (that
is, the bytes of the integer occur in the stream in order of
increasing significance).
The {\it alignment} of a byte in a sequence is its index modulo eight. The data format maintains the invariants that
-- each data byte has the same alignment in its fragment and in its message, and
-- the alignment of the first byte of each header is zero.
This is achieved by inserting up to seven bytes of padding before and after each fragment. Alignment padding is only included for fragments that contain one or more data byte.
That is, given a sequence of fragments
f[0], ..., f[N-1]a message is represented by a contiguous subsequence of fragments,
f[p], ... f[q-1]where
(A i: p <= i < q : f[i].eom = (i = q-1)) AND (p=0) OR f[p-1].eomThe bytes of the message are the concatenation of the data bytes of the fragments containing the message, minus any padding introduced to satisfy the alignment invariants. For example, the following code sets
to the contents of the message f[p], ... f[q-1]:
VAR m := ""; skip := 0; CONST Alignment = 8; (* at least as big as a fragment header
| BEGIN | FOR i := p TO q-1 DO | m := m & Text.Sub(f[i].data, skip, f[i].count); | skip := skip + f[i].count MOD Alignment | END | END *) TYPE Int32 = BITS 32 FOR [-16_7FFFFFFF-1..16_7FFFFFFF]; FragmentHeader = RECORD (* N.B. This is in little-endian order on the wire. *) eom: Int32; nb: Int32; END; CONST HeaderBytes = BYTESIZE(FragmentHeader); CONST AlignBytes = HeaderBytes; TYPE RdT = MsgRd.T BRANDED OBJECT fd: ConnFD.T; hdr := FragmentHeader{eom := 1, nb := 0}; lim: CARDINAL := 0; OVERRIDES seek := RdSeek; close := RdClose; nextMsg := RdNextMsg; length := Length; END;The {\it current fragment} ofrd
is the fragment containing the data byte rd.buff[rd.sd+sr.cur], ifrd
is ready. Otherwise the current fragment is the last fragment that has been processed by the reader. The initial state reflects an artificial fragment with size zero that is prepended to the actual sequence of fragments read over the TCP stream.The value
is the end of message bit for the current fragment.The value
is the number of bytes in the current fragment that remain to be read from the TCP stream.The value of
is one greater than the buffer index of the last valid TCP stream byte in the buffer. The value oflim MOD 8
is always zero.TYPE WrT = MsgWr.T BRANDED OBJECT fd: ConnFD.T; OVERRIDES seek := WrSeek; flush := WrFlush; close := WrClose; nextMsg := WrNextMsg; END; CONST BufferSize = 8192; VAR ProtocolErrorEOF: AtomList.T; ProtocolErrorNB: AtomList.T; PROCEDURENewRd (fd: ConnFD.T) : MsgRd.T = BEGIN RETURN NEW(RdT, fd := fd, buff := NEW(REF ARRAY OF CHAR, BufferSize), st := 0, lo := 0, hi := 0, cur := 0, intermittent := TRUE, seekable := FALSE, closed := FALSE) END NewRd; PROCEDURENewWr (fd: ConnFD.T) : MsgWr.T = BEGIN RETURN NEW(WrT, fd := fd, buff := NEW(REF ARRAY OF CHAR, BufferSize), st := HeaderBytes, lo := 0, hi := BufferSize - HeaderBytes, cur := 0, buffered := TRUE, seekable := FALSE, closed := FALSE) END NewWr; PROCEDURERdSeek (rd: RdT; <*UNUSED*> n: CARDINAL; dontBlock: BOOLEAN): RdClass.SeekResult RAISES {Rd.Failure, Thread.Alerted} = VAR nb: CARDINAL; BEGIN IF dontBlock THEN RETURN RdClass.SeekResult.WouldBlock; END; REPEAT (* Advance to next sub-buffer and set "nb" to the number of data bytes available in it. There are two cases: *) IF rd.hdr.nb # 0 THEN (* read the rest of the current fragment into the buffer *) rd.st := 0; rd.lo := rd.hi; rd.lim := ReadAligned(rd.fd, rd.buff); (* fall out into common code below *) ELSE (* advance to next non-empty fragment *) rd.st := Align((rd.hi - rd.lo) + rd.st); rd.lo := rd.hi; REPEAT IF rd.hdr.eom # 0 THEN RETURN RdClass.SeekResult.Eof; END; IF rd.lim - rd.st < HeaderBytes THEN <* ASSERT rd.st = rd.lim *> (* read from stream to get next header *) rd.st := 0; rd.lim := ReadAligned(rd.fd, rd.buff); END; (* careful, this is endian dependent *) rd.hdr := LOOPHOLE(ADR(rd.buff[rd.st]), UNTRACED REF FragmentHeader)^; IF Swap.endian = Swap.Endian.Big THEN rd.hdr.nb := Swap.Swap4(rd.hdr.nb); END; IF rd.hdr.nb < 0 THEN RAISE Rd.Failure(ProtocolErrorNB); END; INC(rd.st, HeaderBytes); UNTIL rd.hdr.nb # 0; END; (* Now "rd.st" is the buffer index where the data part of the now-current fragment begins. This could be beyond the end of the buffer. *) rd.st := rd.st + (rd.lo MOD AlignBytes); nb := MIN(rd.hdr.nb, MAX(rd.lim-rd.st, 0)); INC(rd.hi, nb); DEC(rd.hdr.nb, nb); UNTIL nb # 0; (* loop necessary in case hdr is at end of buffer *) RETURN RdClass.SeekResult.Ready; END RdSeek; PROCEDUREAlign (n: CARDINAL) : CARDINAL =Roundn
up to the next multiple of 8.BEGIN RETURN n + ((-n) MOD 8) END Align; PROCEDUREReadAligned ( fd: ConnFD.T; buff: REF ARRAY OF CHAR) : INTEGER RAISES {Rd.Failure, Thread.Alerted} = (* "ReadAligned" always returns a non-zero result. If we are at the end of the TCP stream, then we have encountered a protocol error. Otherwise, we return a postive count of bytes which is zero modulo "AlignBytes". *) VAR len := 0; nb: CARDINAL; <* FATAL ConnFD.TimedOut *> BEGIN len := 0; REPEAT nb := fd.get(SUBARRAY(buff^, len, NUMBER(buff^)-len)); INC(len, nb); UNTIL (len MOD AlignBytes) = 0; IF len = 0 THEN RAISE Rd.Failure(ProtocolErrorEOF); END; RETURN len; END ReadAligned; PROCEDURELength (rd: RdT) : INTEGER = BEGIN IF rd.hdr.eom # 0 THEN RETURN rd.lo + rd.hdr.nb; ELSE RETURN -1; END; END Length; PROCEDURERdClose (rd: RdT) RAISES {Rd.Failure} = BEGIN rd.buff := NIL; rd.fd.shutdownIn(); END RdClose; PROCEDURERdNextMsg (rd: RdT) : BOOLEAN RAISES {Rd.Failure, Thread.Alerted} = BEGIN (* For now, we are not going to differentiate between end of connection and communications failure. This procedure will raise Rd.Failure in either case. *) WHILE rd.hdr.nb # 0 OR NOT rd.hdr.eom # 0 DO EVAL RdSeek(rd, rd.hi, FALSE); END; rd.st := Align((rd.hi - rd.lo) + rd.st); rd.cur := 0; rd.lo := 0; rd.hi := 0; rd.hdr := FragmentHeader{eom := 0, nb := 0}; EVAL RdSeek(rd, rd.cur, FALSE); RETURN TRUE; END RdNextMsg; PROCEDUREWrSeek (wr: WrT; <*UNUSED*> n: CARDINAL) RAISES {Wr.Failure, Thread.Alerted} = BEGIN WrFlush(wr) END WrSeek; PROCEDUREWrFlush (wr: WrT) RAISES {Wr.Failure, Thread.Alerted} = BEGIN IF wr.cur # wr.lo THEN PutFrag(wr, FALSE); END; wr.st := HeaderBytes + (wr.cur MOD AlignBytes); wr.lo := wr.cur; wr.hi := wr.lo + (NUMBER(wr.buff^) - wr.st); END WrFlush; PROCEDUREWrClose (wr: WrT) RAISES {Wr.Failure} = BEGIN wr.buff := NIL; wr.fd.shutdownOut(); END WrClose; PROCEDUREWrNextMsg (wr: WrT) RAISES {Wr.Failure, Thread.Alerted} = BEGIN PutFrag(wr, TRUE); wr.st := HeaderBytes; wr.cur := 0; wr.lo := 0; wr.hi := NUMBER(wr.buff^) - HeaderBytes; END WrNextMsg; PROCEDUREPutFrag (wr: WrT; eom: BOOLEAN) RAISES {Wr.Failure, Thread.Alerted} = VAR len := wr.cur - wr.lo; BEGIN WITH hdr = LOOPHOLE(ADR(wr.buff[0]), UNTRACED REF FragmentHeader) DO hdr^ := FragmentHeader{eom := ORD(eom), nb := len}; IF Swap.endian = Swap.Endian.Big THEN hdr.nb := Swap.Swap4(hdr.nb); END; END; IF len = 0 THEN (* output just a header for null fragments *) wr.fd.put(SUBARRAY(wr.buff^, 0, HeaderBytes)); ELSE (* otherwise correct for alignment of first byte *) wr.fd.put(SUBARRAY(wr.buff^, 0, Align(len+wr.st))); END; END PutFrag; BEGIN ProtocolErrorEOF := AtomList.Cons(Atom.FromText("ConnMsgRW.UnexpectedEOF"), NIL); ProtocolErrorNB := AtomList.Cons(Atom.FromText("ConnMsgRW.ProtocolError"), NIL); END ConnMsgRW.