From 1d016c5164184e965fe7b147813055c8e08bc53f Mon Sep 17 00:00:00 2001 From: Alek Ratzloff Date: Fri, 3 Jun 2022 18:24:25 -0700 Subject: [PATCH] markov: Add process-scheduled saving This allows markov to save (hopefully) in parallel using a ProcessPoolExecutor. Since objects are sent over the wire and copied, pruning in parallel is not an issue. Signed-off-by: Alek Ratzloff --- plugins/markov.py | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/plugins/markov.py b/plugins/markov.py index 24cb328..3bde931 100644 --- a/plugins/markov.py +++ b/plugins/markov.py @@ -264,17 +264,30 @@ class Markov(Plugin): async def save(self, retain_after: float | None = None): async with self.__saving: + from concurrent.futures import ProcessPoolExecutor + log.info("Saving markov chains") - for chains in self.__chains.values(): - for chain in chains.values(): - chain.save() - # Prune - retain = True - if retain_after is not None: - retain = chain.last_access > retain_after - if not retain: - log.debug("Pruning markov chain %s from memory", chain.path) - chain.clear_cache() + coros = [] + loop = asyncio.get_running_loop() + # ProcessPoolExecutor is an explicit decision I've made to use, + # because it allows us to save in a different process, with + # different memory, and simultaneously clear it if it needs to be + # cleared. + with ProcessPoolExecutor() as pool: + for chains in self.__chains.values(): + for chain in chains.values(): + # Start the save in a new process, in a new task. + coro = loop.run_in_executor(pool, chain.save) + coros += [coro] + # Prune + retain = True + if retain_after is not None: + retain = chain.last_access > retain_after + if not retain: + log.info("Pruning markov chain %s from memory", chain.path) + chain.clear_cache() + if coros: + await asyncio.gather(*coros) log.info("Done") async def on_unload(self, conn: IrcProtocol):