Small changes and cosmetic changes

This commit is contained in:
Rudi klein 2024-05-24 13:06:46 +02:00
parent 1bb6776180
commit cd17a8ed07
3 changed files with 136 additions and 66 deletions

View File

@ -4,7 +4,8 @@
# This is the yaml config file for wazuh-active-response (for both the Python and Go version) # This is the yaml config file for wazuh-active-response (for both the Python and Go version)
targets: "slack, ntfy, discord" # Platforms in this string with comma seperated values are triggered. targets: "slack, ntfy, discord" # Platforms in this string with comma seperated values are triggered.
full_message: "" # Platforms in this string will enable the sending of the full event information. full_message: "" # Platforms in this string will enable sending the full event information.
full_alert: "" # Platforms in this string will enable sending the full event information.
# Exclude rule events that are enabled in the ossec.conf active response definition. # Exclude rule events that are enabled in the ossec.conf active response definition.
# These settings provide an easier way to disable events from firing the notifiers. # These settings provide an easier way to disable events from firing the notifiers.
@ -19,19 +20,19 @@ excluded_agents: "99999" # Enter as a string with comma seperated v
priority_map: priority_map:
- threat_map: [ 15,14,13,12 ] - threat_map: [ 15,14,13,12 ]
mention_threshold: 1 mention_threshold: 1
color: 0xcc3300 color: 0xec3e40 # Red, SEVERE
- threat_map: [ 11,10,9 ] - threat_map: [ 11,10,9 ]
mention_threshold: 1 mention_threshold: 1
color: 0xff9966 color: 0xff9b2b # Orange, HIGH
- threat_map: [ 8,7,6 ] - threat_map: [ 8,7,6 ]
mention_threshold: 5 mention_threshold: 5
color: 0xffcc00 color: 0xf5d800 # Yellow, ELEVATED
- threat_map: [ 5,4 ] - threat_map: [ 5,4 ]
mention_threshold: 20 mention_threshold: 20
color: 0x99cc33 color: 0x377fc7 # Blue, GUARDED
- threat_map: [ 3,2,1,0 ] - threat_map: [ 3,2,1,0 ]
mention_threshold: 20 mention_threshold: 20
color: 0x339900 color: 0x01a465 # Green, LOW
# The next 2 settings are used to add information to the messages. # The next 2 settings are used to add information to the messages.
sender: "Wazuh (IDS)" sender: "Wazuh (IDS)"

View File

