Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions lib/flub/app.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ defmodule Flub.App do

def start(_type, _args) do
import Supervisor.Spec, warn: false
# ensure that pg2 is up and running:
{:ok, _pid} = :pg2.start

Flub.EtsHelper.setup_tables
:ok = case :pg.start_link() do
{:ok, _pid} -> :ok
{:error, {:already_started, _pid}} -> :ok
error -> {:error, error}
end
children = [
supervisor(Flub.NodeSync.Supervisor, []),
supervisor(Flub.DispatcherSup, []),
{Flub.NodeSync.Supervisor, []},
{Flub.DispatcherSup, []},
]
opts = [strategy: :one_for_one, name: Flub.Supervisor]
Supervisor.start_link(children, opts)
end

end
5 changes: 2 additions & 3 deletions lib/flub/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ defmodule Flub.Dispatcher do
def init([the_node, channel]) do
:gproc.reg({:n, :l, {__MODULE__, the_node, channel}})
:gproc.reg({:p, :l, __MODULE__})
:pg2.create({__MODULE__, the_node, channel})
:pg2.join({__MODULE__, the_node, channel}, self())
:pg.join({__MODULE__, the_node, channel}, self())
subscribers = Subscribers.find(channel)
|> Enum.map(fn({pid, funs}) ->
{pid, add_subscriber(channel, pid, funs)}
Expand Down Expand Up @@ -160,7 +159,7 @@ defmodule Flub.Dispatcher do
{__MODULE__, :global, channel},
]
for group <- pg2_groups do
case :pg2.get_members(group) do
case :pg.get_members(group) do
list when is_list(list) -> list
_error -> []
end
Expand Down
21 changes: 9 additions & 12 deletions lib/flub/dispatcher_sup.ex
Original file line number Diff line number Diff line change
@@ -1,30 +1,27 @@
defmodule Flub.DispatcherSup do
@moduledoc false
use Supervisor

use DynamicSupervisor

#############
# API
#############

def start_link do
Supervisor.start_link(__MODULE__, [], [name: __MODULE__])
end
def start_link(_), do: DynamicSupervisor.start_link(__MODULE__, [], [name: __MODULE__])

def start_worker(node, channel) do
Supervisor.start_child(__MODULE__, [node, channel])
DynamicSupervisor.start_child(__MODULE__, %{
id: Flub.Dispatcher,
start: {Flub.Dispatcher, :start_link, [node, channel]}, restart: :transient}
)
end

##############################
# GenServer Callbacks
##############################

def init([]) do
children = [
worker(Flub.Dispatcher, [], restart: :transient)
]

supervise(children, strategy: :simple_one_for_one)
end
@impl DynamicSupervisor
def init([]), do: DynamicSupervisor.init(strategy: :one_for_one)

##############################
# Internal
Expand Down
95 changes: 95 additions & 0 deletions lib/flub/helpers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
defmodule Flub.Helpers do
@moduledoc """
Helper macros for Flub
"""
defmacro __using__(_opts) do
quote do
import Flub, only: [p: 1]
import Flub.Helpers
end
end

@doc """
Define a `Flub` channel and associated helper functions. Used like so:

defmodule PubSub do
use Flub.Helpers

define_channel("alert_notice", level: :info, description: "default description")
end

the `define_channel` macro will introduce the following code for you:
```
defmodule AlertNotice do
defstruct [
level: :info,
description: "default description",
]
end

def alert_notice_chnl(), do: :alert_notice
def pub_alert_notice(val), do: Flub.pub(val, alert_notice_chnl())
def sub_alert_notice(), do: Flub.sub(alert_notice_chnl())
def sub_alert_notice(opts), do: Flub.sub(alert_notice_chnl(), opts)
```

Due to the way the structures are injected into your module, you will
need to reference them with `%__MODULE__.AlertNotice{}`.

In general, GenServers can use function head matching inn handle_info in
"the usual way" to process messages:
```
@alert_notice_chnl alert_notice_chnl()
def handle_info(%Flub.Message{channel: @alert_notice_chnl, data: %__MODULE__.AlertNotice{level: level, description: description}}, state) do
# use level and description here...
{:noreply, state}
end
```

"""
defmacro define_channel(channel_string, channel_def_kwl) do

