#!/usr/bin/env python3

import getopt
import json
import os
import sys
from pprint import pprint
from time import (
    time,
    localtime,
    strftime,
)

import requests
import urllib3
from requests.auth import HTTPBasicAuth
from requests.structures import CaseInsensitiveDict

import cmk.utils.password_store


def showUsage() -> None:
    """Write the command-line help text of this special agent to stderr.

    The text lists the same options that the module-level getopt option
    definitions (-H/-k/-P/-h and the long options) accept.
    """
    sys.stderr.write("""CheckMK Mailcow Special Agent

USAGE: agent_mailcow -H [hostname] -k [apikey]
       agent_mailcow -h

OPTIONS:
  -H, --hostname                  Hostname (FQDN or IP) of Mailcow server
  -k, --apikey                    API Key
  -P, --port                      Port
  --check-version True|False      If "True": Running version will be checked against official Github repo
  --no-https True|False           If "True": Disable HTTPS, use HTTP (not recommended!)
  --no-cert-check True|False      If "True": Disable TLS certificate check (not recommended!)
  -h, --help                      Show this help message and exit
""")


# set this to True to produce debug output (this clutters the agent output)
# be aware: activating this logs very sensitive information to debug files in ~/tmp
# !!DO NOT FORGET to delete these files after debugging is done!!
DEBUG = False

# path of the read-only part of the Mailcow REST API, appended to the base URL
mc_api_base = "api/v1/get"

# command-line option values, filled in by getOptions()
opt_hostname = ""
opt_apikey = ""
opt_port = ""
opt_check_version = False
opt_no_https = False
opt_no_cert_check = False

short_options = 'hH:k:P:'
long_options = [
    'hostname=', 'apikey=', 'port=', 'check-version=', 'no-https=', 'no-cert-check=', 'help'
]

# collected API data: domain name -> domain record, mailbox username -> mailbox record
domain_data = {}
mailbox_data = {}

# settings for the debugLog decorator; debugLogFilename is set in main()
debugLogFilename = ""
SEP = "|"
#SEP = "\t"
TIMEFMT = "%Y-%m-%d %H:%M:%S"
FLOATFMT = "{:.4f}"

# keys copied verbatim from each entry of the "domain/all" API response
_DOMAIN_FIELDS = (
    "active",
    "created",
    "modified",
    "max_num_mboxes_for_domain",
    "mboxes_in_domain",
    "max_num_aliases_for_domain",
    "aliases_in_domain",
    "msgs_total",
    "bytes_total",
    "max_quota_for_domain",
)

# keys copied verbatim from each entry of the "mailbox/all" API response
_MAILBOX_FIELDS = (
    "active",
    "created",
    "modified",
    "name",
    "percent_in_use",
    "quota",
    "quota_used",
    "messages",
    "last_imap_login",
    "last_pop3_login",
    "last_smtp_login",
)


def debugLog(function):
    """Decorator for debugging purposes.

    Every call of the decorated function appends one line to the file named
    by the module-level ``debugLogFilename``. The line consists of the
    following fields, separated by ``SEP``:

        timestamp of the call
        name of function
        runtime in seconds
        number of positional arguments
        number of keyword arguments
        return value of function call
        type of return value
        all positional arguments given to function
        all keyword arguments given to function (as key:value)

    The process exits with status 1 if no debug filename is set or the file
    cannot be written.
    """
    # local import keeps this block independent of the file-level import list
    import functools

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        # execute function and measure runtime
        start = time()
        value = function(*args, **kwargs)
        end = time()

        fields = [
            strftime(TIMEFMT, localtime(start)),
            function.__name__,
            FLOATFMT.format(end - start),
            str(len(args)),
            str(len(kwargs)),
            str(value),
            str(type(value)),
        ]
        fields.extend(str(arg) for arg in args)
        fields.extend(f"{key}:{val}" for key, val in kwargs.items())

        if debugLogFilename == "":
            sys.stderr.write("Debug activated, but no debug filename given.\n")
            sys.exit(1)
        try:
            with open(debugLogFilename, "a+") as f:
                f.write(SEP.join(fields) + "\n")
        except (OSError, UnicodeError):
            sys.stderr.write(f"Something went wrong when writing to file {debugLogFilename}\n")
            sys.exit(1)
        return value

    return wrapper


def getDebugFilename(hostname: str) -> str:
    """Return the path of the debug log for *hostname* below ``$HOME/tmp``."""
    home_path = os.getenv("HOME")
    return f"{home_path}/tmp/mailcow_{hostname}_debug.log"


