Mailcow-CheckMK/local/share/check_mk/agents/special/agent_mailcow

366 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
import getopt
import sys
import requests
import urllib3
import json
import os
from pprint import pprint
from requests.structures import CaseInsensitiveDict
from requests.auth import HTTPBasicAuth
def showUsage():
    """Write the command-line help text of the Mailcow special agent to stderr.

    The text lists every option that getopt is configured to accept
    (hostname, API key, port, the two True/False switches and help).
    Nothing is returned and the process is not terminated here; callers
    decide on the exit code.
    """
    # The text previously advertised "agent_nextcloud" / "Nextcloud server",
    # a leftover from another agent; it now names this agent and product.
    sys.stderr.write("""CheckMK Mailcow Special Agent
USAGE: agent_mailcow -H [hostname] -k [apikey]
agent_mailcow -h
OPTIONS:
-H, --hostname Hostname (FQDN or IP) of Mailcow server
-k, --apikey API Key
-P, --port Port
--no-https True|False If "True": Disable HTTPS, use HTTP (not recommended!)
--no-cert-check True|False If "True": Disable TLS certificate check (not recommended!)
-h, --help Show this help message and exit
""")
# Debug switch: when True, extra diagnostics are printed into the agent output
# (which clutters it) and appended to debug files in ~/tmp.
# CAUTION: the debug files contain very sensitive information (the complete
# command line is logged) -- DO NOT FORGET to delete them after debugging!
DEBUG = False

# path of the Mailcow API used for all queries, relative to the server root
mc_api_base = "api/v1/get"

# values of the command-line options; overwritten by getOptions()
opt_hostname = opt_apikey = opt_port = ""
opt_no_https = opt_no_cert_check = False

# option definitions handed to getopt
short_options = "hH:k:P:"
long_options = [
    "hostname=",
    "apikey=",
    "port=",
    "no-https=",
    "no-cert-check=",
    "help",
]

# data collected from the API: one entry per domain name / mailbox user name
domain_data = dict()
mailbox_data = dict()
def getOptions():
    """Parse sys.argv and store the result in the module-level opt_* variables.

    Recognised options are defined by `short_options` / `long_options`.
    The two switches --no-https and --no-cert-check take an argument and
    are only enabled when that argument is exactly the string "True".

    Exits with status 0 after printing the usage text when -h/--help is
    given, and with status 1 (error message plus usage text on stderr) when
    the command line contains an unknown option or an option lacks its
    required argument.

    When DEBUG is set, the full argument list is appended to the debug file
    in ~/tmp (this includes the API key).
    """
    global opt_hostname
    global opt_apikey
    global opt_port
    global opt_no_https
    global opt_no_cert_check

    try:
        opts, _args = getopt.getopt(sys.argv[1:], short_options, long_options)
    except getopt.GetoptError as err:
        # report a malformed command line the same way as the other
        # parameter errors instead of dying with a traceback
        sys.stderr.write(f"{err}\n")
        showUsage()
        sys.exit(1)

    for opt, arg in opts:
        if opt in ('-H', '--hostname'):
            opt_hostname = arg
        elif opt in ('-k', '--apikey'):
            opt_apikey = arg
        elif opt in ('-P', '--port'):
            opt_port = arg
        elif opt == '--no-https':
            opt_no_https = (arg == 'True')
        elif opt == '--no-cert-check':
            opt_no_cert_check = (arg == 'True')
        elif opt in ('-h', '--help'):
            showUsage()
            sys.exit(0)

    if DEBUG:
        home_path = os.getenv("HOME")
        tmp_path = f"{home_path}/tmp"
        # the file name uses the hostname as parsed so far
        help_file = f"{tmp_path}/mailcow_{opt_hostname}_debug.txt"
        with open(help_file, "a") as file:
            file.write(f"Number of Arguments: {len(sys.argv)}, Argument List: {str(sys.argv)}\n")
