Tomli branch

This commit is contained in:
Rudi klein 2024-05-27 12:40:39 +02:00
parent 3c6553f37e
commit 0f612598c0
3 changed files with 109 additions and 30 deletions

79
wazuh-notify-config.toml Normal file
View File

@ -0,0 +1,79 @@
#############################################################################################################
# This is the TOML config file for wazuh-notify (active response) for both the Python and Go implementation #
#############################################################################################################
[general]
# Platforms in this string with comma seperated values are triggered.
targets = "slack, ntfy, discord"
# Platforms in this string will enable sending the full event information.
full_alert = ""
# Exclude rule events that are enabled in the ossec.conf active response definition.
# These settings provide an easier way to disable events from firing the notifiers.
excluded_rules = "99999, 00000"
excluded_agents = "99999"
# The next 2 settings are used to add information to the messages.
sender = "Wazuh (IDS)"
click = "https://documentation.wazuh.com/"
# Priority mapping from 0-15 (Wazuh threat levels) to 1-5 (in notifications) and their respective colors (Discord)
# https://documentation.wazuh.com/current/user-manual/ruleset/rules-classification.html
# Enter threat_map as lists of integers, mention_threshold as integer and color as Hex integer
[[priority_map]]
threat_map = [15, 14, 13, 12]
mention_threshold = 1
color = 0xec3e40 # Red, SEVERE
[[priority_map]]
threat_map = [11, 10, 9]
mention_threshold = 1
color = 0xff9b2b # Orange, HIGH
[[priority_map]]
threat_map = [8, 7, 6]
mention_threshold = 5
color = 0xf5d800 # Yellow, ELEVATED
[[priority_map]]
threat_map = [5, 4]
mention_threshold = 20
color = 0x377fc7 # Blue, GUARDED
[[priority_map]]
threat_map = [3, 2, 1, 0]
mention_threshold = 20
color = 0x01a465 # Green, LOW
################ End of priority mapping ##################################
# Following parameter defines the markdown characters to emphasise the parameter names in the notification messages
[markdown_emphasis]
slack = "*"
ntfy = "**"
discord = "**"
##################################################################################
# From here on the settings are ONLY used by the Python version of wazuh-notify. #
##################################################################################
[python]
# The next settings are used for testing and troubleshooting.
# Test mode will add the example event in wazuh-notify-test-event.json instead of the message received through wazuh.
# This enables testing for particular events when the test event is customized.
test_mode = true
# Enabling this parameter provides more logging to the wazuh-notifier log.
extended_logging = 2
# Enabling this parameter provides extended logging to the console.
extended_print = 2
# Below settings provide for a window that enable/disables events from firing the notifiers.
excluded_days = ""
# Enter as a tuple of string values. Be aware of your regional settings.
excluded_hours = ["23:59", "00:00"]

View File

@ -29,7 +29,7 @@ def main():
arguments = get_arguments() arguments = get_arguments()
# Check if we are in test mode (test_mode setting in config yaml). If so, load test event instead of live event. # Check if we are in test mode (test_mode setting in config yaml). If so, load test event instead of live event.
if config.get("test_mode"): if config.get('python', 'test_mode'):
logger(1, config, me, him, "Running in test mode: using test message wazuh-notify-test-event.json") logger(1, config, me, him, "Running in test mode: using test message wazuh-notify-test-event.json")

View File

