313 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
# pylint: disable=missing-module-docstring, missing-class-docstring, missing-function-docstring
# pylint: disable=too-many-branches, line-too-long, too-many-statements
# pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-locals
import argparse
import json
import sys
from urllib.parse import quote

import requests
import urllib3
from requests.structures import CaseInsensitiveDict
# Endpoint paths; create_url() / create_url_user() prepend protocol, host, port and folder.
# Endpoint of the "serverinfo" app. create_url() appends the skipApps/skipUpdate
# query parameters when it is called with exactly this value.
NC_API_ENDPOINT = "ocs/v2.php/apps/serverinfo/api/v1/info?format=json"
# Endpoint whose response provides the user list iterated by do_cmk_output_all_users().
NC_API_ENDPOINT_ALL_USERS = "ocs/v1.php/cloud/users?format=json"
# Base path for a single user; create_url_user() appends "/<user>?format=json".
NC_API_ENDPOINT_USER = "ocs/v1.php/cloud/users"
def get_args(argv=None) -> argparse.Namespace:
    """Parse the command line options of the agent.

    Args:
        argv: Optional list of argument strings to parse. The default ``None``
            makes argparse read the process command line, which is what the
            parameterless call in main() relies on.

    Returns:
        Namespace with the attributes ``hostname``, ``username``, ``password``,
        ``port``, ``folder``, ``no_https`` and ``no_cert_check``. ``port`` and
        ``folder`` are ``None`` when the option was not given; the two flags
        default to ``False``.

    Raises:
        SystemExit: raised by argparse when a required option is missing or a
            value cannot be converted (e.g. a non-numeric ``--port``).
    """
    parser = argparse.ArgumentParser(description="Nextcloud server parameters")
    parser.add_argument(
        "--hostname", required=True, type=str, help="Hostname or IP of nextcloud server"
    )
    parser.add_argument(
        "--username", required=True, type=str, help="User with admin privileges"
    )
    parser.add_argument(
        "--password", required=True, type=str, help="App password for the user"
    )
    parser.add_argument(
        "--port",
        type=int,
        help="Port where the server is listening on (if not HTTP/HTTPS)",
    )
    parser.add_argument(
        "--folder",
        type=str,
        help="Folder if not installed in web root directory",
    )
    # "store_true" options are optional and default to False on their own.
    parser.add_argument("--no-https", action="store_true", help="Disable HTTPS")
    parser.add_argument(
        "--no-cert-check", action="store_true", help="Disable certificate checks"
    )
    return parser.parse_args(argv)
def create_url(endpoint, hostname, protocol, port, folder):
    """Build the URL of an API endpoint.

    Args:
        endpoint: Endpoint path including its query string, e.g. NC_API_ENDPOINT.
        hostname: Hostname or IP of the server.
        protocol: URL scheme, "http" or "https".
        port: Port number (int or str); it is always written into the URL.
        folder: Sub folder of the installation. An empty string or ``None``
            means the web root; surrounding slashes are ignored.

    Returns:
        The complete URL as a string. For NC_API_ENDPOINT the
        ``skipApps=false&skipUpdate=false`` parameters are appended.
    """
    # these parameters are needed, otherwise no information about updates regarding apps and Nextcloud itself are reported (since version 28)
    params = "skipApps=false&skipUpdate=false"
    # Strip surrounding slashes so that a folder like "/nextcloud/" does not
    # produce "//" in the path; None is treated like "no folder".
    folder = (folder or "").strip("/")
    base = f"{protocol}://{hostname}:{port}"
    if folder:
        base = f"{base}/{folder}"
    url = f"{base}/{endpoint}"
    if endpoint == NC_API_ENDPOINT:
        # the endpoint constant already carries "?format=json", hence "&"
        url = f"{url}&{params}"
    return url
def create_url_user(user, endpoint, hostname, protocol, port, folder):
    """Build the URL for querying the details of a single user.

    Args:
        user: User id; it is percent-encoded before being placed in the path.
        endpoint: Endpoint path without query string, e.g. NC_API_ENDPOINT_USER.
        hostname: Hostname or IP of the server.
        protocol: URL scheme, "http" or "https".
        port: Port number (int or str); it is always written into the URL.
        folder: Sub folder of the installation. An empty string or ``None``
            means the web root; surrounding slashes are ignored.

    Returns:
        The complete URL as a string, ending in ``/<user>?format=json``.
    """
    params = "format=json"
    # Strip surrounding slashes so that a folder like "/nextcloud/" does not
    # produce "//" in the path; None is treated like "no folder".
    folder = (folder or "").strip("/")
    base = f"{protocol}://{hostname}:{port}"
    if folder:
        base = f"{base}/{folder}"
    # Encode the id so characters such as spaces, "/", "?" or "#" cannot
    # change the structure of the URL.
    user_id = quote(str(user), safe="")
    return f"{base}/{endpoint}/{user_id}?{params}"
