Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
781a7cc
chore: release new version
May 10, 2025
ce286dc
Merge remote-tracking branch 'upstream/master'
May 14, 2025
dae7deb
merge
May 28, 2025
9fe3c55
Merge remote-tracking branch 'upstream/master'
Jul 15, 2025
2a76e93
Merge remote-tracking branch 'upstream/master'
Jul 31, 2025
3af0153
Merge remote-tracking branch 'upstream/master'
sleipnir Sep 9, 2025
4feee64
Merge remote-tracking branch 'upstream/master'
sleipnir Sep 11, 2025
bae8b92
Merge remote-tracking branch 'upstream/master'
sleipnir Oct 13, 2025
0cfb96d
Merge remote-tracking branch 'upstream/master'
sleipnir Oct 14, 2025
db695ec
bump 0.10.2 -> 0.11.0
sleipnir Oct 14, 2025
4465815
merge with master
sleipnir Oct 16, 2025
d592b38
refactor: better interface and simpler implementation for GRPC.Stream…
polvalente Oct 22, 2025
4ba7367
Merge remote-tracking branch 'upstream/master'
sleipnir Oct 22, 2025
322adfa
chore: remove Logger.debug/1 noise (#454)
oohnoitz Oct 22, 2025
9886e91
Merge remote-tracking branch 'upstream/master'
sleipnir Oct 23, 2025
412baf6
[Fix] content type warning (#455)
sleipnir Oct 23, 2025
af23fd8
fix: refresh error spam on direct_state (no lb) (#456)
JoeriDijkstra Oct 29, 2025
14d578a
[Feat] Error handler in streams (#451)
sleipnir Oct 30, 2025
64353db
Merge remote-tracking branch 'upstream/master'
sleipnir Nov 6, 2025
fde44f4
docs: overhaul library documentation (#458)
sleipnir Nov 7, 2025
702ddee
Merge remote-tracking branch 'upstream/master'
sleipnir Nov 7, 2025
af73254
Merge branch 'elixir-grpc:master' into master
sleipnir Nov 7, 2025
f9d4231
Merge branch 'master' of https://github.com/sleipnir/grpc
sleipnir Nov 7, 2025
5fc70b4
fix: correct struct syntax
sleipnir Nov 7, 2025
d05739a
Fix/docs struct (#459)
sleipnir Nov 7, 2025
e746473
Merge remote-tracking branch 'upstream/master'
sleipnir Nov 7, 2025
723e8a4
bump v0.11.3 -> v0.11.4
sleipnir Nov 7, 2025
3c9392f
[Feat] bump v0.11.4 (#460)
sleipnir Nov 7, 2025
4a9299f
Merge remote-tracking branch 'upstream/master'
sleipnir Nov 7, 2025
06e2ba7
Report GRPC.Errors as normal shutdowns (#461)
ollien Nov 7, 2025
e9a904d
Add `exception_log_filter` option to server (#462)
ollien Nov 7, 2025
c342db0
Fix assertless test for exception log filtering (#463)
ollien Nov 8, 2025
9c58f9c
Ensure 1 GRPC.Client.Supervisor (#466)
mrmicahcooper Nov 13, 2025
addcb4b
Merge remote-tracking branch 'upstream/master'
sleipnir Nov 14, 2025
97e1897
bump 0.11.4 -> 0.11.5
sleipnir Nov 14, 2025
83b01a2
Fix FunctionClauseError in disconnect handler
saner May 4, 2026
1f6bd68
Add regression test for disconnect with failed channels
saner May 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Changelog

## v0.11.5 (2025-11-14)

### Enhancements

* Feat add `exception_log_filter` option to server

### Bug fixes

* Fix ensure thers is only one `GRPC.Client.Supervisor`.
* Fix report `GRPC.Errors` as normal shutdowns

## v0.11.4 (2025-11-07)

### Enhancements

* Feat added new function to handle side-effects.
* Feat added error handler for unary and stream pipelines.
* Docs adds a better explanation of the different types of input.
* Docs improvements to module documentation.
* Docs livebooks added directly to the documentation.

### Bug fixes

* Fix refresh error spam on direct_state (no lb).
* Fix correct return type in doc.
131 changes: 106 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
- [Unary RPC using Stream API](#unary-rpc-using-stream-api)
- [Server-Side Streaming](#server-side-streaming)
- [Bidirectional Streaming](#bidirectional-streaming)
- [Effects and Error Handling](#effects-and-error-handling)
- [Side Effects](#side-effects-with-effect2)
- [Recovery from errors](#recovery-from-errors)
- [Unified Error Matching and Propagation](#unified-error-matching-and-propagation)
- [Application Startup](#application-startup)
- [Client Usage](#client-usage)
- [Basic Connection and RPC](#basic-connection-and-rpc)
Expand Down Expand Up @@ -101,8 +105,9 @@ defmodule HelloworldStreams.Server do
alias Helloworld.HelloReply

@spec say_unary_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any()
def say_unary_hello(request, _materializer) do
GRPC.Stream.unary(request)
def say_unary_hello(request, materializer) do
request
|> GRPC.Stream.unary(materializer: materializer)
|> GRPC.Stream.map(fn %HelloReply{} = reply ->
%HelloReply{message: "[Reply] #{reply.message}"}
end)
Expand Down Expand Up @@ -144,28 +149,104 @@ def say_bid_stream_hello(request, materializer) do
|> GRPC.Stream.run_with(materializer)
end
```
The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. See the table below:

| Function | Description | Parameters / Options |
|:---------------------------------|:-------------|:----------------------|
| **`from(input, opts \\\\ [])`** | Converts a gRPC stream (or list) into a `Flow` with backpressure support. Allows joining with external `GenStage` producers. | **Parameters:**<br>• `input` — stream, list, or gRPC struct.<br>**Options:**<br>• `:join_with` — PID or name of an external `GenStage` producer.<br>• `:dispatcher` — dispatcher module (default: `GenStage.DemandDispatcher`).<br>• `:propagate_context` — if `true`, propagates the materializer context.<br>• `:materializer` — the current `%GRPC.Server.Stream{}`.<br>• Other options supported by `Flow`. |
| **`unary(input, opts \\\\ [])`** | Creates a `Flow` from a single gRPC request (unary). Useful for non-streaming calls that still leverage the Flow API. | **Parameters:**<br>• `input` — single gRPC message.<br>**Options:** same as `from/2`. |
| **`to_flow(stream)`** | Returns the underlying `Flow` from a `GRPC.Stream`. If uninitialized, returns `Flow.from_enumerable([])`. | **Parameters:**<br>• `stream` — `%GRPC.Stream{}` struct. |
| **`run(stream)`** | Executes the `Flow` for a unary stream and returns the first materialized result. | **Parameters:**<br>• `stream` — `%GRPC.Stream{}` with `unary: true` option. |
| **`run_with(stream, materializer, opts \\\\ [])`** | Executes the `Flow` and sends responses into the gRPC server stream. Supports `:dry_run` for test mode without sending messages. | **Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `materializer` — `%GRPC.Server.Stream{}`.<br>**Options:**<br>• `:dry_run` — if `true`, responses are not sent. |
| **`ask(stream, target, timeout \\\\ 5000)`** | Sends a request to an external process (`PID` or named process) and waits for a response (`{:response, msg}`). Returns an updated stream or an error. | **Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `target` — PID or atom.<br>• `timeout` — in milliseconds. |
| **`ask!(stream, target, timeout \\\\ 5000)`** | Same as `ask/3`, but raises an exception on failure (aborts the Flow). | Same parameters as `ask/3`. |
| **`filter(stream, fun)`** | Filters items in the stream by applying a concurrent predicate function. | **Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `fun` — function `(item -> boolean)`. |
| **`flat_map(stream, fun)`** | Applies a function returning a list or enumerable, flattening the results. | **Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `fun` — `(item -> Enumerable.t())`. |
| **`map(stream, fun)`** | Applies a transformation function to each item in the stream. | **Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `fun` — `(item -> term)`. |
| **`map_with_context(stream, fun)`** | Applies a function to each item, passing the stream context (e.g., headers) as an additional argument. | **Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `fun` — `(context, item -> term)`. |
| **`partition(stream, opts \\\\ [])`** | Partitions the stream to group items by key or condition before stateful operations like `reduce/3`. | **Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `opts` — partitioning options (`Flow.partition/2`). |
| **`reduce(stream, acc_fun, reducer_fun)`** | Reduces the stream using an accumulator, useful for aggregations. | **Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `acc_fun` — initializer function `() -> acc`.<br>• `reducer_fun` — `(item, acc -> acc)`. |
| **`uniq(stream)`** | Emits only distinct items from the stream (no custom uniqueness criteria). | **Parameters:**<br>• `stream` — `%GRPC.Stream{}`. |
| **`uniq_by(stream, fun)`** | Emits only unique items based on the return value of the provided function. | **Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `fun` — `(item -> term)` for uniqueness determination. |
| **`get_headers(stream)`** | Retrieves HTTP/2 headers from a `%GRPC.Server.Stream{}`. | **Parameters:**<br>• `stream` — `%GRPC.Server.Stream{}`.<br>**Returns:** `map` containing decoded headers. |

For a complete list of available operators see [here](lib/grpc/stream.ex).
The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. For a complete list of available operators see [here](lib/grpc/stream.ex).

---

### Effects and Error Handling

#### Side Effects

The `effect/2` operator executes user-defined functions for each element in the stream, allowing the integration of non-transformative actions such as logging, metrics, or external notifications.

Unlike transformation operators (e.g., `map/2`), `effect/2` does not modify or filter values — it preserves the original stream while executing the provided callback safely for each emitted element.

```elixir
iex> parent = self()
iex> stream =
...> GRPC.Stream.from([1, 2, 3])
...> |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x * 2}) end)
...> |> GRPC.Stream.to_flow()
...> |> Enum.to_list()
iex> assert_receive {:seen, 2}
iex> assert_receive {:seen, 4}
iex> assert_receive {:seen, 6}
iex> stream
[1, 2, 3]
```

Key characteristics:

* The callback function (`effect_fun`) is invoked for each item emitted downstream.
* The result of the callback is ignored, ensuring that the stream’s structure and values remain unchanged.
* Execution is lazy and occurs only when the stream is materialized using run/1, run_with/3, or to_flow/1.
* Exceptions raised inside the callback are captured internally, preventing interruption of the dataflow.

This operator is designed for observability, telemetry, auditing, and integration with external systems that must react to events flowing through the gRPC stream.

---

#### Recovery from errors

The `map_error/2` operator intercepts and transforms errors or exceptions emitted by previous stages in a stream pipeline.

It provides a unified mechanism for handling:

* Expected errors, such as validation or domain failures (`{:error, reason}`)
* Unexpected runtime errors, including raised or thrown exceptions inside other operators.

```elixir
iex> GRPC.Stream.from([1, 2])
...> |> GRPC.Stream.map(fn
...> 2 -> raise "boom"
...> x -> x
...> end)
...> |> GRPC.Stream.map_error(fn
...> {:error, {:exception, _reason}} ->
...> {:error, GRPC.RPCError.exception(message: "Booomm")}
...> end)
```

In this example:

* The function inside `map/2` raises an exception for the value `2`.
* `map_error/2` captures and transforms that error into a structured `GRPC.RPCError` response.
* The stream continues processing without being interrupted.

This makes map_error/2 suitable for input validation, runtime fault recovery, and user-facing error translation within gRPC pipelines.

---

#### Unified Error Matching and Propagation

All stream operators share a unified error propagation model that guarantees consistent handling of exceptions and failures across the pipeline.

This ensures that user-defined functions within the stream — whether pure transformations, side effects, or external calls — always produce a predictable and recoverable result, maintaining the integrity of the dataflow even in the presence of unexpected errors.

```elixir
def say_unary_hello(request, _materializer) do
GRPCStream.unary(request)
|> GRPCStream.ask(Transformer)
|> GRPCStream.map(fn
%HelloReply{} = reply ->
%HelloReply{message: "[Reply] #{reply.message}"}

{:error, reason} ->
{:error, GRPC.RPCError.exception(message: "error calling external process: #{inspect(reason)}")}

error ->
Logger.error("Unknown error")
error
end)
|> GRPCStream.run()
end
```

By normalizing all possible outcomes, `GRPC.Stream` ensures fault-tolerant, exception-safe pipelines where operators can freely raise, throw, or return tuples without breaking the flow execution.

This unified model allows developers to build composable and reliable streaming pipelines that gracefully recover from both domain and runtime errors.

>_NOTE_: In the example above, we could use `map_error/2` instead of `map/2` to handle error cases explicitly. However, since the function also performs a transformation on successful values, `map/2` remains appropriate and useful in this context.

---

Expand All @@ -175,7 +256,7 @@ Add the server supervisor to your application's supervision tree:

```elixir
defmodule Helloworld.Application do
@ false
@moduledoc false
use Application

@impl true
Expand Down
23 changes: 0 additions & 23 deletions examples/helloworld/.gitignore

This file was deleted.

76 changes: 0 additions & 76 deletions examples/helloworld/README.md

This file was deleted.

3 changes: 0 additions & 3 deletions examples/helloworld/config/config.exs

This file was deleted.

1 change: 0 additions & 1 deletion examples/helloworld/config/dev.exs

This file was deleted.

4 changes: 0 additions & 4 deletions examples/helloworld/config/prod.exs

This file was deleted.

1 change: 0 additions & 1 deletion examples/helloworld/config/test.exs

This file was deleted.

6 changes: 0 additions & 6 deletions examples/helloworld/lib/endpoint.ex

This file was deleted.

41 changes: 0 additions & 41 deletions examples/helloworld/lib/helloworld.pb.ex

This file was deleted.

Loading