def getOptions() -> None:
    """Parse ``sys.argv`` and store the result in the module-level ``opt_*`` variables.

    The boolean options are only switched on by the literal argument "True";
    any other argument leaves them False. ``-h``/``--help`` prints the usage
    and exits with status 0. An unknown option or a missing option argument
    prints the getopt error plus the usage and exits with status 1.
    """
    global opt_hostname
    global opt_apikey
    global opt_port
    global opt_check_version
    global opt_no_https
    global opt_no_cert_check

    try:
        opts, _args = getopt.getopt(sys.argv[1:], short_options, long_options)
    except getopt.GetoptError as err:
        # report bad options cleanly instead of dying with a traceback
        sys.stderr.write(f"{err}\n")
        showUsage()
        sys.exit(1)

    for opt, arg in opts:
        if opt in ('-H', '--hostname'):
            opt_hostname = arg
        elif opt in ('-k', '--apikey'):
            opt_apikey = arg
        elif opt in ('-P', '--port'):
            opt_port = arg
        elif opt == '--check-version':
            opt_check_version = arg == 'True'
        elif opt == '--no-https':
            opt_no_https = arg == 'True'
        elif opt == '--no-cert-check':
            opt_no_cert_check = arg == 'True'
        elif opt in ('-h', '--help'):
            showUsage()
            sys.exit(0)

    if DEBUG:
        # NOTE: sys.argv contains the API key, so this file is sensitive
        home_path = os.getenv("HOME")
        help_file = f"{home_path}/tmp/mailcow_{opt_hostname}_debug.txt"
        with open(help_file, "a") as file:
            file.write(f"Number of Arguments: {len(sys.argv)}, Argument List: {str(sys.argv)}\n")


#@debugLog
def showOptions() -> None:
    """Print the parsed options to stdout and append a summary to the debug text file.

    Only called from main() when DEBUG is set. The API key is printed in
    clear text; the file summary does not contain it.
    """
    print(f"Hostname: {opt_hostname}")
    print(f"API Key: {opt_apikey}")
    print(f"Port: {opt_port}")
    print(f"Check Version: {opt_check_version}")
    print(f"No HTTPS: {opt_no_https}")
    print(f"No TLS Check: {opt_no_cert_check}")

    home_path = os.getenv("HOME")
    help_file = f"{home_path}/tmp/mailcow_{opt_hostname}_debug.txt"
    with open(help_file, "a") as file:
        file.write(f"Hostname: {opt_hostname}, Port: {opt_port}, No HTTPS: {opt_no_https}, No Cert Check: {opt_no_cert_check}\n")


def _fetchJson(url: str, **request_kwargs):
    """GET *url* and return the decoded JSON body.

    ``request_kwargs`` are passed through to ``requests.get`` unchanged. Any
    status code other than 200 is reported on stderr and ends the process
    with exit status 1.
    """
    response = requests.get(url, **request_kwargs)
    if response.status_code != 200:
        sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n")
        sys.exit(1)
    return json.loads(response.text)


#@debugLog
def getDomainInfo(headers: "CaseInsensitiveDict", verify: bool, base_url: str) -> int:
    """Fetch all email domains and store them in the global ``domain_data``.

    Each entry of the "domain/all" response is stored under its
    "domain_name"; fields missing from the response are stored as None.

    Returns the number of entries in the response. Exits with status 1 if
    the request does not answer with status 200.
    """
    data = _fetchJson(f"{base_url}/domain/all", headers=headers, verify=verify)  # list of dictionaries
    for entry in data:
        domain_data[entry.get("domain_name")] = {
            field: entry.get(field) for field in _DOMAIN_FIELDS
        }
    return len(data)


#@debugLog
def getMailboxInfo(headers: "CaseInsensitiveDict", verify: bool, base_url: str) -> tuple:
    """Fetch all mailboxes and store them in the global ``mailbox_data``.

    Each entry of the "mailbox/all" response is stored under its "username".
    The message count is stored under both "num_messages" and "messages".

    Returns a tuple (number of mailboxes, total number of messages over all
    mailboxes). A mailbox without a message count contributes 0 to the total.
    Exits with status 1 if the request does not answer with status 200.
    """
    data = _fetchJson(f"{base_url}/mailbox/all", headers=headers, verify=verify)  # list of dictionaries
    global_num_messages = 0
    for entry in data:
        record = {field: entry.get(field) for field in _MAILBOX_FIELDS}
        record["num_messages"] = entry.get("messages")
        mailbox_data[entry.get("username")] = record
        # a missing count (None) must not break the summation
        global_num_messages += record["num_messages"] or 0
    return len(data), global_num_messages


def getGitVersion() -> str:
    """Return the tag name of the latest mailcow-dockerized release on GitHub.

    Exits with status 1 if the GitHub API does not answer with status 200.
    """
    url = "https://api.github.com/repos/mailcow/mailcow-dockerized/releases/latest"
    data = _fetchJson(url)  # returns a dictionary
    return data["tag_name"]


