Skip to content

Server

Flask-based web server that runs a continuous sync loop in a background thread and serves a live dashboard at / and a JSON health endpoint at /health.

SyncServer is started by the CLI when --server is passed. It is not intended to be exposed directly to the internet — use a reverse proxy (nginx, Caddy) if you need that.

devpi_gitea_sync.server

PackageRow dataclass

A single package-version entry for the dashboard.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `mapping` | `str` | The mapping name this row belongs to. | required |
| `organization` | `str` | The Gitea organization. | required |
| `index` | `str` | The Devpi index (user/index format). | required |
| `package` | `str` | The canonical package name. | required |
| `version` | `str` | The package version string. | required |
| `files` | `list[str]` | List of distribution filenames available in Gitea. | required |
| `in_devpi` | `bool` | Whether this version is already present in the Devpi index. | required |
Source code in devpi_gitea_sync/server.py
@dataclass
class PackageRow:
    """One dashboard row: a single version of a package found in Gitea.

    :param mapping: Name of the mapping that produced this row.
    :param organization: Gitea organization the package belongs to.
    :param index: Target Devpi index, written as ``user/index``.
    :param package: Canonical name of the package.
    :param version: Version string of the package.
    :param files: Names of the distribution files Gitea offers for this version.
    :param in_devpi: ``True`` when the Devpi index already holds this version.
    """

    mapping: str
    organization: str
    index: str
    package: str
    version: str
    files: list[str]
    in_devpi: bool

SyncServer

