#!/usr/bin/env python3 import getopt import sys import requests import urllib3 import json import os from pprint import pprint from requests.structures import CaseInsensitiveDict from requests.auth import HTTPBasicAuth import cmk.utils.password_store def showUsage(): sys.stderr.write("""CheckMK Nextcloud Special Agent USAGE: agent_nextcloud_info -u [username] -p [password] OR agent_nextcloud_info -h OPTIONS: -H, --hostname Hostname (FQDN or IP) of Nextcloud server -u, --username Username -p, --password App Password -P, --port Port -f, --folder Subfolder if not installed in web root --no-https True|False If "True": Disable HTTPS, use HTTP (not recommended!) --no-cert-check True|False If "True": Disable TLS certificate check (not recommended!) -h, --help Show this help message and exit """) # set this to true to produce debug output (this clutters the agent output) # be aware: activating this flag logs very sensitive information to debug files in ~/tmp # !!DO NOT FORGET to delete these files after debugging is done!! DEBUG = False nc_api_endpoint = "ocs/v2.php/apps/serverinfo/api/v1/info?format=json" nc_api_endpoint_all_users = "ocs/v1.php/cloud/users?format=json" nc_api_endpoint_user = "ocs/v1.php/cloud/users" opt_hostname = "" opt_username = "" opt_password = "" opt_port = "" opt_folder = "" opt_no_https = False opt_no_cert_check = False short_options = 'hH:u:p:P:f:' long_options = [ 'hostname=', 'username=', 'password=', 'port=', 'folder=', 'no-https=', 'no-cert-check=', 'help' ] def logDebug(line): if DEBUG: home_path = os.getenv("HOME") tmp_path = f"{home_path}/tmp" help_file = f"{tmp_path}/nextcloud_{opt_hostname}_{opt_port}_debug.txt" with open(help_file, "a") as file: file.write(line) def getOptions(): global opt_hostname global opt_username global opt_password global opt_port global opt_folder global opt_no_https global opt_no_cert_check opts, args = getopt.getopt(sys.argv[1:], short_options, long_options) for opt, arg in opts: if opt in ['-H', '--hostname']: opt_hostname = arg elif opt in ['-u', '--username']: opt_username = arg elif opt in ['-p', '--password']: opt_password = arg elif opt in ['-P', '--port']: opt_port = arg elif opt in ['-f', '--folder']: opt_folder = arg elif opt in ['--no-https']: if arg == 'True': opt_no_https = True else: opt_no_https = False elif opt in ['--no-cert-check']: if arg == 'True': opt_no_cert_check = True else: opt_no_cert_check = False elif opt in ['-h', '--help']: showUsage() sys.exit(0) logDebug(f"getOptions - Number of Arguments: {len(sys.argv)}, Argument List: {str(sys.argv)}\n") def showOptions(): print(f"Hostname: {opt_hostname}") print(f"Username: {opt_username}") print(f"Password: {opt_password}") print(f"Port: {opt_port}") print(f"Folder: {opt_folder}") print(f"No HTTPS: {opt_no_https}") print(f"No TLS Check: {opt_no_cert_check}") logDebug(f"showOptions - Hostname: {opt_hostname}, Port: {opt_port}, No HTTPS: {opt_no_https}, No Cert Check: {opt_no_cert_check}\n") def createUrl(endpoint, hostname, protocol, port, folder): # these parameters are needed, otherwise no information about updates regarding apps and Nextcloud itself are reported (since version 28) params = "skipApps=false&skipUpdate=false" if folder == "": url = f"{protocol}://{hostname}:{port}/{endpoint}" else: url = f"{protocol}://{hostname}:{port}/{folder}/{endpoint}" if endpoint == nc_api_endpoint: url = f"{url}&{params}" logDebug(f"createUrl - Data URL: {url}\n") return url def createUrlUser(user, endpoint, hostname, protocol, port, folder): params = "format=json" if folder == "": url = f"{protocol}://{hostname}:{port}/{endpoint}/{user}?{params}" else: url = f"{protocol}://{hostname}:{port}/{folder}/{endpoint}/{user}?{params}" logDebug(f"createUrlUser - User URL: {url}\n") return url def getSession(username, secret): session = requests.session() session.cookies.set("SameSite", "Strict") session.auth = (username, secret) session.headers['Accept'] = "application/json" return session def getData(session, url, verify): data = {} headers = CaseInsensitiveDict() headers["Accept"] = "application/json" response = session.get(url, headers=headers, verify=verify) status = response.status_code if (status == 200): jsdata = response.text data = json.loads(jsdata) # returns a dictionary return data elif (status == 503): # this code is reported when maintenance mode is on sys.stderr.write(f"Request response code is {response.status_code} with URL {url}, maybe maintenance mode is on?\n") sys.exit(1) else: sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n") sys.exit(1) def getDataAllUsers(session, url, verify): headers = CaseInsensitiveDict() headers["Accept"] = "application/json" headers["OCS-APIRequest"] = "true" response = session.get(url, headers=headers, verify=verify) status = response.status_code if (status == 200): jsdata = response.text data = json.loads(jsdata) # returns a dictionary return data else: sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n") sys.exit(1) def getDataUser(session, url, verify): headers = CaseInsensitiveDict() headers["Accept"] = "application/json" headers["OCS-APIRequest"] = "true" response = session.get(url, headers=headers, verify=verify) status = response.status_code if (status == 200): jsdata = response.text data = json.loads(jsdata) # returns a dictionary return data else: sys.stderr.write(f"Request response code is {response.status_code} with URL {url}\n") sys.exit(1) def doCmkOutput(data): apps_with_updates_available = {} str_apps_with_updates_available = "" print("<<>>") print(f"NC_Version;{data['ocs']['data']['nextcloud']['system']['version']}") print(f"NC_Freespace;{data['ocs']['data']['nextcloud']['system']['freespace']}") print(f"NC_Status;{data['ocs']['meta']['status']}") # This update info is available only from version 28 onwards, so the key "update" does not exist in all versions before try: print(f"NC_Last_Update;{data['ocs']['data']['nextcloud']['system']['update']['lastupdatedat']}") print(f"NC_Update_Available;{data['ocs']['data']['nextcloud']['system']['update']['available']}") except KeyError: pass print(f"NC_Num_Users;{data['ocs']['data']['nextcloud']['storage']['num_users']}") print(f"NC_Num_Files;{data['ocs']['data']['nextcloud']['storage']['num_files']}") print(f"NC_Num_Shares;{data['ocs']['data']['nextcloud']['shares']['num_shares']}") print(f"NC_Num_Storages;{data['ocs']['data']['nextcloud']['storage']['num_storages']}") print(f"NC_Num_Storages_Home;{data['ocs']['data']['nextcloud']['storage']['num_storages_home']}") print(f"NC_Num_Storages_Local;{data['ocs']['data']['nextcloud']['storage']['num_storages_local']}") print(f"NC_Num_Storages_Other;{data['ocs']['data']['nextcloud']['storage']['num_storages_other']}") # Workaround for Nextcloud 28.0.1 (KeyError "apps") try: print(f"NC_Num_Apps_Installed;{data['ocs']['data']['nextcloud']['system']['apps']['num_installed']}") print(f"NC_Num_Apps_Updates_Available;{data['ocs']['data']['nextcloud']['system']['apps']['num_updates_available']}") apps_with_updates_available = data['ocs']['data']['nextcloud']['system']['apps']['app_updates'] if apps_with_updates_available: for app, version in apps_with_updates_available.items(): str_apps_with_updates_available = str_apps_with_updates_available + app + "/" + version + " " print(f"NC_Apps_With_Updates_Available;{str_apps_with_updates_available}") except KeyError: pass print(f"NC_Active_Users_Last_5Min;{data['ocs']['data']['activeUsers']['last5minutes']}") print(f"NC_Active_Users_Last_1Hour;{data['ocs']['data']['activeUsers']['last1hour']}") print(f"NC_Active_Users_Last_1Day;{data['ocs']['data']['activeUsers']['last24hours']}") print(f"NC_Webserver;{data['ocs']['data']['server']['webserver']}") print(f"NC_PHP_Version;{data['ocs']['data']['server']['php']['version']}") print("<<>>") print(f"NC_Database_Type;{data['ocs']['data']['server']['database']['type']}") print(f"NC_Database_Version;{data['ocs']['data']['server']['database']['version']}") print(f"NC_Database_Size;{data['ocs']['data']['server']['database']['size']}") # opcache entry does not exist if opcache_get_status is disabled by server settings # thanks to Marcus Klein from Iteratio to report (and solve!) this bug if (data['ocs']['data']['server']['php']['opcache']): print(f"NC_OPCache_Hit_Rate;{data['ocs']['data']['server']['php']['opcache']['opcache_statistics']['opcache_hit_rate']}") else: print(f"NC_OPCache_Hit_Rate;0") def doCmkOutputAllUsers(session, data, verify, hostname, protocol, port, folder): print("<<>>") for user in data['ocs']['data']['users']: nc_url = createUrlUser(user, nc_api_endpoint_user, hostname, protocol, port, folder) user_data = getDataUser(session, nc_url, verify) userid = user_data['ocs']['data']['id'] displayname = user_data['ocs']['data']['displayname'] lastlogin = int(user_data['ocs']['data']['lastLogin']) if lastlogin == 0: # user has never logged in quota_free = -1 quota_quota = -1 quota_relative = -1 quota_total = -1 quota_used = -1 else: quota_free = user_data['ocs']['data']['quota']['free'] # quota_quota == -3 --> unlimited quota_quota = user_data['ocs']['data']['quota']['quota'] # quota_relative = used * 100 / (free + used) quota_relative = user_data['ocs']['data']['quota']['relative'] quota_total = user_data['ocs']['data']['quota']['total'] quota_used = user_data['ocs']['data']['quota']['used'] print(f"{userid};{displayname};{lastlogin};{quota_free};{quota_quota};{quota_relative};{quota_total};{quota_used}") def main(): # replace password from pwd store cmk.utils.password_store.replace_passwords() getOptions() if (opt_hostname == ""): sys.stderr.write(f"No hostname given.\n") showUsage() sys.exit(1) if (opt_username == ""): sys.stderr.write(f"No username given.\n") showUsage() sys.exit(1) if (opt_password == ""): sys.stderr.write(f"No password given.\n") showUsage() sys.exit(1) if (opt_no_cert_check): urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) verify = False else: verify = True if (opt_port == ""): if (opt_no_https): protocol = "http" port = "80" else: protocol = "https" port = "443" else: if (opt_no_https): protocol = "http" else: protocol = "https" port = opt_port if DEBUG: showOptions() if (protocol == "http" and port == "443"): sys.stderr.write(f"Combining HTTP with port 443 is not supported.\n") sys.exit(1) if (protocol == "https" and port == "80"): sys.stderr.write(f"Combining HTTPS with port 80 is not supported.\n") sys.exit(1) pwd = opt_password # create session session = getSession(opt_username, pwd) nc_url = createUrl(nc_api_endpoint, opt_hostname, protocol, port, opt_folder) nc_data = getData(session, nc_url, verify) doCmkOutput(nc_data) nc_url = createUrl(nc_api_endpoint_all_users, opt_hostname, protocol, port, opt_folder) nc_data = getDataAllUsers(session, nc_url, verify) doCmkOutputAllUsers(session, nc_data, verify, opt_hostname, protocol, port, opt_folder) if __name__ == "__main__": main()