448 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			448 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python3
 | 
						|
# pylint: disable=line-too-long, too-many-branches, too-many-locals
 | 
						|
# pylint: disable=bare-except, pointless-string-statement, consider-using-dict-items
 | 
						|
# pylint: disable=too-many-arguments, too-many-statements, simplifiable-if-statement, too-many-positional-arguments
 | 
						|
# pylint: disable=possibly-used-before-assignment, used-before-assignment
 | 
						|
# pylint: disable=missing-class-docstring, missing-module-docstring, missing-function-docstring
 | 
						|
"""CheckMK special agent for Mailcow systems"""
 | 
						|
 | 
						|
import json
 | 
						|
import argparse
 | 
						|
import sys
 | 
						|
import requests
 | 
						|
import urllib3
 | 
						|
 | 
						|
from requests.structures import CaseInsensitiveDict
 | 
						|
 | 
						|
 | 
						|
def get_args(argv=None) -> argparse.Namespace:
    """Parse the command line options of the special agent.

    Args:
        argv: Optional list of argument strings to parse. When omitted
            (``None``), argparse falls back to ``sys.argv[1:]``, which is
            what the agent uses in production.

    Returns:
        Namespace with ``hostname`` and ``apikey`` (both mandatory),
        ``port`` (int, or None when not given) and the boolean switches
        ``no_https``, ``no_cert_check`` and ``check_version``.
    """
    parser: argparse.ArgumentParser = argparse.ArgumentParser(
        description="Mailcow server parameters"
    )
    parser.add_argument(
        "--hostname", required=True, type=str, help="Hostname or IP of mailcow server"
    )
    parser.add_argument("--apikey", required=True, type=str, help="API key")
    parser.add_argument(
        "--port",
        required=False,
        type=int,
        help="Port where the server is listening on",
    )
    # "store_true" switches are optional and default to False on their own
    parser.add_argument("--no-https", action="store_true", help="Disable HTTPS")
    parser.add_argument(
        "--no-cert-check", action="store_true", help="Disable certificate checks"
    )
    parser.add_argument(
        "--check-version", action="store_true", help="Enable version check"
    )
    return parser.parse_args(argv)
 | 
						|
 | 
						|
 | 
						|
# API path that main() appends to "<protocol>://<hostname>:<port>/"
MC_API_BASE: str = "api/v1/get"
# Value passed as "timeout" to every requests.get() call in this file
TIMEOUT: int = 30

# Filled by get_domain_info(): domain name -> dictionary of domain attributes
domain_data: dict = {}
# Filled by get_mailbox_info(): mailbox user name -> dictionary of mailbox attributes
mailbox_data: dict = {}
 | 
						|
 | 
						|
 | 
						|
def get_domain_info(headers: "CaseInsensitiveDict", verify: bool, base_url: str) -> int:
    """Fetch all configured domains and store their attributes in ``domain_data``.

    Args:
        headers: HTTP headers sent with the request (main() passes a
            CaseInsensitiveDict carrying "Accept" and "X-API-Key").
        verify: Handed to requests as ``verify``; False disables the TLS
            certificate validation.
        base_url: API base URL; "/domain/all" is appended to it.

    Returns:
        The number of domain entries contained in the response.

    The process is terminated with exit code 1 (and a message on stderr)
    when the request fails, the HTTP status is not 200 or the response body
    is not usable JSON.
    """
    url: str = f"{base_url}/domain/all"
    try:
        response = requests.get(
            url=url, headers=headers, verify=verify, timeout=TIMEOUT
        )
    except requests.exceptions.RequestException as exc:
        # connection problems, timeouts, TLS errors, ...
        sys.stderr.write(f"Request to URL {url} failed: {exc}\n")
        sys.exit(1)
    if response.status_code != 200:
        sys.stderr.write(
            f"Request response code is {response.status_code} with URL {url}\n"
        )
        sys.exit(1)
    try:
        data = json.loads(response.text)  # expected: a list of dictionaries
    except ValueError as exc:
        sys.stderr.write(f"Invalid JSON received from URL {url}: {exc}\n")
        sys.exit(1)
    if not isinstance(data, list):
        # an empty non-list payload is treated as "no entries";
        # anything else cannot be interpreted as a list of domains
        if data:
            sys.stderr.write(f"Unexpected response content from URL {url}\n")
            sys.exit(1)
        data = []
    # attributes copied from every API entry into the global dictionary
    wanted_keys = (
        "active",  # 1 --> active, 0 --> not active
        "created",
        "modified",  # None or date
        "max_num_mboxes_for_domain",
        "mboxes_in_domain",
        "max_num_aliases_for_domain",
        "aliases_in_domain",
        "msgs_total",
        "bytes_total",
        "max_quota_for_domain",
    )
    for entry in data:
        domain_data[entry.get("domain_name")] = {
            key: entry.get(key) for key in wanted_keys
        }
    # return number of email domains
    return len(data)
 | 
						|
 | 
						|
 | 
						|
