From 5725113a9164273798cd161e4d2269fe6dfda86a Mon Sep 17 00:00:00 2001 From: bigeagle Date: Sat, 25 Oct 2014 19:19:04 +0800 Subject: [PATCH] add status manager --- examples/tunasync.conf | 20 +++++-- tunasync.py | 78 ------------------------- tunasync/jobs.py | 5 +- tunasync/mirror_config.py | 109 +++++++++++++++++++++++++++++++++++ tunasync/status_manager.py | 61 ++++++++++++++++++++ tunasync/tunasync.py | 115 ++++--------------------------------- 6 files changed, 198 insertions(+), 190 deletions(-) create mode 100644 tunasync/mirror_config.py create mode 100644 tunasync/status_manager.py diff --git a/examples/tunasync.conf b/examples/tunasync.conf index 44846c0..191a7ce 100644 --- a/examples/tunasync.conf +++ b/examples/tunasync.conf @@ -4,6 +4,7 @@ log_dir = "/var/log/tunasync" mirror_root = "/mnt/sdb1/mirror" use_btrfs = false local_dir = "{mirror_root}/_working/{mirror_name}/" +status_file = "/tmp/tunasync.json" # maximum numbers of running jobs concurrent = 2 # interval in minutes @@ -16,19 +17,26 @@ working_dir = "{mirror_root}/_working/{mirror_name}" gc_root = "{mirror_root}/_garbage/" gc_dir = "{mirror_root}/_garbage/_gc_{mirror_name}_{{timestamp}}" +# [[mirrors]] +# name = "archlinux" +# provider = "rsync" +# upstream = "rsync://mirror.us.leaseweb.net/archlinux/" +# log_file = "/tmp/archlinux-{date}.log" +# use_ipv6 = true + [[mirrors]] -name = "archlinux" -provider = "rsync" -upstream = "rsync://mirror.us.leaseweb.net/archlinux/" -log_file = "/tmp/archlinux-{date}.log" -use_ipv6 = true +name = "arch1" +provider = "shell" +command = "sleep 10" +local_dir = "/mnt/sdb1/mirror/archlinux/current/" +log_file = "/dev/null" [[mirrors]] name = "arch2" provider = "shell" command = "sleep 20" local_dir = "/mnt/sdb1/mirror/archlinux/current/" -log_file = "/tmp/arch2-{date}.log" +log_file = "/dev/null" [[mirrors]] name = "arch4" diff --git a/tunasync.py b/tunasync.py index ceb22d0..4c4a752 100644 --- a/tunasync.py +++ b/tunasync.py @@ -2,79 +2,8 @@ # -*- coding:utf-8 -*- import os import argparse -import json -from datetime import datetime from tunasync import TUNASync -from tunasync.hook import JobHook - - -class IndexPageHook(JobHook): - - def __init__(self, parent, dbfile): - self.parent = parent - self.dbfile = dbfile - - @property - def mirrors(self): - mirrors = {} - try: - with open(self.dbfile) as f: - _mirrors = json.load(f) - for m in _mirrors: - mirrors[m["name"]] = m - except: - for name, _ in self.parent.mirrors.iteritems(): - mirrors[name] = { - 'name': name, - 'last_update': '-', - 'status': 'unknown', - } - return mirrors - - def before_job(self, name=None, *args, **kwargs): - if name is None: - return - mirrors = self.mirrors - _m = mirrors.get(name, { - 'name': name, - 'last_update': '-', - 'status': '-', - }) - - mirrors[name] = { - 'name': name, - 'last_update': _m['last_update'], - 'status': 'syncing' - } - with open(self.dbfile, 'wb') as f: - _mirrors = sorted( - [m for _, m in mirrors.items()], - key=lambda x: x['name'] - ) - - json.dump(_mirrors, f) - - def after_job(self, name=None, status="unknown", *args, **kwargs): - if name is None: - return - - print("Updating tunasync.json") - now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - mirrors = self.mirrors - mirrors[name] = { - 'name': name, - 'last_update': now, - 'status': status - } - with open(self.dbfile, 'wb') as f: - - _mirrors = sorted( - [m for _, m in mirrors.items()], - key=lambda x: x['name'] - ) - - json.dump(_mirrors, f) if __name__ == "__main__": @@ -83,9 +12,6 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(prog="tunasync") parser.add_argument("-c", "--config", default="tunasync.ini", help="config file") - parser.add_argument("--dbfile", - default="tunasync.json", - help="mirror status db file") parser.add_argument("--pidfile", default="/var/run/tunasync.pid", help="pidfile") @@ -97,10 +23,6 @@ if __name__ == "__main__": tunaSync = TUNASync() tunaSync.read_config(args.config) - index_hook = IndexPageHook(tunaSync, args.dbfile) - - tunaSync.add_hook(index_hook) - tunaSync.run_jobs() # vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/jobs.py b/tunasync/jobs.py index 36bfbb3..92daf3d 100644 --- a/tunasync/jobs.py +++ b/tunasync/jobs.py @@ -28,7 +28,8 @@ def run_job(sema, child_q, manager_q, provider, **settings): break aquired = True - status = "unkown" + status = "syncing" + manager_q.put((provider.name, status)) try: for hook in provider.hooks: hook.before_job(name=provider.name) @@ -65,6 +66,8 @@ def run_job(sema, child_q, manager_q, provider, **settings): provider.name, provider.interval )) + manager_q.put((provider.name, status)) + try: msg = child_q.get(timeout=provider.interval * 60) if msg == "terminate": diff --git a/tunasync/mirror_config.py b/tunasync/mirror_config.py new file mode 100644 index 0000000..1220693 --- /dev/null +++ b/tunasync/mirror_config.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python2 +# -*- coding:utf-8 -*- +import os +from .mirror_provider import RsyncProvider, ShellProvider +from .btrfs_snapshot import BtrfsHook + + +class MirrorConfig(object): + + _valid_providers = set(("rsync", "debmirror", "shell", )) + + def __init__(self, parent, options): + self._parent = parent + self._popt = self._parent._settings + self.options = dict(options.items()) # copy + self._validate() + + def _validate(self): + provider = self.options.get("provider", None) + assert provider in self._valid_providers + + if provider == "rsync": + assert "upstream" in self.options + + elif provider == "shell": + assert "command" in self.options + + local_dir_tmpl = self.options.get( + "local_dir", self._popt["global"]["local_dir"]) + + self.options["local_dir"] = local_dir_tmpl.format( + mirror_root=self._popt["global"]["mirror_root"], + mirror_name=self.name, + ) + + if "interval" not in self.options: + self.options["interval"] = self._popt["global"]["interval"] + + assert isinstance(self.options["interval"], int) + + log_dir = self._popt["global"]["log_dir"] + if "log_file" not in self.options: + self.options["log_file"] = os.path.join( + log_dir, self.name, "{date}.log") + + if "use_btrfs" not in self.options: + self.options["use_btrfs"] = self._parent.use_btrfs + assert self.options["use_btrfs"] in (True, False) + + def __getattr__(self, key): + if key in self.__dict__: + return self.__dict__[key] + else: + return self.__dict__["options"].get(key, None) + + def to_provider(self, hooks=[]): + if self.provider == "rsync": + provider = RsyncProvider( + self.name, + self.upstream, + self.local_dir, + self.use_ipv6, + self.exclude_file, + self.log_file, + self.interval, + hooks, + ) + elif self.options["provider"] == "shell": + provider = ShellProvider( + self.name, + self.command, + self.local_dir, + self.log_file, + self.interval, + hooks + ) + + return provider + + def compare(self, other): + assert self.name == other.name + + for key, val in self.options.iteritems(): + if other.options.get(key, None) != val: + return False + + return True + + def hooks(self): + hooks = [] + parent = self._parent + if self.options["use_btrfs"]: + working_dir = parent.btrfs_working_dir_tmpl.format( + mirror_root=parent.mirror_root, + mirror_name=self.name + ) + service_dir = parent.btrfs_service_dir_tmpl.format( + mirror_root=parent.mirror_root, + mirror_name=self.name + ) + gc_dir = parent.btrfs_gc_dir_tmpl.format( + mirror_root=parent.mirror_root, + mirror_name=self.name + ) + hooks.append(BtrfsHook(service_dir, working_dir, gc_dir)) + + return hooks + +# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/status_manager.py b/tunasync/status_manager.py new file mode 100644 index 0000000..7c14687 --- /dev/null +++ b/tunasync/status_manager.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python2 +# -*- coding:utf-8 -*- +import json +from datetime import datetime + + +class StatusManager(object): + + def __init__(self, parent, dbfile): + self.parent = parent + self.dbfile = dbfile + self.init_mirrors() + + def init_mirrors(self): + mirrors = {} + try: + with open(self.dbfile) as f: + _mirrors = json.load(f) + for m in _mirrors: + mirrors[m["name"]] = m + except: + for name, _ in self.parent.mirrors.iteritems(): + mirrors[name] = { + 'name': name, + 'last_update': '-', + 'status': 'unknown', + } + self.mirrors = mirrors + + def update_status(self, name, status): + + _m = self.mirrors.get(name, { + 'name': name, + 'last_update': '-', + 'status': '-', + }) + + if status in ("syncing", "fail"): + update_time = _m["last_update"] + elif status == "success": + update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + else: + print("Invalid status: {}, from {}".format(status, name)) + + self.mirrors[name] = { + 'name': name, + 'last_update': update_time, + 'status': status, + } + + with open(self.dbfile, 'wb') as f: + _mirrors = sorted( + [m for _, m in self.mirrors.items()], + key=lambda x: x['name'] + ) + + print("Updated status file, {}:{}".format(name, status)) + json.dump(_mirrors, f) + + +# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/tunasync.py b/tunasync/tunasync.py index edb186e..4916ebf 100644 --- a/tunasync/tunasync.py +++ b/tunasync/tunasync.py @@ -1,117 +1,14 @@ #!/usr/bin/env python2 # -*- coding:utf-8 -*- -import os.path import signal import sys import toml from multiprocessing import Process, Semaphore, Queue from . import jobs -from .mirror_provider import RsyncProvider, ShellProvider -from .btrfs_snapshot import BtrfsHook from .hook import JobHook - - -class MirrorConfig(object): - - _valid_providers = set(("rsync", "debmirror", "shell", )) - - def __init__(self, parent, options): - self._parent = parent - self._popt = self._parent._settings - self.options = dict(options.items()) # copy - self._validate() - - def _validate(self): - provider = self.options.get("provider", None) - assert provider in self._valid_providers - - if provider == "rsync": - assert "upstream" in self.options - - elif provider == "shell": - assert "command" in self.options - - local_dir_tmpl = self.options.get( - "local_dir", self._popt["global"]["local_dir"]) - - self.options["local_dir"] = local_dir_tmpl.format( - mirror_root=self._popt["global"]["mirror_root"], - mirror_name=self.name, - ) - - if "interval" not in self.options: - self.options["interval"] = self._popt["global"]["interval"] - - assert isinstance(self.options["interval"], int) - - log_dir = self._popt["global"]["log_dir"] - if "log_file" not in self.options: - self.options["log_file"] = os.path.join( - log_dir, self.name, "{date}.log") - - if "use_btrfs" not in self.options: - self.options["use_btrfs"] = self._parent.use_btrfs - assert self.options["use_btrfs"] in (True, False) - - def __getattr__(self, key): - if key in self.__dict__: - return self.__dict__[key] - else: - return self.__dict__["options"].get(key, None) - - def to_provider(self, hooks=[]): - if self.provider == "rsync": - provider = RsyncProvider( - self.name, - self.upstream, - self.local_dir, - self.use_ipv6, - self.exclude_file, - self.log_file, - self.interval, - hooks, - ) - elif self.options["provider"] == "shell": - provider = ShellProvider( - self.name, - self.command, - self.local_dir, - self.log_file, - self.interval, - hooks - ) - - return provider - - def compare(self, other): - assert self.name == other.name - - for key, val in self.options.iteritems(): - if other.options.get(key, None) != val: - return False - - return True - - def hooks(self): - hooks = [] - parent = self._parent - if self.options["use_btrfs"]: - working_dir = parent.btrfs_working_dir_tmpl.format( - mirror_root=parent.mirror_root, - mirror_name=self.name - ) - service_dir = parent.btrfs_service_dir_tmpl.format( - mirror_root=parent.mirror_root, - mirror_name=self.name - ) - gc_dir = parent.btrfs_gc_dir_tmpl.format( - mirror_root=parent.mirror_root, - mirror_name=self.name - ) - hooks.append(BtrfsHook(service_dir, working_dir, gc_dir)) - - return hooks +from .mirror_config import MirrorConfig +from .status_manager import StatusManager class TUNASync(object): @@ -146,6 +43,9 @@ class TUNASync(object): self.btrfs_working_dir_tmpl = self._settings["btrfs"]["working_dir"] self.btrfs_gc_dir_tmpl = self._settings["btrfs"]["gc_dir"] + self.status_file = self._settings["global"]["status_file"] + self.status_manager = StatusManager(self, self.status_file) + def add_hook(self, h): assert isinstance(h, JobHook) self._hooks.append(h) @@ -203,6 +103,11 @@ class TUNASync(object): if status == "QUIT": print("New configuration applied to {}".format(name)) self.run_provider(name) + else: + try: + self.status_manager.update_status(name, status) + except Exception as e: + print(e) def run_provider(self, name): if name not in self.providers: