Mailcow-CheckMK/local/share/check_mk/agents/special/agent_mailcow
2024-01-26 15:05:03 +01:00

546 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
import getopt
import sys
import requests
import urllib3
import json
import os
import cmk.utils.password_store
from pprint import pprint
from requests.structures import CaseInsensitiveDict
from requests.auth import HTTPBasicAuth
from time import (
time, localtime, strftime,
)
def showUsage() -> None:
    """Write the command line help of the Mailcow special agent to stderr."""
    # The help text used to name "agent_nextcloud" and a "Nextcloud server";
    # this agent talks to Mailcow, so the text now says so.
    sys.stderr.write("""CheckMK Mailcow Special Agent
USAGE: agent_mailcow -H [hostname] -k [apikey]
       agent_mailcow -h
OPTIONS:
  -H, --hostname                  Hostname (FQDN or IP) of Mailcow server
  -k, --apikey                    API Key
  -P, --port                      Port
  --check-version True|False      If "True": Running version will be checked against official Github repo
  --no-https True|False           If "True": Disable HTTPS, use HTTP (not recommended!)
  --no-cert-check True|False      If "True": Disable TLS certificate check (not recommended!)
  -h, --help                      Show this help message and exit
""")
# Debug switch: True produces debug output, which clutters the agent output.
# Be aware: activating this logs very sensitive information to debug files in ~/tmp.
# !!DO NOT FORGET to delete these files after debugging is done!!
DEBUG = False

# path of the Mailcow "get" API below the server root
mc_api_base = "api/v1/get"

# command line options; filled in by getOptions()
opt_hostname = ""
opt_apikey = ""
opt_port = ""
opt_check_version = False
opt_no_https = False
opt_no_cert_check = False

# option definitions handed to getopt
short_options = "hH:k:P:"
long_options = [
    "hostname=",
    "apikey=",
    "port=",
    "check-version=",
    "no-https=",
    "no-cert-check=",
    "help",
]

# data collected from the API, keyed by domain name / mailbox user name
domain_data = {}
mailbox_data = {}

# settings of the debugLog decorator
debugLogFilename = ""
SEP = "|"  # field separator in the debug log; alternative: "\t"
TIMEFMT = "%Y-%m-%d %H:%M:%S"
FLOATFMT = "{:.4f}"
def debugLog(function):
    """Decorator for debugging purposes.

    Every call of the decorated function appends one line to the file named
    by the module-level ``debugLogFilename``. The fields are separated by
    ``SEP`` and contain, in this order:

        timestamp of the call (formatted with ``TIMEFMT``)
        name of function
        runtime in seconds (formatted with ``FLOATFMT``)
        number of arguments
        number of keyword arguments
        return value of function call
        type of return value
        all positional arguments given to function
        all keyword arguments given to function, as ``key:value``

    The process exits with status 1 if ``debugLogFilename`` is empty or the
    log file cannot be written. The return value of the decorated function is
    passed through unchanged.
    """
    # local import: functools is not among the module-level imports
    from functools import wraps

    @wraps(function)  # keep __name__/__doc__ of the decorated function
    def wrapper(*args, **kwargs):
        # execute function and measure runtime
        start = time()
        value = function(*args, **kwargs)
        end = time()

        # without a file name there is nowhere to log to
        if debugLogFilename == "":
            sys.stderr.write("Debug activated, but no debug filename given.\n")
            sys.exit(1)

        # Timestamp|Name of Function|Runtime|Num Args|Num Kwargs|Return Value|Return Value Type
        fields = [
            strftime(TIMEFMT, localtime(start)),
            function.__name__,
            FLOATFMT.format(end - start),
            str(len(args)),
            str(len(kwargs)),
            f"{value}",
            f"{type(value)}",
        ]
        # followed by all arguments and all keyword arguments
        fields.extend(str(arg) for arg in args)
        fields.extend(key + ":" + str(val) for key, val in kwargs.items())

        # write output to file
        try:
            with open(debugLogFilename, "a+") as f:
                f.write(SEP.join(fields) + "\n")
        except (OSError, UnicodeError) as err:
            # report the cause instead of hiding it behind a bare "except:"
            sys.stderr.write(f"Something went wrong when writing to file {debugLogFilename}: {err}\n")
            sys.exit(1)
        return value

    return wrapper
def getDebugFilename(hostname: str) -> str:
    """Return the path of the debug log file for *hostname* below ``$HOME/tmp``."""
    home = os.getenv("HOME")
    return f"{home}/tmp/mailcow_{hostname}_debug.log"
