#!/usr/bin/env python3
# pylint: disable=pointless-string-statement
"""HAL9002 special agent"""

import sys
import os
import getopt
import random

"""
Sample output

<<>>
ok;SK-1st-Edition 1965-12-17;arthur;127.0.0.1

1. Status, possible values: ok, update available, storage down (String)
2. Version (String)
3. Username used for login (String)
4. IP used for access (String)
5. Desired state, if an update is available (String)

<<>>
user1;David Bowman;65536.0;262144.0
user2;Frank Poole;16384.0;131072.0
user3;Elena;32768.0;65536.0

1. User ID (string)
2. User Real Name (string)
3. Quota Used (float in bytes, absolute)
4. Quota Max (float in bytes, absolute)

<<>>
storage1;DATA_Dullea;8192.0;262144.0
storage2;DATA_Poole;2048.0;131072.0

1. Storage ID (string)
2. Storage Custom Name (string)
3. Upload (float in bytes, counter)
4. Download (float in bytes, counter)
"""


def show_usage() -> None:
    """Write the command line help to stderr."""
    sys.stderr.write(
        """HAL9002 Special Agent

USAGE: agent_hal9002 -H [hostname] -u [username] -p [password]
       agent_hal9002 -h

OPTIONS:
  -H, --hostname                      Hostname (FQDN or IP) of HAL System
  -u, --username                      Username
  -p, --password                      Password
  -s, --state-if-update-is-available  State of service if an update is available (0/1/2/3)
  -h, --help                          Show this help message and exit
"""
    )


SEP: str = "|"
TIMEFMT: str = "%Y-%m-%d %H:%M:%S"
FLOATFMT: str = "{:.4f}"

# Parameters coming from CheckMK
opt_hostname: str = ""
opt_username: str = ""
opt_password: str = ""
opt_state_if_update_is_available: str = ""

# Base values for randomly calculating the use of the storage system per user
base_usage: float = float(65536 * 8)  # 0.5 MByte
max_quota: float = 1073741824.0  # 1 GByte

# these dicts are used as starting points, feel free to extend and/or adjust them
# each user/storage results in an additional service
users_dict: dict[str, dict] = {
    "user1": {
        "realname": "David Bowman",
        "usage": base_usage * 2,
        "quota_max": max_quota,
    },
    "user2": {
        "realname": "Frank Poole",
        "usage": base_usage * 3,
        "quota_max": max_quota / 2,
    },
    "user3": {
        "realname": "Elena",
        "usage": base_usage * 4,
        "quota_max": max_quota * 2,
    },
}

storages_dict: dict[str, dict] = {
    "storage1": {
        "realname": "DATA_Dullea",
        "download": float(1024**3),
        "upload": float((1024**2) * 6),
    },
    "storage2": {
        "realname": "DATA_Poole",
        "download": float(1024**3),
        "upload": float((1024**2) * 4),
    },
}

hal_status_list: list[str] = ["ok", "update available", "storage down"]
hal_version: str = "SK-1st-Edition 1965-12-17"

short_options: str = "hH:u:p:s:"
long_options: list[str] = [
    "hostname=",
    "username=",
    "password=",
    "state-if-update-is-available=",
    "help",
]


def get_options() -> None:
    """Parse ``sys.argv`` and store the values in the module-level ``opt_*`` variables.

    Exits with status 0 after printing the help when ``-h``/``--help`` is given.
    Exits with status 1 (error message first, then the help, both on stderr)
    when the command line cannot be parsed.
    """
    global opt_hostname
    global opt_username
    global opt_password
    global opt_state_if_update_is_available

    try:
        opts, _ = getopt.getopt(sys.argv[1:], short_options, long_options)
    except getopt.GetoptError as err:
        # unknown option or missing argument: report it instead of a traceback,
        # the error goes first because the 1st stderr line is what gets displayed
        sys.stderr.write(f"{err}\n")
        show_usage()
        sys.exit(1)

    for opt, arg in opts:
        if opt in ("-H", "--hostname"):
            opt_hostname = arg
        elif opt in ("-u", "--username"):
            opt_username = arg
        elif opt in ("-p", "--password"):
            opt_password = arg
        elif opt in ("-s", "--state-if-update-is-available"):
            opt_state_if_update_is_available = arg
        elif opt in ("-h", "--help"):
            show_usage()
            sys.exit(0)


