WebSockets
One of the optional capabilities that a backend can support is websockets (see backends summary). Websocket requests are described exactly like regular requests: starting with basicRequest, adding headers, and specifying the request method and uri.
A websocket request will be sent instead of a regular one if the response specification includes handling the response as a websocket. Depending on the backend you are using, there are three variants of websocket response specifications: synchronous, asynchronous and streaming. To use them, add one of the following imports:
import sttp.client4.ws.sync.* if you are using a synchronous backend (such as DefaultSyncBackend), without any effect wrappers
import sttp.client4.ws.async.* if you are using an asynchronous backend (e.g. based on Futures or IOs)
import sttp.client4.ws.stream.* if you want to handle web socket messages using a non-blocking stream (e.g. fs2.Stream or akka.stream.scaladsl.Source)
The above imports bring into scope a number of asWebSocket(...) methods, which offer several ways of working with websockets. Alternatively, you can extend the SttpWebSocketSyncApi, SttpWebSocketAsyncApi or SttpWebSocketStreamApi traits, to group all used sttp client features within a single object.
Refer to the documentation of individual backends for additional notes, or restrictions, when using WebSockets.
Using WebSocket
The first possibility of interacting with web sockets is using sttp.client4.ws.SyncWebSocket (sync variant), or sttp.ws.WebSocket[F] (async variant), where F is the backend-specific effects wrapper, such as Future or IO. These classes contain two basic methods:
def receive: WebSocketFrame (optionally wrapped with F[_] in the async variant), which completes once a message is available and returns the next incoming frame (which can be a data, ping, pong or close frame)
def send(f: WebSocketFrame, isContinuation: Boolean = false): Unit (again optionally wrapped with F[_]), which sends a message to the websocket. The WebSocketFrame companion object contains methods for creating binary/text messages. When using fragmentation, the first message should be sent using finalFragment = false, and subsequent messages using isContinuation = true.
The SyncWebSocket / WebSocket classes also contain other methods for receiving only text/binary messages, as well as automatically sending Pong responses when a Ping is received.
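As a minimal sketch of the fragmentation rule and of the frame kinds described above, the helpers below send one text message in two fragments and describe a received frame. The helper names (sendFragmented, describe) are hypothetical, and the example assumes the WebSocketFrame case classes from sttp.ws (Text, Binary, Ping, Pong, Close) have the shapes used in the pattern match.

```scala
import sttp.client4.ws.SyncWebSocket
import sttp.ws.WebSocketFrame

// Hypothetical helper: sends a single text message split into two fragments.
def sendFragmented(ws: SyncWebSocket): Unit =
  // First fragment: not final, and not a continuation of anything.
  ws.send(WebSocketFrame.Text("Hello, ", finalFragment = false, rsv = None))
  // Last fragment: final, and marked as a continuation of the previous frame.
  ws.send(
    WebSocketFrame.Text("world!", finalFragment = true, rsv = None),
    isContinuation = true
  )

// Hypothetical helper: turns any received frame into a short description.
def describe(frame: WebSocketFrame): String =
  frame match
    case WebSocketFrame.Text(payload, _, _)   => s"text: $payload"
    case WebSocketFrame.Binary(payload, _, _) => s"binary: ${payload.length} bytes"
    case WebSocketFrame.Ping(_)               => "ping"
    case WebSocketFrame.Pong(_)               => "pong"
    case WebSocketFrame.Close(code, reason)   => s"close: $code $reason"
```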
The following response specifications which use SyncWebSocket are available in the sttp.client4.ws.sync object (the second type parameter of WebSocketResponseAs specifies the type returned as the response body):
import sttp.client4.*
import sttp.client4.ws.SyncWebSocket
import sttp.model.ResponseMetadata
import sttp.shared.Identity
// when using import sttp.client4.ws.sync.*
def asWebSocket[T](f: SyncWebSocket => T):
  WebSocketResponseAs[Identity, Either[String, T]] = ???

def asWebSocketOrFail[T](f: SyncWebSocket => T):
  WebSocketResponseAs[Identity, T] = ???

def asWebSocketWithMetadata[T](
    f: (SyncWebSocket, ResponseMetadata) => T
): WebSocketResponseAs[Identity, Either[String, T]] = ???

def asWebSocketUnsafe:
  WebSocketResponseAs[Identity, Either[String, SyncWebSocket]] = ???

def asWebSocketAlwaysUnsafe:
  WebSocketResponseAs[Identity, SyncWebSocket] = ???
The first variant, asWebSocket, passes an open SyncWebSocket to the user-provided function. This function should only return once interaction with the websocket is finished. The backend can then safely close the websocket. The value that’s returned as the response body is either an error (represented as a String), in case the websocket upgrade didn’t complete successfully, or the value returned by the websocket-interacting method.
The second (asWebSocketOrFail) is similar, but any errors due to failed websocket protocol upgrades are represented as exceptions.
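The third (asWebSocketWithMetadata) works like asWebSocket, but additionally passes the response metadata to the user-provided function. The two Unsafe variants (asWebSocketUnsafe and asWebSocketAlwaysUnsafe) return the open SyncWebSocket directly, as the response body. It is then the responsibility of the client code to close the websocket, once it’s no longer needed.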
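For illustration, here is a minimal sketch of the asWebSocket variant with the synchronous backend. It assumes that the endpoint used elsewhere on this page (wss://ws.postman-echo.com/raw) echoes text messages back; the names sendAndReceive and syncWebSocketExample are hypothetical.

```scala
import sttp.client4.*
import sttp.client4.ws.SyncWebSocket
import sttp.client4.ws.sync.*

// Interacts with the open websocket. The backend closes the websocket
// once this function returns.
def sendAndReceive(ws: SyncWebSocket): String =
  ws.sendText("Hello!")
  ws.receiveText()

@main def syncWebSocketExample(): Unit =
  val backend = DefaultSyncBackend()
  try
    val response: Response[Either[String, String]] = basicRequest
      .get(uri"wss://ws.postman-echo.com/raw")
      .response(asWebSocket(sendAndReceive))
      .send(backend)

    response.body match
      case Left(error)  => println(s"Websocket upgrade failed: $error")
      case Right(reply) => println(s"Received: $reply")
  finally backend.close()
```

With asWebSocketOrFail instead, the body type would be String rather than Either[String, String], and a failed upgrade would surface as an exception.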
Similar response specifications, but using an effect wrapper and WebSocket[F], are available in the sttp.client4.ws.async object.
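A sketch of the async variant with Future as the effect wrapper might look as follows. It is written against any WebSocketBackend[Future], so it makes no assumption about which concrete backend is used; the names sendAndReceive and echoOnce are hypothetical, and the endpoint is assumed to echo text messages.

```scala
import scala.concurrent.{ExecutionContext, Future}
import sttp.client4.*
import sttp.client4.ws.async.*
import sttp.ws.WebSocket

// Each websocket operation returns a Future, so the steps are sequenced
// with a for-comprehension.
def sendAndReceive(ws: WebSocket[Future])(using ExecutionContext): Future[String] =
  for
    _ <- ws.sendText("Hello!")
    reply <- ws.receiveText()
  yield reply

// The backend is passed in by the caller; closing it is the caller's job.
def echoOnce(
    backend: WebSocketBackend[Future]
)(using ExecutionContext): Future[Response[Either[String, String]]] =
  basicRequest
    .get(uri"wss://ws.postman-echo.com/raw")
    .response(asWebSocket[Future, String](ws => sendAndReceive(ws)))
    .send(backend)
```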
See also the examples, several of which involve websockets.
Using non-blocking, asynchronous streams
Another possibility is to work with websockets by providing a streaming stage, which transforms incoming data frames into outgoing frames. This can be e.g. a Pekko Flow or an fs2 Pipe.
The following response specifications are available:
import sttp.client4.*
import sttp.capabilities.{Streams, WebSockets}
import sttp.ws.WebSocketFrame
// when using import sttp.client4.ws.stream.*

def asWebSocketStream[S](s: Streams[S])(p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame]):
  WebSocketStreamResponseAs[Either[String, Unit], S] = ???

def asWebSocketStreamOrFail[S](s: Streams[S])(p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame]):
  WebSocketStreamResponseAs[Unit, S] = ???
Using streaming websockets requires the backend to support the given streaming capability (see also streaming). Streaming capabilities are described as implementations of Streams[S], and are provided by backend implementations, e.g. PekkoStreams or Fs2Streams[F].
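As a sketch of the streaming variant with fs2 and cats-effect IO, the pipe below sends one greeting, prints the first text reply, and then closes the connection. The names greetThenClose and runStream are hypothetical, the endpoint is assumed to echo text messages, and the backend is taken as a parameter: any backend that supports Fs2Streams[IO] together with websockets should fit.

```scala
import cats.effect.IO
import fs2.{Pipe, Stream}
import sttp.capabilities.fs2.Fs2Streams
import sttp.client4.*
import sttp.client4.ws.stream.*
import sttp.ws.WebSocketFrame

// Emits an initial text frame, then reacts to incoming data frames.
val greetThenClose: Pipe[IO, WebSocketFrame.Data[?], WebSocketFrame] = incoming =>
  Stream.emit(WebSocketFrame.text("Hello!")) ++ incoming.flatMap {
    case WebSocketFrame.Text(payload, _, _) =>
      println(s"Received: $payload")
      // Ask for the connection to be closed after the first reply.
      Stream.emit(WebSocketFrame.close)
    case _ => Stream.empty // ignore binary frames
  }

def runStream(
    backend: WebSocketStreamBackend[IO, Fs2Streams[IO]]
): IO[Response[Either[String, Unit]]] =
  basicRequest
    .get(uri"wss://ws.postman-echo.com/raw")
    .response(asWebSocketStream(Fs2Streams[IO])(greetThenClose))
    .send(backend)
```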
When working with streams of websocket frames, keep in mind that a text payload may be fragmented into multiple frames.
For each backend, sttp provides two methods (fromTextPipe, fromTextPipeF) that aggregate these fragments back into complete messages.
These methods can be found in the corresponding WebSockets class for the given effect type:
| effect type | class name |
|---|---|
| | |
| | |
| | |
Using blocking, synchronous Ox streams
Ox is a Scala 3 toolkit that allows you to handle concurrency and resiliency in direct
style, leveraging the virtual threads available in Java 21+. If you’re using Ox with sttp, you can use the DefaultSyncBackend from
sttp-core for HTTP communication. The ox integration module allows handling WebSockets as Ox’s Flow:
// sbt dependency
"com.softwaremill.sttp.client4" %% "ox" % "4.0.23"
The runWebSocketPipe function from that module accepts a SyncWebSocket, as well as a function, which takes a Flow
of messages received from the server, and produces a combined Flow which should both consume the incoming messages, and
produce outgoing messages to be sent:
import sttp.client4.*
import sttp.client4.impl.ox.ws.runWebSocketPipe
import sttp.client4.ws.sync.*
val backend = DefaultSyncBackend()
basicRequest
  .get(uri"wss://ws.postman-echo.com/raw")
  // you need to provide a Flow[WebSocketFrame] => Flow[WebSocketFrame] function
  .response(asWebSocket(ws => runWebSocketPipe(ws)(incoming => ???)))
  .send(backend)
See the full example here.
Read more about Ox, structured concurrency and Flows on the project website.
Using channels
Alternatively, you can obtain a lower-level Source + Sink, which allows directly reading/writing from the WebSocket using Ox’s channels:
import ox.*
import sttp.client4.ws.SyncWebSocket
import sttp.client4.impl.ox.ws.asSourceAndSink
def useWebSocket(ws: SyncWebSocket): Unit =
  supervised {
    // (Source[WebSocketFrame], Sink[WebSocketFrame])
    val (wsSource, wsSink) = asSourceAndSink(ws)
    // ...
  }
Make sure that the Source is continually read. This guarantees that the server-side Close signal is received and handled.
If you don’t want to process frames from the server, you should at least drain them with a fork { wsSource.drain() }.
You don’t need to manually call ws.close() when using this approach. Closing is handled automatically underneath,
according to the following rules:
If the request Sink is closed due to an upstream error, a Close frame is sent, and the Source with incoming responses gets completed as Done.
If the request Sink completes as Done, a Close frame is sent, and the response Source keeps receiving responses until the server closes communication.
If the response Source is closed by a Close frame from the server or due to an error, the request Sink is closed as Done, which will still send all outstanding buffered frames, and then finish.
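The rules above can be sketched with a small send-only client. This is an illustration under assumptions rather than a reference implementation: it assumes that Ox's fork, join, Sink.send, Sink.done and Source.drain behave as their names suggest, and the function name sendOnly is hypothetical.

```scala
import ox.*
import sttp.client4.impl.ox.ws.asSourceAndSink
import sttp.client4.ws.SyncWebSocket
import sttp.ws.WebSocketFrame

def sendOnly(ws: SyncWebSocket): Unit =
  supervised {
    val (wsSource, wsSink) = asSourceAndSink(ws)

    // Keep reading the incoming side in the background, so that a
    // server-side Close frame is noticed even though replies are ignored.
    val drained = fork { wsSource.drain() }

    wsSink.send(WebSocketFrame.text("Hello!"))

    // Completing the request Sink as Done causes a Close frame to be sent.
    wsSink.done()

    // Wait until the server closes communication and the Source completes.
    drained.join()
  }
```

Note that ws.close() is never called here; closing follows from completing the Sink.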
Compression
If you expect a lot of websocket traffic, consider using websocket compression. Note, however, that it is often not supported:
OkHttp-based backends
Compression is supported (default: not enabled).
akka-http backend
Compression is not yet available. To track Akka developments in this area, see this issue.