# channel string: FooChannel
camel_channel_string = Macro.camelize(channel_string) # "FooChannel"
snake_channel_string = Macro.underscore(channel_string) # foo_channel
channel_atom = String.to_atom(snake_channel_string) # :foo_channel
struct_mod = String.to_atom("#{__CALLER__.module}.#{camel_channel_string}") # Elixir.PubSub.FooChannel
get_chnl = String.to_atom("#{snake_channel_string}_chnl") # foo_channel_chnl
pub_chnl = String.to_atom("pub_#{snake_channel_string}") # pub_foo_channel
sub_chnl = String.to_atom("sub_#{snake_channel_string}") # sub_foo_channel

struct = quote do
defmodule unquote(struct_mod) do
defstruct unquote(channel_def_kwl)
end
end

funcs = quote do

def unquote(get_chnl)() do
unquote(channel_atom)
end

def unquote(pub_chnl)(arg) do
case arg do
# already a struct, pub it
%unquote(struct_mod){} -> Flub.pub(arg, unquote(get_chnl)())

# probably a kwl, convert to struct first then pub
_ -> Kernel.struct!(unquote(struct_mod), arg) |> Flub.pub(unquote(get_chnl)())
end
end

def unquote(sub_chnl)(opts) do
Flub.sub(unquote(get_chnl)(), opts)
end

def unquote(sub_chnl)() do
Flub.sub(unquote(get_chnl)())
end

end

[struct, funcs]
end
end
3 changes: 3 additions & 0 deletions lib/flub/node_sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ defmodule Flub.NodeSync do
# API
##############################

@impl GenServer
def init(arg), do: {:ok, arg}

