An opinionated PGMQ client for Elixir.
This package can be installed by adding :ecto_pgmq to your list of
dependencies in mix.exs:

    defp deps do
      [
        # Should already be present if you're using Ecto
        {:ecto_sql, "~> 3.11"},
        {:postgrex, ">= 0.0.0"},
        # Required to use the EctoPGMQ.Producer module
        {:broadway, "~> 1.0"},
        {:ecto_pgmq, "~> 1.0"}
      ]
    end

For additional documentation, see HexDocs.

EctoPGMQ is designed to be used seamlessly with your application repo. Some
common operations are showcased below, but the full feature set is covered in
the Documentation.
The most common way to install the PGMQ extension is in a database migration. We can create a database migration with the following command:

    mix ecto.gen.migration create_pgmq

We can then implement the migration accordingly:

    defmodule MyApp.Repo.Migrations.CreatePgmq do
      use Ecto.Migration

      alias EctoPGMQ.Migrations

      def change, do: Migrations.create_extension()
    end

For more information about installing PGMQ, see PGMQ Installation.

The most common way to create a queue is in a database migration. We can create a database migration with the following command:

    mix ecto.gen.migration create_my_queue

We can then implement the migration accordingly:

    defmodule MyApp.Repo.Migrations.CreateMyQueue do
      use Ecto.Migration

      alias EctoPGMQ.Migrations

      def change, do: Migrations.create_queue("my_queue")
    end

Messages can be sent to a queue with the following function:

    EctoPGMQ.send_messages(MyApp.Repo, "my_queue", [%{"foo" => 1}, %{"bar" => 2}])

Messages can be read from a queue with the following function:

    messages = EctoPGMQ.read_messages(MyApp.Repo, "my_queue", 300, 2)

Alternatively, messages can also be consumed in a Broadway pipeline with the
EctoPGMQ.Producer module.
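
As a rough sketch, the send and read calls shown above can be combined into a single helper. The module name MyApp.QueueDemo and the function round_trip/1 are hypothetical and exist only for illustration; the only library calls used are EctoPGMQ.send_messages/3 and EctoPGMQ.read_messages/4, exactly as they appear in the examples above. Reading 300 as a visibility timeout in seconds and 2 as the maximum number of messages is an assumption based on PGMQ's read semantics, and the sketch also assumes that read_messages returns a list. Check the Documentation for the exact argument and return types.

```elixir
defmodule MyApp.QueueDemo do
  # Hypothetical helper module, not part of EctoPGMQ.
  @queue "my_queue"

  # Sends two payloads to the queue and reads them back.
  # Assumes the PGMQ extension and the "my_queue" queue already exist.
  def round_trip(repo \\ MyApp.Repo) do
    EctoPGMQ.send_messages(repo, @queue, [%{"foo" => 1}, %{"bar" => 2}])

    # Assumption: 300 is the visibility timeout (seconds) and 2 is the
    # maximum number of messages to read in one call.
    messages = EctoPGMQ.read_messages(repo, @queue, 300, 2)

    # Assumption: the result is a list; print each entry for inspection.
    Enum.each(messages, &IO.inspect(&1, label: "message"))

    messages
  end
end
```

Calling MyApp.QueueDemo.round_trip() from an IEx session with the repo started should print whatever the read returns. The messages are only read here, not deleted or archived, so under the visibility-timeout assumption they would become readable again once the timeout elapses.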
The most common way to drop a queue is in a database migration. We can create a database migration with the following command:

    mix ecto.gen.migration drop_my_queue

We can then implement the migration accordingly:

    defmodule MyApp.Repo.Migrations.DropMyQueue do
      use Ecto.Migration

      alias EctoPGMQ.Migrations

      def change, do: Migrations.drop_queue("my_queue")
    end