import numpy as np from dataclasses import dataclass from typing import List, Dict, Optional from enum import Enum import time class BehaviorCategory(Enum): NORMAL = "normal" SUSPICIOUS = "suspicious" ERROR = "user_error" MALICIOUS = "malicious" @dataclass class UserActivity: user_id: str timestamp: float action_type: str resource_accessed: str access_pattern: Dict location_data: Dict device_info: Dict historical_pattern: bool class BehaviorAnalyzer: def __init__(self): self.user_baselines = {} self.error_patterns = { 'mistyped_commands': ['rm -rf /', 'del *.*', 'drop database'], 'common_mistakes': ['wrong_password_multiple', 'expired_credential', 'misconfigured_access'], 'permission_errors': ['access_denied', 'insufficient_privileges', 'role_mismatch'] } async def analyze_behavior(self, activity: UserActivity) -> Dict: """Analyze user behavior and determine if it's an error or threat""" # Get user's baseline behavior baseline = await self._get_user_baseline(activity.user_id) # Calculate behavior metrics metrics = { 'pattern_match': self._check_pattern_match(activity, baseline), 'time_analysis': self._analyze_time_patterns(activity, baseline), 'location_check': self._verify_location(activity, baseline), 'resource_access': self._analyze_resource_access(activity, baseline), 'error_indicators': self._check_error_patterns(activity) } # Determine behavior category category = self._categorize_behavior(metrics) return { 'category': category, 'confidence': self._calculate_confidence(metrics), 'metrics': metrics, 'recommended_action': self._get_recommended_action(category, metrics) } async def _get_user_baseline(self, user_id: str) -> Dict: """Retrieve or create user baseline behavior patterns""" if user_id not in self.user_baselines: # Initialize new user baseline self.user_baselines[user_id] = { 'common_locations': [], 'usual_hours': [], 'regular_resources': [], 'typical_patterns': [], 'device_history': [] } return self.user_baselines[user_id] def _check_pattern_match(self, activity: UserActivity, baseline: Dict) -> float: """Check if current activity matches user's normal patterns""" pattern_score = 0.0 # Check device familiarity if activity.device_info in baseline['device_history']: pattern_score += 0.3 # Check resource familiarity if activity.resource_accessed in baseline['regular_resources']: pattern_score += 0.3 # Check location familiarity if any(self._compare_locations(activity.location_data, loc) for loc in baseline['common_locations']): pattern_score += 0.4 return pattern_score def _analyze_time_patterns(self, activity: UserActivity, baseline: Dict) -> float: """Analyze if the activity timing matches user's normal patterns""" hour = time.localtime(activity.timestamp).tm_hour if hour in baseline['usual_hours']: return 1.0 elif abs(hour - np.mean(baseline['usual_hours'])) <= 2: return 0.7 return 0.3 def _verify_location(self, activity: UserActivity, baseline: Dict) -> float: """Verify if the access location is consistent with user's history""" for known_location in baseline['common_locations']: if self._compare_locations(activity.location_data, known_location): return 1.0 return 0.5 def _analyze_resource_access(self, activity: UserActivity, baseline: Dict) -> float: """Analyze if resource access is consistent with user's permissions and history""" if activity.resource_accessed in baseline['regular_resources']: return 1.0 return 0.4 def _check_error_patterns(self, activity: UserActivity) -> Dict: """Check for common error patterns in the activity""" error_scores = { 'command_error': self._check_command_errors(activity), 'permission_error': self._check_permission_errors(activity), 'configuration_error': self._check_configuration_errors(activity) } return error_scores def _check_command_errors(self, activity: UserActivity) -> float: """Check for common command errors""" for pattern in self.error_patterns['mistyped_commands']: if pattern in activity.action_type: return 0.9 return 0.1 def _check_permission_errors(self, activity: UserActivity) -> float: """Check for permission-related errors""" for pattern in self.error_patterns['permission_errors']: if pattern in activity.action_type: return 0.8 return 0.2 def _check_configuration_errors(self, activity: UserActivity) -> float: """Check for configuration-related errors""" for pattern in self.error_patterns['common_mistakes']: if pattern in activity.action_type: return 0.7 return 0.3 def _categorize_behavior(self, metrics: Dict) -> BehaviorCategory: """Categorize behavior based on analysis metrics""" pattern_score = metrics['pattern_match'] error_indicators = metrics['error_indicators'] if pattern_score > 0.8: return BehaviorCategory.NORMAL elif max(error_indicators.values()) > 0.7: return BehaviorCategory.ERROR elif pattern_score < 0.3: return BehaviorCategory.MALICIOUS else: return BehaviorCategory.SUSPICIOUS def _calculate_confidence(self, metrics: Dict) -> float: """Calculate confidence level of behavior categorization""" return np.mean([ metrics['pattern_match'], metrics['time_analysis'], metrics['location_check'], metrics['resource_access'], 1 - max(metrics['error_indicators'].values()) ]) def _get_recommended_action(self, category: BehaviorCategory, metrics: Dict) -> Dict: """Get recommended action based on behavior category""" if category == BehaviorCategory.NORMAL: return { 'action': 'allow', 'monitoring_level': 'routine', 'notification': None } elif category == BehaviorCategory.ERROR: return { 'action': 'assist', 'monitoring_level': 'elevated', 'notification': 'user_support' } elif category == BehaviorCategory.SUSPICIOUS: return { 'action': 'monitor', 'monitoring_level': 'high', 'notification': 'admin_alert' } else: # MALICIOUS return { 'action': 'block', 'monitoring_level': 'maximum', 'notification': 'security_team' } def _compare_locations(self, loc1: Dict, loc2: Dict) -> bool: """Compare two locations for similarity""" # Implement location comparison logic pass