#!/usr/bin/env python3
# pylint: disable=too-many-instance-attributes, pointless-string-statement
"""CheckMK Special Agent for Traefik"""

import sys
import time
import argparse
from typing import Dict, Union, Optional, Final

import requests
import urllib3
from requests.auth import HTTPDigestAuth, HTTPBasicAuth

# dictionary with all parameters (argparse delivers --port as int)
ArgumentsMap = Dict[str, Union[str, int, bool, None]]

TIMEOUT: Final[int] = 30
NOT_AVAIL: Final[str] = "N/A"


class TraefikAPI:
    """Query the Traefik REST API and print the results as agent sections.

    Use as a context manager: entering creates an authenticated
    ``requests.Session``; leaving prints the general Traefik info section
    (version, codename, start date, script runtime) and closes the session.
    """

    def __init__(self, args: ArgumentsMap):
        """Store the connection parameters and derive the API base URL.

        Args:
            args: Mapping with the keys ``hostname``, ``username``,
                ``password``, ``auth_type`` and optionally ``port``,
                ``no_http_check``, ``no_tcp_check``, ``no_udp_check``,
                ``no_cert_check`` and ``no_https``.
        """
        # Taken first so the reported runtime covers the whole agent run.
        self.start_time: float = time.perf_counter()

        # set arguments
        self.hostname: Optional[str] = args.get("hostname")
        self.username: Optional[str] = args.get("username")
        self.password: Optional[str] = args.get("password")
        self.auth_type: Optional[str] = args.get("auth_type")
        self.no_http_check: bool = bool(args.get("no_http_check", False))
        self.no_tcp_check: bool = bool(args.get("no_tcp_check", False))
        self.no_udp_check: bool = bool(args.get("no_udp_check", False))
        self.no_cert_check: bool = bool(args.get("no_cert_check", False))
        self.no_https: bool = bool(args.get("no_https", False))

        # define other attributes
        if self.no_cert_check:
            # Unverified HTTPS requests emit a warning on every call;
            # silence it because verification was disabled on purpose.
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        self.verify: bool = not self.no_cert_check

        self.protocol: str = "http" if self.no_https else "https"
        default_port: str = "80" if self.no_https else "443"
        # An explicitly given port overrides the protocol default. It is
        # normalised to str so the attribute has one type in both cases.
        self.port: str = str(args.get("port") or default_port)

        self.base_url: str = f"{self.protocol}://{self.hostname}:{self.port}/api"

        self.session: Optional[requests.Session] = None
        self.overview: Optional[dict] = None

    def __enter__(self) -> "TraefikAPI":
        """Create the authenticated HTTP session.

        Returns:
            This instance, ready for API calls.

        Raises:
            ValueError: If ``auth_type`` is neither ``basic`` nor ``digest``.
        """
        auth_classes = {"basic": HTTPBasicAuth, "digest": HTTPDigestAuth}
        auth_class = auth_classes.get(self.auth_type)
        if auth_class is None:
            # Fail before a session exists: returning None here would make
            # the ``with`` target None and hide the real cause behind an
            # AttributeError on the first API call.
            raise ValueError(
                f"Invalid authentication type '{self.auth_type}'. "
                "Use 'basic' or 'digest'."
            )

        session = requests.Session()
        session.auth = auth_class(self.username, self.password)
        self.session = session
        return self

    def __exit__(self, _exc_type, _exc_value, _traceback) -> None:
        """Print the general Traefik info section and close the session.

        Exceptions raised inside the ``with`` body are not suppressed.
        """
        # Measured before the version request, so that request is not
        # counted in the reported runtime.
        duration: float = time.perf_counter() - self.start_time

        try:
            # Must happen while the session is still open.
            traefik_info: Optional[dict] = self._get_version()
        finally:
            if self.session:
                self.session.close()

        if traefik_info:
            # NOTE(review): the header carries no section name; Checkmk
            # section headers normally look like <<<name>>> - confirm
            # against the check plugins that consume this output.
            print("<<>>")
            print(traefik_info.get("Version", NOT_AVAIL))
            print(traefik_info.get("Codename", NOT_AVAIL))
            print(traefik_info.get("startDate", NOT_AVAIL))
            print(duration)

    def _get_json(self, endpoint: str) -> Optional[dict]:
        """Fetch ``<base_url>/<endpoint>`` and return the decoded JSON body.

        Errors are reported on stderr so they cannot end up inside the
        section data written to stdout.

        Args:
            endpoint: API endpoint name relative to the base URL.

        Returns:
            The decoded response, or None if there is no session, the
            request failed, or the body is not valid JSON.
        """
        if self.session is None:
            print(
                f"Error fetching {endpoint} data: no active session",
                file=sys.stderr,
            )
            return None

        try:
            response: requests.Response = self.session.get(
                f"{self.base_url}/{endpoint}", timeout=TIMEOUT, verify=self.verify
            )
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures that are not
            # reported as a RequestException.
            print(f"Error fetching {endpoint} data: {e}", file=sys.stderr)
            return None

    def _get_version(self) -> Optional[dict]:
        """Get data from API endpoint version"""
        return self._get_json("version")

    def get_overview(self) -> Optional[dict]:
        """Get data from API endpoint overview.

        On success the result is also stored in ``self.overview``; on
        failure the stored value is left untouched.

        Returns:
            The overview data, or None if it could not be fetched.
        """
        overview = self._get_json("overview")
        if overview is not None:
            self.overview = overview
        return overview

    @staticmethod
    def _print_components(data: dict, components: tuple) -> None:
        """Print one ``component;total;warnings;errors`` line per component.

        Counters missing from ``data`` are printed as -1.
        """
        for component in components:
            stats = data.get(component, {})
            total = stats.get("total", -1)
            warnings = stats.get("warnings", -1)
            errors = stats.get("errors", -1)
            print(f"{component};{total};{warnings};{errors}")

    def create_section_output(self) -> None:
        """Print the HTTP, TCP and UDP component sections that are enabled.

        Uses the data stored by ``get_overview``; if nothing has been
        stored yet, every counter is printed as -1.
        """
        overview: dict = self.overview or {}

        # (check disabled?, key in the overview, section header, components)
        # NOTE(review): all three headers carry no section name; Checkmk
        # section headers normally look like <<<name>>> - confirm against
        # the check plugins that consume this output.
        sections = (
            (self.no_http_check, "http", "<<>>",
             ("routers", "services", "middlewares")),
            (self.no_tcp_check, "tcp", "<<>>",
             ("routers", "services", "middlewares")),
            (self.no_udp_check, "udp", "<<>>",
             ("routers", "services")),
        )
        for disabled, protocol, header, components in sections:
            if disabled:
                continue
            print(header)
            self._print_components(overview.get(protocol, {}), components)