@ -18,65 +18,85 @@ def main():
him = frame(1).f_code.co_name him = frame(1).f_code.co_name
# Load the YAML config. # Load the YAML config.
config: dict = get_config() config: dict = get_config()
logger(0, config, me, him, "############ Processing event ###############################") logger(0, config, me, him, "############ Processing event ###############################")
logger(2, config, me, him, "Loading yaml configuration") logger(2, config, me, him, "Loading yaml configuration")
# Get the arguments used with running the script. # Get the arguments used with running the script.
arguments = get_arguments() arguments = get_arguments()
# Check if we are in test mode (test_mode setting in config yaml). If so, load test event instead of live event. # Check if we are in test mode (test_mode setting in config yaml). If so, load test event instead of live event.
if config.get("test_mode"): if config.get("test_mode"):
logger(1, config, me, him, "Running in test mode: using test message wazuh-notify-test-event.json") logger(1, config, me, him, "Running in test mode: using test message wazuh-notify-test-event.json")
# Load the test event data # Load the test event data.
home_path, _, _ = set_environment() home_path, _, _ = set_environment()
with (open(home_path + '/etc/wazuh-notify-test-event.json') as event_file): with (open(home_path + '/etc/wazuh-notify-test-event.json') as event_file):
data: dict = json.loads(event_file.read()) data: dict = json.loads(event_file.read())
else: else:
# We are running live. Load the data from the Wazuh process. # We are running live. Load the data from the Wazuh process.
logger(2, config, me, him, "Running in live mode: using live message") logger(2, config, me, him, "Running in live mode: using live message")
data = load_message() data = load_message()
# Extract the 'alert' section of the (JSON) event # Extract the 'alert' section of the (JSON) event
alert = data["parameters"]["alert"] alert = data["parameters"]["alert"]
logger(2, config, me, him, "Extracting data from the event") logger(2, config, me, him, "Extracting data from the event")
# Check the config for any exclusion rules # Check the config for any exclusion rules
fire_notification = exclusions_check(config, alert) fire_notification = exclusions_check(config, alert)
logger(1, config, me, him, "Checking if we are outside of the exclusion rules: " + str(fire_notification)) logger(1, config, me, him, "Checking if we are outside of the exclusion rules: " + str(fire_notification))
if not fire_notification: if not fire_notification:
# The event was excluded by the exclusion rules in the configuration. # The event was excluded by the exclusion rules in the configuration.
logger(1, config, me, him, "Event excluded, no notification sent. Exiting") logger(1, config, me, him, "Event excluded, no notification sent. Exiting")
exit() exit()
else: else:
# The event was not excluded by the exclusion rules in the configuration. Keep processing. # The event was not excluded by the exclusion rules in the configuration. Keep processing.
logger(2, config, me, him, "Event NOT excluded, notification will be sent") logger(2, config, me, him, "Event NOT excluded, notification will be sent")
# Get the mapping from event threat level to priority, color and mention_flag. # Get the mapping from event threat level to priority, color and mention_flag.
priority, color, mention = threat_mapping(config, alert['rule']['level'], alert['rule']['firedtimes']) priority, color, mention = threat_mapping(config, alert['rule']['level'], alert['rule']['firedtimes'])
logger(2, config, me, him, "Threat mapping done: " + "p:" + str(priority) + " c:" + str(color) + " m:" + mention)
logger(2, config, me, him, "Threat mapping done: " +
"prio:" + str(priority) + " color:" + str(color) + " mention:" + mention)
# If the target argument was used with the script, we'll use that instead of the configuration parameter. # If the target argument was used with the script, we'll use that instead of the configuration parameter.
config["targets"] = arguments['targets'] if arguments['targets'] != "" else config["targets"] config["targets"] = arguments['targets'] if arguments['targets'] != "" else config["targets"]
# Prepare the messaging platform specific request and execute # Prepare the messaging platform specific request and execute
if "discord" in config["targets"]: if "discord" in config["targets"]:
caller = "discord" caller = "discord"
# Load the url/webhook from the configuration. # Load the url/webhook from the configuration.
discord_url, _, _ = get_env() discord_url, _, _ = get_env()
discord_url = arguments['url'] if arguments['url'] else discord_url discord_url = arguments['url'] if arguments['url'] else discord_url
# Build the basic notification message content. # Build the basic notification message content.
notification: str = construct_basic_message(config, arguments, caller, alert) notification: str = construct_basic_message(config, arguments, caller, alert)
logger(2, config, me, him, caller + " basic message constructed") logger(2, config, me, him, caller + " basic message constructed")
# Build the payload(s) for the POST request. # Build the payload(s) for the POST request.
_, _, payload_json = build_notification(caller, _, _, payload_json = build_notification(caller,
config, config,
arguments, arguments,
@ -88,17 +108,22 @@ def main():
) )
# POST the notification through requests. # POST the notification through requests.
result = requests.post(discord_url, json=payload_json) result = requests.post(discord_url, json=payload_json)
logger(1, config, me, him, caller + " notification constructed and HTTPS request done: " + str(result)) logger(1, config, me, him, caller + " notification constructed and HTTPS request done: " + str(result))
if "ntfy" in config["targets"]: if "ntfy" in config["targets"]:
caller = "ntfy" caller = "ntfy"
# Load the url/webhook from the configuration. # Load the url/webhook from the configuration.
_, ntfy_url, _ = get_env() _, ntfy_url, _ = get_env()
# Build the basic notification message content. # Build the basic notification message content.
notification: str = construct_basic_message(config, arguments, caller, alert) notification: str = construct_basic_message(config, arguments, caller, alert)
logger(2, config, me, him, caller + " basic message constructed") logger(2, config, me, him, caller + " basic message constructed")
# Build the payload(s) for the POST request. # Build the payload(s) for the POST request.
@ -113,6 +138,7 @@ def main():
) )
# POST the notification through requests. # POST the notification through requests.
result = requests.post(ntfy_url, data=payload_data, headers=payload_headers) result = requests.post(ntfy_url, data=payload_data, headers=payload_headers)
logger(1, config, me, him, caller + " notification constructed and request done: " + str(result)) logger(1, config, me, him, caller + " notification constructed and request done: " + str(result))
@ -120,13 +146,17 @@ def main():
caller = "slack" caller = "slack"
# Load the url/webhook from the configuration. # Load the url/webhook from the configuration.
_, _, slack_url = get_env() _, _, slack_url = get_env()
# Build the basic notification message content. # Build the basic notification message content.
notification: str = construct_basic_message(config, arguments, caller, alert) notification: str = construct_basic_message(config, arguments, caller, alert)
logger(2, config, me, him, caller + " basic message constructed") logger(2, config, me, him, caller + " basic message constructed")
# Build the payload(s) for the POST request. # Build the payload(s) for the POST request.
_, _, payload_json = build_notification(caller, _, _, payload_json = build_notification(caller,
config, config,
arguments, arguments,
@ -138,7 +168,9 @@ def main():
) )
# POST the notification through requests. # POST the notification through requests.
result = requests.post(slack_url, headers={'Content-Type': 'application/json'}, json=payload_json) result = requests.post(slack_url, headers={'Content-Type': 'application/json'}, json=payload_json)
logger(1, config, me, him, caller + " notification constructed and request done: " + str(result)) logger(1, config, me, him, caller + " notification constructed and request done: " + str(result))
logger(0, config, me, him, "############ Event processed ################################") logger(0, config, me, him, "############ Event processed ################################")

