Finish up Plugin.Supervisor, replace markov and wordbot implementations with it

Both markov and wordbot have some auxiliary processes that run to keep
track of things. Previously, they both had custom supervisors grafted on
to the Plugin.Base - now, this grafting is automated with
Plugin.Supervisor.

Signed-off-by: Alek Ratzloff <alekratz@gmail.com>
This commit is contained in:
2020-07-11 15:43:07 -07:00
parent 369c9824fb
commit 9a8c6f2472
7 changed files with 290 additions and 247 deletions

View File

@@ -1,50 +0,0 @@
defmodule Omnibot.Contrib.Markov.Bot do
use Omnibot.Plugin
alias Omnibot.Contrib.Markov.Chain
@impl true
def on_init(_cfg) do
# Create the markov database
path = String.to_atom(cfg()[:path])
{:ok, db} = :dets.open_file(path, [:named_table])
chains = :ets.new(:markov_chains, [:public])
:dets.to_ets(db, chains)
:dets.close(db)
end
@impl true
def on_channel_msg(_irc, channel, nick, msg) do
train(channel, nick, msg)
end
def train(channel, user, msg) do
chain = (user_chain(channel, user) || create_user_chain(channel, user))
|> Chain.train(msg)
true = update_user_chain(channel, user, chain)
end
def user_chain(channel, user) do
db = state()
case :ets.lookup(db, {channel, user}) do
[] -> nil
[{{^channel, ^user}, chains}] -> chains
end
end
def update_user_chain(channel, user, chain) do
db = state()
case user_chain(channel, user) do
nil -> :ets.insert_new(db, {{channel, user}, chain})
chain -> :ets.insert(db, {{channel, user}, chain})
end
end
defp create_user_chain(channel, user) do
true = update_user_chain(channel, user, %Chain{order: cfg()[:order]})
user_chain(channel, user)
end
def save_chains() do
end
end

View File

@@ -1,32 +1,64 @@
defmodule Omnibot.Contrib.Markov do defmodule Omnibot.Contrib.Markov do
use Omnibot.Plugin.Base use Omnibot.Plugin.Supervisor
alias Omnibot.Contrib.Markov alias Omnibot.Contrib.Markov.Chain
use Supervisor require Logger
@default_config path: "markov", order: 2, save_every: 5 * 60 @default_config path: "markov", order: 2, save_every: 5 * 60
def start_link(opts) do @impl true
Supervisor.start_link(__MODULE__, opts[:cfg], opts) def children(_cfg, _state) do
[{Task, fn ->
Stream.timer(1000)
|> Stream.cycle()
|> Stream.each(fn _ -> save_chains() end)
|> Stream.run()
end}]
end end
@impl true @impl true
def init(cfg) do def on_init(_cfg) do
children = [ # Create the markov database
{Markov.Bot, cfg: cfg, name: Omnibot.Contrib.Markov.Bot}, path = String.to_atom(cfg()[:path])
{Task, fn -> save_loop(cfg) end} {:ok, db} = :dets.open_file(path, [:named_table])
] chains = :ets.new(:markov_chains, [:public])
Supervisor.init(children, strategy: :one_for_all) :dets.to_ets(db, chains)
end :dets.close(db)
defp save_loop(cfg) do
save_every = cfg[:save_every]
Process.sleep(save_every * 1000)
Markov.Bot.save_chains()
end end
@impl true @impl true
def on_msg(irc, msg), do: Markov.Bot.on_msg(irc, msg) def on_channel_msg(_irc, channel, nick, msg) do
train(channel, nick, msg)
end
@impl true def train(channel, user, msg) do
def on_channel_msg(irc, channel, nick, msg), do: Markov.Bot.on_channel_msg(irc, channel, nick, msg) chain = (user_chain(channel, user) || create_user_chain(channel, user))
|> Chain.train(msg)
true = update_user_chain(channel, user, chain)
end
def user_chain(channel, user) do
db = state()
case :ets.lookup(db, {channel, user}) do
[] -> nil
[{{^channel, ^user}, chains}] -> chains
end
end
def update_user_chain(channel, user, chain) do
db = state()
case user_chain(channel, user) do
nil -> :ets.insert_new(db, {{channel, user}, chain})
chain -> :ets.insert(db, {{channel, user}, chain})
end
end
defp create_user_chain(channel, user) do
true = update_user_chain(channel, user, %Chain{order: cfg()[:order]})
user_chain(channel, user)
end
def save_chains() do
# TODO
Logger.info("Saved markov chains")
end
end end

View File

