-*- Mode: Modula-3 -*-
*
* For information about this program, contact Blair MacIntyre
* (bm@cs.columbia.edu) or Steven Feiner (feiner@cs.columbia.edu)
* at the Computer Science Dept., Columbia University,
* 1214 Amsterdam Ave. Mailstop 0401, New York, NY, 10027.
*
* Copyright (C) 1995, 1996 by The Trustees of Columbia University in the
* City of New York. Blair MacIntyre, Computer Science Department.
* See file COPYRIGHT-COLUMBIA for details.
*
* Author : Blair MacIntyre
* Created On : Mon Feb 20 17:43:14 1995
* Last Modified By: Blair MacIntyre
* Last Modified On: Sat Aug 9 13:47:54 1997
* Update Count : 76
*
* $Source: /usr/cvs/cm3/m3-comm/rdwr/src/RdWrPipe.m3,v $
* $Date: 2001-12-02 00:35:21 $
* $Author: wagner $
* $Revision: 1.2 $
*
* $Log: RdWrPipe.m3,v $
* Revision 1.2 2001-12-02 00:35:21 wagner
* add copyright notes and fix overrides for cm3
*
* added: rdwr/COPYRIGHT-COLUMBIA
* added: rdwr/src/COPYRIGHT-COLUMBIA
* added: rdwr/src/m3overrides
* modified: rdwr/src/RdWrPipe.i3
* modified: rdwr/src/RdWrPipe.m3
* modified: rdwr/src/SimpleMsgRW.i3
* modified: rdwr/src/SimpleMsgRW.m3
* modified: rdwr/src/TeeWr.i3
* modified: rdwr/src/TeeWr.m3
*
* Revision 1.1.1.1 2001/12/02 00:29:10 wagner
* Blair MacIntyre's rdwr library
*
* Revision 1.2 1997/08/11 20:36:21 bm
* Various fixes
*
*
* HISTORY
MODULE RdWrPipe;
IMPORT Rd, Wr, RdClass, WrClass, Thread, Atom, AtomList, IO, Fmt;
UNUSED, but want to keep the following revelation honest.
IMPORT UnsafeWr, UnsafeRd; <*NOWARN*>
FROM Thread IMPORT Alerted;
Since we need to use the Mutex properties of Rd.T and Wr.T, we
should actually import UnsafeWr and UnsafeRd. We need to add the
following revelations, as the comment in UnsafeRd points out, if we
want to include both the Unsafe* and *Class interfaces.
REVEAL RdClass.Private <: MUTEX;
REVEAL WrClass.Private <: MUTEX;
CONST
TYPE
(* the shared data between the reader and writer corresponds to the
locking elements and the pointers into the shared buffer. The closed
flag is used for the reader/writer to communicate if they are
close()d. *)
(* If there is no data in the buffer, first_i = next_i, and the reader
may block. Otherwise, first_i points at the first character in the
shared buffer available for reading, and next_i points at the the
first character available for writing. If (next_i + 1) MOD size =
first_i, the buffer is full, and the writer may block.
At any given time, rd.st will be first_i. rd.cur <= rd.hi will fall
in the area between first_i and next_i.
wr.st will be next_i. wr.cur <= wr.hi will fall in the free area
between next_i and first_i.
wr.flush() will sync the writer, so that next_i, wr.st and wr.lo will
be advanced to correspond to wr.cur, "flushing" the buffered writer
output so the reader can get at it. *)
SharedData = RECORD
name: TEXT; (* an optional name *)
mu: Thread.Mutex; (* for controlling access to this *)
cv: Thread.Condition; (* when it's full or empty *)
first_i: CARDINAL := 0; (* first index of data *)
next_i : CARDINAL := 0; (* last index of part in use *)
size : CARDINAL; (* size of buffer *)
closed : BOOLEAN := FALSE; (* is this buffer open? *)
END;
(* our reader and writer will share a buffer and an instance of the
SharedData RECORD *)
RdT = Rd.T OBJECT
share: REF SharedData;
OVERRIDES
seek := RdSeek;
length := RdLength;
close := RdClose;
END;
WrT = Wr.T OBJECT
share: REF SharedData;
OVERRIDES
seek := WrSeek;
close := WrClose;
flush := WrFlush;
END;
<*UNUSED*>
PROCEDURE RdChanged(t: TEXT; rd: RdT) =
BEGIN
IO.Put(t & "Rd("&rd.share.name&") changed: \n lo = " &
Fmt.Unsigned(rd.lo,10) & ", hi = " &
Fmt.Unsigned(rd.hi,10) & ", cur = " &
Fmt.Unsigned(rd.cur,10) & ", st = " &
Fmt.Unsigned(rd.st,10) & "\n share.first_i = " &
Fmt.Unsigned(rd.share.first_i,10) & ", share.next_i = " &
Fmt.Unsigned(rd.share.next_i,10) & ", share.size = " &
Fmt.Unsigned(rd.share.size,10) & "\n");
END RdChanged;
<*UNUSED*>
PROCEDURE WrChanged(t: TEXT; wr: WrT) =
BEGIN
IO.Put(t & "Wr("&wr.share.name&") changed: \n lo = " &
Fmt.Unsigned(wr.lo,10) & ", hi = " &
Fmt.Unsigned(wr.hi,10) & ", cur = " &
Fmt.Unsigned(wr.cur,10) & ", st = " &
Fmt.Unsigned(wr.st,10) & "\n share.first_i = " &
Fmt.Unsigned(wr.share.first_i,10) & ", share.next_i = " &
Fmt.Unsigned(wr.share.next_i,10) & ", share.size = " &
Fmt.Unsigned(wr.share.size,10) & "\n");
END WrChanged;
PROCEDURE New (VAR rd : Rd.T;
VAR wr : Wr.T;
buff_size: CARDINAL := BufferSize; nm : TEXT := NIL) =
VAR
info := NEW(REF SharedData, mu := NEW(Thread.Mutex),
cv := NEW(Thread.Condition), size := buff_size);
shared_buff := NEW(REF ARRAY OF CHAR, buff_size);
BEGIN
IF nm = NIL THEN nm := "" END;
info.name := nm;
rd := NEW(RdT, buff := shared_buff, share := info);
wr := NEW(WrT, buff := shared_buff, share := info);
rd.st := 0;
rd.lo := 0;
rd.cur := 0;
rd.hi := 0;
rd.closed := FALSE;
rd.seekable := FALSE;
rd.intermittent := TRUE;
wr.st := 0;
wr.lo := 0;
wr.cur := 0;
wr.hi := buff_size-1;
wr.closed := FALSE;
wr.seekable := FALSE;
wr.buffered := TRUE;
END New;
EXCEPTION Error; <*FATAL Error*>
PROCEDURE RdSeek (rd: RdT; pos: CARDINAL;
dontBlock: BOOLEAN):
RdClass.SeekResult RAISES {Alerted} =
VAR do_signal := FALSE;
BEGIN
(* This file is not seekable, so only handle the special case. *)
IF pos # rd.hi OR pos # rd.cur THEN RAISE Error; END;
LOCK rd.share.mu DO
(* Check the obvious: rd.st should correspond to the start of the
data area of the shared buffer. *)
<*ASSERT rd.st = rd.share.first_i *>
(* cur_i is the position of rd.cur in the shared data buffer,
which is the next character we want to read. *)
WITH cur_i = (rd.st + rd.cur - rd.lo) MOD rd.share.size DO
(* Set rd.lo to rd.cur, since we are done with all of the current
buffer contents. Move rd.st to cur_i, to keep things in sync in
case we raise an Alert below and leave early. *)
rd.lo := rd.cur;
rd.st := cur_i;
(* Before updating first_i, check if the buffer was full. If so,
then there may be blocked writers, so arrange to signal them.
Since the buffer is completely full, we'll handle this special
case explicitely. *)
IF ((rd.share.next_i + 1) MOD rd.share.size) = rd.share.first_i THEN
(* move first_i to the next value to be read, freeing up the
stuff that was already read. *)
rd.share.first_i := cur_i;
(* If the buffer is empty now, we have to signal the writer
now, since we might block below. *)
IF rd.share.first_i = rd.share.next_i THEN
(* IO.Put("RdSeek: Buffer was full. Empty now.\n"); *)
Thread.Signal(rd.share.cv);
ELSE
(* IO.Put("RdSeek: Buffer was full. Partial now.\n"); *)
do_signal := TRUE;
END;
ELSE
(* IO.Put("RdSeek: Buffer was not full.\n"); *)
(* the buffer is not completely full, so we see if there is
anything to be gotten. First, set first_i, in case we are
Alerted in the Wait below. *)
rd.share.first_i := cur_i;
END;
(* As long as there is nothing more to read, wait. *)
WHILE rd.share.first_i = rd.share.next_i DO
(* Don't block if they don't want us to. *)
IF dontBlock THEN RETURN RdClass.SeekResult.WouldBlock END;
(* before blocking, check if the other end has closed *)
IF rd.share.closed THEN RETURN RdClass.SeekResult.Eof END;
(* wait for more data! An Alert will pop us all the way
out. If the writer closes, we will be signaled, and
go through the loop again, this time terminating above. *)
(* IO.Put("RdSeek: Buffer is empty. Wait with state:\n");*)
(* RdChanged(rd.share.name, rd);*)
Thread.AlertWait(rd.share.mu, rd.share.cv);
END;
IF rd.share.first_i <= rd.share.next_i THEN
(* move rd.hi to rd.share.next_i *)
INC(rd.hi, rd.share.next_i - cur_i);
ELSE
(* available part of buffer wraps, so move rd.hi to one
after the end. We will wrap next time. *)
INC(rd.hi, rd.share.size - cur_i);
END;
END;
(* RdChanged(rd.share.name, rd); *)
END;
(* we signal outside the MUTEX, as suggested in the threads article, to
prevent spurious context switches *)
IF do_signal THEN Thread.Signal(rd.share.cv); END;
RETURN RdClass.SeekResult.Ready;
END RdSeek;
PROCEDURE RdLength (<*UNUSED*>rd: RdT): INTEGER =
BEGIN
RETURN -1;
END RdLength;
PROCEDURE RdClose (rd: RdT) RAISES {} =
BEGIN
LOCK rd.share.mu DO
IF NOT rd.share.closed THEN
(* in case the writer is blocked *)
Thread.Signal(rd.share.cv);
END;
rd.share.closed := TRUE;
rd.share := NIL;
END;
END RdClose;
PROCEDURE WrSeek (wr: WrT; pos: CARDINAL) RAISES {Wr.Failure, Alerted} =
BEGIN
(* This file is not seekable, so only handle the special case. *)
IF pos # wr.hi OR pos # wr.cur THEN RAISE Error; END;
(* first, call flush so that the shared buffer and wr are in sync.
Also, flush will raise an exception if the reader has closed. *)
wr.flush();
LOCK wr.share.mu DO
(* Check the obvious: wr.st should correspond to the start of the
free area of the shared buffer. *)
<*ASSERT wr.st = wr.share.next_i *>
(* cur_i is the position of wr.cur in the shared data buffer.
Because of the flush() above, wr.cur = wr.lo, so cur_i =
wr.st. We'll leave it as it is here, for now, though. *)
WITH cur_i = (wr.st + wr.cur - wr.lo) MOD wr.share.size DO
WHILE ((wr.share.next_i + 1) MOD wr.share.size) = wr.share.first_i DO
(* IO.Put("WrSeek: Buffer is full. Wait with state:\n"); *)
(* WrChanged(wr.share.name, wr);*)
(* Wait for more buffer space. An Alert will pop us all the
way out. *)
Thread.AlertWait(wr.share.mu, wr.share.cv)
END;
(* wr.st corresponds to the first part of the free buffer, starting
at wr.share.next_i. There will always be at least one "free"
character, corresponding to wr.share.next_i, which always points
at a valid spot in the buffer. *)
IF wr.share.first_i <= wr.share.next_i THEN
(* the rest of the buffer is open, since the first_i used is
earlier in the buffer: in other words, the hole wraps around
the end. Mark everything to the end as available, except
perhaps our one dummy byte if first_i is 0. *)
IF wr.share.first_i > 0 THEN
INC(wr.hi, wr.share.size - cur_i);
ELSE
INC(wr.hi, wr.share.size - cur_i - 1);
END;
ELSE
(* mark everything from the beginning of the buffer to 1 before
first_i as available *)
INC(wr.hi, wr.share.first_i - 1 - cur_i);
END;
END;
(* WrChanged(wr.share.name, wr); *)
END;
END WrSeek;
PROCEDURE WrFlush (wr: WrT) RAISES {Wr.Failure} =
VAR do_signal: BOOLEAN := FALSE;
BEGIN
LOCK wr.share.mu DO
(* before wasting time doing any work, if the other end has
closed then the write fails. *)
IF wr.share.closed THEN
RAISE Wr.Failure(AtomList.List1(Atom.FromText("reader closed")));
END;
(* if there is anything to flush, let's flush! *)
IF wr.cur > wr.lo THEN
WITH cur_i = (wr.st + wr.cur - wr.lo) MOD wr.share.size DO
IF wr.share.first_i = wr.share.next_i THEN
(* IO.Put("WrFlush: Buffer was empty.\n"); *)
(* The buffer was empty, so signal any blocked readers. *)
do_signal := TRUE;
ELSE
(* IO.Put("WrFlush: Buffer was not empty.\n");*)
END;
(* We want to move next_i and wr.st ahead so that the buffered
stuff is now "available" in the shared buffer. So, we need to
change next_i, wr.st to cur_i, and then move wr.lo to wr.cur,
so that it corresponds to index of wr.st. The case of cur_i
having hit the end of the buffer, and needing to wrap is
handled by the MOD above. *)
wr.share.next_i := cur_i;
wr.st := cur_i;
wr.lo := wr.cur;
END;
END;
END;
IF do_signal THEN Thread.Signal(wr.share.cv); END;
END WrFlush;
PROCEDURE WrClose (wr: WrT) RAISES {} =
BEGIN
LOCK wr.share.mu DO
IF NOT wr.share.closed THEN
(* in case the reader is blocked *)
Thread.Signal(wr.share.cv);
END;
wr.share.closed := TRUE;
wr.share := NIL;
END;
END WrClose;
PROCEDURE ResetRdCounter (rd: Rd.T) =
BEGIN
TYPECASE rd OF
| RdT (r) =>
LOCK rd DO DEC(r.cur, r.lo); DEC(r.hi, r.lo); r.lo := 0; END;
ELSE (* Skip *)
END;
END ResetRdCounter;
PROCEDURE ResetWrCounter (wr: Wr.T) =
BEGIN
TYPECASE wr OF
| WrT (w) =>
LOCK wr DO DEC(w.cur, w.lo); DEC(w.hi, w.lo); w.lo := 0; END;
ELSE (* Skip *)
END;
END ResetWrCounter;
BEGIN
END RdWrPipe.