@@ -713,3 +713,121 @@ jobs:
713713 RESTRICT: args.restrict,
714714 SORTTAB : args.sortTab
715715 }
716+
717+ # Defines a HTTP SSE (Server-Sent Events) service
718+ #
719+ # -----------------------
720+ - name : HTTP SSE Service
721+ deps :
722+ - HTTP Start Server
723+ to : HTTP Apply Service
724+ typeArgs :
725+ noLog : false
726+ shortcut :
727+ name : httpdSSE
728+ keyArg : port
729+ args :
730+ uri : uri
731+ execSSE : execSSE
732+ audit : audit
733+ auditTemplate : auditTemplate
734+ error : error
735+ execPre : execPre
736+ help :
737+ text : Provides a Server-Sent Events (SSE) streaming endpoint. If using sequential jobs do ensure that the job 'HTTP Start Server' gets executed first.
738+ expects :
739+ - name : port
740+ desc : (Number) The port where the server was made available (default is 8091)
741+ - name : uri
742+ desc : (String) The URI where the SSE endpoint will be available.
743+ - name : execSSE
744+ desc : (Function) The SSE execution function. Receives (writer, request) where writer has write(event, data) and close() methods.
745+ - name : audit
746+ desc : (Boolean) Turns request audit logging on (default true)
747+ - name : auditTemplate
748+ desc : (String) Provide an audit logging template based on the request argument.
749+ - name : error
750+ desc : (Boolean) If true will return the error in case of exception.
751+ - name : execPre
752+ desc : (String) A pre-execution code include to execSSE useful for authentication
753+ exec : | # js
754+ args.port = _$(args.port).isNumber().default(8091)
755+ args.uri = _$(args.uri).isString().default("/sse")
756+ args.audit = _$(args.audit).default(true)
757+ args.execPre = _$(args.execPre).isString().default(__)
758+ args.auditTemplate = _$(args.auditTemplate).default("AUDIT SSE | {{method}} {{uri}} ({{header.remote-addr}}; {{header.user-agent}})")
759+
760+ if (isUnDef(args.execPre) && isDef(global.__ojobPreRoutes[args.port])) {
761+ args.execPre = global.__ojobPreRoutes[args.port]
762+ } else {
763+ args.execPre = ""
764+ }
765+
766+ var fn = __
767+ if (args.audit) {
768+ fn = (req, reply) => {
769+ var data = merge(req, {
770+ reply: {
771+ status : reply.status,
772+ mimetype: reply.mimetype
773+ }
774+ })
775+ try {
776+ tlog(args.auditTemplate, data)
777+ } catch(e) {
778+ logErr("Error on auditing SSE access: " + String(e))
779+ }
780+ }
781+ }
782+
783+ global.__ojobRoutes[args.port][args.uri] = function(r) {
784+ try {
785+ var pis = new java.io.PipedInputStream()
786+ var pos = new java.io.PipedOutputStream(pis)
787+ var _closed = false
788+
789+ var writer = {
790+ write: function(event, data) {
791+ if (_closed) return
792+ var msg = "event: " + event + "\n"
793+ msg += "data: " + JSON.stringify(data) + "\n\n"
794+ pos.write(af.fromString2Bytes(msg))
795+ pos.flush()
796+ },
797+ close: function() {
798+ if (_closed) return
799+ _closed = true
800+ try { pos.close() } catch(e) {}
801+ }
802+ }
803+
804+ $do(function() {
805+ try {
806+ (new Function("var writer = arguments[0]; var request = arguments[1]; var server = global.__ojobHttp[" + args.port + "]; var port = " + String(args.port) + "; var uri = '" + args.uri + "'; " + args.execPre + "; " + args.execSSE))(writer, r)
807+ } catch(e) {
808+ writer.write("error", { message: String(e) })
809+ if (args.error) logErr("SSE error: " + String(e))
810+ } finally {
811+ writer.close()
812+ }
813+ })
814+
815+ var res = {
816+ status : 200,
817+ mimetype: "text/event-stream",
818+ stream : pis,
819+ header : {
820+ "Cache-Control" : "no-cache, no-transform",
821+ "Connection" : "keep-alive",
822+ "Content-Encoding" : "identity",
823+ "X-Accel-Buffering": "no"
824+ }
825+ }
826+ if (fn) fn(r, res)
827+ return res
828+ } catch(e) {
829+ if (args.error) {
830+ return { data: String(e), mimetype: "text/plain", status: 500, header: {} }
831+ }
832+ }
833+ }
0 commit comments