def parse_arguments() -> ArgumentsMap:
    """Parse the command-line arguments.

    Returns:
        Mapping of argument name (dashes replaced by underscores) to value.
    """
    parser = argparse.ArgumentParser(
        description="Parameters to connect to the API of a Traefik instance",
    )

    # Required arguments
    parser.add_argument(
        "--hostname",
        type=str,
        required=True,
        help="The hostname (FQDN) or IP address of the Traefik instance.",
    )
    parser.add_argument(
        "--username",
        type=str,
        required=True,
        help="The username for authentication.",
    )
    parser.add_argument(
        "--password",
        type=str,
        required=True,
        help="The password for authentication.",
    )
    parser.add_argument(
        "--auth-type",
        type=str,
        required=True,
        choices=["basic", "digest"],  # Restrict allowed values
        help="The authentication type to use ('basic' or 'digest').",
    )

    # Optional arguments
    parser.add_argument(
        "--no-http-check",
        action="store_true",
        default=False,
        help="Disable HTTP components check (optional, default=no).",
    )
    parser.add_argument(
        "--no-tcp-check",
        action="store_true",
        default=False,
        help="Disable TCP components check (optional, default=no).",
    )
    parser.add_argument(
        "--no-udp-check",
        action="store_true",
        default=False,
        help="Disable UDP components check (optional, default=no).",
    )
    parser.add_argument(
        "--no-cert-check",
        action="store_true",
        default=False,
        help="Disable certificate check (optional, default=no).",
    )
    parser.add_argument(
        "--no-https",
        action="store_true",
        default=False,
        help="Disable HTTPS (optional, default=no).",
    )
    parser.add_argument(
        "--port",
        type=int,
        required=False,
        help="Port if not listening to HTTP(S) on default ports 80/443 (optional).",
    )

    args: argparse.Namespace = parser.parse_args()
    arg_map: ArgumentsMap = vars(args)
    return arg_map


def main() -> None:
    """Fetch the Traefik overview and print the agent sections."""
    args: ArgumentsMap = parse_arguments()
    with TraefikAPI(args=args) as traefik:
        # Component sections are only printed when the overview could be
        # fetched; the general info section is printed on leaving the block.
        if traefik.get_overview():
            traefik.create_section_output()


if __name__ == "__main__":
    main()

"""
Sample output

component;total number of components;number of warnings;number of errors

<<>>
routers;50;0;0
services;52;0;0
middlewares;5;0;0
<<>>
routers;1;0;0
services;1;0;0
middlewares;0;0;0
<<>>
routers;0;0;0
services;0;0;0

## General Traefik info
Lines contain
Version
Codename
Start Date
Runtime of script (in seconds)

<<>>
3.3.5
saintnectaire
2025-04-18T19:01:01.706796728+02:00
0.19748995700501837
"""