def getOptions() -> None:
    """Parse ``sys.argv`` and store the result in the module-level ``opt_*`` variables.

    The boolean options (``--check-version``, ``--no-https``,
    ``--no-cert-check``) are switched on only by the literal argument
    ``True``; any other argument switches them off.

    ``-h``/``--help`` prints the usage text and exits with status 0. An
    unknown option or a missing option argument prints the getopt error
    message and the usage text, then exits with status 1.

    If ``DEBUG`` is set, the complete argument list is appended to
    ``$HOME/tmp/mailcow_<hostname>_debug.txt``.
    """
    global opt_hostname
    global opt_apikey
    global opt_port
    global opt_check_version
    global opt_no_https
    global opt_no_cert_check

    try:
        opts, _args = getopt.getopt(sys.argv[1:], short_options, long_options)
    except getopt.GetoptError as err:
        # show the usage text instead of an unhandled traceback
        sys.stderr.write(f"{err}\n")
        showUsage()
        sys.exit(1)

    for opt, arg in opts:
        if opt in ('-H', '--hostname'):
            opt_hostname = arg
        elif opt in ('-k', '--apikey'):
            opt_apikey = arg
        elif opt in ('-P', '--port'):
            opt_port = arg
        elif opt == '--check-version':
            opt_check_version = arg == 'True'
        elif opt == '--no-https':
            opt_no_https = arg == 'True'
        elif opt == '--no-cert-check':
            opt_no_cert_check = arg == 'True'
        elif opt in ('-h', '--help'):
            showUsage()
            sys.exit(0)

    if DEBUG:
        # NOTE: sys.argv is written as is, i.e. including the API key argument
        home_path = os.getenv("HOME")
        tmp_path = f"{home_path}/tmp"
        help_file = f"{tmp_path}/mailcow_{opt_hostname}_debug.txt"
        with open(help_file, "a") as file:
            file.write(f"Number of Arguments: {len(sys.argv)}, Argument List: {str(sys.argv)}\n")
#@debugLog
def showOptions() -> None:
    """Print the parsed command line options and append them to the debug text file.

    All options, including the API key in clear text, are printed to stdout.
    The line appended to ``$HOME/tmp/mailcow_<hostname>_debug.txt`` contains
    hostname, port and the two HTTPS/TLS switches only.
    """
    print(f"Hostname: {opt_hostname}")
    # this value is the API key; it used to be labelled "Username"
    print(f"API Key: {opt_apikey}")
    print(f"Port: {opt_port}")
    print(f"Check Version: {opt_check_version}")
    print(f"No HTTPS: {opt_no_https}")
    print(f"No TLS Check: {opt_no_cert_check}")

    home_path = os.getenv("HOME")
    tmp_path = f"{home_path}/tmp"
    help_file = f"{tmp_path}/mailcow_{opt_hostname}_debug.txt"
    with open(help_file, "a") as file:
        file.write(f"Hostname: {opt_hostname}, Port: {opt_port}, No HTTPS: {opt_no_https}, No Cert Check: {opt_no_cert_check}\n")
#@debugLog
def getDomainInfo(headers: "CaseInsensitiveDict", verify: bool, base_url: str) -> int:
    """Fetch all email domains and store their data in the global ``domain_data``.

    Args:
        headers: HTTP request headers (``main`` passes a CaseInsensitiveDict
            holding the accepted format and the API key).
        verify: handed to ``requests.get`` as ``verify``.
        base_url: URL of the API without trailing slash; ``/domain/all`` is
            appended.

    Returns:
        The number of email domains in the response.

    Exits with status 1 if the response code is not 200.
    """
    url = f"{base_url}/domain/all"
    response = requests.get(url, headers=headers, verify=verify)
    if response.status_code != 200:
        sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n")
        sys.exit(1)

    data = json.loads(response.text)  # a list of dictionaries, one per domain

    # Fields copied per domain; a field missing in the response is stored as None:
    #   active                          status, active (1) or not (0)
    #   created / modified              creation and last modification date
    #   max_num_mboxes_for_domain / mboxes_in_domain
    #   max_num_aliases_for_domain / aliases_in_domain
    #   msgs_total / bytes_total        total messages / bytes used in this domain
    #   max_quota_for_domain            maximum quota for this domain
    wanted = (
        "active",
        "created",
        "modified",
        "max_num_mboxes_for_domain",
        "mboxes_in_domain",
        "max_num_aliases_for_domain",
        "aliases_in_domain",
        "msgs_total",
        "bytes_total",
        "max_quota_for_domain",
    )
    for entry in data:
        # store all domain data in global dictionary, keyed by domain name
        domain_data[entry.get("domain_name")] = {key: entry.get(key) for key in wanted}

    # return number of email domains
    return len(data)