def _parseVersion(version: str) -> tuple:
    """Split a version like "2023-01" or "2023-01a" into (year, month, suffix).

    Raises ValueError if the string does not have exactly one "-" or if the
    year or the first two characters of the month part are not numeric.
    """
    year, month = version.strip().split("-")
    return int(year), int(month[:2]), month[2:]


def compareVersions(mc_ver: str, git_ver: str) -> bool:
    """Return True if *git_ver* is a newer version than *mc_ver*.

    Both versions are expected in the form "YYYY-MM" with an optional
    trailing revision (e.g. "2023-01a"). They are compared by year, then
    month, then revision, where a missing revision sorts before any letter.
    If either version cannot be parsed, no update is reported (False).
    """
    try:
        return _parseVersion(git_ver) > _parseVersion(mc_ver)
    except (ValueError, AttributeError, TypeError):
        # best effort: an unexpected version format must not break the agent
        return False


#@debugLog
def getMailcowInfo(headers: "CaseInsensitiveDict", verify: bool, base_url: str) -> dict:
    """Fetch the running Mailcow version and, if enabled, compare it with GitHub.

    Returns a dictionary with the keys "mc_version", "git_version",
    "update_available" and "check_version_enabled". When the version check
    is disabled, "git_version" is the text "Version check disabled" and
    "update_available" is False. Exits with status 1 if a request does not
    answer with status 200.
    """
    data = _fetchJson(f"{base_url}/status/version", headers=headers, verify=verify)  # returns a dictionary
    mc_version = data["version"]

    info_data = {}
    # if enabled, check this version against the official Mailcow repo on Github
    if opt_check_version:
        git_version = getGitVersion()
        info_data["git_version"] = git_version
        info_data["update_available"] = compareVersions(mc_version, git_version)
    else:
        info_data["git_version"] = "Version check disabled"
        info_data["update_available"] = False
    info_data["mc_version"] = mc_version
    info_data["check_version_enabled"] = opt_check_version
    return info_data


#@debugLog
def getSolrInfo(headers: "CaseInsensitiveDict", verify: bool, base_url: str) -> tuple:
    """Fetch the Solr status.

    Returns the tuple (solr_enabled, solr_size, solr_documents) taken from
    the "status/solr" response. Exits with status 1 if the request does not
    answer with status 200.
    """
    data = _fetchJson(f"{base_url}/status/solr", headers=headers, verify=verify)  # returns a dictionary
    return data["solr_enabled"], data["solr_size"], data["solr_documents"]


#@debugLog
def doCmkOutputMailboxes() -> None:
    """Print one semicolon-separated line per mailbox in ``mailbox_data``.

    Output is as follows:
    0   mailbox name        email address used for login
    1   active              1 --> active, 0 --> not active
    2   creation date       "None" if ???
    3   last modified date  "None" if never modified
    4   name                display name
    5   number of messages
    6   percent in use      quota used, rounded to full percents
    7   quota               max quota in bytes
    8   quota used          quota used in bytes
    9   last imap login     seconds since epoch, 0 if never logged in
    10  last pop3 login     seconds since epoch, 0 if never logged in
    11  last smtp login     seconds since epoch, 0 if never logged in

    Example:
    user1@dom1.de;1;2022-04-29 14:29:34;2022-04-29 14:29:34;Sarah;2433;2;21474836480;495481374;1692520168;0;1692281537
    user2@dom1.de;1;2022-04-29 14:38:33;2022-04-29 14:38:33;Tom;271;0;21474836480;25895752;1657394782;1692519758;1681065713
    user1@dom2.de;1;2022-04-30 09:55:37;2022-04-30 09:55:37;Melissa;53460;33;19964887040;6677677548;1692520066;0;1692510822
    """
    # NOTE(review): "<<>>" looks like a Checkmk section header whose name got
    # lost -- confirm the intended "<<<section>>>" header before deploying
    print("<<>>")
    for mb, record in mailbox_data.items():
        # strip semicolons, if present, since we use it as delimiter;
        # str() keeps a missing display name (None) from raising here
        name = str(record["name"]).replace(";", "")
        fields = (
            mb,
            record["active"],
            record["created"],
            record["modified"],
            name,
            record["num_messages"],
            record["percent_in_use"],
            record["quota"],
            record["quota_used"],
            record["last_imap_login"],
            record["last_pop3_login"],
            record["last_smtp_login"],
        )
        print(";".join(str(field) for field in fields))


