Mailcow-CheckMK/local/share/check_mk/agents/special/agent_mailcow

246 lines
8.6 KiB
Python
Executable File

#!/usr/bin/env python3
import getopt
import sys
import requests
import urllib3
import json
import os
from pprint import pprint
from requests.structures import CaseInsensitiveDict
from requests.auth import HTTPBasicAuth
def showUsage():
sys.stderr.write("""CheckMK Mailcow Special Agent
USAGE: agent_nextcloud -H [hostname] -k [apikey]
agent_nextcloud -h
OPTIONS:
-H, --hostname Hostname (FQDN or IP) of Nextcloud server
-k, --apikey API Key
-P, --port Port
--no-https True|False If "True": Disable HTTPS, use HTTP (not recommended!)
--no-cert-check True|False If "True": Disable TLS certificate check (not recommended!)
-h, --help Show this help message and exit
""")
# set this to True to produce debug output (this clutters the agent output)
# be aware: activating this logs very sensitive information to debug files in ~/tmp
# !!DO NOT FORGET to delete these files after debugging is done!!
DEBUG = False
mc_api_base = "api/v1/get"
opt_hostname = ""
opt_apikey = ""
opt_port = ""
opt_no_https = False
opt_no_cert_check = False
short_options = 'hH:k:P:'
long_options = [
'hostname=', 'apikey=', 'port=', 'no-https=', 'no-cert-check=', 'help'
]
domain_data = {}
mailbox_data = {}
def getOptions():
global opt_hostname
global opt_apikey
global opt_port
global opt_no_https
global opt_no_cert_check
opts, args = getopt.getopt(sys.argv[1:], short_options, long_options)
for opt, arg in opts:
if opt in ['-H', '--hostname']:
opt_hostname = arg
elif opt in ['-k', '--apikey']:
opt_apikey = arg
elif opt in ['-P', '--port']:
opt_port = arg
elif opt in ['--no-https']:
if arg == 'True':
opt_no_https = True
else:
opt_no_https = False
elif opt in ['--no-cert-check']:
if arg == 'True':
opt_no_cert_check = True
else:
opt_no_cert_check = False
elif opt in ['-h', '--help']:
showUsage()
sys.exit(0)
if DEBUG:
home_path = os.getenv("HOME")
tmp_path = f"{home_path}/tmp"
help_file = f"{tmp_path}/mailcow_{opt_hostname}_debug.txt"
with open(help_file, "a") as file:
file.write(f"Number of Arguments: {len(sys.argv)}, Argument List: {str(sys.argv)}\n")
def showOptions():
print(f"Hostname: {opt_hostname}")
print(f"Username: {opt_apikey}")
print(f"Port: {opt_port}")
print(f"No HTTPS: {opt_no_https}")
print(f"No TLS Check: {opt_no_cert_check}")
home_path = os.getenv("HOME")
tmp_path = f"{home_path}/tmp"
help_file = f"{tmp_path}/mailcow_{opt_hostname}_debug.txt"
with open(help_file, "a") as file:
file.write(f"Hostname: {opt_hostname}, Port: {opt_port}, No HTTPS: {opt_no_https}, No Cert Check: {opt_no_cert_check}\n")
def getDomainInfo(headers, verify, base_url):
url = f"{base_url}/domain/all"
response = requests.get(url, headers=headers, verify=verify)
if (response.status_code == 200):
jsdata = response.text
data = json.loads(jsdata) # returns a list of dictionaries
i = 0
while i < len(data):
#pprint(data[i])
# get domain name and status (active (1) or not (0))
domain_name = data[i].get("domain_name")
active = data[i].get("active")
# get creation and last modification date
created = data[i].get("created")
modified = data[i].get("modified") # returns "None" or date
# get maximum and current number of mailboxes in this domain
max_num_mboxes_for_domain = data[i].get("max_num_mboxes_for_domain")
mboxes_in_domain = data[i].get("mboxes_in_domain")
# get maximum and current number of aliases in this domain
max_num_aliases_for_domain = data[i].get("max_num_aliases_for_domain")
aliases_in_domain = data[i].get("aliases_in_domain")
# get total messages in this domain
msgs_total = data[i].get("msgs_total")
# get total bytes used in this domain
bytes_total = data[i].get("bytes_total")
# get maximum quota for this domain
max_quota_for_domain = data[i].get("max_quota_for_domain")
i += 1
# return number of email domains
return i
else:
sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n")
sys.exit(1)
def getMailboxInfo(headers, verify, base_url):
url = f"{base_url}/mailbox/all"
response = requests.get(url, headers=headers, verify=verify)
if (response.status_code == 200):
jsdata = response.text
data = json.loads(jsdata) # returns a list of dictionaries
i = 0
global_num_messages = 0
while i < len(data):
#pprint(data[i])
# get status of mailbox (0-->inactive or 1-->active)
active = data[i].get("active")
# get username of mailbox
username = data[i].get("username")
# get friendly name of user
name = data[i].get("name")
# get number of messages in mailbox
num_messages = data[i].get("messages")
# get quota used in percent
percent_in_use = data[i].get("percent_in_use")
# get creation and last modification date
created = data[i].get("created")
modified = data[i].get("modified")
# get number of messages
messages = data[i].get("messages")
# get last login dates
last_imap_login = data[i].get("last_imap_login")
last_pop3_login = data[i].get("last_pop3_login")
last_smtp_login = data[i].get("last_smtp_login")
i += 1
global_num_messages += num_messages
# return number of mailboxes and global number of messages
return i, global_num_messages
else:
sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n")
sys.exit(1)
def getMailcowInfo(headers, verify, base_url):
url = f"{base_url}/status/version"
response = requests.get(url, headers=headers, verify=verify)
if (response.status_code == 200):
jsdata = response.text
data = json.loads(jsdata) # returns a dictionary
#pprint(data)
# get Mailcow version
mc_version = data["version"]
return mc_version
else:
sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n")
sys.exit(1)
def main():
getOptions()
# do some parameter checks
if (opt_hostname == ""):
sys.stderr.write(f"No hostname given.\n")
showUsage()
sys.exit(1)
else:
hostname = opt_hostname
if (opt_apikey == ""):
sys.stderr.write(f"No API key given.\n")
showUsage()
sys.exit(1)
if (opt_no_cert_check):
# disable certificate warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
verify = False
else:
verify = True
if (opt_port == ""):
if (opt_no_https):
protocol = "http"
port = "80"
else:
protocol = "https"
port = "443"
else:
if (opt_no_https):
protocol = "http"
else:
protocol = "https"
port = opt_port
if (protocol == "http" and port == "443"):
sys.stderr.write(f"Combining HTTP with port 443 is not supported.\n")
sys.exit(1)
if (protocol == "https" and port == "80"):
sys.stderr.write(f"Combining HTTPS with port 80 is not supported.\n")
sys.exit(1)
headers = CaseInsensitiveDict()
headers["Accept"] = "application/json"
headers["X-API-Key"] = opt_apikey
# now "hostname" contains the FQDN of the host running Mailcow
# now "protocol" is http or https
# now "port" contains the port number to use
# now "verify" signals whether certificate checking will be done (True) or not (False)
# now "headers" contains the accepted format (JSON) and the API key to use
if DEBUG:
showOptions()
print(f"hostname: {hostname}, protocol: {protocol}, port: {port}, verify: {verify}")
base_url = f"{protocol}://{hostname}:{port}/{mc_api_base}"
# get domain data
num_domains = getDomainInfo(headers, verify, base_url)
# get mailbox data
num_mailboxes, num_global_messages = getMailboxInfo(headers, verify, base_url)
# get global Mailcow info
mailcow_version = getMailcowInfo(headers, verify, base_url)
if __name__ == "__main__":
main()