UNSAFE MODULE*IMPORT Stdio;*; IMPORT Atom, AtomList, Ctypes, ConnFD, IP, JVBuffer, JVConverter, JVConverterF, JVFromSource, jvprotocol, OSError, Rd, RdUtils, TCP, TCPPosix, Thread, Tick, Uin, WeakRef, Wr; JVSink
CONST RetrySeconds = 30.0d0; REVEAL T = Public BRANDED OBJECT sourceHost : TEXT; info : JVFromSource.StreamInfo; quality : jvprotocol.ClientQualityVal; tcp : TCP.T := NIL; maxBuffers : CARDINAL := 0; readThread : Thread.T := NIL; readClosure: Closure := NIL; readSerial : JVBuffer.Serial; sync : CARDINAL := 0; delay : CARDINAL := 0; retryThread : Thread.T := NIL; retryClosure: Closure := NIL; retrying := FALSE; OVERRIDES init := Init; start := Start; stop := Stop; getInfo := GetInfo; close := Close; startStats := StartStats; END;{{{ -- methods --
PROCEDURE{{{ -- internal proceduresInit (t : T; hostname : TEXT; quality : Quality := DefaultQuality; maxBuffers: CARDINAL := 2; factory : JVBuffer.Factory; delay : CARDINAL ): T RAISES {OSError.E, Thread.Alerted} = BEGIN TRY EVAL JVConverter.T.init(t); t.sourceHost := hostname; t.maxBuffers := maxBuffers; t.quality := quality + FIRST(jvprotocol.ClientQualityVal); t.output := NEW(JVBuffer.Pool).init(factory, t.maxBuffers); t.delay := delay; OpenRemoteConnection(t); EXCEPT | OSError.E (e) => RAISE OSError.E(AtomList.Cons(sinkError, e)); END; RETURN t; END Init; PROCEDUREStart (t: T) RAISES {JVConverter.Error, Thread.Alerted} = BEGIN <* ASSERT t.output # NIL *> TRY t.output.setSize(t.maxBuffers); Open(t); EXCEPT | OSError.E (e) => RAISE JVConverter.Error( AtomList.Cons(Atom.FromText("JVSink.Start"), e)); END; END Start; PROCEDUREStop (t: T) RAISES {JVConverter.Error, Thread.Alerted} = BEGIN TRY t.output.setSize(0); EXCEPT | OSError.E (al) => RAISE JVConverter.Error( AtomList.Cons(Atom.FromText("JVSink.Stop"), al)); END; InnerClose(t, TRUE); END Stop; PROCEDUREGetInfo (t: T; VAR info: JVFromSource.StreamInfo): BOOLEAN = BEGIN LOCK t DO WITH res = info.serial < t.info.serial DO info := t.info; RETURN res; END; END; END GetInfo; PROCEDUREClose (t: T) = BEGIN LOCK t DO InnerClose(t, TRUE); t.output.signalClosed(); t.output := NIL; END; END Close;
PROCEDURE}}}InnerClose (t: T; noRetry := FALSE) = (* unlocked implementation of Close() *) BEGIN IF NOT t.retrying OR noRetry THEN WITH thread = t.retryThread DO IF thread # NIL THEN Thread.Alert(thread); Thread.Release(t); EVAL Thread.Join(thread); (* don't do anything until the thread has finished *) END; END; END; WITH thread = t.readThread DO IF thread # NIL THEN Thread.Alert(thread); Thread.Release(t); EVAL Thread.Join(thread); (* ditto *) Thread.Acquire(t); END; END; t.tcp.close(); t.tcp := NIL; END InnerClose; PROCEDUREStartStats (t: T) = BEGIN LOCK t DO IF t.statistics = NIL THEN t.statistics := NEW(JVConverter.Statistics); END; t.statistics.framesStarted := 0; t.statistics.framesProcessed := 0; t.statistics.timesBlocked := 0; END; END StartStats;
}}} {{{ -- read thread --
TYPE Closure = Thread.Closure OBJECT t: T; OVERRIDES apply := ReadApply; END; CONST AnySize = BYTESIZE(jvprotocol.AnyHeader); PROCEDURE}}} {{{ -- procedures to read from server --ReadApply (cl: Closure): REFANY = (* control data from the server currently is thrown away *) PROCEDURE GetControl (netlength: Ctypes.int) RAISES {Rd.Failure, Thread.Alerted} = VAR length := Uin.ntohl(netlength); BEGIN IF length > jvprotocol.MaxControlMsgSize THEN RAISE Rd.Failure(NIL); END; GetMinBytes(t.tcp, SUBARRAY(control, 0, length)); END GetControl; VAR t := cl.t; buffer: jvprotocol.Header; any := LOOPHOLE(ADR(buffer[0]), jvprotocol.AnyHeaderPtr); control: jvprotocol.ControlBuffer; jvbuff : JVFromSource.T := NIL; length : CARDINAL; retryp : BOOLEAN; failure: AtomList.T; BEGIN TRY retryp := FALSE; failure := NIL; LOOP IF jvbuff = NIL OR t.paused > 0 THEN (* don't start reading until we have a buffer in which to put the frame data *) LOCK t DO IF t.statistics # NIL THEN INC(t.statistics.timesBlocked); END; SendToSource(t.tcp, jvprotocol.ClientSuspendStream); END; IF jvbuff = NIL THEN jvbuff := NARROW(t.output.getFreeBuffer(TRUE), JVFromSource.T); END; LOCK t DO WHILE t.paused > 0 DO Thread.AlertWait(t, t.pauseEvent); END; SendToSource(t.tcp, jvprotocol.ClientResumeStream); END; END; LOCK t DO GetHeader(t.tcp, buffer); (* {{{ -- dispatch header *) CASE any.type OF | jvprotocol.JVP_VIDEO => IF t.statistics # NIL THEN INC(t.statistics.framesStarted); END; GetFrame(t, buffer, jvbuff); jvbuff.info := t.info; jvbuff.ready := NewReady(t.tcp); t.output.insert(jvbuff); jvbuff := NIL; IF t.statistics # NIL THEN INC(t.statistics.framesProcessed); END; jvbuff := NARROW(t.output.getFreeBuffer(), JVFromSource.T); (* returns NIL if none available *) | jvprotocol.JVP_RESOLUTION => (* skip *) | jvprotocol.JVP_TYPE => WITH type = LOOPHOLE(any, jvprotocol.VideoTypePtr), si = t.info DO INC(si.serial); si.qfactor := Uin.ntohl(type.qfactor); si.width := Uin.ntohl(type.width); si.height := Uin.ntohl(type.height); END; | jvprotocol.JVP_SYNC => WITH sync = LOOPHOLE(any, jvprotocol.SyncFramePtr) DO (* if this is the reply we're waiting for, send credits to restart stream *) IF t.sync = Uin.ntohl(sync.code) THEN t.sync := 0; END; END; | jvprotocol.JVP_QUALITIES => length := LOOPHOLE(any, jvprotocol.QualitiesFramePtr).length; GetControl(Uin.ntohl(length)); | jvprotocol.JVP_ERROR => length := LOOPHOLE(any, jvprotocol.ErrorFramePtr).length; GetControl(Uin.ntohl(length)); | jvprotocol.JVP_INFO => length := LOOPHOLE(any, jvprotocol.InfoFramePtr).length; GetControl(Uin.ntohl(length)); | jvprotocol.JVP_VIDEO_UDP_REQUEST, jvprotocol.JVP_VIDEO_UDP_RESPONSE => | jvprotocol.JVP_AUDIO, jvprotocol.JVP_STATUS, jvprotocol.JVP_ENDMARK, jvprotocol.JVP_AUDIO_SILENCE, jvprotocol.JVP_AUDIO_MULTICAST, jvprotocol.JVP_VIDEO_UDP_DATA => RAISE Rd.Failure(NIL); ELSE RAISE Rd.Failure(NIL); END; (* }}} *) END; END; EXCEPT | Thread.Alerted => (* skip *) | Rd.Failure (f) => failure := f; retryp := TRUE; | Wr.Failure (f) => failure := f; retryp := TRUE; | OSError.E (f) => failure := f; retryp := TRUE; END; IF jvbuff # NIL THEN jvbuff.free(); END; IF failure # NIL THEN WITH txt = RdUtils.FailureText(failure) DO IF txt # NIL THEN JVConverter.ReportError( "JVSink, read from " & t.sourceHost & txt); END; END; END; LOCK t DO t.readThread := NIL; IF retryp THEN Retry(t); END; END; RETURN NIL; END ReadApply;
PROCEDUREGetMinBytes (t: TCP.T; VAR buff: ARRAY OF CHAR) RAISES {Rd.Failure, Thread.Alerted} = (* loop until "buff" has been filled *) VAR len := NUMBER(buff); charsRead := 0; BEGIN TRY WHILE len > 0 DO WITH res = t.get(SUBARRAY(buff, charsRead, len)) DO IF res = 0 THEN RAISE ConnFD.TimedOut END; INC(charsRead, res); DEC(len, res); END; END; EXCEPT | ConnFD.TimedOut => RAISE Rd.Failure(NIL); END; END GetMinBytes; PROCEDUREGetHeader (t: TCP.T; VAR buffer: jvprotocol.Header) RAISES {Rd.Failure, Thread.Alerted} = VAR any := LOOPHOLE(ADR(buffer[0]), jvprotocol.AnyHeaderPtr); bytesLeft: CARDINAL; BEGIN GetMinBytes(t, SUBARRAY(buffer, 0, AnySize)); any.type := Uin.ntohl(any.type); CASE any.type OF | jvprotocol.JVP_FirstEvent .. jvprotocol.JVP_LastEvent => bytesLeft := jvprotocol.HdrSizes[any.type] - AnySize; IF bytesLeft <= 0 THEN RETURN; END; GetMinBytes(t, SUBARRAY(buffer, AnySize, bytesLeft)); ELSE RAISE Rd.Failure(AtomList.List1(Atom.FromText("BadHeaderType"))); END; END GetHeader; TYPE OpenArray = RECORD address: ADDRESS; count : INTEGER; END; OpenArrayPtr = UNTRACED REF ARRAY OF CHAR;
this horrible hack allows us to build an open array from an address and length. We can't use the trick with the very long fixed array because the NIL check in the subarray fails for shared memory segments
) =
Wr.PutText(Stdio.stderr, Bpt:
& s & \n
END Bpt;
PROCEDURE}}} {{{ -- retry thread --GetFrame ( t : T; READONLY buffer: jvprotocol.Header; jvb : JVBuffer.T ) RAISES {Rd.Failure, Thread.Alerted} = VAR hdr := LOOPHOLE(ADR(buffer[0]), jvprotocol.VideoFramePtr); oa : OpenArray; oap: OpenArrayPtr := ADR(oa); BEGIN <* ASSERT jvb # NIL *> jvb.serial := t.readSerial; INC(t.readSerial); jvb.timestamp.tv_sec := Uin.ntohl(hdr.timestamp.tv_sec); jvb.timestamp.tv_usec := Uin.ntohl(hdr.timestamp.tv_usec); jvb.frameLength := Uin.ntohl(hdr.length); jvb.localTime := Tick.Now(); <* ASSERT jvb.frameLength <= jvb.length *> oa.address := jvb.addr; oa.count := jvb.frameLength; GetMinBytes(t.tcp, oap^); END GetFrame;
LL >= {t}
PROCEDURE}}} {{{ -- internal procedures --Retry (t: T) = BEGIN IF t.retryThread # NIL THEN Thread.Alert(t.retryThread); END; IF t.retryClosure = NIL THEN t.retryClosure := NEW(Closure, t := t, apply := RetryApply); END; t.retrying := TRUE; t.retryThread := Thread.Fork(t.retryClosure); END Retry; PROCEDURERetryApply (cl: Closure): REFANY = VAR t := cl.t; BEGIN TRY LOOP TRY JVConverter.ReportError("JVSink, retrying " & t.sourceHost); LOCK t DO InnerClose(t); Open(t); END; EXIT; (* open succeeded *) EXCEPT | OSError.E => (* skip and loop again *) END; Thread.AlertPause(RetrySeconds); END; EXCEPT | Thread.Alerted => (*skip*) END; LOCK t DO t.retryThread := NIL; t.retrying := FALSE; END; RETURN NIL; END RetryApply;
PROCEDURE}}} {{{ -- ready closure --Open (t: T) RAISES {OSError.E, Thread.Alerted} = (* establish a connection with the server and start a thread to get frames and other info from it *) BEGIN TRY IF NOT t.retrying THEN IF t.retryThread # NIL THEN Thread.Alert(t.retryThread); END; END; OpenRemoteConnection(t); (* ask for correct quality *) SendToSource(t.tcp, t.quality); t.readSerial := 0; t.info.serial := 0; IF t.readThread = NIL THEN IF t.readClosure = NIL THEN t.readClosure := NEW(Closure, t := t); END; t.readThread := Thread.Fork(t.readClosure); END; EXCEPT | Wr.Failure (f) => RAISE OSError.E(AtomList.Cons(Atom.FromText("JVSink.Open"), f)); END; END Open; PROCEDUREOpenRemoteConnection (t: T) RAISES {OSError.E, Thread.Alerted} = VAR endpoint: IP.Endpoint; arg : Ctypes.int := 1; BEGIN IF t.tcp # NIL THEN RETURN; (*already open *) END; TRY IF NOT IP.GetHostByName(t.sourceHost, endpoint.addr) THEN RAISE OSError.E(AtomList.Cons(hostNotFound, NIL)); END; endpoint.port := jvprotocol.PORT_VIDEO; t.tcp := TCP.Connect(endpoint); WITH tcp = t.tcp DO LOCK tcp DO IF tcp.closed THEN RAISE IP.Error(AtomList.Cons(TCP.Closed, NIL)); END; (* set rcv buffer as large as possible *) arg := 64 * 1024 - 1; END; END; EXCEPT | IP.Error (ec) => RAISE OSError.E(AtomList.Cons(sinkError, ec)); | OSError.E (e) => RAISE OSError.E(AtomList.Cons(sinkError, e)); END; END OpenRemoteConnection; PROCEDURESendToSource (t: TCP.T; msg: jvprotocol.ClientRequest) RAISES {Wr.Failure, Thread.Alerted} = VAR data := LOOPHOLE(ADR(msg), UNTRACED REF ARRAY [0 .. 0] OF CHAR); BEGIN t.put(data^); END SendToSource;
this ready closure tries to send a credit back to the source if this is
the first time the apply
method has been called. It is supposed to be
called just after the Picture.Paint
call for this buffer.
TYPE ReadyClosure = JVBuffer.ReadyClosure OBJECT next : ReadyClosure; (* for free list *) tcp : TCP.T; called: BOOLEAN; (* has apply been called yet? *) tcpOpen: BOOLEAN; (* false if a call to this tcp has failed *) OVERRIDES apply := ReadyApply; END; PROCEDURE}}}ReadyApply (cl: ReadyClosure) RAISES {Thread.Alerted} = BEGIN IF cl.tcpOpen AND NOT cl.called THEN (* unlocked quick check first *) LOCK cl DO IF cl.tcp # NIL AND cl.tcpOpen AND cl.endToEnd AND NOT cl.called THEN cl.called := TRUE; TRY SendToSource(cl.tcp, FIRST(jvprotocol.ClientNumCredits)); EXCEPT | Wr.Failure => cl.tcpOpen := FALSE; END; END; END; END; END ReadyApply; PROCEDURENewReady (tcp: TCP.T): ReadyClosure = (* use this proc to get a new ReadyClosure, does some free list stuff *) VAR res: ReadyClosure := NIL; BEGIN LOCK readyMutex DO IF readyFrees # NIL THEN res := readyFrees; readyFrees := readyFrees.next; END; END; IF res = NIL THEN res := NEW(ReadyClosure); EVAL WeakRef.FromRef(res, CleanUpReady); END; res.next := NIL; res.tcp := tcp; res.called := FALSE; res.endToEnd := FALSE; res.tcpOpen := TRUE; RETURN res; END NewReady; PROCEDURECleanUpReady (<* UNUSED*> READONLY w: WeakRef.T; r: REFANY) = (* return to ready free list *) BEGIN WITH rc = NARROW(r, ReadyClosure) DO LOCK readyMutex DO rc.next := readyFrees; END; END; END CleanUpReady; VAR readyMutex := NEW(Thread.Mutex); readyFrees: ReadyClosure := NIL;
BEGIN hostNotFound := Atom.FromText("HostNotFound"); sinkError := Atom.FromText("JVSink: "); END JVSink.