@@ -1,159 +0,0 @@
defmodule Omnibot.Contrib.Wordbot.Bot do
use Omnibot.Plugin
alias Omnibot.{Contrib.Wordbot, Irc, State, Util}
require Logger
@split_pattern ~r/[\s\b]+/
## Bot commands
command "!wordbot", ["leaderboard"] do
Wordbot.Db.leaderboard(channel)
|> Enum.sort_by(& &1.score)
|> Enum.reverse()
|> Enum.take(5)
|> Enum.with_index()
|> Enum.map(fn {%{user: nick, score: score}, rank} -> "#{rank + 1}. #{Util.denotify_nick(nick)}. #{score}" end)
|> Enum.each(&Irc.send_to(irc, channel, &1))
end
## Client API
def words() do
{words, _watchers} = state()
words
end
defp watchers() do
{_words, watchers} = state()
watchers
end
defp update_watchers(mapping) do
update_state(fn {words, watchers} -> {words, apply(mapping, [watchers])} end)
end
defp add_watcher(channel, task) do
update_watchers(&Map.put(&1, channel, task))
end
defp delete_watcher(channel) do
task = watchers()[channel]
update_watchers(&Map.delete(&1, channel))
task
end
defp lookup_watcher(channel) do
Map.get(watchers(), channel)
end
defp has_watcher?(channel) do
case lookup_watcher(channel) do
nil -> false
task -> Process.alive?(task)
end
end
def start_round(irc, channel) do
# Get round config
cfg = cfg()
num_words = cfg[:words_per_round]
duration = cfg[:hours_per_round] * 3600
# Select words
words = Enum.take_random(words(), num_words)
# Try to start the round - if it's already running then that's OK
case Wordbot.Db.start_round(channel, words, duration) do
:ok -> Logger.debug("Started new wordbot round for #{channel}")
{:error, :game_running} -> Logger.debug("Wordbot game already running for #{channel}")
end
# Try to start a watcher if there isn't one running
if !has_watcher?(channel),
do: start_watcher(irc, channel)
end
defp start_watcher(irc, channel) do
# Start a watcher for the given channel
Logger.debug("Starting wordbot game watcher for #{channel}")
# Assert that there isn't a running watcher for the current channel
false = has_watcher?(channel)
task = Task.Supervisor.async_nolink(
Wordbot.Watchers,
fn -> watch_game(irc, channel) end,
[shutdown: :brutal_kill]
)
add_watcher(channel, task)
end
defp watch_game(irc, channel) do
# Poll every second to check if a game is finished
if Wordbot.Db.game_active?(channel) do
Process.sleep(1000)
watch_game(irc, channel)
else
finish_round(irc, channel)
end
end
def finish_round(irc, channel) do
Logger.debug("Finishing wordbot round for #{channel}")
# Announce scores
Irc.send_to(irc, channel, "Game over. Here were the scores:")
scores = Wordbot.Db.scores(channel)
|> Enum.sort_by(&(&1.score))
|> Enum.reverse()
# Ranking is a little weird because we want to rank people so that having
# the same score will give the same ranking, e.g.
# 1. user1. 4
# 2. user2. 3
# 2. user3. 3
# 3. user4. 1
rankings = scores
|> Enum.map(&(&1.score))
|> Enum.sort()
|> Enum.uniq()
|> Enum.reverse()
|> Enum.with_index()
|> Map.new()
|> IO.inspect()
Enum.each(scores, &Irc.send_to(irc, channel, "#{rankings[&1.score] + 1}. #{Util.denotify_nick(&1.user)}. #{&1.score}"))
# Stop the watcher, start new round
delete_watcher(channel)
start_round(irc, channel)
end
## Plugin callbacks
@impl true
def on_init(cfg) do
Wordbot.Db.ensure_db()
words = File.read!(cfg[:wordbot_source])
|> String.split("\n")
watchers = %{}
{words, watchers}
end
@impl true
def on_channel_msg(irc, channel, nick, msg) do
words = Regex.split(@split_pattern, msg) |> MapSet.new()
game_words = Wordbot.Db.unmatched_words(channel) |> MapSet.new()
MapSet.intersection(words, game_words)
|> Enum.each(fn word ->
Wordbot.Db.add_score(channel, nick, word, msg)
Irc.send_to(irc, channel, "#{nick}: Congrats! '#{word}' is good for 1 point.")
end)
end
@impl true
def on_join(irc, channel, who) do
# Attempt to start a new round
if State.cfg().nick == who do
start_round(irc, channel)
end
end
end

View File