#@debugLog
def getMailboxInfo(headers: "CaseInsensitiveDict", verify: bool, base_url: str) -> tuple:
    """Fetch all mailboxes and store their data in the global ``mailbox_data``.

    Args:
        headers: HTTP request headers (``main`` passes a CaseInsensitiveDict
            holding the accepted format and the API key).
        verify: handed to ``requests.get`` as ``verify``.
        base_url: URL of the API without trailing slash; ``/mailbox/all`` is
            appended.

    Returns:
        A tuple ``(number of mailboxes, number of messages in all mailboxes)``.

    Exits with status 1 if the response code is not 200.
    """
    url = f"{base_url}/mailbox/all"
    response = requests.get(url, headers=headers, verify=verify)
    if response.status_code != 200:
        sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n")
        sys.exit(1)

    data = json.loads(response.text)  # a list of dictionaries, one per mailbox

    global_num_messages = 0
    for entry in data:
        # number of messages in mailbox; stored under two keys (see below)
        num_messages = entry.get("messages")
        # store all mailbox data in global dictionary, keyed by the username of the mailbox
        mailbox_data[entry.get("username")] = {
            # status of mailbox (0-->inactive or 1-->active)
            "active": entry.get("active"),
            # creation and last modification date
            "created": entry.get("created"),
            "modified": entry.get("modified"),
            # friendly name of user
            "name": entry.get("name"),
            "num_messages": num_messages,
            # quota used in percent (rounded to full percent)
            "percent_in_use": entry.get("percent_in_use"),
            # quota and quota used in bytes
            "quota": entry.get("quota"),
            "quota_used": entry.get("quota_used"),
            # same value as "num_messages"; both keys are kept for compatibility
            "messages": num_messages,
            # last login dates
            "last_imap_login": entry.get("last_imap_login"),
            "last_pop3_login": entry.get("last_pop3_login"),
            "last_smtp_login": entry.get("last_smtp_login"),
        }
        # a missing or null message count must not abort the whole agent run
        global_num_messages += num_messages or 0

    # return number of mailboxes and global number of messages
    return len(data), global_num_messages
def getGitVersion() -> str:
    """Return the tag name of the latest mailcow-dockerized release on GitHub.

    Exits with status 1 if the response code is not 200.
    """
    url = "https://api.github.com/repos/mailcow/mailcow-dockerized/releases/latest"
    response = requests.get(url)
    if response.status_code != 200:
        sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n")
        sys.exit(1)
    release = json.loads(response.text)  # a dictionary describing the release
    return release["tag_name"]
def _monthKey(month: str) -> tuple:
    """Turn the month part of a version ("01", "01a") into a sortable (number, suffix) pair."""
    # an empty suffix sorts before any letter, so "01" < "01a" < "01b"
    return int(month[:2]), month[2:]


def compareVersions(mc_ver: str, git_ver: str) -> bool:
    """Tell whether *git_ver* is a newer version than *mc_ver*.

    Both versions are expected in the form ``YYYY-MM`` with an optional
    suffix after the month, e.g. ``2023-01`` or ``2023-01a``. Years are
    compared first; the month (and then its suffix) only decides when both
    years are equal.

    Args:
        mc_ver: version of the running Mailcow installation.
        git_ver: version to compare against (the latest release).

    Returns:
        True if *git_ver* is newer than *mc_ver*. False if it is equal or
        older, or if a version string cannot be parsed.
    """
    try:
        mc_year, mc_month = mc_ver.split("-")
        git_year, git_month = git_ver.split("-")
        mc_year_int = int(mc_year)
        git_year_int = int(git_year)
        if git_year_int != mc_year_int:
            # an older year on the git side never means "update available"
            return git_year_int > mc_year_int
        return _monthKey(git_month) > _monthKey(mc_month)
    except (AttributeError, TypeError, ValueError):
        # unparsable version: keep the best-effort answer "no update"
        return False
