diff --git a/examples/tunasync.conf b/examples/tunasync.conf
index 61d114e..6cc2600 100644
--- a/examples/tunasync.conf
+++ b/examples/tunasync.conf
@@ -40,6 +40,17 @@ command = "sleep 20"
 local_dir = "/mnt/sdb1/mirror/archlinux/current/"
 # log_file = "/dev/null"
 
+
+[[mirrors]]
+name = "arch3"
+provider = "two-stage-rsync"
+stage1_profile = "debian"
+upstream = "/tmp/rsync_test/src/"
+local_dir = "/tmp/rsync_test/dst/"
+log_file = "/tmp/rsync_test/log"
+# log_file = "/dev/null"
+no_delay = true
+
 [[mirrors]]
 name = "arch4"
 provider = "shell"
diff --git a/tunasync/jobs.py b/tunasync/jobs.py
index fd48d72..cc84577 100644
--- a/tunasync/jobs.py
+++ b/tunasync/jobs.py
@@ -54,13 +54,14 @@ def run_job(sema, child_q, manager_q, provider, **settings):
         for retry in range(max_retry):
             manager_q.put(("UPDATE", (provider.name, status, ctx)))
             print("start syncing {}, retry: {}".format(provider.name, retry))
-            provider.run(ctx=ctx)
 
-            status = "success"
             try:
+                provider.run(ctx=ctx)
                 provider.wait()
             except sh.ErrorReturnCode:
                 status = "fail"
+            else:
+                status = "success"
 
             if status == "success":
                 break
diff --git a/tunasync/mirror_config.py b/tunasync/mirror_config.py
index df2c365..ea7bc50 100644
--- a/tunasync/mirror_config.py
+++ b/tunasync/mirror_config.py
@@ -2,7 +2,7 @@
 # -*- coding:utf-8 -*-
 import os
 from datetime import datetime
-from .mirror_provider import RsyncProvider, ShellProvider
+from .mirror_provider import RsyncProvider, TwoStageRsyncProvider, ShellProvider
 from .btrfs_snapshot import BtrfsHook
 from .loglimit import LogLimitHook
 from .exec_pre_post import CmdExecHook
@@ -10,7 +10,7 @@ from .exec_pre_post import CmdExecHook
 
 class MirrorConfig(object):
 
-    _valid_providers = set(("rsync", "debmirror", "shell", ))
+    _valid_providers = set(("rsync", "two-stage-rsync", "shell", ))
 
     def __init__(self, parent, options):
         self._parent = parent
@@ -60,38 +60,48 @@ class MirrorConfig(object):
         return self.__dict__["options"].get(key, None)
 
     def to_provider(self, hooks=[], no_delay=False):
+
+        kwargs = {
+            'name': self.name,
+            'upstream_url': self.upstream,
+            'local_dir': self.local_dir,
+            'log_dir': self.log_dir,
+            'log_file': self.log_file,
+            'interval': self.interval,
+            'hooks': hooks,
+        }
+
         if self.provider == "rsync":
-            provider = RsyncProvider(
-                name=self.name,
-                upstream_url=self.upstream,
-                local_dir=self.local_dir,
-                log_dir=self.log_dir,
-                useIPv6=self.use_ipv6,
-                password=self.password,
-                exclude_file=self.exclude_file,
-                log_file=self.log_file,
-                interval=self.interval,
-                hooks=hooks,
-            )
+            kwargs.update({
+                'useIPv6': self.use_ipv6,
+                'password': self.password,
+                'exclude_file': self.exclude_file,
+            })
+            provider = RsyncProvider(**kwargs)
+
+        elif self.provider == "two-stage-rsync":
+            kwargs.update({
+                'useIPv6': self.use_ipv6,
+                'password': self.password,
+                'exclude_file': self.exclude_file,
+            })
+            provider = TwoStageRsyncProvider(**kwargs)
+            provider.set_stage1_profile(self.stage1_profile)
+
         elif self.options["provider"] == "shell":
-            provider = ShellProvider(
-                name=self.name,
-                command=self.command,
-                upstream_url=self.upstream,
-                local_dir=self.local_dir,
-                log_dir=self.log_dir,
-                log_file=self.log_file,
-                log_stdout=self.options.get("log_stdout", True),
-                interval=self.interval,
-                hooks=hooks
-            )
+            kwargs.update({
+                'command': self.command,
+                'log_stdout': self.options.get("log_stdout", True),
+            })
+
+            provider = ShellProvider(**kwargs)
 
         if not no_delay:
             sm = self._parent.status_manager
             last_update = sm.get_info(self.name, 'last_update')
             if last_update not in (None, '-'):
-                last_update = datetime.strptime(last_update,
-                                                '%Y-%m-%d %H:%M:%S')
+                last_update = datetime.strptime(
+                    last_update, '%Y-%m-%d %H:%M:%S')
                 delay = int(last_update.strftime("%s")) \
                     + self.interval * 60 - int(datetime.now().strftime("%s"))
                 if delay < 0:
