from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
import time


class ProbeAttempt(Enum):
    """Categories of probing behaviour the validator distinguishes."""

    SYSTEM_INFO = "system_info_probe"
    ERROR_FISHING = "error_fishing"
    PERMISSION_PROBE = "permission_probe"
    CREDENTIAL_HARVEST = "credential_harvest"


@dataclass
class SecurityValidation:
    """Snapshot of one user's session metrics, as consumed by the analyser."""

    attempt_pattern: List[str]  # request types seen so far, oldest first
    error_requests: int  # requests whose type contains "error"
    info_requests: int  # requests classified by _is_info_request
    permission_changes: int  # requests classified by _is_permission_request
    session_duration: float  # seconds since the session's first request
    location_changes: int  # transitions between two different known locations


class SecurityValidator:
    """Track per-user request history and flag likely probing attempts.

    Each call to ``validate_request`` records the request in an in-memory
    session keyed by ``user_id``, scores the accumulated metrics, and returns
    a deliberately generic response dict that reveals no system details.

    NOTE(review): ``session_history`` and each session's ``pattern`` list grow
    without bound; long-running processes may want an eviction policy.
    """

    # A metric must *exceed* its threshold to count as a risk factor.
    MAX_ERROR_REQUESTS = 5
    MAX_INFO_REQUESTS = 10
    MAX_PERMISSION_ATTEMPTS = 3
    MAX_LOCATION_CHANGES = 2
    # Sessions younger than this many seconds count as "short".
    SHORT_SESSION_SECONDS = 60
    # Distinct signatures of one probe type that must appear to match.
    MIN_PATTERN_MATCHES = 2

    def __init__(self):
        self.probe_patterns = {
            ProbeAttempt.SYSTEM_INFO: [
                'version_check',
                'system_query',
                'config_request',
                'directory_list',
            ],
            ProbeAttempt.ERROR_FISHING: [
                'error_detail_request',
                'debug_info_request',
                'stack_trace_query',
                'log_access',
            ],
            ProbeAttempt.PERMISSION_PROBE: [
                'privilege_test',
                'access_attempt',
                'role_query',
                'sudo_probe',
            ],
        }
        self.session_history = {}
        self.suspicious_patterns = []

    async def validate_request(self, user_id: str, request_type: str,
                               context: Dict) -> Dict:
        """Validate whether a request might be part of a probing attempt.

        Args:
            user_id: Key under which the session history is stored.
            request_type: Free-form request label; classified by substring.
            context: Request metadata; only the ``'location'`` key is read.

        Returns:
            A generic response dict: a blocking response when probing is
            detected, otherwise one of the canned "safe" responses.
        """
        # Record the request first so the analysis includes it.
        self._update_session(user_id, request_type, context)

        session = self.session_history.get(user_id, {})
        now = time.time()
        validation = SecurityValidation(
            attempt_pattern=session.get('pattern', []),
            error_requests=session.get('error_count', 0),
            info_requests=session.get('info_requests', 0),
            permission_changes=session.get('permission_attempts', 0),
            session_duration=now - session.get('start_time', now),
            location_changes=session.get('location_changes', 0),
        )

        analysis = self._analyze_validation(validation)
        return self._generate_safe_response(analysis, request_type)

    def _update_session(self, user_id: str, request_type: str, context: Dict):
        """Append the request to the user's session and update its counters."""
        if user_id not in self.session_history:
            self.session_history[user_id] = {
                'start_time': time.time(),
                'pattern': [],
                'error_count': 0,
                'info_requests': 0,
                'permission_attempts': 0,
                'location_changes': 0,
                'last_location': None,
            }

        session = self.session_history[user_id]
        session['pattern'].append(request_type)

        # A single request may bump several counters.
        if 'error' in request_type.lower():
            session['error_count'] += 1
        if self._is_info_request(request_type):
            session['info_requests'] += 1
        if self._is_permission_request(request_type):
            session['permission_attempts'] += 1

        # Only a transition between two *known* locations is a change: the
        # first reported location is a baseline, and a request that carries
        # no location must neither count as a hop nor erase the baseline.
        location = (context or {}).get('location')
        if location is not None:
            previous = session['last_location']
            if previous is not None and location != previous:
                session['location_changes'] += 1
            session['last_location'] = location

    def _analyze_validation(self, validation: SecurityValidation) -> Dict:
        """Score session metrics and decide whether probing is detected.

        Returns:
            Dict with ``risk_score`` (fraction of risk factors that are set),
            ``risk_factors`` (name -> bool), ``probe_detected`` and
            ``recommendation``.
        """
        risk_factors = {
            'rapid_errors':
                validation.error_requests > self.MAX_ERROR_REQUESTS,
            'excessive_info':
                validation.info_requests > self.MAX_INFO_REQUESTS,
            'permission_abuse':
                validation.permission_changes > self.MAX_PERMISSION_ATTEMPTS,
            'location_hopping':
                validation.location_changes > self.MAX_LOCATION_CHANGES,
            'short_session':
                validation.session_duration < self.SHORT_SESSION_SECONDS,
            'pattern_match':
                self._check_probe_patterns(validation.attempt_pattern),
        }

        risk_score = sum(risk_factors.values()) / len(risk_factors)

        # Every session is "short" during its first minute, so that factor
        # alone must not trigger a block; it only contributes to the score.
        probe_detected = any(
            flagged
            for name, flagged in risk_factors.items()
            if name != 'short_session'
        )

        return {
            'risk_score': risk_score,
            'risk_factors': risk_factors,
            'probe_detected': probe_detected,
            'recommendation': self._get_security_recommendation(risk_factors),
        }

    def _generate_safe_response(self, analysis: Dict, request_type: str) -> Dict:
        """Build a response that does not leak system information.

        Returns the blocking response when ``analysis['probe_detected']`` is
        truthy; otherwise the canned response whose key is the first of
        ``error``/``permission``/``info`` found in ``request_type``
        (defaulting to the generic error response).
        """
        if analysis['probe_detected']:
            return {
                'status': 'error',
                'message': 'Request cannot be processed at this time.',
                'action': 'block',
                'retry_allowed': False,
            }

        safe_responses = {
            'error': {
                'status': 'error',
                'message': 'An error occurred. Please contact support.',
                'code': 'ERR_GENERIC',
            },
            'permission': {
                'status': 'denied',
                'message': 'Access denied.',
                'code': 'ACCESS_DENIED',
            },
            'info': {
                'status': 'limited',
                'message': 'Limited information available.',
                'code': 'INFO_LIMITED',
            },
        }

        lowered = request_type.lower()
        response_type = next(
            (key for key in safe_responses if key in lowered), 'error'
        )
        return safe_responses[response_type]

    def _check_probe_patterns(self, pattern: List[str]) -> bool:
        """Return True if the history matches a known probe signature set.

        A probe type matches when at least ``MIN_PATTERN_MATCHES`` of its
        signatures each occur as a substring of some recorded request type.
        Matching is case-insensitive, consistent with the other classifiers.
        """
        lowered = [req.lower() for req in pattern]
        for signatures in self.probe_patterns.values():
            matches = sum(
                1 for sig in signatures if any(sig in req for req in lowered)
            )
            if matches >= self.MIN_PATTERN_MATCHES:
                return True
        return False

    def _get_security_recommendation(self, risk_factors: Dict) -> str:
        """Map risk factors to a recommendation; the first match wins."""
        if risk_factors['rapid_errors'] or risk_factors['excessive_info']:
            return "implement_rate_limiting"
        elif risk_factors['permission_abuse']:
            return "lock_permissions"
        elif risk_factors['location_hopping']:
            return "geo_restriction"
        elif risk_factors['pattern_match']:
            return "block_ip"
        return "monitor"

    def _is_info_request(self, request_type: str) -> bool:
        """Check whether the request type contains an info-gathering keyword."""
        info_keywords = ['version', 'config', 'system', 'info', 'status', 'list']
        lowered = request_type.lower()
        return any(keyword in lowered for keyword in info_keywords)

    def _is_permission_request(self, request_type: str) -> bool:
        """Check whether the request type contains a permission keyword."""
        permission_keywords = ['permission', 'access', 'auth', 'sudo', 'admin']
        lowered = request_type.lower()
        return any(keyword in lowered for keyword in permission_keywords)