#!/usr/bin/env python3 import getopt import sys import requests import urllib3 import json import os from pprint import pprint from requests.structures import CaseInsensitiveDict from requests.auth import HTTPBasicAuth def showUsage(): sys.stderr.write("""CheckMK Mailcow Special Agent USAGE: agent_nextcloud -H [hostname] -k [apikey] agent_nextcloud -h OPTIONS: -H, --hostname Hostname (FQDN or IP) of Nextcloud server -k, --apikey API Key -P, --port Port --no-https True|False If "True": Disable HTTPS, use HTTP (not recommended!) --no-cert-check True|False If "True": Disable TLS certificate check (not recommended!) -h, --help Show this help message and exit """) # set this to True to produce debug output (this clutters the agent output) # be aware: activating this logs very sensitive information to debug files in ~/tmp # !!DO NOT FORGET to delete these files after debugging is done!! DEBUG = False mc_api_base = "api/v1/get" opt_hostname = "" opt_apikey = "" opt_port = "" opt_no_https = False opt_no_cert_check = False short_options = 'hH:k:P:' long_options = [ 'hostname=', 'apikey=', 'port=', 'no-https=', 'no-cert-check=', 'help' ] domain_data = {} mailbox_data = {} def getOptions(): global opt_hostname global opt_apikey global opt_port global opt_no_https global opt_no_cert_check opts, args = getopt.getopt(sys.argv[1:], short_options, long_options) for opt, arg in opts: if opt in ['-H', '--hostname']: opt_hostname = arg elif opt in ['-k', '--apikey']: opt_apikey = arg elif opt in ['-P', '--port']: opt_port = arg elif opt in ['--no-https']: if arg == 'True': opt_no_https = True else: opt_no_https = False elif opt in ['--no-cert-check']: if arg == 'True': opt_no_cert_check = True else: opt_no_cert_check = False elif opt in ['-h', '--help']: showUsage() sys.exit(0) if DEBUG: home_path = os.getenv("HOME") tmp_path = f"{home_path}/tmp" help_file = f"{tmp_path}/mailcow_{opt_hostname}_debug.txt" with open(help_file, "a") as file: file.write(f"Number of Arguments: {len(sys.argv)}, Argument List: {str(sys.argv)}\n") def showOptions(): print(f"Hostname: {opt_hostname}") print(f"Username: {opt_apikey}") print(f"Port: {opt_port}") print(f"No HTTPS: {opt_no_https}") print(f"No TLS Check: {opt_no_cert_check}") home_path = os.getenv("HOME") tmp_path = f"{home_path}/tmp" help_file = f"{tmp_path}/mailcow_{opt_hostname}_debug.txt" with open(help_file, "a") as file: file.write(f"Hostname: {opt_hostname}, Port: {opt_port}, No HTTPS: {opt_no_https}, No Cert Check: {opt_no_cert_check}\n") def getDomainInfo(headers, verify, base_url): url = f"{base_url}/domain/all" response = requests.get(url, headers=headers, verify=verify) if (response.status_code == 200): jsdata = response.text data = json.loads(jsdata) # returns a list of dictionaries i = 0 while i < len(data): #pprint(data[i]) # get domain name and status (active (1) or not (0)) domain_name = data[i].get("domain_name") active = data[i].get("active") # get creation and last modification date created = data[i].get("created") modified = data[i].get("modified") # returns "None" or date # get maximum and current number of mailboxes in this domain max_num_mboxes_for_domain = data[i].get("max_num_mboxes_for_domain") mboxes_in_domain = data[i].get("mboxes_in_domain") # get maximum and current number of aliases in this domain max_num_aliases_for_domain = data[i].get("max_num_aliases_for_domain") aliases_in_domain = data[i].get("aliases_in_domain") # get total messages in this domain msgs_total = data[i].get("msgs_total") # get total bytes used in this domain bytes_total = data[i].get("bytes_total") # get maximum quota for this domain max_quota_for_domain = data[i].get("max_quota_for_domain") # store all domain data in global dictionary dom = { "active": active, "created": created, "modified": modified, "max_num_mboxes_for_domain": max_num_mboxes_for_domain, "mboxes_in_domain": mboxes_in_domain, "max_num_aliases_for_domain": max_num_aliases_for_domain, "aliases_in_domain": aliases_in_domain, "msgs_total": msgs_total, "bytes_total": bytes_total, "max_quota_for_domain": max_quota_for_domain } domain_data[domain_name] = {} domain_data[domain_name] = dom i += 1 # return number of email domains return i else: sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n") sys.exit(1) def getMailboxInfo(headers, verify, base_url): url = f"{base_url}/mailbox/all" response = requests.get(url, headers=headers, verify=verify) if (response.status_code == 200): jsdata = response.text data = json.loads(jsdata) # returns a list of dictionaries i = 0 global_num_messages = 0 while i < len(data): # get status of mailbox (0-->inactive or 1-->active) active = data[i].get("active") # get username of mailbox username = data[i].get("username") # get friendly name of user name = data[i].get("name") # get number of messages in mailbox num_messages = data[i].get("messages") # get quota used in percent (rounded to full percent) percent_in_use = data[i].get("percent_in_use") # get quota and quota used in bytes quota = data[i].get("quota") quota_used = data[i].get("quota_used") # get creation and last modification date created = data[i].get("created") modified = data[i].get("modified") # get number of messages messages = data[i].get("messages") # get last login dates last_imap_login = data[i].get("last_imap_login") last_pop3_login = data[i].get("last_pop3_login") last_smtp_login = data[i].get("last_smtp_login") mb = { "active": active, "created": created, "modified": modified, "name": name, "num_messages": num_messages, "percent_in_use": percent_in_use, "quota": quota, "quota_used": quota_used, "messages": messages, "last_imap_login": last_imap_login, "last_pop3_login": last_pop3_login, "last_smtp_login": last_smtp_login } mailbox_data[username] = {} mailbox_data[username] = mb i += 1 global_num_messages += num_messages # return number of mailboxes and global number of messages return i, global_num_messages else: sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n") sys.exit(1) def getMailcowInfo(headers, verify, base_url): url = f"{base_url}/status/version" response = requests.get(url, headers=headers, verify=verify) if (response.status_code == 200): jsdata = response.text data = json.loads(jsdata) # returns a dictionary #pprint(data) # get Mailcow version mc_version = data["version"] return mc_version else: sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n") sys.exit(1) ''' Output is as follows: mailbox name email address used for login active 1 --> active, 0 --> not active creation date "None" if ??? last modified date "None" if never modified name display name number of messages percent in use quota used, rounded to full percents quota max quota in bytes quota used quota used in bytes last imap login seconds since epoch, 0 if never logged inin seconds since epoch last pop3 login seconds since epoch, 0 if never logged in last smtp login seconds since epoch, 0 if never logged in Example: user1@dom1.de;1;2022-04-29 14:29:34;2022-04-29 14:29:34;Sarah;2433;2;21474836480;495481374;1692520168;0;1692281537 user2@dom1.de;1;2022-04-29 14:38:33;2022-04-29 14:38:33;Tom;271;0;21474836480;25895752;1657394782;1692519758;1681065713 user1@dom2.de;1;2022-04-30 09:55:37;2022-04-30 09:55:37;Melissa;53460;33;19964887040;6677677548;1692520066;0;1692510822 ''' def doCmkOutputMailboxes(): print("<<>>") for mb in mailbox_data: active = mailbox_data[mb]["active"] created = mailbox_data[mb]["created"] modified = mailbox_data[mb]["modified"] name = mailbox_data[mb]["name"] # strip semicolons, if present, since we use it as delimiter name = name.replace(";", "") num_messages = mailbox_data[mb]["num_messages"] percent_in_use = mailbox_data[mb]["percent_in_use"] quota = mailbox_data[mb]["quota"] quota_used = mailbox_data[mb]["quota_used"] last_imap_login = mailbox_data[mb]["last_imap_login"] last_pop3_login = mailbox_data[mb]["last_pop3_login"] last_smtp_login = mailbox_data[mb]["last_smtp_login"] print(f"{mb};{active};{created};{modified};{name};{num_messages};{percent_in_use};{quota};{quota_used};{last_imap_login};{last_pop3_login};{last_smtp_login}") def doCmkOutputMailcow(version, num_domains, num_mailboxes, num_global_messages): print("<<>>") # strip semicolons, if present, since we use it as delimiter version = version.replace(";", "") print(f"{version};{num_domains};{num_mailboxes};{num_global_messages}") ''' Output is as follows: domain_name active 1 --> active, 0 --> not active creation date "None" if ??? last modified date "None" if never modified max number mailboxes number of mailboxes max number of aliases number of aliases total number of messages total number of bytes used in bytes max quota in bytes Example: dom1.de;1;2022-04-23 22:54:57;None;10;0;400;6;0;0;10737418240 dom2.de;1;2022-04-29 13:38:42;2023-08-19 17:21:04;10;0;400;2;0;0;10737418240 dom3.de;1;2022-04-29 13:36:08;2022-04-29 21:26:19;10;1;100;3;28132;12852485367;21474836480 ''' def doCmkOutputDomains(): print("<<>>") for dom in domain_data: active = domain_data[dom]["active"] created = domain_data[dom]["created"] modified = domain_data[dom]["modified"] max_num_mboxes_for_domain = domain_data[dom]["max_num_mboxes_for_domain"] mboxes_in_domain = domain_data[dom]["mboxes_in_domain"] max_num_aliases_for_domain = domain_data[dom]["max_num_aliases_for_domain"] aliases_in_domain = domain_data[dom]["aliases_in_domain"] msgs_total = domain_data[dom]["msgs_total"] bytes_total = domain_data[dom]["bytes_total"] max_quota_for_domain = domain_data[dom]["max_quota_for_domain"] print(f"{dom};{active};{created};{modified};{max_num_mboxes_for_domain};{mboxes_in_domain};{max_num_aliases_for_domain};{aliases_in_domain};{msgs_total};{bytes_total};{max_quota_for_domain}") def main(): getOptions() # do some parameter checks if (opt_hostname == ""): sys.stderr.write(f"No hostname given.\n") showUsage() sys.exit(1) else: hostname = opt_hostname if (opt_apikey == ""): sys.stderr.write(f"No API key given.\n") showUsage() sys.exit(1) if (opt_no_cert_check): # disable certificate warnings urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) verify = False else: verify = True if (opt_port == ""): if (opt_no_https): protocol = "http" port = "80" else: protocol = "https" port = "443" else: if (opt_no_https): protocol = "http" else: protocol = "https" port = opt_port if (protocol == "http" and port == "443"): sys.stderr.write(f"Combining HTTP with port 443 is not supported.\n") sys.exit(1) if (protocol == "https" and port == "80"): sys.stderr.write(f"Combining HTTPS with port 80 is not supported.\n") sys.exit(1) headers = CaseInsensitiveDict() headers["Accept"] = "application/json" headers["X-API-Key"] = opt_apikey # now "hostname" contains the FQDN of the host running Mailcow # now "protocol" is http or https # now "port" contains the port number to use # now "verify" signals whether certificate checking will be done (True) or not (False) # now "headers" contains the accepted format (JSON) and the API key to use if DEBUG: showOptions() print(f"hostname: {hostname}, protocol: {protocol}, port: {port}, verify: {verify}") base_url = f"{protocol}://{hostname}:{port}/{mc_api_base}" # get domain data num_domains = getDomainInfo(headers, verify, base_url) # get mailbox data num_mailboxes, num_global_messages = getMailboxInfo(headers, verify, base_url) # get global Mailcow info mailcow_version = getMailcowInfo(headers, verify, base_url) # create agent output doCmkOutputDomains() doCmkOutputMailboxes() doCmkOutputMailcow(mailcow_version, num_domains, num_mailboxes, num_global_messages) if __name__ == "__main__": main()