Web server that continuously syncs Gitea packages to Devpi and serves a status dashboard.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `SyncConfig` | The synchronization configuration. | required |
| `host` | `str` | The host address to bind the web server to. | `'0.0.0.0'` |
| `port` | `int` | The port number to listen on. | `8080` |
| `dry_run` | `bool` | If True, skip uploads and only display current state. | `False` |
| `force` | `bool` | If True, re-upload versions that already exist in Devpi. | `False` |
| `organizations` | `list[str] \| None` | Limit sync and display to these Gitea organizations. | `None` |
| `mapping_names` | `list[str] \| None` | Limit sync and display to these mapping names. | `None` |
Source code in devpi_gitea_sync/server.py
class SyncServer:
    """Web server that continuously syncs Gitea packages to Devpi and serves a status dashboard.

    :param config: The synchronization configuration.
    :param host: The host address to bind the web server to.
    :param port: The port number to listen on.
    :param dry_run: If True, skip uploads and only display current state.
    :param force: If True, re-upload versions that already exist in Devpi.
    :param organizations: Limit sync and display to these Gitea organizations.
    :param mapping_names: Limit sync and display to these mapping names.
    """

    def __init__(
        self,
        config: SyncConfig,
        *,
        host: str = "0.0.0.0",
        port: int = 8080,
        dry_run: bool = False,
        force: bool = False,
        organizations: list[str] | None = None,
        mapping_names: list[str] | None = None,
    ) -> None:
        """Store the settings, then create the shared state, its lock and the Flask app.

        See the class docstring for the meaning of each parameter.
        """
        # Sync behaviour and the optional organization / mapping filters.
        self._config = config
        self._dry_run, self._force = dry_run, force
        self._organizations = organizations
        self._mapping_names = mapping_names

        # Address the web server binds to.
        self._host, self._port = host, port

        # State shared between the sync worker and the request handlers;
        # every access goes through ``_lock``.
        self._state = _State()
        self._lock = threading.Lock()

        # Built last: the route closures capture the state, lock and dry-run
        # flag assigned above.
        self._app = self._build_app()

    def run(self) -> None:
        """Start the sync worker thread, then run the Flask web server in the calling thread."""
        worker = threading.Thread(name="sync-worker", target=self._sync_loop, daemon=True)
        worker.start()

        host, port = self._host, self._port
        logger.info(
            "Web server listening on http://%s:%d — dashboard at / health at /health",
            host,
            port,
        )
        self._app.run(host=host, port=port, threaded=True, use_reloader=False)

    # ------------------------------------------------------------------
    # Background sync loop
    # ------------------------------------------------------------------

    def _sync_loop(self) -> None:
        """Run refresh cycles forever, pausing ``poll_interval_seconds`` between them.

        The interval is re-read from the configuration after every cycle. An
        exception escaping :meth:`_refresh` is logged and the loop carries on,
        so a single failed cycle cannot end the worker thread and leave the
        dashboard serving stale data indefinitely.
        """
        while True:
            try:
                self._refresh()
            except Exception:
                # Top-level boundary of the worker thread started by ``run()``:
                # log with traceback and retry on the next cycle instead of
                # letting the thread die silently.
                logger.exception("Refresh failed unexpectedly; retrying after the poll interval.")
            interval = self._config.poll_interval_seconds
            logger.debug("Next refresh in %d seconds.", interval)
            time.sleep(interval)

    def _refresh(self) -> None:
        """Run one sync-and-collect cycle and publish the result to the shared state.

        Unless ``dry_run`` is set, packages are synced first; a ``SyncFailure``
        is recorded as an error instead of aborting the cycle. The package rows
        are then collected and stored together with the errors and a UTC
        timestamp.

        :raises Exception: Any unexpected error is re-raised, but only after the
            ``refreshing`` flag has been cleared and the failure has been
            recorded in the state's error list.
        """
        with self._lock:
            self._state.refreshing = True

        logger.info("Refresh started.")
        errors: list[str] = []

        try:
            if not self._dry_run:
                try:
                    sync_packages(
                        self._config,
                        organizations=self._organizations,
                        mapping_names=self._mapping_names,
                        dry_run=False,
                        force=self._force,
                    )
                except SyncFailure as exc:
                    errors.append(str(exc))
                    logger.error("Sync failed: %s", exc)

            rows = self._collect_all_rows(errors)
        except Exception as exc:
            # Without this, an unexpected failure would leave ``refreshing``
            # stuck at True and the health endpoint would keep reporting the
            # previous cycle's error list.
            errors.append(f"Refresh failed unexpectedly: {exc!r}")
            with self._lock:
                self._state.errors = errors
                self._state.refreshing = False
            raise

        # Publish everything in one critical section so readers never see a
        # mix of old and new values.
        with self._lock:
            self._state.packages = rows
            self._state.errors = errors
            self._state.last_refreshed = datetime.now(timezone.utc)
            self._state.refreshing = False

        logger.info("Refresh complete: %d package versions discovered.", len(rows))

    def _collect_all_rows(self, errors: list[str]) -> list[PackageRow]:
        """Collect dashboard rows for every selected mapping.

        Mappings are filtered by the configured organizations and mapping
        names. One Gitea client is shared per token and one Devpi session per
        server name; all of them are closed before returning. A mapping that
        cannot be processed is reported through *errors* and skipped, so the
        remaining mappings are still collected.

        :param errors: List that receives one message per failed mapping
            (mutated in place).
        :return: The rows of all mappings that were collected successfully.
        """
        selected_orgs = set(self._organizations) if self._organizations else None
        selected_mappings = set(self._mapping_names) if self._mapping_names else None

        rows: list[PackageRow] = []
        gitea_clients: dict[str, GiteaClient] = {}
        devpi_sessions: dict[str, DevpiSession] = {}

        try:
            for mapping in self._config.mappings:
                if selected_orgs and mapping.organization not in selected_orgs:
                    continue
                if selected_mappings and mapping.name not in selected_mappings:
                    continue

                token = mapping.gitea_token
                if not token:
                    errors.append(f"{mapping.name}: no Gitea token configured")
                    continue

                try:
                    gitea = gitea_clients.get(token)
                    if gitea is None:
                        gitea = GiteaClient(self._config.gitea, token)
                        gitea_clients[token] = gitea

                    server_name = mapping.devpi_server
                    devpi = devpi_sessions.get(server_name)
                    if devpi is None:
                        try:
                            server_config = self._config.devpi_servers[server_name]
                        except KeyError:
                            # A mapping that names an undefined server is a
                            # per-mapping problem; it must not abort the
                            # collection for every other mapping.
                            errors.append(
                                f"{mapping.name}: unknown Devpi server '{server_name}'"
                            )
                            logger.error(
                                "State collection failed for mapping '%s': "
                                "unknown Devpi server '%s'",
                                mapping.name,
                                server_name,
                            )
                            continue
                        devpi = DevpiSession(server_config)
                        devpi_sessions[server_name] = devpi

                    rows.extend(self._collect_mapping_rows(mapping, gitea, devpi))
                except (GiteaError, DevpiError) as exc:
                    msg = f"{mapping.name}: {exc}"
                    errors.append(msg)
                    logger.error(
                        "State collection failed for mapping '%s': %s", mapping.name, exc
                    )
        finally:
            # Release every client/session that was opened, whether or not the
            # loop finished normally.
            for client in gitea_clients.values():
                client.close()
            for session in devpi_sessions.values():
                session.close()

        return rows

    @staticmethod
    def _collect_mapping_rows(
        mapping: MappingConfig,
        gitea: GiteaClient,
        devpi: DevpiSession,
    ) -> list[PackageRow]:
        """Build the dashboard rows for a single mapping.

        Package files of the mapping's Gitea organization are kept only when
        they look like distributions, pass the optional repository allow-list
        and yield a name/version pair. The survivors are grouped per
        ``(name, version)``; each group becomes one row flagged with whether
        Devpi lists that version in the mapping's index.

        :param mapping: The mapping whose organization and index are inspected.
        :param gitea: Client used to enumerate the organization's package files.
        :param devpi: Session used to list packages and versions of the index.
        :return: One row per ``(name, version)`` group, in sorted key order.
        """
        from packaging.utils import canonicalize_name

        # ``None`` means "no repository restriction".
        repo_filter: set[str] | None = (
            {repo.lower() for repo in (mapping.normalized_repositories() or [])}
            if mapping.repositories
            else None
        )

        by_release: dict[tuple[str, str], list[PackageFile]] = {}
        for candidate in gitea.iter_package_files(mapping.organization):
            if not candidate.looks_like_distribution():
                continue
            if repo_filter is not None and not _repository_allowed(
                candidate.repository, repo_filter
            ):
                continue
            try:
                pkg_name, pkg_version = _extract_distribution_metadata(candidate.name)
            except ValueError:
                # No name/version could be derived from this filename: skip it.
                continue
            release_key = (pkg_name, pkg_version)
            if release_key not in by_release:
                by_release[release_key] = []
            by_release[release_key].append(candidate)

        known_packages = {
            canonicalize_name(listed) for listed in devpi.list_packages(mapping.index)
        }
        # Versions are fetched at most once per package name, and only for
        # packages the index actually lists.
        versions_by_package: dict[str, set[str]] = {}

        result: list[PackageRow] = []
        for pkg_name, pkg_version in sorted(by_release):
            if pkg_name not in versions_by_package:
                if pkg_name in known_packages:
                    versions_by_package[pkg_name] = devpi.list_versions(
                        mapping.index, pkg_name
                    )
                else:
                    versions_by_package[pkg_name] = set()
            release_files = by_release[(pkg_name, pkg_version)]
            result.append(
                PackageRow(
                    mapping=mapping.name,
                    organization=mapping.organization,
                    index=mapping.index,
                    package=pkg_name,
                    version=pkg_version,
                    files=[item.name for item in release_files],
                    in_devpi=pkg_version in versions_by_package[pkg_name],
                )
            )
        return result

    # ------------------------------------------------------------------
    # Flask app
    # ------------------------------------------------------------------

    def _build_app(self) -> Flask:
        """Create the Flask application with the ``/health`` and ``/`` routes.

        Both handlers copy what they need from the shared state while holding
        the lock and build their response from those copies afterwards.

        :return: The configured Flask application.
        """
        app = Flask(__name__)
        # Keep werkzeug's logger at WARNING and above.
        logging.getLogger("werkzeug").setLevel(logging.WARNING)

        shared_state = self._state
        shared_lock = self._lock
        dry_run_mode = self._dry_run

        @app.route("/health")
        def health():
            """Return the sync status as JSON; HTTP 503 when errors are recorded."""
            with shared_lock:
                rows = list(shared_state.packages)
                current_errors = list(shared_state.errors)
                refreshed_at = shared_state.last_refreshed
                is_refreshing = shared_state.refreshing

            total = len(rows)
            synced = sum(1 for row in rows if row.in_devpi)
            healthy = not current_errors
            payload = {
                "status": "ok" if healthy else "degraded",
                "last_refreshed": refreshed_at.isoformat() if refreshed_at else None,
                "refreshing": is_refreshing,
                "errors": current_errors,
                "package_versions_total": total,
                "synced": synced,
                "missing": total - synced,
            }
            return jsonify(payload), (200 if healthy else 503)

        @app.route("/")
        def dashboard():
            """Render the HTML dashboard from a snapshot of the shared state."""
            with shared_lock:
                rows = list(shared_state.packages)
                current_errors = list(shared_state.errors)
                refreshed_at = shared_state.last_refreshed
                is_refreshing = shared_state.refreshing

            synced_count = sum(1 for row in rows if row.in_devpi)
            context = {
                "packages": rows,
                "errors": current_errors,
                "last_refreshed": refreshed_at,
                "refreshing": is_refreshing,
                "synced": synced_count,
                "missing": len(rows) - synced_count,
                "dry_run": dry_run_mode,
            }
            return render_template_string(_DASHBOARD_HTML, **context)

        return app

run()

Start the background sync thread and the Flask web server.

Source code in devpi_gitea_sync/server.py
def run(self) -> None:
    """Start the sync worker thread, then run the Flask web server in the calling thread."""
    worker = threading.Thread(name="sync-worker", target=self._sync_loop, daemon=True)
    worker.start()

    host, port = self._host, self._port
    logger.info(
        "Web server listening on http://%s:%d — dashboard at / health at /health",
        host,
        port,
    )
    self._app.run(host=host, port=port, threaded=True, use_reloader=False)