Source code for geeservermap.async_jobs

"""TODO Missing docstring."""

import atexit
import time
import uuid
from contextlib import suppress
from copy import deepcopy
from threading import Thread


[docs] class Async: """TODO Missing docstring."""
[docs] _INTERVAL = 60
[docs] _TIMEOUT = 300
def __init__(self): """TODO Missing docstring.""" self._jobs = {} self.cleanup_running = False atexit.register(self._terminate)
[docs] def _run_cleanup(self): """TODO Missing docstring.""" self.cleanup_thread = Thread(target=self._cleanup_timedout_jobs) self.cleanup_thread.start()
# This MUST be called when flask wants to exit otherwise the process will hang for a while
[docs] def _terminate(self): """TODO Missing docstring.""" self._jobs = None with suppress(Exception): self.cleanup_thread.join()
[docs] def get_job_result(self, job_id): """TODO Missing docstring.""" if not self._jobs: return None job = self._jobs.get(job_id) if job["state"] in ["finished", "failed"]: self.remove_job(job) return job
[docs] def _create_job(self): """TODO Missing docstring.""" job = { "id": uuid.uuid4().hex, "ready": False, "result": None, "created": time.time(), "state": "created", "finished": None, } self._jobs[job["id"]] = job if not self.cleanup_running: self._run_cleanup() self.cleanup_running = True return job
[docs] def _start_job(self, thread, job): """TODO Missing docstring.""" job["state"] = "started" thread.daemon = True thread.start()
[docs] def _finish_job(self, job, result): """TODO Missing docstring.""" job["ready"] = True job["state"] = "finished" if self._is_job_alive(job): job["result"] = result job["finished"] = time.time()
[docs] def remove_job(self, job): """TODO Missing docstring.""" if self._jobs and job["id"] in self._jobs: # logger.info(f'removing job {job["id"]}') del self._jobs[job["id"]]
[docs] def _is_job_alive(self, job): """TODO Missing docstring.""" return ( self._jobs and job is not None and job["id"] in self._jobs and job["ready"] is not None )
# Iterate though jobs every minute and remove the stale ones
[docs] def _cleanup_timedout_jobs(self): """TODO Missing docstring.""" next_cleanup_time = time.time() while self._jobs is not None: time.sleep(5) now = time.time() proxy = deepcopy(self._jobs) if proxy and now >= next_cleanup_time: for job in proxy.values(): if job["state"] == "finished" and ( job["finished"] + self._TIMEOUT > now ): self.remove_job(job) next_cleanup_time = time.time() + self._INTERVAL self.cleanup_running = False
[docs] asyncgee = Async()