def get_session(username, secret):
    """Create the requests session used for all API calls.

    The returned session has ``(username, secret)`` set as its ``auth``
    credentials, carries a cookie named "SameSite" with the value "Strict"
    and sends ``Accept: application/json`` as a default header.
    """
    nc_session = requests.Session()
    nc_session.headers.update({"Accept": "application/json"})
    nc_session.auth = (username, secret)
    nc_session.cookies.set("SameSite", "Strict")
    return nc_session
def get_data(session, url, verify, timeout=30):
    """Request *url* and return the decoded JSON body.

    main() uses this function for the server information endpoint.

    Args:
        session: Object providing a requests-style ``get()`` method, e.g. the
            session created by get_session().
        url: Complete URL to request.
        verify: Passed through to ``session.get``; main() passes ``False``
            when certificate checks are disabled.
        timeout: Seconds to wait for the server. Without a timeout a stalled
            server would block the agent indefinitely.

    Returns:
        The parsed JSON document of a response with status 200.

    Raises:
        SystemExit: with exit code 1, after writing a message to stderr, when
            the request fails, the status is not 200 or the body is not
            valid JSON.
    """
    headers = {"Accept": "application/json"}
    try:
        response = session.get(url, headers=headers, verify=verify, timeout=timeout)
    except requests.exceptions.RequestException as err:
        # connection refused, DNS failure, TLS error, timeout, ...
        sys.stderr.write(f"Request to URL {url} failed: {err}\n")
        sys.exit(1)
    if response.status_code == 200:
        try:
            return json.loads(response.text)  # returns a dictionary
        except json.JSONDecodeError as err:
            sys.stderr.write(f"Response from URL {url} is not valid JSON: {err}\n")
            sys.exit(1)
    # status 503 is reported when maintenance mode is on
    hint = ", maybe maintenance mode is on?" if response.status_code == 503 else ""
    sys.stderr.write(
        f"Request response code is {response.status_code} with URL {url}{hint}\n"
    )
    sys.exit(1)
def get_data_all_users(session, url, verify, timeout=30):
    """Request *url* with the ``OCS-APIRequest`` header and return the JSON body.

    main() uses this function for the endpoint that lists all users.

    Args:
        session: Object providing a requests-style ``get()`` method, e.g. the
            session created by get_session().
        url: Complete URL to request.
        verify: Passed through to ``session.get``; main() passes ``False``
            when certificate checks are disabled.
        timeout: Seconds to wait for the server. Without a timeout a stalled
            server would block the agent indefinitely.

    Returns:
        The parsed JSON document of a response with status 200.

    Raises:
        SystemExit: with exit code 1, after writing a message to stderr, when
            the request fails, the status is not 200 or the body is not
            valid JSON.
    """
    headers = {"Accept": "application/json", "OCS-APIRequest": "true"}
    try:
        response = session.get(url, headers=headers, verify=verify, timeout=timeout)
    except requests.exceptions.RequestException as err:
        # connection refused, DNS failure, TLS error, timeout, ...
        sys.stderr.write(f"Request to URL {url} failed: {err}\n")
        sys.exit(1)
    if response.status_code != 200:
        sys.stderr.write(
            f"Request response code is {response.status_code} with URL {url}\n"
        )
        sys.exit(1)
    try:
        return json.loads(response.text)  # returns a dictionary
    except json.JSONDecodeError as err:
        sys.stderr.write(f"Response from URL {url} is not valid JSON: {err}\n")
        sys.exit(1)