@@ -1,28 +1,183 @@
defmodule Omnibot.Contrib.Wordbot do defmodule Omnibot.Contrib.Wordbot do
use Omnibot.Plugin.Base use Omnibot.Plugin.Supervisor
use Supervisor alias Omnibot.{Contrib.Wordbot, State, Util}
require Logger require Logger
alias Omnibot.Contrib.Wordbot
@default_config wordbot_source: "words.txt", wordbot_db: "wordbot.db", words_per_round: 300, hours_per_round: 5 @default_config wordbot_source: "words.txt", wordbot_db: "wordbot.db", words_per_round: 300, hours_per_round: 5
def start_link(opts) do #def start_link(opts) do
Supervisor.start_link(__MODULE__, opts[:cfg], opts) #Supervisor.start_link(__MODULE__, opts[:cfg], opts)
#end
#@impl true
#def init(cfg) do
# children = [
# {Task.Supervisor, name: Omnibot.Contrib.Wordbot.Watchers, strategy: :one_for_one},
# Wordbot.Db.child_spec(cfg[:wordbot_db]),
# {Wordbot.Bot, cfg: cfg, name: Omnibot.Contrib.Wordbot.Bot},
# ]
# Supervisor.init(children, strategy: :one_for_all)
#end
@impl true
def children(cfg, _state) do
[
{Task.Supervisor, name: Omnibot.Contrib.Wordbot.Watchers, strategy: :one_for_one},
Wordbot.Db.child_spec(cfg[:wordbot_db]),
]
end
@split_pattern ~r/[\s\b]+/
## Bot commands
command "!wordbot", ["leaderboard"] do
Wordbot.Db.leaderboard(channel)
|> Enum.sort_by(& &1.score)
|> Enum.reverse()
|> Enum.take(5)
|> Enum.with_index()
|> Enum.map(fn {%{user: nick, score: score}, rank} -> "#{rank + 1}. #{Util.denotify_nick(nick)}. #{score}" end)
|> Enum.each(&Irc.send_to(irc, channel, &1))
end
## Client API
def words() do
{words, _watchers} = state()
words
end
defp watchers() do
{_words, watchers} = state()
watchers
end
defp update_watchers(mapping) do
update_state(fn {words, watchers} -> {words, apply(mapping, [watchers])} end)
end
defp add_watcher(channel, task) do
update_watchers(&Map.put(&1, channel, task))
end
defp delete_watcher(channel) do
task = watchers()[channel]
update_watchers(&Map.delete(&1, channel))
task
end
defp lookup_watcher(channel) do
Map.get(watchers(), channel)
end
defp has_watcher?(channel) do
case lookup_watcher(channel) do
nil -> false
task -> Process.alive?(task)
end
end
def start_round(irc, channel) do
# Get round config
cfg = cfg()
num_words = cfg[:words_per_round]
duration = cfg[:hours_per_round] * 3600
# Select words
words = Enum.take_random(words(), num_words)
# Try to start the round - if it's already running then that's OK
case Wordbot.Db.start_round(channel, words, duration) do
:ok -> Logger.debug("Started new wordbot round for #{channel}")
{:error, :game_running} -> Logger.debug("Wordbot game already running for #{channel}")
end
# Try to start a watcher if there isn't one running
if !has_watcher?(channel),
do: start_watcher(irc, channel)
end
defp start_watcher(irc, channel) do
# Start a watcher for the given channel
Logger.debug("Starting wordbot game watcher for #{channel}")
# Assert that there isn't a running watcher for the current channel
false = has_watcher?(channel)
task = Task.Supervisor.async_nolink(
Wordbot.Watchers,
fn -> watch_game(irc, channel) end,
[shutdown: :brutal_kill]
)
add_watcher(channel, task)
end
defp watch_game(irc, channel) do
# Poll every second to check if a game is finished
if Wordbot.Db.game_active?(channel) do
Process.sleep(1000)
watch_game(irc, channel)
else
finish_round(irc, channel)
end
end
def finish_round(irc, channel) do
Logger.debug("Finishing wordbot round for #{channel}")
# Announce scores
Irc.send_to(irc, channel, "Game over. Here were the scores:")
scores = Wordbot.Db.scores(channel)
|> Enum.sort_by(&(&1.score))
|> Enum.reverse()
# Ranking is a little weird because we want to rank people so that having
# the same score will give the same ranking, e.g.
# 1. user1. 4
# 2. user2. 3
# 2. user3. 3
# 3. user4. 1
rankings = scores
|> Enum.map(&(&1.score))
|> Enum.sort()
|> Enum.uniq()
|> Enum.reverse()
|> Enum.with_index()
|> Map.new()
|> IO.inspect()
Enum.each(scores, &Irc.send_to(irc, channel, "#{rankings[&1.score] + 1}. #{Util.denotify_nick(&1.user)}. #{&1.score}"))
# Stop the watcher, start new round
delete_watcher(channel)
start_round(irc, channel)
end
## Plugin callbacks
@impl true
def on_init(cfg) do
Wordbot.Db.ensure_db()
words = File.read!(cfg[:wordbot_source])
|> String.split("\n")
watchers = %{}
{words, watchers}
end end
@impl true @impl true
def init(cfg) do def on_channel_msg(irc, channel, nick, msg) do
children = [ words = Regex.split(@split_pattern, msg) |> MapSet.new()
{Task.Supervisor, name: Omnibot.Contrib.Wordbot.Watchers, strategy: :one_for_one}, game_words = Wordbot.Db.unmatched_words(channel) |> MapSet.new()
Wordbot.Db.child_spec(cfg[:wordbot_db]), MapSet.intersection(words, game_words)
{Wordbot.Bot, cfg: cfg, name: Omnibot.Contrib.Wordbot.Bot}, |> Enum.each(fn word ->
] Wordbot.Db.add_score(channel, nick, word, msg)
Irc.send_to(irc, channel, "#{nick}: Congrats! '#{word}' is good for 1 point.")
Supervisor.init(children, strategy: :one_for_all) end)
end end
def on_msg(irc, msg), do: Wordbot.Bot.on_msg(irc, msg) @impl true
def on_join(irc, channel, who) do
def on_channel_msg(irc, channel, nick, msg), do: Wordbot.Bot.on_channel_msg(irc, channel, nick, msg) # Attempt to start a new round
if State.cfg().nick == who do
start_round(irc, channel)
end
end
end end

