#!/usr/bin/env python3 """ Script de Teste de Segurança para SGSE Simula diferentes tipos de ataques para validar o sistema de segurança Autor: Sistema de Testes Automatizados Data: 2024 Atualizado: 2026 - Suporte a bloqueio automático configurável Nota: 'engenharia_social' foi removido do sistema conforme atualização de segurança """ import requests import time import random import string from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Dict, Tuple import json from datetime import datetime from urllib.parse import urljoin class Colors: """Códigos de cores para terminal""" HEADER = '\033[95m' OKBLUE = '\033[94m' OKCYAN = '\033[96m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' class SegurancaTeste: """Classe principal para testes de segurança""" def __init__(self, base_url: str = "http://localhost:5173", convex_url: str = "http://127.0.0.1:3210"): self.base_url = base_url self.convex_url = convex_url self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'SGSE-Security-Test-Client/1.0', 'Accept': 'application/json', 'Content-Type': 'application/json' }) self.resultados = { 'brute_force': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False}, 'sql_injection': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False}, 'xss': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False}, 'ddos': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False}, 'path_traversal': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False}, 'command_injection': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False}, 'nosql_injection': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False}, 'xxe': {'sucesso': 0, 'falhas': 0, 'detectado': False, 'bloqueado_automatico': False}, } def log(self, tipo: str, mensagem: str, cor: str = Colors.OKCYAN): """Log formatado""" timestamp = datetime.now().strftime("%H:%M:%S") print(f"{cor}[{timestamp}] [{tipo}] {mensagem}{Colors.ENDC}") def verificar_bloqueio_automatico(self, response: requests.Response) -> bool: """ Verifica se a resposta indica bloqueio automático Retorna True se bloqueio automático foi aplicado """ if response.status_code == 403: try: resp_json = response.json() # Verificar flags de bloqueio automático if (resp_json.get('bloqueado') or resp_json.get('reason') == 'ataque_detectado' or resp_json.get('bloqueadoAutomatico')): return True except (json.JSONDecodeError, ValueError): # Se não for JSON, verificar no texto if 'bloqueado automaticamente' in response.text.lower() or 'ataque_detectado' in response.text.lower(): return True return False def testar_brute_force(self, email: str = "test@example.com", tentativas: int = 10) -> bool: """ Testa ataque de força bruta tentando múltiplas senhas Espera-se que o sistema bloqueie após 5 tentativas """ self.log("BRUTE_FORCE", f"Iniciando teste de força bruta ({tentativas} tentativas)...") senhas_comuns = [ "123456", "password", "12345678", "qwerty", "abc123", "1234567", "letmein", "trustno1", "dragon", "baseball", "iloveyou", "master", "sunshine", "ashley", "bailey" ] endpoint = f"{self.base_url}/api/auth/sign-in/email" endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze") bloqueado = False ip_origem = f"203.0.113.{random.randint(10, 250)}" for i, senha in enumerate(senhas_comuns[:tentativas], 1): try: payload = { "email": email, "password": senha } response = self.session.post( endpoint, json=payload, headers={"X-Forwarded-For": ip_origem}, timeout=5, allow_redirects=False ) # Verificar se foi bloqueado if response.status_code == 429: # Too Many Requests self.log("BRUTE_FORCE", f"✅ DETECTADO! Bloqueio após {i} tentativas (429)", Colors.OKGREEN) bloqueado = True self.resultados['brute_force']['detectado'] = True break if response.status_code == 403: # Forbidden bloqueio_auto = self.verificar_bloqueio_automatico(response) if bloqueio_auto: self.log("BRUTE_FORCE", f"✅ BLOQUEIO AUTOMÁTICO! Ataque detectado e bloqueado após {i} tentativas (403)", Colors.OKGREEN) self.resultados['brute_force']['bloqueado_automatico'] = True else: self.log("BRUTE_FORCE", f"✅ DETECTADO! Acesso negado após {i} tentativas (403)", Colors.OKGREEN) bloqueado = True self.resultados['brute_force']['detectado'] = True break # Verificar rate limiting nos headers if 'X-RateLimit-Remaining' in response.headers: remaining = response.headers['X-RateLimit-Remaining'] if remaining == '0': self.log("BRUTE_FORCE", f"✅ DETECTADO! Rate limit atingido após {i} tentativas", Colors.OKGREEN) bloqueado = True self.resultados['brute_force']['detectado'] = True break if i % 5 == 0: self.log("BRUTE_FORCE", f"Tentativa {i}/{tentativas}...") # Pequeno delay para não sobrecarregar time.sleep(0.5) except requests.exceptions.RequestException as e: self.log("BRUTE_FORCE", f"Erro na requisição {i}: {str(e)}", Colors.WARNING) self.resultados['brute_force']['falhas'] += 1 if not bloqueado: # Registrar tentativa de brute force no analisador para validar detecção no backend try: mark = "multiple failed login; brute force password guess" r2 = self.session.post( endpoint_analyze, data=mark, headers={ "Content-Type": "text/plain", "X-Test-Scenario": "brute_force", "X-Forwarded-For": ip_origem } ) if r2.status_code == 200: jd = r2.json() if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "brute_force": if jd.get("bloqueadoAutomatico"): self.log("BRUTE_FORCE", "✅ BLOQUEIO AUTOMÁTICO (analisador) mesmo sem 429/403", Colors.OKGREEN) self.resultados['brute_force']['bloqueado_automatico'] = True else: self.log("BRUTE_FORCE", "✅ DETECTADO (analisador) mesmo sem 429/403", Colors.OKGREEN) self.resultados['brute_force']['detectado'] = True else: self.log("BRUTE_FORCE", f"⚠️ AVISO: Nenhum bloqueio detectado após {tentativas} tentativas", Colors.WARNING) elif r2.status_code == 403: if self.verificar_bloqueio_automatico(r2): self.log("BRUTE_FORCE", "✅ BLOQUEIO AUTOMÁTICO (analisador retornou 403)", Colors.OKGREEN) self.resultados['brute_force']['bloqueado_automatico'] = True self.resultados['brute_force']['detectado'] = True else: self.log("BRUTE_FORCE", f"⚠️ AVISO: analisador retornou {r2.status_code}", Colors.WARNING) except Exception as e: self.log("BRUTE_FORCE", f"⚠️ AVISO: falha ao chamar analisador: {e}", Colors.WARNING) return bloqueado def testar_sql_injection(self) -> bool: """ Testa ataques de SQL Injection em campos de entrada Agora verifica bloqueio automático configurável """ self.log("SQL_INJECTION", "Iniciando testes de SQL Injection...") payloads_sql = [ "' OR '1'='1", "' OR '1'='1' --", "' OR '1'='1' /*", "admin'--", "admin'/*", "' UNION SELECT NULL--", "' UNION SELECT NULL, NULL--", "'; DROP TABLE usuarios--", "' OR 1=1#", "' OR 'a'='a", "1' AND '1'='1", "1' OR '1'='1", "admin' OR '1'='1", "' OR 1=1 --", "') OR ('1'='1", ] endpoint_login = f"{self.base_url}/api/auth/sign-in/email" endpoint_analyze = urljoin(self.convex_url if self.convex_url.endswith('/') else self.convex_url + '/', "http/security/analyze?dst=127.0.0.1&proto=http") detectado = False for payload in payloads_sql: ip_origem = f"203.0.113.{random.randint(10, 250)}" try: # Teste no campo email response = self.session.post( endpoint_login, json={"email": payload, "password": "test"}, headers={"X-Forwarded-For": ip_origem}, timeout=5, allow_redirects=False ) # Verificar se houve erro específico de segurança if response.status_code in [400, 403, 422]: if 'sql' in response.text.lower() or 'injection' in response.text.lower(): self.log("SQL_INJECTION", f"✅ DETECTADO! Payload: {payload[:30]}...", Colors.OKGREEN) detectado = True self.resultados['sql_injection']['detectado'] = True # Verificar se há bloqueio automático ou WAF bloqueando if response.status_code == 403: bloqueio_auto = self.verificar_bloqueio_automatico(response) if bloqueio_auto: self.log("SQL_INJECTION", f"✅ BLOQUEIO AUTOMÁTICO! Payload: {payload[:30]}...", Colors.OKGREEN) self.resultados['sql_injection']['bloqueado_automatico'] = True else: self.log("SQL_INJECTION", f"✅ BLOQUEADO pelo WAF! Payload: {payload[:30]}...", Colors.OKGREEN) detectado = True self.resultados['sql_injection']['detectado'] = True time.sleep(0.3) # Registrar via analisador HTTP para validar detecção no backend try: r2 = self.session.post( endpoint_analyze, data=payload, headers={"Content-Type": "text/plain", "X-Forwarded-For": ip_origem} ) if r2.status_code == 200: jd = r2.json() if jd.get("ataqueDetectado") and jd.get("tipoAtaque") == "sql_injection": if jd.get("bloqueadoAutomatico"): self.log("SQL_INJECTION", f"✅ BLOQUEIO AUTOMÁTICO (analisador)! Payload: {payload[:30]}...", Colors.OKGREEN) self.resultados['sql_injection']['bloqueado_automatico'] = True else: self.log("SQL_INJECTION", f"✅ DETECTADO (analisador)! Payload: {payload[:30]}...", Colors.OKGREEN) detectado = True self.resultados['sql_injection']['detectado'] = True elif r2.status_code == 403: if self.verificar_bloqueio_automatico(r2): self.log("SQL_INJECTION", f"✅ BLOQUEIO AUTOMÁTICO (analisador retornou 403)! Payload: {payload[:30]}...", Colors.OKGREEN) self.resultados['sql_injection']['bloqueado_automatico'] = True detectado = True self.resultados['sql_injection']['detectado'] = True except Exception: pass except requests.exceptions.RequestException as e: self.log("SQL_INJECTION", f"Erro: {str(e)}", Colors.WARNING) if not detectado: self.log("SQL_INJECTION", "⚠️ AVISO: Nenhum bloqueio específico de SQL Injection detectado", Colors.WARNING) return detectado def testar_xss(self) -> bool: """ Testa ataques de Cross-Site Scripting (XSS) Agora verifica bloqueio automático configurável """ self.log("XSS", "Iniciando testes de XSS...") payloads_xss = [ "", "", "", "javascript:alert('XSS')", "", "