def get_data_user(session, url, verify, timeout=30):
    """Request *url* with the ``OCS-APIRequest`` header and return the JSON body.

    do_cmk_output_all_users() uses this function to fetch the details of one
    user per call.

    Args:
        session: Object providing a requests-style ``get()`` method, e.g. the
            session created by get_session().
        url: Complete URL to request.
        verify: Passed through to ``session.get``; main() passes ``False``
            when certificate checks are disabled.
        timeout: Seconds to wait for the server. Without a timeout a stalled
            server would block the agent indefinitely.

    Returns:
        The parsed JSON document of a response with status 200.

    Raises:
        SystemExit: with exit code 1, after writing a message to stderr, when
            the request fails, the status is not 200 or the body is not
            valid JSON.
    """
    headers = {"Accept": "application/json", "OCS-APIRequest": "true"}
    try:
        response = session.get(url, headers=headers, verify=verify, timeout=timeout)
    except requests.exceptions.RequestException as err:
        # connection refused, DNS failure, TLS error, timeout, ...
        sys.stderr.write(f"Request to URL {url} failed: {err}\n")
        sys.exit(1)
    if response.status_code != 200:
        sys.stderr.write(
            f"Request response code is {response.status_code} with URL {url}\n"
        )
        sys.exit(1)
    try:
        return json.loads(response.text)  # returns a dictionary
    except json.JSONDecodeError as err:
        sys.stderr.write(f"Response from URL {url} is not valid JSON: {err}\n")
        sys.exit(1)
def do_cmk_output(data):
    """Print the sections ``nextcloud_info`` and ``nextcloud_database``.

    Every value is written to stdout as one ``<name>;<value>`` line (the
    section headers declare separator 59, i.e. ";").

    Args:
        data: Parsed JSON document of the server information endpoint. The
            values are read from ``data["ocs"]["meta"]`` and
            ``data["ocs"]["data"]``.

    Raises:
        KeyError: if a key is missing that is not one of the optional parts
            ("update" and "apps" below "system", "opcache" below "php").
    """
    ocs = data["ocs"]
    ocs_data = ocs["data"]
    nextcloud = ocs_data["nextcloud"]
    system = nextcloud["system"]
    storage = nextcloud["storage"]
    active_users = ocs_data["activeUsers"]
    server = ocs_data["server"]
    php = server["php"]
    database = server["database"]

    print("<<<nextcloud_info:sep(59)>>>")
    print(f"NC_Version;{system['version']}")
    print(f"NC_Freespace;{system['freespace']}")
    print(f"NC_Status;{ocs['meta']['status']}")
    # This update info is available only from version 28 onwards, so the key "update" does not exist in all versions before
    try:
        update = system["update"]
        print(f"NC_Last_Update;{update['lastupdatedat']}")
        print(f"NC_Update_Available;{update['available']}")
    except KeyError:
        # best effort: the update lines are left out when the server does not report them
        pass
    print(f"NC_Num_Users;{storage['num_users']}")
    print(f"NC_Num_Files;{storage['num_files']}")
    print(f"NC_Num_Shares;{nextcloud['shares']['num_shares']}")
    print(f"NC_Num_Storages;{storage['num_storages']}")
    print(f"NC_Num_Storages_Home;{storage['num_storages_home']}")
    print(f"NC_Num_Storages_Local;{storage['num_storages_local']}")
    print(f"NC_Num_Storages_Other;{storage['num_storages_other']}")
    # Workaround for Nextcloud 28.0.1 (KeyError "apps")
    try:
        apps = system["apps"]
        print(f"NC_Num_Apps_Installed;{apps['num_installed']}")
        print(f"NC_Num_Apps_Updates_Available;{apps['num_updates_available']}")
        app_updates = apps["app_updates"]
        if app_updates:
            # one "<app>/<version> " entry per app; the trailing blank is part of the established output
            updates = "".join(
                f"{app}/{version} " for app, version in app_updates.items()
            )
            print(f"NC_Apps_With_Updates_Available;{updates}")
    except KeyError:
        # best effort: the app lines are left out when the server does not report them
        pass
    print(f"NC_Active_Users_Last_5Min;{active_users['last5minutes']}")
    print(f"NC_Active_Users_Last_1Hour;{active_users['last1hour']}")
    print(f"NC_Active_Users_Last_1Day;{active_users['last24hours']}")
    print(f"NC_Webserver;{server['webserver']}")
    print(f"NC_PHP_Version;{php['version']}")

    print("<<<nextcloud_database:sep(59)>>>")
    print(f"NC_Database_Type;{database['type']}")
    print(f"NC_Database_Version;{database['version']}")
    print(f"NC_Database_Size;{database['size']}")
    # opcache entry does not exist if opcache_get_status is disabled by server settings
    # thanks to Marcus Klein from Iteratio to report (and solve!) this bug
    # A missing, empty or incomplete opcache entry is reported as hit rate 0
    # instead of raising KeyError.
    opcache = php.get("opcache")
    statistics = opcache.get("opcache_statistics") if isinstance(opcache, dict) else None
    if isinstance(statistics, dict) and "opcache_hit_rate" in statistics:
        print(f"NC_OPCache_Hit_Rate;{statistics['opcache_hit_rate']}")
    else:
        print("NC_OPCache_Hit_Rate;0")
