This document describes the key components of LuaRadio and how they interact.
Blocks provide a process(...) method that accepts vectors of input samples as
arguments, processes them, and returns vectors of output samples. Blocks may
retain state.
Input sample vectors must be processed in their entirety by blocks in
process(...). Samples across multiple inputs are synchronized, meaning that
the multiple input vectors are always of the same length, and samples across
the input vectors all correspond to the same timestep.
Output sample vectors may be produced by blocks asynchronously, meaning that
there is no requirement on the number of output samples produced, or on which
call to process(...) produces them.
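The difference between synchronous inputs and asynchronous outputs can be sketched in plain Lua. The PairSum block below is hypothetical and is modelled with ordinary Lua arrays rather than LuaRadio vectors. It consumes every input sample on each call, but only emits an output once a pair of samples is available.

```lua
-- Hypothetical block modelled with plain Lua arrays (not LuaRadio vectors).
-- It sums consecutive pairs of samples, carrying an unpaired sample over to
-- the next call, so the output length varies from call to call.
local PairSum = {}
PairSum.__index = PairSum

function PairSum.new()
    return setmetatable({pending = nil}, PairSum)
end

function PairSum:process(x)
    local out = {}
    -- The whole input vector is consumed on every call
    for i = 1, #x do
        if self.pending == nil then
            self.pending = x[i]
        else
            out[#out + 1] = self.pending + x[i]
            self.pending = nil
        end
    end
    return out
end

local block = PairSum.new()
local a = block:process({1, 2, 3})   -- one pair completed, 3 is retained
local b = block:process({4})         -- completes the pair (3, 4)
local c = block:process({5})         -- no output yet, 5 is retained
```

The three calls return vectors of length one, one, and zero, even though every call fully consumed its input.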
Vectors of samples are serialized between output and input ports of blocks over anonymous UNIX sockets.
Vectors of CStruct data type samples are
serialized raw, in their native memory representation, with no marshalling or
unmarshalling.
Vectors of Object data type samples are
marshalled and unmarshalled with MessagePack and
serialized as byte strings, preceded by their length.
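The length-prefixed framing used for Object samples can be illustrated with a small plain Lua sketch. The 4-byte big-endian prefix and the helper names are assumptions made for this example, since the text above does not state the prefix width or byte order. The payloads are opaque strings standing in for MessagePack output.

```lua
-- Hypothetical helpers: frame an opaque payload with a 4-byte big-endian
-- length prefix (prefix width and byte order are assumptions).
local function encode_frame(payload)
    local n = #payload
    local b1 = math.floor(n / 16777216) % 256
    local b2 = math.floor(n / 65536) % 256
    local b3 = math.floor(n / 256) % 256
    local b4 = n % 256
    return string.char(b1, b2, b3, b4) .. payload
end

-- Split a byte stream into complete payloads. Returns the payloads and any
-- trailing partial frame that needs more bytes before it can be decoded.
local function decode_frames(stream)
    local payloads, pos = {}, 1
    while #stream - pos + 1 >= 4 do
        local b1, b2, b3, b4 = string.byte(stream, pos, pos + 3)
        local n = ((b1 * 256 + b2) * 256 + b3) * 256 + b4
        if #stream - (pos + 4) + 1 < n then break end
        payloads[#payloads + 1] = string.sub(stream, pos + 4, pos + 3 + n)
        pos = pos + 4 + n
    end
    return payloads, string.sub(stream, pos)
end

local stream = encode_frame("hello") .. encode_frame("") .. encode_frame("abc")
-- Two trailing bytes simulate a partially received frame
local payloads, rest = decode_frames(stream .. "\0\0")
```

The length prefix is what lets the reader find sample boundaries for variable-size payloads, which fixed-size CStruct samples do not need.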
LuaRadio uses multi-processing. Every block is run in its own process under its own Lua state.
Blocks do not share memory. Blocks use IPC to serialize samples to other blocks and communicate with applications.
Memory for input samples is allocated once. A persistent buffer is allocated
for each block input port, into which raw input samples are read. The raw
input samples are cast into read-only vectors, which are provided as the
arguments to the block's process(...) method.
Memory for output samples is typically allocated once by each block. Blocks
are responsible for managing their own output sample memory. Most blocks
create a persistent output sample vector in their initialize() method, and
resize it as needed in process(...) to store the computed output samples.
However, this is not strictly enforced by the framework.
Blocks that do reuse output sample vectors have constant memory usage in the
steady state. The output sample vectors approach a stable size, as they are
resized in process(...) to accommodate the input vectors the block consumes.
All objects in LuaRadio are garbage collected, although this mainly applies to transient objects. The input and output sample memory in the situations described above is anchored throughout the processing lifetime of the block.
Moving a vector of samples between blocks has an overhead of two copies: one from writing it to the UNIX socket in the producing block, and one from reading it from the UNIX socket in the consuming block. This overhead could be reduced to one copy, by changing the sample transport from UNIX sockets to a shared memory circular buffer (not unlike GNU Radio).
The overhead could be further reduced to zero copies, if the vectors for output
samples were not allocated by the block, but instead pointers into the
persistent shared memory circular buffer (also not unlike GNU Radio) that is
shared between the output and input port. Implementing this would likely
require a different process(...) signature that includes the output vectors
as arguments, and a new resizing mechanism for output vectors.
Multi-processing instead of multi-threading for concurrency incurs some memory and CPU overhead from requiring a process for each block. However, multi-threading poses other issues, like sharing instantiated and initialized block state of the parent thread with the Lua state of each child thread. This problem is addressed in the multi-processing architecture with forking.
Block processes currently have a slightly larger memory footprint than
necessary, as other blocks and their initialized objects, e.g. buffers
allocated in initialize(), are not yet released after forking. This can be
addressed by pruning references to all unneeded blocks and associated
connectivity after forking.
Blocks cannot be manipulated at runtime, e.g. modifying their attributes or calling methods on them, since each block runs in an independent Lua state. This could be worked around by implementing RPC for these attributes and calls, but this would add substantial complexity.
Samples are typed by special data types that implement the necessary interface
to be serialized and deserialized between blocks. These data types can either
be CStruct types, which are backed by a
C structure, or Object types, which are
backed by a Lua object.
CStruct types are serialized as raw contiguous samples between blocks in
their native memory representation, with no marshalling and unmarshalling.
These types must be of a fixed size, so the sample boundaries are well-defined
in the stream.
Object types are serialized as MessagePack marshalled
bytes between blocks. These types can be variable size, may nest other Lua
types, and may have optional members.
LuaRadio has four basic types, all of which are CStruct types. These are the
ComplexFloat32,
Float32, Bit,
and Byte types.
They are backed by the following C structure types:
typedef struct {
    float real;
    float imag;
} complex_float32_t;
typedef struct {
    float value;
} float32_t;
typedef struct {
    uint8_t value;
} bit_t;
typedef struct {
    uint8_t value;
} byte_t;
The use of a structure to back CStruct types, rather than the raw C type
(e.g. float), is for implementation reasons. It allows the framework to
associate a metatable with the type using the LuaJIT FFI library's
ffi.metatype(), binding useful metamethods and methods to all instances of the
type. It also makes those instances distinct from other occurrences of the
underlying data types (e.g. float32_t vs float).
Users can define their own CStruct and Object types and bind methods to
them. See the Creating Blocks guide for
more details and examples.
Code reference: CStruct class, Object class.
It would be inefficient for blocks to process one sample at a time, as the overhead of serializing the sample and calling the block to process it would exceed the cost of processing it. Instead, blocks operate on a vector of samples at a time, to amortize the overhead of serialization.
Vectors are dynamic arrays of a CStruct or Object data type. Blocks get
their inputs as vectors, and return their outputs as vectors.
Each data type provides two static methods for creating a vector of itself:
.vector(num) for a zero-initialized vector, or .vector_from_array(arr) for
a vector initialized from a Lua array. For example:
-- ComplexFloat32 vector of length 16
local vec = radio.types.ComplexFloat32.vector(16)
-- Byte vector of length 10
local vec = radio.types.Byte.vector(10)
-- Float32 vector of length 3 from array initializer
local vec = radio.types.Float32.vector_from_array({1.0, 2.0, 3.0})
Vectors can be resized and appended to:
local vec = radio.types.Byte.vector(10)
print(vec.length) --> 10
vec:resize(5)
print(vec.length) --> 5
vec:append(radio.types.Byte(0xAA))
print(vec.length) --> 6
Vectors of CStruct typed samples are laid out contiguously in memory. The
array of samples is available under the .data member, and its length under
the .length member. These samples can be modified directly in Lua, or can be
passed to an external library for processing.
Resizing a CStruct typed vector only causes a re-allocation when it is grown
to a larger size. The underlying buffer is retained on resizing to a smaller
size; just the bookkeeping is updated. This allows blocks that reuse vectors
for output samples to approach constant memory usage, as the underlying memory
of the vectors will reach a stable size for the inputs the block consumes.
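The resizing behavior described above can be modelled in plain Lua by tracking capacity separately from length. ModelVector is a hypothetical stand-in, not LuaRadio's Vector class. It assumes that growing within the retained capacity also avoids a re-allocation.

```lua
-- Hypothetical model of a growable vector that tracks capacity separately
-- from length. This is an illustration, not LuaRadio's Vector class.
local ModelVector = {}
ModelVector.__index = ModelVector

function ModelVector.new(length)
    return setmetatable({length = length, capacity = length, allocations = 1}, ModelVector)
end

function ModelVector:resize(length)
    if length > self.capacity then
        -- Growing past the current capacity needs a new, larger buffer
        self.capacity = length
        self.allocations = self.allocations + 1
    end
    -- Shrinking, or growing within capacity, only updates the bookkeeping
    self.length = length
    return self
end

local vec = ModelVector.new(1024)
for _, n in ipairs({512, 2048, 1024, 2048, 100, 2048}) do
    vec:resize(n)
end
print(vec.length, vec.capacity, vec.allocations)
```

After the first growth to the largest input size, further resizes no longer allocate. This is the steady state that blocks reusing their output vectors converge to.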
All CStruct typed vectors are allocated with page alignment, to enable
processing with libraries that require, or perform better with, aligned
buffers. This is often because SIMD operations are involved.
Vectors of Object typed samples are stored in a Lua array, but provide a
compatible interface to CStruct typed vectors. These vectors cannot be passed
to external libraries.
Code reference: Vector and ObjectVector classes.
Blocks are classes derived from
radio.block.Block that implement their
functionality in the following methods:
- instantiate(...): constructor
- initialize(): initialization (optional)
- process(...): main work method
- cleanup(): clean up (optional)
The role of the instantiate() constructor is to establish the basic state of
the block and to register its type signatures, which specify the block's
input/output port names and types.
The initialize() method is called after the block has been connected in a
flow graph and differentiated. It allows the block to perform additional
initialization based on its differentiated type signature and its sample rate.
The process() method is the main work method of the block. It receives input
vectors of samples as arguments, and returns output vectors of samples. This
method is called repeatedly by the framework to process inputs into outputs.
The cleanup() method is called by the framework when the flow graph has
collapsed, for additional clean up of resources.
Source blocks and blocks that modify the sample rate must implement the
get_rate() method, which returns the block's output sample rate as a number,
in samples per second.
Code reference: Block class.
A type signature is a description of the input/output port names and data types of a block. LuaRadio blocks can support multiple type signatures, all of which must share the same input/output count and names, but may differ in data types. Blocks register their type signatures in their constructor, so that they can be connected into a flow graph after they are instantiated. The framework selects the correct type signature in its differentiation phase, described in its section below.
Code reference: Block class.
The instantiate(...) method is called whenever a block is instantiated by
name. This method takes the arguments passed to the block on instantiation.
Blocks must register type signatures with the
add_type_signature()
method in their instantiate() constructor. This method takes an array of
input port descriptors, followed by an array of output port descriptors. Each
port descriptor specifies a name and a data type. For example, an AddBlock
that supports both complex-valued and real-valued inputs would register:
function AddBlock:instantiate()
    -- Real-valued signature: two Float32 inputs, one Float32 output
    self:add_type_signature({radio.block.Input("in1", radio.types.Float32),
                             radio.block.Input("in2", radio.types.Float32)},
                            {radio.block.Output("out", radio.types.Float32)})
    -- Complex-valued signature with the same port names
    self:add_type_signature({radio.block.Input("in1", radio.types.ComplexFloat32),
                             radio.block.Input("in2", radio.types.ComplexFloat32)},
                            {radio.block.Output("out", radio.types.ComplexFloat32)})
end
Source and sink blocks may specify empty arrays for inputs or outputs, respectively, when they add type signatures. Otherwise, sources and sinks are implemented largely the same way as other blocks.
The add_type_signature() method can also be used to specify different
process() and initialize() methods for each type signature, which are bound to
the block on differentiation. See the Creating Blocks guide for examples.
After a block is instantiated, it can be connected into a flow graph under a
CompositeBlock. This is described in more detail in the Composite
Blocks section below.
When the add_type_signature() method is called in a block's instantiate()
constructor, it builds PipeInput and PipeOutput containers for each
specified input and output port of the block, and stores them in arrays under
the .inputs and .outputs members. These containers are the actual "ports"
connected in a flow graph. They contain a name and block owner, and are later
populated with their concrete data type and a shared Pipe object.
Connecting two blocks in a flow graph registers the downstream block's
PipeInput instance as the key and the upstream block's PipeOutput instance as
the value in a hash table owned by a CompositeBlock, thereby building a graph
of input and output connections.
When a connection is made, a Pipe object is also created and registered under
both PipeInput and PipeOutput containers of the connected blocks. This
Pipe object provides an interface for the serialization and deserialization
of sample vectors between blocks. The Pipe is how a block reads or writes
vectors of samples from or to another block.
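The connection bookkeeping described above can be sketched in plain Lua. The port constructors and the connect() helper are hypothetical models, not LuaRadio's PipeInput, PipeOutput, or CompositeBlock code. The sketch shows why keying the table by input port limits each input to a single upstream output, while an output may feed several inputs.

```lua
-- Hypothetical model of ports and connect(), not LuaRadio's implementation.
local function new_input(name) return {name = name, pipe = nil} end
local function new_output(name) return {name = name, pipes = {}} end

local connections = {}

local function connect(output, input)
    assert(connections[input] == nil, "input is already connected")
    -- The input port is the key, the output port is the value
    connections[input] = output
    -- One pipe object per connection, shared by both ports
    local pipe = {output = output, input = input}
    input.pipe = pipe
    output.pipes[#output.pipes + 1] = pipe
    return pipe
end

local src_out = new_output("out")
local filter_in, sink_in = new_input("in"), new_input("in")
connect(src_out, filter_in)
connect(src_out, sink_in)

-- Connecting a second output to an already connected input is rejected
local ok = pcall(connect, new_output("other"), sink_in)
```

In this model the source output ends up with two pipes, each input holds exactly one, and the second attempt to connect sink_in fails.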
Code reference: Block class, PipeInput, PipeOutput, Pipe classes.
When a flow graph is run, each block in the flow graph is first differentiated into a compatible type signature. This differentiation starts at the source blocks and ends at the sink blocks, and is carried out in a downstream order.
A block is differentiated by its input types, using the
differentiate()
method. This method takes an array of input data types and differentiates the
block into a compatible type signature, by finding a registered type signature
with matching input data types. The method raises an error if a compatible
type signature is not found.
Type signatures may also specify a function predicate instead of a concrete
data type for input ports. For example, the
JSONSink does this to accept any data type
that implements to_json(). In those cases, differentiate() calls the
function predicate with the input type and expects a boolean result to indicate
if the input type is compatible.
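The matching step of differentiation, including function predicates, might be modelled as follows. The data types are plain tables standing in for LuaRadio types, and this differentiate() is a hypothetical free function written for illustration, not the Block method itself.

```lua
-- Hypothetical stand-ins for data types: plain tables with a name, and an
-- optional to_json method.
local Float32 = {name = "Float32"}
local ComplexFloat32 = {name = "ComplexFloat32"}
local Record = {name = "Record", to_json = function () return "{}" end}

-- Each signature lists input matchers (a concrete type or a predicate
-- function) and the output types it yields.
local function differentiate(signatures, input_types)
    for _, sig in ipairs(signatures) do
        local match = (#sig.inputs == #input_types)
        for i = 1, #input_types do
            if not match then break end
            local want = sig.inputs[i]
            if type(want) == "function" then
                match = want(input_types[i]) == true
            else
                match = (want == input_types[i])
            end
        end
        if match then return sig end
    end
    error("no compatible type signature found")
end

local add_signatures = {
    {inputs = {Float32, Float32}, outputs = {Float32}},
    {inputs = {ComplexFloat32, ComplexFloat32}, outputs = {ComplexFloat32}},
}
local json_sink_signatures = {
    {inputs = {function (t) return t.to_json ~= nil end}, outputs = {}},
}

local chosen = differentiate(add_signatures, {ComplexFloat32, ComplexFloat32})
local sink = differentiate(json_sink_signatures, {Record})
local ok = pcall(differentiate, json_sink_signatures, {Float32})
```

The first call selects the complex-valued signature, the second accepts Record through the predicate, and the third raises an error because Float32 has no to_json() in this model.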
The result of differentiation is that the PipeInput and PipeOutput ports in
the block's .inputs and .outputs take on the concrete data types specified
in the selected type signature, and the block's initialize() and process()
methods are bound to the ones specified in the type signature. The concrete
data types are available to the block with the
get_input_type() and
get_output_type()
methods.
Since a block is differentiated by its input types, it cannot have multiple type signatures that share the same input types, as this would cause ambiguity.
After differentiation, the block's output types are well defined, and can then be used in the differentiation of downstream blocks.
local multiply = radio.MultiplyBlock()
-- Differentiate into the complex-valued flavor of MultiplyBlock
multiply:differentiate({radio.types.ComplexFloat32, radio.types.ComplexFloat32})
Code reference: Block class.
The initialization phase takes place after the differentiation phase in running
a flow graph. In this phase, every block's initialize() method is called,
starting at the source blocks and ending at the sink blocks, in a downstream
order.
In the initialize() method, the block can perform data type dependent
initialization with the
get_input_type() and
get_output_type()
methods, which return the differentiated input and output types of the
specified port index, respectively.
The block can also perform sample rate dependent initialization with
get_rate(), which recursively calls
get_rate() on upstream blocks in the flow graph to determine the sample rate.
Blocks may modify the sample rate for downstream blocks by overriding this
method, and source blocks are required to implement it to return a concrete
value.
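The recursive sample rate lookup can be sketched with a small plain Lua model. The Block table, new_source(), and new_decimator() are hypothetical names for this example. The sketch assumes each block inherits the rate of the block feeding its first input unless it overrides get_rate().

```lua
-- Hypothetical model: each block keeps a reference to the block feeding its
-- first input. This is a sketch of the idea, not LuaRadio's Block class.
local Block = {}
Block.__index = Block

function Block.new(upstream)
    return setmetatable({upstream = upstream}, Block)
end

-- Default behaviour: inherit the sample rate of the upstream block
function Block:get_rate()
    return self.upstream:get_rate()
end

-- A source has no upstream block, so it must return a concrete rate
local function new_source(rate)
    local source = Block.new(nil)
    source.get_rate = function () return rate end
    return source
end

-- A decimator divides the upstream rate for everything downstream of it
local function new_decimator(upstream, factor)
    local decimator = Block.new(upstream)
    decimator.get_rate = function (self) return Block.get_rate(self) / factor end
    return decimator
end

local source = new_source(1000000)
local filter = Block.new(source)
local decimator = new_decimator(filter, 4)
local sink = Block.new(decimator)
```

Here filter:get_rate() resolves to the source rate, while sink:get_rate() resolves to a quarter of it because the decimator sits upstream.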
Most blocks will create persistent output sample vectors in initialize(), to
be used by process(...). This allows blocks to efficiently produce new
samples without excessive allocations and deallocations.
Code reference: Block class.
The block's process() method is called repeatedly in the running phase
of a flow graph to process inputs into outputs.
This method receives a set of input vectors as arguments, corresponding to the
input ports it defined in its type signatures. These inputs are immutable,
read-only vectors and are all of the same length. The process() method is
responsible for computing output vectors from these inputs and any block state,
and returning the output vectors in the order corresponding to the output ports
it defined in its type signatures.
Blocks are responsible for managing their output sample vectors. Most blocks
allocate persistent output sample vectors in their initialize() method, and
then resize, populate, and return them in process(...). Since vector resizes
only cause re-allocation when they are grown, the underlying memory of output
vectors approaches a stable size as the vectors are resized to accommodate the
block's inputs.
For example, the process method for adding two inputs might look like:
function AddBlock:process(x, y)
    local out = self.out:resize(x.length)
    for i = 0, x.length-1 do
        out.data[i] = x.data[i] + y.data[i]
    end
    return out
end
After a flow graph is connected, differentiated, and initialized, every block is run concurrently.
While running, each block's inputs share a Pipe object with another block's
output. A block output may have multiple Pipe objects to several different
block inputs, but every block input only has one Pipe object.
The Pipe object provides an interface for the serialization and
deserialization of sample vectors between blocks. The read() method reads a
vector from the pipe. The write() method writes a vector to the pipe.
A block is run with its run() method, which is a loop that repeatedly reads
input pipes into an array of vectors, calls process(...) with these input
vectors, and writes the resulting array of vectors to the output pipes.
Code reference: Block class.
A block runs indefinitely in its run() method, until a read() on one of the
input Pipe objects returns nil. This indicates that the upstream block
closed its output Pipe and that the flow graph is collapsing.
When an input Pipe returns nil, the block breaks its main run loop, calls
cleanup(), and then closes its output pipes. This causes the downstream
blocks to terminate similarly.
Sources that produce a finite number of samples will exit naturally after they
have produced all of their samples, triggering the collapse of the flow graph.
Sources that run indefinitely, on the other hand, can only be shut down
forcibly by the framework, with the SIGTERM signal.
A block shutdown does not mean samples are lost. Samples are buffered in the
underlying implementation of the Pipe, even after the producing block has
terminated, and the consuming block will only encounter the nil on an input
Pipe after all of these samples have been consumed. This allows samples from
a finite source to be processed through a flow graph to completion.
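The run loop and its termination behavior can be modelled in a single process with an in-memory queue. The new_pipe() and run() helpers are hypothetical simplifications: a real Pipe is backed by a socket and blocks on read, and a real block may have several inputs and outputs.

```lua
-- Hypothetical in-memory stand-in for a Pipe: a FIFO of vectors plus a
-- closed flag. read() returns nil only once the pipe is closed and drained.
local function new_pipe()
    local pipe = {queue = {}, closed = false}
    function pipe:write(vec) self.queue[#self.queue + 1] = vec end
    function pipe:close() self.closed = true end
    function pipe:read()
        if #self.queue > 0 then return table.remove(self.queue, 1) end
        if self.closed then return nil end
        error("a real pipe would block here until data arrives")
    end
    return pipe
end

-- Simplified run loop for a block with one input and one output
local function run(block, input, output)
    while true do
        local vec = input:read()
        if vec == nil then break end      -- upstream closed: collapse
        output:write(block.process(vec))
    end
    if block.cleanup then block.cleanup() end
    output:close()                        -- propagate collapse downstream
end

local a, b = new_pipe(), new_pipe()
-- A finite source writes two vectors and exits, closing its output
a:write({1, 2}); a:write({3}); a:close()

local cleaned_up = false
local doubler = {
    process = function (vec)
        local out = {}
        for i = 1, #vec do out[i] = 2 * vec[i] end
        return out
    end,
    cleanup = function () cleaned_up = true end,
}
run(doubler, a, b)
```

Although the source closed its pipe before the doubler started, both buffered vectors are still processed before the nil is seen, and the doubler then closes its own output.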
Code reference: Block class.
The CompositeBlock is a special block
used to build and run flow graphs. It can either be used as a hierarchical
block: a composition of blocks abstracted into one block with redefined
inputs/outputs at its boundary, or as a top-level block: a composition of
blocks forming a complete flow graph that can be run.
A CompositeBlock used for a hierarchical block builds an internal flow graph
by connecting block ports with the
connect() method, just as a
top-level block would. However, unlike top-level blocks, which have no boundary
inputs or outputs, hierarchical blocks also specify a type signature with
add_type_signature() for their boundary inputs and outputs.
When a CompositeBlock adds a type signature, instead of building the
PipeInput and PipeOutput input and output ports under the .inputs and
.outputs arrays as a normal block would, the CompositeBlock builds
AliasedPipeInput and AliasedPipeOutput ports, respectively. These are
special input and output ports that alias existing input and output ports
inside a flow graph.
These aliases are established in calls to connect(), when boundary
input/output ports are connected to the input/output ports of concrete blocks
in the flow graph.
Code reference: CompositeBlock class.
A CompositeBlock used for a top-level block builds a flow graph by connecting
block ports with the connect()
method. The data structure for the flow graph is a table that maps block
PipeInput instances to PipeOutput instances. In other words, it is a hash
table of input port to output port, for each connection between blocks in the
graph.
When a connection is made with connect() on a top-level CompositeBlock, the
PipeInput and PipeOutput ports are looked up in the blocks and added to the
connections table, and a Pipe object is created and registered under both
PipeInput and PipeOutput port instances.
If a hierarchical block is connected into a top-level block, then its
AliasedPipeInput and AliasedPipeOutput ports are followed to the underlying
real PipeInput and PipeOutput ports, and those are the input and output
port instances registered in the flow graph. This means that the hierarchical
block does not exist in a top-level flow graph; it's just a convenience for
composing blocks.
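Following aliased ports to real ports could look like the sketch below. It is a simplified model that assumes each alias refers to exactly one port, which may not match how LuaRadio's AliasedPipeInput and AliasedPipeOutput are structured. All names are hypothetical.

```lua
-- Hypothetical model: an aliased port stores a reference to the port it
-- stands for. Hierarchical blocks may nest, so aliases are followed until a
-- real port is reached.
local function new_port(name) return {name = name} end
local function new_alias(name, target) return {name = name, alias_of = target} end

local function resolve(port)
    while port.alias_of ~= nil do
        port = port.alias_of
    end
    return port
end

local real_in = new_port("filter.in")
local inner_alias = new_alias("inner.in", real_in)      -- nested hierarchical block
local outer_alias = new_alias("outer.in", inner_alias)
local source_out = new_port("source.out")

-- Only real ports end up in the top-level connections table
local connections = {}
connections[resolve(outer_alias)] = resolve(source_out)
```

The connections table is keyed by the real input port, so the hierarchical block's boundary ports never appear in the top-level flow graph.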
Code reference: CompositeBlock class.
After the flow graph is connected under a top-level block, it can be run with
start() or
run(). Running a flow graph
requires a few initialization steps before each block can begin to consume,
process, and produce samples.
First, the CompositeBlock crawls its connections table and absorbs the
internal connections table of any hierarchical blocks. Since hierarchical
blocks were just connected at the boundary ports, the internal connections are
not yet available to the top-level block and need to be absorbed.
Next, every block referenced in the connections table is checked for unconnected inputs. If an input is unconnected, the flow graph cannot be run and an error is raised.
The connections table data structure is then used to build an auxiliary list called the evaluation order. This is a list of all of the blocks in the flow graph, arranged in a downstream and dependency-free order. Each block in this list may depend on the outputs of previous blocks in the list, but does not depend on the outputs of any successive blocks. This order is needed to correctly differentiate the flow graph, because the output types of each block are fed as the input types to downstream blocks for differentiation.
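One way to derive such an evaluation order from an input-to-output connections table is sketched below in plain Lua. The block and port tables are hypothetical models, and the algorithm is a simple repeated scan chosen for clarity. It is not necessarily the one LuaRadio uses.

```lua
-- Hypothetical model: a port is a table with an owner block, and the
-- connections table maps each input port to the output port feeding it.
local function new_block(name, num_inputs)
    local block = {name = name, inputs = {}, output = nil}
    block.output = {owner = block}
    for i = 1, num_inputs do block.inputs[i] = {owner = block} end
    return block
end

local function build_evaluation_order(blocks, connections)
    local order, placed = {}, {}
    while #order < #blocks do
        local progressed = false
        for _, block in ipairs(blocks) do
            if not placed[block] then
                -- A block is ready once every upstream block has been placed
                local ready = true
                for _, input in ipairs(block.inputs) do
                    local output = connections[input]
                    if output == nil or not placed[output.owner] then
                        ready = false
                    end
                end
                if ready then
                    order[#order + 1] = block
                    placed[block] = true
                    progressed = true
                end
            end
        end
        if not progressed then error("unconnected input or cycle in flow graph") end
    end
    return order
end

local source = new_block("source", 0)
local filter = new_block("filter", 1)
local mixer = new_block("mixer", 2)
local sink = new_block("sink", 1)

local connections = {
    [sink.inputs[1]] = mixer.output,
    [mixer.inputs[1]] = source.output,
    [mixer.inputs[2]] = filter.output,
    [filter.inputs[1]] = source.output,
}

-- Blocks deliberately listed out of order
local order = build_evaluation_order({sink, mixer, filter, source}, connections)
local names = {}
for i, block in ipairs(order) do names[i] = block.name end
```

For this graph the resulting order is source, filter, mixer, sink, so every block appears after all of the blocks that feed it.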
The evaluation order is followed once to differentiate() each block, once to
check that all of its input port sample rates match, and once to initialize()
each block. If input port sample rates do not match, the flow graph is not run
and an error is raised.
Finally, all of the Pipe objects in the flow graph are initialized. Pipes are
backed by anonymous UNIX socket pairs created by socketpair().
The flow graph is now ready to run.
Code reference: CompositeBlock class.
At this stage, the flow graph is fully validated, differentiated, initialized, and is ready to run.
The CompositeBlock first blocks SIGINT and SIGCHLD signals with
sigprocmask(), so that it can later synchronously wait for these signals with
sigwait() in wait(). The CompositeBlock then calls fork() for each
block.
The child process for each block closes all unneeded file descriptors, and
calls the block's main run() method. The child process spends the majority of
its processing lifetime in this method. If this method returns naturally due to
a flow graph collapse, the child exits with exit code 0. If it returns because
of a runtime error, the child prints the error and backtrace to standard error,
and exits with exit code 1.
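The exit code convention of a block's child process can be sketched with xpcall(). The run_child() wrapper is hypothetical. It returns the exit code instead of exiting, so that it can be exercised in a single process, whereas a real child would pass the value to os.exit().

```lua
-- Hypothetical wrapper returning the exit code a block process would use.
-- A real child process would pass this value to os.exit().
local function run_child(block)
    local ok, err = xpcall(function () block:run() end, debug.traceback)
    if ok then
        return 0          -- run() returned after a flow graph collapse
    end
    -- Report the error message and backtrace on standard error
    io.stderr:write(tostring(err), "\n")
    return 1              -- runtime error inside the block
end

local healthy = {run = function () end}
local faulty = {run = function () error("bad sample") end}

local healthy_code = run_child(healthy)
local faulty_code = run_child(faulty)
```

Using debug.traceback as the message handler captures the backtrace at the point of the error, before the stack unwinds.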
The parent process closes all file descriptors associated with the Pipe
objects it built, so that the blocks are the only owners of these connected
files, and can close them to signal block termination.
The parent returns back to the top-level script, where it can use status() to
get the running status of the flow graph, wait() to wait for the flow graph
to finish, or stop() to stop the flow graph.
Code reference: CompositeBlock class.
The wait() method waits for a
SIGINT or SIGCHLD signal. If it gets a SIGINT signal, which indicates a
user requested exit, it calls stop() to stop the flow graph. If it gets a
SIGCHLD signal, which indicates a block exited, it waits on each block PID
with waitpid() until the flow graph has fully collapsed, and then unblocks
the SIGINT and SIGCHLD signals.
The stop() method kills all
source blocks with SIGTERM, waits on each block PID with waitpid() until
the flow graph has fully collapsed, and then unblocks the SIGINT and
SIGCHLD signals.
The status() method checks if
any block is still running with kill(<pid>, 0). If all blocks have exited,
it waits on each block PID with waitpid(), and then unblocks the SIGINT and
SIGCHLD signals.
Code reference: CompositeBlock class.