From 9a8c6f2472ec4c741b3914970e998af5200ca315 Mon Sep 17 00:00:00 2001 From: Alek Ratzloff Date: Sat, 11 Jul 2020 15:43:07 -0700 Subject: [PATCH] Finish up Plugin.Supervisor, replace markov and wordbot implementations with it Both markov and wordbot have some auxiliary processes that run to keep track of things. Previously, they both had custom supervisors grafted on to the Plugin.Base - now, this grafting is automated with Plugin.Supervisor. Signed-off-by: Alek Ratzloff --- lib/contrib/markov/bot.ex | 50 --------- lib/contrib/markov/markov.ex | 72 +++++++++---- lib/contrib/wordbot/bot.ex | 159 --------------------------- lib/contrib/wordbot/wordbot.ex | 189 ++++++++++++++++++++++++++++++--- lib/plugin/base.ex | 1 + lib/plugin/supervisor.ex | 64 +++++++++++ lib/plugin_supervisor.ex | 2 +- 7 files changed, 290 insertions(+), 247 deletions(-) delete mode 100644 lib/contrib/markov/bot.ex delete mode 100644 lib/contrib/wordbot/bot.ex create mode 100644 lib/plugin/supervisor.ex diff --git a/lib/contrib/markov/bot.ex b/lib/contrib/markov/bot.ex deleted file mode 100644 index 0f603ff..0000000 --- a/lib/contrib/markov/bot.ex +++ /dev/null @@ -1,50 +0,0 @@ -defmodule Omnibot.Contrib.Markov.Bot do - use Omnibot.Plugin - alias Omnibot.Contrib.Markov.Chain - - @impl true - def on_init(_cfg) do - # Create the markov database - path = String.to_atom(cfg()[:path]) - {:ok, db} = :dets.open_file(path, [:named_table]) - chains = :ets.new(:markov_chains, [:public]) - :dets.to_ets(db, chains) - :dets.close(db) - end - - @impl true - def on_channel_msg(_irc, channel, nick, msg) do - train(channel, nick, msg) - end - - def train(channel, user, msg) do - chain = (user_chain(channel, user) || create_user_chain(channel, user)) - |> Chain.train(msg) - true = update_user_chain(channel, user, chain) - end - - def user_chain(channel, user) do - db = state() - case :ets.lookup(db, {channel, user}) do - [] -> nil - [{{^channel, ^user}, chains}] -> chains - end - end - - def update_user_chain(channel, user, chain) do - db = state() - case user_chain(channel, user) do - nil -> :ets.insert_new(db, {{channel, user}, chain}) - chain -> :ets.insert(db, {{channel, user}, chain}) - end - end - - defp create_user_chain(channel, user) do - true = update_user_chain(channel, user, %Chain{order: cfg()[:order]}) - user_chain(channel, user) - end - - def save_chains() do - - end -end diff --git a/lib/contrib/markov/markov.ex b/lib/contrib/markov/markov.ex index ace5407..e8673b2 100644 --- a/lib/contrib/markov/markov.ex +++ b/lib/contrib/markov/markov.ex @@ -1,32 +1,64 @@ defmodule Omnibot.Contrib.Markov do - use Omnibot.Plugin.Base - alias Omnibot.Contrib.Markov - use Supervisor + use Omnibot.Plugin.Supervisor + alias Omnibot.Contrib.Markov.Chain + require Logger @default_config path: "markov", order: 2, save_every: 5 * 60 - def start_link(opts) do - Supervisor.start_link(__MODULE__, opts[:cfg], opts) + @impl true + def children(_cfg, _state) do + [{Task, fn -> + Stream.timer(1000) + |> Stream.cycle() + |> Stream.each(fn _ -> save_chains() end) + |> Stream.run() + end}] end @impl true - def init(cfg) do - children = [ - {Markov.Bot, cfg: cfg, name: Omnibot.Contrib.Markov.Bot}, - {Task, fn -> save_loop(cfg) end} - ] - Supervisor.init(children, strategy: :one_for_all) - end - - defp save_loop(cfg) do - save_every = cfg[:save_every] - Process.sleep(save_every * 1000) - Markov.Bot.save_chains() + def on_init(_cfg) do + # Create the markov database + path = String.to_atom(cfg()[:path]) + {:ok, db} = :dets.open_file(path, [:named_table]) + chains = :ets.new(:markov_chains, [:public]) + :dets.to_ets(db, chains) + :dets.close(db) end @impl true - def on_msg(irc, msg), do: Markov.Bot.on_msg(irc, msg) + def on_channel_msg(_irc, channel, nick, msg) do + train(channel, nick, msg) + end - @impl true - def on_channel_msg(irc, channel, nick, msg), do: Markov.Bot.on_channel_msg(irc, channel, nick, msg) + def train(channel, user, msg) do + chain = (user_chain(channel, user) || create_user_chain(channel, user)) + |> Chain.train(msg) + true = update_user_chain(channel, user, chain) + end + + def user_chain(channel, user) do + db = state() + case :ets.lookup(db, {channel, user}) do + [] -> nil + [{{^channel, ^user}, chains}] -> chains + end + end + + def update_user_chain(channel, user, chain) do + db = state() + case user_chain(channel, user) do + nil -> :ets.insert_new(db, {{channel, user}, chain}) + chain -> :ets.insert(db, {{channel, user}, chain}) + end + end + + defp create_user_chain(channel, user) do + true = update_user_chain(channel, user, %Chain{order: cfg()[:order]}) + user_chain(channel, user) + end + + def save_chains() do + # TODO + Logger.info("Saved markov chains") + end end diff --git a/lib/contrib/wordbot/bot.ex b/lib/contrib/wordbot/bot.ex deleted file mode 100644 index ad80b1c..0000000 --- a/lib/contrib/wordbot/bot.ex +++ /dev/null @@ -1,159 +0,0 @@ -defmodule Omnibot.Contrib.Wordbot.Bot do - use Omnibot.Plugin - - alias Omnibot.{Contrib.Wordbot, Irc, State, Util} - require Logger - - @split_pattern ~r/[\s\b]+/ - - ## Bot commands - - command "!wordbot", ["leaderboard"] do - Wordbot.Db.leaderboard(channel) - |> Enum.sort_by(& &1.score) - |> Enum.reverse() - |> Enum.take(5) - |> Enum.with_index() - |> Enum.map(fn {%{user: nick, score: score}, rank} -> "#{rank + 1}. #{Util.denotify_nick(nick)}. #{score}" end) - |> Enum.each(&Irc.send_to(irc, channel, &1)) - end - - ## Client API - - def words() do - {words, _watchers} = state() - words - end - - defp watchers() do - {_words, watchers} = state() - watchers - end - - defp update_watchers(mapping) do - update_state(fn {words, watchers} -> {words, apply(mapping, [watchers])} end) - end - - defp add_watcher(channel, task) do - update_watchers(&Map.put(&1, channel, task)) - end - - defp delete_watcher(channel) do - task = watchers()[channel] - update_watchers(&Map.delete(&1, channel)) - task - end - - defp lookup_watcher(channel) do - Map.get(watchers(), channel) - end - - defp has_watcher?(channel) do - case lookup_watcher(channel) do - nil -> false - task -> Process.alive?(task) - end - end - - def start_round(irc, channel) do - # Get round config - cfg = cfg() - num_words = cfg[:words_per_round] - duration = cfg[:hours_per_round] * 3600 - # Select words - words = Enum.take_random(words(), num_words) - # Try to start the round - if it's already running then that's OK - case Wordbot.Db.start_round(channel, words, duration) do - :ok -> Logger.debug("Started new wordbot round for #{channel}") - {:error, :game_running} -> Logger.debug("Wordbot game already running for #{channel}") - end - - # Try to start a watcher if there isn't one running - if !has_watcher?(channel), - do: start_watcher(irc, channel) - end - - defp start_watcher(irc, channel) do - # Start a watcher for the given channel - Logger.debug("Starting wordbot game watcher for #{channel}") - # Assert that there isn't a running watcher for the current channel - false = has_watcher?(channel) - task = Task.Supervisor.async_nolink( - Wordbot.Watchers, - fn -> watch_game(irc, channel) end, - [shutdown: :brutal_kill] - ) - add_watcher(channel, task) - end - - defp watch_game(irc, channel) do - # Poll every second to check if a game is finished - if Wordbot.Db.game_active?(channel) do - Process.sleep(1000) - watch_game(irc, channel) - else - finish_round(irc, channel) - end - end - - def finish_round(irc, channel) do - Logger.debug("Finishing wordbot round for #{channel}") - - # Announce scores - Irc.send_to(irc, channel, "Game over. Here were the scores:") - scores = Wordbot.Db.scores(channel) - |> Enum.sort_by(&(&1.score)) - |> Enum.reverse() - - # Ranking is a little weird because we want to rank people so that having - # the same score will give the same ranking, e.g. - # 1. user1. 4 - # 2. user2. 3 - # 2. user3. 3 - # 3. user4. 1 - rankings = scores - |> Enum.map(&(&1.score)) - |> Enum.sort() - |> Enum.uniq() - |> Enum.reverse() - |> Enum.with_index() - |> Map.new() - |> IO.inspect() - - Enum.each(scores, &Irc.send_to(irc, channel, "#{rankings[&1.score] + 1}. #{Util.denotify_nick(&1.user)}. #{&1.score}")) - - # Stop the watcher, start new round - delete_watcher(channel) - start_round(irc, channel) - end - - ## Plugin callbacks - - @impl true - def on_init(cfg) do - Wordbot.Db.ensure_db() - words = File.read!(cfg[:wordbot_source]) - |> String.split("\n") - watchers = %{} - {words, watchers} - end - - @impl true - def on_channel_msg(irc, channel, nick, msg) do - words = Regex.split(@split_pattern, msg) |> MapSet.new() - game_words = Wordbot.Db.unmatched_words(channel) |> MapSet.new() - MapSet.intersection(words, game_words) - |> Enum.each(fn word -> - Wordbot.Db.add_score(channel, nick, word, msg) - Irc.send_to(irc, channel, "#{nick}: Congrats! '#{word}' is good for 1 point.") - end) - end - - @impl true - def on_join(irc, channel, who) do - # Attempt to start a new round - if State.cfg().nick == who do - start_round(irc, channel) - end - end -end diff --git a/lib/contrib/wordbot/wordbot.ex b/lib/contrib/wordbot/wordbot.ex index 015c683..aa79c44 100644 --- a/lib/contrib/wordbot/wordbot.ex +++ b/lib/contrib/wordbot/wordbot.ex @@ -1,28 +1,183 @@ defmodule Omnibot.Contrib.Wordbot do - use Omnibot.Plugin.Base - use Supervisor + use Omnibot.Plugin.Supervisor + alias Omnibot.{Contrib.Wordbot, State, Util} require Logger - alias Omnibot.Contrib.Wordbot - @default_config wordbot_source: "words.txt", wordbot_db: "wordbot.db", words_per_round: 300, hours_per_round: 5 - def start_link(opts) do - Supervisor.start_link(__MODULE__, opts[:cfg], opts) + #def start_link(opts) do + #Supervisor.start_link(__MODULE__, opts[:cfg], opts) + #end + + #@impl true + #def init(cfg) do + # children = [ + # {Task.Supervisor, name: Omnibot.Contrib.Wordbot.Watchers, strategy: :one_for_one}, + # Wordbot.Db.child_spec(cfg[:wordbot_db]), + # {Wordbot.Bot, cfg: cfg, name: Omnibot.Contrib.Wordbot.Bot}, + # ] + + # Supervisor.init(children, strategy: :one_for_all) + #end + + @impl true + def children(cfg, _state) do + [ + {Task.Supervisor, name: Omnibot.Contrib.Wordbot.Watchers, strategy: :one_for_one}, + Wordbot.Db.child_spec(cfg[:wordbot_db]), + ] + end + + @split_pattern ~r/[\s\b]+/ + + ## Bot commands + + command "!wordbot", ["leaderboard"] do + Wordbot.Db.leaderboard(channel) + |> Enum.sort_by(& &1.score) + |> Enum.reverse() + |> Enum.take(5) + |> Enum.with_index() + |> Enum.map(fn {%{user: nick, score: score}, rank} -> "#{rank + 1}. #{Util.denotify_nick(nick)}. #{score}" end) + |> Enum.each(&Irc.send_to(irc, channel, &1)) + end + + ## Client API + + def words() do + {words, _watchers} = state() + words + end + + defp watchers() do + {_words, watchers} = state() + watchers + end + + defp update_watchers(mapping) do + update_state(fn {words, watchers} -> {words, apply(mapping, [watchers])} end) + end + + defp add_watcher(channel, task) do + update_watchers(&Map.put(&1, channel, task)) + end + + defp delete_watcher(channel) do + task = watchers()[channel] + update_watchers(&Map.delete(&1, channel)) + task + end + + defp lookup_watcher(channel) do + Map.get(watchers(), channel) + end + + defp has_watcher?(channel) do + case lookup_watcher(channel) do + nil -> false + task -> Process.alive?(task) + end + end + + def start_round(irc, channel) do + # Get round config + cfg = cfg() + num_words = cfg[:words_per_round] + duration = cfg[:hours_per_round] * 3600 + # Select words + words = Enum.take_random(words(), num_words) + # Try to start the round - if it's already running then that's OK + case Wordbot.Db.start_round(channel, words, duration) do + :ok -> Logger.debug("Started new wordbot round for #{channel}") + {:error, :game_running} -> Logger.debug("Wordbot game already running for #{channel}") + end + + # Try to start a watcher if there isn't one running + if !has_watcher?(channel), + do: start_watcher(irc, channel) + end + + defp start_watcher(irc, channel) do + # Start a watcher for the given channel + Logger.debug("Starting wordbot game watcher for #{channel}") + # Assert that there isn't a running watcher for the current channel + false = has_watcher?(channel) + task = Task.Supervisor.async_nolink( + Wordbot.Watchers, + fn -> watch_game(irc, channel) end, + [shutdown: :brutal_kill] + ) + add_watcher(channel, task) + end + + defp watch_game(irc, channel) do + # Poll every second to check if a game is finished + if Wordbot.Db.game_active?(channel) do + Process.sleep(1000) + watch_game(irc, channel) + else + finish_round(irc, channel) + end + end + + def finish_round(irc, channel) do + Logger.debug("Finishing wordbot round for #{channel}") + + # Announce scores + Irc.send_to(irc, channel, "Game over. Here were the scores:") + scores = Wordbot.Db.scores(channel) + |> Enum.sort_by(&(&1.score)) + |> Enum.reverse() + + # Ranking is a little weird because we want to rank people so that having + # the same score will give the same ranking, e.g. + # 1. user1. 4 + # 2. user2. 3 + # 2. user3. 3 + # 3. user4. 1 + rankings = scores + |> Enum.map(&(&1.score)) + |> Enum.sort() + |> Enum.uniq() + |> Enum.reverse() + |> Enum.with_index() + |> Map.new() + |> IO.inspect() + + Enum.each(scores, &Irc.send_to(irc, channel, "#{rankings[&1.score] + 1}. #{Util.denotify_nick(&1.user)}. #{&1.score}")) + + # Stop the watcher, start new round + delete_watcher(channel) + start_round(irc, channel) + end + + ## Plugin callbacks + + @impl true + def on_init(cfg) do + Wordbot.Db.ensure_db() + words = File.read!(cfg[:wordbot_source]) + |> String.split("\n") + watchers = %{} + {words, watchers} end @impl true - def init(cfg) do - children = [ - {Task.Supervisor, name: Omnibot.Contrib.Wordbot.Watchers, strategy: :one_for_one}, - Wordbot.Db.child_spec(cfg[:wordbot_db]), - {Wordbot.Bot, cfg: cfg, name: Omnibot.Contrib.Wordbot.Bot}, - ] - - Supervisor.init(children, strategy: :one_for_all) + def on_channel_msg(irc, channel, nick, msg) do + words = Regex.split(@split_pattern, msg) |> MapSet.new() + game_words = Wordbot.Db.unmatched_words(channel) |> MapSet.new() + MapSet.intersection(words, game_words) + |> Enum.each(fn word -> + Wordbot.Db.add_score(channel, nick, word, msg) + Irc.send_to(irc, channel, "#{nick}: Congrats! '#{word}' is good for 1 point.") + end) end - def on_msg(irc, msg), do: Wordbot.Bot.on_msg(irc, msg) - - def on_channel_msg(irc, channel, nick, msg), do: Wordbot.Bot.on_channel_msg(irc, channel, nick, msg) + @impl true + def on_join(irc, channel, who) do + # Attempt to start a new round + if State.cfg().nick == who do + start_round(irc, channel) + end + end end diff --git a/lib/plugin/base.ex b/lib/plugin/base.ex index 3b1dd27..edaf3f4 100644 --- a/lib/plugin/base.ex +++ b/lib/plugin/base.ex @@ -79,6 +79,7 @@ defmodule Omnibot.Plugin.Base do end end + #@callback handle_msg(irc :: pid(), msg :: %Omnibot.Irc.Msg{}) :: any @callback on_msg(irc :: pid(), msg :: %Omnibot.Irc.Msg{}) :: any @callback on_channel_msg(irc :: pid(), channel :: String.t(), nick :: String.t(), line :: String.t()) :: any @callback on_channel_msg( diff --git a/lib/plugin/supervisor.ex b/lib/plugin/supervisor.ex new file mode 100644 index 0000000..701582b --- /dev/null +++ b/lib/plugin/supervisor.ex @@ -0,0 +1,64 @@ +defmodule Omnibot.Plugin.Supervisor do + @default_opts [include_base: true, opts: [strategy: :one_for_one]] + + defmodule CfgState do + use Agent + + def start_link(opts) do + {cfg, opts} = Keyword.pop(opts, :cfg) + {state, opts} = Keyword.pop(opts, :state, nil) + Agent.start_link(fn -> {cfg, state} end, opts) + end + + def cfg(pid), do: Agent.get(pid, fn {cfg, _} -> cfg end) + + def state(pid), do: Agent.get(pid, fn {_, state} -> state end) + + def update_state(pid, fun, timeout \\ 5000), + do: Agent.update(pid, &{&1, apply(fun, [&1])}, timeout) + end + + defmacro __using__(opts) do + opts = opts ++ @default_opts + + quote do + use Supervisor + use Omnibot.Plugin.Base + + ## Client API + + def start_link(opts) do + Supervisor.start_link(__MODULE__, {opts[:cfg], opts[:state]}, opts) + end + + def cfg() do + Omnibot.Plugin.Supervisor.CfgState.cfg(__MODULE__.CfgState) + end + + def state() do + Omnibot.Plugin.Supervisor.CfgState.state(__MODULE__.CfgState) + end + + def update_state(fun) do + Omnibot.Plugin.Supervisor.CfgState.update_state(__MODULE__.CfgState, fun) + end + + ## Server callbacks + + @impl Supervisor + def init({cfg, state}) do + + base_children = [ + {Omnibot.Plugin.Supervisor.CfgState, cfg: cfg, state: state, name: __MODULE__.CfgState}, + ] + children = + (if unquote(opts[:include_base]), do: base_children, else: []) ++ children(cfg, state) + Supervisor.init(children, unquote(opts[:opts])) + end + + @behaviour Omnibot.Plugin.Supervisor + end + end + + @callback children(cfg :: [atom: any], state :: any) :: [atom | {atom, [atom: any]} | {atom, any, [atom: any]}] +end diff --git a/lib/plugin_supervisor.ex b/lib/plugin_supervisor.ex index 291a71d..e0e40de 100644 --- a/lib/plugin_supervisor.ex +++ b/lib/plugin_supervisor.ex @@ -15,7 +15,7 @@ defmodule Omnibot.PluginSupervisor do # These are plugins that need to be loaded for core functionality of the bot core = [ - Omnibot.Core + Omnibot.Core, ] # Map the plugins in the configuration to the children