#!/usr/bin/env python3 # pylint: disable=missing-module-docstring, missing-class-docstring, missing-function-docstring # pylint: disable=too-many-branches, line-too-long, too-many-statements # pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-locals import json import argparse import sys import requests import urllib3 from requests.structures import CaseInsensitiveDict NC_API_ENDPOINT = "ocs/v2.php/apps/serverinfo/api/v1/info?format=json" NC_API_ENDPOINT_ALL_USERS = "ocs/v1.php/cloud/users?format=json" NC_API_ENDPOINT_USER = "ocs/v1.php/cloud/users" def get_args() -> argparse.Namespace: parser: argparse.ArgumentParser = argparse.ArgumentParser( description="Nextcloud server parameters" ) parser.add_argument( "--hostname", required=True, type=str, help="Hostname or IP of nextcloud server" ) parser.add_argument( "--username", required=True, type=str, help="User with admin privileges" ) parser.add_argument( "--password", required=True, type=str, help="App password for the user" ) parser.add_argument( "--port", required=False, type=int, help="Port where the server is listening on (if not HTTP/HTTPS)", ) parser.add_argument( "--folder", required=False, type=str, help="Folder if not installed in web root directory", ) parser.add_argument( "--no-https", required=False, default=False, help="Disable HTTPS", action="store_true", ) parser.add_argument( "--no-cert-check", required=False, default=False, help="Disable certificate checks", action="store_true", ) args = parser.parse_args() return args def create_url(endpoint, hostname, protocol, port, folder): # these parameters are needed, otherwise no information about updates regarding apps and Nextcloud itself are reported (since version 28) params = "skipApps=false&skipUpdate=false" if folder == "": url = f"{protocol}://{hostname}:{port}/{endpoint}" else: url = f"{protocol}://{hostname}:{port}/{folder}/{endpoint}" if endpoint == NC_API_ENDPOINT: url = f"{url}&{params}" return url def create_url_user(user, endpoint, hostname, protocol, port, folder): params = "format=json" if folder == "": url = f"{protocol}://{hostname}:{port}/{endpoint}/{user}?{params}" else: url = f"{protocol}://{hostname}:{port}/{folder}/{endpoint}/{user}?{params}" return url def get_session(username, secret): session = requests.session() session.cookies.set("SameSite", "Strict") session.auth = (username, secret) session.headers["Accept"] = "application/json" return session def get_data(session, url, verify): data = {} headers = CaseInsensitiveDict() headers["Accept"] = "application/json" response = session.get(url, headers=headers, verify=verify) status = response.status_code if status == 200: jsdata = response.text data = json.loads(jsdata) # returns a dictionary return data if status == 503: # this code is reported when maintenance mode is on sys.stderr.write( f"Request response code is {response.status_code} with URL {url}, maybe maintenance mode is on?\n" ) sys.exit(1) else: sys.stderr.write( f"Request response code is {response.status_code} with URL {url}\n" ) sys.exit(1) def get_data_all_users(session, url, verify): headers = CaseInsensitiveDict() headers["Accept"] = "application/json" headers["OCS-APIRequest"] = "true" response = session.get(url, headers=headers, verify=verify) status = response.status_code if status == 200: jsdata = response.text data = json.loads(jsdata) # returns a dictionary return data sys.stderr.write( f"Request response code is {response.status_code} with URL {url}\n" ) sys.exit(1) def get_data_user(session, url, verify): headers = CaseInsensitiveDict() headers["Accept"] = "application/json" headers["OCS-APIRequest"] = "true" response = session.get(url, headers=headers, verify=verify) status = response.status_code if status == 200: jsdata = response.text data = json.loads(jsdata) # returns a dictionary return data sys.stderr.write( f"Request response code is {response.status_code} with URL {url}\n" ) sys.exit(1) def do_cmk_output(data): apps_with_updates_available = {} str_apps_with_updates_available = "" print("<<>>") print(f"NC_Version;{data['ocs']['data']['nextcloud']['system']['version']}") print(f"NC_Freespace;{data['ocs']['data']['nextcloud']['system']['freespace']}") print(f"NC_Status;{data['ocs']['meta']['status']}") # This update info is available only from version 28 onwards, so the key "update" does not exist in all versions before try: print( f"NC_Last_Update;{data['ocs']['data']['nextcloud']['system']['update']['lastupdatedat']}" ) print( f"NC_Update_Available;{data['ocs']['data']['nextcloud']['system']['update']['available']}" ) except KeyError: # TBD pass print(f"NC_Num_Users;{data['ocs']['data']['nextcloud']['storage']['num_users']}") print(f"NC_Num_Files;{data['ocs']['data']['nextcloud']['storage']['num_files']}") print(f"NC_Num_Shares;{data['ocs']['data']['nextcloud']['shares']['num_shares']}") print( f"NC_Num_Storages;{data['ocs']['data']['nextcloud']['storage']['num_storages']}" ) print( f"NC_Num_Storages_Home;{data['ocs']['data']['nextcloud']['storage']['num_storages_home']}" ) print( f"NC_Num_Storages_Local;{data['ocs']['data']['nextcloud']['storage']['num_storages_local']}" ) print( f"NC_Num_Storages_Other;{data['ocs']['data']['nextcloud']['storage']['num_storages_other']}" ) # Workaround for Nextcloud 28.0.1 (KeyError "apps") try: print( f"NC_Num_Apps_Installed;{data['ocs']['data']['nextcloud']['system']['apps']['num_installed']}" ) print( f"NC_Num_Apps_Updates_Available;{data['ocs']['data']['nextcloud']['system']['apps']['num_updates_available']}" ) apps_with_updates_available = data["ocs"]["data"]["nextcloud"]["system"][ "apps" ]["app_updates"] if apps_with_updates_available: for app, version in apps_with_updates_available.items(): str_apps_with_updates_available = ( str_apps_with_updates_available + app + "/" + version + " " ) print( f"NC_Apps_With_Updates_Available;{str_apps_with_updates_available}" ) except KeyError: # TBD pass print( f"NC_Active_Users_Last_5Min;{data['ocs']['data']['activeUsers']['last5minutes']}" ) print( f"NC_Active_Users_Last_1Hour;{data['ocs']['data']['activeUsers']['last1hour']}" ) print( f"NC_Active_Users_Last_1Day;{data['ocs']['data']['activeUsers']['last24hours']}" ) print(f"NC_Webserver;{data['ocs']['data']['server']['webserver']}") print(f"NC_PHP_Version;{data['ocs']['data']['server']['php']['version']}") print("<<>>") print(f"NC_Database_Type;{data['ocs']['data']['server']['database']['type']}") print(f"NC_Database_Version;{data['ocs']['data']['server']['database']['version']}") print(f"NC_Database_Size;{data['ocs']['data']['server']['database']['size']}") # opcache entry does not exist if opcache_get_status is disabled by server settings # thanks to Marcus Klein from Iteratio to report (and solve!) this bug if data["ocs"]["data"]["server"]["php"]["opcache"]: print( f"NC_OPCache_Hit_Rate;{data['ocs']['data']['server']['php']['opcache']['opcache_statistics']['opcache_hit_rate']}" ) else: print("NC_OPCache_Hit_Rate;0") def do_cmk_output_all_users(session, data, verify, hostname, protocol, port, folder): print("<<>>") for user in data["ocs"]["data"]["users"]: nc_url = create_url_user( user, NC_API_ENDPOINT_USER, hostname, protocol, port, folder ) user_data = get_data_user(session, nc_url, verify) userid = user_data["ocs"]["data"]["id"] displayname = user_data["ocs"]["data"]["displayname"] lastlogin = int(user_data["ocs"]["data"]["lastLogin"]) if lastlogin == 0: # user has never logged in quota_free = -1 quota_quota = -1 quota_relative = -1 quota_total = -1 quota_used = -1 else: quota_free = user_data["ocs"]["data"]["quota"]["free"] # quota_quota == -3 --> unlimited quota_quota = user_data["ocs"]["data"]["quota"]["quota"] # quota_relative = used * 100 / (free + used) quota_relative = user_data["ocs"]["data"]["quota"]["relative"] quota_total = user_data["ocs"]["data"]["quota"]["total"] quota_used = user_data["ocs"]["data"]["quota"]["used"] print( f"{userid};{displayname};{lastlogin};{quota_free};{quota_quota};{quota_relative};{quota_total};{quota_used}" ) def main(): # get and check parameters params: argparse.Namespace = get_args() if params.hostname is None: sys.stderr.write("No hostname given.\n") sys.exit(1) if params.username is None: sys.stderr.write("No username given.\n") sys.exit(1) if params.password is None: sys.stderr.write("No app password given.\n") sys.exit(1) hostname = params.hostname username = params.username pwd = params.password if params.folder is None: folder = "" else: folder = params.folder if params.no_cert_check: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) verify = False else: verify = True if params.port is None: if params.no_https: protocol = "http" port = "80" else: protocol = "https" port = "443" else: if params.no_https: protocol = "http" else: protocol = "https" port = params.port if protocol == "http" and port == "443": sys.stderr.write("Combining HTTP with port 443 is not supported.\n") sys.exit(1) if protocol == "https" and port == "80": sys.stderr.write("Combining HTTPS with port 80 is not supported.\n") sys.exit(1) # create session session = get_session(username, pwd) nc_url = create_url(NC_API_ENDPOINT, hostname, protocol, port, folder) nc_data = get_data(session, nc_url, verify) do_cmk_output(nc_data) nc_url = create_url(NC_API_ENDPOINT_ALL_USERS, hostname, protocol, port, folder) nc_data = get_data_all_users(session, nc_url, verify) do_cmk_output_all_users(session, nc_data, verify, hostname, protocol, port, folder) if __name__ == "__main__": main()