View File

@@ -79,6 +79,7 @@ defmodule Omnibot.Plugin.Base do
end end
end end
#@callback handle_msg(irc :: pid(), msg :: %Omnibot.Irc.Msg{}) :: any
@callback on_msg(irc :: pid(), msg :: %Omnibot.Irc.Msg{}) :: any @callback on_msg(irc :: pid(), msg :: %Omnibot.Irc.Msg{}) :: any
@callback on_channel_msg(irc :: pid(), channel :: String.t(), nick :: String.t(), line :: String.t()) :: any @callback on_channel_msg(irc :: pid(), channel :: String.t(), nick :: String.t(), line :: String.t()) :: any
@callback on_channel_msg( @callback on_channel_msg(

64
lib/plugin/supervisor.ex Normal file
View File

@@ -0,0 +1,64 @@
defmodule Omnibot.Plugin.Supervisor do
@default_opts [include_base: true, opts: [strategy: :one_for_one]]
defmodule CfgState do
use Agent
def start_link(opts) do
{cfg, opts} = Keyword.pop(opts, :cfg)
{state, opts} = Keyword.pop(opts, :state, nil)
Agent.start_link(fn -> {cfg, state} end, opts)
end
def cfg(pid), do: Agent.get(pid, fn {cfg, _} -> cfg end)
def state(pid), do: Agent.get(pid, fn {_, state} -> state end)
def update_state(pid, fun, timeout \\ 5000),
do: Agent.update(pid, &{&1, apply(fun, [&1])}, timeout)
end
defmacro __using__(opts) do
opts = opts ++ @default_opts
quote do
use Supervisor
use Omnibot.Plugin.Base
## Client API
def start_link(opts) do
Supervisor.start_link(__MODULE__, {opts[:cfg], opts[:state]}, opts)
end
def cfg() do
Omnibot.Plugin.Supervisor.CfgState.cfg(__MODULE__.CfgState)
end
def state() do
Omnibot.Plugin.Supervisor.CfgState.state(__MODULE__.CfgState)
end
def update_state(fun) do
Omnibot.Plugin.Supervisor.CfgState.update_state(__MODULE__.CfgState, fun)
end
## Server callbacks
@impl Supervisor
def init({cfg, state}) do
base_children = [
{Omnibot.Plugin.Supervisor.CfgState, cfg: cfg, state: state, name: __MODULE__.CfgState},
]
children =
(if unquote(opts[:include_base]), do: base_children, else: []) ++ children(cfg, state)
Supervisor.init(children, unquote(opts[:opts]))
end
@behaviour Omnibot.Plugin.Supervisor
end
end
@callback children(cfg :: [atom: any], state :: any) :: [atom | {atom, [atom: any]} | {atom, any, [atom: any]}]
end

View File

@@ -15,7 +15,7 @@ defmodule Omnibot.PluginSupervisor do
# These are plugins that need to be loaded for core functionality of the bot # These are plugins that need to be loaded for core functionality of the bot
core = [ core = [
Omnibot.Core Omnibot.Core,
] ]
# Map the plugins in the configuration to the children # Map the plugins in the configuration to the children