#@debugLog
def getMailcowInfo(headers: "CaseInsensitiveDict", verify: bool, base_url: str) -> dict:
    """Fetch the Mailcow version and, if enabled, compare it with the latest release.

    Args:
        headers: HTTP request headers (``main`` passes a CaseInsensitiveDict
            holding the accepted format and the API key).
        verify: handed to ``requests.get`` as ``verify``.
        base_url: URL of the API without trailing slash; ``/status/version``
            is appended.

    Returns:
        A dictionary with the keys ``git_version``, ``update_available``,
        ``mc_version`` and ``check_version_enabled``. With the version check
        disabled, ``git_version`` is the text "Version check disabled" and
        ``update_available`` is False.

    Exits with status 1 if the response code is not 200.
    """
    url = f"{base_url}/status/version"
    response = requests.get(url, headers=headers, verify=verify)
    if response.status_code != 200:
        sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n")
        sys.exit(1)

    data = json.loads(response.text)  # a dictionary
    # get Mailcow version
    mc_version = data["version"]

    # if enabled, check this version against the official Mailcow repo on Github
    if opt_check_version:
        git_version = getGitVersion()
        update_available = compareVersions(mc_version, git_version)
    else:
        git_version = "Version check disabled"
        update_available = False

    return {
        "git_version": git_version,
        "update_available": update_available,
        "mc_version": mc_version,
        "check_version_enabled": opt_check_version,
    }
#@debugLog
def getSolrInfo(headers: "CaseInsensitiveDict", verify: bool, base_url: str) -> tuple:
    """Fetch the Solr status of the Mailcow server.

    Args:
        headers: HTTP request headers (``main`` passes a CaseInsensitiveDict
            holding the accepted format and the API key).
        verify: handed to ``requests.get`` as ``verify``.
        base_url: URL of the API without trailing slash; ``/status/solr`` is
            appended.

    Returns:
        A tuple ``(solr_enabled, solr_size, solr_documents)`` taken from the
        keys of the same name in the response.

    Exits with status 1 if the response code is not 200.
    """
    url = f"{base_url}/status/solr"
    response = requests.get(url, headers=headers, verify=verify)
    if response.status_code != 200:
        sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n")
        sys.exit(1)

    data = json.loads(response.text)  # a dictionary
    # get Solr infos
    return data["solr_enabled"], data["solr_size"], data["solr_documents"]
#@debugLog
def doCmkOutputMailboxes() -> None:
    """Print the agent section ``mailcow_mailboxes`` from the global ``mailbox_data``.

    One line per mailbox, fields separated by semicolons:

        0   mailbox name        email address used for login
        1   active              1 --> active, 0 --> not active
        2   creation date       "None" if no value is stored
        3   last modified date  "None" if never modified
        4   name                display name
        5   number of messages
        6   percent in use      quota used, rounded to full percents
        7   quota               max quota in bytes
        8   quota used          quota used in bytes
        9   last imap login     seconds since epoch, 0 if never logged in
        10  last pop3 login     seconds since epoch, 0 if never logged in
        11  last smtp login     seconds since epoch, 0 if never logged in

    Example:
        user1@dom1.de;1;2022-04-29 14:29:34;2022-04-29 14:29:34;Sarah;2433;2;21474836480;495481374;1692520168;0;1692281537
        user2@dom1.de;1;2022-04-29 14:38:33;2022-04-29 14:38:33;Tom;271;0;21474836480;25895752;1657394782;1692519758;1681065713
        user1@dom2.de;1;2022-04-30 09:55:37;2022-04-30 09:55:37;Melissa;53460;33;19964887040;6677677548;1692520066;0;1692510822
    """
    print("<<<mailcow_mailboxes:sep(59)>>>")
    # fields 1-11 in output order; field 0 is the dictionary key itself
    columns = (
        "active",
        "created",
        "modified",
        "name",
        "num_messages",
        "percent_in_use",
        "quota",
        "quota_used",
        "last_imap_login",
        "last_pop3_login",
        "last_smtp_login",
    )
    for mb, info in mailbox_data.items():
        row = {column: info[column] for column in columns}
        # strip semicolons, if present, since we use it as delimiter
        row["name"] = row["name"].replace(";", "")
        print(";".join(map(format, [mb, *row.values()])))
#@debugLog
def doCmkOutputMailcow(version: str,
                       num_domains: int, num_mailboxes: int, num_global_messages: int,
                       solr_enabled: bool, solr_size: float, solr_documents: int,
                       git_version: str, update_available: bool, check_version_enabled: bool
                       ) -> None:
    """Print the agent section ``mailcow_info``.

    The section consists of a single line holding all parameters in the
    order of the signature, separated by semicolons.
    """
    print("<<<mailcow_info:sep(59)>>>")
    fields = (
        # strip semicolons, if present, since we use it as delimiter
        version.replace(";", ""),
        num_domains,
        num_mailboxes,
        num_global_messages,
        solr_enabled,
        solr_size,
        solr_documents,
        git_version,
        update_available,
        check_version_enabled,
    )
    print(";".join(map(format, fields)))