View File

@ -16,12 +16,8 @@ from dotenv import load_dotenv
# log_path = wazuh-notify.log path, # log_path = wazuh-notify.log path,
# config_path = wazuh-notify-config.yaml # config_path = wazuh-notify-config.yaml
def set_environment() -> tuple: def set_environment() -> tuple:
me = frame(0).f_code.co_name
him = frame(1).f_code.co_name
# set_wazuh_path = os.path.abspath(os.path.join(__file__, "../../"))
set_wazuh_path = os.path.abspath(os.path.join(__file__, "../../..")) set_wazuh_path = os.path.abspath(os.path.join(__file__, "../../.."))
set_log_path = '{0}/logs/wazuh-notify.log'.format(set_wazuh_path) set_log_path = '{0}/logs/wazuh-notify.log'.format(set_wazuh_path)
set_config_path = '{0}/etc/wazuh-notify-config.yaml'.format(set_wazuh_path) set_config_path = '{0}/etc/wazuh-notify-config.yaml'.format(set_wazuh_path)
@ -36,7 +32,6 @@ wazuh_path, log_path, config_path = set_environment()
# Set structured timestamps for notifications. # Set structured timestamps for notifications.
def set_time_format(): def set_time_format():
now_message = time.strftime('%A, %d %b %Y %H:%M:%S') now_message = time.strftime('%A, %d %b %Y %H:%M:%S')
@ -49,23 +44,25 @@ def set_time_format():
# Logger: print to console and/or log to file # Logger: print to console and/or log to file
def logger(level, config, me, him, message): def logger(level, config, me, him, message):
_, now_logging, _, _ = set_time_format()
logger_wazuh_path = os.path.abspath(os.path.join(__file__, "../../..")) logger_wazuh_path = os.path.abspath(os.path.join(__file__, "../../.."))
# logger_wazuh_path = os.path.abspath(os.path.join(__file__, "../.."))
logger_log_path = '{0}/logs/wazuh-notify.log'.format(logger_wazuh_path) logger_log_path = '{0}/logs/wazuh-notify.log'.format(logger_wazuh_path)
him = 'main' if him == '<module>' else him # When logging from main(), the destination function is called "<module>". For cosmetic reasons rename to "main".
time_stamp = time.strftime('%Y-%m-%d %H:%M:%S')
log_line = f'{time_stamp} | {level} | {me: <23} | {him: <15} | {message}'
# Compare the console log level in the configuration to the log level of the message him = 'main' if him == '<module>' else him
log_line = f'{now_logging} | {level} | {me: <23} | {him: <15} | {message}'
# Compare the extended_print log level in the configuration to the log level of the message.
if config.get('extended_print') >= level: if config.get('extended_print') >= level:
print(log_line) print(log_line)
try:
# Compare the file logging, log level in the configuration to the log level of the message try:
# Compare the extended_logging level in the configuration to the log level of the message.
if config.get("extended_logging") >= level: if config.get("extended_logging") >= level:
with open(logger_log_path, mode="a") as log_file: with open(logger_log_path, mode="a") as log_file:
@ -73,23 +70,26 @@ def logger(level, config, me, him, message):
except (FileNotFoundError, PermissionError, OSError): except (FileNotFoundError, PermissionError, OSError):
# Special message to console when logging to file fails # Special message to console when logging to file fails and console logging might not be set.
log_line = f'{time_stamp} | {level} | {me: <23} | {him: <15} | error opening log file: {logger_log_path}' log_line = f'{now_logging} | {level} | {me: <23} | {him: <15} | error opening log file: {logger_log_path}'
print(log_line) print(log_line)
# Get the content of the .env file (url's and/or webhooks) # Get the content of the .env file (url's and/or webhooks).
def get_env(): def get_env():
# The 'me' variable sets the calling function, the 'him' the called function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me = frame(0).f_code.co_name
him = frame(1).f_code.co_name him = frame(1).f_code.co_name
# Write the configuration to a dictionary.
config: dict = get_config() config: dict = get_config()
# Check if the secrets .env file is available.
try: try:
dotenv_path = join(dirname(__file__), '.env') dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path) load_dotenv(dotenv_path)
@ -97,7 +97,7 @@ def get_env():
logger(0, config, me, him, dotenv_path + " not found") logger(0, config, me, him, dotenv_path + " not found")
raise Exception(dotenv_path, "file not found") raise Exception(dotenv_path, "file not found")
# Retrieve url from .env # Retrieve URLs from .env
discord_url = os.getenv("DISCORD_URL") discord_url = os.getenv("DISCORD_URL")
ntfy_url = os.getenv("NTFY_URL") ntfy_url = os.getenv("NTFY_URL")
@ -115,10 +115,11 @@ def get_env():
return discord_url, ntfy_url, slack_url return discord_url, ntfy_url, slack_url
# Process configuration settings from wazuh-notify-config.yaml # Read and process configuration settings from wazuh-notify-config.yaml and create dictionary.
def get_config(): def get_config():
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me = frame(0).f_code.co_name
him = frame(1).f_code.co_name him = frame(1).f_code.co_name
@ -136,24 +137,26 @@ def get_config():
logger(2, config, me, him, "Reading configuration file: " + this_config_path) logger(2, config, me, him, "Reading configuration file: " + this_config_path)
config['targets'] = config.get('targets', 'ntfy, discord') config['targets'] = config.get('targets', 'discord, ntfy, slack')
config['full_alert'] = config.get('full_alert', '')
config['excluded_rules'] = config.get('excluded_rules', '') config['excluded_rules'] = config.get('excluded_rules', '')
config['excluded_agents'] = config.get('excluded_agents', '') config['excluded_agents'] = config.get('excluded_agents', '')
config['excluded_days'] = config.get('excluded_days', '') config['priority_map'] = config.get('priority_map', [])
config['excluded_hours'] = config.get('excluded_hours', '')
config['test_mode'] = config.get('test_mode', True)
config['extended_logging'] = config.get('extended_logging', True)
config['extended_print'] = config.get('extended_print', True)
config['sender'] = config.get('sender', 'Wazuh (IDS)') config['sender'] = config.get('sender', 'Wazuh (IDS)')
config['click'] = config.get('click', 'https://wazuh.org') config['click'] = config.get('click', 'https://wazuh.org')
config['md_e'] = config.get('markdown_emphasis', '') config['md_e'] = config.get('markdown_emphasis', '')
config['excluded_days'] = config.get('excluded_days', '')
config['excluded_hours'] = config.get('excluded_hours', '')
config['test_mode'] = config.get('test_mode', False)
config['extended_logging'] = config.get('extended_logging', True)
config['extended_print'] = config.get('extended_print', True)
return config return config
# Show configuration settings from wazuh-notify-config.yaml # Show configuration settings from wazuh-notify-config.yaml
def view_config(): def view_config():
_, _, this_config_path, _ = set_environment() _, _, this_config_path, _ = set_environment()
@ -168,8 +171,9 @@ def view_config():
# Get script arguments during execution. Params found here override config settings. # Get script arguments during execution. Params found here override config settings.
def get_arguments(): def get_arguments():
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me = frame(0).f_code.co_name
him = frame(1).f_code.co_name him = frame(1).f_code.co_name
@ -226,8 +230,16 @@ def get_arguments():
if not argument_list: if not argument_list:
logger(1, config, me, him, 'No argument list found (no arguments provided with script execution') logger(1, config, me, him, 'No argument list found (no arguments provided with script execution')
arguments: dict = {'url': url, 'sender': sender, 'targets': targets, 'message': message,
'priority': priority, 'tags': tags, 'click': click} # Store defaults for the non-existing arguments in the arguments dictionary to avoid None errors.
arguments: dict = {'url': url,
'sender': sender,
'targets': targets,
'message': message,
'priority': priority,
'tags': tags,
'click': click}
return arguments return arguments
else: else:
@ -240,7 +252,7 @@ def get_arguments():
p_arguments, values = getopt.getopt(argument_list, options, long_options) p_arguments, values = getopt.getopt(argument_list, options, long_options)
# Check each argument # Check each argument. Arguments that are present will override the defaults.
for current_argument, current_value in p_arguments: for current_argument, current_value in p_arguments:
@ -275,12 +287,14 @@ def get_arguments():
except getopt.error as err: except getopt.error as err:
# output error, and return with an error code # Output error, and return error code
logger(0, config, me, him, "Error during argument parsing:" + str(err)) logger(0, config, me, him, "Error during argument parsing:" + str(err))
logger(2, config, me, him, "Arguments returned as dictionary") logger(2, config, me, him, "Arguments returned as dictionary")
# Store the arguments in the arguments dictionary.
arguments: dict = {'url': url, 'sender': sender, 'targets': targets, 'message': message, arguments: dict = {'url': url, 'sender': sender, 'targets': targets, 'message': message,
'priority': priority, 'tags': tags, 'click': click} 'priority': priority, 'tags': tags, 'click': click}
@ -289,8 +303,9 @@ def get_arguments():
# Receive and load message from Wazuh # Receive and load message from Wazuh
def load_message(): def load_message():
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me = frame(0).f_code.co_name
him = frame(1).f_code.co_name him = frame(1).f_code.co_name
@ -310,17 +325,20 @@ def load_message():
if data.get("command") == "add": if data.get("command") == "add":
logger(1, config, me, him, "Relevant event data found") logger(1, config, me, him, "Relevant event data found")
return data return data
else: else:
# Event came in, but wasn't processed. Shouldn't happen.
# Event came in, but wasn't processed.
logger(0, config, me, him, "Event data not found") logger(0, config, me, him, "Event data not found")
sys.exit(1) sys.exit(1)
# Check if there are reasons not to process this event (as per config yaml) # Check if there are reasons not to process this event. Check exclusions for rules, agents, days and hours.
def exclusions_check(config, alert): def exclusions_check(config, alert):
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me = frame(0).f_code.co_name
him = frame(1).f_code.co_name him = frame(1).f_code.co_name
@ -328,26 +346,26 @@ def exclusions_check(config, alert):
now_message, now_logging, now_weekday, now_time = set_time_format() now_message, now_logging, now_weekday, now_time = set_time_format()
# Check the exclusion records from the configuration yaml # Check the exclusion records from the configuration yaml.
ex_hours: tuple = config.get('excluded_hours') ex_hours: tuple = config.get('excluded_hours')
# Start hour may not be later than end hours. End hour may not exceed 00:00 midnight to avoid day jump # Start hour may not be later than end hours. End hour may not exceed 00:00 midnight to avoid day jump.
ex_hours = [ex_hours[0], "23:59"] if (ex_hours[1] >= '23:59' or ex_hours[1] < ex_hours[0]) else ex_hours ex_hours = [ex_hours[0], "23:59"] if (ex_hours[1] >= '23:59' or ex_hours[1] < ex_hours[0]) else ex_hours
# Get some more exclusion records from the config # Get some more exclusion records from the config.
ex_days = config.get('excluded_days') ex_days = config.get('excluded_days')
ex_agents = config.get("excluded_agents") ex_agents = config.get("excluded_agents")
ex_rules = config.get("excluded_rules") ex_rules = config.get("excluded_rules")
# Check agent and rule from within the event # Check agent and rule from within the event.
ev_agent = alert['agent']['id'] ev_agent = alert['agent']['id']
ev_rule = alert['rule']['id'] ev_rule = alert['rule']['id']
# Let's assume all lights are green, until proven otherwise # Let's assume all lights are green, until proven otherwise.
ex_hours_eval, ex_weekday_eval, ev_rule_eval, ev_agent_eval = True, True, True, True ex_hours_eval, ex_weekday_eval, ev_rule_eval, ev_agent_eval = True, True, True, True
@ -356,21 +374,25 @@ def exclusions_check(config, alert):
if (now_time > ex_hours[0]) and (now_time < ex_hours[1]): if (now_time > ex_hours[0]) and (now_time < ex_hours[1]):
logger(2, config, me, him, "excluded: event inside exclusion time frame") logger(2, config, me, him, "excluded: event inside exclusion time frame")
ex_hours_eval = False ex_hours_eval = False
elif now_weekday in ex_days: elif now_weekday in ex_days:
logger(2, config, me, him, "excluded: event inside excluded weekdays") logger(2, config, me, him, "excluded: event inside excluded weekdays")
ex_weekday_eval = False ex_weekday_eval = False
elif ev_rule in ex_rules: elif ev_rule in ex_rules:
logger(2, config, me, him, "excluded: event id inside exclusion list") logger(2, config, me, him, "excluded: event id inside exclusion list")
ev_rule_eval = False ev_rule_eval = False
elif ev_agent in ex_agents: elif ev_agent in ex_agents:
logger(2, config, me, him, "excluded: event agent inside exclusion list") logger(2, config, me, him, "excluded: event agent inside exclusion list")
ev_rule_eval = False ev_rule_eval = False
notification_eval = True if (ex_hours_eval and ex_weekday_eval and ev_rule_eval and ev_agent_eval) else False notification_eval = True if (ex_hours_eval and ex_weekday_eval and ev_rule_eval and ev_agent_eval) else False
@ -382,14 +404,16 @@ def exclusions_check(config, alert):
# Map the event threat level to the appropriate 5-level priority scale and color for use in the notification platforms. # Map the event threat level to the appropriate 5-level priority scale and color for use in the notification platforms.
def threat_mapping(config, threat_level, fired_times): def threat_mapping(config, threat_level, fired_times):
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me = frame(0).f_code.co_name
him = frame(1).f_code.co_name him = frame(1).f_code.co_name
# Map threat level v/s priority # Map threat level to priority. Enters Homeland Security :-).
p_map = config.get('priority_map') p_map = config.get('priority_map')
logger(2, config, me, him, "Prio map: " + str(p_map)) logger(2, config, me, him, "Prio map: " + str(p_map))
for i in range(len(p_map)): for i in range(len(p_map)):
@ -398,43 +422,48 @@ def threat_mapping(config, threat_level, fired_times):
logger(2, config, me, him, "Level: " + str(threat_level)) logger(2, config, me, him, "Level: " + str(threat_level))
if threat_level in p_map[i]["threat_map"]: if threat_level in p_map[i]["threat_map"]:
color_mapping = p_map[i]["color"] color_mapping = p_map[i]["color"]
priority_mapping = 5 - i priority_mapping = 5 - i
logger(2, config, me, him, "Prio: " + str(priority_mapping)) logger(2, config, me, him, "Prio: " + str(priority_mapping))
logger(2, config, me, him, "Color: " + str(color_mapping)) logger(2, config, me, him, "Color: " + str(color_mapping))
if fired_times >= p_map[i]["mention_threshold"]: if fired_times >= p_map[i]["mention_threshold"]:
# When this flag is set, Discord recipients get a stronger message # When this flag is set, Discord recipients get a stronger message (DM).
mention_flag = "@here" mention_flag = "@here"
else: else:
mention_flag = "" mention_flag = ""
logger(2, config, me, him, "Threat level mapped as: " + logger(2, config, me, him, "Threat level mapped as: " +
"prio:" + str(priority_mapping) + " color: " + str(color_mapping) + " mention: " + mention_flag)
"p:" + str(priority_mapping) + " c: " + str(color_mapping) + " m: " + mention_flag)
return priority_mapping, color_mapping, mention_flag return priority_mapping, color_mapping, mention_flag
logger(0, config, me, him, "Threat level mapping failed! Returning garbage (99, 99, 99)") logger(0, config, me, him, "Threat level mapping failed! Returning garbage (99, 99, 99)")
return 99, 99, 99 return 99, 99, "99"
# Construct the message that will be sent to the notifier platforms # Construct the message that will be sent to the notifier platforms.
def construct_basic_message(config, arguments, caller: str, data: dict) -> str: def construct_basic_message(config, arguments, caller: str, data: dict) -> str:
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me = frame(0).f_code.co_name
him = frame(1).f_code.co_name him = frame(1).f_code.co_name
# Include a specific control sequence for markdown bold parameters names # Include a specific control sequence for markdown bold parameter names.
md_map = config.get('markdown_emphasis') md_map = config.get('markdown_emphasis')
md_e = md_map[caller] md_e = md_map[caller]
# If the --message (-m) argument was fulfilled, use this message to be sent.
if arguments['message']: if arguments['message']:
basic_msg = arguments['message'] basic_msg = arguments['message']
@ -453,21 +482,23 @@ def construct_basic_message(config, arguments, caller: str, data: dict) -> str:
md_e + "Times fired:" + md_e + " " + str(data["rule"]["firedtimes"]) + "\n") md_e + "Times fired:" + md_e + " " + str(data["rule"]["firedtimes"]) + "\n")
if caller == "ntfy": if caller == "ntfy":
# todo Check this out
basic_msg = "&nbsp;\n" + basic_msg basic_msg = "&nbsp;\n" + basic_msg
logger(2, config, me, him, caller + " basic message constructed") logger(2, config, me, him, caller + " basic message constructed.")
return basic_msg return basic_msg
# Construct the notification (message + additional information) that will be sent to the notifier platforms. # Construct the notification (message + additional information) that will be sent to the notifier platforms.
def build_notification(caller, config, arguments, notification, alert, priority, color, mention): def build_notification(caller, config, arguments, notification, alert, priority, color, mention):
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me = frame(0).f_code.co_name
him = frame(1).f_code.co_name him = frame(1).f_code.co_name
logger(2, config, me, him, caller + " notification being constructed") logger(2, config, me, him, caller + " notification being constructed.")
md_map = config.get('markdown_emphasis') md_map = config.get('markdown_emphasis')
md_e = md_map[caller] md_e = md_map[caller]
@ -480,7 +511,7 @@ def build_notification(caller, config, arguments, notification, alert, priority,
.replace("'", "") .replace("'", "")
.replace(",", ", ") .replace(",", ", ")
) )
logger(2, config, me, him, caller + " full event formatted") logger(2, config, me, him, caller + " full event formatted.")
full_event: str = str(json.dumps(alert, indent=4) full_event: str = str(json.dumps(alert, indent=4)
.replace('"', '') .replace('"', '')
@ -490,15 +521,18 @@ def build_notification(caller, config, arguments, notification, alert, priority,
.replace(']', '') .replace(']', '')
.replace(',', ' ') .replace(',', ' ')
) )
# Fill some of the variables with argument values if available.
# todo Redundant?
click = arguments['click'] if arguments['click'] else click click = arguments['click'] if arguments['click'] else click
priority = arguments['priority'] if arguments['priority'] else priority priority = arguments['priority'] if arguments['priority'] else priority
sender = arguments['sender'] if arguments['sender'] else sender sender = arguments['sender'] if arguments['sender'] else sender
tags = arguments['tags'] if arguments['tags'] else tags tags = arguments['tags'] if arguments['tags'] else tags
# Add the full alert data to the notification # Add the full alert data to the notification.
if caller in config["full_message"]: if caller in config["full_alert"]:
logger(2, config, me, him, caller + "Full alert data will be sent") logger(2, config, me, him, caller + "Full alert data will be sent.")
notification: str = ("\n\n" + notification + "\n" + notification: str = ("\n\n" + notification + "\n" +
md_e + "__Full event__" + md_e + "\n" + "```\n" + full_event + "```") md_e + "__Full event__" + md_e + "\n" + "```\n" + full_event + "```")
@ -515,6 +549,7 @@ def build_notification(caller, config, arguments, notification, alert, priority,
# Prepare the messaging platform specific notification and execute # Prepare the messaging platform specific notification and execute
if caller == "discord": if caller == "discord":
logger(2, config, me, him, caller + " payload created") logger(2, config, me, him, caller + " payload created")
payload_json = {"username": sender, payload_json = {"username": sender,
"content": mention, "content": mention,
@ -542,6 +577,8 @@ def build_notification(caller, config, arguments, notification, alert, priority,
if caller == "slack": if caller == "slack":
logger(2, config, me, him, caller + " payloads created") logger(2, config, me, him, caller + " payloads created")
# todo Need some investigation.
payload_json = {"text": notification} payload_json = {"text": notification}
# payload_json = {"username": sender, # payload_json = {"username": sender,
# "content": mention, # "content": mention,