def get_mailbox_info(headers: "CaseInsensitiveDict", verify: bool, base_url: str) -> tuple:
    """Fetch all configured mailboxes and store their attributes in ``mailbox_data``.

    Args:
        headers: HTTP headers sent with the request (main() passes a
            CaseInsensitiveDict carrying "Accept" and "X-API-Key").
        verify: Handed to requests as ``verify``; False disables the TLS
            certificate validation.
        base_url: API base URL; "/mailbox/all" is appended to it.

    Returns:
        Tuple ``(number of mailbox entries, sum of messages over all mailboxes)``.

    The process is terminated with exit code 1 (and a message on stderr)
    when the request fails, the HTTP status is not 200 or the response body
    is not usable JSON.
    """
    url: str = f"{base_url}/mailbox/all"
    try:
        response = requests.get(
            url=url, headers=headers, verify=verify, timeout=TIMEOUT
        )
    except requests.exceptions.RequestException as exc:
        # connection problems, timeouts, TLS errors, ...
        sys.stderr.write(f"Request to URL {url} failed: {exc}\n")
        sys.exit(1)
    if response.status_code != 200:
        sys.stderr.write(
            f"Request response code is {response.status_code} with URL {url}\n"
        )
        sys.exit(1)
    try:
        data = json.loads(response.text)  # expected: a list of dictionaries
    except ValueError as exc:
        sys.stderr.write(f"Invalid JSON received from URL {url}: {exc}\n")
        sys.exit(1)
    if not isinstance(data, list):
        # an empty non-list payload is treated as "no entries";
        # anything else cannot be interpreted as a list of mailboxes
        if data:
            sys.stderr.write(f"Unexpected response content from URL {url}\n")
            sys.exit(1)
        data = []
    global_num_messages: int = 0
    for entry in data:
        num_messages = entry.get("messages")
        mailbox_data[entry.get("username")] = {
            "active": entry.get("active"),  # 0 --> inactive, 1 --> active
            "created": entry.get("created"),
            "modified": entry.get("modified"),
            "name": entry.get("name"),  # friendly name of the user
            "num_messages": num_messages,
            "percent_in_use": entry.get("percent_in_use"),
            "quota": entry.get("quota"),
            "quota_used": entry.get("quota_used"),
            # same value as "num_messages"; both keys are kept for consumers
            "messages": num_messages,
            "last_imap_login": entry.get("last_imap_login"),
            "last_pop3_login": entry.get("last_pop3_login"),
            "last_smtp_login": entry.get("last_smtp_login"),
        }
        # a mailbox without a message counter must not break the global sum
        global_num_messages += num_messages or 0
    # return number of mailboxes and global number of messages
    return len(data), global_num_messages
 | 
						|
 | 
						|
 | 
						|