@doc """
Request that the connection to `node` be maintained, via `Node.ping` and
`Node.monitor`.
Expand Down
20 changes: 9 additions & 11 deletions lib/flub/node_sync/ns_sup.ex
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
defmodule Flub.NodeSync.Supervisor do
@moduledoc false
use Supervisor

use DynamicSupervisor

#############
# API
#############

def start_link do
Supervisor.start_link(__MODULE__, [], [name: __MODULE__])
def start_link(_) do
DynamicSupervisor.start_link(__MODULE__, [], [name: __MODULE__])
end

def start_child(the_node) do
Supervisor.start_child(__MODULE__, [the_node])
DynamicSupervisor.start_child(__MODULE__, %{
id: Flub.NodeSync.Worker,
start: {Flub.NodeSync.Worker, :start_link, [the_node]}, restart: :transient}
)
end

##############################
# GenServer Callbacks
##############################

def init([]) do
children = [
worker(Flub.NodeSync.Worker, [], restart: :transient)
]

supervise(children, strategy: :simple_one_for_one)
end
def init([]), do: DynamicSupervisor.init(strategy: :one_for_one)

##############################
# Internal
Expand Down
13 changes: 7 additions & 6 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Flub.Mixfile do
[
app: :flub,
version: @version,
elixir: "~> 1.2",
elixir: "~> 1.11",
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
deps: deps(),
Expand All @@ -18,8 +18,9 @@ defmodule Flub.Mixfile do
end

def application do
[applications: [:logger, :ets_owner, :gproc],
mod: {Flub.App, []}]
[
extra_applications: [],
mod: {Flub.App, []}]
end

defp hex_package do
Expand All @@ -31,9 +32,9 @@ defmodule Flub.Mixfile do
defp deps do
[
{:ets_owner, "~> 1.0"},
{:ex2ms, "~> 1.0"},
{:shorter_maps, "~> 2.1"},
{:gproc, "~> 0.5"},
{:ex2ms, "~> 1.6"},
{:shorter_maps, "~> 2.2"},
{:gproc, "~> 0.9"},
{:ex_doc, ">= 0.0.0", only: :dev},
]
end
Expand Down
19 changes: 13 additions & 6 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
%{"earmark": {:hex, :earmark, "1.1.0", "8c2bf85d725050a92042bc1edf362621004d43ca6241c756f39612084e95487f", [:mix], [], "hexpm"},
"ets_owner": {:hex, :ets_owner, "1.0.0", "f8408c609c29cb6925c2f71bac07f3e35352786b2c6824ad545f8133a4f3762d", [], [], "hexpm"},
"ex2ms": {:hex, :ex2ms, "1.4.0", "e43b410888b45ba363ea6650db3736db3e455a0a412ec244ac633fede857bcb2", [], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"gproc": {:hex, :gproc, "0.6.1", "4579663e5677970758a05d8f65d13c3e9814ec707ad51d8dcef7294eda1a730c", [:rebar3], [], "hexpm"},
"shorter_maps": {:hex, :shorter_maps, "2.2.1", "65298ed203be953a6d14755aab5c9f7e365e81104315a6799f9911e93cb99364", [:mix], [], "hexpm"}}
%{
"earmark": {:hex, :earmark, "1.1.0", "8c2bf85d725050a92042bc1edf362621004d43ca6241c756f39612084e95487f", [:mix], [], "hexpm", "15e3a816bdc53d12f258ea59d0c1fae9a37da2e70a0cb486edad687f65a36f66"},
"earmark_parser": {:hex, :earmark_parser, "1.4.13", "0c98163e7d04a15feb62000e1a891489feb29f3d10cb57d4f845c405852bbef8", [:mix], [], "hexpm", "d602c26af3a0af43d2f2645613f65841657ad6efc9f0e361c3b6c06b578214ba"},
"ets_owner": {:hex, :ets_owner, "1.0.0", "f8408c609c29cb6925c2f71bac07f3e35352786b2c6824ad545f8133a4f3762d", [:mix], [], "hexpm", "54c0228a9134f4afe5c2a5418712a8b010bbc3f3e4864f3c854110f6cb65bca9"},
"ex2ms": {:hex, :ex2ms, "1.6.0", "f39bbd9ff1b0f27b3f707bab2d167066dd8965e7df1149b962d94c74615d0e09", [:mix], [], "hexpm", "0d1ab5e08421af5cd69146efb408dbb1ff77f38a2f4df5f086f2512dc8cf65bf"},
"ex_doc": {:hex, :ex_doc, "0.24.2", "e4c26603830c1a2286dae45f4412a4d1980e1e89dc779fcd0181ed1d5a05c8d9", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "e134e1d9e821b8d9e4244687fb2ace58d479b67b282de5158333b0d57c6fb7da"},
"gproc": {:hex, :gproc, "0.9.0", "853ccb7805e9ada25d227a157ba966f7b34508f386a3e7e21992b1b484230699", [:rebar3], [], "hexpm", "587e8af698ccd3504cf4ba8d90f893ede2b0f58cabb8a916e2bf9321de3cf10b"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"shorter_maps": {:hex, :shorter_maps, "2.2.5", "77b2e3632a49c72ca8aea2a47687767b3ad2623b9ade529ffe31f41865a32886", [:mix], [], "hexpm", "fbef59a9f13369f819111f500f0253678526ea8c717d794a224df41223bc88ab"},
}
30 changes: 30 additions & 0 deletions test/flub_helpers_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule FlubHelpersTest do
use ExUnit.Case
require Flub
use Flub.Helpers

setup do
on_exit fn ->
Flub.unsub
Process.sleep(10)
end
end

# define a channel
define_channel("test_channel", foo: "foo", bar: :bar)

test "default struct members" do
data = %__MODULE__.TestChannel{}
assert(data.foo == "foo")
assert(data.bar == :bar)
end

test "pub and sub via macro" do
channel = test_channel_chnl()
sub_test_channel()
data = %__MODULE__.TestChannel{}
pub_test_channel(data)
Flub.pub(data, channel)
assert_receive(%Flub.Message{channel: ^channel, data: ^data})
end
end