Last modified on Thu Jan 26 14:04:03 PST 1995 by kalsow
     modified on Fri Jun 18 17:36:13 PDT 1993 by wobber
     modified on Tue Jun 1 00:18:27 PDT 1993 by swart
     modified on Wed Apr 21 13:13:51 PDT 1993 by mcjones
     modified on Thu Jan 28 12:00:11 PST 1993 by mjordan
     modified on Sat Jun 27 15:10:10 PDT 1992 by muller
     modified on Thu Nov 2 18:18:07 1989 by gnelson
     modified on Thu Nov 2 18:18:07 1989 by manasse

MODULE AutoFlushWr;

IMPORT Wr, WrClass, Time, Thread, Process;
FROM Wr IMPORT Failure;
FROM Thread IMPORT Alerted;
FROM UnsafeWr IMPORT FastClose, FastFlush, FastLength;

A writer wr is *inactive* if wr.cur=wr.hi=wr.lo, and *active* otherwise. Notice that if wr is inactive, c(wr) = target(wr) (by WrClass.V2), and wr can therefore be ignored by the worker thread that does the flushing. Our strategy is to keep a list containing all active writers. The worker thread wakes up periodically and checks all the writers on the list, flushing them if they have gone unflushed for too long. The delay between wake-ups is equal to the minimum delay of any active writer. When the list is empty, the worker blocks.

CONST
  Forever = LAST(LONGREAL);
  Default = 0.1d0;

VAR
  mu := NEW (Thread.Mutex);
    (* mu protects the next four variables *)
    (* LL(wr) < LL(mu) for all wr: T *)
  active := NEW(T);
  minDelay: Time.T := Forever;
  c := NEW (Thread.Condition); (* that active is nonempty *)
  worker: Thread.T := NIL;

The comment LL(wr) < LL(mu) means that the locking level of mu is greater than the locking level of any auto-flush writer. This means that a thread must not try to acquire a writer lock while it holds mu.
The first object on the active list is a sentinel; thus the list is empty if active.link = NIL. The value of minDelay is the minimum delay of any writer on the active list.
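
The sentinel convention can be illustrated in isolation. The following is a minimal, single-threaded sketch, not part of this module: the Node type, its integer alarm field, and the procedures Insert and RemoveExpired are hypothetical stand-ins for T, wr.alarm, and the list manipulation done under mu. It shows why a sentinel is convenient: removing the first real element needs no special case, because every element has a predecessor.

```modula3
MODULE Main;

IMPORT IO, Fmt;

TYPE
  Node = REF RECORD
    alarm: INTEGER := 0;   (* hypothetical stand-in for wr.alarm *)
    link : Node := NIL;
  END;

VAR
  active := NEW(Node);     (* sentinel; the list is empty if active.link = NIL *)

PROCEDURE Insert (n: Node) =
  (* Link n directly behind the sentinel. *)
  BEGIN
    n.link := active.link;
    active.link := n;
  END Insert;

PROCEDURE RemoveExpired (now: INTEGER): CARDINAL =
  (* Unlink every node whose alarm lies before now; return how many. *)
  VAR s := active; n: Node; removed: CARDINAL := 0;
  BEGIN
    WHILE s.link # NIL DO
      n := s.link;
      IF now > n.alarm THEN
        s.link := n.link;  (* s stays put; its new successor is examined next *)
        n.link := NIL;
        INC(removed);
      ELSE
        s := n;            (* keep n on the list and advance *)
      END;
    END;
    RETURN removed;
  END RemoveExpired;

BEGIN
  Insert(NEW(Node, alarm := 5));
  Insert(NEW(Node, alarm := 20));
  Insert(NEW(Node, alarm := 7));
  (* With now = 10, the nodes with alarms 5 and 7 are unlinked. *)
  IO.Put("removed " & Fmt.Int(RemoveExpired(10)) & "\n");
  (* With now = 30, the remaining node is unlinked and the list is empty. *)
  IO.Put("removed " & Fmt.Int(RemoveExpired(30)) & "\n");
END Main.
```

The sketch omits all locking; in this module every access to the link fields happens while mu is held.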

REVEAL
  T = Public BRANDED OBJECT
    child: Wr.T;
    delay: Time.T;
    alarm: Time.T;   (* readonly if onQ is true *)
    onQ  : BOOLEAN;  (* protected by mu *)
    link : T;        (* protected by mu *)
  OVERRIDES
    seek := Seek;
    length := Length;
    flush := Flush;
    close := Close;
    init := Init;
  END;

PROCEDURE Copy (from, to: Wr.T) =
  (* Copy the public fields from "from" to "to". We don't necessarily
     need to copy all of them, but we do anyway. Caller handles locking. *)
  BEGIN
    to.buff := from.buff;
    to.st := from.st;
    to.cur := from.cur;
    to.lo := from.lo;
    to.hi := from.hi;
    to.closed := from.closed;
    to.seekable := from.seekable;
    to.buffered := from.buffered;
  END Copy;

For wr of type T, wr.child is the writer to which wr's writing is forwarded. wr.delay is the inverse of the frequency with which wr will be flushed. If wr is active, wr.alarm is the next time at which wr should be flushed; otherwise wr.alarm is irrelevant. The boolean wr.onQ is true iff wr is on the active list, and wr.link is the link field for the active list. The child, delay, and alarm fields are protected by wr's mutex; the onQ and link fields are protected by the global mu.
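
To show where the child and delay fields come from on the client side, here is a minimal sketch of a client program. It rests on assumptions not confirmed by this module: that the AutoFlushWr interface exposes init as a method of T with the same parameters as the Init procedure in this module, and that Stdio.stdout is a suitable child writer. The module name Main and the delay value 0.5d0 are arbitrary.

```modula3
MODULE Main;

IMPORT AutoFlushWr, Stdio, Thread, Wr;

<*FATAL Wr.Failure, Thread.Alerted*>

VAR
  (* Assumed client-side call: the first argument becomes wr.child and the
     second becomes wr.delay, in the units of Time.T.  Omitting the second
     argument, or passing a negative value, selects Default in Init. *)
  wr: Wr.T := NEW(AutoFlushWr.T).init(Stdio.stdout, 0.5d0);

BEGIN
  Wr.PutText(wr, "working...\n");
  (* An explicit flush is still possible; it is forwarded to the child. *)
  Wr.Flush(wr);
END Main.
```

Because Copy makes the two writers share the child's buffer, the sketch writes only through wr and never through the child directly.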

PROCEDURE Init (wr: T; ch: Wr.T; p := -1.0D0): T =
  BEGIN
    WrClass.Lock(wr);
    WrClass.Lock(ch);
    TRY
      Copy(ch, wr);
      wr.buffered := TRUE;
      wr.child := ch;
      wr.onQ := FALSE;
      wr.link := NIL;
    FINALLY
      WrClass.Unlock(ch);
      WrClass.Unlock(wr);
    END;
    IF p < 0.0D0 THEN wr.delay := Default ELSE wr.delay := p END;
    RETURN wr
  END Init;

PROCEDURE Seek (wr: T; n: CARDINAL) RAISES {Failure, Alerted} =
  VAR wasEmpty := FALSE;
  BEGIN
    WrClass.Lock(wr.child);
    TRY
      Copy(wr, wr.child);
      (* We don't call FastSeek here because it fails if wr.child
         is not seekable, even if n = wr.cur. *)
      wr.child.seek(n);
      Copy(wr.child, wr);
    FINALLY
      WrClass.Unlock(wr.child);
    END;
    LOCK mu DO
      IF NOT wr.onQ THEN
        wr.onQ := TRUE;
        wasEmpty := active.link = NIL;
        wr.link := active.link;
        active.link := wr;
        wr.alarm := Time.Now() + wr.delay;
        IF wasEmpty THEN
          minDelay := wr.delay;
          IF worker = NIL THEN
            worker := Thread.Fork(NEW(Thread.Closure, apply := Worker))
          END
        ELSIF minDelay > wr.delay THEN
          minDelay := wr.delay;
          Thread.Alert(worker)
        END
      END
    END;
    IF wasEmpty THEN Thread.Signal(c) END
  END Seek;

PROCEDURE Flush (wr: T) RAISES {Failure, Alerted} =
  BEGIN
    WrClass.Lock(wr.child);
    TRY
      Copy(wr, wr.child);
      FastFlush(wr.child);
      Copy(wr.child, wr);
    FINALLY
      WrClass.Unlock(wr.child);
    END;
  END Flush;

PROCEDURE Length (wr: T): CARDINAL RAISES {Failure, Alerted} =
  VAR res: CARDINAL;
  BEGIN
    WrClass.Lock(wr.child);
    TRY
      Copy(wr, wr.child);
      res := FastLength(wr.child);
      Copy(wr.child, wr);
    FINALLY
      WrClass.Unlock(wr.child);
    END;
    RETURN res;
  END Length;

PROCEDURE Close (wr: T) RAISES {Failure, Alerted} =
  BEGIN
    WrClass.Lock(wr.child);
    TRY
      Copy(wr, wr.child);
      FastClose(wr.child);
      Copy(wr.child, wr);
    FINALLY
      WrClass.Unlock(wr.child);
    END;
    wr.buff := NIL
  END Close;

The worker thread delay is the minimum delay of any active writer. Each time it wakes up, it executes the following:

    Let S be the set of active writers.
    Set minDelay := infinity;
    FOR wr IN S do:
      IF wr's timer has expired THEN
        Flush wr, remove it from the active list
      ELSE
        minDelay := min(minDelay, wr.delay)
      END
    END

Notice that the active list can be enlarged while S is being processed, since clients may be operating on writers concurrently. If minDelay decreases while the worker is asleep, it is alerted.

Notice that Flush(wr) cannot be called while mu is locked, since this would violate the locking level order. Thus wr is unlinked from the active list while mu is held, and is flushed only after mu has been released.

PROCEDURE Worker (<*UNUSED*>cl: Thread.Closure): REFANY RAISES {} =
  <*FATAL Wr.Failure, Thread.Alerted*>
  VAR delay, now: Time.T; s: T; wr: T;
  BEGIN
    LOOP
      LOCK mu DO
        WHILE active.link = NIL DO Thread.Wait(mu, c) END;
        delay := minDelay
      END;
      TRY Thread.AlertPause(delay) EXCEPT Thread.Alerted => (*skip*) END;
      LOCK mu DO s := active; minDelay := Forever; END;
      now := Time.Now();
      LOOP (* s scans through the set called S above. *)
        LOCK mu DO
          IF s.link = NIL THEN EXIT END;
          wr := s.link;
          IF now > wr.alarm THEN
            s.link := wr.link;
            wr.onQ := FALSE;
            wr.link := NIL
          ELSE
            s := wr;
            wr := NIL;
            IF minDelay > s.delay THEN minDelay := s.delay END
          END
        END;
        IF wr # NIL THEN
          WrClass.Lock(wr);
          TRY
            IF NOT wr.closed THEN Flush(wr) END;
          FINALLY
            WrClass.Unlock(wr);
          END;
        END (* IF *);
      END
    END
  END Worker;

PROCEDURE Shutdown () =
  VAR s, wr: T;
  <*FATAL Wr.Failure, Thread.Alerted *>
  BEGIN
    LOCK mu DO s := active; END;
    LOOP (* s scans through the set called S above. *)
      LOCK mu DO
        IF s.link = NIL THEN EXIT END;
        wr := s.link;
        s.link := wr.link;
        wr.onQ := FALSE;
        wr.link := NIL
      END;
      WrClass.Lock(wr);
      TRY
        IF NOT wr.closed THEN Flush(wr) END;
      FINALLY
        WrClass.Unlock(wr);
      END;
    END
  END Shutdown;

BEGIN
  Process.RegisterExitor(Shutdown);
END AutoFlushWr.