def get_git_version() -> str:
    """Return the tag name of the latest mailcow-dockerized release on Github.

    Returns:
        The value of "tag_name" from the Github "latest release" response.

    The process is terminated with exit code 1 (and a message on stderr)
    when the request fails, the HTTP status is not 200 or the response does
    not contain a "tag_name".
    """
    url: str = "https://api.github.com/repos/mailcow/mailcow-dockerized/releases/latest"
    try:
        response = requests.get(url=url, timeout=TIMEOUT)
    except requests.exceptions.RequestException as exc:
        # connection problems, timeouts, TLS errors, ...
        sys.stderr.write(f"Request to URL {url} failed: {exc}\n")
        sys.exit(1)
    if response.status_code != 200:
        sys.stderr.write(
            f"Request response code is {response.status_code} with URL {url}\n"
        )
        sys.exit(1)
    try:
        # the response is expected to be a dictionary
        return json.loads(response.text)["tag_name"]
    except (ValueError, KeyError, TypeError) as exc:
        sys.stderr.write(f"Unexpected response from URL {url}: {exc!r}\n")
        sys.exit(1)
 | 
						|
 | 
						|
 | 
						|
def _split_mailcow_version(version: str) -> tuple:
    """Split a version such as "2023-08a" into the sortable tuple (2023, 8, "a")."""
    # exactly one "-" is expected; anything else raises ValueError on unpacking
    year, month_part = version.split("-")
    # the first two characters are the month, the rest is the patch letter ("" if none)
    return int(year), int(month_part[:2]), month_part[2:]


def compare_versions(mc_ver: str, git_ver: str) -> bool:
    """Tell whether the Github version is newer than the running Mailcow version.

    Both versions are expected in the form "YYYY-MM" with an optional
    trailing patch letter (e.g. "2023-08" or "2023-08a"). They are compared
    by year first, then month, then patch letter; a version without a patch
    letter is older than the same month with one.

    Args:
        mc_ver: Version reported by the running Mailcow system.
        git_ver: Latest release tag found on Github.

    Returns:
        True if ``git_ver`` is newer than ``mc_ver``. False otherwise, and
        also whenever one of the versions cannot be parsed.
    """
    try:
        return _split_mailcow_version(git_ver) > _split_mailcow_version(mc_ver)
    except (AttributeError, TypeError, ValueError):
        # unknown version format: do not claim that an update is available
        return False
 | 
						|
 | 
						|
 | 
						|
def get_mailcow_info(
    headers: "CaseInsensitiveDict", verify: bool, base_url: str, check_version: bool
) -> dict:
    """Retrieve the Mailcow version and, if requested, the update status.

    Args:
        headers: HTTP headers sent with the request (main() passes a
            CaseInsensitiveDict carrying "Accept" and "X-API-Key").
        verify: Handed to requests as ``verify``; False disables the TLS
            certificate validation.
        base_url: API base URL; "/status/version" is appended to it.
        check_version: When True, the running version is compared with the
            latest release on Github.

    Returns:
        Dictionary with the keys "git_version", "update_available",
        "mc_version" and "check_version_enabled".

    The process is terminated with exit code 1 (and a message on stderr)
    when the request fails, the HTTP status is not 200 or the response does
    not contain a "version".
    """
    url = f"{base_url}/status/version"
    try:
        response = requests.get(
            url=url, headers=headers, verify=verify, timeout=TIMEOUT
        )
    except requests.exceptions.RequestException as exc:
        # connection problems, timeouts, TLS errors, ...
        sys.stderr.write(f"Request to URL {url} failed: {exc}\n")
        sys.exit(1)
    if response.status_code != 200:
        sys.stderr.write(
            f"Request response code is {response.status_code} with URL {url}\n"
        )
        sys.exit(1)
    try:
        # the response is expected to be a dictionary holding the Mailcow version
        mc_version = json.loads(response.text)["version"]
    except (ValueError, KeyError, TypeError) as exc:
        sys.stderr.write(f"Unexpected response from URL {url}: {exc!r}\n")
        sys.exit(1)
    # if enabled, check this version against the official Mailcow repo on Github
    if check_version:
        git_version = get_git_version()
        update_available = compare_versions(mc_version, git_version)
    else:
        git_version = "Version check disabled"
        update_available = False
    return {
        "git_version": git_version,
        "update_available": update_available,
        "mc_version": mc_version,
        "check_version_enabled": check_version,
    }
 | 
						|
 | 
						|
 | 
						|
