feat: implement alert configuration and recent report features

- Added alert configuration management for email and chat notifications, allowing users to set preferences for severity levels, attack types, and notification channels.
- Introduced functionality to save, edit, and delete alert configurations, enhancing user control over security notifications.
- Implemented a new query to list recent security reports, providing users with quick access to the latest security incidents.
- Enhanced the backend schema to support alert configurations and recent report tracking, improving overall security management capabilities.
This commit is contained in:
2025-11-16 07:37:36 -03:00
parent 88983ea297
commit 70d405d98d
5 changed files with 650 additions and 37 deletions

View File

@@ -78,6 +78,7 @@ class SegurancaTeste:
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze")
bloqueado = False
ip_origem = f"203.0.113.{random.randint(10, 250)}"
for i, senha in enumerate(senhas_comuns[:tentativas], 1):
try:
payload = {
@@ -88,6 +89,7 @@ class SegurancaTeste:
response = self.session.post(
endpoint,
json=payload,
headers={"X-Forwarded-For": ip_origem},
timeout=5,
allow_redirects=False
)
@@ -134,7 +136,15 @@ class SegurancaTeste:
# Registrar tentativa de brute force no analisador para validar detecção no backend
try:
mark = "multiple failed login; brute force password guess"
r2 = self.session.post(endpoint_analyze, data=mark, headers={"Content-Type":"text/plain","X-Test-Scenario":"brute_force"})
r2 = self.session.post(
endpoint_analyze,
data=mark,
headers={
"Content-Type": "text/plain",
"X-Test-Scenario": "brute_force",
"X-Forwarded-For": ip_origem
}
)
if r2.status_code == 200:
jd = r2.json()
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "brute_force":
@@ -174,15 +184,17 @@ class SegurancaTeste:
]
endpoint_login = f"{self.base_url}/api/auth/sign-in/email"
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze")
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze?dst=127.0.0.1&proto=http")
detectado = False
for payload in payloads_sql:
ip_origem = f"203.0.113.{random.randint(10, 250)}"
try:
# Teste no campo email
response = self.session.post(
endpoint_login,
json={"email": payload, "password": "test"},
headers={"X-Forwarded-For": ip_origem},
timeout=5,
allow_redirects=False
)
@@ -207,7 +219,11 @@ class SegurancaTeste:
time.sleep(0.3)
# Registrar via analisador HTTP para validar detecção no backend
try:
r2 = self.session.post(endpoint_analyze, data=payload, headers={"Content-Type":"text/plain"})
r2 = self.session.post(
endpoint_analyze,
data=payload,
headers={"Content-Type": "text/plain", "X-Forwarded-For": ip_origem}
)
if r2.status_code == 200:
jd = r2.json()
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "sql_injection":
@@ -257,10 +273,12 @@ class SegurancaTeste:
for payload in payloads_xss:
try:
ip_origem = f"203.0.113.{random.randint(100, 200)}"
# Teste no campo email
response = self.session.post(
endpoint_login,
json={"email": payload, "password": "test"},
headers={"X-Forwarded-For": ip_origem},
timeout=5,
allow_redirects=False
)
@@ -284,7 +302,7 @@ class SegurancaTeste:
time.sleep(0.3)
# Registrar via analisador HTTP
try:
r2 = self.session.post(endpoint_analyze, data=payload, headers={"Content-Type":"text/html"})
r2 = self.session.post(endpoint_analyze, data=payload, headers={"Content-Type":"text/html","X-Forwarded-For": ip_origem})
if r2.status_code == 200:
jd = r2.json()
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "xss":
@@ -414,15 +432,17 @@ class SegurancaTeste:
]
endpoint = f"{self.base_url}/api/auth/sign-in/email"
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze")
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze?dst=127.0.0.1&proto=http")
detectado = False
for payload in payloads_path:
try:
ip_origem = f"203.0.113.{random.randint(10, 250)}"
# Tentar em diferentes campos
response = self.session.post(
endpoint,
json={"email": payload, "password": "test"},
headers={"X-Forwarded-For": ip_origem},
timeout=5,
allow_redirects=False
)
@@ -437,7 +457,7 @@ class SegurancaTeste:
time.sleep(0.3)
# Registrar via analisador HTTP
try:
r2 = self.session.post(endpoint_analyze + f"?file={payload}")
r2 = self.session.post(endpoint_analyze + f"&file={payload}", headers={"X-Forwarded-For": ip_origem})
if r2.status_code == 200:
jd = r2.json()
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "path_traversal":
@@ -476,14 +496,16 @@ class SegurancaTeste:
]
endpoint = f"{self.base_url}/api/auth/sign-in/email"
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze")
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze?dst=127.0.0.1&proto=http")
detectado = False
for payload in payloads_cmd:
try:
ip_origem = f"203.0.113.{random.randint(10, 250)}"
response = self.session.post(
endpoint,
json={"email": f"test{payload}@example.com", "password": "test"},
headers={"X-Forwarded-For": ip_origem},
timeout=5,
allow_redirects=False
)
@@ -498,7 +520,7 @@ class SegurancaTeste:
time.sleep(0.3)
# Registrar via analisador HTTP
try:
r2 = self.session.post(endpoint_analyze, data=payload, headers={"Content-Type":"text/plain"})
r2 = self.session.post(endpoint_analyze, data=payload, headers={"Content-Type":"text/plain","X-Forwarded-For": ip_origem})
if r2.status_code == 200:
jd = r2.json()
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "command_injection":
@@ -538,6 +560,7 @@ class SegurancaTeste:
response = self.session.post(
endpoint,
json={"email": payload, "password": {"$ne": None}},
headers={"X-Forwarded-For": f"203.0.113.{random.randint(10, 250)}"},
timeout=5,
allow_redirects=False
)
@@ -574,7 +597,8 @@ class SegurancaTeste:
for payload in payloads_xxe:
try:
# Tentar enviar como XML
headers = {'Content-Type': 'application/xml'}
ip_origem = f"203.0.113.{random.randint(10, 250)}"
headers = {'Content-Type': 'application/xml', 'X-Forwarded-For': ip_origem}
response = self.session.post(
endpoint,
data=payload,
@@ -591,6 +615,18 @@ class SegurancaTeste:
self.resultados['xxe']['detectado'] = True
time.sleep(0.3)
# Registrar via analisador HTTP
try:
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze?dst=127.0.0.1&proto=http")
r2 = self.session.post(endpoint_analyze, data=payload, headers={'Content-Type': 'application/xml', 'X-Forwarded-For': ip_origem})
if r2.status_code == 200:
jd = r2.json()
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "xxe":
self.log("XXE", "✅ DETECTADO (analisador)!", Colors.OKGREEN)
detectado = True
self.resultados['xxe']['detectado'] = True
except Exception:
pass
except requests.exceptions.RequestException as e:
self.log("XXE", f"Erro: {str(e)}", Colors.WARNING)