markov: Add process-pool-scheduled saving
This allows markov to save (hopefully) in parallel using a ProcessPoolExecutor. Since objects are sent over the wire and copied into the worker process, pruning the in-memory copies in parallel is not an issue. Signed-off-by: Alek Ratzloff <alekratz@gmail.com>
This commit is contained in:
@@ -264,17 +264,30 @@ class Markov(Plugin):
|
||||
|
||||
async def save(self, retain_after: float | None = None):
    """Persist all markov chains to disk, pruning stale ones from memory.

    Saving is fanned out to a ``ProcessPoolExecutor`` so each chain is
    pickled and written in a separate process with its own memory, keeping
    the event loop free while disk/serialization work happens.

    :param retain_after: chains whose ``last_access`` is at or before this
        timestamp have their in-memory cache cleared after being saved;
        ``None`` disables pruning.
    """
    async with self.__saving:
        from concurrent.futures import ProcessPoolExecutor

        log.info("Saving markov chains")
        futures = []
        prunable = []
        loop = asyncio.get_running_loop()
        # ProcessPoolExecutor is a deliberate choice: each save runs in a
        # separate process with its own copy of the chain, so the chain
        # can be mutated (or cleared) here without affecting the copy
        # being written out.
        with ProcessPoolExecutor() as pool:
            for chains in self.__chains.values():
                for chain in chains.values():
                    # run_in_executor returns a Future, not a coroutine;
                    # it is still awaitable via gather below.
                    futures.append(loop.run_in_executor(pool, chain.save))
                    # Record prune candidates, but do not clear yet —
                    # see the note below the gather.
                    if retain_after is not None and chain.last_access <= retain_after:
                        prunable.append(chain)
            # FIX: await *inside* the ``with`` block. Leaving the context
            # manager calls pool.shutdown(wait=True), which would otherwise
            # block the event loop synchronously until every save finished,
            # making the gather (and the async fan-out) pointless.
            if futures:
                await asyncio.gather(*futures)
        # FIX: prune only after the saves have completed. Executor
        # submission pickles the chain on a background thread, so calling
        # clear_cache() immediately after scheduling could race with
        # serialization and persist an emptied chain.
        for chain in prunable:
            log.info("Pruning markov chain %s from memory", chain.path)
            chain.clear_cache()
        log.info("Done")
|
||||
async def on_unload(self, conn: IrcProtocol):
|
||||
|
||||
Reference in New Issue
Block a user