def showOptions():
    """Print the parsed command-line options and append them to the debug file.

    Debugging aid: the values of the module-level opt_* variables are
    printed to stdout (API key included) and a one-line summary without the
    API key is appended to ~/tmp/mailcow_<hostname>_debug.txt.
    """
    print(f"Hostname: {opt_hostname}")
    # this value is the API key; it used to be mislabelled as "Username"
    print(f"API Key: {opt_apikey}")
    print(f"Port: {opt_port}")
    print(f"No HTTPS: {opt_no_https}")
    print(f"No TLS Check: {opt_no_cert_check}")

    home_path = os.getenv("HOME")
    tmp_path = f"{home_path}/tmp"
    help_file = f"{tmp_path}/mailcow_{opt_hostname}_debug.txt"
    with open(help_file, "a") as file:
        file.write(f"Hostname: {opt_hostname}, Port: {opt_port}, No HTTPS: {opt_no_https}, No Cert Check: {opt_no_cert_check}\n")
def getDomainInfo(headers, verify, base_url, timeout=30):
    """Query "<base_url>/domain/all" and store the result in `domain_data`.

    For every record of the response the selected fields (status, dates,
    mailbox/alias limits and counts, message and byte totals, quota) are
    stored in the module-level dictionary `domain_data` under the record's
    "domain_name".

    Args:
        headers: HTTP headers passed to requests.get.
        verify: passed to requests.get as `verify`.
        base_url: URL prefix the endpoint path is appended to.
        timeout: passed to requests.get as `timeout`, so that an
            unresponsive server cannot block the agent indefinitely.

    Returns:
        The number of domain records in the response.

    Exits with status 1 (message on stderr) when the request fails, the
    response status is not 200 or the payload is not a list of records.
    """
    url = f"{base_url}/domain/all"
    try:
        response = requests.get(url, headers=headers, verify=verify, timeout=timeout)
    except requests.exceptions.RequestException as err:
        sys.stderr.write(f"Request to URL {url} failed: {err}\n")
        sys.exit(1)

    if response.status_code != 200:
        sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n")
        sys.exit(1)

    data = json.loads(response.text)  # expected: a list of dictionaries
    if not isinstance(data, list):
        if data:
            sys.stderr.write(f"Unexpected response payload from URL {url}\n")
            sys.exit(1)
        # an empty non-list payload (e.g. {}) simply means "no records"
        data = []

    # fields copied from every record; "modified" may be None
    fields = (
        "active",
        "created",
        "modified",
        "max_num_mboxes_for_domain",
        "mboxes_in_domain",
        "max_num_aliases_for_domain",
        "aliases_in_domain",
        "msgs_total",
        "bytes_total",
        "max_quota_for_domain",
    )
    for entry in data:
        # store all domain data in the global dictionary
        domain_data[entry.get("domain_name")] = {key: entry.get(key) for key in fields}

    # return number of email domains
    return len(data)
def getMailboxInfo(headers, verify, base_url, timeout=30):
    """Query "<base_url>/mailbox/all" and store the result in `mailbox_data`.

    For every record of the response the selected fields (status, dates,
    display name, message count, quota figures, last login values) are
    stored in the module-level dictionary `mailbox_data` under the record's
    "username". The message count is stored under both "num_messages" and
    "messages", as before.

    Args:
        headers: HTTP headers passed to requests.get.
        verify: passed to requests.get as `verify`.
        base_url: URL prefix the endpoint path is appended to.
        timeout: passed to requests.get as `timeout`, so that an
            unresponsive server cannot block the agent indefinitely.

    Returns:
        A tuple (number of mailbox records, sum of their message counts).

    Exits with status 1 (message on stderr) when the request fails, the
    response status is not 200 or the payload is not a list of records.
    """
    url = f"{base_url}/mailbox/all"
    try:
        response = requests.get(url, headers=headers, verify=verify, timeout=timeout)
    except requests.exceptions.RequestException as err:
        sys.stderr.write(f"Request to URL {url} failed: {err}\n")
        sys.exit(1)

    if response.status_code != 200:
        sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n")
        sys.exit(1)

    data = json.loads(response.text)  # expected: a list of dictionaries
    if not isinstance(data, list):
        if data:
            sys.stderr.write(f"Unexpected response payload from URL {url}\n")
            sys.exit(1)
        # an empty non-list payload (e.g. {}) simply means "no records"
        data = []

    global_num_messages = 0
    for entry in data:
        # number of messages in this mailbox (looked up once, stored twice)
        num_messages = entry.get("messages")
        mailbox_data[entry.get("username")] = {
            "active": entry.get("active"),
            "created": entry.get("created"),
            "modified": entry.get("modified"),
            "name": entry.get("name"),
            "num_messages": num_messages,
            "percent_in_use": entry.get("percent_in_use"),
            "quota": entry.get("quota"),
            "quota_used": entry.get("quota_used"),
            "messages": num_messages,
            "last_imap_login": entry.get("last_imap_login"),
            "last_pop3_login": entry.get("last_pop3_login"),
            "last_smtp_login": entry.get("last_smtp_login"),
        }
        # a record without a message count must not break the global sum
        global_num_messages += num_messages or 0

    # return number of mailboxes and global number of messages
    return len(data), global_num_messages
