Traefik-CheckMK/libexec/agent_traefik

263 lines
8.7 KiB
Python
Executable File

#!/usr/bin/env python3
# pylint: disable=too-many-instance-attributes, pointless-string-statement
"""CheckMK Special Agent for Traefik"""
import argparse
import sys
import time
from typing import Dict, Union, Optional, Final

import requests
import urllib3
from requests.auth import HTTPDigestAuth, HTTPBasicAuth
# Mapping of parsed command-line options (argparse destination -> value).
# Values are strings, booleans (the --no-* flags), an integer (--port) or
# None (an option that was not given).
ArgumentsMap = Dict[str, Union[str, bool, int, None]]
# Timeout in seconds passed to every HTTP request against the Traefik API.
TIMEOUT: Final[int] = 30
# Placeholder printed when a field is missing from the version response.
NOT_AVAIL: Final[str] = "N/A"
# Version of this agent, reported in the traefik_info section.
AGENT_VERSION: Final[str] = "0.1.0"
class TraefikAPI:
    """Query the Traefik REST API and print Checkmk agent sections.

    The class is meant to be used as a context manager::

        with TraefikAPI(args) as traefik:
            if traefik.get_overview():
                traefik.create_section_output()

    Entering the context opens an authenticated ``requests`` session.
    Leaving it prints the ``traefik_info`` section (Traefik version data plus
    the agent runtime and version) and then closes the session.

    Agent sections are written to stdout; error messages go to stderr so
    that they can never end up as a data row inside a section.
    """

    def __init__(self, args: "ArgumentsMap"):
        """Store the connection options and build the API base URL.

        Args:
            args: Parsed command-line options. The keys read here are
                ``hostname``, ``username``, ``password``, ``auth_type``,
                ``no_http_check``, ``no_tcp_check``, ``no_udp_check``,
                ``no_cert_check``, ``no_https`` and ``port``.
        """
        # Reference point for the Agent_Runtime value printed in __exit__.
        self.start_time: float = time.perf_counter()

        # Connection and authentication options.
        self.hostname: str = args.get("hostname")
        self.username: str = args.get("username")
        self.password: str = args.get("password")
        self.auth_type: str = args.get("auth_type")

        # Flags that switch individual sections / safety checks off.
        self.no_http_check: bool = args.get("no_http_check", False)
        self.no_tcp_check: bool = args.get("no_tcp_check", False)
        self.no_udp_check: bool = args.get("no_udp_check", False)
        self.no_cert_check: bool = args.get("no_cert_check", False)
        self.no_https: bool = args.get("no_https", False)

        if self.no_cert_check:
            # Certificate verification is switched off on purpose, so the
            # warning urllib3 would emit for every request is only noise.
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        self.verify: bool = not self.no_cert_check

        if self.no_https:
            self.protocol = "http"
            default_port = "80"
        else:
            self.protocol = "https"
            default_port = "443"
        # An explicitly given port overrides the protocol default.
        self.port = args.get("port") or default_port

        self.base_url: str = f"{self.protocol}://{self.hostname}:{self.port}/api"
        self.session: Optional[requests.Session] = None
        self.overview: Optional[dict] = None

    def __enter__(self):
        """Open the HTTP session with the configured authentication.

        Returns:
            This instance, ready for API calls.

        Raises:
            ValueError: If ``auth_type`` is neither ``basic`` nor ``digest``.
        """
        # Validate first so that no session is created (and leaked) for an
        # unusable configuration, and the caller never receives ``None``.
        if self.auth_type not in ("basic", "digest"):
            raise ValueError(
                f"Invalid authentication type '{self.auth_type}'. Use 'basic' or 'digest'."
            )
        auth_class = HTTPBasicAuth if self.auth_type == "basic" else HTTPDigestAuth
        session = requests.Session()
        session.auth = auth_class(self.username, self.password)
        self.session = session
        return self

    def __exit__(self, _exc_type, _exc_value, _traceback) -> None:
        """Print the ``traefik_info`` section and close the session.

        The section is only printed when the version endpoint could be read.
        Exceptions raised inside the ``with`` body are not suppressed.
        """
        duration: float = time.perf_counter() - self.start_time
        try:
            # The version must be fetched while the session is still open.
            traefik_info: Optional[dict] = self._get_version()
        finally:
            if self.session:
                self.session.close()
        if traefik_info:
            print("<<<traefik_info:sep(59)>>>")
            print(f"Traefik_Version;{traefik_info.get('Version', NOT_AVAIL)}")
            print(f"Traefik_CodeName;{traefik_info.get('Codename', NOT_AVAIL)}")
            print(f"Traefik_StartDate;{traefik_info.get('startDate', NOT_AVAIL)}")
            print(f"Agent_Runtime;{duration}")
            print(f"Agent_Version;{AGENT_VERSION}")

    def _get_json(self, endpoint: str) -> Optional[dict]:
        """Fetch ``<base_url>/<endpoint>`` and return the decoded JSON body.

        Args:
            endpoint: Path below the API base URL, e.g. ``"overview"``.

        Returns:
            The decoded response, or ``None`` if no session is open, the
            request failed or the body is not valid JSON. The reason is
            reported on stderr.
        """
        if self.session is None:
            print(
                f"Error fetching {endpoint} data: no open session",
                file=sys.stderr,
            )
            return None
        try:
            response: requests.Response = self.session.get(
                f"{self.base_url}/{endpoint}", timeout=TIMEOUT, verify=self.verify
            )
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as error:
            # ValueError covers JSON decode errors of older requests versions
            # that are not derived from RequestException.
            print(f"Error fetching {endpoint} data: {error}", file=sys.stderr)
            return None

    def _get_version(self) -> Optional[dict]:
        """Get data from API endpoint version"""
        return self._get_json("version")

    def get_overview(self) -> Optional[dict]:
        """Get data from API endpoint overview.

        On success the result is also stored in ``self.overview`` for
        ``create_section_output``.

        Returns:
            The overview data, or ``None`` if it could not be fetched.
        """
        overview = self._get_json("overview")
        if overview is not None:
            self.overview = overview
        return overview

    def _print_component_section(self, protocol: str, components) -> None:
        """Print one ``traefik_<protocol>_components`` section.

        Each line has the form ``component;total;warnings;errors``. A counter
        missing from the overview data is reported as ``-1``.
        """
        print(f"<<<traefik_{protocol}_components:sep(59)>>>")
        # ``or {}`` also covers keys that are present but null.
        data = self.overview.get(protocol) or {}
        for component in components:
            counters = data.get(component) or {}
            total = counters.get("total", -1)
            warnings = counters.get("warnings", -1)
            errors = counters.get("errors", -1)
            print(f"{component};{total};{warnings};{errors}")

    def create_section_output(self) -> None:
        """Print the HTTP, TCP and UDP component sections.

        Sections disabled via the ``no_*_check`` flags are skipped. Nothing
        is printed when no overview data has been fetched yet.
        """
        if self.overview is None:
            return
        sections = (
            ("http", self.no_http_check, ("routers", "services", "middlewares")),
            ("tcp", self.no_tcp_check, ("routers", "services", "middlewares")),
            # The UDP section reports no middlewares line.
            ("udp", self.no_udp_check, ("routers", "services")),
        )
        for protocol, disabled, components in sections:
            if not disabled:
                self._print_component_section(protocol, components)
