<* PRAGMA LL *>

MODULE ChannelMux;

IMPORT Atom, AtomList, IP, Rd, RefSeq, TCP, Thread, Word, Wr;
IMPORT ConnFD;
IMPORT Fmt, IO;

CONST
  DoTrace = FALSE;  (* Set to TRUE to log packet and buffer activity. *)

PROCEDURE Trace (msg: TEXT) =
  (* Writes "msg" followed by a newline using "IO.Put" when "DoTrace" is
     TRUE; otherwise does nothing. *)
  BEGIN
    IF DoTrace THEN
      IO.Put(msg & "\n");
    END;
  END Trace;
CONST
  (* Protocol version exchanged in the startup request/reply handshake. *)
  ProtoVersion = 0;
  (* Sizes passed to "Buffer.init" for each channel's send and receive
     buffers. *)
  SendBufSize = 16 * 1024;
  RecvBufSize = 16 * 1024;
  (* Initial value of every channel's "recvMSS". *)
  MaxSegSize = 1024;
TYPE
  (* Kinds of packets on the underlying stream.  "PutPacketType" writes
     "ORD(type)" as a single byte and "GetPacketType" converts it back with
     "VAL", so the ordinal values are the wire encoding. *)
  PacketType = {  (* Don't re-order these! *)
    StartupRequest,
    StartupReply,
    Connect,
    Accept,
    Reset,
    Data,
    Window,
    Close
  };
(* Locking conventions.

   We use medium-grained locking: you can lock the multiplexer itself, and
   you can lock an individual channel.  The locking order, which must be
   followed in all cases, is: multiplexer < channel.

   In other words, if you want to hold locks on the multiplexer and on a
   channel simultaneously, you must acquire the multiplexer lock first, and
   release it last. *)

(***************************************************************************)
(* Multiplexers.                                                           *)
(***************************************************************************)
REVEAL
  (* Concrete representation of a multiplexer.  The object itself is the
     mutex that protects it. *)
  T = MUTEX BRANDED OBJECT
    (* Sequence of "Channel" objects, indexed by channel id. *)
    channels: RefSeq.T;
    (* Set to FALSE by "Open" and back to TRUE by "ShutdownProtocol". *)
    closed := TRUE;
    (* Underlying output stream and the thread closure that writes to it. *)
    wr: Wr.T;
    sender: Sender;
    (* Underlying input stream and the thread closure that reads from it. *)
    rd: Rd.T;
    receiver: Receiver;
  END;
PROCEDURE Open (rd: Rd.T;
                wr: Wr.T;
                VAR (*OUT*) chan: Channel;
                active: BOOLEAN): T
  RAISES {IP.Error, Thread.Alerted} =
  (* Creates a multiplexer over the stream pair "rd"/"wr" and performs the
     startup handshake.  If "active" is TRUE this side runs
     "InitiateProtocol"; otherwise it runs "AcceptProtocol".  On success
     "chan" is set to the initial channel (id 0) and the new multiplexer is
     returned.

     "Rd.Failure" and "Wr.Failure" from the handshake are converted to
     "IP.Error" carrying the same atom list.

     If the handshake fails after the sender and receiver threads have
     already been forked, the multiplexer is shut down before the exception
     propagates.  The caller never receives the multiplexer in that case, so
     it could not otherwise stop those threads. *)
  VAR
    mux: T;
    ok := FALSE;
  BEGIN
    mux := NEW(T,
      channels := NEW(RefSeq.T).init(),
      rd := rd,
      wr := wr);
    mux.receiver := NEW(Receiver).init(mux);
    mux.sender := NEW(Sender).init(mux);
    mux.closed := FALSE;
    TRY
      TRY
        IF active THEN
          InitiateProtocol(mux, chan);
        ELSE
          AcceptProtocol(mux, chan);
        END;
      EXCEPT
      | Rd.Failure(l) => RAISE IP.Error(l);
      | Wr.Failure(l) => RAISE IP.Error(l);
      END;
      ok := TRUE;
    FINALLY
      (* "StartThreads" forks the receiver first and the sender second, so a
         non-NIL sender thread means both threads exist. *)
      IF NOT ok AND mux.sender.thread # NIL THEN
        ShutdownProtocol(mux);
      END;
    END;
    RETURN mux;
  END Open;
PROCEDURE Listen (mux: T): ChannelID
  RAISES {IP.Error} =
  <* LL = {} *>
  (* Puts a free channel of "mux" into the listening state and returns its
     id.  Any "IP.Error" raised by "AllocChannel" is propagated. *)
  BEGIN
    LOCK mux DO
      RETURN AllocChannel(mux).id;
    END;
  END Listen;
PROCEDURE Accept (mux: T; id: ChannelID): Channel
  RAISES {IP.Error, Thread.Alerted} =
  <* LL = {} *>
  (* Waits until channel "id" of "mux" leaves the listening state and
     returns it if it became established.  Raises "IP.Error" with
     "TCP.Closed" if it ended up in any other state, and "Thread.Alerted" if
     the calling thread is alerted while waiting. *)
  VAR
    chan: Channel;
  BEGIN
    (* Look the channel up under the multiplexer lock, then release that
       lock before locking the channel. *)
    LOCK mux DO
      chan := GetChannel(mux, id);
    END;
    LOCK chan DO
      (* The receiver thread signals "rdReady" when a Connect packet moves
         the channel to Established; "ShutdownProtocol" signals it when the
         channel is closed. *)
      WHILE chan.state = ChannelState.Listening DO
        Thread.AlertWait(chan, chan.rdReady);
      END;
      IF chan.state # ChannelState.Established THEN
	Raise(TCP.Closed);
      END;
    END;
    RETURN chan;
  END Accept;
PROCEDURE Connect (mux: T; id: ChannelID): Channel
  RAISES {IP.Error, Thread.Alerted} =
  (* Actively opens channel "id" of "mux".  Raises "IP.Error" with
     "IP.PortBusy" if the channel is not unused.  Otherwise asks the sender
     thread to transmit a Connect packet and waits for the channel to leave
     the connecting state.  Returns the channel if it became established;
     raises "IP.Error" with "TCP.Refused" otherwise.  Raises
     "Thread.Alerted" if the calling thread is alerted while waiting. *)
  VAR
    chan: Channel;
  BEGIN
    LOCK mux DO
      chan := GetChannel(mux, id);
    END;
    LOCK chan DO
      IF chan.state # ChannelState.Unused THEN
	Raise(IP.PortBusy);
      END;
      chan.state := ChannelState.Connecting;
      (* Request that the sender thread emit a Connect packet. *)
      chan.flags := chan.flags + ChannelFlags{ChannelFlag.Connect};
    END;
    (* "AwakenSender" locks the multiplexer, so the channel lock is released
       first to respect the locking order. *)
    AwakenSender(mux);
    LOCK chan DO
      (* The receiver thread signals "wrReady" on an Accept packet;
         "ShutdownProtocol" signals it when the channel is closed. *)
      WHILE chan.state = ChannelState.Connecting DO
        Thread.AlertWait(chan, chan.wrReady);
      END;
      IF chan.state # ChannelState.Established THEN
	Raise(TCP.Refused);
      END;
    END;
    RETURN chan;
  END Connect;
PROCEDURE Close (mux: T) =
  (* Shuts the multiplexer down by calling "ShutdownProtocol" with no error
     list. *)
  BEGIN
    ShutdownProtocol(mux);
  END Close;
(***************************************************************************)
PROCEDURE InitiateProtocol (mux: T; VAR (*OUT*) chan: Channel)
  RAISES {IP.Error, Rd.Failure, Thread.Alerted, Wr.Failure} =
  (* Active side of the startup handshake.  Sends a startup request, expects
     a startup reply carrying the same protocol version, starts the sender
     and receiver threads, and connects channel 0, which is returned via
     "chan".  Raises "IP.Error" with "TCP.Refused" on a version mismatch and
     with "TCP.ConnLost" if the stream ends during the handshake. *)
  VAR
    version: CARDINAL;
  BEGIN
    TRY
      PutStartupRequest(mux, ProtoVersion);
      NeedPacketType(mux, PacketType.StartupReply);
      GetStartupReply(mux, version);
      IF version # ProtoVersion THEN
        (* There is only one version right now. *)
        Raise(TCP.Refused);
      END;
      StartThreads(mux);
      chan := Connect(mux, 0);
    EXCEPT
      Rd.EndOfFile => Raise(TCP.ConnLost);
    END;
  END InitiateProtocol;

PROCEDURE AcceptProtocol (mux: T; VAR (*OUT*) chan: Channel)
  RAISES {IP.Error, Rd.Failure, Thread.Alerted, Wr.Failure} =
  (* Passive side of the startup handshake.  Expects a startup request,
     answers with a startup reply carrying our version, listens on a channel
     (asserted to be channel 0), starts the sender and receiver threads, and
     accepts that channel, which is returned via "chan".  Raises "IP.Error"
     with "TCP.Refused" on a version mismatch (after the reply has been
     sent) and with "TCP.ConnLost" if the stream ends during the
     handshake. *)
  VAR
    hisVersion, myVersion: CARDINAL;
    id: ChannelID;
  BEGIN
    TRY
      NeedPacketType(mux, PacketType.StartupRequest);
      GetStartupRequest(mux, hisVersion);
      myVersion := ProtoVersion;  (* The only version. *)
      PutStartupReply(mux, myVersion);
      IF hisVersion # myVersion THEN
        (* There is only one version right now. *)
        Raise(TCP.Refused);
      END;
      id := Listen(mux);
      <* ASSERT id = 0 *>
      StartThreads(mux);
      chan := Accept(mux, id);
    EXCEPT
      Rd.EndOfFile => Raise(TCP.ConnLost);
    END;
  END AcceptProtocol;

PROCEDURE AllocChannel (mux: T): Channel
  RAISES {IP.Error} =
  <* LL = mux *>
  (* Returns an available "Channel" in the listening state.  An existing
     unused channel is reused if there is one; otherwise a new channel is
     appended.  Raises "IP.Error" with "IP.NoResources" if the id of the new
     channel would exceed "LAST(ChannelID)". *)
  VAR
    chan: Channel;
  BEGIN
    WITH sz = mux.channels.size() DO
      FOR i := 0 TO sz-1 DO
        chan := mux.channels.get(i);
        LOCK chan DO
          IF chan.state = ChannelState.Unused THEN
            chan.state := ChannelState.Listening;
            RETURN chan;
          END;
        END;
      END;
      IF sz > LAST(ChannelID) THEN
        Raise(IP.NoResources);
      END;
      (* The id of the new channel is its index in the sequence. *)
      chan := NEW(Channel).init(mux, sz);
      chan.state := ChannelState.Listening;
      mux.channels.addhi(chan);
      RETURN chan;
    END;
  END AllocChannel;
PROCEDURE GetChannel (mux: T;
                      id: ChannelID): Channel =
  <* LL = mux *>
  (* Returns the "Channel" with the given "id", creating it if necessary.
     Every channel with a smaller id that does not exist yet is created as
     well, so a channel's index in "mux.channels" always equals its id. *)
  BEGIN
    FOR i := mux.channels.size() TO id DO
      mux.channels.addhi(NEW(Channel).init(mux, i));
    END;
    RETURN mux.channels.get(id);
  END GetChannel;
PROCEDURE StartThreads (mux: T) =
  (* Forks the receiver thread and then the sender thread, recording each
     thread handle in its closure. *)
  BEGIN
    mux.receiver.thread := Thread.Fork(mux.receiver);
    mux.sender.thread := Thread.Fork(mux.sender);
  END StartThreads;
PROCEDURE ShutdownProtocol (mux: T;
                            error: AtomList.T := NIL) =
<* LL = {} *>
  (* Shuts the multiplexer down.  Does nothing if it is already closed.
     Otherwise marks it closed, moves every channel that is not unused to
     the closed state (recording "error" on it when "error" is non-NIL),
     clears its pending sender flags, and wakes all threads waiting on it.
     Finally alerts and joins the sender and receiver threads, skipping
     whichever of them is the calling thread. *)
  VAR
    chan: Channel;
  BEGIN
    LOCK mux DO
      IF mux.closed THEN RETURN END;
      mux.closed := TRUE;
      FOR i := 0 TO mux.channels.size() - 1 DO
        chan := mux.channels.get(i);
        LOCK chan DO
          IF chan.state # ChannelState.Unused THEN
            IF error # NIL THEN chan.error := error END;
            chan.state := ChannelState.Closed;
            chan.flags := ChannelFlags{};
            (* Broadcast rather than signal: more than one thread may be
               blocked on the same condition, and each of them must notice
               the shutdown. *)
            Thread.Broadcast(chan.rdReady);
            Thread.Broadcast(chan.wrReady);
          END;
        END;
      END;
    END;
    WITH me = Thread.Self() DO
      (* A thread handle is NIL until "StartThreads" has run; there is then
         nothing to alert or join. *)
      IF mux.sender.thread # NIL AND me # mux.sender.thread THEN
        Thread.Alert(mux.sender.thread);
        EVAL Thread.Join(mux.sender.thread);
      END;
      IF mux.receiver.thread # NIL AND me # mux.receiver.thread THEN
        Thread.Alert(mux.receiver.thread);
        EVAL Thread.Join(mux.receiver.thread);
      END;
    END;
  END ShutdownProtocol;
PROCEDURE AwakenSender (mux: T) =
<* LL = {} *>
  (* Signals the sender thread's "newWork" condition if, when sampled under
     the multiplexer lock, the sender was waiting for work.  The signal is
     issued after the lock has been released. *)
  VAR
    senderIdle: BOOLEAN;
  BEGIN
    LOCK mux DO senderIdle := mux.sender.waitingForWork END;
    IF NOT senderIdle THEN RETURN END;
    Thread.Signal(mux.sender.newWork);
  END AwakenSender;
(***************************************************************************)
(* Receiver thread.                                                        *)
(***************************************************************************)
TYPE
  (* Closure for the receiver thread.  "apply" is "RecvApply". *)
  Receiver = Thread.Closure OBJECT
    mux: T;            (* The multiplexer this receiver serves. *)
    thread: Thread.T;  (* Set by "StartThreads" when the thread is forked. *)
  METHODS
    init(mux: T): Receiver := RecvInit;
  OVERRIDES
    apply := RecvApply;
  END;
PROCEDURE RecvInit (self: Receiver; mux: T): Receiver =
  (* Records the owning multiplexer and returns "self". *)
  BEGIN
    self.mux := mux;
    RETURN self;
  END RecvInit;
PROCEDURE RecvApply (self: Receiver): REFANY =
  (* Body of the receiver thread.  Repeatedly reads one packet from the
     multiplexer's input stream and updates the addressed channel.  The loop
     ends when reading raises an exception: on "Rd.EndOfFile" or
     "Rd.Failure" the multiplexer is shut down (passing the failure's list
     in the latter case); on "Thread.Alerted" the thread just exits.  Always
     returns NIL. *)
  VAR
    type: PacketType;
    id: ChannelID;
    mss: CARDINAL;
    window: Word.T;
    length: CARDINAL;
    len1, len2: CARDINAL;
    chan: Channel;
    awakenSender: BOOLEAN;
    awakenClient: Thread.Condition;
  BEGIN
    TRY
      LOOP
	(* Wake-ups are recorded here and performed at the bottom of the
	   loop, after all locks have been released. *)
	awakenSender := FALSE;
	awakenClient := NIL;
	type := GetPacketType(self.mux);
	CASE type OF
	| PacketType.StartupRequest, PacketType.StartupReply =>
	    (* Startup packets are only read by the handshake code, which
	       runs before this thread is started. *)
	    RAISE Rd.Failure(AtomList.List1(ProtocolError));
	| PacketType.Connect =>
	    GetConnect(self.mux, id, mss, window);
	    LOCK self.mux DO
	      chan := GetChannel(self.mux, id);
	    END;
	    LOCK chan DO
	      IF chan.state = ChannelState.Listening THEN
		(* Record the peer's limits, establish the channel, and
		   schedule an Accept packet in reply. *)
		chan.sendMSS := mss;
		chan.sendWin := window;
		chan.state := ChannelState.Established;
		chan.flags := chan.flags + ChannelFlags{ChannelFlag.Accept};
		awakenClient := chan.rdReady;
	      ELSE
		(* Nobody is listening on this channel. *)
		chan.flags := chan.flags + ChannelFlags{ChannelFlag.Reset};
	      END;
	    END;
	    awakenSender := TRUE;
	| PacketType.Accept =>
	    GetAccept(self.mux, id, mss, window);
	    LOCK self.mux DO
	      chan := GetChannel(self.mux, id);
	    END;
	    LOCK chan DO
	      IF chan.state = ChannelState.Connecting THEN
		chan.sendMSS := mss;
		chan.sendWin := window;
		chan.state := ChannelState.Established;
		awakenClient := chan.wrReady;
	      ELSE
		(* Accept for a channel that is not connecting. *)
		chan.flags := chan.flags + ChannelFlags{ChannelFlag.Reset};
		awakenSender := TRUE;
	      END;
	    END;
	| PacketType.Reset =>
	    (* A Reset packet is treated as a protocol error; its channel id
	       is not read. *)
	    RAISE Rd.Failure(AtomList.List1(ProtocolError));
	| PacketType.Data =>
	    (* Read the packet directly into the buffer to avoid
	       unnecessary copying. *)
	    id := Get1(self.mux);
	    length := Get2(self.mux);
	    LOCK self.mux DO
	      chan := GetChannel(self.mux, id);
	    END;
	    LOCK chan DO
	      IF chan.state # ChannelState.Established
	      AND chan.state # ChannelState.WrClosed THEN
		RAISE Rd.Failure(AtomList.List1(ProtocolError));
	      END;
	      (* The peer must respect both our segment size limit and the
	         free space in our receive buffer. *)
	      IF length > chan.recvMSS OR length > BufAvail(chan.recvBuf) THEN
		RAISE Rd.Failure(AtomList.List1(ProtocolError));
	      END;
	    END;
	    IF length > 0 THEN
	      (* The payload is read without holding the channel lock, in at
	         most two pieces if it wraps around the end of the circular
	         buffer.  "in" is advanced only afterwards, under the
	         lock. *)
	      WITH b = chan.recvBuf DO
		len1 := MIN(length, NUMBER(b.buf^) - b.in);
		len2 := length - len1;
		IF len1 > 0 THEN
		  GetN(self.mux, SUBARRAY(b.buf^, b.in, len1));
		END;
		IF len2 > 0 THEN
		  GetN(self.mux, SUBARRAY(b.buf^, 0, len2));
		END;
	      END;
	      LOCK chan DO
		INC(chan.recvBuf.in, length);
		IF chan.recvBuf.in >= NUMBER(chan.recvBuf.buf^) THEN
		  DEC(chan.recvBuf.in, NUMBER(chan.recvBuf.buf^));
		END;
		Trace("<D " & Fmt.Int(id) & " " & Fmt.Int(length)
		  & " [" & Fmt.Int(BufCount(chan.recvBuf)) & "]");
	      END;
	      awakenClient := chan.rdReady;
	    END;
	| PacketType.Window =>
	    GetWindow(self.mux, id, window);
	    LOCK self.mux DO
	      chan := GetChannel(self.mux, id);
	    END;
	    LOCK chan DO
	      (* Window updates are ignored in every other state. *)
	      IF chan.state = ChannelState.Established
	      OR chan.state = ChannelState.RdClosed THEN
		chan.sendWin := window;
		awakenSender := TRUE;
	      END;
	    END;
	| PacketType.Close =>
	    GetClose(self.mux, id);
	    LOCK self.mux DO
	      chan := GetChannel(self.mux, id);
	    END;
	    LOCK chan DO
	      IF chan.state = ChannelState.Established THEN
		chan.state := ChannelState.RdClosed;
	      ELSIF chan.state = ChannelState.WrClosed THEN
		chan.state := ChannelState.Closed;
	      ELSE
		RAISE Rd.Failure(AtomList.List1(ProtocolError));
	      END;
	    END;
	    (* Wake a reader so that it notices the state change. *)
	    awakenClient := chan.rdReady;
	END;
	IF awakenSender THEN AwakenSender(self.mux) END;
	IF awakenClient # NIL THEN Thread.Signal(awakenClient) END;
      END;
    EXCEPT
    | Rd.EndOfFile =>   ShutdownProtocol(self.mux);
    | Rd.Failure(l) =>  ShutdownProtocol(self.mux, l);
    | Thread.Alerted => (* We've been killed.  Just quit. *)
    END;
    Trace("Receiver terminates");
    RETURN NIL;
  END RecvApply;
(***************************************************************************)
(* Sender thread.                                                          *)
(***************************************************************************)
TYPE
  (* Closure for the sender thread.  "apply" is "SndrApply". *)
  Sender = Thread.Closure OBJECT
    mux: T;                     (* The multiplexer this sender serves. *)
    thread: Thread.T;           (* Set by "StartThreads". *)
    newWork: Thread.Condition;  (* Signalled by "AwakenSender". *)
    (* TRUE while the sender is blocked in "SndrWaitForWork". *)
    waitingForWork: BOOLEAN;
    (* Id of the channel served most recently; "SndrScan" resumes its scan
       with the channel after this one. *)
    lastID: CARDINAL;
  METHODS
    init(mux: T): Sender := SndrInit;
  OVERRIDES
    apply := SndrApply;
  END;
PROCEDURE SndrInit (self: Sender;
                   mux: T): Sender =
  (* Initializes the sender closure for "mux": allocates the "newWork"
     condition, clears "waitingForWork", sets "lastID" to 0, and returns
     "self". *)
  BEGIN
    self.mux := mux;
    self.newWork := NEW(Thread.Condition);
    self.waitingForWork := FALSE;
    self.lastID := 0;
    RETURN self;
  END SndrInit;
PROCEDURE SndrApply (self: Sender): REFANY =
  (* Body of the sender thread.  Repeatedly waits for a channel that needs a
     packet sent, then builds and writes that packet.  The loop ends when an
     exception is raised: on "Wr.Failure" the multiplexer is shut down with
     the failure's list; on "Thread.Alerted" the thread just exits.  Always
     returns NIL. *)
  VAR
    what: ChannelFlag;
    chan: Channel;
    id: ChannelID;
    mss: CARDINAL;
    window: Word.T;
    length, len1, len2: CARDINAL;
  BEGIN
    TRY
      LOOP
	SndrWaitForWork(self, what, chan);
	(* In each case the channel fields are sampled under the channel
	   lock, and the packet is written after the lock is released. *)
	CASE what OF
	| ChannelFlag.Connect =>
	    LOCK chan DO
	      id := chan.id;
	      mss := chan.recvMSS;
	      (* Advertised window: "recvSeq" plus the capacity of the
	         receive buffer, reduced to 32 bits. *)
	      window := Word.Extract(
		Word.Plus(chan.recvSeq, BufSize(chan.recvBuf)), 0, 32);
	    END;
	    PutConnect(self.mux, id, mss, window);
	| ChannelFlag.Accept =>
	    LOCK chan DO
	      id := chan.id;
	      mss := chan.recvMSS;
	      window := Word.Extract(
		Word.Plus(chan.recvSeq, BufSize(chan.recvBuf)), 0, 32);
	    END;
	    PutAccept(self.mux, id, mss, window);
	| ChannelFlag.Reset =>
	    LOCK chan DO
	      id := chan.id;
	    END;
	    PutReset(self.mux, id);
	| ChannelFlag.Window =>
	    LOCK chan DO
	      id := chan.id;
	      window := Word.Extract(
		Word.Plus(chan.recvSeq, BufSize(chan.recvBuf)), 0, 32);
	    END;
	    PutWindow(self.mux, id, window);
	| ChannelFlag.Data =>
	    LOCK chan DO
	      id := chan.id;
	      (* Send no more than what is buffered, the peer's segment size
	         limit, and what remains of the peer's window. *)
	      length := MIN(BufCount(chan.sendBuf), chan.sendMSS);
	      WITH winSize =
		Word.Extract(Word.Minus(chan.sendWin, chan.sendSeq), 0, 32)
	      DO
		IF Word.LT(winSize, length) THEN length := winSize END;
	      END;
	    END;
	    IF length > 0 THEN
	      (* Output the packet directly from the channel's send buffer, to
		 avoid unnecessary copying. *)
	      WITH b = chan.sendBuf DO
		(* The payload is written in at most two pieces if it wraps
		   around the end of the circular buffer. *)
		len1 := MIN(length, NUMBER(b.buf^) - b.out);
		len2 := length - len1;
		PutPacketType(self.mux, PacketType.Data);
		Put1(self.mux, id);
		Put2(self.mux, length);
		IF len1 > 0 THEN
		  Wr.PutString(self.mux.wr, SUBARRAY(b.buf^, b.out, len1));
		END;
		IF len2 > 0 THEN
		  Wr.PutString(self.mux.wr, SUBARRAY(b.buf^, 0, len2));
		END;
		Wr.Flush(self.mux.wr);
	      END;
	      (* Only after the bytes have been written are they removed from
	         the buffer and the send sequence number advanced. *)
	      LOCK chan DO
		chan.sendSeq :=
		  Word.Extract(Word.Plus(chan.sendSeq, length), 0, 32);
		INC(chan.sendBuf.out, length);
		IF chan.sendBuf.out >= NUMBER(chan.sendBuf.buf^) THEN
		  DEC(chan.sendBuf.out, NUMBER(chan.sendBuf.buf^));
		END;
		Trace(">D " & Fmt.Int(id) & " " & Fmt.Int(length)
		  & " [" & Fmt.Int(BufCount(chan.sendBuf)) & "]");
	      END;
	      (* Buffer space was freed; wake a writer blocked in
	         "ChanPut". *)
	      Thread.Signal(chan.wrReady);
	    END;
	| ChannelFlag.Close =>
	    LOCK chan DO
	      id := chan.id;
	    END;
	    PutClose(self.mux, id);
	END;
	(* ... *)
      END;
    EXCEPT
    | Thread.Alerted =>  (* We've been killed.  Just quit. *)
    | Wr.Failure(l)  =>  ShutdownProtocol(self.mux, l);
    END;
    Trace("Sender terminates");
    RETURN NIL;
  END SndrApply;
PROCEDURE SndrWaitForWork (self: Sender;
                           VAR (*OUT*) what: ChannelFlag;
                           VAR (*OUT*) chan: Channel)
  RAISES {Thread.Alerted} =
<* LL = {} *>
  (* Waits until there is a channel that needs something done by the
     sender.  Returns the task via "what" and the channel via "chan".
     Raises "Thread.Alerted" if the thread is alerted while waiting. *)
  BEGIN
    LOCK self.mux DO
      WHILE NOT SndrScan(self, what, chan) DO  (* Wait for something to do. *)
        (* The flag is set while the multiplexer lock is held, and
           "AlertWait" releases that lock only as it begins waiting, so
           "AwakenSender" cannot observe a stale FALSE after the scan. *)
        self.waitingForWork := TRUE;
        Thread.AlertWait(self.mux, self.newWork);
      END;
      self.waitingForWork := FALSE;
    END;
  END SndrWaitForWork;
PROCEDURE SndrScan (self: Sender;
                    VAR (*OUT*) what: ChannelFlag;
                    VAR (*OUT*) chan: Channel): BOOLEAN =
<* LL = self.mux *>
  (* Searches for a channel that needs something done by the sender.  If
     such a channel is found, sets "what" and "chan" and returns TRUE.
     Otherwise returns FALSE; "chan" may have been overwritten during the
     scan and must then be ignored.

     The scan is round-robin: it starts with the channel after the one
     served last and ends with that channel itself.  Within a channel, the
     first pending flag in "ChannelFlag" declaration order is chosen and
     removed from the channel's flags. *)
  VAR
    flags: ChannelFlags;
    numChannels := self.mux.channels.size();
    id: CARDINAL := self.lastID;
  BEGIN
    IF numChannels > 0 THEN
      REPEAT
        INC(id);
        IF id >= numChannels THEN id := 0 END;
        chan := self.mux.channels.get(id);
        LOCK chan DO
          IF chan.state # ChannelState.Unused THEN
            flags := chan.flags;
            IF chan.sendSeq # chan.sendWin AND BufCount(chan.sendBuf) > 0 THEN
              (* We can send some data. *)
              flags := flags + ChannelFlags{ChannelFlag.Data};
            END;
            IF flags # ChannelFlags{} THEN  (* Something to do. *)
              FOR w := FIRST(ChannelFlag) TO LAST(ChannelFlag) DO
                IF w IN flags THEN
                  chan.flags := chan.flags - ChannelFlags{w};
                  self.lastID := id;
                  what := w;
                  RETURN TRUE;
                END;
              END;
            END;
          END;
        END;
      UNTIL id = self.lastID;
    END;
    RETURN FALSE;
  END SndrScan;
(***************************************************************************)
(* Channels.                                                               *)
(***************************************************************************)
REVEAL
  (* Concrete representation of a channel.  The channel object is itself
     used as the lock protecting its fields ("LOCK chan DO"). *)
  Channel = ConnFD.T BRANDED OBJECT
    mux: T;                     (* Owning multiplexer. *)
    id: ChannelID;              (* Index of this channel in "mux.channels". *)
    state: ChannelState;
    flags: ChannelFlags;        (* Packets the sender thread must emit. *)
    error: AtomList.T;          (* Set by "ShutdownProtocol" on failure. *)
    rdReady: Thread.Condition;  (* Waited on by "Accept" and "ChanGet". *)
    wrReady: Thread.Condition;  (* Waited on by "Connect" and "ChanPut". *)
    (* Sender state variables. *)
    sendBuf: Buffer;
    sendSeq: Word.T;    (* Next byte number to send (MOD 2^32). *)
    sendWin: Word.T;    (* Allowed to advance sendSeq this far (MOD 2^32). *)
    sendMSS: CARDINAL;	(* Allowed to send data packets this large. *)
    (* Receiver state variables. *)
    recvBuf: Buffer;
    recvSeq: Word.T;    (* Next byte number for the application (MOD 2^32). *)
    recvMSS: CARDINAL;  (* Peer should never send us data packets larger. *)
  METHODS
    init(mux: T;
         id: ChannelID): Channel := ChanInit;
  OVERRIDES
    get := ChanGet;
    put := ChanPut;
    shutdownIn := ChanShutdownIn;
    shutdownOut := ChanShutdownOut;
    close := ChanClose;
  END;
TYPE
  (* Life cycle of a channel.  "ChanInit" starts a channel in "Unused". *)
  ChannelState = {
    Unused,
    Listening,     (* Set by "AllocChannel"; waiting for a Connect packet. *)
    Connecting,    (* Set by "Connect"; waiting for an Accept packet. *)
    Established,
    RdClosed,      (* Reading half has been closed. *)
    WrClosed,      (* Writing half has been closed. *)
    Closed
  };
  (* Work items for the sender thread.  "SndrScan" walks these from FIRST to
     LAST and takes the first one that is pending, so the declaration order
     is the priority order. *)
  ChannelFlag = {  (* Ordered from most urgent to least urgent. *)
    Connect,  (* Must send a connect packet. *)
    Accept,   (* Must send an accept packet. *)
    Reset,    (* Must send a Reset packet. *)
    Window,   (* Must send a window update packet. *)
    Data,     (* Must send a data packet. *)
    Close     (* Must send a close packet. *)
  };
  ChannelFlags = SET OF ChannelFlag;
PROCEDURE ChanInit (self: Channel;
                    mux: T;
                    id: ChannelID): Channel =
  (* Initializes "self" as unused channel "id" of "mux", with no pending
     sender work, no error, fresh conditions and buffers, and zeroed
     sequence numbers.  Returns "self". *)
  BEGIN
    (* Identity and life-cycle state. *)
    self.id := id;
    self.mux := mux;
    self.error := NIL;
    self.flags := ChannelFlags{};
    self.state := ChannelState.Unused;

    (* Wake-up conditions for readers and writers. *)
    self.wrReady := NEW(Thread.Condition);
    self.rdReady := NEW(Thread.Condition);

    (* Sending side: nothing may be sent until the peer grants a window. *)
    self.sendMSS := 0;
    self.sendWin := 0;
    self.sendSeq := 0;
    self.sendBuf := NEW(Buffer).init(SendBufSize);

    (* Receiving side. *)
    self.recvMSS := MaxSegSize;
    self.recvSeq := 0;
    self.recvBuf := NEW(Buffer).init(RecvBufSize);

    RETURN self;
  END ChanInit;
PROCEDURE ChanGet (self: Channel;
                   VAR arr: ARRAY OF CHAR;
                   waitFor: LONGREAL := -1.0D0): CARDINAL
  RAISES {Rd.Failure, Thread.Alerted, ConnFD.TimedOut} =
  (* Reads up to "NUMBER(arr)" bytes from the channel into the front of
     "arr" and returns the number of bytes read.  Returns 0 immediately if
     "arr" is empty.

     While the channel is open for reading, blocks until at least one byte
     is available.  Once the reading half is closed, returns whatever is
     still buffered; 0 then means end of stream.

     Raises "Rd.Failure" with the channel's recorded error if there is one,
     or with "TCP.Closed" if the channel is not in a readable state.  Raises
     "ConnFD.TimedOut" if no data is available and "waitFor" is
     non-negative.  Raises "Thread.Alerted" if alerted while waiting.

     A window update is scheduled only when bytes were actually consumed: a
     zero-byte read leaves the receive window unchanged, so there is nothing
     new to tell the peer. *)
  VAR
    count: CARDINAL;
    n: CARDINAL;
  BEGIN
    (* FIXME - We only handle a "waitFor" of -1.0d0 and 0.0d0. *)
    IF NUMBER(arr) = 0 THEN RETURN 0 END;
    LOCK self DO
      LOOP
        IF self.error # NIL THEN
          RAISE Rd.Failure(self.error);
        END;
        count := BufCount(self.recvBuf);
        CASE self.state OF
        | ChannelState.Established, ChannelState.WrClosed =>
            IF count > 0 THEN EXIT END;
        | ChannelState.RdClosed, ChannelState.Closed =>
            (* No more data will arrive; hand over what is left. *)
            EXIT;
        ELSE
          RAISE Rd.Failure(AtomList.List1(TCP.Closed));
        END;
        IF waitFor >= 0.0d0 THEN RAISE ConnFD.TimedOut END;
        Thread.AlertWait(self, self.rdReady);
      END;
      n := MIN(count, NUMBER(arr));
      BufGet(self.recvBuf, SUBARRAY(arr, 0, n));
      Trace("G  " & Fmt.Int(self.id) & " " & Fmt.Int(n)
        & " [" & Fmt.Int(BufCount(self.recvBuf)) & "]");
      IF n > 0 THEN
        self.recvSeq := Word.Extract(Word.Plus(self.recvSeq, n), 0, 32);
        self.flags := self.flags + ChannelFlags{ChannelFlag.Window};
      END;
    END;
    (* "AwakenSender" locks the multiplexer, so it is called only after the
       channel lock has been released. *)
    IF n > 0 THEN AwakenSender(self.mux) END;
    RETURN n;
  END ChanGet;
PROCEDURE ChanPut (self: Channel;
                  READONLY arr: ARRAY OF CHAR)
  RAISES {Thread.Alerted, Wr.Failure} =
  (* Queues all of "arr" for transmission on the channel, blocking whenever
     the send buffer is full.  Raises "Wr.Failure" with the channel's
     recorded error if there is one, or with "TCP.Closed" if the channel is
     not open for writing.  Raises "Thread.Alerted" if alerted while
     waiting. *)
  VAR
    pos := 0;
    avail: CARDINAL;
    n: CARDINAL;
  BEGIN
    (* Each iteration copies as much as currently fits, then releases the
       channel lock and prods the sender thread. *)
    WHILE pos < NUMBER(arr) DO
      LOCK self DO
	LOOP
          IF self.error # NIL THEN
            RAISE Wr.Failure(self.error);
          END;
	  IF self.state # ChannelState.Established
	  AND self.state # ChannelState.RdClosed THEN
	    RAISE Wr.Failure(AtomList.List1(TCP.Closed));
	  END;
	  avail := BufAvail(self.sendBuf);
	  IF avail > 0 THEN EXIT END;
	  (* The sender thread signals "wrReady" after it has drained data
	     from the send buffer. *)
	  Thread.AlertWait(self, self.wrReady);
	END;
	n := MIN(avail, NUMBER(arr) - pos);
	BufPut(self.sendBuf, SUBARRAY(arr, pos, n));
	Trace("P  " & Fmt.Int(self.id) & " " & Fmt.Int(n)
	  & " [" & Fmt.Int(BufCount(self.sendBuf)) & "]");
	INC(pos, n);
      END;
      AwakenSender(self.mux);
    END;
  END ChanPut;
PROCEDURE ChanShutdownIn (<*UNUSED*> self: Channel) =
  (* Currently a no-op: the channel's state is left unchanged. *)
  BEGIN
    (* Ignored for now. *)
  END ChanShutdownIn;
PROCEDURE ChanShutdownOut (self: Channel)
  RAISES {Wr.Failure} =
  (* Closes the writing half of the channel and schedules a Close packet.
     Does nothing if the writing half is already closed.  Raises
     "Wr.Failure" with "TCP.Closed" if the channel is in any other state
     that is not open for writing. *)
  VAR
    next: ChannelState;
  BEGIN
    LOCK self DO
      CASE self.state OF
      | ChannelState.WrClosed, ChannelState.Closed =>  (* Be tolerant. *)
          RETURN;
      | ChannelState.Established =>
          next := ChannelState.WrClosed;
      | ChannelState.RdClosed =>
          next := ChannelState.Closed;
      ELSE
        RAISE Wr.Failure(AtomList.List1(TCP.Closed));
      END;
      self.state := next;
      self.flags := self.flags + ChannelFlags{ChannelFlag.Close};
    END;
    (* Reaching this point means a Close packet was just scheduled. *)
    AwakenSender(self.mux);
    (* It seems like we ought to wait here for the send buffer to empty
       out.  But the "ConnFD" interface doesn't define this method as
       alertable, and we don't want any possibility of blocking forever. *)
  END ChanShutdownOut;
PROCEDURE ChanClose (self: Channel) =
  (* Shuts down the writing half and then the reading half of the channel.
     Exceptions from either step are deliberately discarded, so closing is
     best-effort and never fails. *)
  BEGIN
    TRY self.shutdownOut() EXCEPT ELSE END;
    TRY self.shutdownIn() EXCEPT ELSE END;
  END ChanClose;
(***************************************************************************)
(* Circular buffers.                                                       *)
(***************************************************************************)
TYPE
  (* A circular byte buffer.  "in" is the index where the next byte is
     stored and "out" the index of the next byte to remove; the buffer is
     empty when they are equal.  The array is allocated one element larger
     than the requested capacity (see "BufInit"). *)
  Buffer = OBJECT
    buf: REF ARRAY OF CHAR;
    in, out: CARDINAL;
  METHODS
    init(size: CARDINAL): Buffer := BufInit;
  END;
PROCEDURE BufInit (self: Buffer; size: CARDINAL): Buffer =
  (* Initializes "self" as an empty buffer with a capacity of "size" bytes
     and returns it. *)
  BEGIN
    (* One extra slot, so that a full buffer (in one short of out) can be
       told apart from an empty one (in = out). *)
    self.buf := NEW(REF ARRAY OF CHAR, size + 1);
    self.in := 0;
    self.out := 0;
    RETURN self;
  END BufInit;
PROCEDURE BufSize (self: Buffer): CARDINAL =
  (* Returns the maximum capacity of the given buffer in bytes.  This is one
     less than the length of the underlying array. *)
  BEGIN
    RETURN NUMBER(self.buf^) - 1;
  END BufSize;
PROCEDURE BufAvail (self: Buffer): CARDINAL =
  (* Returns the number of bytes that can still be stored in the buffer. *)
  BEGIN
    (* MOD always yields a value in [0, NUMBER(buf^)-1], which folds a
       negative difference back into range. *)
    RETURN (self.out - self.in - 1) MOD NUMBER(self.buf^);
  END BufAvail;
PROCEDURE BufCount (self: Buffer): CARDINAL =
  (* Returns the number of bytes currently stored in the buffer. *)
  BEGIN
    (* MOD always yields a value in [0, NUMBER(buf^)-1], which folds a
       negative difference back into range. *)
    RETURN (self.in - self.out) MOD NUMBER(self.buf^);
  END BufCount;
PROCEDURE BufPut (self: Buffer; READONLY a: ARRAY OF CHAR) =
  (* Appends the bytes of "a" to the buffer at "in", wrapping around the end
     of the array if necessary, and advances "in".  The procedure does not
     check for free space; the caller is expected to have done so. *)
  VAR
    next: CARDINAL;
  BEGIN
    WITH cap = NUMBER(self.buf^),
         total = NUMBER(a),
         head = MIN(total, cap - self.in),  (* Fits before the array end. *)
         tail = total - head                (* Wraps to the array start. *)
    DO
      SUBARRAY(self.buf^, self.in, head) := SUBARRAY(a, 0, head);
      IF tail > 0 THEN
        SUBARRAY(self.buf^, 0, tail) := SUBARRAY(a, head, tail);
      END;
      next := self.in + total;
      IF next >= cap THEN DEC(next, cap) END;
      self.in := next;
    END;
  END BufPut;
PROCEDURE BufGet (self: Buffer; VAR a: ARRAY OF CHAR) =
  (* Removes "NUMBER(a)" bytes from the buffer starting at "out", wrapping
     around the end of the array if necessary, stores them in "a", and
     advances "out".  The procedure does not check that enough bytes are
     stored; the caller is expected to have done so. *)
  VAR
    next: CARDINAL;
  BEGIN
    WITH cap = NUMBER(self.buf^),
         total = NUMBER(a),
         head = MIN(total, cap - self.out),  (* Lies before the array end. *)
         tail = total - head                 (* Lies at the array start. *)
    DO
      SUBARRAY(a, 0, head) := SUBARRAY(self.buf^, self.out, head);
      IF tail > 0 THEN
        SUBARRAY(a, head, tail) := SUBARRAY(self.buf^, 0, tail);
      END;
      next := self.out + total;
      IF next >= cap THEN DEC(next, cap) END;
      self.out := next;
    END;
  END BufGet;
(***************************************************************************)
(* Packet I/O.                                                             *)
(***************************************************************************)
(***************************************************************************)
(* Packet writers.  Each writes the packet type byte followed by the       *)
(* packet's fields and then flushes the writer.                            *)
(***************************************************************************)

PROCEDURE PutStartupRequest (mux: T; version: CARDINAL)
  RAISES {Thread.Alerted, Wr.Failure} =
  (* Writes a StartupRequest packet: type, 2-byte version. *)
  BEGIN
    PutPacketType(mux, PacketType.StartupRequest);
    Put2(mux, version);
    Wr.Flush(mux.wr);
  END PutStartupRequest;

PROCEDURE PutStartupReply (mux: T; version: CARDINAL)
  RAISES {Thread.Alerted, Wr.Failure} =
  (* Writes a StartupReply packet: type, 2-byte version. *)
  BEGIN
    PutPacketType(mux, PacketType.StartupReply);
    Put2(mux, version);
    Wr.Flush(mux.wr);
  END PutStartupReply;

PROCEDURE PutConnect (mux: T; id: ChannelID; mss: CARDINAL; window: Word.T)
  RAISES {Thread.Alerted, Wr.Failure} =
  (* Writes a Connect packet: type, 1-byte id, 2-byte mss, 4-byte window. *)
  BEGIN
    PutPacketType(mux, PacketType.Connect);
    Put1(mux, id);
    Put2(mux, mss);
    Put4(mux, window);
    Wr.Flush(mux.wr);
  END PutConnect;

PROCEDURE PutAccept (mux: T; id: ChannelID; mss: CARDINAL; window: Word.T)
  RAISES {Thread.Alerted, Wr.Failure} =
  (* Writes an Accept packet: type, 1-byte id, 2-byte mss, 4-byte window. *)
  BEGIN
    PutPacketType(mux, PacketType.Accept);
    Put1(mux, id);
    Put2(mux, mss);
    Put4(mux, window);
    Wr.Flush(mux.wr);
  END PutAccept;

PROCEDURE PutReset (mux: T; id: ChannelID)
  RAISES {Thread.Alerted, Wr.Failure} =
  (* Writes a Reset packet: type, 1-byte id. *)
  BEGIN
    PutPacketType(mux, PacketType.Reset);
    Put1(mux, id);
    Wr.Flush(mux.wr);
  END PutReset;

PROCEDURE PutWindow (mux: T; id: ChannelID; window: Word.T)
  RAISES {Thread.Alerted, Wr.Failure} =
  (* Writes a Window packet: type, 1-byte id, 4-byte window. *)
  BEGIN
    PutPacketType(mux, PacketType.Window);
    Put1(mux, id);
    Put4(mux, window);
    Wr.Flush(mux.wr);
    Trace(">W " & Fmt.Int(id) & " " & Fmt.Unsigned(window, 10));
  END PutWindow;

PROCEDURE PutClose (mux: T; id: ChannelID)
  RAISES {Thread.Alerted, Wr.Failure} =
  (* Writes a Close packet: type, 1-byte id. *)
  BEGIN
    PutPacketType(mux, PacketType.Close);
    Put1(mux, id);
    Wr.Flush(mux.wr);
    Trace(">C " & Fmt.Int(id));
  END PutClose;
(***************************************************************************)
(* Packet readers.  Each reads the fields that follow the packet type      *)
(* byte; the type byte itself has already been consumed by the caller.     *)
(***************************************************************************)

PROCEDURE GetStartupRequest (mux: T; VAR (*OUT*) version: CARDINAL)
  RAISES {Rd.EndOfFile, Rd.Failure, Thread.Alerted} =
  (* Reads the 2-byte version of a StartupRequest packet. *)
  BEGIN
    version := Get2(mux);
  END GetStartupRequest;

PROCEDURE GetStartupReply (mux: T; VAR (*OUT*) version: CARDINAL)
  RAISES {Rd.EndOfFile, Rd.Failure, Thread.Alerted} =
  (* Reads the 2-byte version of a StartupReply packet. *)
  BEGIN
    version := Get2(mux);
  END GetStartupReply;

PROCEDURE GetConnect (mux: T;
                      VAR (*OUT*) id: ChannelID;
                      VAR (*OUT*) mss: CARDINAL;
                      VAR (*OUT*) window: Word.T)
  RAISES {Rd.EndOfFile, Rd.Failure, Thread.Alerted} =
  (* Reads the 1-byte id, 2-byte mss and 4-byte window of a Connect
     packet. *)
  BEGIN
    id := Get1(mux);
    mss := Get2(mux);
    window := Get4(mux);
  END GetConnect;

PROCEDURE GetAccept (mux: T;
                     VAR (*OUT*) id: ChannelID;
                     VAR (*OUT*) mss: CARDINAL;
                     VAR (*OUT*) window: Word.T)
  RAISES {Rd.EndOfFile, Rd.Failure, Thread.Alerted} =
  (* Reads the 1-byte id, 2-byte mss and 4-byte window of an Accept
     packet. *)
  BEGIN
    id := Get1(mux);
    mss := Get2(mux);
    window := Get4(mux);
  END GetAccept;

PROCEDURE GetWindow (mux: T;
                     VAR (*OUT*) id: ChannelID;
                     VAR (*OUT*) window: Word.T)
  RAISES {Rd.EndOfFile, Rd.Failure, Thread.Alerted} =
  (* Reads the 1-byte id and 4-byte window of a Window packet. *)
  BEGIN
    id := Get1(mux);
    window := Get4(mux);
    Trace("<W " & Fmt.Int(id) & " " & Fmt.Unsigned(window, 10));
  END GetWindow;

PROCEDURE GetClose (mux: T; VAR (*OUT*) id: ChannelID)
  RAISES {Rd.EndOfFile, Rd.Failure, Thread.Alerted} =
  (* Reads the 1-byte id of a Close packet. *)
  BEGIN
    id := Get1(mux);
    Trace("<C " & Fmt.Int(id));
  END GetClose;
(***************************************************************************)
(* Low-level output.  Multi-byte values are written most significant byte  *)
(* first.                                                                  *)
(***************************************************************************)

PROCEDURE PutPacketType (mux: T; type: PacketType)
  RAISES {Thread.Alerted, Wr.Failure} =
  (* Writes the ordinal of "type" as a single byte. *)
  BEGIN
    Put1(mux, ORD(type));
  END PutPacketType;

PROCEDURE Put1 (mux: T; v: Word.T)
  RAISES {Thread.Alerted, Wr.Failure} =
  (* Writes "v" as a single byte. *)
  BEGIN
    Wr.PutChar(mux.wr, VAL(v, CHAR));
  END Put1;

PROCEDURE Put2 (mux: T; v: Word.T)
  RAISES {Thread.Alerted, Wr.Failure} =
  (* Writes the low 16 bits of "v" as two bytes, high byte first. *)
  BEGIN
    Wr.PutString(mux.wr, ARRAY [0..1] OF CHAR{
      VAL(Word.Extract(v, 8, 8), CHAR),
      VAL(Word.Extract(v, 0, 8), CHAR)});
  END Put2;

PROCEDURE Put4 (mux: T; v: Word.T)
  RAISES {Thread.Alerted, Wr.Failure} =
  (* Writes the low 32 bits of "v" as four bytes, high byte first. *)
  BEGIN
    Wr.PutString(mux.wr, ARRAY [0..3] OF CHAR{
      VAL(Word.Extract(v, 24, 8), CHAR),
      VAL(Word.Extract(v, 16, 8), CHAR),
      VAL(Word.Extract(v, 8, 8), CHAR),
      VAL(Word.Extract(v, 0, 8), CHAR)});
  END Put4;
(***************************************************************************)
(* Low-level input.  Multi-byte values are read most significant byte      *)
(* first.                                                                  *)
(***************************************************************************)

PROCEDURE NeedPacketType (mux: T; type: PacketType)
  RAISES {Rd.EndOfFile, Rd.Failure, Thread.Alerted} =
  (* Reads the next packet type and raises "Rd.Failure" with
     "ProtocolError" if it is not "type". *)
  BEGIN
    IF GetPacketType(mux) # type THEN
      RAISE Rd.Failure(AtomList.List1(ProtocolError));
    END;
  END NeedPacketType;

PROCEDURE GetPacketType (mux: T): PacketType
  RAISES {Rd.EndOfFile, Rd.Failure, Thread.Alerted} =
  (* Reads one byte and returns it as a "PacketType".  Raises "Rd.Failure"
     with "ProtocolError" if the byte is not the ordinal of any packet
     type. *)
  VAR
    v := Get1(mux);
  BEGIN
    IF v > ORD(LAST(PacketType)) THEN
      RAISE Rd.Failure(AtomList.List1(ProtocolError));
    END;
    RETURN VAL(v, PacketType);
  END GetPacketType;

PROCEDURE Get1 (mux: T): Word.T
  RAISES {Rd.EndOfFile, Rd.Failure, Thread.Alerted} =
  (* Reads one byte and returns its ordinal value. *)
  BEGIN
    RETURN ORD(Rd.GetChar(mux.rd));
  END Get1;

PROCEDURE Get2 (mux: T): Word.T
  RAISES {Rd.EndOfFile, Rd.Failure, Thread.Alerted} =
  (* Reads two bytes and returns them as a 16-bit value, high byte first.
     Raises "Rd.EndOfFile" if fewer than two bytes could be read. *)
  VAR
    a: ARRAY [0..1] OF CHAR;
    v: Word.T;
  BEGIN
    IF Rd.GetSub(mux.rd, a) # NUMBER(a) THEN
      RAISE Rd.EndOfFile;
    END;
    v := ORD(a[0]);
    v := Word.Or(Word.LeftShift(v, 8), ORD(a[1]));
    RETURN v;
  END Get2;

PROCEDURE Get4 (mux: T): Word.T
  RAISES {Rd.EndOfFile, Rd.Failure, Thread.Alerted} =
  (* Reads four bytes and returns them as a 32-bit value, high byte first.
     Raises "Rd.EndOfFile" if fewer than four bytes could be read. *)
  VAR
    a: ARRAY [0..3] OF CHAR;
    v: Word.T;
  BEGIN
    IF Rd.GetSub(mux.rd, a) # NUMBER(a) THEN
      RAISE Rd.EndOfFile;
    END;
    v := ORD(a[0]);
    v := Word.Or(Word.LeftShift(v, 8), ORD(a[1]));
    v := Word.Or(Word.LeftShift(v, 8), ORD(a[2]));
    v := Word.Or(Word.LeftShift(v, 8), ORD(a[3]));
    RETURN v;
  END Get4;

PROCEDURE GetN (mux: T; VAR arr: ARRAY OF CHAR)
  RAISES {Rd.EndOfFile, Rd.Failure, Thread.Alerted} =
  (* Fills "arr" completely from the reader.  Raises "Rd.EndOfFile" if fewer
     than "NUMBER(arr)" bytes could be read. *)
  BEGIN
    IF Rd.GetSub(mux.rd, arr) # NUMBER(arr) THEN
      RAISE Rd.EndOfFile;
    END;
  END GetN;
PROCEDURE Raise (a: Atom.T)
  RAISES {IP.Error} =
  (* Raises "IP.Error" with a one-element list containing "a".  This
     procedure never returns normally. *)
  BEGIN
    RAISE IP.Error(AtomList.List1(a));
  END Raise;

BEGIN
  (* Module initialization: create the atom used to report protocol
     violations. *)
  ProtocolError := Atom.FromText("ChannelMux.ProtocolError");
END ChannelMux.