def getMailcowInfo(headers, verify, base_url, timeout=30):
    """Query "<base_url>/status/version" and return the reported version.

    Args:
        headers: HTTP headers passed to requests.get.
        verify: passed to requests.get as `verify`.
        base_url: URL prefix the endpoint path is appended to.
        timeout: passed to requests.get as `timeout`, so that an
            unresponsive server cannot block the agent indefinitely.

    Returns:
        The value of the "version" entry of the JSON response.

    Exits with status 1 (message on stderr) when the request fails, the
    response status is not 200 or the response has no "version" entry.
    """
    url = f"{base_url}/status/version"
    try:
        response = requests.get(url, headers=headers, verify=verify, timeout=timeout)
    except requests.exceptions.RequestException as err:
        sys.stderr.write(f"Request to URL {url} failed: {err}\n")
        sys.exit(1)

    if response.status_code != 200:
        sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n")
        sys.exit(1)

    data = json.loads(response.text)  # expected: a dictionary
    try:
        # get Mailcow version
        return data["version"]
    except (KeyError, TypeError):
        sys.stderr.write(f"No version information in response from URL {url}\n")
        sys.exit(1)
'''
Output is as follows:
mailbox name email address used for login
active 1 --> active, 0 --> not active
creation date "None" if ???
last modified date "None" if never modified
name display name
number of messages
percent in use quota used, rounded to full percents
quota max quota in bytes
quota used quota used in bytes
last imap login seconds since epoch, 0 if never logged in
last pop3 login seconds since epoch, 0 if never logged in
last smtp login seconds since epoch, 0 if never logged in
Example:
user1@dom1.de;1;2022-04-29 14:29:34;2022-04-29 14:29:34;Sarah;2433;2;21474836480;495481374;1692520168;0;1692281537
user2@dom1.de;1;2022-04-29 14:38:33;2022-04-29 14:38:33;Tom;271;0;21474836480;25895752;1657394782;1692519758;1681065713
user1@dom2.de;1;2022-04-30 09:55:37;2022-04-30 09:55:37;Melissa;53460;33;19964887040;6677677548;1692520066;0;1692510822
'''
def doCmkOutputMailboxes():
    """Print the agent section "mailcow_mailboxes" from `mailbox_data`.

    After the section header one semicolon-separated line is written per
    mailbox: user name, active, created, modified, display name, number of
    messages, percent in use, quota, quota used and the last IMAP, POP3
    and SMTP login values.
    """
    print("<<<mailcow_mailboxes:sep(59)>>>")
    for address, info in mailbox_data.items():
        columns = (
            address,
            info["active"],
            info["created"],
            info["modified"],
            # the semicolon is our delimiter, so remove it from the display name
            info["name"].replace(";", ""),
            info["num_messages"],
            info["percent_in_use"],
            info["quota"],
            info["quota_used"],
            info["last_imap_login"],
            info["last_pop3_login"],
            info["last_smtp_login"],
        )
        print(";".join(f"{column}" for column in columns))
