Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project/metals.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

// This file enables sbt-bloop to create bloop config files.

addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.3")
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.4")

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package example

import zio._
import zio.http._
import zio.http.model._
import zio.http.sse._
import zio.stream.ZStream

/**
* Example to provide content as Server-Sent Events.
*/
object ServerSentEventEndpoint extends ZIOAppDefault {

// Create a stream of Server-Sent-Events
val eventStream =
ZStream.repeatWithSchedule(ServerSentEvent.withData("myData"), Schedule.spaced(1.seconds) && Schedule.recurs(10))

// Starting the server (for more advanced startup configuration checkout `HelloWorldAdvanced`)
def run = Server.serve(app).provide(Server.default)

// Use `Http.collect` to match on route
def app: HttpApp[Any, Nothing] = Http.collect[Request] {

// Simple (non-stream) based route
case Method.GET -> !! / "health" => Response.ok

// ZStream powered response
case Method.GET -> !! / "stream" =>
ServerSentEventResponse.fromEventStreamWithHeartbeat(
status = Status.Ok,
additionalHeaders = Headers.accessControlAllowOrigin("*"),
stream = eventStream,
)
}
}
2 changes: 2 additions & 0 deletions zio-http/src/main/scala/zio/http/model/MimeDB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7227,6 +7227,7 @@ private[zio] trait MimeDB {
lazy val `rtploopback`: MediaType =
new MediaType("text", "rtploopback", Compressible, NotBinary)
lazy val `rtx`: MediaType = new MediaType("text", "rtx", Compressible, NotBinary)
lazy val `server-sent-events`: MediaType = new MediaType("text", "event-stream", Uncompressible, NotBinary)
lazy val `sgml`: MediaType =
new MediaType("text", "sgml", Compressible, NotBinary, List("sgml", "sgm"))
lazy val `shaclc`: MediaType = new MediaType("text", "shaclc", Compressible, NotBinary)
Expand Down Expand Up @@ -7425,6 +7426,7 @@ private[zio] trait MimeDB {
`rtp-enc-aescm128`,
`rtploopback`,
`rtx`,
`server-sent-events`,
`sgml`,
`shaclc`,
`shex`,
Expand Down
85 changes: 85 additions & 0 deletions zio-http/src/main/scala/zio/http/sse/ServerSentEvent.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package zio.http.sse

sealed trait ServerSentEvent {
def toStringRepresentation: String
}

object ServerSentEvent {

import Fields._

private object Fields {
val Field = """([^:]+):? ?(.*)""".r
val DataField = "data"
val IdField = "id"
val EventField = "event"
val RetryField = "retry"
}

def parse(eventLines: List[String]): ServerSentEvent = {
val result = eventLines.foldLeft(Event(None, None, None, None)) { (sse, line) =>
line match {
case Field(DataField, data) =>
sse.copy(dataF = sse.dataF.map(s => s.concat(LF ++ data)).orElse(Some(data)))
case Field(IdField, id) => sse.copy(idF = Some(id))
case Field(EventField, event) => sse.copy(eventF = Some(event))
case Field(RetryField, retry) => sse.copy(retryF = retry.toIntOption)
case _ => sse
}
}
if (result.isEmpty) ServerSentEvent.empty else result
}

val empty: EventHeartbeat.type =
EventHeartbeat

private[sse] val LF =
"\n"

def withData(
data: String,
eventF: Option[String] = None,
idF: Option[String] = None,
retryF: Option[Int] = None,
): Event =
Event(Some(data), eventF, idF, retryF)

private[sse] case class Event private[sse] (
dataF: Option[String],
eventF: Option[String],
idF: Option[String],
retryF: Option[Int],
) extends ServerSentEvent {

/**
* @return
* A String representation of the SSE, structured for serialization and
* transport.
*/
override def toStringRepresentation: String = {
val _data = dataF.map(_.split(LF).map(line => s"${DataField}: $line").mkString(LF).concat(LF))
val _event = eventF.map(str => s"${EventField}: $str".concat(LF))
val _id = idF.map(str => s"${IdField}: $str".concat(LF))
val _retry = retryF.map(str => s"${RetryField}: $str".concat(LF))

// As per specification each field and every event as a whole are each followed by an `end-of-line` character.
Array[Option[String]](_data, _event, _id, _retry).flatten.mkString.concat(LF)
}

private[sse] val isEmpty = dataF.isEmpty && eventF.isEmpty && idF.isEmpty && retryF.isEmpty
}

private[sse] object Event {
def apply(dataF: Option[String], eventF: Option[String], idF: Option[String], retryF: Option[Int]): Event =
new Event(dataF, eventF, idF, retryF) {}
}

/**
* As per specification an `end-of-line` is a complete event which in turn can
* be used as a heartbeat.
*/
private[sse] case object EventHeartbeat extends ServerSentEvent {
override def toStringRepresentation: String = LF
}

}
17 changes: 17 additions & 0 deletions zio-http/src/main/scala/zio/http/sse/ServerSentEventBody.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package zio.http.sse

import zio.Chunk
import zio.http.Body
import zio.http.model.HTTP_CHARSET
import zio.stream.ZStream

object ServerSentEventBody {

def fromEventStream(stream: ZStream[Any, Throwable, ServerSentEvent]): Body =
Body.fromStream(serializeEventStream(stream))

private[sse] def serializeEventStream(
stream: ZStream[Any, Throwable, ServerSentEvent],
): ZStream[Any, Throwable, Byte] =
stream.map(seq => Chunk.fromArray(seq.toStringRepresentation.getBytes(HTTP_CHARSET))).flattenChunks
}
40 changes: 40 additions & 0 deletions zio-http/src/main/scala/zio/http/sse/ServerSentEventResponse.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package zio.http.sse

import zio.http.model._
import zio.http.{Body, Response}
import zio.stream.ZStream
import zio.{Schedule, durationInt}

object ServerSentEventResponse {

private[sse] val requiredHeaders =
Headers.contentType("text/event-stream; charset=utf-8") ++
Headers.cacheControl("no-cache") ++
Headers.connection("keep-alive")

def fromEventStream(
status: Status = Status.Ok,
additionalHeaders: Headers = Headers.empty,
stream: ZStream[Any, Throwable, ServerSentEvent],
): Response =
fromBody(status, additionalHeaders, ServerSentEventBody.fromEventStream(stream))

private[sse] def fromBody(
status: Status = Status.Ok,
additionalHeaders: Headers = Headers.empty,
body: Body = Body.empty,
): Response =
Response(status, requiredHeaders.combine(additionalHeaders), body)

def fromEventStreamWithHeartbeat(
status: Status = Status.Ok,
additionalHeaders: Headers = Headers.empty,
stream: ZStream[Any, Throwable, ServerSentEvent],
schedule: Schedule[Any, Any, Any] = Schedule.spaced(2.seconds),
): Response =
fromBody(status, additionalHeaders, ServerSentEventBody.fromEventStream(stream.mergeLeft(heartbeatWith(schedule))))

private def heartbeatWith(schedule: Schedule[Any, Any, Any]) =
ZStream.repeat(ServerSentEvent.empty).schedule(schedule)

}
59 changes: 59 additions & 0 deletions zio-http/src/test/scala/zio/http/sse/ServerSentEventBodySpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package zio.http.sse

import zio.Scope
import zio.http.model.HTTP_CHARSET
import zio.stream.ZStream
import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertTrue}

object ServerSentEventBodySpec extends ZIOSpecDefault {

override def spec: Spec[TestEnvironment with Scope, Any] =
suite("ServerSentEventBody")(
test("#serializeEventStream correctly serializes a ServerSentEvent.empty as a LF") {
val eventStream = ZStream(ServerSentEvent.empty)
val newlineBytes = "\n".getBytes(HTTP_CHARSET).toList
ServerSentEventBody.serializeEventStream(eventStream).runCollect.map { byteChunk =>
assertTrue(byteChunk.toList == newlineBytes)
}
},
test("#serializeEventStream correctly serializes a SSE with minimal data followed by a LF") {
val eventStream = ZStream(ServerSentEvent.withData("myData"))
val expectedString =
"""data: myData
|
|""".stripMargin
val expectedBytes = expectedString.getBytes(HTTP_CHARSET).toList
ServerSentEventBody.serializeEventStream(eventStream).runCollect.map { byteChunk =>
assertTrue(byteChunk.toList == expectedBytes)
}
},
test("#serializeEventStream correctly serializes a sequence of minimal SSEs") {
val eventStream = ZStream(ServerSentEvent.withData("myData"), ServerSentEvent.withData("myData"))
val expectedString =
"""data: myData
|
|data: myData
|
|""".stripMargin
val expectedBytes = expectedString.getBytes(HTTP_CHARSET).toList
ServerSentEventBody.serializeEventStream(eventStream).runCollect.map { byteChunk =>
assertTrue(byteChunk.toList == expectedBytes)
}
},
test("#serializeEventStream correctly serializes heartbeats between minimal SSEs ") {
val eventStream =
ZStream(ServerSentEvent.withData("myData"), ServerSentEvent.empty, ServerSentEvent.withData("myData"))
val expectedString =
"""data: myData
|
|
|data: myData
|
|""".stripMargin
val expectedBytes = expectedString.getBytes(HTTP_CHARSET).toList
ServerSentEventBody.serializeEventStream(eventStream).runCollect.map { byteChunk =>
assertTrue(byteChunk.toList == expectedBytes)
}
},
)
}
Loading