diff --git a/.gitignore b/.gitignore index 8c3e24a..2e83a71 100644 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,6 @@ /Data/Arrest_Data_from_2020_to_Present.csv /Data/processed_arrest_data.csv -/app/server/server_data.db -/app/server/server_data.db-journal -/app/server/server_data.db-lock -/app/server/server_data.db-trace -/app/server/server_data.db-wal - /app/client/__pycache__/* /app/client/client_gui.pyc /app/client/client_gui.pyo @@ -21,10 +15,29 @@ /app/server/server_gui.pyw /app/server/server_gui.pyz - app/shared/__pycache__/protocol.cpython-312.pyc app/shared/__pycache__/constants.cpython-312.pyc app/shared/__pycache__/protocol.cpython-310.pyc app/shared/__pycache__/constants.cpython-310.pyc -app/server/server_data.db-shm + +# PostgreSQL and Docker +postgres_data/ + +# Migration logs +migration_*.log + +# Environment files +.env +.env.local + +# IDE files +.vscode/ +.idea/ +*.swp +*.swo + +# OS files +.DS_Store +Thumbs.db + .cursor/generalrule.mdc diff --git a/app/client/client_gui.py b/app/client/client_gui.py index 7955ec5..d0a7069 100644 --- a/app/client/client_gui.py +++ b/app/client/client_gui.py @@ -31,7 +31,7 @@ QDateEdit, QDoubleSpinBox, QStackedWidget, QListWidget, QListWidgetItem ) from PySide6.QtGui import QPixmap, QFont, QIcon, QPalette, QColor -from PySide6.QtCore import Qt, QTimer, Signal, Slot, QObject, QSettings, QDate +from PySide6.QtCore import Qt, QTimer, Signal, Slot, QObject, QSettings, QDate, QThread, QThreadPool, QRunnable # Import necessary for rendering Matplotlib figure in Qt from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg @@ -51,6 +51,29 @@ logger = logging.getLogger('client_gui') +class QueryResultProcessor(QRunnable): + """Worker thread for processing large query results""" + + def __init__(self, result_data, callback): + super().__init__() + self.result_data = result_data + self.callback = callback + self.setAutoDelete(True) + + def run(self): + """Process the query result in background thread""" + try: + 
processed_data = self.process_result(self.result_data) + self.callback(processed_data) # Call as function, not .emit + except Exception as e: + import traceback + self.callback({'error': f'{e}\n{traceback.format_exc()}'}) + + def process_result(self, result_data): + """Process the query result data""" + # Add any heavy processing here if needed + return result_data + # Bridge class to convert client callbacks to Qt signals class ClientCallbacksBridge(QObject): @@ -182,6 +205,10 @@ def __init__(self): self.callbacks_bridge = ClientCallbacksBridge() self.plot_dialog = None # To store reference to plot dialog self.tab_reset_handlers = {} # <--- ADD THIS LINE + + # Thread pool for async processing + self.thread_pool = QThreadPool() + self.thread_pool.setMaxThreadCount(4) # Limit concurrent threads # Load or default theme self.current_theme = self.settings.value("theme", "dark") # Default to dark @@ -478,7 +505,7 @@ def on_login_status_change(self, logged_in): self.update_query_params() # Fetch metadata AFTER successful login def on_query_result(self, result): - """Handle results received from the server""" + """Handle results received from the server with async processing for large results""" logger.info(f"Received query result: {type(result)}") # Check if the result is for metadata @@ -528,9 +555,19 @@ def on_query_result(self, result): # Check if the result contains data for the table (and no plot was handled) if 'data' in result: - # This block is now only reached if fig_plot was None/False - self.query_tab.display_results(result) - logger.info("Query data result received and sent to display.") + # Check if this is a large dataset that needs async processing + data_size = len(result.get('data', [])) + if data_size > 1000: # Process large datasets asynchronously + logger.info(f"Large dataset detected ({data_size} rows), processing asynchronously") + self.status_bar.showMessage("Processing large dataset...", 3000) + + # Create worker for async processing + worker = 
QueryResultProcessor(result, self.query_result_processed) + self.thread_pool.start(worker) + else: + # Small dataset, process immediately + self.query_tab.display_results(result) + logger.info("Query data result received and sent to display.") else: # Handle case where neither plot nor data is present logger.warning("Query result received with OK status but no data or plot.") @@ -1123,6 +1160,21 @@ def on_plot_label_clicked(self, label_clicked): # def on_plot_clicked(self, fig): # ... + @Slot(object) + def query_result_processed(self, processed_result): + """Called when async query result processing is complete""" + if 'error' in processed_result: + self.on_error(f"Error processing query result: {processed_result['error']}") + return + + try: + self.query_tab.display_results(processed_result) + self.status_bar.showMessage("Query results displayed", 3000) + logger.info("Large query result processed and displayed successfully") + except Exception as e: + logger.error(f"Error displaying processed query result: {e}", exc_info=True) + self.on_error(f"Error displaying query results: {e}") + if __name__ == "__main__": # Create application diff --git a/app/server/ClientHandler.py b/app/server/ClientHandler.py index 9d0edb0..fece4e0 100644 --- a/app/server/ClientHandler.py +++ b/app/server/ClientHandler.py @@ -2,7 +2,7 @@ import socket import queue import time -import sqlite3 +import psycopg2 import re import logging import os @@ -237,9 +237,9 @@ def handle_login(self, data): }) self.server.log_activity(f"Client logged in: {client_info['nickname']} ({client_info['email']})") - except sqlite3.OperationalError as sqlerr: + except psycopg2.OperationalError as sqlerr: error_msg = str(sqlerr) - logger.error(f"SQLite error during login: {error_msg}") + logger.error(f"PostgreSQL error during login: {error_msg}") if "no column named address" in error_msg: self.send_error("Login failed: Database schema needs to be updated. 
Please restart the server.") @@ -278,7 +278,7 @@ def handle_logout(self): self.send_error("Logout failed: not logged in") def handle_query(self, data): - """Handle a query request""" + """Handle a query request with async processing""" if not self.client_info or not self.session_id: self.send_error("Query failed: not logged in") return @@ -304,21 +304,17 @@ def handle_query(self, data): logger.info(f"Processing query {query_type_id} from {self.client_info['nickname']} with params: {parameters}") - # --- Process the query based on query_type_id --- + # --- Process the query asynchronously based on query_type_id --- result = {} - if query_type_id == 'query1': - result = self.data_processor.process_query1(parameters) - elif query_type_id == 'query2': - result = self.data_processor.process_query2(parameters) - elif query_type_id == 'query3': - result = self.data_processor.process_query3(parameters) # This one needs plot generation - elif query_type_id == 'query4': - result = self.data_processor.process_query4(parameters) + + # Use async processing for heavy queries + if query_type_id == 'query4': + # Query 4 (map generation) is the most resource-intensive + logger.info(f"Starting async processing for {query_type_id}") + result = self.data_processor.process_query_async(query_type_id, parameters) else: - # Handle unknown queryX type - logger.error(f"Unknown query type identifier received: {query_type_id}") - self.send_error(f"Query failed: Unknown query type identifier: {query_type_id}") - return + # For other queries, use regular processing but with timeout + result = self.data_processor.process_query_async(query_type_id, parameters) # --- Log the raw result from processor --- logger.info(f"HANDLE_QUERY: Result received from processor ({query_type_id}): {result.get('status')}, map_path_present={result.get('map_filepath') is not None}") diff --git a/app/server/data_processor.py b/app/server/data_processor.py index 85de364..7a5d4fc 100644 --- 
a/app/server/data_processor.py +++ b/app/server/data_processor.py @@ -13,6 +13,12 @@ import uuid import tempfile import json # for the geojson parsing +import threading +import time +import gc +import psutil +from concurrent.futures import ThreadPoolExecutor, TimeoutError +from functools import wraps from shared.constants import DESCENT_CODE_MAP, ARREST_TYPE_CODE_MAP from datetime import datetime @@ -21,6 +27,42 @@ logger = logging.getLogger('data_processor') +def timeout_handler(timeout_seconds=30): + """Decorator to add timeout to query processing""" + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(func, *args, **kwargs) + try: + return future.result(timeout=timeout_seconds) + except TimeoutError: + logger.error(f"Query {func.__name__} timed out after {timeout_seconds} seconds") + return {'status': 'error', 'message': f'Query timed out after {timeout_seconds} seconds'} + return wrapper + return decorator + +def memory_monitor(func): + """Decorator to monitor memory usage""" + @wraps(func) + def wrapper(*args, **kwargs): + process = psutil.Process() + mem_before = process.memory_info().rss / 1024 / 1024 # MB + logger.info(f"Memory before {func.__name__}: {mem_before:.1f} MB") + + try: + result = func(*args, **kwargs) + mem_after = process.memory_info().rss / 1024 / 1024 # MB + logger.info(f"Memory after {func.__name__}: {mem_after:.1f} MB (delta: {mem_after - mem_before:.1f} MB)") + return result + except Exception as e: + logger.error(f"Error in {func.__name__}: {e}") + raise + finally: + # Force garbage collection + gc.collect() + return wrapper + class DataProcessor: """Data processor for handling queries on the dataset""" @@ -28,14 +70,64 @@ def __init__(self, dataset_path='Data/processed_arrest_data.csv'): """Initialize data processor with dataset""" self.dataset_path = dataset_path self.df = None + self._df_lock = threading.Lock() # Thread safety for 
DataFrame access + self._query_executor = ThreadPoolExecutor(max_workers=4) # Limit concurrent queries + self._temp_files = set() # Track temporary files for cleanup + self._temp_files_lock = threading.Lock() # Thread safety for temp files self.load_data() def load_data(self): - """Load dataset""" + """Load dataset with optimizations""" try: - self.df = pd.read_csv(self.dataset_path, low_memory=False) + # Load only essential columns first + essential_cols = ['Arrest Date', 'Age', 'Arrest Hour', 'Arrest Type Code', + 'Area Name', 'Address', 'Charge Group Description', + 'Descent Code', 'Sex Code', 'Report ID'] + + # Check if coordinate columns exist + if os.path.exists(self.dataset_path): + sample_df = pd.read_csv(self.dataset_path, nrows=1) + if 'LAT' in sample_df.columns and 'LON' in sample_df.columns: + essential_cols.extend(['LAT', 'LON']) + if 'Location_GeoJSON' in sample_df.columns: + essential_cols.append('Location_GeoJSON') + + # Load data without strict dtype conversions first + self.df = pd.read_csv( + self.dataset_path, + low_memory=False, + usecols=essential_cols, + parse_dates=['Arrest Date'] + ) + print(f"Dataset loaded with {len(self.df)} rows and {len(self.df.columns)} columns") + # Now apply safe dtype optimizations + try: + # Only convert to int8 if the data is actually integer-like + if 'Age' in self.df.columns: + # Check if Age column can be safely converted to int + age_series = pd.to_numeric(self.df['Age'], errors='coerce') + if age_series.notna().all(): + self.df['Age'] = age_series.astype('int8') + + if 'Arrest Hour' in self.df.columns: + # Check if Arrest Hour column can be safely converted to int + hour_series = pd.to_numeric(self.df['Arrest Hour'], errors='coerce') + if hour_series.notna().all(): + self.df['Arrest Hour'] = hour_series.astype('int8') + + # Convert categorical columns safely + categorical_cols = ['Arrest Type Code', 'Area Name', 'Charge Group Description', + 'Descent Code', 'Sex Code'] + for col in categorical_cols: + if col 
in self.df.columns: + self.df[col] = self.df[col].astype('category') + + except Exception as dtype_error: + print(f"Warning: Could not apply all dtype optimizations: {dtype_error}") + # Continue with original dtypes if optimization fails + # Convert date columns to datetime if they exist for col in self.df.columns: if 'Date' in col and self.df[col].dtype == 'object': @@ -74,6 +166,48 @@ def parse_geojson_str(geojson_str): print(f"Error loading dataset: {e}") return False + def get_filtered_dataframe(self, filters=None): + """Get a filtered copy of the DataFrame with memory optimization""" + with self._df_lock: + if filters is None: + return self.df.copy() + + # Apply filters efficiently + mask = pd.Series([True] * len(self.df), index=self.df.index) + + for column, condition in filters.items(): + if column in self.df.columns: + if isinstance(condition, dict): + if 'min' in condition: + mask &= (self.df[column] >= condition['min']) + if 'max' in condition: + mask &= (self.df[column] <= condition['max']) + if 'values' in condition: + mask &= self.df[column].isin(condition['values']) + else: + mask &= (self.df[column] == condition) + + return self.df[mask].copy() + + def process_query_async(self, query_type, parameters=None): + """Process a query asynchronously with timeout""" + if self.df is None: + return {'status': 'error', 'message': 'Dataset not loaded'} + + if parameters is None: + parameters = {} + + try: + # Submit query to thread pool + future = self._query_executor.submit(self.process_query, query_type, parameters) + return future.result(timeout=60) # 60 second timeout + except TimeoutError: + logger.error(f"Query {query_type} timed out") + return {'status': 'error', 'message': 'Query timed out'} + except Exception as e: + logger.error(f"Error processing query {query_type}: {e}", exc_info=True) + return {'status': 'error', 'message': f'Error processing query: {str(e)}'} + def process_query(self, query_type, parameters=None): """ Process a query based on query 
type and parameters @@ -88,7 +222,17 @@ def process_query(self, query_type, parameters=None): parameters = {} try: - if query_type == 'age_distribution': + # Map 'query1', 'query2', etc. to the correct methods + if query_type == 'query1': + return self.process_query1(parameters) + elif query_type == 'query2': + return self.process_query2(parameters) + elif query_type == 'query3': + return self.process_query3(parameters) + elif query_type == 'query4': + return self.process_query4(parameters) + # Legacy/other query types + elif query_type == 'age_distribution': return self.get_age_distribution(parameters) elif query_type == 'top_charge_groups': return self.get_top_charge_groups(parameters) @@ -118,65 +262,74 @@ def process_query(self, query_type, parameters=None): 'traceback': traceback.format_exc() } + def _safe_to_str(self, df, columns): + """Convert specified columns to string if they are categorical (inplace).""" + for col in columns: + if col in df.columns and pd.api.types.is_categorical_dtype(df[col]): + df[col] = df[col].astype(str) + return df + + def _df_to_records_and_headers(self, df): + # Convert all columns to string and return records and headers + df = df.copy() + for col in df.columns: + df[col] = df[col].astype(str) + return df.to_dict(orient='records'), list(df.columns) + def get_age_distribution(self, parameters): - """Get age distribution of arrested individuals""" age_counts = self.df['Age'].value_counts().sort_index().reset_index() age_counts.columns = ['Age', 'Count'] - + data, headers = self._df_to_records_and_headers(age_counts) fig, ax = plt.subplots(figsize=(10, 6)) sns.histplot(self.df['Age'], bins=30, kde=True, ax=ax) ax.set_title('Age Distribution of Arrested Individuals') ax.set_xlabel('Age') ax.set_ylabel('Count') ax.grid(True) - return { - 'status': 'ok', - 'data': age_counts, + 'status': 'OK', + 'data': data, + 'headers': headers, 'figure': fig, 'title': 'Age Distribution of Arrested Individuals' } def get_top_charge_groups(self, 
parameters): - """Get top charge groups by frequency""" n = parameters.get('n', 10) n = int(n) - top_charges = self.df['Charge Group Description'].value_counts().head(n).reset_index() top_charges.columns = ['Charge Group', 'Count'] - + data, headers = self._df_to_records_and_headers(top_charges) fig, ax = plt.subplots(figsize=(12, 8)) sns.barplot(x='Count', y='Charge Group', data=top_charges, ax=ax) ax.set_title(f'Top {n} Charge Groups') ax.set_xlabel('Count') ax.set_ylabel('Charge Group') ax.grid(True, axis='x') - return { - 'status': 'ok', - 'data': top_charges, + 'status': 'OK', + 'data': data, + 'headers': headers, 'figure': fig, 'title': f'Top {n} Charge Groups' } def get_arrests_by_area(self, parameters): - """Get arrests by geographic area""" n = parameters.get('n', 15) n = int(n) - area_counts = self.df['Area Name'].value_counts().head(n).reset_index() area_counts.columns = ['Area', 'Count'] - + data, headers = self._df_to_records_and_headers(area_counts) fig, ax = plt.subplots(figsize=(12, 8)) sns.barplot(x='Count', y='Area', data=area_counts, ax=ax) ax.set_title(f'Top {n} Areas by Number of Arrests') ax.set_xlabel('Number of Arrests') ax.set_ylabel('Area') ax.grid(True, axis='x') - return { - 'status': 'ok', - 'data': area_counts, + 'status': 'OK', + 'data': data, + 'headers': headers, 'figure': fig, 'title': f'Top {n} Areas by Number of Arrests' } @@ -649,8 +802,7 @@ def process_query1(self, params): result_df['Arrest Date'] = result_df['Arrest Date'].dt.strftime('%Y-%m-%d') # Format date for display # Prepare results - data = result_df.to_dict(orient='records') - headers = output_cols + data, headers = self._df_to_records_and_headers(result_df) return {'status': 'OK', 'data': data, 'headers': headers, 'title': f'Arrests in {area}'} except KeyError as ke: @@ -704,8 +856,7 @@ def process_query2(self, params): trend_data['Date'] = trend_data['Date'].dt.strftime('%Y') # Prepare results - data = trend_data.to_dict(orient='records') - headers = ['Date', 
'Arrest Count'] + data, headers = self._df_to_records_and_headers(trend_data) title = f'Trend for {charge_group} ({granularity.capitalize()})' return {'status': 'OK', 'data': data, 'headers': headers, 'title': title} @@ -717,114 +868,89 @@ def process_query2(self, params): logger.error(f"Error processing Query 2: {e}", exc_info=True) return {'status': 'error', 'message': f"Error processing query: {e}"} - def process_query3(self, params): - """ - Query 3: Demografische Analyse van Arrestaties (Graph) - params: {'sex_codes': list[str], 'descent_codes': list[str], 'charge_group': str | None, 'arrest_type_code': str | None, 'generate_plot': bool} - """ logger.info(f"Processing Query 3 with params: {params}") if self.df.empty: return {'status': 'error', 'message': 'Dataset not loaded'} - try: sex_codes = params['sex_codes'] descent_codes = params['descent_codes'] - charge_group = params.get('charge_group') # Optional - arrest_type_code = params.get('arrest_type_code') # <-- ADDED: Get arrest_type_code + charge_group = params.get('charge_group') + arrest_type_code = params.get('arrest_type_code') # Filter data filtered_df = self.df[ self.df['Sex Code'].isin(sex_codes) & self.df['Descent Code'].isin(descent_codes) ] - if charge_group: filtered_df = filtered_df[filtered_df['Charge Group Description'] == charge_group] - - # --- ADDED: Filter by arrest_type_code if provided --- if arrest_type_code and 'Arrest Type Code' in filtered_df.columns: filtered_df = filtered_df[filtered_df['Arrest Type Code'] == arrest_type_code] - # ----------------------------------------------------- + + filtered_df = filtered_df.copy() + filtered_df['Descent Code'] = filtered_df['Descent Code'].astype(str) + filtered_df['Sex Code'] = filtered_df['Sex Code'].astype(str) if filtered_df.empty: - descent_names = [DESCENT_CODE_MAP.get(dc, dc) for dc in descent_codes] - sex_names = ["Male" if sc == 'M' else "Female" if sc == 'F' else sc for sc in sex_codes] - title = f'Arrests by Descent ({", 
".join(descent_names)}) and Sex ({", ".join(sex_names)}) - No Data' - if charge_group: - title += f' for {charge_group}' - # --- ADDED: Append arrest type to "No Data" title --- - if arrest_type_code: - arrest_type_desc = ARREST_TYPE_CODE_MAP.get(arrest_type_code, arrest_type_code) - title += f' (Type: {arrest_type_desc})' - # ------------------------------------------------- - return {'status': 'OK', 'data': [], 'headers': [], 'plot': None, 'title': title} - - # --- NEW Plotting Logic --- - - # 1. Calculate counts grouped by Descent and Sex + descent_names = [DESCENT_CODE_MAP.get(dc, dc) for dc in descent_codes] + sex_names = ["Male" if sc == 'M' else "Female" if sc == 'F' else sc for sc in sex_codes] + title = f'Arrests by Descent ({", ".join(descent_names)}) and Sex ({", ".join(sex_names)}) - No Data' + if charge_group: + title += f' for {charge_group}' + if arrest_type_code: + arrest_type_desc = ARREST_TYPE_CODE_MAP.get(arrest_type_code, arrest_type_code) + title += f' (Type: {arrest_type_desc})' + return {'status': 'OK', 'data': [], 'headers': [], 'plot': None, 'title': title} + summary_data = filtered_df.groupby(['Descent Code', 'Sex Code']).size().reset_index(name='Count') - - # 2. Map Descent Code to Description - summary_data['Descent'] = summary_data['Descent Code'].map(DESCENT_CODE_MAP) - # Handle any codes not in the map (though get_unique_descent_codes should have description) - summary_data['Descent'] = summary_data['Descent'].fillna(summary_data['Descent Code'].apply(lambda x: f"Unknown ({x})")) - - # 3. 
Create the plot using object-oriented approach + for col in summary_data.columns: + summary_data[col] = summary_data[col].astype(str) + summary_data['Descent'] = summary_data['Descent Code'].map(DESCENT_CODE_MAP).fillna( + summary_data['Descent Code'].apply(lambda x: f"Unknown ({x})")) + summary_data['Descent'] = summary_data['Descent'].astype(str) + + # Ensure all combinations are present + all_descents = sorted(set(summary_data['Descent'])) + all_sexes = sorted(set(summary_data['Sex Code'])) + import itertools + full_index = pd.DataFrame(list(itertools.product(all_descents, all_sexes)), columns=['Descent', 'Sex Code']) + summary_data = pd.merge(full_index, summary_data, on=['Descent', 'Sex Code'], how='left').fillna({'Count': 0}) + summary_data['Count'] = summary_data['Count'].astype(int) + fig, ax = plt.subplots(figsize=(12, 7)) - plot_title = 'Arrests by Descent' if len(sex_codes) == 1: - sex_name = "Male" if sex_codes[0] == 'M' else "Female" if sex_codes[0] == 'F' else sex_codes[0] - plot_title += f' ({sex_name})' - # Plot single bars using the created axes object `ax` - # Assign x to hue and hide legend to satisfy future seaborn requirements - sns.barplot(data=summary_data, x='Descent', y='Count', hue='Descent', palette='viridis', ax=ax, legend=False) - else: # Both sexes selected - plot_title += ' (Male vs Female)' - # Plot grouped bars using hue and the created axes object `ax` - sns.barplot(data=summary_data, x='Descent', y='Count', hue='Sex Code', palette='coolwarm', ax=ax) - ax.legend(title='Sex Code') - - # Add charge group to title if specified + sex_name = "Male" if sex_codes[0] == 'M' else "Female" if sex_codes[0] == 'F' else sex_codes[0] + plot_title += f' ({sex_name})' + sns.barplot(data=summary_data, x='Descent', y='Count', hue='Descent', palette='viridis', ax=ax, legend=False) + else: + plot_title += ' (Male vs Female)' + sns.barplot(data=summary_data, x='Descent', y='Count', hue='Sex Code', palette='coolwarm', ax=ax) + ax.legend(title='Sex 
Code') if charge_group: - plot_title += f'\nCharge Group: {charge_group}' - - # --- ADDED: Append arrest type to plot_title --- + plot_title += f'\nCharge Group: {charge_group}' if arrest_type_code: arrest_type_desc = ARREST_TYPE_CODE_MAP.get(arrest_type_code, arrest_type_code) plot_title += f'\nArrest Type: {arrest_type_desc}' - # ---------------------------------------------- - ax.set_title(plot_title) ax.set_xlabel('Descent') ax.set_ylabel('Number of Arrests') - # Use ax.tick_params for label rotation - ax.tick_params(axis='x', rotation=45, labelsize='medium') - # Set horizontal alignment manually if needed after rotation + ax.tick_params(axis='x', rotation=45, labelsize='medium') plt.setp(ax.get_xticklabels(), ha="right", rotation_mode="anchor") ax.grid(True, axis='y', linestyle='--', alpha=0.7) - fig.tight_layout() # Call tight_layout on the figure object - - # --- End Plotting Logic --- - - # # Get the figure object << No longer needed, we have `fig` - # fig = plt.gcf() - - # Prepare return data (return figure object, not bytes) - data_for_table = summary_data.to_dict(orient='records') - headers_for_table = ['Descent', 'Sex Code', 'Count'] + fig.tight_layout() + data_for_table, headers_for_table = self._df_to_records_and_headers(summary_data) return { 'status': 'OK', 'data': data_for_table, 'headers': headers_for_table, - 'plot': fig, # Return the figure object + 'plot': fig, 'title': plot_title - } - + } except KeyError as ke: - logger.error(f"Query 3 failed - Missing column: {ke}") - return {'status': 'error', 'message': f"Query failed: Server data missing expected column '{ke}'."} + logger.error(f"Query 3 failed - Missing column: {ke}") + return {'status': 'error', 'message': f"Query failed: Server data missing expected column '{ke}'."} except Exception as e: logger.error(f"Error processing Query 3: {e}", exc_info=True) return {'status': 'error', 'message': f"Error processing query: {e}"} @@ -842,6 +968,8 @@ def _haversine(self, lat1, lon1, lat2, lon2): r 
= 6371 # Radius of Earth in kilometers return c * r + @timeout_handler(120) # 2 minute timeout for map generation + @memory_monitor def process_query4(self, params): """ Query 4: Geografische Hotspots van Arrestaties @@ -861,7 +989,18 @@ def process_query4(self, params): end_date = pd.to_datetime(params['end_date']).replace(hour=23, minute=59, second=59) arrest_type_code = params.get('arrest_type_code') - df_working = self.df.copy() # Work with a copy + # Use filtered DataFrame to reduce memory usage + filters = { + 'Arrest Date': {'min': start_date, 'max': end_date} + } + if arrest_type_code and 'Arrest Type Code' in self.df.columns: + filters['Arrest Type Code'] = {'values': [arrest_type_code]} + + df_working = self.get_filtered_dataframe(filters) + + if df_working.empty: + final_title = f'Arrests within {radius_km}km of ({center_lat:.4f}, {center_lon:.4f}) (No Results)' + return {'status': 'OK', 'data': [], 'headers': [], 'map_filepath': None, 'title': final_title} has_geojson_col = 'Location_GeoJSON_Parsed' in df_working.columns @@ -883,25 +1022,25 @@ def process_query4(self, params): df_working['extracted_LAT'] = pd.to_numeric(df_working['extracted_LAT'], errors='coerce') df_working['extracted_LON'] = pd.to_numeric(df_working['extracted_LON'], errors='coerce') - # --- End Coordinate Extraction --- - + + # Remove rows with invalid coordinates + df_working = df_working.dropna(subset=['extracted_LAT', 'extracted_LON']) + + if df_working.empty: + final_title = f'Arrests within {radius_km}km of ({center_lat:.4f}, {center_lon:.4f}) (No Results)' + return {'status': 'OK', 'data': [], 'headers': [], 'map_filepath': None, 'title': final_title} - # (Keep the efficient bounding box + precise radius filter) + # --- Efficient bounding box + precise radius filter --- lat_degrees_delta = radius_km / 111.0 lon_degrees_delta = radius_km / (111.0 * np.cos(np.radians(center_lat))) min_lat, max_lat = center_lat - lat_degrees_delta, center_lat + lat_degrees_delta min_lon, max_lon 
= center_lon - lon_degrees_delta, center_lon + lon_degrees_delta df_filtered = df_working[ - (df_working['Arrest Date'] >= start_date) & - (df_working['Arrest Date'] <= end_date) & df_working['extracted_LAT'].notna() & df_working['extracted_LON'].notna() & (df_working['extracted_LAT'] >= min_lat) & (df_working['extracted_LAT'] <= max_lat) & (df_working['extracted_LON'] >= min_lon) & (df_working['extracted_LON'] <= max_lon) ] - - if arrest_type_code and 'Arrest Type Code' in df_filtered.columns: - df_filtered = df_filtered[df_filtered['Arrest Type Code'] == arrest_type_code] map_filepath_abs = None # Initialize if not df_filtered.empty: @@ -949,6 +1088,9 @@ def process_query4(self, params): map_filepath_abs = os.path.abspath(os.path.join(system_temp_dir, map_basename)) m.save(map_filepath_abs) + # Track the temporary file for cleanup + self.add_temp_file(map_filepath_abs) + logger.info(f"Query 4: Folium map with {len(df_to_plot)} markers saved to {map_filepath_abs}.") except Exception as map_err: @@ -975,8 +1117,7 @@ def process_query4(self, params): result_df = df_filtered[output_cols].sort_values(by='distance_km') if 'Arrest Date' in result_df.columns: result_df['Arrest Date'] = result_df['Arrest Date'].dt.strftime('%Y-%m-%d') if 'distance_km' in result_df.columns: result_df['distance_km'] = result_df['distance_km'].round(2) - data = result_df.to_dict(orient='records') - headers = output_cols + data, headers = self._df_to_records_and_headers(result_df) logger.info(f"PROCESSOR_QUERY4: Returning map_filepath: {map_filepath_abs}") @@ -987,4 +1128,30 @@ def process_query4(self, params): return {'status': 'error', 'message': f"Query failed: Missing expected parameter '{ke}' or data column."} except Exception as e: logger.error(f"Error processing Query 4: {e}", exc_info=True) - return {'status': 'error', 'message': f"Error processing query: {e}"} \ No newline at end of file + return {'status': 'error', 'message': f"Error processing query: {e}"} + + def 
cleanup_temp_files(self): + """Clean up temporary files created by this processor""" + with self._temp_files_lock: + for temp_file in self._temp_files.copy(): + try: + if os.path.exists(temp_file): + os.remove(temp_file) + logger.info(f"Cleaned up temporary file: {temp_file}") + except Exception as e: + logger.warning(f"Failed to clean up temporary file {temp_file}: {e}") + self._temp_files.clear() + + def add_temp_file(self, filepath): + """Add a temporary file to the cleanup list""" + with self._temp_files_lock: + self._temp_files.add(filepath) + + def __del__(self): + """Cleanup when the processor is destroyed""" + try: + self.cleanup_temp_files() + if hasattr(self, '_query_executor'): + self._query_executor.shutdown(wait=False) + except Exception as e: + logger.error(f"Error during DataProcessor cleanup: {e}") \ No newline at end of file diff --git a/app/server/database.py b/app/server/database_postgres.py similarity index 51% rename from app/server/database.py rename to app/server/database_postgres.py index 6f02c84..89e6602 100644 --- a/app/server/database.py +++ b/app/server/database_postgres.py @@ -1,8 +1,7 @@ #!/usr/bin/env python3 -# Database module for the server +# PostgreSQL Database module for the server import os -import sqlite3 import datetime import pandas as pd import threading @@ -10,27 +9,33 @@ import logging import contextlib import json +import psycopg2 +from psycopg2 import pool +from psycopg2.extras import RealDictCursor +from sqlalchemy import create_engine, text +from sqlalchemy.pool import QueuePool logger = logging.getLogger(__name__) -class Database: - """Thread-safe database using a connection pool for storing client info and query history""" + +class DatabasePostgres: + """Thread-safe PostgreSQL database using connection pooling for storing client info and query history""" - def __init__(self, db_path='app/server/server_data.db', pool_size=10): - """Initialize database pool and create tables if they don't exist""" - self.db_path = 
db_path + def __init__(self, host='localhost', port=5432, database='arrest_data', + user='arrest_user', password='arrest_password', pool_size=10): + """Initialize PostgreSQL database pool and create tables if they don't exist""" + self.host = host + self.port = port + self.database = database + self.user = user + self.password = password self.pool_size = pool_size - self._pool = queue.Queue(maxsize=pool_size) - self._closed = False # Flag to indicate pool shutdown - + self._closed = False + # Lock for initializing/closing the pool safely self._init_lock = threading.Lock() - - os.makedirs(os.path.dirname(db_path), exist_ok=True) + # Initialize connection pool self._initialize_pool() - - # Perform database migration if needed (using a pooled connection) - self.migrate_database() # Ensure tables exist (using a pooled connection) try: @@ -45,186 +50,165 @@ def __init__(self, db_path='app/server/server_data.db', pool_size=10): self.return_connection(conn) def _initialize_pool(self): - """Populate the connection pool.""" + """Initialize PostgreSQL connection pool.""" with self._init_lock: if self._closed: - raise RuntimeError("Database pool is closed.") - logger.info(f"Initializing database connection pool (size {self.pool_size})...") - for _ in range(self.pool_size): - try: - # Connections in pool must allow sharing across threads - conn = sqlite3.connect(self.db_path, check_same_thread=False) - conn.execute("PRAGMA foreign_keys = ON") - conn.row_factory = sqlite3.Row - self._pool.put(conn) - except sqlite3.Error as e: - logger.error(f"Failed to create connection for pool: {e}", exc_info=True) - # Handle error appropriately - maybe raise, maybe try fewer connections? 
- raise RuntimeError(f"Failed to initialize database pool: {e}") from e - logger.info(f"Database connection pool initialized with {self._pool.qsize()} connections.") - - def migrate_database(self): - """Perform necessary database migrations""" - # Needs careful handling as pool might not be fully ready - conn = None - try: - # Temporarily get a raw connection for migration check before pool might be used - conn = sqlite3.connect(self.db_path) - cursor = conn.cursor() + raise RuntimeError("Database pool is closed.") - # Check if sessions table exists and has the old schema - cursor.execute("PRAGMA table_info(sessions)") - columns = cursor.fetchall() - - column_names = [col[1] for col in columns] if columns else [] - - # If sessions table exists but doesn't have 'address' column - if columns and 'address' not in column_names: - # Drop the old sessions table as we're changing the structure - # (another option would be to ALTER TABLE, but this is simpler for a new system) - cursor.execute("DROP TABLE IF EXISTS sessions") - conn.commit() - print("Migrated database: dropped old sessions table") + logger.info(f"Initializing PostgreSQL connection pool (size {self.pool_size})...") - conn.close() - except Exception as e: - logger.error(f"Error during database migration check: {e}") - if conn: - try: conn.close() # Ensure temporary connection is closed on error - except: pass - # Note: Actual table creation uses the pool via create_tables called from __init__ + try: + # Create connection pool + self._pool = psycopg2.pool.ThreadedConnectionPool( + minconn=1, + maxconn=self.pool_size, + host=self.host, + port=self.port, + database=self.database, + user=self.user, + password=self.password, + cursor_factory=RealDictCursor + ) + + logger.info(f"PostgreSQL connection pool initialized with {self.pool_size} connections.") + + except psycopg2.Error as e: + logger.error(f"Failed to create PostgreSQL connection pool: {e}", exc_info=True) + raise RuntimeError(f"Failed to initialize 
PostgreSQL connection pool: {e}") from e def get_connection(self): - """Get a connection from the pool. - Blocks until a connection is available, with a timeout. - """ + """Get a connection from the pool.""" if self._closed: raise RuntimeError("Database pool is closed.") + try: - # Wait up to 10 seconds for a connection - conn = self._pool.get(block=True, timeout=10) - logger.debug(f"Acquired DB connection {id(conn)} from pool (pool size: {self._pool.qsize()})") + conn = self._pool.getconn() + logger.debug(f"Acquired PostgreSQL connection {id(conn)} from pool") return conn - except queue.Empty: - logger.error("Timeout waiting for database connection from pool.") - # Depending on policy, could raise error or return None - raise TimeoutError("Timeout waiting for database connection from pool.") + except Exception as e: + logger.error(f"Error getting connection from pool: {e}", exc_info=True) + raise TimeoutError("Failed to get database connection from pool.") def return_connection(self, conn): """Return a connection to the pool.""" if conn is None: return + if self._closed: - # If pool is closed, just close the connection we got try: conn.close() except Exception as e: - logger.warning(f"Error closing connection {id(conn)} after pool shutdown: {e}", exc_info=True) + logger.warning(f"Error closing connection {id(conn)} after pool shutdown: {e}", exc_info=True) return + try: - # Use block=False with check to avoid waiting if pool is somehow full - # (shouldn't happen with correct usage but safer) - if not self._pool.full(): - self._pool.put(conn, block=False) - logger.debug(f"Returned DB connection {id(conn)} to pool (pool size: {self._pool.qsize()})") - else: - logger.warning(f"Attempted to return connection {id(conn)} to a full pool. 
Closing instead.") - try: - conn.close() - except Exception as e: - logger.error(f"Error closing connection {id(conn)} that couldn't be returned to full pool: {e}", exc_info=True) + self._pool.putconn(conn) + logger.debug(f"Returned PostgreSQL connection {id(conn)} to pool") except Exception as e: - logger.error(f"Error returning connection {id(conn)} to pool: {e}. Closing connection.", exc_info=True) - # Ensure connection is closed if putting back failed - try: - conn.close() - except Exception as close_e: - logger.error(f"Error closing connection {id(conn)} after failing to return to pool: {close_e}", exc_info=True) + logger.error(f"Error returning connection {id(conn)} to pool: {e}. Closing connection.", exc_info=True) + try: + conn.close() + except Exception as close_e: + logger.error(f"Error closing connection {id(conn)} after failing to return to pool: {close_e}", exc_info=True) def close_all_connections(self): """Close all connections in the pool and shut down the pool.""" with self._init_lock: if self._closed: - return # Already closed - logger.info(f"Closing all database connections in the pool ({self._pool.qsize()} connections estimated)...") + return # Already closed + + logger.info("Closing all PostgreSQL database connections in the pool...") self._closed = True - closed_count = 0 - while not self._pool.empty(): - try: - conn = self._pool.get_nowait() - conn.close() - closed_count += 1 - except queue.Empty: - break # Pool is empty - except Exception as e: - logger.error(f"Error closing connection during pool shutdown: {e}", exc_info=True) - logger.info(f"Database connection pool shutdown complete. 
Closed {closed_count} connections.") + + try: + self._pool.closeall() + logger.info("PostgreSQL connection pool shutdown complete.") + except Exception as e: + logger.error(f"Error during PostgreSQL pool shutdown: {e}", exc_info=True) @contextlib.contextmanager def get_connection_context(self): - """Context manager for getting and returning a connection.""" - conn = None - try: - conn = self.get_connection() - yield conn - finally: - if conn: - self.return_connection(conn) + """Context manager for getting and returning a connection.""" + conn = None + try: + conn = self.get_connection() + yield conn + finally: + if conn: + self.return_connection(conn) def create_tables(self, conn): """Create tables if they don't exist using a provided connection.""" - # This method now expects a connection to be passed in - # It's called from __init__ which handles getting/returning the connection cursor = conn.cursor() + # Set search path to arrest_data schema + cursor.execute("SET search_path TO arrest_data, public;") + + # Create clients table cursor.execute(''' CREATE TABLE IF NOT EXISTS clients ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, - nickname TEXT NOT NULL UNIQUE, - email TEXT NOT NULL UNIQUE, - password TEXT NOT NULL, - registration_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP + id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + nickname VARCHAR(100) NOT NULL UNIQUE, + email VARCHAR(255) NOT NULL UNIQUE, + password VARCHAR(255) NOT NULL, + registration_date TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP ) ''') + # Create sessions table cursor.execute(''' CREATE TABLE IF NOT EXISTS sessions ( - id INTEGER PRIMARY KEY AUTOINCREMENT, + id SERIAL PRIMARY KEY, client_id INTEGER NOT NULL, - address TEXT NOT NULL, - start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - end_time TIMESTAMP, - FOREIGN KEY (client_id) REFERENCES clients (id) + address VARCHAR(45) NOT NULL, + start_time TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + end_time TIMESTAMP 
WITH TIME ZONE, + FOREIGN KEY (client_id) REFERENCES clients (id) ON DELETE CASCADE ) ''') + # Create queries table cursor.execute(''' CREATE TABLE IF NOT EXISTS queries ( - id INTEGER PRIMARY KEY AUTOINCREMENT, + id SERIAL PRIMARY KEY, client_id INTEGER NOT NULL, session_id INTEGER NOT NULL, - query_type TEXT NOT NULL, + query_type VARCHAR(100) NOT NULL, parameters TEXT, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (client_id) REFERENCES clients (id), - FOREIGN KEY (session_id) REFERENCES sessions (id) + timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (client_id) REFERENCES clients (id) ON DELETE CASCADE, + FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE ) ''') + # Create messages table cursor.execute(''' CREATE TABLE IF NOT EXISTS messages ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - sender_type TEXT NOT NULL, + id SERIAL PRIMARY KEY, + sender_type VARCHAR(50) NOT NULL, sender_id INTEGER NOT NULL, - recipient_type TEXT NOT NULL, + recipient_type VARCHAR(50) NOT NULL, recipient_id INTEGER NOT NULL, message TEXT NOT NULL, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - read INTEGER DEFAULT 0 + timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + read BOOLEAN DEFAULT FALSE ) ''') + # Create indexes for better performance + cursor.execute("CREATE INDEX IF NOT EXISTS idx_clients_email ON clients (email)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_clients_nickname ON clients (nickname)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_sessions_client_id ON sessions (client_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_sessions_start_time ON sessions (start_time)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_queries_client_id ON queries (client_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_queries_session_id ON queries (session_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_queries_timestamp ON queries (timestamp)") + cursor.execute("CREATE INDEX IF NOT 
EXISTS idx_queries_query_type ON queries (query_type)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_messages_recipient ON messages (recipient_type, recipient_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_messages_read ON messages (read)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages (timestamp)") + conn.commit() def register_client(self, name, nickname, email, password): @@ -232,62 +216,62 @@ def register_client(self, name, nickname, email, password): with self.get_connection_context() as conn: cursor = conn.cursor() try: - cursor.execute("SELECT * FROM clients WHERE nickname = ? OR email = ?", (nickname, email)) + cursor.execute("SELECT * FROM clients WHERE nickname = %s OR email = %s", (nickname, email)) if cursor.fetchone(): return False + cursor.execute( - "INSERT INTO clients (name, nickname, email, password) VALUES (?, ?, ?, ?)", + "INSERT INTO clients (name, nickname, email, password) VALUES (%s, %s, %s, %s)", (name, nickname, email, password) ) conn.commit() return True - except sqlite3.Error as e: - logger.error(f"DB Error registering client {nickname}: {e}", exc_info=True) + except psycopg2.Error as e: + logger.error(f"PostgreSQL Error registering client {nickname}: {e}", exc_info=True) conn.rollback() - raise # Re-raise the database error + raise def check_login(self, email, password): """Check login credentials and return client info if valid""" with self.get_connection_context() as conn: cursor = conn.cursor() - cursor.execute("SELECT * FROM clients WHERE email = ? 
AND password = ?", (email, password)) + cursor.execute("SELECT * FROM clients WHERE email = %s AND password = %s", (email, password)) row = cursor.fetchone() if row: - # Convert row to dict before connection is returned return dict(row) else: return None def start_session(self, client_id, address): """Start a new session for a client and return its ID and start time.""" - start_time = datetime.datetime.now(datetime.timezone.utc).isoformat() # Record start time + start_time = datetime.datetime.now(datetime.timezone.utc).isoformat() session_id = None + with self.get_connection_context() as conn: cursor = conn.cursor() try: cursor.execute( - "INSERT INTO sessions (client_id, address, start_time) VALUES (?, ?, ?)", + "INSERT INTO sessions (client_id, address, start_time) VALUES (%s, %s, %s) RETURNING id", (client_id, address, start_time) ) - session_id = cursor.lastrowid + session_id = cursor.fetchone()['id'] conn.commit() - except sqlite3.Error as e: - logger.error(f"DB Error starting session for client {client_id}: {e}", exc_info=True) - conn.rollback() - raise # Re-raise + except psycopg2.Error as e: + logger.error(f"PostgreSQL Error starting session for client {client_id}: {e}", exc_info=True) + conn.rollback() + raise if session_id: - return {'id': session_id, 'start_time': start_time} + return {'id': session_id, 'start_time': start_time} else: - # Handle case where insert somehow failed to return an ID - raise Exception(f"Failed to obtain session ID for client {client_id} after insert.") + raise Exception(f"Failed to obtain session ID for client {client_id} after insert.") def end_session(self, session_id): """End a session""" with self.get_connection_context() as conn: cursor = conn.cursor() cursor.execute( - "UPDATE sessions SET end_time = CURRENT_TIMESTAMP WHERE id = ?", + "UPDATE sessions SET end_time = CURRENT_TIMESTAMP WHERE id = %s", (session_id,) ) conn.commit() @@ -298,45 +282,45 @@ def log_query(self, client_id, session_id, query_type, parameters=None): 
with self.get_connection_context() as conn: cursor = conn.cursor() cursor.execute( - "INSERT INTO queries (client_id, session_id, query_type, parameters) VALUES (?, ?, ?, ?)", + "INSERT INTO queries (client_id, session_id, query_type, parameters) VALUES (%s, %s, %s, %s) RETURNING id", (client_id, session_id, query_type, parameters_str) ) + query_id = cursor.fetchone()['id'] conn.commit() - return cursor.lastrowid + return query_id def add_message(self, sender_type, sender_id, recipient_type, recipient_id, message): """Add a message to the database""" with self.get_connection_context() as conn: cursor = conn.cursor() cursor.execute( - "INSERT INTO messages (sender_type, sender_id, recipient_type, recipient_id, message) VALUES (?, ?, ?, ?, ?)", + "INSERT INTO messages (sender_type, sender_id, recipient_type, recipient_id, message) VALUES (%s, %s, %s, %s, %s) RETURNING id", (sender_type, sender_id, recipient_type, recipient_id, message) ) + message_id = cursor.fetchone()['id'] conn.commit() - return cursor.lastrowid + return message_id def get_client_by_id(self, client_id): """Get client information by ID""" with self.get_connection_context() as conn: cursor = conn.cursor() - cursor.execute("SELECT * FROM clients WHERE id = ?", (client_id,)) + cursor.execute("SELECT * FROM clients WHERE id = %s", (client_id,)) row = cursor.fetchone() return dict(row) if row else None def get_all_clients(self): """Get all registered clients including their last login time.""" try: - # Use the context manager to ensure connection is returned with self.get_connection_context() as conn: cursor = conn.cursor() - # Query to get all clients with their registration date, last login, and total queries query = """ SELECT c.id, c.nickname, - c.registration_date as registration_date, -- Renamed for clarity - (SELECT MAX(s.start_time) FROM sessions s WHERE s.client_id = c.id) as last_seen, -- Get latest login time + c.registration_date, + (SELECT MAX(s.start_time) FROM sessions s WHERE s.client_id 
= c.id) as last_seen, (SELECT COUNT(*) FROM queries q WHERE q.client_id = c.id) as total_queries FROM clients c ORDER BY c.registration_date DESC; @@ -344,15 +328,11 @@ def get_all_clients(self): cursor.execute(query) clients = cursor.fetchall() - - # Convert rows to dicts before connection context closes return [dict(row) for row in clients] - except sqlite3.Error as sql_err: - # Catch specific database errors - logger.error(f"Database error fetching all clients: {sql_err}", exc_info=True) - return [] + except psycopg2.Error as sql_err: + logger.error(f"PostgreSQL error fetching all clients: {sql_err}", exc_info=True) + return [] except Exception as e: - # Catch any other unexpected errors (like TimeoutError from get_connection_context) logger.error(f"Error fetching all clients: {e}", exc_info=True) return [] @@ -361,7 +341,7 @@ def get_client_queries(self, client_id): with self.get_connection_context() as conn: cursor = conn.cursor() cursor.execute( - "SELECT * FROM queries WHERE client_id = ? ORDER BY timestamp DESC", + "SELECT * FROM queries WHERE client_id = %s ORDER BY timestamp DESC", (client_id,) ) return [dict(row) for row in cursor.fetchall()] @@ -379,7 +359,7 @@ def get_client_by_nickname(self, nickname): """Get client by nickname""" with self.get_connection_context() as conn: cursor = conn.cursor() - cursor.execute("SELECT * FROM clients WHERE nickname = ?", (nickname,)) + cursor.execute("SELECT * FROM clients WHERE nickname = %s", (nickname,)) row = cursor.fetchone() return dict(row) if row else None @@ -402,9 +382,9 @@ def get_messages_for_client(self, client_id): cursor.execute(""" SELECT id, sender_type, sender_id, message, timestamp FROM messages - WHERE ((recipient_type = 'client' AND recipient_id = ?) 
+ WHERE ((recipient_type = 'client' AND recipient_id = %s) OR recipient_type = 'all') - AND read = 0 + AND read = FALSE ORDER BY timestamp DESC """, (client_id,)) return [dict(row) for row in cursor.fetchall()] @@ -413,7 +393,7 @@ def mark_message_as_read(self, message_id): """Mark a message as read""" with self.get_connection_context() as conn: cursor = conn.cursor() - cursor.execute("UPDATE messages SET read = 1 WHERE id = ?", (message_id,)) + cursor.execute("UPDATE messages SET read = TRUE WHERE id = %s", (message_id,)) conn.commit() def get_client_details_by_id(self, client_id): @@ -424,62 +404,55 @@ def get_client_details_by_id(self, client_id): cursor = conn.cursor() # 1. Get basic client info - cursor.execute("SELECT id, name, nickname, email, registration_date FROM clients WHERE id = ?", (client_id,)) + cursor.execute("SELECT id, name, nickname, email, registration_date FROM clients WHERE id = %s", (client_id,)) client_row = cursor.fetchone() if not client_row: logger.warning(f"No client found with ID {client_id} for details.") - return None # Return None if client doesn't exist - details = dict(zip([col[0] for col in cursor.description], client_row)) + return None + details = dict(client_row) - cursor.execute("SELECT MAX(start_time) FROM sessions WHERE client_id = ?", (client_id,)) + cursor.execute("SELECT MAX(start_time) FROM sessions WHERE client_id = %s", (client_id,)) last_login_result = cursor.fetchone() - details['last_login'] = last_login_result[0] if last_login_result else None + details['last_login'] = last_login_result['max'] if last_login_result and last_login_result['max'] else None # 2. Get query statistics cursor.execute(""" SELECT query_type, COUNT(*) as count FROM queries - WHERE client_id = ? 
+ WHERE client_id = %s GROUP BY query_type ORDER BY count DESC """, (client_id,)) query_stats = cursor.fetchall() - details['query_stats'] = [ - dict(zip([col[0] for col in cursor.description], row)) - for row in query_stats - ] + details['query_stats'] = [dict(row) for row in query_stats] # 3. Get recent session history (e.g., last 10 sessions) cursor.execute(""" SELECT id, start_time, end_time, address, - (strftime('%s', end_time) - strftime('%s', start_time)) as duration_seconds + EXTRACT(EPOCH FROM (end_time - start_time)) as duration_seconds FROM sessions - WHERE client_id = ? + WHERE client_id = %s ORDER BY start_time DESC LIMIT 10 """, (client_id,)) session_history = cursor.fetchall() - details['session_history'] = [ - dict(zip([col[0] for col in cursor.description], row)) - for row in session_history - ] + details['session_history'] = [dict(row) for row in session_history] logger.info(f"Successfully fetched details for client ID {client_id}") return details - except sqlite3.Error as sql_err: - logger.error(f"Database error fetching details for client ID {client_id}: {sql_err}", exc_info=True) - # Return partial details if basic info was fetched? 
- return details # Return whatever was fetched, might be just basic info or empty + except psycopg2.Error as sql_err: + logger.error(f"PostgreSQL error fetching details for client ID {client_id}: {sql_err}", exc_info=True) + return details except Exception as e: logger.error(f"Unexpected error fetching details for client ID {client_id}: {e}", exc_info=True) - return details # Return whatever was fetched + return details def get_daily_query_counts(self): """Get daily counts for each query type.""" query = """ SELECT - date(timestamp) as query_date, + DATE(timestamp) as query_date, query_type, COUNT(*) as count FROM queries @@ -491,9 +464,9 @@ def get_daily_query_counts(self): cursor = conn.cursor() cursor.execute(query) results = cursor.fetchall() - return [dict(zip([col[0] for col in cursor.description], row)) for row in results] - except sqlite3.Error as sql_err: - logger.error(f"Database error fetching daily query counts: {sql_err}", exc_info=True) + return [dict(row) for row in results] + except psycopg2.Error as sql_err: + logger.error(f"PostgreSQL error fetching daily query counts: {sql_err}", exc_info=True) return [] except Exception as e: logger.error(f"Unexpected error fetching daily query counts: {e}", exc_info=True) diff --git a/app/server/server.py b/app/server/server.py index 2ece006..b4ccfd2 100644 --- a/app/server/server.py +++ b/app/server/server.py @@ -19,7 +19,7 @@ from shared.protocol import Message # Import server modules -from .database import Database +from .database_postgres import DatabasePostgres from .data_processor import DataProcessor # --- Configure logging to file --- @@ -47,7 +47,7 @@ def __init__(self, host=SERVER_HOST, port=SERVER_PORT): self.running = False self.clients = [] self.clients_lock = threading.Lock() # Lock for clients list - self.db = Database() + self.db = DatabasePostgres() self.data_processor = DataProcessor() # Activity log for the server (for the GUI) @@ -58,6 +58,10 @@ def __init__(self, host=SERVER_HOST, 
port=SERVER_PORT): self.on_activity_log = None self.on_client_list_update = None self.on_all_clients_update = None # Callback for new registrations + + # Cleanup timer + self.cleanup_timer = None + self.last_cleanup_time = time.time() def start(self): """Start the server""" @@ -66,8 +70,8 @@ def start(self): if getattr(self.db, '_closed', False): try: logger.info("Reinitializing database pool after previous stop()") - # Re-create the Database instance (uses same db_path & pool_size) - self.db = Database(self.db.db_path, self.db.pool_size) + # Re-create the Database instance + self.db = DatabasePostgres() except Exception as e: logger.error(f"Failed to reinitialize database pool: {e}", exc_info=True) self.log_activity(f"Error reinitializing database: {e}") @@ -89,6 +93,11 @@ def start(self): accept_thread.daemon = True accept_thread.start() + # Start cleanup thread + cleanup_thread = threading.Thread(target=self.periodic_cleanup) + cleanup_thread.daemon = True + cleanup_thread.start() + return True except Exception as e: logger.error(f"Error starting server: {e}") @@ -380,6 +389,41 @@ def notify_query_processed(self, client_id): if self.on_all_clients_update: self.on_all_clients_update() # Reuse existing callback + def periodic_cleanup(self): + """Periodic cleanup of resources""" + while self.running: + try: + time.sleep(300) # Run every 5 minutes + if not self.running: + break + + current_time = time.time() + if current_time - self.last_cleanup_time >= 300: # 5 minutes + logger.info("Running periodic cleanup...") + + # Clean up old temporary files + if hasattr(self.data_processor, 'cleanup_temp_files'): + self.data_processor.cleanup_temp_files() + + # Force garbage collection + import gc + gc.collect() + + # Log memory usage + try: + import psutil + process = psutil.Process() + memory_mb = process.memory_info().rss / 1024 / 1024 + logger.info(f"Server memory usage: {memory_mb:.1f} MB") + except ImportError: + pass # psutil not available + + self.last_cleanup_time = 
current_time + + except Exception as e: + logger.error(f"Error in periodic cleanup: {e}") + time.sleep(60) # Wait a minute before retrying + if __name__ == "__main__": # Start the server directly if run as a script diff --git a/app/shared/protocol.py b/app/shared/protocol.py index 68b60aa..3fdde8a 100644 --- a/app/shared/protocol.py +++ b/app/shared/protocol.py @@ -8,6 +8,8 @@ import base64 import logging import os +import gzip +import io # Get the logger for this module logger = logging.getLogger(__name__) # Use module-level logger @@ -21,9 +23,41 @@ def __init__(self, msg_type, data=None): self.data = data if data is not None else {} +def compress_data(data): + """Compress data using gzip if it's large enough""" + try: + # Serialize data to bytes + serialized = pickle.dumps(data, protocol=4) + + # Only compress if data is larger than 1MB + if len(serialized) > 1024 * 1024: + compressed = gzip.compress(serialized) + # Only use compression if it actually reduces size + if len(compressed) < len(serialized): + return compressed, True + + return serialized, False + except Exception as e: + logger.error(f"Error compressing data: {e}") + return pickle.dumps(data, protocol=4), False + + +def decompress_data(data, is_compressed): + """Decompress data if it was compressed""" + try: + if is_compressed: + decompressed = gzip.decompress(data) + return pickle.loads(decompressed) + else: + return pickle.loads(data) + except Exception as e: + logger.error(f"Error decompressing data: {e}") + raise + + def send_message(sock, message): """ - Send a message object through a socket using Pickle + Send a message object through a socket using Pickle with optional compression Parameters: - sock: socket object @@ -33,13 +67,17 @@ def send_message(sock, message): - True if message sent successfully, False otherwise """ try: - # Convert message object to bytes using pickle - msg_bytes = pickle.dumps(message, protocol=4) # Use a high protocol + # Compress data if needed + msg_bytes, 
is_compressed = compress_data(message) # Send message length first (4 bytes, network byte order) msg_len = len(msg_bytes) sock.sendall(struct.pack('!I', msg_len)) + # Send compression flag (1 byte) + compression_flag = 1 if is_compressed else 0 + sock.sendall(struct.pack('!B', compression_flag)) + # Send the message itself sock.sendall(msg_bytes) @@ -57,7 +95,7 @@ def send_message(sock, message): def receive_message(sock): """ - Receive a message object from a socket using Pickle + Receive a message object from a socket using Pickle with optional compression Parameters: - sock: socket object @@ -94,7 +132,7 @@ def receive_message(sock): msg_len = struct.unpack('!I', msg_len_bytes)[0] # --- Sanity check on message size --- - MAX_MSG_SIZE = 20 * 1024 * 1024 # Increased limit to 20MB, adjust if needed + MAX_MSG_SIZE = 50 * 1024 * 1024 # Increased limit to 50MB for compressed data if msg_len > MAX_MSG_SIZE: logger.error(f"PROTOCOL.RECEIVE: Message size {msg_len} bytes exceeds limit {MAX_MSG_SIZE} (socket fileno {fileno}). Closing socket.") try: @@ -103,16 +141,24 @@ def receive_message(sock): logger.warning(f"Ignoring error while closing socket after size limit exceeded: {close_err}") raise ValueError(f"Received message size ({msg_len}) exceeds limit.") + # --- Receive compression flag (1 byte) --- + compression_flag_bytes = sock.recv(1) + if not compression_flag_bytes: + logger.warning(f"PROTOCOL.RECEIVE: Connection closed unexpectedly while receiving compression flag (socket fileno {fileno}).") + raise ConnectionError("Connection closed during compression flag reception") + + is_compressed = struct.unpack('!B', compression_flag_bytes)[0] == 1 + # --- Receive the message body --- data = b'' bytes_received = 0 # Use a longer timeout for the body, proportionate to max size? 
- body_timeout = max(30.0, MAX_MSG_SIZE / (1024*1024) * 2) # e.g., 2s per MB, min 30s + body_timeout = max(60.0, MAX_MSG_SIZE / (1024*1024) * 5) # e.g., 5s per MB, min 60s sock.settimeout(body_timeout) - logger.debug(f"PROTOCOL.RECEIVE: Expecting {msg_len} bytes for message body (timeout: {body_timeout}s)...") + logger.debug(f"PROTOCOL.RECEIVE: Expecting {msg_len} bytes for message body (compressed: {is_compressed}, timeout: {body_timeout}s)...") try: while bytes_received < msg_len: - chunk_size = min(msg_len - bytes_received, 8192) # Read in larger chunks + chunk_size = min(msg_len - bytes_received, 32768) # Read in larger chunks (32KB) packet = sock.recv(chunk_size) if not packet: logger.warning(f"PROTOCOL.RECEIVE: Connection closed unexpectedly while receiving message body (received {bytes_received}/{msg_len} bytes, socket fileno {fileno}).") @@ -124,15 +170,15 @@ def receive_message(sock): logger.debug(f"PROTOCOL.RECEIVE: Received {bytes_received} bytes for message body.") - # Deserialize bytes using pickle - message = pickle.loads(data) + # Decompress and deserialize data + message_data = decompress_data(data, is_compressed) - if not isinstance(message, Message): - logger.error(f"PROTOCOL.RECEIVE: Deserialized object is not a Message type ({type(message)}). Socket {fileno}") + if not isinstance(message_data, Message): + logger.error(f"PROTOCOL.RECEIVE: Deserialized object is not a Message type ({type(message_data)}). 
Socket {fileno}") raise TypeError("Received invalid object type from socket") - logger.debug(f"PROTOCOL.RECEIVE: Successfully received message type {message.msg_type} (socket fileno {fileno}).") - return message + logger.debug(f"PROTOCOL.RECEIVE: Successfully received message type {message_data.msg_type} (socket fileno {fileno}).") + return message_data except socket.timeout: logger.warning(f"PROTOCOL.RECEIVE: Socket timeout during receive (socket fileno {fileno}).") diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..2da9283 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,24 @@ +services: + postgres: + image: postgres:15-alpine + container_name: arrest_data_postgres + environment: + POSTGRES_DB: arrest_data + POSTGRES_USER: arrest_user + POSTGRES_PASSWORD: arrest_password + POSTGRES_INITDB_ARGS: "--encoding=UTF-8 --lc-collate=C --lc-ctype=C" + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + - ./init.sql:/docker-entrypoint-initdb.d/init.sql + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", "pg_isready -U arrest_user -d arrest_data"] + interval: 10s + timeout: 5s + retries: 5 + +volumes: + postgres_data: + driver: local \ No newline at end of file diff --git a/fix_postgres_sequences.py b/fix_postgres_sequences.py new file mode 100644 index 0000000..8bf18dc --- /dev/null +++ b/fix_postgres_sequences.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +""" +Fix PostgreSQL sequences +This script updates the sequences to start from the highest existing ID + 1 +""" + +import psycopg2 +import logging + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# Database configuration +POSTGRES_CONFIG = { + 'host': 'localhost', + 'port': 5432, + 'database': 'arrest_data', + 'user': 'arrest_user', + 'password': 'arrest_password' +} + +def fix_sequences(): + """Fix all sequences in the arrest_data 
schema""" + try: + conn = psycopg2.connect(**POSTGRES_CONFIG) + cursor = conn.cursor() + + # Set search path + cursor.execute("SET search_path TO arrest_data, public;") + + # Fix clients sequence + cursor.execute("SELECT MAX(id) FROM clients;") + max_client_id = cursor.fetchone()[0] + if max_client_id: + cursor.execute(f"SELECT setval('clients_id_seq', {max_client_id});") + logger.info(f"βœ… Fixed clients sequence to start from {max_client_id + 1}") + + # Fix sessions sequence + cursor.execute("SELECT MAX(id) FROM sessions;") + max_session_id = cursor.fetchone()[0] + if max_session_id: + cursor.execute(f"SELECT setval('sessions_id_seq', {max_session_id});") + logger.info(f"βœ… Fixed sessions sequence to start from {max_session_id + 1}") + + # Fix queries sequence + cursor.execute("SELECT MAX(id) FROM queries;") + max_query_id = cursor.fetchone()[0] + if max_query_id: + cursor.execute(f"SELECT setval('queries_id_seq', {max_query_id});") + logger.info(f"βœ… Fixed queries sequence to start from {max_query_id + 1}") + + # Fix messages sequence + cursor.execute("SELECT MAX(id) FROM messages;") + max_message_id = cursor.fetchone()[0] + if max_message_id: + cursor.execute(f"SELECT setval('messages_id_seq', {max_message_id});") + logger.info(f"βœ… Fixed messages sequence to start from {max_message_id + 1}") + + conn.commit() + cursor.close() + conn.close() + + logger.info("πŸŽ‰ All sequences have been fixed!") + return True + + except Exception as e: + logger.error(f"❌ Failed to fix sequences: {e}") + return False + +def show_current_data(): + """Show current data counts""" + try: + conn = psycopg2.connect(**POSTGRES_CONFIG) + cursor = conn.cursor() + + # Set search path + cursor.execute("SET search_path TO arrest_data, public;") + + tables = ['clients', 'sessions', 'queries', 'messages'] + for table in tables: + cursor.execute(f"SELECT COUNT(*) FROM {table};") + count = cursor.fetchone()[0] + logger.info(f"πŸ“Š {table}: {count} records") + + cursor.close() + conn.close() 
+ + except Exception as e: + logger.error(f"❌ Failed to show data: {e}") + +if __name__ == "__main__": + logger.info("πŸ”§ Fixing PostgreSQL sequences after migration...") + + show_current_data() + print() + + if fix_sequences(): + logger.info("βœ… Sequences fixed successfully!") + logger.info("You can now run the test script: python test_postgres.py") + else: + logger.error("❌ Failed to fix sequences") + exit(1) \ No newline at end of file diff --git a/init.sql b/init.sql new file mode 100644 index 0000000..1171bc9 --- /dev/null +++ b/init.sql @@ -0,0 +1,74 @@ +-- PostgreSQL initialization script for Arrest Data application +-- This script runs when the PostgreSQL container starts for the first time + +-- Create the database schema +CREATE SCHEMA IF NOT EXISTS arrest_data; + +-- Set the search path +SET search_path TO arrest_data, public; + +-- Create clients table +CREATE TABLE IF NOT EXISTS clients ( + id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + nickname VARCHAR(100) NOT NULL UNIQUE, + email VARCHAR(255) NOT NULL UNIQUE, + password VARCHAR(255) NOT NULL, + registration_date TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); + +-- Create sessions table +CREATE TABLE IF NOT EXISTS sessions ( + id SERIAL PRIMARY KEY, + client_id INTEGER NOT NULL, + address VARCHAR(45) NOT NULL, -- IPv4/IPv6 address + start_time TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + end_time TIMESTAMP WITH TIME ZONE, + FOREIGN KEY (client_id) REFERENCES clients (id) ON DELETE CASCADE +); + +-- Create queries table +CREATE TABLE IF NOT EXISTS queries ( + id SERIAL PRIMARY KEY, + client_id INTEGER NOT NULL, + session_id INTEGER NOT NULL, + query_type VARCHAR(100) NOT NULL, + parameters TEXT, -- JSON string + timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (client_id) REFERENCES clients (id) ON DELETE CASCADE, + FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE +); + +-- Create messages table +CREATE TABLE IF NOT EXISTS 
messages ( + id SERIAL PRIMARY KEY, + sender_type VARCHAR(50) NOT NULL, + sender_id INTEGER NOT NULL, + recipient_type VARCHAR(50) NOT NULL, + recipient_id INTEGER NOT NULL, + message TEXT NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + read BOOLEAN DEFAULT FALSE +); + +-- Create indexes for better performance +CREATE INDEX IF NOT EXISTS idx_clients_email ON clients (email); +CREATE INDEX IF NOT EXISTS idx_clients_nickname ON clients (nickname); +CREATE INDEX IF NOT EXISTS idx_sessions_client_id ON sessions (client_id); +CREATE INDEX IF NOT EXISTS idx_sessions_start_time ON sessions (start_time); +CREATE INDEX IF NOT EXISTS idx_queries_client_id ON queries (client_id); +CREATE INDEX IF NOT EXISTS idx_queries_session_id ON queries (session_id); +CREATE INDEX IF NOT EXISTS idx_queries_timestamp ON queries (timestamp); +CREATE INDEX IF NOT EXISTS idx_queries_query_type ON queries (query_type); +CREATE INDEX IF NOT EXISTS idx_messages_recipient ON messages (recipient_type, recipient_id); +CREATE INDEX IF NOT EXISTS idx_messages_read ON messages (read); +CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages (timestamp); + +-- Grant permissions to the application user +GRANT ALL PRIVILEGES ON SCHEMA arrest_data TO arrest_user; +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA arrest_data TO arrest_user; +GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA arrest_data TO arrest_user; + +-- Set default privileges for future tables +ALTER DEFAULT PRIVILEGES IN SCHEMA arrest_data GRANT ALL ON TABLES TO arrest_user; +ALTER DEFAULT PRIVILEGES IN SCHEMA arrest_data GRANT ALL ON SEQUENCES TO arrest_user; \ No newline at end of file diff --git a/mermaid_diagrams.txt b/mermaid_diagrams.txt index f85770c..1bd0b45 100644 --- a/mermaid_diagrams.txt +++ b/mermaid_diagrams.txt @@ -84,7 +84,7 @@ graph TD Server_MainThread -- Creëert per client --> Server_ClientHandlerThread Server_ClientHandlerThread -- Leest van --> ClientSocket["Client Socket"]
Server_ClientHandlerThread -- Schrijft naar --> ClientSocket - Server_ClientHandlerThread -- Database operaties --> DatabasePool["SQLite Connection Pool
(in Database.py)"] + Server_ClientHandlerThread -- Database operaties --> DatabasePool["PostgreSQL Connection Pool
(in DatabasePostgres.py)"] Server_ClientHandlerThread -- Gebruikt --> DataProcessor["DataProcessor Instantie"] %% Server-side message queue for sending to client diff --git a/performance_monitor.py b/performance_monitor.py new file mode 100644 index 0000000..13778f1 --- /dev/null +++ b/performance_monitor.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +""" +Performance monitoring script for the arrest data server +""" + +import psutil +import time +import threading +import logging +import os +import tempfile +from datetime import datetime + +# Configure logging +TEMP_DIR = tempfile.gettempdir() +PERF_LOG_FILE = os.path.join(TEMP_DIR, 'performance_monitor.log') +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filename=PERF_LOG_FILE, + filemode='w' +) +logger = logging.getLogger('performance_monitor') + +class PerformanceMonitor: + """Monitor system performance and resource usage""" + + def __init__(self, interval=30): + """Initialize the performance monitor""" + self.interval = interval # seconds between measurements + self.running = False + self.monitor_thread = None + self.start_time = None + + def start(self): + """Start performance monitoring""" + if self.running: + logger.warning("Performance monitor is already running") + return + + self.running = True + self.start_time = time.time() + self.monitor_thread = threading.Thread(target=self._monitor_loop) + self.monitor_thread.daemon = True + self.monitor_thread.start() + logger.info(f"Performance monitor started (interval: {self.interval}s)") + + def stop(self): + """Stop performance monitoring""" + self.running = False + if self.monitor_thread: + self.monitor_thread.join(timeout=5) + logger.info("Performance monitor stopped") + + def _monitor_loop(self): + """Main monitoring loop""" + while self.running: + try: + self._collect_metrics() + time.sleep(self.interval) + except Exception as e: + logger.error(f"Error in performance monitoring: {e}") + time.sleep(10) # 
Wait before retrying + + def _collect_metrics(self): + """Collect system performance metrics""" + try: + # CPU usage + cpu_percent = psutil.cpu_percent(interval=1) + + # Memory usage + memory = psutil.virtual_memory() + memory_percent = memory.percent + memory_mb = memory.used / 1024 / 1024 + + # Disk usage + disk = psutil.disk_usage('/') + disk_percent = disk.percent + + # Network I/O + network = psutil.net_io_counters() + bytes_sent = network.bytes_sent + bytes_recv = network.bytes_recv + + # Process-specific metrics (if monitoring a specific process) + process_metrics = self._get_process_metrics() + + # Log metrics + logger.info( + f"PERF: CPU={cpu_percent:.1f}% | " + f"MEM={memory_percent:.1f}% ({memory_mb:.0f}MB) | " + f"DISK={disk_percent:.1f}% | " + f"NET_SENT={bytes_sent/1024/1024:.1f}MB | " + f"NET_RECV={bytes_recv/1024/1024:.1f}MB" + ) + + if process_metrics: + logger.info(f"PROCESS: {process_metrics}") + + except Exception as e: + logger.error(f"Error collecting metrics: {e}") + + def _get_process_metrics(self): + """Get metrics for specific processes (server/client)""" + try: + processes = [] + for proc in psutil.process_iter(['pid', 'name', 'memory_info', 'cpu_percent']): + try: + if 'python' in proc.info['name'].lower(): + # Check if it's our server or client process + cmdline = ' '.join(proc.cmdline()) + if 'server' in cmdline.lower() or 'client' in cmdline.lower(): + memory_mb = proc.info['memory_info'].rss / 1024 / 1024 + cpu_percent = proc.info['cpu_percent'] + processes.append({ + 'pid': proc.info['pid'], + 'name': proc.info['name'], + 'memory_mb': memory_mb, + 'cpu_percent': cpu_percent, + 'cmdline': cmdline[:100] + '...' 
if len(cmdline) > 100 else cmdline + }) + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + + if processes: + return ' | '.join([ + f"{p['name']}({p['pid']}): {p['memory_mb']:.0f}MB, {p['cpu_percent']:.1f}%" + for p in processes + ]) + except Exception as e: + logger.error(f"Error getting process metrics: {e}") + + return None + +def main(): + """Main function to run the performance monitor""" + print("Starting performance monitor...") + print(f"Log file: {PERF_LOG_FILE}") + + monitor = PerformanceMonitor(interval=30) # Check every 30 seconds + + try: + monitor.start() + + # Keep running until interrupted + while True: + time.sleep(1) + + except KeyboardInterrupt: + print("\nStopping performance monitor...") + monitor.stop() + print("Performance monitor stopped") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 0def651..1efea04 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,7 @@ seaborn>=0.12.0 pandas>=1.5.0 matplotlib>=3.6.0 contextily>=1.3.0 -folium \ No newline at end of file +folium +psycopg2-binary>=2.9.0 +sqlalchemy>=2.0.0 +psutil>=5.9.0 \ No newline at end of file diff --git a/start_postgres.bat b/start_postgres.bat new file mode 100644 index 0000000..f88c043 --- /dev/null +++ b/start_postgres.bat @@ -0,0 +1,65 @@ +@echo off +REM PostgreSQL Startup Script for Arrest Data Application + +echo === PostgreSQL Startup Script === +echo. + +REM Check if Docker is running +docker info >nul 2>&1 +if errorlevel 1 ( + echo ❌ Docker is not running. Please start Docker and try again. + pause + exit /b 1 +) + +echo βœ… Docker is running + +REM Check if docker-compose is available +docker-compose --version >nul 2>&1 +if errorlevel 1 ( + echo ❌ docker-compose is not installed. Please install Docker Compose. + pause + exit /b 1 +) + +echo βœ… Docker Compose is available + +REM Start PostgreSQL container +echo. +echo πŸš€ Starting PostgreSQL container... 
+docker-compose up -d postgres + +REM Wait for PostgreSQL to be ready +echo ⏳ Waiting for PostgreSQL to be ready... +timeout /t 10 /nobreak >nul + +REM Check if PostgreSQL is running +docker-compose ps postgres | findstr "Up" >nul +if errorlevel 1 ( + echo ❌ PostgreSQL container failed to start + echo Check logs with: docker-compose logs postgres + pause + exit /b 1 +) else ( + echo βœ… PostgreSQL container is running +) + +REM Check if we can connect to PostgreSQL +echo πŸ” Testing PostgreSQL connection... +python -c "import psycopg2; conn = psycopg2.connect(host='localhost', port=5432, database='arrest_data', user='arrest_user', password='arrest_password'); conn.close(); print('βœ… PostgreSQL connection successful')" 2>nul +if errorlevel 1 ( + echo ❌ Cannot connect to PostgreSQL + pause + exit /b 1 +) else ( + echo βœ… PostgreSQL is ready for connections +) + +echo. +echo πŸŽ‰ Setup complete! You can now start your application: +echo python run_server.py +echo. +echo πŸ“Š To view PostgreSQL logs: docker-compose logs postgres +echo πŸ›‘ To stop PostgreSQL: docker-compose down +echo πŸ”„ To restart PostgreSQL: docker-compose restart postgres +pause \ No newline at end of file diff --git a/start_postgres.sh b/start_postgres.sh new file mode 100644 index 0000000..69a2466 --- /dev/null +++ b/start_postgres.sh @@ -0,0 +1,72 @@ +#!/bin/bash + +# PostgreSQL Startup Script for Arrest Data Application + +echo "=== PostgreSQL Startup Script ===" +echo "" + +# Check if Docker is running +if ! docker info > /dev/null 2>&1; then + echo "❌ Docker is not running. Please start Docker and try again." + exit 1 +fi + +echo "βœ… Docker is running" + +# Check if docker-compose is available +if ! command -v docker-compose &> /dev/null; then + echo "❌ docker-compose is not installed. Please install Docker Compose." + exit 1 +fi + +echo "βœ… Docker Compose is available" + +# Start PostgreSQL container +echo "" +echo "πŸš€ Starting PostgreSQL container..." 
+docker-compose up -d postgres + +# Wait for PostgreSQL to be ready +echo "⏳ Waiting for PostgreSQL to be ready..." +sleep 10 + +# Check if PostgreSQL is running +if docker-compose ps postgres | grep -q "Up"; then + echo "βœ… PostgreSQL container is running" +else + echo "❌ PostgreSQL container failed to start" + echo "Check logs with: docker-compose logs postgres" + exit 1 +fi + +# Check if we can connect to PostgreSQL +echo "πŸ” Testing PostgreSQL connection..." +if python -c " +import psycopg2 +try: + conn = psycopg2.connect( + host='localhost', + port=5432, + database='arrest_data', + user='arrest_user', + password='arrest_password' + ) + conn.close() + print('βœ… PostgreSQL connection successful') +except Exception as e: + print(f'❌ PostgreSQL connection failed: {e}') + exit(1) +"; then + echo "βœ… PostgreSQL is ready for connections" +else + echo "❌ Cannot connect to PostgreSQL" + exit 1 +fi + +echo "" +echo "πŸŽ‰ Setup complete! You can now start your application:" +echo " python run_server.py" +echo "" +echo "πŸ“Š To view PostgreSQL logs: docker-compose logs postgres" +echo "πŸ›‘ To stop PostgreSQL: docker-compose down" +echo "πŸ”„ To restart PostgreSQL: docker-compose restart postgres" \ No newline at end of file diff --git a/test_postgres.py b/test_postgres.py new file mode 100644 index 0000000..d481600 --- /dev/null +++ b/test_postgres.py @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +""" +Test script to verify PostgreSQL database functionality +""" + +import psycopg2 +import json +import logging +from datetime import datetime +import uuid + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# Database configuration +POSTGRES_CONFIG = { + 'host': 'localhost', + 'port': 5432, + 'database': 'arrest_data', + 'user': 'arrest_user', + 'password': 'arrest_password' +} + +def test_connection(): + """Test basic database connection""" + try: + conn = 
psycopg2.connect(**POSTGRES_CONFIG) + cursor = conn.cursor() + + # Test basic query + cursor.execute("SELECT version();") + version = cursor.fetchone() + logger.info(f"βœ… Connected to PostgreSQL: {version[0]}") + + cursor.close() + conn.close() + return True + except Exception as e: + logger.error(f"❌ Connection test failed: {e}") + return False + +def test_schema(): + """Test if the arrest_data schema exists""" + try: + conn = psycopg2.connect(**POSTGRES_CONFIG) + cursor = conn.cursor() + + # Check if schema exists + cursor.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'arrest_data';") + schema = cursor.fetchone() + + if schema: + logger.info("βœ… arrest_data schema exists") + else: + logger.error("❌ arrest_data schema not found") + return False + + cursor.close() + conn.close() + return True + except Exception as e: + logger.error(f"❌ Schema test failed: {e}") + return False + +def test_tables(): + """Test if all required tables exist""" + try: + conn = psycopg2.connect(**POSTGRES_CONFIG) + cursor = conn.cursor() + + # Set search path + cursor.execute("SET search_path TO arrest_data, public;") + + # Check if tables exist + tables = ['clients', 'sessions', 'queries', 'messages'] + for table in tables: + cursor.execute(f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'arrest_data' AND table_name = '{table}');") + exists = cursor.fetchone()[0] + if exists: + logger.info(f"βœ… Table '{table}' exists") + else: + logger.error(f"❌ Table '{table}' not found") + return False + + cursor.close() + conn.close() + return True + except Exception as e: + logger.error(f"❌ Tables test failed: {e}") + return False + +def test_basic_operations(): + """Test basic CRUD operations""" + try: + conn = psycopg2.connect(**POSTGRES_CONFIG) + cursor = conn.cursor() + + # Set search path + cursor.execute("SET search_path TO arrest_data, public;") + + # Test client registration with unique data + unique_suffix = 
str(uuid.uuid4())[:8] + test_client = { + 'name': f'Test User {unique_suffix}', + 'nickname': f'testuser_{unique_suffix}', + 'email': f'test_{unique_suffix}@example.com', + 'password': 'testpassword' + } + + # Insert test client + cursor.execute(""" + INSERT INTO clients (name, nickname, email, password) + VALUES (%s, %s, %s, %s) + RETURNING id; + """, (test_client['name'], test_client['nickname'], test_client['email'], test_client['password'])) + + result = cursor.fetchone() + if result: + client_id = result[0] + logger.info(f"βœ… Test client created with ID: {client_id}") + else: + logger.error("❌ Failed to create test client") + return False + + # Test session creation + cursor.execute(""" + INSERT INTO sessions (client_id, address, start_time) + VALUES (%s, %s, %s) + RETURNING id; + """, (client_id, '127.0.0.1', datetime.now().isoformat())) + + session_id = cursor.fetchone()[0] + logger.info(f"βœ… Test session created with ID: {session_id}") + + # Test query logging + test_params = {'test': 'data', 'number': 42} + cursor.execute(""" + INSERT INTO queries (client_id, session_id, query_type, parameters) + VALUES (%s, %s, %s, %s) + RETURNING id; + """, (client_id, session_id, 'test_query', json.dumps(test_params))) + + query_id = cursor.fetchone()[0] + logger.info(f"βœ… Test query logged with ID: {query_id}") + + # Test message creation + cursor.execute(""" + INSERT INTO messages (sender_type, sender_id, recipient_type, recipient_id, message) + VALUES (%s, %s, %s, %s, %s) + RETURNING id; + """, ('client', client_id, 'all', 0, 'Test message')) + + message_id = cursor.fetchone()[0] + logger.info(f"βœ… Test message created with ID: {message_id}") + + # Test data retrieval + cursor.execute("SELECT COUNT(*) FROM clients;") + client_count = cursor.fetchone()[0] + logger.info(f"βœ… Total clients in database: {client_count}") + + cursor.execute("SELECT COUNT(*) FROM sessions;") + session_count = cursor.fetchone()[0] + logger.info(f"βœ… Total sessions in database: 
{session_count}") + + cursor.execute("SELECT COUNT(*) FROM queries;") + query_count = cursor.fetchone()[0] + logger.info(f"βœ… Total queries in database: {query_count}") + + cursor.execute("SELECT COUNT(*) FROM messages;") + message_count = cursor.fetchone()[0] + logger.info(f"βœ… Total messages in database: {message_count}") + + # Clean up test data + cursor.execute("DELETE FROM messages WHERE id = %s", (message_id,)) + cursor.execute("DELETE FROM queries WHERE id = %s", (query_id,)) + cursor.execute("DELETE FROM sessions WHERE id = %s", (session_id,)) + cursor.execute("DELETE FROM clients WHERE id = %s", (client_id,)) + + conn.commit() + cursor.close() + conn.close() + + return True + except Exception as e: + logger.error(f"❌ Basic operations test failed: {e}") + return False + +def main(): + """Run all tests""" + logger.info("πŸ§ͺ Starting PostgreSQL database tests...") + + tests = [ + ("Connection", test_connection), + ("Schema", test_schema), + ("Tables", test_tables), + ("Basic Operations", test_basic_operations) + ] + + passed = 0 + total = len(tests) + + for test_name, test_func in tests: + logger.info(f"\n--- Testing {test_name} ---") + if test_func(): + passed += 1 + else: + logger.error(f"❌ {test_name} test failed") + + logger.info(f"\nπŸ“Š Test Results: {passed}/{total} tests passed") + + if passed == total: + logger.info("πŸŽ‰ All tests passed! PostgreSQL database is working correctly.") + return True + else: + logger.error("❌ Some tests failed. Please check the PostgreSQL setup.") + return False + +if __name__ == "__main__": + success = main() + exit(0 if success else 1) \ No newline at end of file