def doCmkOutputMailcow(version, num_domains, num_mailboxes, num_global_messages):
    """Print the agent section "mailcow_info".

    After the section header a single semicolon-separated line is written:
    version, number of domains, number of mailboxes and global number of
    messages.
    """
    print("<<<mailcow_info:sep(59)>>>")
    # the semicolon is our delimiter, so remove it from the version string
    clean_version = "".join(version.split(";"))
    columns = (clean_version, num_domains, num_mailboxes, num_global_messages)
    print(";".join(f"{column}" for column in columns))
'''
Output is as follows:
domain_name
active 1 --> active, 0 --> not active
creation date "None" if ???
last modified date "None" if never modified
max number mailboxes
number of mailboxes
max number of aliases
number of aliases
total number of messages
total number of bytes used in bytes
max quota in bytes
Example:
dom1.de;1;2022-04-23 22:54:57;None;10;0;400;6;0;0;10737418240
dom2.de;1;2022-04-29 13:38:42;2023-08-19 17:21:04;10;0;400;2;0;0;10737418240
dom3.de;1;2022-04-29 13:36:08;2022-04-29 21:26:19;10;1;100;3;28132;12852485367;21474836480
'''
def doCmkOutputDomains():
    """Print the agent section "mailcow_domains" from `domain_data`.

    After the section header one semicolon-separated line is written per
    domain: domain name followed by the stored values in the order given by
    `column_keys` below.
    """
    column_keys = (
        "active",
        "created",
        "modified",
        "max_num_mboxes_for_domain",
        "mboxes_in_domain",
        "max_num_aliases_for_domain",
        "aliases_in_domain",
        "msgs_total",
        "bytes_total",
        "max_quota_for_domain",
    )
    print("<<<mailcow_domains:sep(59)>>>")
    for domain_name, info in domain_data.items():
        columns = [domain_name]
        columns.extend(info[key] for key in column_keys)
        print(";".join(f"{column}" for column in columns))
def main():
    """Run the agent: parse options, query the API and print all sections.

    Exits with status 1 when hostname or API key are missing or when the
    protocol/port combination is rejected (HTTP on 443, HTTPS on 80).
    """
    getOptions()

    # --- parameter checks -------------------------------------------------
    if opt_hostname == "":
        sys.stderr.write("No hostname given.\n")
        showUsage()
        sys.exit(1)
    hostname = opt_hostname

    if opt_apikey == "":
        sys.stderr.write("No API key given.\n")
        showUsage()
        sys.exit(1)

    # certificate checking is on unless explicitly switched off
    verify = False if opt_no_cert_check else True
    if not verify:
        # disable certificate warnings
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # protocol depends only on --no-https; the port defaults to the
    # protocol's standard port unless one was given on the command line
    protocol = "http" if opt_no_https else "https"
    if opt_port == "":
        port = "80" if protocol == "http" else "443"
    else:
        port = opt_port

    if protocol == "http" and port == "443":
        sys.stderr.write("Combining HTTP with port 443 is not supported.\n")
        sys.exit(1)
    if protocol == "https" and port == "80":
        sys.stderr.write("Combining HTTPS with port 80 is not supported.\n")
        sys.exit(1)

    # request headers: accepted format (JSON) and the API key to use
    headers = CaseInsensitiveDict()
    headers["Accept"] = "application/json"
    headers["X-API-Key"] = opt_apikey

    if DEBUG:
        showOptions()
        print(f"hostname: {hostname}, protocol: {protocol}, port: {port}, verify: {verify}")

    base_url = f"{protocol}://{hostname}:{port}/{mc_api_base}"

    # --- collect data: domains, mailboxes, global Mailcow info ------------
    num_domains = getDomainInfo(headers, verify, base_url)
    num_mailboxes, num_global_messages = getMailboxInfo(headers, verify, base_url)
    mailcow_version = getMailcowInfo(headers, verify, base_url)

    # --- create agent output ----------------------------------------------
    doCmkOutputDomains()
    doCmkOutputMailboxes()
    doCmkOutputMailcow(mailcow_version, num_domains, num_mailboxes, num_global_messages)


if __name__ == "__main__":
    main()