#@debugLog
def doCmkOutputMailcow(
    version: str,
    num_domains: int,
    num_mailboxes: int,
    num_global_messages: int,
    solr_enabled: bool,
    solr_size: float,
    solr_documents: int,
    git_version: str,
    update_available: bool,
    check_version_enabled: bool
) -> None:
    """Print the global Mailcow information as one semicolon-separated line.

    The fields are printed in the order of the parameters.
    """
    # NOTE(review): "<<>>" looks like a Checkmk section header whose name got
    # lost -- confirm the intended "<<<section>>>" header before deploying
    print("<<>>")
    # strip semicolons, if present, since we use it as delimiter
    version = version.replace(";", "")
    fields = (
        version,
        num_domains,
        num_mailboxes,
        num_global_messages,
        solr_enabled,
        solr_size,
        solr_documents,
        git_version,
        update_available,
        check_version_enabled,
    )
    print(";".join(str(field) for field in fields))


#@debugLog
def doCmkOutputDomains() -> None:
    """Print one semicolon-separated line per email domain in ``domain_data``.

    Output is as follows:
    0   domain_name
    1   active                      1 --> active, 0 --> not active
    2   creation date               "None" if ???
    3   last modified date          "None" if never modified
    4   max number mailboxes
    5   number of mailboxes
    6   max number of aliases
    7   number of aliases
    8   total number of messages
    9   total number of bytes used  in bytes
    10  max quota                   in bytes

    Example:
    dom1.de;1;2022-04-23 22:54:57;None;10;0;400;6;0;0;10737418240
    dom2.de;1;2022-04-29 13:38:42;2023-08-19 17:21:04;10;0;400;2;0;0;10737418240
    dom3.de;1;2022-04-29 13:36:08;2022-04-29 21:26:19;10;1;100;3;28132;12852485367;21474836480
    """
    # NOTE(review): "<<>>" looks like a Checkmk section header whose name got
    # lost -- confirm the intended "<<<section>>>" header before deploying
    print("<<>>")
    for dom, record in domain_data.items():
        fields = [dom]
        fields.extend(record[field] for field in _DOMAIN_FIELDS)
        print(";".join(str(field) for field in fields))


def main():
    """Entry point: validate the options, query the Mailcow API and print the agent output."""
    global debugLogFilename

    cmk.utils.password_store.replace_passwords()
    getOptions()

    # do some parameter checks
    if opt_hostname == "":
        sys.stderr.write("No hostname given.\n")
        showUsage()
        sys.exit(1)
    hostname = opt_hostname

    if opt_apikey == "":
        sys.stderr.write("No API key given.\n")
        showUsage()
        sys.exit(1)

    if opt_no_cert_check:
        # disable certificate warnings
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    verify = not opt_no_cert_check

    protocol = "http" if opt_no_https else "https"
    if opt_port == "":
        # no port given: use the default port of the chosen protocol
        port = "80" if opt_no_https else "443"
    else:
        port = opt_port

    if protocol == "http" and port == "443":
        sys.stderr.write("Combining HTTP with port 443 is not supported.\n")
        sys.exit(1)
    if protocol == "https" and port == "80":
        sys.stderr.write("Combining HTTPS with port 80 is not supported.\n")
        sys.exit(1)

    headers = CaseInsensitiveDict()
    headers["Accept"] = "application/json"
    headers["X-API-Key"] = opt_apikey

    # now "hostname" contains the FQDN of the host running Mailcow
    # now "protocol" is http or https
    # now "port" contains the port number to use
    # now "verify" signals whether certificate checking will be done (True) or not (False)
    # now "headers" contains the accepted format (JSON) and the API key to use

    debugLogFilename = getDebugFilename(hostname)
    if DEBUG:
        showOptions()
        print(f"hostname: {hostname}, protocol: {protocol}, port: {port}, verify: {verify}")

    base_url = f"{protocol}://{hostname}:{port}/{mc_api_base}"

    # get domain data
    num_domains = getDomainInfo(headers, verify, base_url)

    # get mailbox data
    num_mailboxes, num_global_messages = getMailboxInfo(headers, verify, base_url)

    # get global Mailcow info
    solr_enabled, solr_size, solr_documents = getSolrInfo(headers, verify, base_url)
    mailcow_info = getMailcowInfo(headers, verify, base_url)

    # create agent output
    doCmkOutputDomains()
    doCmkOutputMailboxes()
    doCmkOutputMailcow(
        mailcow_info["mc_version"],
        num_domains,
        num_mailboxes,
        num_global_messages,
        solr_enabled,
        solr_size,
        solr_documents,
        mailcow_info["git_version"],
        mailcow_info["update_available"],
        mailcow_info["check_version_enabled"]
    )


if __name__ == "__main__":
    main()