#@debugLog
def doCmkOutputDomains() -> None:
    """Print the agent section ``mailcow_domains`` from the global ``domain_data``.

    One line per domain, fields separated by semicolons:

        0   domain_name
        1   active                      1 --> active, 0 --> not active
        2   creation date               "None" if no value is stored
        3   last modified date          "None" if never modified
        4   max number mailboxes
        5   number of mailboxes
        6   max number of aliases
        7   number of aliases
        8   total number of messages
        9   total number of bytes used  in bytes
        10  max quota                   in bytes

    Example:
        dom1.de;1;2022-04-23 22:54:57;None;10;0;400;6;0;0;10737418240
        dom2.de;1;2022-04-29 13:38:42;2023-08-19 17:21:04;10;0;400;2;0;0;10737418240
        dom3.de;1;2022-04-29 13:36:08;2022-04-29 21:26:19;10;1;100;3;28132;12852485367;21474836480
    """
    print("<<<mailcow_domains:sep(59)>>>")
    # fields 1-10 in output order; field 0 is the dictionary key itself
    columns = (
        "active",
        "created",
        "modified",
        "max_num_mboxes_for_domain",
        "mboxes_in_domain",
        "max_num_aliases_for_domain",
        "aliases_in_domain",
        "msgs_total",
        "bytes_total",
        "max_quota_for_domain",
    )
    for dom, info in domain_data.items():
        values = [dom] + [info[column] for column in columns]
        print(";".join(map(format, values)))
def main():
    """Run the special agent: read options, query the Mailcow API, print the sections.

    Exits with status 1 if hostname or API key are missing or if protocol
    and port do not fit together.
    """
    global debugLogFilename

    # presumably resolves password store references in sys.argv - confirm against Checkmk
    cmk.utils.password_store.replace_passwords()
    getOptions()

    # do some parameter checks
    if opt_hostname == "":
        sys.stderr.write("No hostname given.\n")
        showUsage()
        sys.exit(1)
    hostname = opt_hostname

    if opt_apikey == "":
        sys.stderr.write("No API key given.\n")
        showUsage()
        sys.exit(1)

    verify = not opt_no_cert_check
    if opt_no_cert_check:
        # disable certificate warnings
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # without an explicit port the default port of the protocol is used
    protocol = "http" if opt_no_https else "https"
    if opt_port == "":
        port = "80" if opt_no_https else "443"
    else:
        port = opt_port

    if protocol == "http" and port == "443":
        sys.stderr.write("Combining HTTP with port 443 is not supported.\n")
        sys.exit(1)
    if protocol == "https" and port == "80":
        sys.stderr.write("Combining HTTPS with port 80 is not supported.\n")
        sys.exit(1)

    headers = CaseInsensitiveDict()
    headers["Accept"] = "application/json"
    headers["X-API-Key"] = opt_apikey

    # At this point:
    #   "hostname" contains the FQDN of the host running Mailcow
    #   "protocol" is http or https
    #   "port" contains the port number to use
    #   "verify" signals whether certificate checking will be done (True) or not (False)
    #   "headers" contains the accepted format (JSON) and the API key to use
    debugLogFilename = getDebugFilename(hostname)
    if DEBUG:
        showOptions()
        print(f"hostname: {hostname}, protocol: {protocol}, port: {port}, verify: {verify}")

    base_url = f"{protocol}://{hostname}:{port}/{mc_api_base}"

    # get domain data
    num_domains = getDomainInfo(headers, verify, base_url)
    # get mailbox data
    num_mailboxes, num_global_messages = getMailboxInfo(headers, verify, base_url)
    # get global Mailcow info
    solr_enabled, solr_size, solr_documents = getSolrInfo(headers, verify, base_url)
    mailcow_info = getMailcowInfo(headers, verify, base_url)

    # create agent output
    doCmkOutputDomains()
    doCmkOutputMailboxes()
    doCmkOutputMailcow(
        mailcow_info["mc_version"],
        num_domains, num_mailboxes, num_global_messages,
        solr_enabled, solr_size, solr_documents,
        mailcow_info["git_version"],
        mailcow_info["update_available"],
        mailcow_info["check_version_enabled"],
    )


if __name__ == "__main__":
    main()