#!/usr/bin/env python3
"""
Script de Teste de Segurança para SGSE
Simula diferentes tipos de ataques para validar o sistema de segurança
Autor: Sistema de Testes Automatizados
Data: 2024
Atualizado: 2026 - Suporte a bloqueio automático configurável
Nota: 'engenharia_social' foi removido do sistema conforme atualização de segurança
"""
import requests
import time
import random
import string
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Tuple
import json
from datetime import datetime
from urllib.parse import urljoin
class Colors:
"""Códigos de cores para terminal"""
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
class SegurancaTeste:
"""Classe principal para testes de segurança"""
def __init__(self, base_url: str = "http://localhost:5173",
convex_url: str = "http://127.0.0.1:3210"):
self.base_url = base_url
self.convex_url = convex_url
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'SGSE-Security-Test-Client/1.0',
'Accept': 'application/json',
'Content-Type': 'application/json'
})
self.resultados = {
'brute_force': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False},
'sql_injection': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False},
'xss': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False},
'ddos': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False},
'path_traversal': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False},
'command_injection': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False},
'nosql_injection': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False},
'xxe': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False},
}
def log(self, tipo: str, mensagem: str, cor: str = Colors.OKCYAN):
"""Log formatado"""
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"{cor}[{timestamp}] [{tipo}] {mensagem}{Colors.ENDC}")
def verificar_bloqueio_automatico(self, response: requests.Response) -> bool:
"""
Verifica se a resposta indica bloqueio automático
Retorna True se bloqueio automático foi aplicado
"""
if response.status_code == 403:
try:
resp_json = response.json()
# Verificar flags de bloqueio automático
if (resp_json.get('bloqueado') or
resp_json.get('reason') == 'ataque_detectado' or
resp_json.get('bloqueadoAutomatico')):
return True
except (json.JSONDecodeError, ValueError):
# Se não for JSON, verificar no texto
if 'bloqueado automaticamente' in response.text.lower() or 'ataque_detectado' in response.text.lower():
return True
return False
def testar_brute_force(self, email: str = "test@example.com",
tentativas: int = 10) -> bool:
"""
Testa ataque de força bruta tentando múltiplas senhas
Espera-se que o sistema bloqueie após 5 tentativas
"""
self.log("BRUTE_FORCE", f"Iniciando teste de força bruta ({tentativas} tentativas)...")
senhas_comuns = [
"123456", "password", "12345678", "qwerty", "abc123",
"1234567", "letmein", "trustno1", "dragon", "baseball",
"iloveyou", "master", "sunshine", "ashley", "bailey"
]
endpoint = f"{self.base_url}/api/auth/sign-in/email"
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze")
bloqueado = False
ip_origem = f"203.0.113.{random.randint(10, 250)}"
for i, senha in enumerate(senhas_comuns[:tentativas], 1):
try:
payload = {
"email": email,
"password": senha
}
response = self.session.post(
endpoint,
json=payload,
headers={"X-Forwarded-For": ip_origem},
timeout=5,
allow_redirects=False
)
# Verificar se foi bloqueado
if response.status_code == 429: # Too Many Requests
self.log("BRUTE_FORCE",
f"✅ DETECTADO! Bloqueio após {i} tentativas (429)",
Colors.OKGREEN)
bloqueado = True
self.resultados['brute_force']['detectado'] = True
break
if response.status_code == 403: # Forbidden
bloqueio_auto = self.verificar_bloqueio_automatico(response)
if bloqueio_auto:
self.log("BRUTE_FORCE",
f"✅ BLOQUEIO AUTOMÁTICO! Ataque detectado e bloqueado após {i} tentativas (403)",
Colors.OKGREEN)
self.resultados['brute_force']['bloqueado_automatico'] = True
else:
self.log("BRUTE_FORCE",
f"✅ DETECTADO! Acesso negado após {i} tentativas (403)",
Colors.OKGREEN)
bloqueado = True
self.resultados['brute_force']['detectado'] = True
break
# Verificar rate limiting nos headers
if 'X-RateLimit-Remaining' in response.headers:
remaining = response.headers['X-RateLimit-Remaining']
if remaining == '0':
self.log("BRUTE_FORCE",
f"✅ DETECTADO! Rate limit atingido após {i} tentativas",
Colors.OKGREEN)
bloqueado = True
self.resultados['brute_force']['detectado'] = True
break
if i % 5 == 0:
self.log("BRUTE_FORCE", f"Tentativa {i}/{tentativas}...")
# Pequeno delay para não sobrecarregar
time.sleep(0.5)
except requests.exceptions.RequestException as e:
self.log("BRUTE_FORCE", f"Erro na requisição {i}: {str(e)}", Colors.WARNING)
self.resultados['brute_force']['falhas'] += 1
if not bloqueado:
# Registrar tentativa de brute force no analisador para validar detecção no backend
try:
mark = "multiple failed login; brute force password guess"
r2 = self.session.post(
endpoint_analyze,
data=mark,
headers={
"Content-Type": "text/plain",
"X-Test-Scenario": "brute_force",
"X-Forwarded-For": ip_origem
}
)
if r2.status_code == 200:
jd = r2.json()
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "brute_force":
if jd.get("bloqueadoAutomatico"):
self.log("BRUTE_FORCE", "✅ BLOQUEIO AUTOMÁTICO (analisador) mesmo sem 429/403", Colors.OKGREEN)
self.resultados['brute_force']['bloqueado_automatico'] = True
else:
self.log("BRUTE_FORCE", "✅ DETECTADO (analisador) mesmo sem 429/403", Colors.OKGREEN)
self.resultados['brute_force']['detectado'] = True
else:
self.log("BRUTE_FORCE", f"⚠️ AVISO: Nenhum bloqueio detectado após {tentativas} tentativas", Colors.WARNING)
elif r2.status_code == 403:
if self.verificar_bloqueio_automatico(r2):
self.log("BRUTE_FORCE", "✅ BLOQUEIO AUTOMÁTICO (analisador retornou 403)", Colors.OKGREEN)
self.resultados['brute_force']['bloqueado_automatico'] = True
self.resultados['brute_force']['detectado'] = True
else:
self.log("BRUTE_FORCE", f"⚠️ AVISO: analisador retornou {r2.status_code}", Colors.WARNING)
except Exception as e:
self.log("BRUTE_FORCE", f"⚠️ AVISO: falha ao chamar analisador: {e}", Colors.WARNING)
return bloqueado
def testar_sql_injection(self) -> bool:
"""
Testa ataques de SQL Injection em campos de entrada
Agora verifica bloqueio automático configurável
"""
self.log("SQL_INJECTION", "Iniciando testes de SQL Injection...")
payloads_sql = [
"' OR '1'='1",
"' OR '1'='1' --",
"' OR '1'='1' /*",
"admin'--",
"admin'/*",
"' UNION SELECT NULL--",
"' UNION SELECT NULL, NULL--",
"'; DROP TABLE usuarios--",
"' OR 1=1#",
"' OR 'a'='a",
"1' AND '1'='1",
"1' OR '1'='1",
"admin' OR '1'='1",
"' OR 1=1 --",
"') OR ('1'='1",
]
endpoint_login = f"{self.base_url}/api/auth/sign-in/email"
endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze?dst=127.0.0.1&proto=http")
detectado = False
for payload in payloads_sql:
ip_origem = f"203.0.113.{random.randint(10, 250)}"
try:
# Teste no campo email
response = self.session.post(
endpoint_login,
json={"email": payload, "password": "test"},
headers={"X-Forwarded-For": ip_origem},
timeout=5,
allow_redirects=False
)
# Verificar se houve erro específico de segurança
if response.status_code in [400, 403, 422]:
if 'sql' in response.text.lower() or 'injection' in response.text.lower():
self.log("SQL_INJECTION",
f"✅ DETECTADO! Payload: {payload[:30]}...",
Colors.OKGREEN)
detectado = True
self.resultados['sql_injection']['detectado'] = True
# Verificar se há bloqueio automático ou WAF bloqueando
if response.status_code == 403:
bloqueio_auto = self.verificar_bloqueio_automatico(response)
if bloqueio_auto:
self.log("SQL_INJECTION",
f"✅ BLOQUEIO AUTOMÁTICO! Payload: {payload[:30]}...",
Colors.OKGREEN)
self.resultados['sql_injection']['bloqueado_automatico'] = True
else:
self.log("SQL_INJECTION",
f"✅ BLOQUEADO pelo WAF! Payload: {payload[:30]}...",
Colors.OKGREEN)
detectado = True
self.resultados['sql_injection']['detectado'] = True
time.sleep(0.3)
# Registrar via analisador HTTP para validar detecção no backend
try:
r2 = self.session.post(
endpoint_analyze,
data=payload,
headers={"Content-Type": "text/plain", "X-Forwarded-For": ip_origem}
)
if r2.status_code == 200:
jd = r2.json()
if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "sql_injection":
if jd.get("bloqueadoAutomatico"):
self.log("SQL_INJECTION", f"✅ BLOQUEIO AUTOMÁTICO (analisador)! Payload: {payload[:30]}...", Colors.OKGREEN)
self.resultados['sql_injection']['bloqueado_automatico'] = True
else:
self.log("SQL_INJECTION", f"✅ DETECTADO (analisador)! Payload: {payload[:30]}...", Colors.OKGREEN)
detectado = True
self.resultados['sql_injection']['detectado'] = True
elif r2.status_code == 403:
if self.verificar_bloqueio_automatico(r2):
self.log("SQL_INJECTION", f"✅ BLOQUEIO AUTOMÁTICO (analisador retornou 403)! Payload: {payload[:30]}...", Colors.OKGREEN)
self.resultados['sql_injection']['bloqueado_automatico'] = True
detectado = True
self.resultados['sql_injection']['detectado'] = True
except Exception:
pass
except requests.exceptions.RequestException as e:
self.log("SQL_INJECTION", f"Erro: {str(e)}", Colors.WARNING)
if not detectado:
self.log("SQL_INJECTION",
"⚠️ AVISO: Nenhum bloqueio específico de SQL Injection detectado",
Colors.WARNING)
return detectado
def testar_xss(self) -> bool:
"""
Testa ataques de Cross-Site Scripting (XSS)
Agora verifica bloqueio automático configurável
"""
self.log("XSS", "Iniciando testes de XSS...")
payloads_xss = [
"",
"
",
"