def do_cmk_output_all_users(session, data, verify, hostname, protocol, port, folder):
    """Print the ``nextcloud_users`` section with one line per user.

    For every entry of ``data["ocs"]["data"]["users"]`` the user details are
    fetched with get_data_user() and written to stdout as
    ``id;displayname;lastlogin;free;quota;relative;total;used``.

    Args:
        session: Session passed on to get_data_user().
        data: Parsed JSON document containing the user list.
        verify: Certificate check setting passed on to get_data_user().
        hostname, protocol, port, folder: Passed on to create_url_user() to
            build the per-user URL.

    Quota values are reported as -1 for users that never logged in
    (``lastLogin`` equal to 0) and for quota fields missing in the response.
    """
    quota_fields = ("free", "quota", "relative", "total", "used")
    print("<<<nextcloud_users:sep(59)>>>")
    for user in data["ocs"]["data"]["users"]:
        nc_url = create_url_user(
            user, NC_API_ENDPOINT_USER, hostname, protocol, port, folder
        )
        details = get_data_user(session, nc_url, verify)["ocs"]["data"]
        userid = details["id"]
        # NOTE(review): a display name containing ";" would shift the columns
        # of the output line - confirm how the consuming check handles this.
        displayname = details["displayname"]
        lastlogin = int(details["lastLogin"])
        quota = {}
        if lastlogin != 0:
            # user has logged in before; quota == -3 --> unlimited,
            # relative = used * 100 / (free + used)
            reported = details.get("quota")
            if isinstance(reported, dict):
                quota = reported
        # Missing fields fall back to -1, the placeholder already used for
        # users that never logged in, instead of aborting the whole run.
        quota_values = [quota.get(field, -1) for field in quota_fields]
        columns = [userid, displayname, lastlogin, *quota_values]
        print(";".join(str(column) for column in columns))
def main():
    """Run the agent: parse options, query the server and print all sections.

    The server information is fetched and printed first, followed by the
    user list with the details of every user. Invalid option combinations
    are reported on stderr and end the program with exit code 1.
    """
    # get and check parameters
    params: argparse.Namespace = get_args()
    for value, message in (
        (params.hostname, "No hostname given.\n"),
        (params.username, "No username given.\n"),
        (params.password, "No app password given.\n"),
    ):
        if value is None:
            sys.stderr.write(message)
            sys.exit(1)
    hostname = params.hostname
    username = params.username
    pwd = params.password
    folder = "" if params.folder is None else params.folder

    verify = not params.no_cert_check
    if not verify:
        # the user asked for unverified requests, do not warn on every call
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    protocol = "http" if params.no_https else "https"
    if params.port is None:
        # default port of the chosen protocol
        port = 80 if params.no_https else 443
    else:
        port = params.port
    # --port is parsed as int, so the port has to be compared with numbers;
    # a comparison with the strings "443" / "80" can never match a given port.
    if protocol == "http" and port == 443:
        sys.stderr.write("Combining HTTP with port 443 is not supported.\n")
        sys.exit(1)
    if protocol == "https" and port == 80:
        sys.stderr.write("Combining HTTPS with port 80 is not supported.\n")
        sys.exit(1)

    # create session
    session = get_session(username, pwd)

    # server information
    nc_url = create_url(NC_API_ENDPOINT, hostname, protocol, port, folder)
    nc_data = get_data(session, nc_url, verify)
    do_cmk_output(nc_data)

    # user list and per-user details
    nc_url = create_url(NC_API_ENDPOINT_ALL_USERS, hostname, protocol, port, folder)
    nc_data = get_data_all_users(session, nc_url, verify)
    do_cmk_output_all_users(session, nc_data, verify, hostname, protocol, port, folder)


# run the agent only when executed as a script, not when imported
if __name__ == "__main__":
    main()