From 0d8ea797e56ff1b393fc1cf9e1303e85753e7322 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Migda=C5=82?= Date: Fri, 23 Jan 2026 14:21:29 +0100 Subject: [PATCH 1/5] Update return values in callbacks to reflect 1.0 spec --- basic_pipeline/07_Redemands.md | 38 +++++----- broadcasting/05_RTMP_Pipeline.md | 120 +++++++++++++++---------------- 2 files changed, 79 insertions(+), 79 deletions(-) diff --git a/basic_pipeline/07_Redemands.md b/basic_pipeline/07_Redemands.md index d9de38f..c535965 100644 --- a/basic_pipeline/07_Redemands.md +++ b/basic_pipeline/07_Redemands.md @@ -10,49 +10,49 @@ To comprehensively understand the concept behind redemanding, you need to be awa ## In Source elements -In the [source elements](../glossary/glossary.md#source), there is a "side-channel", from which we can receive data. That "side-channel" can be, as in the exemplary [pipeline](../glossary/glossary.md#pipeline) we are working on, in form of a file, from which we are reading the data. In real-life scenarios it could be also, i.e. an [RTP](../glossary/glossary.md#rtp) stream received via the network. Since we have that "side-channel", there is no need to receive data via the input [pad](../glossary/glossary.md#pad) (that is why we don't have it in the source element, do we?). -The whole logic of fetching the data can be put inside the `handle_demand/5` callback - once we are asked to provide the [buffers](../glossary/glossary.md#buffer), the `handle_demand/5` callback gets called and we can provide the desired number of buffers from the "side-channel", inside the body of that callback. No processing occurs here - we get asked for the buffer and we provide the buffer, simple as that. -The redemand mechanism here lets you focus on providing a single buffer in the `handle_demand/5` body - later on, you can simply return the `:redemand` action and that action will invoke the `handle_demand/5` once again, with the updated number of buffers which are expected to be provided. Let's see it in an example - we could have such a `handle_demand/5` definition (and it wouldn't be a mistake!): +In the [source elements](../glossary/glossary.md#source), there is a "side-channel", from which we can receive data. That "side-channel" can be, as in the example [pipeline](../glossary/glossary.md#pipeline) we are working on, in the form of a file, from which we are reading the data. In real-life scenarios it could also be, i.e. an [RTP](../glossary/glossary.md#rtp) stream received via the network. Since we have that "side-channel", there is no need to receive data via the input [pad](../glossary/glossary.md#pad) (that is why we don't have it in the source element). +The whole logic of fetching the data can be put inside the `handle_demand/5` callback - once we are asked to provide the [buffers](../glossary/glossary.md#buffer), the `handle_demand/5` callback gets called and we can provide the desired number of buffers from the "side-channel" inside the body of that callback. No processing occurs here - we get asked for the buffer and we provide the buffer, simple as that. +The redemand mechanism here lets you focus on providing a single buffer in the `handle_demand/5` body - later on, you can simply return the `:redemand` action and that action will invoke `handle_demand/5` once again, with the updated number of buffers which are expected to be provided. Let's see it in an example - we could have such a `handle_demand/5` definition (and it wouldn't be a mistake!): ```elixir @impl true def handle_demand(:output, size, _unit, _context, state) do - actions = for x <- 1..size do - payload = Input.get_next() #Input.get_next() is an exemplary function which could be providing data - {:buffer, %Membrane.Buffer(payload: payload)} - end - { {:ok, actions}, state} + actions = for x <- 1..size do + payload = Input.get_next() #Input.get_next() is an example function providing data + {:buffer, %Membrane.Buffer{payload: payload}} + end + {actions, state} end ``` -As you can see in the snippet above, we need to generate the required `size` of buffers in the single `handle_demand/5` run. The logic of supplying the demand there is quite easy - but what if you would also need to check if there is enough data to provide a sufficient number of buffer? You would need to check it in advance (or try to read as much data as possible before supplying the desired number of buffers). And what if an exception occurs during the generation, before supplying all the buffers? -You would need to take under the consideration all these situations and your code would become larger and larger. +As you can see in the snippet above, we need to generate the required `size` of buffers in the single `handle_demand/5` run. The logic of supplying the demand there is quite easy - but what if you would also need to check if there is enough data to provide a sufficient number of buffers? You would need to check it in advance (or try to read as much data as possible before supplying the desired number of buffers). And what if an exception occurs during the generation, before supplying all the buffers? +You would need to consider all these situations and your code would become larger and larger. Wouldn't it be better to focus on a single buffer in each `handle_demand/5` call - and let the Membrane Framework automatically update the demand's size? This can be done in the following way: ```elixir @impl true def handle_demand(:output, _size, unit, context, state) do - payload = Input.get_next() #Input.get_next() is an exemplary function which could be providing data - actions = [buffer: %Membrane.Buffer(payload: payload), redemand: :output] - { {:ok, actions}, state} + payload = Input.get_next() + actions = [buffer: %Membrane.Buffer(payload: payload), redemand: :output] + {actions, state} end ``` ## In Filter elements -In the filter element, the situation is quite different. +In a filter element, the situation is quite different. Since the filter's responsibility is to process the data sent via the input pads and transmit it through the output pads, there is no 'side-channel' from which we could take data. That is why in normal circumstances you would transmit the buffer through the output pad in the `handle_buffer/4` callback (which means - once your element receives a buffer, you process it, and then you 'mark' it as ready to be output with the `:buffer` action). When it comes to the `handle_demand/5` action on the output pad, all you need to do is to demand the appropriate number of buffers on the element's input pad. -That behavior is easy to specify when we exactly know how many input buffers correspond to the one output buffer (recall the situation in the [Depayloader](../glossary/glossary.md#payloader-and-depayloader) of our pipeline, where we *a priori* knew, that each output buffer ([frame](../glossary/glossary.md#frame)) consists of a given number of input buffers ([packets](../glossary/glossary.md#packet))). However it becomes impossible to define if the output buffer might be a combination of a discretionary set number of input buffers. At the same time, we have dealt with an unknown number of required buffers in the OrderingBuffer implementation, where we didn't know how many input buffers do we need to demand to fulfill the missing spaces between the packets ordered in the list. How did we manage to do it? +That behavior is easy to specify when we know exactly how many input buffers correspond to one output buffer (recall the situation in the [Depayloader](../glossary/glossary.md#payloader-and-depayloader) of our pipeline, where we *a priori* knew, that each output buffer ([frame](../glossary/glossary.md#frame)) consists of a given number of input buffers ([packets](../glossary/glossary.md#packet))). However it becomes impossible to define if the output buffer might be a combination of a discretionary set number of input buffers. At the same time, we have dealt with an unknown number of required buffers in the OrderingBuffer implementation, where we didn't know how many input buffers we would need to demand to fulfill the missing spaces between the packets ordered in the list. How did we manage to do it? -We simply used the `:redemand` action! In case there was a missing space between the packets, we returned the `:redemand` action, which immediately called the `handle_demand/5` callback (implemented in a way to request for a buffer on the input pad). The fact, that that callback invocation was immediate, which means - the callback was called synchronously, right after returning from the `handle_buffer/4` callback, before processing any other message from the element's mailbox - might be crucial in some situations, since it guarantees that the demand will be done before handling any other event. +We simply used the `:redemand` action! In case there was a missing space between the packets, we returned the `:redemand` action, which immediately called the `handle_demand/5` callback (implemented in a way to request for a buffer on the input pad). The fact that that callback invocation was immediate (the callback was called synchronously, right after returning from the `handle_buffer/4` callback, before processing any other message from the element's mailbox) might be crucial in some situations, since it guarantees that the demand will be done before handling any other event. Recall the situation in the [Mixer](../glossary/glossary.md#mixer), where we were producing the output buffers right in the `handle_demand/5` callback. We needed to attempt to create the output buffer after: - updating the buffers' list in `handle_buffer/4` - updating the status of the [track](../glossary/glossary.md#track) in `handle_end_of_stream/3` - Therefore, we were simply returning the `:redemand` action, and the `handle_demand/5` was called sequentially after on, trying to produce the output buffer. + Therefore, we were simply returning the `:redemand` action, and the `handle_demand/5` was called sequentially afterwards, trying to produce the output buffer. -As you can see, redemand mechanism in filters helps us deal with situations, where we do not know how many input buffers to demand in order to be able to produce an output buffer/buffers. -In case we don't provide enough buffers in the `handle_demand/5` callback (or we are not sure that we do provide), we should call `:redemand` somewhere else (usually in the `handle_buffer/4`) to make sure that the demand is not lost. +As you can see, redemand mechanism in filters helps us deal with situations where we do not know how many input buffers to demand in order to be able to produce an output buffer/buffers. +In case we don't provide enough buffers in the `handle_demand/5` callback (or we are not sure that we do), we should call `:redemand` somewhere else (usually in the `handle_buffer/4`) to make sure that the demand is not lost. With that knowledge let's carry on with the next element in our pipeline - `Depayloader`. diff --git a/broadcasting/05_RTMP_Pipeline.md b/broadcasting/05_RTMP_Pipeline.md index 3a92dbb..34a21c2 100644 --- a/broadcasting/05_RTMP_Pipeline.md +++ b/broadcasting/05_RTMP_Pipeline.md @@ -8,20 +8,20 @@ which is invoked once the pipeline is initialized. **_`lib/rtmp_to_hls/pipeline.ex`_** ```elixir - @impl true - def handle_init(_opts) do - ... - children: %{ - src: %Membrane.RTMP.SourceBin{port: 9009}, - sink: %Membrane.HTTPAdaptiveStream.SinkBin{ - manifest_module: Membrane.HTTPAdaptiveStream.HLS, - target_window_duration: 20 |> Membrane.Time.seconds(), - muxer_segment_duration: 8 |> Membrane.Time.seconds(), - storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{directory: "output"} - } - }, - ... - end +@impl true +def handle_init(_opts) do + ... + children: %{ + src: %Membrane.RTMP.SourceBin{port: 9009}, + sink: %Membrane.HTTPAdaptiveStream.SinkBin{ + manifest_module: Membrane.HTTPAdaptiveStream.HLS, + target_window_duration: 20 |> Membrane.Time.seconds(), + muxer_segment_duration: 8 |> Membrane.Time.seconds(), + storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{directory: "output"} + } + }, + ... +end ``` First, we define the list of children. The following children are defined: @@ -29,7 +29,7 @@ First, we define the list of children. The following children are defined: - `:src` - a `Membrane.RTMP.SourceBin`, an RTMP server, which, according to its `:port` configuration, will be listening on port `9009`. This bin will be acting as a source for our pipeline. For more information on RTMP Source Bin please visit [the documentation](https://hexdocs.pm/membrane_rtmp_plugin/Membrane.RTMP.SourceBin.html). - `:sink` - a `Membrane.HTTPAdaptiveStream.SinkBin`, acting as a sink of the pipeline. The full documentation of that bin is available [here](https://hexdocs.pm/membrane_http_adaptive_stream_plugin/Membrane.HTTPAdaptiveStream.SinkBin.html). We need to specify some of its options: - `:manifest_module` - a module which implements [`Membrane.HTTPAdaptiveStream.Manifest`](https://hexdocs.pm/membrane_http_adaptive_stream_plugin/Membrane.HTTPAdaptiveStream.Manifest.html#c:serialize/1) behavior. A manifest allows aggregate tracks (of a different type, i.e. an audio track and a video track as well as many tracks of the same type, i.e. a few video tracks with different resolutions). For each track, the manifest holds a reference to a list of segments, which form that track. Furthermore, the manifest module is equipped with the `serialize/1` method, which allows transforming that manifest to a string (which later on can be written to a file). In that case, we use a built-in implementation of a manifest module - the `Membrane.HTTPAdaptiveStream.HLS`, designed to serialize a manifest into a form required by HLS. -- `:target_window_duriation` - that value determines the minimal manifest's duration. The oldest segments of the tracks will be removed whenever possible if persisting them would result in exceeding the manifest duration. +- `:target_window_duriation` - determines the minimal manifest's duration. The oldest segments of the tracks will be removed whenever possible if persisting them would result in exceeding the manifest duration. - `:muxer_segment_duration` - the maximal duration of a segment. Each segment of each track shouldn't exceed that value. In our case, we have decided to limit the length of each segment to 8 seconds. - `:storage` - the sink element, the module responsible for writing down the HLS playlist and manifest files. In our case, we use a pre-implemented `Membrane.HTTPAdaptiveStream.FileStorage` module, designed to write the files to the local filesystem. We configure it so that the directory where the files will be put in the `output/` directory (make sure that that directory exists as the storage module won't create it itself). @@ -39,21 +39,21 @@ After providing the children's specifications, we are ready to connect the pads **_`lib/rtmp_to_hls/pipeline.ex`_** ```elixir - @impl true - def handle_init(_opts) do - ... - links: [ - link(:src) - |> via_out(:audio) - |> via_in(Pad.ref(:input, :audio), options: [encoding: :AAC]) - |> to(:sink), - link(:src) - |> via_out(:video) - |> via_in(Pad.ref(:input, :video), options: [encoding: :H264]) - |> to(:sink) - ] - ... - end +@impl true +def handle_init(_opts) do + ... + links: [ + link(:src) + |> via_out(:audio) + |> via_in(Pad.ref(:input, :audio), options: [encoding: :AAC]) + |> to(:sink), + link(:src) + |> via_out(:video) + |> via_in(Pad.ref(:input, :video), options: [encoding: :H264]) + |> to(:sink) + ] + ... +end ``` The structure of links reflects the desired architecture of the application. @@ -67,11 +67,11 @@ The final thing that is done in the `handle_init/1` callback's implementation is **_`lib/rtmp_to_hls/pipeline.ex`_** ```elixir - @impl true - def handle_init(_opts) do - ... - { {:ok, spec: spec, playback: :playing}, %{} } - end +@impl true +def handle_init(_opts) do + ... + {[spec: spec, playback: :playing], %{}} +end ``` The first action is the `:spec` action, which spawns the children. The second action changes the playback state of the pipeline into the `:playing` - meaning, that data can start flowing through the pipeline. @@ -83,51 +83,51 @@ The pipeline is started with `Supervisor.start_link`, as a child of the applicat **_`lib/rtmp_to_hls/application.ex`_** ```elixir - @impl true - def start(_type, _args) do - children = [ - # Start the Pipeline - Membrane.Demo.RtmpToHls, - ... - ] - opts = [strategy: :one_for_one, name: RtmpToHls.Supervisor] - Supervisor.start_link(children, opts) - end +@impl true +def start(_type, _args) do + children = [ + # Start the Pipeline + Membrane.Demo.RtmpToHls, + ... + ] + opts = [strategy: :one_for_one, name: RtmpToHls.Supervisor] + Supervisor.start_link(children, opts) +end ``` ## HLS controller The files produced with the pipeline are written down to the `output/` directory. We need to make them accessible via HTTP. -The Phoenix Framework provides tools to achieve that - take a look at the `RtmpToHlsWeb.Router`: +The Phoenix Framework provides tools to achieve that - take a look at `RtmpToHlsWeb.Router`: **_`lib/rtmp_to_hls_web/router.ex`_** ```elixir scope "/", RtmpToHlsWeb do - pipe_through :browser + pipe_through :browser - get "/", PageController, :index - get "/video/:filename", HlsController, :index - end - ``` + get "/", PageController, :index + get "/video/:filename", HlsController, :index +end +``` We are directing HTTP requests on `/video/:filename` to the HlsController, whose implementation is shown below: **_`lib/rtmp_to_hls_web/controllers/hls_controller.ex`_** ```elixir defmodule RtmpToHlsWeb.HlsController do - use RtmpToHlsWeb, :controller + use RtmpToHlsWeb, :controller - alias Plug + alias Plug - def index(conn, %{"filename" => filename}) do - path = "output/#{filename}" + def index(conn, %{"filename" => filename}) do + path = "output/#{filename}" - if File.exists?(path) do - conn |> Plug.Conn.send_file(200, path) - else - conn |> Plug.Conn.send_resp(404, "File not found") - end + if File.exists?(path) do + conn |> Plug.Conn.send_file(200, path) + else + conn |> Plug.Conn.send_resp(404, "File not found") end + end end ``` From ea2b9fa0dbde09a19597d22f41e90729a10f28ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kuba=20Migda=C5=82?= <68378289+kidq330@users.noreply.github.com> Date: Mon, 26 Jan 2026 10:51:28 +0100 Subject: [PATCH 2/5] Update basic_pipeline/07_Redemands.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Ɓukasz Kita --- basic_pipeline/07_Redemands.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/basic_pipeline/07_Redemands.md b/basic_pipeline/07_Redemands.md index c535965..c7f583e 100644 --- a/basic_pipeline/07_Redemands.md +++ b/basic_pipeline/07_Redemands.md @@ -10,7 +10,7 @@ To comprehensively understand the concept behind redemanding, you need to be awa ## In Source elements -In the [source elements](../glossary/glossary.md#source), there is a "side-channel", from which we can receive data. That "side-channel" can be, as in the example [pipeline](../glossary/glossary.md#pipeline) we are working on, in the form of a file, from which we are reading the data. In real-life scenarios it could also be, i.e. an [RTP](../glossary/glossary.md#rtp) stream received via the network. Since we have that "side-channel", there is no need to receive data via the input [pad](../glossary/glossary.md#pad) (that is why we don't have it in the source element). +In the [source elements](../glossary/glossary.md#source), there is a "side-channel", from which we can receive data. That "side-channel" can be, as in the example [pipeline](../glossary/glossary.md#pipeline) we are working on, in the form of a file, from which we are reading the data. In real-life scenarios it could also be, e.g. an [RTP](../glossary/glossary.md#rtp) stream received via the network. Since we have that "side-channel", there is no need to receive data via the input [pad](../glossary/glossary.md#pad) (that is why we don't have it in the source element). The whole logic of fetching the data can be put inside the `handle_demand/5` callback - once we are asked to provide the [buffers](../glossary/glossary.md#buffer), the `handle_demand/5` callback gets called and we can provide the desired number of buffers from the "side-channel" inside the body of that callback. No processing occurs here - we get asked for the buffer and we provide the buffer, simple as that. The redemand mechanism here lets you focus on providing a single buffer in the `handle_demand/5` body - later on, you can simply return the `:redemand` action and that action will invoke `handle_demand/5` once again, with the updated number of buffers which are expected to be provided. Let's see it in an example - we could have such a `handle_demand/5` definition (and it wouldn't be a mistake!): From df869520b558fde0dac2b298b4d777f044ae43bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Migda=C5=82?= Date: Mon, 26 Jan 2026 11:27:40 +0100 Subject: [PATCH 3/5] Update rtmp_to_hls pipeline spec --- broadcasting/05_RTMP_Pipeline.md | 67 +++++++++++++------------------- 1 file changed, 28 insertions(+), 39 deletions(-) diff --git a/broadcasting/05_RTMP_Pipeline.md b/broadcasting/05_RTMP_Pipeline.md index 34a21c2..102998e 100644 --- a/broadcasting/05_RTMP_Pipeline.md +++ b/broadcasting/05_RTMP_Pipeline.md @@ -11,71 +11,60 @@ which is invoked once the pipeline is initialized. @impl true def handle_init(_opts) do ... - children: %{ - src: %Membrane.RTMP.SourceBin{port: 9009}, - sink: %Membrane.HTTPAdaptiveStream.SinkBin{ + spec = [ + child(:src, %Membrane.RTMP.SourceBin{socket: 9009}) + |> via_out(:audio) + |> via_in(Pad.ref(:input, :audio), + options: [encoding: :AAC, segment_duration: Membrane.Time.seconds(4)] + ) + |> child(:sink, %Membrane.HTTPAdaptiveStream.SinkBin{ manifest_module: Membrane.HTTPAdaptiveStream.HLS, - target_window_duration: 20 |> Membrane.Time.seconds(), - muxer_segment_duration: 8 |> Membrane.Time.seconds(), + target_window_duration: :infinity, + persist?: false, storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{directory: "output"} - } - }, + }), + get_child(:src) + |> via_out(:video) + |> via_in(Pad.ref(:input, :video), + options: [encoding: :H264, segment_duration: Membrane.Time.seconds(4)] + ) + |> get_child(:sink) + ] ... end ``` First, we define the list of children. The following children are defined: -- `:src` - a `Membrane.RTMP.SourceBin`, an RTMP server, which, according to its `:port` configuration, will be listening on port `9009`. This bin will be acting as a source for our pipeline. For more information on RTMP Source Bin please visit [the documentation](https://hexdocs.pm/membrane_rtmp_plugin/Membrane.RTMP.SourceBin.html). +- `:src` - a `Membrane.RTMP.SourceBin` RTMP server, which, according to its `:port` configuration, will be listening on port `9009`. This bin will be acting as a source for our pipeline. For more information on RTMP Source Bin please visit [the documentation](https://hexdocs.pm/membrane_rtmp_plugin/Membrane.RTMP.SourceBin.html). - `:sink` - a `Membrane.HTTPAdaptiveStream.SinkBin`, acting as a sink of the pipeline. The full documentation of that bin is available [here](https://hexdocs.pm/membrane_http_adaptive_stream_plugin/Membrane.HTTPAdaptiveStream.SinkBin.html). We need to specify some of its options: - `:manifest_module` - a module which implements [`Membrane.HTTPAdaptiveStream.Manifest`](https://hexdocs.pm/membrane_http_adaptive_stream_plugin/Membrane.HTTPAdaptiveStream.Manifest.html#c:serialize/1) behavior. A manifest allows aggregate tracks (of a different type, i.e. an audio track and a video track as well as many tracks of the same type, i.e. a few video tracks with different resolutions). For each track, the manifest holds a reference to a list of segments, which form that track. Furthermore, the manifest module is equipped with the `serialize/1` method, which allows transforming that manifest to a string (which later on can be written to a file). In that case, we use a built-in implementation of a manifest module - the `Membrane.HTTPAdaptiveStream.HLS`, designed to serialize a manifest into a form required by HLS. -- `:target_window_duriation` - determines the minimal manifest's duration. The oldest segments of the tracks will be removed whenever possible if persisting them would result in exceeding the manifest duration. -- `:muxer_segment_duration` - the maximal duration of a segment. Each segment of each track shouldn't exceed that value. In our case, we have decided to limit the length of each segment to 8 seconds. +- `:target_window_duration` - determines the minimal manifest's duration. The oldest segments of the tracks will be removed whenever possible if persisting them would result in exceeding the manifest duration. - `:storage` - the sink element, the module responsible for writing down the HLS playlist and manifest files. In our case, we use a pre-implemented `Membrane.HTTPAdaptiveStream.FileStorage` module, designed to write the files to the local filesystem. We configure it so that the directory where the files will be put in the `output/` directory (make sure that that directory exists as the storage module won't create it itself). The fact that the configuration of a pipeline, which performs relatively complex processing, consists of just two elements, proves the power of [bins](/basic_pipeline/12_Bin.md). Feel free to stop for a moment and read about them if you haven't done it yet. -After providing the children's specifications, we are ready to connect the pads between these children. Take a look at that part of the code: -**_`lib/rtmp_to_hls/pipeline.ex`_** +At the same time, we configure the pads linking the two bins. +`:src` has two output pads: the `:audio` pad and the `:video` pad, transferring the appropriate media tracks. +The source's `:audio` pad is linked to the input `:audio` pad of the sink - along with an `:encoding` option and a `:segment_duration`. +- `:encoding` is an atom, describing the codec which is used to encode the media data - when it comes to audio data, we will be using the AAC codec. +- `:segment_duration` specifies the maximal duration of a segment. Each segment of each track shouldn't exceed that value. In our case, we have decided to limit the length of each segment to 4 seconds. -```elixir -@impl true -def handle_init(_opts) do - ... - links: [ - link(:src) - |> via_out(:audio) - |> via_in(Pad.ref(:input, :audio), options: [encoding: :AAC]) - |> to(:sink), - link(:src) - |> via_out(:video) - |> via_in(Pad.ref(:input, :video), options: [encoding: :H264]) - |> to(:sink) - ] - ... -end -``` +At the time of writing, the available codecs accepted by `:encoding` are `:H264` and `:H265` for video, and `:AAC` for audio. -The structure of links reflects the desired architecture of the application. -`:src` has two output pads: the `:audio` pad and the `:video` pad, transferring the appropriate media tracks. -The source's `:audio` pad is linked to the input `:audio` pad of the sink - along with the `:encoding` option. That option is an atom, describing the codec which is used to encode the media data - when it comes to audio data, -we will be using AAC coded. -At the time of the writing, only `:H264` and `:AAC` codecs are available to be passed as an `:encoding` option - the first one is used with video data, and the second one is used with audio data. -By analogy, the source's `:video` pad is linked with the sink's `:video` pad - and the `:encoding` to be used is H264. +We refer to previously defined elements using `get_child` to also configure the `:video` pads, using `:H264` as the preferred encoding and a segment duration of 4 seconds. -The final thing that is done in the `handle_init/1` callback's implementation is returning the desired actions: +The final thing that is done in the `handle_init/1` callback's implementation is returning the pipeline structure through the `:spec` action: **_`lib/rtmp_to_hls/pipeline.ex`_** ```elixir @impl true def handle_init(_opts) do ... - {[spec: spec, playback: :playing], %{}} + {[spec: spec], %{}} end ``` -The first action is the `:spec` action, which spawns the children. The second action changes the playback state of the pipeline into the `:playing` - meaning, that data can start flowing through the pipeline. - ## Starting the pipeline The pipeline is started with `Supervisor.start_link`, as a child of the application, inside the `lib/rtmp_to_hls/application.ex` file: From 929757569ae508f1145cbeeddd5c4ab17caf431b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Migda=C5=82?= Date: Mon, 2 Feb 2026 11:33:24 +0100 Subject: [PATCH 4/5] Changes after 2nd round of review --- broadcasting/05_RTMP_Pipeline.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/broadcasting/05_RTMP_Pipeline.md b/broadcasting/05_RTMP_Pipeline.md index 102998e..004fd65 100644 --- a/broadcasting/05_RTMP_Pipeline.md +++ b/broadcasting/05_RTMP_Pipeline.md @@ -2,14 +2,14 @@ In this chapter, we will discuss the multimedia-specific part of the application ## The pipeline -Let's start with `lib/rtmp_to_hls/pipeline.ex` file. All the logic is put inside the [`Membrane.Pipeline.handle_init/1`](https://hexdocs.pm/membrane_core/Membrane.Pipeline.html#c:handle_init/1) callback, +Let's start with `lib/rtmp_to_hls/pipeline.ex` file. All the logic is put inside the [`Membrane.Pipeline.handle_init/2`](https://hexdocs.pm/membrane_core/Membrane.Pipeline.html#c:handle_init/2) callback, which is invoked once the pipeline is initialized. **_`lib/rtmp_to_hls/pipeline.ex`_** ```elixir @impl true -def handle_init(_opts) do +def handle_init(_context) do ... spec = [ child(:src, %Membrane.RTMP.SourceBin{socket: 9009}) @@ -19,7 +19,7 @@ def handle_init(_opts) do ) |> child(:sink, %Membrane.HTTPAdaptiveStream.SinkBin{ manifest_module: Membrane.HTTPAdaptiveStream.HLS, - target_window_duration: :infinity, + target_window_duration: Membrane.Time.seconds(20), persist?: false, storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{directory: "output"} }), @@ -54,7 +54,7 @@ At the time of writing, the available codecs accepted by `:encoding` are `:H264` We refer to previously defined elements using `get_child` to also configure the `:video` pads, using `:H264` as the preferred encoding and a segment duration of 4 seconds. -The final thing that is done in the `handle_init/1` callback's implementation is returning the pipeline structure through the `:spec` action: +The final thing that is done in the `handle_init/2` callback's implementation is returning the pipeline structure through the `:spec` action: **_`lib/rtmp_to_hls/pipeline.ex`_** ```elixir From d04878ed42b6f7e4b6e7d9d01da223d6ff6ae2b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Migda=C5=82?= Date: Mon, 2 Feb 2026 11:34:58 +0100 Subject: [PATCH 5/5] Add missing opts to handle_init --- broadcasting/05_RTMP_Pipeline.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/broadcasting/05_RTMP_Pipeline.md b/broadcasting/05_RTMP_Pipeline.md index 004fd65..6cce58e 100644 --- a/broadcasting/05_RTMP_Pipeline.md +++ b/broadcasting/05_RTMP_Pipeline.md @@ -9,7 +9,7 @@ which is invoked once the pipeline is initialized. ```elixir @impl true -def handle_init(_context) do +def handle_init(_context, _opts) do ... spec = [ child(:src, %Membrane.RTMP.SourceBin{socket: 9009}) @@ -44,9 +44,10 @@ First, we define the list of children. The following children are defined: The fact that the configuration of a pipeline, which performs relatively complex processing, consists of just two elements, proves the power of [bins](/basic_pipeline/12_Bin.md). Feel free to stop for a moment and read about them if you haven't done it yet. -At the same time, we configure the pads linking the two bins. +At the same time, we configure the pads linking the two bins. `:src` has two output pads: the `:audio` pad and the `:video` pad, transferring the appropriate media tracks. The source's `:audio` pad is linked to the input `:audio` pad of the sink - along with an `:encoding` option and a `:segment_duration`. + - `:encoding` is an atom, describing the codec which is used to encode the media data - when it comes to audio data, we will be using the AAC codec. - `:segment_duration` specifies the maximal duration of a segment. Each segment of each track shouldn't exceed that value. In our case, we have decided to limit the length of each segment to 4 seconds.