diff --git a/tunasync/mirror_provider.py b/tunasync/mirror_provider.py
index 31b7047..6f36c16 100644
--- a/tunasync/mirror_provider.py
+++ b/tunasync/mirror_provider.py
@@ -59,8 +59,8 @@ class MirrorProvider(object):
 class RsyncProvider(MirrorProvider):
 
     _default_options = ['-aHvh', '--no-o', '--no-g', '--stats',
+                        '--exclude', '.~tmp~/',
                         '--delete', '--delete-after', '--delay-updates',
-                        '--exclude .~tmp~/',
                         '--safe-links', '--timeout=120', '--contimeout=120']
 
     def __init__(self, name, upstream_url, local_dir, log_dir,
@@ -100,8 +100,93 @@ class RsyncProvider(MirrorProvider):
         if self.password is not None:
             new_env["RSYNC_PASSWORD"] = self.password
 
-        self.p = sh.rsync(*_args, _env=new_env, _out=log_file, _err=log_file,
-                          _out_bufsize=1, _bg=True)
+        self.p = sh.rsync(*_args, _env=new_env, _out=log_file,
+                          _err_to_out=True, _out_bufsize=1, _bg=True)
+
+
+class TwoStageRsyncProvider(RsyncProvider):
+    """Rsync provider that syncs in two passes.
+
+    Stage 1 runs rsync without the delete options and skips the files
+    matched by the selected stage-1 profile; stage 2 runs rsync again
+    with the delete options and without the profile excludes.
+    """
+
+    _stage1_options = ['-aHvh', '--no-o', '--no-g',
+                       '--exclude', '.~tmp~/',
+                       '--safe-links', '--timeout=120', '--contimeout=120']
+
+    _stage2_options = ['-aHvh', '--no-o', '--no-g', '--stats',
+                       '--exclude', '.~tmp~/',
+                       '--delete', '--delete-after', '--delay-updates',
+                       '--safe-links', '--timeout=120', '--contimeout=120']
+
+    _stage1_profiles = {
+        "debian": [
+            'Packages*', 'Sources*', 'Release*',
+            'InRelease', 'i18n/*', 'ls-lR*',
+        ]
+    }
+
+    # Default so that options(1) works before a profile is selected.
+    _stage1_excludes = ()
+
+    def set_stage1_profile(self, profile):
+        """Select the stage-1 exclude patterns by profile name.
+
+        Raises ValueError if the profile is not in _stage1_profiles.
+        """
+        if profile not in self._stage1_profiles:
+            raise ValueError("Profile Undefined!")
+
+        self._stage1_excludes = self._stage1_profiles[profile]
+
+    def options(self, stage):
+        """Return a fresh list of rsync options for the given stage."""
+        _default_options = self._stage1_options \
+            if stage == 1 else self._stage2_options
+        _options = list(_default_options)  # copy
+
+        if stage == 1:
+            for _exc in self._stage1_excludes:
+                _options.append("--exclude")
+                _options.append(_exc)
+
+        if self.useIPv6:
+            _options.append("-6")
+
+        if self.exclude_file:
+            _options.append("--exclude-from")
+            _options.append(self.exclude_file)
+
+        return _options
+
+    def run(self, ctx=None):
+        """Run stage 1 then stage 2 in the foreground, logging both."""
+        # Avoid a shared mutable default argument.
+        ctx = {} if ctx is None else ctx
+        working_dir = ctx.get("current_dir", self.local_dir)
+        log_file = self.get_log_file(ctx)
+        new_env = os.environ.copy()
+        if self.password is not None:
+            new_env["RSYNC_PASSWORD"] = self.password
+
+        with open(log_file, 'w', buffering=1) as f:
+            def log_output(line):
+                f.write(line)
+
+            for stage in (1, 2):
+
+                _args = self.options(stage)
+                _args.append(self.upstream_url)
+                _args.append(working_dir)
+                f.write("==== Stage {} Begins ====\n\n".format(stage))
+
+                self.p = sh.rsync(
+                    *_args, _env=new_env, _out=log_output,
+                    _err_to_out=True, _out_bufsize=1, _bg=False
+                )
+                self.p.wait()
 
 
 class ShellProvider(MirrorProvider):
@@ -133,7 +218,7 @@ class ShellProvider(MirrorProvider):
 
         if self.log_stdout:
             self.p = cmd(*_args, _env=new_env, _out=log_file,
-                         _err=log_file, _out_bufsize=1, _bg=True)
+                         _err_to_out=True, _out_bufsize=1, _bg=True)
         else:
             self.p = cmd(*_args, _env=new_env, _out='/dev/null',
                          _err='/dev/null', _out_bufsize=1, _bg=True)
diff --git a/tunasync/tunasync.py b/tunasync/tunasync.py
index 93615fc..9076058 100644
--- a/tunasync/tunasync.py
+++ b/tunasync/tunasync.py
@@ -82,7 +82,7 @@ class TUNASync(object):
 
         for name, mirror in self.mirrors.iteritems():
             hooks = mirror.hooks() + self.hooks()
-            provider = mirror.to_provider(hooks)
+            provider = mirror.to_provider(hooks, no_delay=mirror.no_delay)
             self._providers[name] = provider
 
         return self._providers