ADAM / env /process_monitor.py
YuShu
Initial commit
36ba3ef
raw
history blame contribute delete
2.75 kB
import time
import re
import warnings
from typing import List
import psutil
import subprocess
import logging
import threading
import utils as U
class SubprocessMonitor:
def __init__(
self,
commands: List[str],
name: str,
ready_match: str = r".*",
log_path: str = "logs",
callback_match: str = r"^(?!x)x$", # regex that will never match
callback: callable = None,
finished_callback: callable = None,
):
self.commands = commands
start_time = time.strftime("%Y%m%d_%H%M%S")
self.name = name
self.logger = logging.getLogger(name)
handler = logging.FileHandler(U.f_join(log_path, f"{start_time}.log"))
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
self.process = None
self.ready_match = ready_match
self.ready_event = None
self.ready_line = None
self.callback_match = callback_match
self.callback = callback
self.finished_callback = finished_callback
self.thread = None
def _start(self):
self.logger.info(f"Starting subprocess with commands: {self.commands}")
self.process = psutil.Popen(
self.commands,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
)
print(f"Subprocess {self.name} started with PID {self.process.pid}.")
for line in iter(self.process.stdout.readline, ""):
self.logger.info(line.strip())
if re.search(self.ready_match, line):
self.ready_line = line
self.logger.info("Subprocess is ready.")
self.ready_event.set()
if re.search(self.callback_match, line):
self.callback()
if not self.ready_event.is_set():
self.ready_event.set()
warnings.warn(f"Subprocess {self.name} failed to start.")
if self.finished_callback:
self.finished_callback()
def run(self):
self.ready_event = threading.Event()
self.ready_line = None
self.thread = threading.Thread(target=self._start)
self.thread.start()
self.ready_event.wait()
def stop(self):
self.logger.info("Stopping subprocess.")
if self.process and self.process.is_running():
self.process.terminate()
self.process.wait()
@property
def is_running(self):
if self.process is None:
return False
return self.process.is_running()