def parse_arguments(argv: Optional[list] = None) -> "ArgumentsMap":
    """Parse the agent's command-line arguments.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Returns:
        A dictionary keyed by argparse destination name: ``hostname``,
        ``username``, ``password``, ``auth_type``, the five ``no_*`` flags
        and ``port`` (``None`` when not given).

    Raises:
        SystemExit: Raised by argparse on invalid or missing arguments.
    """
    parser = argparse.ArgumentParser(
        description="Parameters to connect to the API of a Traefik instance",
    )

    # Required arguments
    parser.add_argument(
        "--hostname",
        type=str,
        required=True,
        help="The hostname (FQDN) or IP address of the Traefik instance.",
    )
    parser.add_argument(
        "--username",
        type=str,
        required=True,
        help="The username for authentication.",
    )
    parser.add_argument(
        "--password",
        type=str,
        required=True,
        help="The password for authentication.",
    )
    parser.add_argument(
        "--auth-type",
        type=str,
        required=True,
        choices=["basic", "digest"],  # Restrict allowed values
        help="The authentication type to use ('basic' or 'digest').",
    )

    # Optional switches; all of them are plain on/off flags.
    flag_options = (
        ("--no-http-check", "Disable HTTP components check (optional, default=no)."),
        ("--no-tcp-check", "Disable TCP components check (optional, default=no)."),
        ("--no-udp-check", "Disable UDP components check (optional, default=no)."),
        ("--no-cert-check", "Disable certificate check (optional, default=no)."),
        ("--no-https", "Disable HTTPS (optional, default=no)."),
    )
    for flag, help_text in flag_options:
        parser.add_argument(
            flag,
            action="store_true",
            default=False,
            help=help_text,
        )

    parser.add_argument(
        "--port",
        type=int,
        required=False,
        help="Port if not listening to HTTP(S) on default ports 80/443 (optional).",
    )

    args: argparse.Namespace = parser.parse_args(argv)
    return vars(args)
def main() -> None:
    """Run the agent: fetch the overview and print the agent sections.

    The component sections are printed only when the overview endpoint
    returned data. Leaving the ``with`` block prints the ``traefik_info``
    section.

    Raises:
        SystemExit: With status 1 when the overview could not be fetched, so
            that the failure is visible to the monitoring system instead of
            looking like a successful run without data.
    """
    args: ArgumentsMap = parse_arguments()
    with TraefikAPI(args=args) as traefik:
        overview = traefik.get_overview()
        if overview:
            traefik.create_section_output()
    # get_overview() returns None only on failure; an empty overview is not
    # treated as an error.
    if overview is None:
        raise SystemExit(1)


if __name__ == "__main__":
    main()
"""
Sample output

Each line of a component section has the format:
component;total number of components;number of warnings;number of errors
<<<traefik_http_components:sep(59)>>>
routers;50;0;0
services;52;0;0
middlewares;5;0;0
<<<traefik_tcp_components:sep(59)>>>
routers;1;0;0
services;1;0;0
middlewares;0;0;0
<<<traefik_udp_components:sep(59)>>>
routers;0;0;0
services;0;0;0
## General Traefik info
The lines of this section contain, in this order:
Traefik Version
Traefik Codename
Traefik Start Date
Runtime of script (in seconds)
Agent Version
<<<traefik_info:sep(59)>>>
Traefik_Version;3.3.5
Traefik_CodeName;saintnectaire
Traefik_StartDate;2025-04-18T19:01:01.706796728+02:00
Agent_Runtime;0.3067862739553675
Agent_Version;0.1.0
"""