def show_options() -> None:
    """Print the parsed command line options to stdout (debugging aid).

    NOTE: this prints the password in clear text, so only call it while debugging.
    """
    print(f"Hostname: {opt_hostname}")
    print(f"Username: {opt_username}")
    print(f"Password: {opt_password}")
    print(f"State if update is available: {opt_state_if_update_is_available}")


def calculate_new_user_storage(current_storage: float) -> float:
    """Return a simulated new storage usage (in bytes) for one user.

    In about 90% of the calls the value is returned unchanged. Otherwise a
    random amount is added (60% chance) or subtracted (40% chance). A result
    below zero is replaced by 1024000.
    """
    # let the chance that no change occurred be at 90%
    if random.randint(1, 100) > 10:
        return float(current_storage)

    # create some variance, value of "amount" is interpreted in bytes
    factor: int = random.randint(1, 4)
    amount: int = random.randint(1024000, 4096000) * factor

    # increase or decrease, give increase a better chance (60:40), that's more real :-)
    new_storage: float
    if random.randint(1, 100) > 40:
        new_storage = current_storage + amount
    else:
        new_storage = current_storage - amount

    # avoid negative values
    if new_storage < 0:
        new_storage = 1024000

    return float(new_storage)


def calculate_new_storage_counters(
    ul_bytes: float, dl_bytes: float
) -> tuple[float, float]:
    """Return simulated new upload/download counters (in bytes) for one storage.

    In about 2% of the calls both counters are returned unchanged. Otherwise
    each counter is increased by its own random amount; counters never decrease.
    """
    # let the chance that no change occurred be at 2%
    if random.randint(1, 100) > 98:
        return float(ul_bytes), float(dl_bytes)

    # create some variance, values of "amount_" are interpreted in bytes
    factor_ul: int = random.randint(1, 7)
    amount_ul: int = random.randint(20240000, 50960000) * factor_ul
    factor_dl: int = random.randint(1, 14)
    amount_dl: int = random.randint(30240000, 60960000) * factor_dl

    # we are simulating counters, so only increasing makes sense
    return float(ul_bytes + amount_ul), float(dl_bytes + amount_dl)


def do_cmk_hal_status_output(
    status: str,
    version: str,
    username: str,
    ipaddress: str,
    state_if_update_is_available: str,
) -> None:
    """Print the status section: a header line and one ';'-separated data line."""
    # NOTE(review): the section header is emitted as "<<>>", which looks like
    # the section name got lost - confirm against the matching check plugin.
    print("<<>>")
    print(f"{status};{version};{username};{ipaddress};{state_if_update_is_available}")


def _state_dir() -> str:
    """Return the directory holding the helper files, creating it if it is missing."""
    path: str = os.path.join(os.path.expanduser("~"), "tmp")
    os.makedirs(path, exist_ok=True)
    return path


def _load_value(path: str, default: float) -> float:
    """Return the float stored in ``path``, or ``default`` if missing or unparsable."""
    try:
        with open(path, "r", encoding="utf-8") as file:
            return float(file.read())
    except (FileNotFoundError, ValueError):
        return float(default)


def _store_value(path: str, value: float) -> None:
    """Write ``value`` to ``path``, replacing any previous content."""
    with open(path, "w", encoding="utf-8") as file:
        file.write(str(value))