"""
 | 
						|
Each line of the mailcow_mailboxes section consists of the following
semicolon-separated columns:
0   mailbox name                email address used for login
1   active                      1 --> active, 0 --> not active
2   creation date               "None" if not reported by the API
3   last modified date          "None" if never modified
4   name                        display name
5   number of messages
6   percent in use              quota used, rounded to whole percent
7   quota                       max quota in bytes
8   quota used                  quota used in bytes
9   last imap login             seconds since epoch, 0 if never logged in
10  last pop3 login             seconds since epoch, 0 if never logged in
11  last smtp login             seconds since epoch, 0 if never logged in

Example:
user1@dom1.de;1;2022-04-29 14:29:34;2022-04-29 14:29:34;Sarah;2433;2;21474836480;495481374;1692520168;0;1692281537
user2@dom1.de;1;2022-04-29 14:38:33;2022-04-29 14:38:33;Tom;271;0;21474836480;25895752;1657394782;1692519758;1681065713
user1@dom2.de;1;2022-04-30 09:55:37;2022-04-30 09:55:37;Melissa;53460;33;19964887040;6677677548;1692520066;0;1692510822
 | 
						|
"""
 | 
						|
 | 
						|
 | 
						|
def do_cmk_output_mailboxes() -> None:
    """Print the ``mailcow_mailboxes`` agent section, one line per mailbox.

    Reads the global ``mailbox_data`` dictionary. Every line consists of the
    mailbox name followed by its attributes, separated by semicolons
    (``sep(59)`` in the section header).
    """
    print("<<<mailcow_mailboxes:sep(59)>>>")
    for username, mailbox in mailbox_data.items():
        name = mailbox["name"]
        # A mailbox without display name (None) yields an empty column instead
        # of a crash; semicolons are stripped since we use them as delimiter
        name = "" if name is None else str(name).replace(";", "")
        columns = (
            username,
            mailbox["active"],
            mailbox["created"],
            mailbox["modified"],
            name,
            mailbox["num_messages"],
            mailbox["percent_in_use"],
            mailbox["quota"],
            mailbox["quota_used"],
            mailbox["last_imap_login"],
            mailbox["last_pop3_login"],
            mailbox["last_smtp_login"],
        )
        print(";".join(f"{column}" for column in columns))
 | 
						|
 | 
						|
 | 
						|
def do_cmk_output_mailcow(
    version: str,
    num_domains: int,
    num_mailboxes: int,
    num_global_messages: int,
    git_version: str,
    update_available: bool,
    check_version_enabled: bool,
) -> None:
    """Emit the ``mailcow_info`` agent section with the global Mailcow data.

    The section consists of the header and a single line holding all
    arguments in the order of the parameter list, separated by semicolons.
    """
    print("<<<mailcow_info:sep(59)>>>")
    columns = (
        # the semicolon is the column delimiter, so it must not occur in the version
        version.replace(";", ""),
        num_domains,
        num_mailboxes,
        num_global_messages,
        git_version,
        update_available,
        check_version_enabled,
    )
    print(";".join(f"{column}" for column in columns))
 | 
						|
 | 
						|
 | 
						|
"""
 | 
						|
Each line of the mailcow_domains section consists of the following
semicolon-separated columns:
0   domain name
1   active                      1 --> active, 0 --> not active
2   creation date               "None" if not reported by the API
3   last modified date          "None" if never modified
4   max number of mailboxes
5   number of mailboxes
6   max number of aliases
7   number of aliases
8   total number of messages
9   total number of bytes used  in bytes
10  max quota                   in bytes

Example:
dom1.de;1;2022-04-23 22:54:57;None;10;0;400;6;0;0;10737418240
dom2.de;1;2022-04-29 13:38:42;2023-08-19 17:21:04;10;0;400;2;0;0;10737418240
dom3.de;1;2022-04-29 13:36:08;2022-04-29 21:26:19;10;1;100;3;28132;12852485367;21474836480
 | 
						|
"""
 | 
						|
 | 
						|
 | 
						|