@ -8,7 +8,7 @@ import time
from os.path import join, dirname from os.path import join, dirname
from sys import _getframe as frame from sys import _getframe as frame
import yaml import tomli
from dotenv import load_dotenv from dotenv import load_dotenv
@ -17,10 +17,10 @@ from dotenv import load_dotenv
# config_path = wazuh-notify-config.yaml # config_path = wazuh-notify-config.yaml
def set_environment() -> tuple: def set_environment() -> tuple:
set_wazuh_path = os.path.abspath(os.path.join(__file__, "../.."))
set_wazuh_path = os.path.abspath(os.path.join(__file__, "../../..")) # set_wazuh_path = os.path.abspath(os.path.join(__file__, "../../.."))
set_log_path = '{0}/logs/wazuh-notify.log'.format(set_wazuh_path) set_log_path = '{0}/logs/wazuh-notify.log'.format(set_wazuh_path)
set_config_path = '{0}/etc/wazuh-notify-config.yaml'.format(set_wazuh_path) set_config_path = '{0}/etc/wazuh-notify-config.toml'.format(set_wazuh_path)
return set_wazuh_path, set_log_path, set_config_path return set_wazuh_path, set_log_path, set_config_path
@ -58,13 +58,13 @@ def logger(level, config, me, him, message):
# Compare the extended_print log level in the configuration to the log level of the message. # Compare the extended_print log level in the configuration to the log level of the message.
if config.get('extended_print') >= level: if config.get('python').get('extended_print', 0) >= level:
print(log_line) print(log_line)
try: try:
# Compare the extended_logging level in the configuration to the log level of the message. # Compare the extended_logging level in the configuration to the log level of the message.
if config.get("extended_logging") >= level: if config.get('python').get('extended_logging', 0) >= level:
with open(logger_log_path, mode="a") as log_file: with open(logger_log_path, mode="a") as log_file:
log_file.write(log_line + "\n") log_file.write(log_line + "\n")
@ -129,28 +129,28 @@ def get_config():
try: try:
_, _, this_config_path = set_environment() _, _, this_config_path = set_environment()
with open(this_config_path, 'r') as ntfier_config: with open(this_config_path, 'rb') as ntfier_config:
config: dict = yaml.safe_load(ntfier_config) config: dict = tomli.load(ntfier_config)
except (FileNotFoundError, PermissionError, OSError): except (FileNotFoundError, PermissionError, OSError):
logger(2, config, me, him, "Error accessing configuration file: " + this_config_path) logger(2, config, me, him, "Error accessing configuration file: " + this_config_path)
logger(2, config, me, him, "Reading configuration file: " + this_config_path) logger(2, config, me, him, "Reading configuration file: " + this_config_path)
config['targets'] = config.get('targets', 'discord, ntfy, slack') config['targets'] = config.get('general').get('targets', 'discord, slack, ntfy')
config['full_alert'] = config.get('full_alert', '') config['full_alert'] = config.get('general').get('full_alert', False)
config['excluded_rules'] = config.get('excluded_rules', '') config['excluded_rules'] = config.get('general').get('excluded_rules', '')
config['excluded_agents'] = config.get('excluded_agents', '') config['excluded_agents'] = config.get('general').get('excluded_agents', '')
config['priority_map'] = config.get('priority_map', []) config['priority_map'] = config.get('priority_map', [])
config['sender'] = config.get('sender', 'Wazuh (IDS)') config['sender'] = config.get('general').get('sender', 'Wazuh (IDS)')
config['click'] = config.get('click', 'https://wazuh.org') config['click'] = config.get('general').get('click', 'https://wazuh.com')
config['md_e'] = config.get('markdown_emphasis', '') config['md_e'] = config.get('general').get('markdown_emphasis', '')
config['excluded_days'] = config.get('excluded_days', '') config['excluded_days'] = config.get('python').get('excluded_days', '')
config['excluded_hours'] = config.get('excluded_hours', '') config['excluded_hours'] = config.get('python').get('excluded_hours', '')
config['test_mode'] = config.get('test_mode', False) config['test_mode'] = config.get('python').get('test_mode', False)
config['extended_logging'] = config.get('extended_logging', True) config['extended_logging'] = config.get('python').get('extended_logging', 0)
config['extended_print'] = config.get('extended_print', True) config['extended_print'] = config.get('python').get('extended_print', 0)
return config return config
@ -348,17 +348,17 @@ def exclusions_check(config, alert):
# Check the exclusion records from the configuration yaml. # Check the exclusion records from the configuration yaml.
ex_hours: tuple = config.get('excluded_hours') ex_hours: tuple = config.get('python', 'excluded_hours')
# Start hour may not be later than end hours. End hour may not exceed 00:00 midnight to avoid day jump. # Start hour may not be later than end hours. End hour may not exceed 00:00 midnight to avoid day jump.
################################################
ex_hours = [ex_hours[0], "23:59"] if (ex_hours[1] >= '23:59' or ex_hours[1] < ex_hours[0]) else ex_hours ex_hours = [ex_hours[0], "23:59"] if (ex_hours[1] >= '23:59' or ex_hours[1] < ex_hours[0]) else ex_hours
# Get some more exclusion records from the config. # Get some more exclusion records from the config.
ex_days = config.get('excluded_days') ex_days = config.get('python').get('excluded_days')
ex_agents = config.get("excluded_agents") ex_agents = config.get('general').get("excluded_agents")
ex_rules = config.get("excluded_rules") ex_rules = config.get('general').get("excluded_rules")
# Check agent and rule from within the event. # Check agent and rule from within the event.
@ -459,7 +459,7 @@ def construct_basic_message(config, arguments, caller: str, data: dict) -> str:
# Include a specific control sequence for markdown bold parameter names. # Include a specific control sequence for markdown bold parameter names.
md_map = config.get('markdown_emphasis') md_map = config.get('python').get('markdown_emphasis', '')
md_e = md_map[caller] md_e = md_map[caller]
# If the --message (-m) argument was fulfilled, use this message to be sent. # If the --message (-m) argument was fulfilled, use this message to be sent.
@ -500,11 +500,11 @@ def build_notification(caller, config, arguments, notification, alert, priority,
logger(2, config, me, him, caller + " notification being constructed.") logger(2, config, me, him, caller + " notification being constructed.")
md_map = config.get('markdown_emphasis') md_map = config.get('python').get('markdown_emphasis', '')
md_e = md_map[caller] md_e = md_map[caller]
click: str = config.get('click') click: str = config.get('general').get('click', 'https://wazuh.com')
sender: str = config.get('sender') sender: str = config.get('general').get('sender', 'Wazuh (IDS)')
priority: str = str(priority) priority: str = str(priority)
tags = (str(alert['rule']['groups']).replace("[", "") tags = (str(alert['rule']['groups']).replace("[", "")
.replace("]", "") .replace("]", "")