def do_cmk_hal_users_output(users: dict, hostname: str) -> None:
    """Print the users section and persist the simulated usage of every user.

    ``users`` maps a user id to a dict with the keys "realname", "usage"
    (start value) and "quota_max". ``hostname`` is only used to build the
    names of the helper files.
    """
    # NOTE(review): section header looks truncated, see do_cmk_hal_status_output
    print("<<>>")
    tmp_path: str = _state_dir()

    for user, details in users.items():
        # we need a way to store the current value for storage usage between runs of the agent
        help_file: str = f"{tmp_path}/hal9002_{hostname}_{user}_current_storage.txt"
        current_storage: float = _load_value(help_file, details["usage"])

        # simulate changes in storage usage and remember them for the next run
        new_storage: float = calculate_new_user_storage(current_storage)
        _store_value(help_file, new_storage)

        # create output
        print(f"{user};{details['realname']};{new_storage};{details['quota_max']}")


def do_cmk_hal_storages_output(storages: dict, hostname: str) -> None:
    """Print the storages section and persist the simulated counters of every storage.

    ``storages`` maps a storage id to a dict with the keys "realname",
    "upload" and "download" (start values). ``hostname`` is only used to
    build the names of the helper files.
    """
    # NOTE(review): section header looks truncated, see do_cmk_hal_status_output
    print("<<>>")
    tmp_path: str = _state_dir()

    for storage, details in storages.items():
        # we need a way to store the current values for storage usage between runs of the agent
        # so we can simulate counters for uploaded/downloaded bytes
        help_file_ul: str = f"{tmp_path}/hal9002_{hostname}_{storage}_ul_bytes.txt"
        help_file_dl: str = f"{tmp_path}/hal9002_{hostname}_{storage}_dl_bytes.txt"
        current_ul_bytes: float = _load_value(help_file_ul, details["upload"])
        current_dl_bytes: float = _load_value(help_file_dl, details["download"])

        # simulate changes in storage usage and remember them for the next run
        new_ul_bytes, new_dl_bytes = calculate_new_storage_counters(
            current_ul_bytes, current_dl_bytes
        )
        _store_value(help_file_ul, new_ul_bytes)
        _store_value(help_file_dl, new_dl_bytes)

        # create output
        print(f"{storage};{details['realname']};{new_ul_bytes};{new_dl_bytes}")


def get_status() -> tuple[str, str]:
    """Return a randomly chosen status together with the fixed version string."""
    status_index: int = random.randint(1, 100)
    if status_index <= 5:
        # should result in critical state
        hal_status: str = hal_status_list[2]
    elif status_index <= 15:
        # should result in warning state
        hal_status = hal_status_list[1]
    else:
        # should result in ok state
        hal_status = hal_status_list[0]
    return hal_status, hal_version


def do_login(_hostname: str, _username: str, _password: str) -> tuple[bool, int]:
    """Return whether the simulated login was successful, plus a status code.

    The arguments are not evaluated. The result is (True, 200) in about 98%
    of the calls and (False, 401) otherwise.
    """
    # simulate the login to our HAL system
    # give it a chance of 2% to fail to demonstrate an error from time to time
    if random.randint(1, 100) > 2:
        return True, 200
    return False, 401


def main() -> None:
    """Parse the options, simulate the login and print all sections."""
    get_options()
    # call show_options() here when debugging the command line handling

    # some checks
    # keep in mind: the 1st line printed out to std.err will be shown as check result in CheckMK
    if opt_hostname == "":
        sys.stderr.write("No hostname given.\n")
        show_usage()
        sys.exit(1)
    if opt_username == "":
        sys.stderr.write("No username given.\n")
        show_usage()
        sys.exit(1)
    if opt_password == "":
        sys.stderr.write("No password given.\n")
        show_usage()
        sys.exit(1)
    # the state to use when an update is available is optional, so it is not checked

    success, code = do_login(opt_hostname, opt_username, opt_password)
    if not success:
        sys.stderr.write(
            f"Login to {opt_hostname} with user {opt_username} failed with error code {code}\n"
        )
        sys.exit(1)

    status, version = get_status()
    do_cmk_hal_status_output(
        status,
        version,
        opt_username,
        opt_hostname,
        opt_state_if_update_is_available,
    )
    do_cmk_hal_users_output(users_dict, opt_hostname)
    do_cmk_hal_storages_output(storages_dict, opt_hostname)


if __name__ == "__main__":
    main()