def do_cmk_output_domains() -> None:
    """Emit the ``mailcow_domains`` agent section, one line per domain.

    Reads the global ``domain_data`` dictionary; every line holds the domain
    name followed by its attributes, separated by semicolons.
    """
    print("<<<mailcow_domains:sep(59)>>>")
    # attribute columns in output order (the domain name is always column 0)
    column_keys = (
        "active",
        "created",
        "modified",
        "max_num_mboxes_for_domain",
        "mboxes_in_domain",
        "max_num_aliases_for_domain",
        "aliases_in_domain",
        "msgs_total",
        "bytes_total",
        "max_quota_for_domain",
    )
    for domain_name, attributes in domain_data.items():
        cells = [domain_name]
        cells.extend(attributes[key] for key in column_keys)
        print(";".join(f"{cell}" for cell in cells))
 | 
						|
 | 
						|
 | 
						|
def main():
    """Collect all data from the Mailcow API and print the agent sections.

    Reads the command line, builds the API base URL and request headers,
    fetches domain, mailbox and version information and finally prints the
    sections mailcow_domains, mailcow_mailboxes and mailcow_info.
    Exits with code 1 on invalid parameter combinations.
    """
    # get and check parameters
    params: argparse.Namespace = get_args()
    if not params.hostname:
        sys.stderr.write("No hostname given.\n")
        sys.exit(1)
    hostname = params.hostname
    if not params.apikey:
        sys.stderr.write("No API key given.\n")
        sys.exit(1)
    apikey = params.apikey

    # "verify" signals whether certificate checking will be done (True) or not (False)
    verify = not params.no_cert_check
    if not verify:
        # disable certificate warnings
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # "check_version" signals whether the Mailcow version will be checked against the Github version
    check_version = bool(params.check_version)

    # "protocol" is http or https, "port" falls back to the protocol's default port
    if params.no_https:
        protocol = "http"
        default_port = 80
    else:
        protocol = "https"
        default_port = 443
    port = default_port if params.port is None else params.port

    # "--port" is parsed as int, so the comparison has to use integers as well
    if protocol == "http" and port == 443:
        sys.stderr.write("Combining HTTP with port 443 is not supported.\n")
        sys.exit(1)
    if protocol == "https" and port == 80:
        sys.stderr.write("Combining HTTPS with port 80 is not supported.\n")
        sys.exit(1)

    # "headers" contains the accepted format (JSON) and the API key to use
    headers = CaseInsensitiveDict()
    headers["Accept"] = "application/json"
    headers["X-API-Key"] = apikey

    base_url = f"{protocol}://{hostname}:{port}/{MC_API_BASE}"

    # get domain data
    num_domains = get_domain_info(headers=headers, verify=verify, base_url=base_url)
    # get mailbox data
    num_mailboxes, num_global_messages = get_mailbox_info(
        headers=headers, verify=verify, base_url=base_url
    )
    # get global Mailcow info
    mailcow_info = get_mailcow_info(
        headers=headers, verify=verify, base_url=base_url, check_version=check_version
    )

    # create agent output
    do_cmk_output_domains()
    do_cmk_output_mailboxes()
    do_cmk_output_mailcow(
        mailcow_info["mc_version"],
        num_domains,
        num_mailboxes,
        num_global_messages,
        mailcow_info["git_version"],
        mailcow_info["update_available"],
        mailcow_info["check_version_enabled"],
    )
 | 
						|
 | 
						|
 | 
						|
# run the agent only when executed as a script, not when imported as a module
if __name__ == "__main__":
    main()
 |