after active-response fixes

This commit is contained in:
Rudi klein 2024-05-11 19:59:49 +02:00
parent 7e95376a22
commit 1cf27eb64f
5 changed files with 288 additions and 370 deletions

View File

@ -1,6 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# This script is adapted version of the Python active response script sample, provided by Wazuh, in the documentation: # This script is adapted version of the Python active response script sample, provided by Wazuh, in the documentation:
# https://documentation.wazuh.com/current/user-manual/capabilities/active-response/custom-active-response-scripts.html # https://documentation.wazuh.com/current/user-manual/capabilities/active-response/custom-active-response-scripts.html
# It is provided under the below copyright statement: # It is provided under the below copyright statement:
@ -13,244 +12,116 @@
# License (version 2) as published by the FSF - Free Software # License (version 2) as published by the FSF - Free Software
# Foundation. # Foundation.
# #
# This version has changes in
# 1) the first lines of code with the assignments, and
# 2) the Start Custom Action Add section
# This adapted version is free software. Rudi Klein, april 2024 # This adapted version is free software. Rudi Klein, april 2024
import datetime
import json
import os import os
import sys import sys
from pathlib import PureWindowsPath, PurePosixPath
from wazuh_notifier_module import import_config as ic from wazuh_notifier_module import construct_basic_message
from wazuh_notifier_module import set_environment as se from wazuh_notifier_module import get_config
from wazuh_notifier_module import parameters_deconstruct
from wazuh_notifier_module import set_environment
from wazuh_notifier_module import threat_mapping
# Some variable assignments # Path variable assignments
wazuh_path, ar_path, config_path = se() wazuh_path, ar_path, config_path, notifier_path = set_environment()
ADD_COMMAND = 0
DELETE_COMMAND = 1
CONTINUE_COMMAND = 2
ABORT_COMMAND = 3
OS_SUCCESS = 0
OS_INVALID = -1
class Message:
def __init__(self):
self.alert = ""
self.command = 0
def write_debug_file(ar_name, msg):
with open(ar_path, mode="a") as log_file:
ar_name_posix = str(PurePosixPath(PureWindowsPath(ar_name[ar_name.find("active-response"):])))
log_file.write(
str(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')) + " " + ar_name_posix + ": " + msg + "\n")
def setup_and_check_message(argv):
# get alert from stdin
input_str = ""
for line in sys.stdin:
input_str = line
break
write_debug_file(argv[0], input_str)
try:
data = json.loads(input_str)
except ValueError:
write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
Message.command = OS_INVALID
return Message
Message.alert = data
command = data.get("command")
if command == "add":
Message.command = ADD_COMMAND
elif command == "delete":
Message.command = DELETE_COMMAND
else:
Message.command = OS_INVALID
write_debug_file(argv[0], 'Not valid command: ' + command)
return Message
def send_keys_and_check_message(argv, keys):
# build and send message with keys
keys_msg = json.dumps(
{"version": 1, "origin": {"name": argv[0], "module": "active-response"}, "command": "check_keys",
"parameters": {"keys": keys}})
write_debug_file(argv[0], keys_msg)
print(keys_msg)
sys.stdout.flush()
# read the response of previous message
input_str = ""
while True:
line = sys.stdin.readline()
if line:
input_str = line
break
write_debug_file(argv[0], input_str)
try:
data = json.loads(input_str)
except ValueError:
write_debug_file(argv[0], 'Decoding JSON has failed, invalid input format')
return Message
action = data.get("command")
if "continue" == action:
ret = CONTINUE_COMMAND
elif "abort" == action:
ret = ABORT_COMMAND
else:
ret = OS_INVALID
write_debug_file(argv[0], "Invalid value of 'command'")
return ret
def parameters_deconstruct(argv, event_keys):
a_id: str = str(event_keys["agent"]["id"])
a_name: str = str(event_keys["agent"]["name"])
e_id: str = str(event_keys["rule"]["id"])
e_description: str = str(event_keys["rule"]["description"])
e_level: str = str(event_keys["rule"]["level"])
e_fired_times: str = str(event_keys["rule"]["firedtimes"])
e_full_event: str = str(json.dumps(event_keys, indent=0).replace('"', '')
.replace('{', '')
.replace('}', '')
.replace('[', '')
.replace(']', '')
.replace(',', '')
.replace(' ', '')
)
if e_id in ic("excluded_rules") or a_id in ic("excluded_agents"):
write_debug_file(argv[0], "Excluded rule or agent: " + e_id + "/" + a_id)
else:
return a_id, a_name, e_id, e_description, e_level, e_fired_times, e_full_event
def construct_basic_message(argv, accent: str, a_id: str, a_name: str, e_id: str, e_description: str, e_level: str,
e_fired_times: str):
# Adding the BOLD text string to the Discord message. Ntfy has a different message format.
basic_message: str = ("--message " + '"' +
accent + "Agent: " + accent + a_name + " (" + a_id + ")" + "\n" +
accent + "Event id: " + accent + e_id + "\n" +
accent + "Description: " + accent + e_description + "\n" +
accent + "Threat level: " + accent + e_level + "\n" +
# Watch this last addition to the string. It should include the closing quote for the
# basic_message string. It must be closed by -> '"'. This will be done outside this function
# in order to enable another specific addition (event_full_message) in the calling procedure.
accent + "Times fired: " + accent + e_fired_times + "\n")
return basic_message
def main(argv): def main(argv):
write_debug_file(argv[0], "Started")
# validate json and get command # validate json and get command
msg = setup_and_check_message(argv)
if msg.command < 0: # data = load_message(argv)
sys.exit(OS_INVALID) # This example event can be used for troubleshooting. Comment out the line above and uncomment the line below.
data: dict = {"version": 1, "origin": {"name": "worker01", "module": "wazuh-execd"}, "command": "add",
"parameters": {"extra_args": [], "alert": {"timestamp": "2021-02-01T20:58:44.830+0000",
"rule": {"level": 15,
"description": "Shellshock attack detected",
"id": "31168", "mitre": {"id": ["T1068", "T1190"],
"tactic": [
"Privilege Escalation",
"Initial Access"],
"technique": [
"Exploitation for Privilege Escalation",
"Exploit Public-Facing Application"]},
"info": "CVE-2014-6271https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2014-6271",
"firedtimes": 2, "mail": "true",
"groups": ["web", "accesslog", "attack"],
"pci_dss": ["11.4"], "gdpr": ["IV_35.7.d"],
"nist_800_53": ["SI.4"],
"tsc": ["CC6.1", "CC6.8", "CC7.2", "CC7.3"]},
"agent": {"id": "000", "name": "wazuh-server"},
"manager": {"name": "wazuh-server"},
"id": "1612213124.6448363",
"full_log": "192.168.0.223 - - [01/Feb/2021:20:58:43 +0000] \"GET / HTTP/1.1\" 200 612 \"-\" \"() { :; }; /bin/cat /etc/passwd\"",
"decoder": {"name": "web-accesslog"},
"data": {"protocol": "GET", "srcip": "192.168.0.223",
"id": "200", "url": "/"},
"location": "/var/log/nginx/access.log"},
"program": "/var/ossec/active-response/bin/firewall-drop"}}
if msg.command == ADD_COMMAND: alert = data["parameters"]["alert"]
""" Start Custom Key # Get the threat level from the event (message)
At this point, it is necessary to select the keys from the alert and add them into the keys array. threat_level = data["parameters"]["alert"]["rule"]["level"]
"""
alert = msg.alert["parameters"]["alert"] parameters: dict = parameters_deconstruct(argv, alert)
keys = [alert["rule"]]
agent_id, agent_name, event_id, event_description, event_level, event_fired_times, event_full_message = \ # Get the YAML config if any
parameters_deconstruct(argv, alert) config: dict = get_config()
action = send_keys_and_check_message(argv, keys) # Get the mapping between threat level (event) and priority (Discord/ntfy)
threat_priority = threat_mapping(threat_level, config.get('np_1'), config.get('np_2'),
# if necessary, abort execution config.get('np_3'), config.get('np_4'), config.get('np_5'))
if action != CONTINUE_COMMAND:
if action == ABORT_COMMAND:
write_debug_file(argv[0], "Aborted")
sys.exit(OS_SUCCESS)
else:
write_debug_file(argv[0], "Invalid command")
sys.exit(OS_INVALID)
""" Start Custom Action Add """
if str(ic("discord_enabled")) == "1":
accent = "**"
discord_notifier = '{0}/active-response/bin/wazuh-discord-notifier.py'.format(wazuh_path)
discord_exec = "python3 " + discord_notifier + " "
write_debug_file(argv[0], "Start Discord notifier")
discord_message = construct_basic_message(argv, accent, agent_id, agent_name, event_id, event_description,
event_level, event_fired_times)
if ic("discord_full_message") == "1":
discord_message = discord_message + "\n" + accent + "__Full event__" + accent + event_full_message + '"'
else:
discord_message = discord_message + '"'
discord_command = discord_exec + discord_message
os.system(discord_command)
if str(ic("ntfy_enabled")) == "1":
accent = ""
ntfy_notifier = '{0}/active-response/bin/wazuh-ntfy-notifier.py'.format(wazuh_path)
ntfy_exec = "python3 " + ntfy_notifier + " "
write_debug_file(argv[0], "Start NTFY notifier")
ntfy_message = construct_basic_message(argv, accent, agent_id, agent_name, event_level, event_description,
event_id, event_fired_times)
# If the full message flag is set, the full message PLUS the closing parenthesis will be added
if ic("ntfy_full_message") == "1":
ntfy_message = ntfy_message + "\n" + "Full event" + event_full_message + '"'
else:
ntfy_message = ntfy_message + '"'
ntfier_command = ntfy_exec + ntfy_message
os.system(ntfier_command)
""" End Custom Action Add """
elif msg.command == DELETE_COMMAND:
""" Start Custom Action Delete """
pass
""" End Custom Action Delete """
if "discord" in config["targets"]:
accent: str = "**"
elif "ntfy" in config["targets"]:
accent: str = ""
else: else:
write_debug_file(argv[0], "Invalid command") accent: str = ""
write_debug_file(argv[0], "Ended") notifier_message: str = construct_basic_message(argv, accent,
parameters.get('a_id', '000'),
parameters.get('a_name', 'agent not found'),
parameters.get('e_id', '9999'),
parameters.get('e_description', 'Event not found'),
parameters.get('e_level', '9999'),
parameters.get('e_fired_times', '3')
)
sys.exit(OS_SUCCESS) if "discord" in config["targets"]:
discord_notifier: str = '{0}/active-response/bin/wazuh-discord-notifier.py'.format(wazuh_path)
discord_exec: str = "python3 " + discord_notifier + " "
discord_message: str = notifier_message
if "discord" in config["full_message"]:
discord_message: str = (discord_message + "\n" + accent + "__Full event__" +
accent + parameters['e_full_event'] + '"')
else:
discord_message: str = discord_message + '"'
discord_command: str = discord_exec + discord_message
os.system(discord_command)
if "ntfy" in config["targets"]:
ntfy_notifier: str = '{0}/active-response/bin/wazuh-ntfy-notifier.py'.format(wazuh_path)
ntfy_exec: str = "python3 " + ntfy_notifier + " "
ntfy_message: str = notifier_message
# If the full message flag is set, the full message PLUS the closing parenthesis will be added
if "ntfy" in config["full_message"]:
ntfy_message: str = ntfy_message + "\n" + "Full event" + parameters['e_full_event'] + '"'
else:
ntfy_message: str = ntfy_message + '"'
ntfy_command: str = ntfy_exec + ntfy_message
os.system(ntfy_command)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -16,98 +16,51 @@
# with their friends and communities. It allows for receiving message using webhooks. # with their friends and communities. It allows for receiving message using webhooks.
# For more information: https://discord.com. # For more information: https://discord.com.
import os
from os.path import join, dirname
import requests import requests
from dotenv import load_dotenv
from wazuh_notifier_module import get_arguments as ga from wazuh_notifier_module import color_mapping
from wazuh_notifier_module import get_yaml_config as yc from wazuh_notifier_module import get_arguments
from wazuh_notifier_module import set_basic_defaults as bd from wazuh_notifier_module import get_config
from wazuh_notifier_module import set_environment as se from wazuh_notifier_module import get_env
from wazuh_notifier_module import set_time as st from wazuh_notifier_module import set_environment
from wazuh_notifier_module import threat_priority_mapping as tpm from wazuh_notifier_module import set_time
# Get path values # Get path values
wazuh_path, ar_path, config_path = se() wazuh_path, ar_path, config_path, notifier_path = set_environment()
# Get time value # Get time value
now_message, now_logging = st() now_message, now_logging = set_time()
# Retrieve webhook from .env # Get some paths.
discord_url, ntfy_url = get_env()
# Catching some path errors. # Get the yaml config
try: config: dict = get_config()
dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)
if not os.path.isfile(dotenv_path):
raise Exception(dotenv_path, "file not found")
discord_webhook = os.getenv("DISCORD_WEBHOOK")
except Exception as err:
# output error, and return with an error code
print(str(Exception(err.args)))
exit(err)
# the POST builder. Prepares https and sends the request. # the POST builder. Prepares https and sends the request.
def discord_command(n_server, n_sender, n_destination, n_priority, n_message, n_tags, n_click): def discord_command(n_url, n_sender, n_destination, n_priority, n_message, n_tags, n_click):
color = color_mapping(n_priority)
x_message = (now_message + x_message = (now_message +
"\n\n" + n_message + "\n\n" + "\n\n" + n_message + "\n\n" +
"Priority: " + n_priority + "\n" + "Priority: " + n_priority + "\n" +
"Tags: " + n_tags + "\n\n" + n_click "Tags: " + n_tags + "\n\n" + n_click
) )
n_data = {"username": n_sender, "embeds": [{"description": x_message, "title": n_destination}]} n_data = {"username": n_sender, "embeds": [{"color": color, "description": x_message, "title": n_destination}]}
requests.post(n_server, json=n_data) requests.post(n_url, json=n_data)
# Remove 1st argument from the list of command line arguments # Remove 1st argument from the list of command line arguments
# argument_list: list = sys.argv[1:] # argument_list: list = sys.argv[1:]
# Short options
options: str = "u:s:p:m:t:c:hv"
# Long options
long_options: list = ["server=", "sender=", "destination=", "priority=", "message=", "tags=", "click=", "help", "view"]
# Defining who I am
notifier = "discord" notifier = "discord"
# Retrieve the hard-coded basic defaults. url, sender, destination, priority, message, tags, click = get_arguments()
(d_server, d_sender, d_destination, d_priority, d_message, d_tags, d_click, d_notifier_priority_1,
d_notifier_priority_2, d_notifier_priority_3, d_notifier_priority_4, d_notifier_priority_5) = bd(notifier)
# Use the values from the config yaml if available. Overrides the basic defaults (get_yaml_config).
yc_args = [notifier, d_server, d_sender, d_destination, d_priority, d_message, d_tags, d_click, d_notifier_priority_1,
d_notifier_priority_2, d_notifier_priority_3, d_notifier_priority_4, d_notifier_priority_5]
(server, sender, destination, priority, message, tags, click, notifier_priority_1, notifier_priority_2,
notifier_priority_3, notifier_priority_4, notifier_priority_5) = yc(*yc_args)
# Get params during execution. Params found here, override minimal defaults and/or config settings.
if ga(notifier, options, long_options) is None:
pass
# sender, destination, priority, message, tags, click = "", "", "", "", "", ""
else:
sender, destination, priority, message, tags, click = ga(notifier, options, long_options)
# Get the threat level from the message and map it to priority
threat_level = message[message.find('Threat level:') + 13:message.find('Threat level:') + 15].replace(" ", "")
# Get the mapping between threat level (event) and priority (Discord/ntfy)
# noinspection PyRedeclaration
priority = tpm(threat_level, notifier_priority_1, notifier_priority_2, notifier_priority_3,
notifier_priority_4, notifier_priority_5)
# Finally, execute the POST request # Finally, execute the POST request
discord_command(discord_webhook, sender, destination, priority, message, tags, click) discord_command(discord_url, sender, destination, priority, message, tags, click)

View File

@ -1,31 +0,0 @@
---
#start of yaml
# This is the yaml config file for both the wazuh-ntfy-notifier.py and wazuh-discord-notifier.py.
# The yaml needs to be in the same folder as the wazuh-ntfy-notifier.py and wazuh-discord-notifier.py
# COMMON (custom-wazuh-notifiers.py) configuration settings start here.
# 1 = messages will be sent through this message server. 0 = messages will NOT be sent through this message server.
targets: "discord,ntfy"
# Exclude rules that are listed in the ossec.conf active response definition.
excluded_rules: "5401, 5403"
excluded_agents: "999"
# Priority mapping from 1-12 (Wazuh events) to 1-5 (Discord and ntfy notification)
notifier_priority_1: 12, 11, 10
notifier_priority_2: 9, 8
notifier_priority_3: 7, 6
notifier_priority_4: 5, 4
notifier_priority_5: 3 ,2, 1
sender: "Wazuh (IDS)"
click: "https://google.com"
#end of yaml
...

32
wazuh-notify-config.yaml Executable file
View File

@ -0,0 +1,32 @@
---
#start of yaml
# This is the yaml config file for both the wazuh-ntfy-notifier.py and wazuh-discord-notifier.py.
# The yaml needs to be in the same folder as the wazuh-ntfy-notifier.py and wazuh-discord-notifier.py
targets: "discord, ntfy"
full_message: "discord, ntfy"
# Exclude rules that are listed in the ossec.conf active response definition.
excluded_rules: "5401, 5403"
excluded_agents: "999"
# Priority mapping from 0-15 (Wazuh events: threat levels) to 1-5 ( in notification)
# https://documentation.wazuh.com/current/user-manual/ruleset/rules-classification.html
priority_5: [ 15,14,13,12 ]
priority_4: [ 11,10,9 ]
priority_3: [ 8,7,6 ]
priority_2: [ 5,4 ]
priority_1: [ 3,2,1,0 ]
sender: "Wazuh (IDS)"
click: "https://google.com"
#end of yaml
...

View File

@ -1,13 +1,43 @@
import datetime
import getopt import getopt
import json
import os import os
import sys import sys
import time import time
from os.path import join, dirname from os.path import join, dirname
from pathlib import PureWindowsPath, PurePosixPath
import yaml import yaml
from dotenv import load_dotenv from dotenv import load_dotenv
def set_environment() -> tuple:
# todo fix reference when running manually/in process
set_wazuh_path = "/home/rudi/pycharm"
# set_wazuh_path = os.path.abspath(os.path.join(__file__, "../../.."))
set_ar_path = '{0}/logs/active-responses.log'.format(set_wazuh_path)
set_config_path = '{0}/etc/wazuh-notify-config.yaml'.format(set_wazuh_path)
set_notifier_path = '{0}/active-response/bin'.format(set_wazuh_path)
return set_wazuh_path, set_ar_path, set_config_path, set_notifier_path
# Define paths: wazuh_path = wazuh root directory
# ar_path = active-responses.log path,
# config_path = wazuh-notifier-wazuh-notify-config.yaml
wazuh_path, ar_path, config_path, notifier_path = set_environment()
# Debug writer
def write_debug_file(ar_name, msg):
with open(ar_path, mode="a") as log_file:
ar_name_posix = str(PurePosixPath(PureWindowsPath(ar_name[ar_name.find("active-response"):])))
log_file.write(
str(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')) + " " + ar_name_posix + ": " + msg + "\n")
def get_env(): def get_env():
try: try:
dotenv_path = join(dirname(__file__), '.env') dotenv_path = join(dirname(__file__), '.env')
@ -36,46 +66,51 @@ def set_time():
return now_message, now_logging return now_message, now_logging
# Define paths: wazuh_path = wazuh root directory # Import configuration settings from wazuh-notify-config.yaml
# ar_path = active-responses.log path,
# config_path = wazuh-notifier-wazuh-notify-config.yaml
def set_environment():
# todo fix reference when running manually/in process
wazuh_path = "/var/ossec"
# wazuh_path = os.path.abspath(os.path.join(__file__, "../../.."))
ar_path = '{0}/logs/active-responses.log'.format(wazuh_path)
config_path = 'wazuh-notifier-wazuh-notify-config.yaml'.format(wazuh_path)
return wazuh_path, ar_path, config_path
# Import configuration settings from wazuh-notifier-wazuh-notify-config.yaml
def import_config(): def import_config():
try: try:
_, _, config_path = set_environment() _, _, this_config_path, _ = set_environment()
with open(config_path, 'r') as ntfier_config: with open(this_config_path, 'r') as ntfier_config:
config: dict = yaml.safe_load(ntfier_config) config: dict = yaml.safe_load(ntfier_config)
return config return config
except (FileNotFoundError, PermissionError, OSError): except (FileNotFoundError, PermissionError, OSError):
return None return None
# Show configuration settings from wazuh-notifier-wazuh-notify-config.yaml # Process configuration settings from wazuh-notify-config.yaml
def get_config():
config = import_config()
config['np_5'] = config.get('np_1', [15, 14, 13, 12])
config['np_4'] = config.get('np_2', [11, 10, 9])
config['np_3'] = config.get('np_3', [8, 7, 6])
config['np_2'] = config.get('np_4', [5, 4])
config['np_1'] = config.get('np_5', [3, 2, 1, 0])
config['targets'] = config.get('targets', 'ntfy, discord')
config['excluded_rules'] = config.get('excluded_rules', '')
config['excluded_agents'] = config.get('excluded_agents', '')
config['sender'] = 'Wazuh (IDS)'
config['click'] = 'https://wazuh.org'
return config
# Show configuration settings from wazuh-notify-config.yaml
def view_config(): def view_config():
_, _, config_path = set_environment() _, _, this_config_path, _ = set_environment()
try: try:
with open(config_path, 'r') as ntfier_config: with open(this_config_path, 'r') as ntfier_config:
print(ntfier_config.read()) print(ntfier_config.read())
except (FileNotFoundError, PermissionError, OSError): except (FileNotFoundError, PermissionError, OSError):
print(config_path + " does not exist or is not accessible") print(this_config_path + " does not exist or is not accessible")
return return
@ -84,53 +119,49 @@ def view_config():
def ar_log(): def ar_log():
now = set_time() now = set_time()
_, ar_path, _ = set_environment() _, this_ar_path, _, _ = set_environment()
msg = '{0} {1} {2}'.format(now, os.path.realpath(__file__), 'Post JSON Alert') msg = '{0} {1} {2}'.format(now, os.path.realpath(__file__), 'Post JSON Alert')
f = open(ar_path, 'a') f = open(this_ar_path, 'a')
f.write(msg + '\n') f.write(msg + '\n')
f.close() f.close()
def threat_priority_mapping(threat_level, np_1, np_2, np_3, np_4, np_5): def threat_mapping(threat_level, np_1, np_2, np_3, np_4, np_5):
# Map threat level v/s priority # Map threat level v/s priority
if threat_level in np_1: if threat_level in np_1:
priority_mapping = "1" priority_mapping = "1"
priority_color = 0x339900
elif threat_level in np_2: elif threat_level in np_2:
priority_mapping = "2" priority_mapping = "2"
priority_color = 0x99cc33
elif threat_level in np_3: elif threat_level in np_3:
priority_mapping = "3" priority_mapping = "3"
priority_color = 0xffcc00
elif threat_level in np_4: elif threat_level in np_4:
priority_mapping = "4" priority_mapping = "4"
priority_color = 0xff9966
elif threat_level in np_5: elif threat_level in np_5:
priority_mapping = "5" priority_mapping = "5"
priority_color = 0xcc3300
else: else:
priority_mapping = "3" priority_mapping = "3"
return priority_mapping
def color_mapping(priority):
# Map priority to color
if priority == 1:
priority_color = 0x339900
elif priority == 2:
priority_color = 0x99cc33
elif priority == 3:
priority_color = 0xffcc00
elif priority == 4:
priority_color = 0xff9966
elif priority == 5:
priority_color = 0xcc3300
else:
priority_color = 0xffcc00 priority_color = 0xffcc00
return priority_mapping, priority_color return priority_color
def get_yaml_config():
config = import_config()
config['np_1'] = config.get('np_1', '1, 2, 3')
config['np_2'] = config.get('np_2', '4,5')
config['np_3'] = config.get('np_3', '6,7')
config['np_4'] = config.get('np_4', '8,9')
config['np_5'] = config.get('np_5', '10, 11, 12')
config['targets'] = config.get('targets', 'ntfy, discord')
config['excluded_rules'] = config.get('excluded_rules', '')
config['excluded_agents'] = config.get('excluded_agents', '')
config['sender'] = 'Wazuh (IDS)'
config['click'] = 'https://wazuh.org'
return config
def get_arguments(): def get_arguments():
@ -155,8 +186,15 @@ def get_arguments():
-v, --view show config. -v, --view show config.
""" """
url: str
sender: str
destination: str
message: str
priority: int
tags: str
click: str
url, sender, destination, message, priority, tags, click = "", "", "", "", "", "", "" url, sender, destination, message, priority, tags, click = "", "", "", "", 0, "", ""
argument_list: list = sys.argv[1:] argument_list: list = sys.argv[1:]
@ -181,28 +219,83 @@ def get_arguments():
exit() exit()
elif current_argument in ("-u", "--url"): elif current_argument in ("-u", "--url"):
url = current_value url: str = current_value
elif current_argument in ("-s", "--sender"): elif current_argument in ("-s", "--sender"):
sender = current_value sender: str = current_value
elif current_argument in ("-d", "--destination"): elif current_argument in ("-d", "--destination"):
destination = current_value destination: str = current_value
elif current_argument in ("-p", "--priority"): elif current_argument in ("-p", "--priority"):
priority = current_value priority: int = current_value
elif current_argument in ("-m", "--message"): elif current_argument in ("-m", "--message"):
message = current_value message: str = current_value
elif current_argument in ("-t", "--tags"): elif current_argument in ("-t", "--tags"):
tags = current_value tags: str = current_value
elif current_argument in ("-c", "--click"): elif current_argument in ("-c", "--click"):
click = current_value click: str = current_value
except getopt.error as err: except getopt.error as err:
# output error, and return with an error code # output error, and return with an error code
print(str(err)) print(str(err))
return url, sender, destination, message, priority, tags, click return url, sender, destination, message, priority, tags, click
def load_message(argv):
# get alert from stdin
input_str: str = ""
for line in sys.stdin:
input_str: str = line
break
data: json = json.loads(input_str)
if data.get("command") == "add":
return data
else:
# todo fix error message
sys.exit(1)
def parameters_deconstruct(argv, event_keys):
config: dict = get_config()
a_id: str = str(event_keys["agent"]["id"])
a_name: str = str(event_keys["agent"]["name"])
e_id: str = str(event_keys["rule"]["id"])
e_description: str = str(event_keys["rule"]["description"])
e_level: str = str(event_keys["rule"]["level"])
e_fired_times: str = str(event_keys["rule"]["firedtimes"])
e_full_event: str = str(json.dumps(event_keys, indent=4).replace('"', '')
.replace('{', '')
.replace('}', '')
.replace('[', '')
.replace(']', '')
)
if e_id not in config["excluded_rules"] or a_id not in config["excluded_agents"]:
parameters: dict = dict(a_id=a_id, a_name=a_name, e_id=e_id, e_description=e_description, e_level=e_level,
e_fired_times=e_fired_times, e_full_event=e_full_event)
return parameters
def construct_basic_message(argv, accent: str, a_id: str, a_name: str, e_id: str, e_description: str, e_level: str,
e_fired_times: str) -> str:
# Adding the BOLD text string to the Discord message. Ntfy has a different message format.
basic_message: str = ("--message " + '"' +
accent + "Agent: " + accent + a_name + " (" + a_id + ")" + "\n" +
accent + "Event id: " + accent + e_id + "\n" +
accent + "Description: " + accent + e_description + "\n" +
accent + "Threat level: " + accent + e_level + "\n" +
# Watch this last addition to the string. It should include the closing quote for the
# basic_message string. It must be closed by -> '"'. This will be done outside this function
# in order to enable another specific addition (event_full_message) in the calling procedure.
accent + "Times fired: " + accent + e_fired_times + "\n")
return basic_message