From 94ec4b586b3f22af46fa6b780b3e80bb92fe9080 Mon Sep 17 00:00:00 2001 From: Warre Snaet Date: Wed, 25 Jun 2025 13:37:51 +0200 Subject: [PATCH] refactor: Transition from SQLite to PostgreSQL, enhancing database handling and performance. Update .gitignore to exclude PostgreSQL files and add environment and IDE configurations. Implement asynchronous processing for large query results in client GUI and server data processing, improving responsiveness. Add memory and timeout management for query processing. --- .gitignore | 29 +- app/client/client_gui.py | 62 ++- app/server/ClientHandler.py | 30 +- app/server/data_processor.py | 379 +++++++++++++----- .../{database.py => database_postgres.py} | 365 ++++++++--------- app/server/server.py | 52 ++- app/shared/protocol.py | 74 +++- docker-compose.yml | 24 ++ fix_postgres_sequences.py | 103 +++++ init.sql | 74 ++++ mermaid_diagrams.txt | 2 +- performance_monitor.py | 155 +++++++ requirements.txt | 5 +- start_postgres.bat | 65 +++ start_postgres.sh | 72 ++++ test_postgres.py | 221 ++++++++++ 16 files changed, 1360 insertions(+), 352 deletions(-) rename app/server/{database.py => database_postgres.py} (51%) create mode 100644 docker-compose.yml create mode 100644 fix_postgres_sequences.py create mode 100644 init.sql create mode 100644 performance_monitor.py create mode 100644 start_postgres.bat create mode 100644 start_postgres.sh create mode 100644 test_postgres.py diff --git a/.gitignore b/.gitignore index 8c3e24a..2e83a71 100644 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,6 @@ /Data/Arrest_Data_from_2020_to_Present.csv /Data/processed_arrest_data.csv -/app/server/server_data.db -/app/server/server_data.db-journal -/app/server/server_data.db-lock -/app/server/server_data.db-trace -/app/server/server_data.db-wal - /app/client/__pycache__/* /app/client/client_gui.pyc /app/client/client_gui.pyo @@ -21,10 +15,29 @@ /app/server/server_gui.pyw /app/server/server_gui.pyz - app/shared/__pycache__/protocol.cpython-312.pyc app/shared/__pycache__/constants.cpython-312.pyc app/shared/__pycache__/protocol.cpython-310.pyc app/shared/__pycache__/constants.cpython-310.pyc -app/server/server_data.db-shm + +# PostgreSQL and Docker +postgres_data/ + +# Migration logs +migration_*.log + +# Environment files +.env +.env.local + +# IDE files +.vscode/ +.idea/ +*.swp +*.swo + +# OS files +.DS_Store +Thumbs.db + .cursor/generalrule.mdc diff --git a/app/client/client_gui.py b/app/client/client_gui.py index 7955ec5..d0a7069 100644 --- a/app/client/client_gui.py +++ b/app/client/client_gui.py @@ -31,7 +31,7 @@ QDateEdit, QDoubleSpinBox, QStackedWidget, QListWidget, QListWidgetItem ) from PySide6.QtGui import QPixmap, QFont, QIcon, QPalette, QColor -from PySide6.QtCore import Qt, QTimer, Signal, Slot, QObject, QSettings, QDate +from PySide6.QtCore import Qt, QTimer, Signal, Slot, QObject, QSettings, QDate, QThread, QThreadPool, QRunnable # Import necessary for rendering Matplotlib figure in Qt from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg @@ -51,6 +51,29 @@ logger = logging.getLogger('client_gui') +class QueryResultProcessor(QRunnable): + """Worker thread for processing large query results""" + + def __init__(self, result_data, callback): + super().__init__() + self.result_data = result_data + self.callback = callback + self.setAutoDelete(True) + + def run(self): + """Process the query result in background thread""" + try: + processed_data = self.process_result(self.result_data) + self.callback(processed_data) # Call as function, not .emit + except Exception as e: + import traceback + self.callback({'error': f'{e}\n{traceback.format_exc()}'}) + + def process_result(self, result_data): + """Process the query result data""" + # Add any heavy processing here if needed + return result_data + # Bridge class to convert client callbacks to Qt signals class ClientCallbacksBridge(QObject): @@ -182,6 +205,10 @@ def __init__(self): self.callbacks_bridge = ClientCallbacksBridge() self.plot_dialog = None # To store reference to plot dialog self.tab_reset_handlers = {} # <--- ADD THIS LINE + + # Thread pool for async processing + self.thread_pool = QThreadPool() + self.thread_pool.setMaxThreadCount(4) # Limit concurrent threads # Load or default theme self.current_theme = self.settings.value("theme", "dark") # Default to dark @@ -478,7 +505,7 @@ def on_login_status_change(self, logged_in): self.update_query_params() # Fetch metadata AFTER successful login def on_query_result(self, result): - """Handle results received from the server""" + """Handle results received from the server with async processing for large results""" logger.info(f"Received query result: {type(result)}") # Check if the result is for metadata @@ -528,9 +555,19 @@ def on_query_result(self, result): # Check if the result contains data for the table (and no plot was handled) if 'data' in result: - # This block is now only reached if fig_plot was None/False - self.query_tab.display_results(result) - logger.info("Query data result received and sent to display.") + # Check if this is a large dataset that needs async processing + data_size = len(result.get('data', [])) + if data_size > 1000: # Process large datasets asynchronously + logger.info(f"Large dataset detected ({data_size} rows), processing asynchronously") + self.status_bar.showMessage("Processing large dataset...", 3000) + + # Create worker for async processing + worker = QueryResultProcessor(result, self.query_result_processed) + self.thread_pool.start(worker) + else: + # Small dataset, process immediately + self.query_tab.display_results(result) + logger.info("Query data result received and sent to display.") else: # Handle case where neither plot nor data is present logger.warning("Query result received with OK status but no data or plot.") @@ -1123,6 +1160,21 @@ def on_plot_label_clicked(self, label_clicked): # def on_plot_clicked(self, fig): # ... + @Slot(object) + def query_result_processed(self, processed_result): + """Called when async query result processing is complete""" + if 'error' in processed_result: + self.on_error(f"Error processing query result: {processed_result['error']}") + return + + try: + self.query_tab.display_results(processed_result) + self.status_bar.showMessage("Query results displayed", 3000) + logger.info("Large query result processed and displayed successfully") + except Exception as e: + logger.error(f"Error displaying processed query result: {e}", exc_info=True) + self.on_error(f"Error displaying query results: {e}") + if __name__ == "__main__": # Create application diff --git a/app/server/ClientHandler.py b/app/server/ClientHandler.py index 9d0edb0..fece4e0 100644 --- a/app/server/ClientHandler.py +++ b/app/server/ClientHandler.py @@ -2,7 +2,7 @@ import socket import queue import time -import sqlite3 +import psycopg2 import re import logging import os @@ -237,9 +237,9 @@ def handle_login(self, data): }) self.server.log_activity(f"Client logged in: {client_info['nickname']} ({client_info['email']})") - except sqlite3.OperationalError as sqlerr: + except psycopg2.OperationalError as sqlerr: error_msg = str(sqlerr) - logger.error(f"SQLite error during login: {error_msg}") + logger.error(f"PostgreSQL error during login: {error_msg}") if "no column named address" in error_msg: self.send_error("Login failed: Database schema needs to be updated. Please restart the server.") @@ -278,7 +278,7 @@ def handle_logout(self): self.send_error("Logout failed: not logged in") def handle_query(self, data): - """Handle a query request""" + """Handle a query request with async processing""" if not self.client_info or not self.session_id: self.send_error("Query failed: not logged in") return @@ -304,21 +304,17 @@ def handle_query(self, data): logger.info(f"Processing query {query_type_id} from {self.client_info['nickname']} with params: {parameters}") - # --- Process the query based on query_type_id --- + # --- Process the query asynchronously based on query_type_id --- result = {} - if query_type_id == 'query1': - result = self.data_processor.process_query1(parameters) - elif query_type_id == 'query2': - result = self.data_processor.process_query2(parameters) - elif query_type_id == 'query3': - result = self.data_processor.process_query3(parameters) # This one needs plot generation - elif query_type_id == 'query4': - result = self.data_processor.process_query4(parameters) + + # Use async processing for heavy queries + if query_type_id == 'query4': + # Query 4 (map generation) is the most resource-intensive + logger.info(f"Starting async processing for {query_type_id}") + result = self.data_processor.process_query_async(query_type_id, parameters) else: - # Handle unknown queryX type - logger.error(f"Unknown query type identifier received: {query_type_id}") - self.send_error(f"Query failed: Unknown query type identifier: {query_type_id}") - return + # For other queries, use regular processing but with timeout + result = self.data_processor.process_query_async(query_type_id, parameters) # --- Log the raw result from processor --- logger.info(f"HANDLE_QUERY: Result received from processor ({query_type_id}): {result.get('status')}, map_path_present={result.get('map_filepath') is not None}") diff --git a/app/server/data_processor.py b/app/server/data_processor.py index 85de364..7a5d4fc 100644 --- a/app/server/data_processor.py +++ b/app/server/data_processor.py @@ -13,6 +13,12 @@ import uuid import tempfile import json # for the geojson parsing +import threading +import time +import gc +import psutil +from concurrent.futures import ThreadPoolExecutor, TimeoutError +from functools import wraps from shared.constants import DESCENT_CODE_MAP, ARREST_TYPE_CODE_MAP from datetime import datetime @@ -21,6 +27,42 @@ logger = logging.getLogger('data_processor') +def timeout_handler(timeout_seconds=30): + """Decorator to add timeout to query processing""" + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(func, *args, **kwargs) + try: + return future.result(timeout=timeout_seconds) + except TimeoutError: + logger.error(f"Query {func.__name__} timed out after {timeout_seconds} seconds") + return {'status': 'error', 'message': f'Query timed out after {timeout_seconds} seconds'} + return wrapper + return decorator + +def memory_monitor(func): + """Decorator to monitor memory usage""" + @wraps(func) + def wrapper(*args, **kwargs): + process = psutil.Process() + mem_before = process.memory_info().rss / 1024 / 1024 # MB + logger.info(f"Memory before {func.__name__}: {mem_before:.1f} MB") + + try: + result = func(*args, **kwargs) + mem_after = process.memory_info().rss / 1024 / 1024 # MB + logger.info(f"Memory after {func.__name__}: {mem_after:.1f} MB (delta: {mem_after - mem_before:.1f} MB)") + return result + except Exception as e: + logger.error(f"Error in {func.__name__}: {e}") + raise + finally: + # Force garbage collection + gc.collect() + return wrapper + class DataProcessor: """Data processor for handling queries on the dataset""" @@ -28,14 +70,64 @@ def __init__(self, dataset_path='Data/processed_arrest_data.csv'): """Initialize data processor with dataset""" self.dataset_path = dataset_path self.df = None + self._df_lock = threading.Lock() # Thread safety for DataFrame access + self._query_executor = ThreadPoolExecutor(max_workers=4) # Limit concurrent queries + self._temp_files = set() # Track temporary files for cleanup + self._temp_files_lock = threading.Lock() # Thread safety for temp files self.load_data() def load_data(self): - """Load dataset""" + """Load dataset with optimizations""" try: - self.df = pd.read_csv(self.dataset_path, low_memory=False) + # Load only essential columns first + essential_cols = ['Arrest Date', 'Age', 'Arrest Hour', 'Arrest Type Code', + 'Area Name', 'Address', 'Charge Group Description', + 'Descent Code', 'Sex Code', 'Report ID'] + + # Check if coordinate columns exist + if os.path.exists(self.dataset_path): + sample_df = pd.read_csv(self.dataset_path, nrows=1) + if 'LAT' in sample_df.columns and 'LON' in sample_df.columns: + essential_cols.extend(['LAT', 'LON']) + if 'Location_GeoJSON' in sample_df.columns: + essential_cols.append('Location_GeoJSON') + + # Load data without strict dtype conversions first + self.df = pd.read_csv( + self.dataset_path, + low_memory=False, + usecols=essential_cols, + parse_dates=['Arrest Date'] + ) + print(f"Dataset loaded with {len(self.df)} rows and {len(self.df.columns)} columns") + # Now apply safe dtype optimizations + try: + # Only convert to int8 if the data is actually integer-like + if 'Age' in self.df.columns: + # Check if Age column can be safely converted to int + age_series = pd.to_numeric(self.df['Age'], errors='coerce') + if age_series.notna().all(): + self.df['Age'] = age_series.astype('int8') + + if 'Arrest Hour' in self.df.columns: + # Check if Arrest Hour column can be safely converted to int + hour_series = pd.to_numeric(self.df['Arrest Hour'], errors='coerce') + if hour_series.notna().all(): + self.df['Arrest Hour'] = hour_series.astype('int8') + + # Convert categorical columns safely + categorical_cols = ['Arrest Type Code', 'Area Name', 'Charge Group Description', + 'Descent Code', 'Sex Code'] + for col in categorical_cols: + if col in self.df.columns: + self.df[col] = self.df[col].astype('category') + + except Exception as dtype_error: + print(f"Warning: Could not apply all dtype optimizations: {dtype_error}") + # Continue with original dtypes if optimization fails + # Convert date columns to datetime if they exist for col in self.df.columns: if 'Date' in col and self.df[col].dtype == 'object': @@ -74,6 +166,48 @@ def parse_geojson_str(geojson_str): print(f"Error loading dataset: {e}") return False + def get_filtered_dataframe(self, filters=None): + """Get a filtered copy of the DataFrame with memory optimization""" + with self._df_lock: + if filters is None: + return self.df.copy() + + # Apply filters efficiently + mask = pd.Series([True] * len(self.df), index=self.df.index) + + for column, condition in filters.items(): + if column in self.df.columns: + if isinstance(condition, dict): + if 'min' in condition: + mask &= (self.df[column] >= condition['min']) + if 'max' in condition: + mask &= (self.df[column] <= condition['max']) + if 'values' in condition: + mask &= self.df[column].isin(condition['values']) + else: + mask &= (self.df[column] == condition) + + return self.df[mask].copy() + + def process_query_async(self, query_type, parameters=None): + """Process a query asynchronously with timeout""" + if self.df is None: + return {'status': 'error', 'message': 'Dataset not loaded'} + + if parameters is None: + parameters = {} + + try: + # Submit query to thread pool + future = self._query_executor.submit(self.process_query, query_type, parameters) + return future.result(timeout=60) # 60 second timeout + except TimeoutError: + logger.error(f"Query {query_type} timed out") + return {'status': 'error', 'message': 'Query timed out'} + except Exception as e: + logger.error(f"Error processing query {query_type}: {e}", exc_info=True) + return {'status': 'error', 'message': f'Error processing query: {str(e)}'} + def process_query(self, query_type, parameters=None): """ Process a query based on query type and parameters @@ -88,7 +222,17 @@ def process_query(self, query_type, parameters=None): parameters = {} try: - if query_type == 'age_distribution': + # Map 'query1', 'query2', etc. to the correct methods + if query_type == 'query1': + return self.process_query1(parameters) + elif query_type == 'query2': + return self.process_query2(parameters) + elif query_type == 'query3': + return self.process_query3(parameters) + elif query_type == 'query4': + return self.process_query4(parameters) + # Legacy/other query types + elif query_type == 'age_distribution': return self.get_age_distribution(parameters) elif query_type == 'top_charge_groups': return self.get_top_charge_groups(parameters) @@ -118,65 +262,74 @@ def process_query(self, query_type, parameters=None): 'traceback': traceback.format_exc() } + def _safe_to_str(self, df, columns): + """Convert specified columns to string if they are categorical (inplace).""" + for col in columns: + if col in df.columns and pd.api.types.is_categorical_dtype(df[col]): + df[col] = df[col].astype(str) + return df + + def _df_to_records_and_headers(self, df): + # Convert all columns to string and return records and headers + df = df.copy() + for col in df.columns: + df[col] = df[col].astype(str) + return df.to_dict(orient='records'), list(df.columns) + def get_age_distribution(self, parameters): - """Get age distribution of arrested individuals""" age_counts = self.df['Age'].value_counts().sort_index().reset_index() age_counts.columns = ['Age', 'Count'] - + data, headers = self._df_to_records_and_headers(age_counts) fig, ax = plt.subplots(figsize=(10, 6)) sns.histplot(self.df['Age'], bins=30, kde=True, ax=ax) ax.set_title('Age Distribution of Arrested Individuals') ax.set_xlabel('Age') ax.set_ylabel('Count') ax.grid(True) - return { - 'status': 'ok', - 'data': age_counts, + 'status': 'OK', + 'data': data, + 'headers': headers, 'figure': fig, 'title': 'Age Distribution of Arrested Individuals' } def get_top_charge_groups(self, parameters): - """Get top charge groups by frequency""" n = parameters.get('n', 10) n = int(n) - top_charges = self.df['Charge Group Description'].value_counts().head(n).reset_index() top_charges.columns = ['Charge Group', 'Count'] - + data, headers = self._df_to_records_and_headers(top_charges) fig, ax = plt.subplots(figsize=(12, 8)) sns.barplot(x='Count', y='Charge Group', data=top_charges, ax=ax) ax.set_title(f'Top {n} Charge Groups') ax.set_xlabel('Count') ax.set_ylabel('Charge Group') ax.grid(True, axis='x') - return { - 'status': 'ok', - 'data': top_charges, + 'status': 'OK', + 'data': data, + 'headers': headers, 'figure': fig, 'title': f'Top {n} Charge Groups' } def get_arrests_by_area(self, parameters): - """Get arrests by geographic area""" n = parameters.get('n', 15) n = int(n) - area_counts = self.df['Area Name'].value_counts().head(n).reset_index() area_counts.columns = ['Area', 'Count'] - + data, headers = self._df_to_records_and_headers(area_counts) fig, ax = plt.subplots(figsize=(12, 8)) sns.barplot(x='Count', y='Area', data=area_counts, ax=ax) ax.set_title(f'Top {n} Areas by Number of Arrests') ax.set_xlabel('Number of Arrests') ax.set_ylabel('Area') ax.grid(True, axis='x') - return { - 'status': 'ok', - 'data': area_counts, + 'status': 'OK', + 'data': data, + 'headers': headers, 'figure': fig, 'title': f'Top {n} Areas by Number of Arrests' } @@ -649,8 +802,7 @@ def process_query1(self, params): result_df['Arrest Date'] = result_df['Arrest Date'].dt.strftime('%Y-%m-%d') # Format date for display # Prepare results - data = result_df.to_dict(orient='records') - headers = output_cols + data, headers = self._df_to_records_and_headers(result_df) return {'status': 'OK', 'data': data, 'headers': headers, 'title': f'Arrests in {area}'} except KeyError as ke: @@ -704,8 +856,7 @@ def process_query2(self, params): trend_data['Date'] = trend_data['Date'].dt.strftime('%Y') # Prepare results - data = trend_data.to_dict(orient='records') - headers = ['Date', 'Arrest Count'] + data, headers = self._df_to_records_and_headers(trend_data) title = f'Trend for {charge_group} ({granularity.capitalize()})' return {'status': 'OK', 'data': data, 'headers': headers, 'title': title} @@ -717,114 +868,89 @@ def process_query2(self, params): logger.error(f"Error processing Query 2: {e}", exc_info=True) return {'status': 'error', 'message': f"Error processing query: {e}"} - def process_query3(self, params): - """ - Query 3: Demografische Analyse van Arrestaties (Graph) - params: {'sex_codes': list[str], 'descent_codes': list[str], 'charge_group': str | None, 'arrest_type_code': str | None, 'generate_plot': bool} - """ logger.info(f"Processing Query 3 with params: {params}") if self.df.empty: return {'status': 'error', 'message': 'Dataset not loaded'} - try: sex_codes = params['sex_codes'] descent_codes = params['descent_codes'] - charge_group = params.get('charge_group') # Optional - arrest_type_code = params.get('arrest_type_code') # <-- ADDED: Get arrest_type_code + charge_group = params.get('charge_group') + arrest_type_code = params.get('arrest_type_code') # Filter data filtered_df = self.df[ self.df['Sex Code'].isin(sex_codes) & self.df['Descent Code'].isin(descent_codes) ] - if charge_group: filtered_df = filtered_df[filtered_df['Charge Group Description'] == charge_group] - - # --- ADDED: Filter by arrest_type_code if provided --- if arrest_type_code and 'Arrest Type Code' in filtered_df.columns: filtered_df = filtered_df[filtered_df['Arrest Type Code'] == arrest_type_code] - # ----------------------------------------------------- + + filtered_df = filtered_df.copy() + filtered_df['Descent Code'] = filtered_df['Descent Code'].astype(str) + filtered_df['Sex Code'] = filtered_df['Sex Code'].astype(str) if filtered_df.empty: - descent_names = [DESCENT_CODE_MAP.get(dc, dc) for dc in descent_codes] - sex_names = ["Male" if sc == 'M' else "Female" if sc == 'F' else sc for sc in sex_codes] - title = f'Arrests by Descent ({", ".join(descent_names)}) and Sex ({", ".join(sex_names)}) - No Data' - if charge_group: - title += f' for {charge_group}' - # --- ADDED: Append arrest type to "No Data" title --- - if arrest_type_code: - arrest_type_desc = ARREST_TYPE_CODE_MAP.get(arrest_type_code, arrest_type_code) - title += f' (Type: {arrest_type_desc})' - # ------------------------------------------------- - return {'status': 'OK', 'data': [], 'headers': [], 'plot': None, 'title': title} - - # --- NEW Plotting Logic --- - - # 1. Calculate counts grouped by Descent and Sex + descent_names = [DESCENT_CODE_MAP.get(dc, dc) for dc in descent_codes] + sex_names = ["Male" if sc == 'M' else "Female" if sc == 'F' else sc for sc in sex_codes] + title = f'Arrests by Descent ({", ".join(descent_names)}) and Sex ({", ".join(sex_names)}) - No Data' + if charge_group: + title += f' for {charge_group}' + if arrest_type_code: + arrest_type_desc = ARREST_TYPE_CODE_MAP.get(arrest_type_code, arrest_type_code) + title += f' (Type: {arrest_type_desc})' + return {'status': 'OK', 'data': [], 'headers': [], 'plot': None, 'title': title} + summary_data = filtered_df.groupby(['Descent Code', 'Sex Code']).size().reset_index(name='Count') - - # 2. Map Descent Code to Description - summary_data['Descent'] = summary_data['Descent Code'].map(DESCENT_CODE_MAP) - # Handle any codes not in the map (though get_unique_descent_codes should have description) - summary_data['Descent'] = summary_data['Descent'].fillna(summary_data['Descent Code'].apply(lambda x: f"Unknown ({x})")) - - # 3. Create the plot using object-oriented approach + for col in summary_data.columns: + summary_data[col] = summary_data[col].astype(str) + summary_data['Descent'] = summary_data['Descent Code'].map(DESCENT_CODE_MAP).fillna( + summary_data['Descent Code'].apply(lambda x: f"Unknown ({x})")) + summary_data['Descent'] = summary_data['Descent'].astype(str) + + # Ensure all combinations are present + all_descents = sorted(set(summary_data['Descent'])) + all_sexes = sorted(set(summary_data['Sex Code'])) + import itertools + full_index = pd.DataFrame(list(itertools.product(all_descents, all_sexes)), columns=['Descent', 'Sex Code']) + summary_data = pd.merge(full_index, summary_data, on=['Descent', 'Sex Code'], how='left').fillna({'Count': 0}) + summary_data['Count'] = summary_data['Count'].astype(int) + fig, ax = plt.subplots(figsize=(12, 7)) - plot_title = 'Arrests by Descent' if len(sex_codes) == 1: - sex_name = "Male" if sex_codes[0] == 'M' else "Female" if sex_codes[0] == 'F' else sex_codes[0] - plot_title += f' ({sex_name})' - # Plot single bars using the created axes object `ax` - # Assign x to hue and hide legend to satisfy future seaborn requirements - sns.barplot(data=summary_data, x='Descent', y='Count', hue='Descent', palette='viridis', ax=ax, legend=False) - else: # Both sexes selected - plot_title += ' (Male vs Female)' - # Plot grouped bars using hue and the created axes object `ax` - sns.barplot(data=summary_data, x='Descent', y='Count', hue='Sex Code', palette='coolwarm', ax=ax) - ax.legend(title='Sex Code') - - # Add charge group to title if specified + sex_name = "Male" if sex_codes[0] == 'M' else "Female" if sex_codes[0] == 'F' else sex_codes[0] + plot_title += f' ({sex_name})' + sns.barplot(data=summary_data, x='Descent', y='Count', hue='Descent', palette='viridis', ax=ax, legend=False) + else: + plot_title += ' (Male vs Female)' + sns.barplot(data=summary_data, x='Descent', y='Count', hue='Sex Code', palette='coolwarm', ax=ax) + ax.legend(title='Sex Code') if charge_group: - plot_title += f'\nCharge Group: {charge_group}' - - # --- ADDED: Append arrest type to plot_title --- + plot_title += f'\nCharge Group: {charge_group}' if arrest_type_code: arrest_type_desc = ARREST_TYPE_CODE_MAP.get(arrest_type_code, arrest_type_code) plot_title += f'\nArrest Type: {arrest_type_desc}' - # ---------------------------------------------- - ax.set_title(plot_title) ax.set_xlabel('Descent') ax.set_ylabel('Number of Arrests') - # Use ax.tick_params for label rotation - ax.tick_params(axis='x', rotation=45, labelsize='medium') - # Set horizontal alignment manually if needed after rotation + ax.tick_params(axis='x', rotation=45, labelsize='medium') plt.setp(ax.get_xticklabels(), ha="right", rotation_mode="anchor") ax.grid(True, axis='y', linestyle='--', alpha=0.7) - fig.tight_layout() # Call tight_layout on the figure object - - # --- End Plotting Logic --- - - # # Get the figure object << No longer needed, we have `fig` - # fig = plt.gcf() - - # Prepare return data (return figure object, not bytes) - data_for_table = summary_data.to_dict(orient='records') - headers_for_table = ['Descent', 'Sex Code', 'Count'] + fig.tight_layout() + data_for_table, headers_for_table = self._df_to_records_and_headers(summary_data) return { 'status': 'OK', 'data': data_for_table, 'headers': headers_for_table, - 'plot': fig, # Return the figure object + 'plot': fig, 'title': plot_title - } - + } except KeyError as ke: - logger.error(f"Query 3 failed - Missing column: {ke}") - return {'status': 'error', 'message': f"Query failed: Server data missing expected column '{ke}'."} + logger.error(f"Query 3 failed - Missing column: {ke}") + return {'status': 'error', 'message': f"Query failed: Server data missing expected column '{ke}'."} except Exception as e: logger.error(f"Error processing Query 3: {e}", exc_info=True) return {'status': 'error', 'message': f"Error processing query: {e}"} @@ -842,6 +968,8 @@ def _haversine(self, lat1, lon1, lat2, lon2): r = 6371 # Radius of Earth in kilometers return c * r + @timeout_handler(120) # 2 minute timeout for map generation + @memory_monitor def process_query4(self, params): """ Query 4: Geografische Hotspots van Arrestaties @@ -861,7 +989,18 @@ def process_query4(self, params): end_date = pd.to_datetime(params['end_date']).replace(hour=23, minute=59, second=59) arrest_type_code = params.get('arrest_type_code') - df_working = self.df.copy() # Work with a copy + # Use filtered DataFrame to reduce memory usage + filters = { + 'Arrest Date': {'min': start_date, 'max': end_date} + } + if arrest_type_code and 'Arrest Type Code' in self.df.columns: + filters['Arrest Type Code'] = {'values': [arrest_type_code]} + + df_working = self.get_filtered_dataframe(filters) + + if df_working.empty: + final_title = f'Arrests within {radius_km}km of ({center_lat:.4f}, {center_lon:.4f}) (No Results)' + return {'status': 'OK', 'data': [], 'headers': [], 'map_filepath': None, 'title': final_title} has_geojson_col = 'Location_GeoJSON_Parsed' in df_working.columns @@ -883,25 +1022,25 @@ def process_query4(self, params): df_working['extracted_LAT'] = pd.to_numeric(df_working['extracted_LAT'], errors='coerce') df_working['extracted_LON'] = pd.to_numeric(df_working['extracted_LON'], errors='coerce') - # --- End Coordinate Extraction --- - + + # Remove rows with invalid coordinates + df_working = df_working.dropna(subset=['extracted_LAT', 'extracted_LON']) + + if df_working.empty: + final_title = f'Arrests within {radius_km}km of ({center_lat:.4f}, {center_lon:.4f}) (No Results)' + return {'status': 'OK', 'data': [], 'headers': [], 'map_filepath': None, 'title': final_title} - # (Keep the efficient bounding box + precise radius filter) + # --- Efficient bounding box + precise radius filter --- lat_degrees_delta = radius_km / 111.0 lon_degrees_delta = radius_km / (111.0 * np.cos(np.radians(center_lat))) min_lat, max_lat = center_lat - lat_degrees_delta, center_lat + lat_degrees_delta min_lon, max_lon = center_lon - lon_degrees_delta, center_lon + lon_degrees_delta df_filtered = df_working[ - (df_working['Arrest Date'] >= start_date) & - (df_working['Arrest Date'] <= end_date) & df_working['extracted_LAT'].notna() & df_working['extracted_LON'].notna() & (df_working['extracted_LAT'] >= min_lat) & (df_working['extracted_LAT'] <= max_lat) & (df_working['extracted_LON'] >= min_lon) & (df_working['extracted_LON'] <= max_lon) ] - - if arrest_type_code and 'Arrest Type Code' in df_filtered.columns: - df_filtered = df_filtered[df_filtered['Arrest Type Code'] == arrest_type_code] map_filepath_abs = None # Initialize if not df_filtered.empty: @@ -949,6 +1088,9 @@ def process_query4(self, params): map_filepath_abs = os.path.abspath(os.path.join(system_temp_dir, map_basename)) m.save(map_filepath_abs) + # Track the temporary file for cleanup + self.add_temp_file(map_filepath_abs) + logger.info(f"Query 4: Folium map with {len(df_to_plot)} markers saved to {map_filepath_abs}.") except Exception as map_err: @@ -975,8 +1117,7 @@ def process_query4(self, params): result_df = df_filtered[output_cols].sort_values(by='distance_km') if 'Arrest Date' in result_df.columns: result_df['Arrest Date'] = result_df['Arrest Date'].dt.strftime('%Y-%m-%d') if 'distance_km' in result_df.columns: result_df['distance_km'] = result_df['distance_km'].round(2) - data = result_df.to_dict(orient='records') - headers = output_cols + data, headers = self._df_to_records_and_headers(result_df) logger.info(f"PROCESSOR_QUERY4: Returning map_filepath: {map_filepath_abs}") @@ -987,4 +1128,30 @@ def process_query4(self, params): return {'status': 'error', 'message': f"Query failed: Missing expected parameter '{ke}' or data column."} except Exception as e: logger.error(f"Error processing Query 4: {e}", exc_info=True) - return {'status': 'error', 'message': f"Error processing query: {e}"} \ No newline at end of file + return {'status': 'error', 'message': f"Error processing query: {e}"} + + def cleanup_temp_files(self): + """Clean up temporary files created by this processor""" + with self._temp_files_lock: + for temp_file in self._temp_files.copy(): + try: + if os.path.exists(temp_file): + os.remove(temp_file) + logger.info(f"Cleaned up temporary file: {temp_file}") + except Exception as e: + logger.warning(f"Failed to clean up temporary file {temp_file}: {e}") + self._temp_files.clear() + + def add_temp_file(self, filepath): + """Add a temporary file to the cleanup list""" + with self._temp_files_lock: + self._temp_files.add(filepath) + + def __del__(self): + """Cleanup when the processor is destroyed""" + try: + self.cleanup_temp_files() + if hasattr(self, '_query_executor'): + self._query_executor.shutdown(wait=False) + except Exception as e: + logger.error(f"Error during DataProcessor cleanup: {e}") \ No newline at end of file diff --git a/app/server/database.py b/app/server/database_postgres.py similarity index 51% rename from app/server/database.py rename to app/server/database_postgres.py index 6f02c84..89e6602 100644 --- a/app/server/database.py +++ b/app/server/database_postgres.py @@ -1,8 +1,7 @@ #!/usr/bin/env python3 -# Database module for the server +# PostgreSQL Database module for the server import os -import sqlite3 import datetime import pandas as pd import threading @@ -10,27 +9,33 @@ import logging import contextlib import json +import psycopg2 +from psycopg2 import pool +from psycopg2.extras import RealDictCursor +from sqlalchemy import create_engine, text +from sqlalchemy.pool import QueuePool logger = logging.getLogger(__name__) -class Database: - """Thread-safe database using a connection pool for storing client info and query history""" + +class DatabasePostgres: + """Thread-safe PostgreSQL database using connection pooling for storing client info and query history""" - def __init__(self, db_path='app/server/server_data.db', pool_size=10): - """Initialize database pool and create tables if they don't exist""" - self.db_path = db_path + def __init__(self, host='localhost', port=5432, database='arrest_data', + user='arrest_user', password='arrest_password', pool_size=10): + """Initialize PostgreSQL database pool and create tables if they don't exist""" + self.host = host + self.port = port + self.database = database + self.user = user + self.password = password self.pool_size = pool_size - self._pool = queue.Queue(maxsize=pool_size) - self._closed = False # Flag to indicate pool shutdown - + self._closed = False + # Lock for initializing/closing the pool safely self._init_lock = threading.Lock() - - os.makedirs(os.path.dirname(db_path), exist_ok=True) + # Initialize connection pool self._initialize_pool() - - # Perform database migration if needed (using a pooled connection) - self.migrate_database() # Ensure tables exist (using a pooled connection) try: @@ -45,186 +50,165 @@ def __init__(self, db_path='app/server/server_data.db', pool_size=10): self.return_connection(conn) def _initialize_pool(self): - """Populate the connection pool.""" + """Initialize PostgreSQL connection pool.""" with self._init_lock: if self._closed: - raise RuntimeError("Database pool is closed.") - logger.info(f"Initializing database connection pool (size {self.pool_size})...") - for _ in range(self.pool_size): - try: - # Connections in pool must allow sharing across threads - conn = sqlite3.connect(self.db_path, check_same_thread=False) - conn.execute("PRAGMA foreign_keys = ON") - conn.row_factory = sqlite3.Row - self._pool.put(conn) - except sqlite3.Error as e: - logger.error(f"Failed to create connection for pool: {e}", exc_info=True) - # Handle error appropriately - maybe raise, maybe try fewer connections? - raise RuntimeError(f"Failed to initialize database pool: {e}") from e - logger.info(f"Database connection pool initialized with {self._pool.qsize()} connections.") - - def migrate_database(self): - """Perform necessary database migrations""" - # Needs careful handling as pool might not be fully ready - conn = None - try: - # Temporarily get a raw connection for migration check before pool might be used - conn = sqlite3.connect(self.db_path) - cursor = conn.cursor() + raise RuntimeError("Database pool is closed.") - # Check if sessions table exists and has the old schema - cursor.execute("PRAGMA table_info(sessions)") - columns = cursor.fetchall() - - column_names = [col[1] for col in columns] if columns else [] - - # If sessions table exists but doesn't have 'address' column - if columns and 'address' not in column_names: - # Drop the old sessions table as we're changing the structure - # (another option would be to ALTER TABLE, but this is simpler for a new system) - cursor.execute("DROP TABLE IF EXISTS sessions") - conn.commit() - print("Migrated database: dropped old sessions table") + logger.info(f"Initializing PostgreSQL connection pool (size {self.pool_size})...") - conn.close() - except Exception as e: - logger.error(f"Error during database migration check: {e}") - if conn: - try: conn.close() # Ensure temporary connection is closed on error - except: pass - # Note: Actual table creation uses the pool via create_tables called from __init__ + try: + # Create connection pool + self._pool = psycopg2.pool.ThreadedConnectionPool( + minconn=1, + maxconn=self.pool_size, + host=self.host, + port=self.port, + database=self.database, + user=self.user, + password=self.password, + cursor_factory=RealDictCursor + ) + + logger.info(f"PostgreSQL connection pool initialized with {self.pool_size} connections.") + + except psycopg2.Error as e: + logger.error(f"Failed to create PostgreSQL connection pool: {e}", exc_info=True) + raise RuntimeError(f"Failed to initialize PostgreSQL connection pool: {e}") from e def get_connection(self): - """Get a connection from the pool. - Blocks until a connection is available, with a timeout. - """ + """Get a connection from the pool.""" if self._closed: raise RuntimeError("Database pool is closed.") + try: - # Wait up to 10 seconds for a connection - conn = self._pool.get(block=True, timeout=10) - logger.debug(f"Acquired DB connection {id(conn)} from pool (pool size: {self._pool.qsize()})") + conn = self._pool.getconn() + logger.debug(f"Acquired PostgreSQL connection {id(conn)} from pool") return conn - except queue.Empty: - logger.error("Timeout waiting for database connection from pool.") - # Depending on policy, could raise error or return None - raise TimeoutError("Timeout waiting for database connection from pool.") + except Exception as e: + logger.error(f"Error getting connection from pool: {e}", exc_info=True) + raise TimeoutError("Failed to get database connection from pool.") def return_connection(self, conn): """Return a connection to the pool.""" if conn is None: return + if self._closed: - # If pool is closed, just close the connection we got try: conn.close() except Exception as e: - logger.warning(f"Error closing connection {id(conn)} after pool shutdown: {e}", exc_info=True) + logger.warning(f"Error closing connection {id(conn)} after pool shutdown: {e}", exc_info=True) return + try: - # Use block=False with check to avoid waiting if pool is somehow full - # (shouldn't happen with correct usage but safer) - if not self._pool.full(): - self._pool.put(conn, block=False) - logger.debug(f"Returned DB connection {id(conn)} to pool (pool size: {self._pool.qsize()})") - else: - logger.warning(f"Attempted to return connection {id(conn)} to a full pool. Closing instead.") - try: - conn.close() - except Exception as e: - logger.error(f"Error closing connection {id(conn)} that couldn't be returned to full pool: {e}", exc_info=True) + self._pool.putconn(conn) + logger.debug(f"Returned PostgreSQL connection {id(conn)} to pool") except Exception as e: - logger.error(f"Error returning connection {id(conn)} to pool: {e}. Closing connection.", exc_info=True) - # Ensure connection is closed if putting back failed - try: - conn.close() - except Exception as close_e: - logger.error(f"Error closing connection {id(conn)} after failing to return to pool: {close_e}", exc_info=True) + logger.error(f"Error returning connection {id(conn)} to pool: {e}. Closing connection.", exc_info=True) + try: + conn.close() + except Exception as close_e: + logger.error(f"Error closing connection {id(conn)} after failing to return to pool: {close_e}", exc_info=True) def close_all_connections(self): """Close all connections in the pool and shut down the pool.""" with self._init_lock: if self._closed: - return # Already closed - logger.info(f"Closing all database connections in the pool ({self._pool.qsize()} connections estimated)...") + return # Already closed + + logger.info("Closing all PostgreSQL database connections in the pool...") self._closed = True - closed_count = 0 - while not self._pool.empty(): - try: - conn = self._pool.get_nowait() - conn.close() - closed_count += 1 - except queue.Empty: - break # Pool is empty - except Exception as e: - logger.error(f"Error closing connection during pool shutdown: {e}", exc_info=True) - logger.info(f"Database connection pool shutdown complete. Closed {closed_count} connections.") + + try: + self._pool.closeall() + logger.info("PostgreSQL connection pool shutdown complete.") + except Exception as e: + logger.error(f"Error during PostgreSQL pool shutdown: {e}", exc_info=True) @contextlib.contextmanager def get_connection_context(self): - """Context manager for getting and returning a connection.""" - conn = None - try: - conn = self.get_connection() - yield conn - finally: - if conn: - self.return_connection(conn) + """Context manager for getting and returning a connection.""" + conn = None + try: + conn = self.get_connection() + yield conn + finally: + if conn: + self.return_connection(conn) def create_tables(self, conn): """Create tables if they don't exist using a provided connection.""" - # This method now expects a connection to be passed in - # It's called from __init__ which handles getting/returning the connection cursor = conn.cursor() + # Set search path to arrest_data schema + cursor.execute("SET search_path TO arrest_data, public;") + + # Create clients table cursor.execute(''' CREATE TABLE IF NOT EXISTS clients ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, - nickname TEXT NOT NULL UNIQUE, - email TEXT NOT NULL UNIQUE, - password TEXT NOT NULL, - registration_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP + id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + nickname VARCHAR(100) NOT NULL UNIQUE, + email VARCHAR(255) NOT NULL UNIQUE, + password VARCHAR(255) NOT NULL, + registration_date TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP ) ''') + # Create sessions table cursor.execute(''' CREATE TABLE IF NOT EXISTS sessions ( - id INTEGER PRIMARY KEY AUTOINCREMENT, + id SERIAL PRIMARY KEY, client_id INTEGER NOT NULL, - address TEXT NOT NULL, - start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - end_time TIMESTAMP, - FOREIGN KEY (client_id) REFERENCES clients (id) + address VARCHAR(45) NOT NULL, + start_time TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + end_time TIMESTAMP WITH TIME ZONE, + FOREIGN KEY (client_id) REFERENCES clients (id) ON DELETE CASCADE ) ''') + # Create queries table cursor.execute(''' CREATE TABLE IF NOT EXISTS queries ( - id INTEGER PRIMARY KEY AUTOINCREMENT, + id SERIAL PRIMARY KEY, client_id INTEGER NOT NULL, session_id INTEGER NOT NULL, - query_type TEXT NOT NULL, + query_type VARCHAR(100) NOT NULL, parameters TEXT, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (client_id) REFERENCES clients (id), - FOREIGN KEY (session_id) REFERENCES sessions (id) + timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (client_id) REFERENCES clients (id) ON DELETE CASCADE, + FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE ) ''') + # Create messages table cursor.execute(''' CREATE TABLE IF NOT EXISTS messages ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - sender_type TEXT NOT NULL, + id SERIAL PRIMARY KEY, + sender_type VARCHAR(50) NOT NULL, sender_id INTEGER NOT NULL, - recipient_type TEXT NOT NULL, + recipient_type VARCHAR(50) NOT NULL, recipient_id INTEGER NOT NULL, message TEXT NOT NULL, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - read INTEGER DEFAULT 0 + timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + read BOOLEAN DEFAULT FALSE ) ''') + # Create indexes for better performance + cursor.execute("CREATE INDEX IF NOT EXISTS idx_clients_email ON clients (email)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_clients_nickname ON clients (nickname)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_sessions_client_id ON sessions (client_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_sessions_start_time ON sessions (start_time)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_queries_client_id ON queries (client_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_queries_session_id ON queries (session_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_queries_timestamp ON queries (timestamp)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_queries_query_type ON queries (query_type)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_messages_recipient ON messages (recipient_type, recipient_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_messages_read ON messages (read)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages (timestamp)") + conn.commit() def register_client(self, name, nickname, email, password): @@ -232,62 +216,62 @@ def register_client(self, name, nickname, email, password): with self.get_connection_context() as conn: cursor = conn.cursor() try: - cursor.execute("SELECT * FROM clients WHERE nickname = ? OR email = ?", (nickname, email)) + cursor.execute("SELECT * FROM clients WHERE nickname = %s OR email = %s", (nickname, email)) if cursor.fetchone(): return False + cursor.execute( - "INSERT INTO clients (name, nickname, email, password) VALUES (?, ?, ?, ?)", + "INSERT INTO clients (name, nickname, email, password) VALUES (%s, %s, %s, %s)", (name, nickname, email, password) ) conn.commit() return True - except sqlite3.Error as e: - logger.error(f"DB Error registering client {nickname}: {e}", exc_info=True) + except psycopg2.Error as e: + logger.error(f"PostgreSQL Error registering client {nickname}: {e}", exc_info=True) conn.rollback() - raise # Re-raise the database error + raise def check_login(self, email, password): """Check login credentials and return client info if valid""" with self.get_connection_context() as conn: cursor = conn.cursor() - cursor.execute("SELECT * FROM clients WHERE email = ? AND password = ?", (email, password)) + cursor.execute("SELECT * FROM clients WHERE email = %s AND password = %s", (email, password)) row = cursor.fetchone() if row: - # Convert row to dict before connection is returned return dict(row) else: return None def start_session(self, client_id, address): """Start a new session for a client and return its ID and start time.""" - start_time = datetime.datetime.now(datetime.timezone.utc).isoformat() # Record start time + start_time = datetime.datetime.now(datetime.timezone.utc).isoformat() session_id = None + with self.get_connection_context() as conn: cursor = conn.cursor() try: cursor.execute( - "INSERT INTO sessions (client_id, address, start_time) VALUES (?, ?, ?)", + "INSERT INTO sessions (client_id, address, start_time) VALUES (%s, %s, %s) RETURNING id", (client_id, address, start_time) ) - session_id = cursor.lastrowid + session_id = cursor.fetchone()['id'] conn.commit() - except sqlite3.Error as e: - logger.error(f"DB Error starting session for client {client_id}: {e}", exc_info=True) - conn.rollback() - raise # Re-raise + except psycopg2.Error as e: + logger.error(f"PostgreSQL Error starting session for client {client_id}: {e}", exc_info=True) + conn.rollback() + raise if session_id: - return {'id': session_id, 'start_time': start_time} + return {'id': session_id, 'start_time': start_time} else: - # Handle case where insert somehow failed to return an ID - raise Exception(f"Failed to obtain session ID for client {client_id} after insert.") + raise Exception(f"Failed to obtain session ID for client {client_id} after insert.") def end_session(self, session_id): """End a session""" with self.get_connection_context() as conn: cursor = conn.cursor() cursor.execute( - "UPDATE sessions SET end_time = CURRENT_TIMESTAMP WHERE id = ?", + "UPDATE sessions SET end_time = CURRENT_TIMESTAMP WHERE id = %s", (session_id,) ) conn.commit() @@ -298,45 +282,45 @@ def log_query(self, client_id, session_id, query_type, parameters=None): with self.get_connection_context() as conn: cursor = conn.cursor() cursor.execute( - "INSERT INTO queries (client_id, session_id, query_type, parameters) VALUES (?, ?, ?, ?)", + "INSERT INTO queries (client_id, session_id, query_type, parameters) VALUES (%s, %s, %s, %s) RETURNING id", (client_id, session_id, query_type, parameters_str) ) + query_id = cursor.fetchone()['id'] conn.commit() - return cursor.lastrowid + return query_id def add_message(self, sender_type, sender_id, recipient_type, recipient_id, message): """Add a message to the database""" with self.get_connection_context() as conn: cursor = conn.cursor() cursor.execute( - "INSERT INTO messages (sender_type, sender_id, recipient_type, recipient_id, message) VALUES (?, ?, ?, ?, ?)", + "INSERT INTO messages (sender_type, sender_id, recipient_type, recipient_id, message) VALUES (%s, %s, %s, %s, %s) RETURNING id", (sender_type, sender_id, recipient_type, recipient_id, message) ) + message_id = cursor.fetchone()['id'] conn.commit() - return cursor.lastrowid + return message_id def get_client_by_id(self, client_id): """Get client information by ID""" with self.get_connection_context() as conn: cursor = conn.cursor() - cursor.execute("SELECT * FROM clients WHERE id = ?", (client_id,)) + cursor.execute("SELECT * FROM clients WHERE id = %s", (client_id,)) row = cursor.fetchone() return dict(row) if row else None def get_all_clients(self): """Get all registered clients including their last login time.""" try: - # Use the context manager to ensure connection is returned with self.get_connection_context() as conn: cursor = conn.cursor() - # Query to get all clients with their registration date, last login, and total queries query = """ SELECT c.id, c.nickname, - c.registration_date as registration_date, -- Renamed for clarity - (SELECT MAX(s.start_time) FROM sessions s WHERE s.client_id = c.id) as last_seen, -- Get latest login time + c.registration_date, + (SELECT MAX(s.start_time) FROM sessions s WHERE s.client_id = c.id) as last_seen, (SELECT COUNT(*) FROM queries q WHERE q.client_id = c.id) as total_queries FROM clients c ORDER BY c.registration_date DESC; @@ -344,15 +328,11 @@ def get_all_clients(self): cursor.execute(query) clients = cursor.fetchall() - - # Convert rows to dicts before connection context closes return [dict(row) for row in clients] - except sqlite3.Error as sql_err: - # Catch specific database errors - logger.error(f"Database error fetching all clients: {sql_err}", exc_info=True) - return [] + except psycopg2.Error as sql_err: + logger.error(f"PostgreSQL error fetching all clients: {sql_err}", exc_info=True) + return [] except Exception as e: - # Catch any other unexpected errors (like TimeoutError from get_connection_context) logger.error(f"Error fetching all clients: {e}", exc_info=True) return [] @@ -361,7 +341,7 @@ def get_client_queries(self, client_id): with self.get_connection_context() as conn: cursor = conn.cursor() cursor.execute( - "SELECT * FROM queries WHERE client_id = ? ORDER BY timestamp DESC", + "SELECT * FROM queries WHERE client_id = %s ORDER BY timestamp DESC", (client_id,) ) return [dict(row) for row in cursor.fetchall()] @@ -379,7 +359,7 @@ def get_client_by_nickname(self, nickname): """Get client by nickname""" with self.get_connection_context() as conn: cursor = conn.cursor() - cursor.execute("SELECT * FROM clients WHERE nickname = ?", (nickname,)) + cursor.execute("SELECT * FROM clients WHERE nickname = %s", (nickname,)) row = cursor.fetchone() return dict(row) if row else None @@ -402,9 +382,9 @@ def get_messages_for_client(self, client_id): cursor.execute(""" SELECT id, sender_type, sender_id, message, timestamp FROM messages - WHERE ((recipient_type = 'client' AND recipient_id = ?) + WHERE ((recipient_type = 'client' AND recipient_id = %s) OR recipient_type = 'all') - AND read = 0 + AND read = FALSE ORDER BY timestamp DESC """, (client_id,)) return [dict(row) for row in cursor.fetchall()] @@ -413,7 +393,7 @@ def mark_message_as_read(self, message_id): """Mark a message as read""" with self.get_connection_context() as conn: cursor = conn.cursor() - cursor.execute("UPDATE messages SET read = 1 WHERE id = ?", (message_id,)) + cursor.execute("UPDATE messages SET read = TRUE WHERE id = %s", (message_id,)) conn.commit() def get_client_details_by_id(self, client_id): @@ -424,62 +404,55 @@ def get_client_details_by_id(self, client_id): cursor = conn.cursor() # 1. Get basic client info - cursor.execute("SELECT id, name, nickname, email, registration_date FROM clients WHERE id = ?", (client_id,)) + cursor.execute("SELECT id, name, nickname, email, registration_date FROM clients WHERE id = %s", (client_id,)) client_row = cursor.fetchone() if not client_row: logger.warning(f"No client found with ID {client_id} for details.") - return None # Return None if client doesn't exist - details = dict(zip([col[0] for col in cursor.description], client_row)) + return None + details = dict(client_row) - cursor.execute("SELECT MAX(start_time) FROM sessions WHERE client_id = ?", (client_id,)) + cursor.execute("SELECT MAX(start_time) FROM sessions WHERE client_id = %s", (client_id,)) last_login_result = cursor.fetchone() - details['last_login'] = last_login_result[0] if last_login_result else None + details['last_login'] = last_login_result['max'] if last_login_result and last_login_result['max'] else None # 2. Get query statistics cursor.execute(""" SELECT query_type, COUNT(*) as count FROM queries - WHERE client_id = ? + WHERE client_id = %s GROUP BY query_type ORDER BY count DESC """, (client_id,)) query_stats = cursor.fetchall() - details['query_stats'] = [ - dict(zip([col[0] for col in cursor.description], row)) - for row in query_stats - ] + details['query_stats'] = [dict(row) for row in query_stats] # 3. Get recent session history (e.g., last 10 sessions) cursor.execute(""" SELECT id, start_time, end_time, address, - (strftime('%s', end_time) - strftime('%s', start_time)) as duration_seconds + EXTRACT(EPOCH FROM (end_time - start_time)) as duration_seconds FROM sessions - WHERE client_id = ? + WHERE client_id = %s ORDER BY start_time DESC LIMIT 10 """, (client_id,)) session_history = cursor.fetchall() - details['session_history'] = [ - dict(zip([col[0] for col in cursor.description], row)) - for row in session_history - ] + details['session_history'] = [dict(row) for row in session_history] logger.info(f"Successfully fetched details for client ID {client_id}") return details - except sqlite3.Error as sql_err: - logger.error(f"Database error fetching details for client ID {client_id}: {sql_err}", exc_info=True) - # Return partial details if basic info was fetched? - return details # Return whatever was fetched, might be just basic info or empty + except psycopg2.Error as sql_err: + logger.error(f"PostgreSQL error fetching details for client ID {client_id}: {sql_err}", exc_info=True) + return details except Exception as e: logger.error(f"Unexpected error fetching details for client ID {client_id}: {e}", exc_info=True) - return details # Return whatever was fetched + return details def get_daily_query_counts(self): """Get daily counts for each query type.""" query = """ SELECT - date(timestamp) as query_date, + DATE(timestamp) as query_date, query_type, COUNT(*) as count FROM queries @@ -491,9 +464,9 @@ def get_daily_query_counts(self): cursor = conn.cursor() cursor.execute(query) results = cursor.fetchall() - return [dict(zip([col[0] for col in cursor.description], row)) for row in results] - except sqlite3.Error as sql_err: - logger.error(f"Database error fetching daily query counts: {sql_err}", exc_info=True) + return [dict(row) for row in results] + except psycopg2.Error as sql_err: + logger.error(f"PostgreSQL error fetching daily query counts: {sql_err}", exc_info=True) return [] except Exception as e: logger.error(f"Unexpected error fetching daily query counts: {e}", exc_info=True) diff --git a/app/server/server.py b/app/server/server.py index 2ece006..b4ccfd2 100644 --- a/app/server/server.py +++ b/app/server/server.py @@ -19,7 +19,7 @@ from shared.protocol import Message # Import server modules -from .database import Database +from .database_postgres import DatabasePostgres from .data_processor import DataProcessor # --- Configure logging to file --- @@ -47,7 +47,7 @@ def __init__(self, host=SERVER_HOST, port=SERVER_PORT): self.running = False self.clients = [] self.clients_lock = threading.Lock() # Lock for clients list - self.db = Database() + self.db = DatabasePostgres() self.data_processor = DataProcessor() # Activity log for the server (for the GUI) @@ -58,6 +58,10 @@ def __init__(self, host=SERVER_HOST, port=SERVER_PORT): self.on_activity_log = None self.on_client_list_update = None self.on_all_clients_update = None # Callback for new registrations + + # Cleanup timer + self.cleanup_timer = None + self.last_cleanup_time = time.time() def start(self): """Start the server""" @@ -66,8 +70,8 @@ def start(self): if getattr(self.db, '_closed', False): try: logger.info("Reinitializing database pool after previous stop()") - # Re-create the Database instance (uses same db_path & pool_size) - self.db = Database(self.db.db_path, self.db.pool_size) + # Re-create the Database instance + self.db = DatabasePostgres() except Exception as e: logger.error(f"Failed to reinitialize database pool: {e}", exc_info=True) self.log_activity(f"Error reinitializing database: {e}") @@ -89,6 +93,11 @@ def start(self): accept_thread.daemon = True accept_thread.start() + # Start cleanup thread + cleanup_thread = threading.Thread(target=self.periodic_cleanup) + cleanup_thread.daemon = True + cleanup_thread.start() + return True except Exception as e: logger.error(f"Error starting server: {e}") @@ -380,6 +389,41 @@ def notify_query_processed(self, client_id): if self.on_all_clients_update: self.on_all_clients_update() # Reuse existing callback + def periodic_cleanup(self): + """Periodic cleanup of resources""" + while self.running: + try: + time.sleep(300) # Run every 5 minutes + if not self.running: + break + + current_time = time.time() + if current_time - self.last_cleanup_time >= 300: # 5 minutes + logger.info("Running periodic cleanup...") + + # Clean up old temporary files + if hasattr(self.data_processor, 'cleanup_temp_files'): + self.data_processor.cleanup_temp_files() + + # Force garbage collection + import gc + gc.collect() + + # Log memory usage + try: + import psutil + process = psutil.Process() + memory_mb = process.memory_info().rss / 1024 / 1024 + logger.info(f"Server memory usage: {memory_mb:.1f} MB") + except ImportError: + pass # psutil not available + + self.last_cleanup_time = current_time + + except Exception as e: + logger.error(f"Error in periodic cleanup: {e}") + time.sleep(60) # Wait a minute before retrying + if __name__ == "__main__": # Start the server directly if run as a script diff --git a/app/shared/protocol.py b/app/shared/protocol.py index 68b60aa..3fdde8a 100644 --- a/app/shared/protocol.py +++ b/app/shared/protocol.py @@ -8,6 +8,8 @@ import base64 import logging import os +import gzip +import io # Get the logger for this module logger = logging.getLogger(__name__) # Use module-level logger @@ -21,9 +23,41 @@ def __init__(self, msg_type, data=None): self.data = data if data is not None else {} +def compress_data(data): + """Compress data using gzip if it's large enough""" + try: + # Serialize data to bytes + serialized = pickle.dumps(data, protocol=4) + + # Only compress if data is larger than 1MB + if len(serialized) > 1024 * 1024: + compressed = gzip.compress(serialized) + # Only use compression if it actually reduces size + if len(compressed) < len(serialized): + return compressed, True + + return serialized, False + except Exception as e: + logger.error(f"Error compressing data: {e}") + return pickle.dumps(data, protocol=4), False + + +def decompress_data(data, is_compressed): + """Decompress data if it was compressed""" + try: + if is_compressed: + decompressed = gzip.decompress(data) + return pickle.loads(decompressed) + else: + return pickle.loads(data) + except Exception as e: + logger.error(f"Error decompressing data: {e}") + raise + + def send_message(sock, message): """ - Send a message object through a socket using Pickle + Send a message object through a socket using Pickle with optional compression Parameters: - sock: socket object @@ -33,13 +67,17 @@ def send_message(sock, message): - True if message sent successfully, False otherwise """ try: - # Convert message object to bytes using pickle - msg_bytes = pickle.dumps(message, protocol=4) # Use a high protocol + # Compress data if needed + msg_bytes, is_compressed = compress_data(message) # Send message length first (4 bytes, network byte order) msg_len = len(msg_bytes) sock.sendall(struct.pack('!I', msg_len)) + # Send compression flag (1 byte) + compression_flag = 1 if is_compressed else 0 + sock.sendall(struct.pack('!B', compression_flag)) + # Send the message itself sock.sendall(msg_bytes) @@ -57,7 +95,7 @@ def send_message(sock, message): def receive_message(sock): """ - Receive a message object from a socket using Pickle + Receive a message object from a socket using Pickle with optional compression Parameters: - sock: socket object @@ -94,7 +132,7 @@ def receive_message(sock): msg_len = struct.unpack('!I', msg_len_bytes)[0] # --- Sanity check on message size --- - MAX_MSG_SIZE = 20 * 1024 * 1024 # Increased limit to 20MB, adjust if needed + MAX_MSG_SIZE = 50 * 1024 * 1024 # Increased limit to 50MB for compressed data if msg_len > MAX_MSG_SIZE: logger.error(f"PROTOCOL.RECEIVE: Message size {msg_len} bytes exceeds limit {MAX_MSG_SIZE} (socket fileno {fileno}). Closing socket.") try: @@ -103,16 +141,24 @@ def receive_message(sock): logger.warning(f"Ignoring error while closing socket after size limit exceeded: {close_err}") raise ValueError(f"Received message size ({msg_len}) exceeds limit.") + # --- Receive compression flag (1 byte) --- + compression_flag_bytes = sock.recv(1) + if not compression_flag_bytes: + logger.warning(f"PROTOCOL.RECEIVE: Connection closed unexpectedly while receiving compression flag (socket fileno {fileno}).") + raise ConnectionError("Connection closed during compression flag reception") + + is_compressed = struct.unpack('!B', compression_flag_bytes)[0] == 1 + # --- Receive the message body --- data = b'' bytes_received = 0 # Use a longer timeout for the body, proportionate to max size? - body_timeout = max(30.0, MAX_MSG_SIZE / (1024*1024) * 2) # e.g., 2s per MB, min 30s + body_timeout = max(60.0, MAX_MSG_SIZE / (1024*1024) * 5) # e.g., 5s per MB, min 60s sock.settimeout(body_timeout) - logger.debug(f"PROTOCOL.RECEIVE: Expecting {msg_len} bytes for message body (timeout: {body_timeout}s)...") + logger.debug(f"PROTOCOL.RECEIVE: Expecting {msg_len} bytes for message body (compressed: {is_compressed}, timeout: {body_timeout}s)...") try: while bytes_received < msg_len: - chunk_size = min(msg_len - bytes_received, 8192) # Read in larger chunks + chunk_size = min(msg_len - bytes_received, 32768) # Read in larger chunks (32KB) packet = sock.recv(chunk_size) if not packet: logger.warning(f"PROTOCOL.RECEIVE: Connection closed unexpectedly while receiving message body (received {bytes_received}/{msg_len} bytes, socket fileno {fileno}).") @@ -124,15 +170,15 @@ def receive_message(sock): logger.debug(f"PROTOCOL.RECEIVE: Received {bytes_received} bytes for message body.") - # Deserialize bytes using pickle - message = pickle.loads(data) + # Decompress and deserialize data + message_data = decompress_data(data, is_compressed) - if not isinstance(message, Message): - logger.error(f"PROTOCOL.RECEIVE: Deserialized object is not a Message type ({type(message)}). Socket {fileno}") + if not isinstance(message_data, Message): + logger.error(f"PROTOCOL.RECEIVE: Deserialized object is not a Message type ({type(message_data)}). Socket {fileno}") raise TypeError("Received invalid object type from socket") - logger.debug(f"PROTOCOL.RECEIVE: Successfully received message type {message.msg_type} (socket fileno {fileno}).") - return message + logger.debug(f"PROTOCOL.RECEIVE: Successfully received message type {message_data.msg_type} (socket fileno {fileno}).") + return message_data except socket.timeout: logger.warning(f"PROTOCOL.RECEIVE: Socket timeout during receive (socket fileno {fileno}).") diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..2da9283 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,24 @@ +services: + postgres: + image: postgres:15-alpine + container_name: arrest_data_postgres + environment: + POSTGRES_DB: arrest_data + POSTGRES_USER: arrest_user + POSTGRES_PASSWORD: arrest_password + POSTGRES_INITDB_ARGS: "--encoding=UTF-8 --lc-collate=C --lc-ctype=C" + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + - ./init.sql:/docker-entrypoint-initdb.d/init.sql + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", "pg_isready -U arrest_user -d arrest_data"] + interval: 10s + timeout: 5s + retries: 5 + +volumes: + postgres_data: + driver: local \ No newline at end of file diff --git a/fix_postgres_sequences.py b/fix_postgres_sequences.py new file mode 100644 index 0000000..8bf18dc --- /dev/null +++ b/fix_postgres_sequences.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +""" +Fix PostgreSQL sequences +This script updates the sequences to start from the highest existing ID + 1 +""" + +import psycopg2 +import logging + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# Database configuration +POSTGRES_CONFIG = { + 'host': 'localhost', + 'port': 5432, + 'database': 'arrest_data', + 'user': 'arrest_user', + 'password': 'arrest_password' +} + +def fix_sequences(): + """Fix all sequences in the arrest_data schema""" + try: + conn = psycopg2.connect(**POSTGRES_CONFIG) + cursor = conn.cursor() + + # Set search path + cursor.execute("SET search_path TO arrest_data, public;") + + # Fix clients sequence + cursor.execute("SELECT MAX(id) FROM clients;") + max_client_id = cursor.fetchone()[0] + if max_client_id: + cursor.execute(f"SELECT setval('clients_id_seq', {max_client_id});") + logger.info(f"βœ… Fixed clients sequence to start from {max_client_id + 1}") + + # Fix sessions sequence + cursor.execute("SELECT MAX(id) FROM sessions;") + max_session_id = cursor.fetchone()[0] + if max_session_id: + cursor.execute(f"SELECT setval('sessions_id_seq', {max_session_id});") + logger.info(f"βœ… Fixed sessions sequence to start from {max_session_id + 1}") + + # Fix queries sequence + cursor.execute("SELECT MAX(id) FROM queries;") + max_query_id = cursor.fetchone()[0] + if max_query_id: + cursor.execute(f"SELECT setval('queries_id_seq', {max_query_id});") + logger.info(f"βœ… Fixed queries sequence to start from {max_query_id + 1}") + + # Fix messages sequence + cursor.execute("SELECT MAX(id) FROM messages;") + max_message_id = cursor.fetchone()[0] + if max_message_id: + cursor.execute(f"SELECT setval('messages_id_seq', {max_message_id});") + logger.info(f"βœ… Fixed messages sequence to start from {max_message_id + 1}") + + conn.commit() + cursor.close() + conn.close() + + logger.info("πŸŽ‰ All sequences have been fixed!") + return True + + except Exception as e: + logger.error(f"❌ Failed to fix sequences: {e}") + return False + +def show_current_data(): + """Show current data counts""" + try: + conn = psycopg2.connect(**POSTGRES_CONFIG) + cursor = conn.cursor() + + # Set search path + cursor.execute("SET search_path TO arrest_data, public;") + + tables = ['clients', 'sessions', 'queries', 'messages'] + for table in tables: + cursor.execute(f"SELECT COUNT(*) FROM {table};") + count = cursor.fetchone()[0] + logger.info(f"πŸ“Š {table}: {count} records") + + cursor.close() + conn.close() + + except Exception as e: + logger.error(f"❌ Failed to show data: {e}") + +if __name__ == "__main__": + logger.info("πŸ”§ Fixing PostgreSQL sequences after migration...") + + show_current_data() + print() + + if fix_sequences(): + logger.info("βœ… Sequences fixed successfully!") + logger.info("You can now run the test script: python test_postgres.py") + else: + logger.error("❌ Failed to fix sequences") + exit(1) \ No newline at end of file diff --git a/init.sql b/init.sql new file mode 100644 index 0000000..1171bc9 --- /dev/null +++ b/init.sql @@ -0,0 +1,74 @@ +-- PostgreSQL initialization script for Arrest Data application +-- This script runs when the PostgreSQL container starts for the first time + +-- Create the database schema +CREATE SCHEMA IF NOT EXISTS arrest_data; + +-- Set the search path +SET search_path TO arrest_data, public; + +-- Create clients table +CREATE TABLE IF NOT EXISTS clients ( + id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + nickname VARCHAR(100) NOT NULL UNIQUE, + email VARCHAR(255) NOT NULL UNIQUE, + password VARCHAR(255) NOT NULL, + registration_date TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); + +-- Create sessions table +CREATE TABLE IF NOT EXISTS sessions ( + id SERIAL PRIMARY KEY, + client_id INTEGER NOT NULL, + address VARCHAR(45) NOT NULL, -- IPv4/IPv6 address + start_time TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + end_time TIMESTAMP WITH TIME ZONE, + FOREIGN KEY (client_id) REFERENCES clients (id) ON DELETE CASCADE +); + +-- Create queries table +CREATE TABLE IF NOT EXISTS queries ( + id SERIAL PRIMARY KEY, + client_id INTEGER NOT NULL, + session_id INTEGER NOT NULL, + query_type VARCHAR(100) NOT NULL, + parameters TEXT, -- JSON string + timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (client_id) REFERENCES clients (id) ON DELETE CASCADE, + FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE +); + +-- Create messages table +CREATE TABLE IF NOT EXISTS messages ( + id SERIAL PRIMARY KEY, + sender_type VARCHAR(50) NOT NULL, + sender_id INTEGER NOT NULL, + recipient_type VARCHAR(50) NOT NULL, + recipient_id INTEGER NOT NULL, + message TEXT NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + read BOOLEAN DEFAULT FALSE +); + +-- Create indexes for better performance +CREATE INDEX IF NOT EXISTS idx_clients_email ON clients (email); +CREATE INDEX IF NOT EXISTS idx_clients_nickname ON clients (nickname); +CREATE INDEX IF NOT EXISTS idx_sessions_client_id ON sessions (client_id); +CREATE INDEX IF NOT EXISTS idx_sessions_start_time ON sessions (start_time); +CREATE INDEX IF NOT EXISTS idx_queries_client_id ON queries (client_id); +CREATE INDEX IF NOT EXISTS idx_queries_session_id ON queries (session_id); +CREATE INDEX IF NOT EXISTS idx_queries_timestamp ON queries (timestamp); +CREATE INDEX IF NOT EXISTS idx_queries_query_type ON queries (query_type); +CREATE INDEX IF NOT EXISTS idx_messages_recipient ON messages (recipient_type, recipient_id); +CREATE INDEX IF NOT EXISTS idx_messages_read ON messages (read); +CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages (timestamp); + +-- Grant permissions to the application user +GRANT ALL PRIVILEGES ON SCHEMA arrest_data TO arrest_user; +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA arrest_data TO arrest_user; +GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA arrest_data TO arrest_user; + +-- Set default privileges for future tables +ALTER DEFAULT PRIVILEGES IN SCHEMA arrest_data GRANT ALL ON TABLES TO arrest_user; +ALTER DEFAULT PRIVILEGES IN SCHEMA arrest_data GRANT ALL ON SEQUENCES TO arrest_user; \ No newline at end of file diff --git a/mermaid_diagrams.txt b/mermaid_diagrams.txt index f85770c..1bd0b45 100644 --- a/mermaid_diagrams.txt +++ b/mermaid_diagrams.txt @@ -84,7 +84,7 @@ graph TD Server_MainThread -- CreΓ«ert per client --> Server_ClientHandlerThread Server_ClientHandlerThread -- Leest van --> ClientSocket["Client Socket"] Server_ClientHandlerThread -- Schrijft naar --> ClientSocket - Server_ClientHandlerThread -- Database operaties --> DatabasePool["SQLite Connection Pool
(in Database.py)"] + Server_ClientHandlerThread -- Database operaties --> DatabasePool["PostgreSQL Connection Pool
(in DatabasePostgres.py)"] Server_ClientHandlerThread -- Gebruikt --> DataProcessor["DataProcessor Instantie"] %% Server-side message queue for sending to client diff --git a/performance_monitor.py b/performance_monitor.py new file mode 100644 index 0000000..13778f1 --- /dev/null +++ b/performance_monitor.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +""" +Performance monitoring script for the arrest data server +""" + +import psutil +import time +import threading +import logging +import os +import tempfile +from datetime import datetime + +# Configure logging +TEMP_DIR = tempfile.gettempdir() +PERF_LOG_FILE = os.path.join(TEMP_DIR, 'performance_monitor.log') +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filename=PERF_LOG_FILE, + filemode='w' +) +logger = logging.getLogger('performance_monitor') + +class PerformanceMonitor: + """Monitor system performance and resource usage""" + + def __init__(self, interval=30): + """Initialize the performance monitor""" + self.interval = interval # seconds between measurements + self.running = False + self.monitor_thread = None + self.start_time = None + + def start(self): + """Start performance monitoring""" + if self.running: + logger.warning("Performance monitor is already running") + return + + self.running = True + self.start_time = time.time() + self.monitor_thread = threading.Thread(target=self._monitor_loop) + self.monitor_thread.daemon = True + self.monitor_thread.start() + logger.info(f"Performance monitor started (interval: {self.interval}s)") + + def stop(self): + """Stop performance monitoring""" + self.running = False + if self.monitor_thread: + self.monitor_thread.join(timeout=5) + logger.info("Performance monitor stopped") + + def _monitor_loop(self): + """Main monitoring loop""" + while self.running: + try: + self._collect_metrics() + time.sleep(self.interval) + except Exception as e: + logger.error(f"Error in performance monitoring: {e}") + time.sleep(10) # Wait before retrying + + def _collect_metrics(self): + """Collect system performance metrics""" + try: + # CPU usage + cpu_percent = psutil.cpu_percent(interval=1) + + # Memory usage + memory = psutil.virtual_memory() + memory_percent = memory.percent + memory_mb = memory.used / 1024 / 1024 + + # Disk usage + disk = psutil.disk_usage('/') + disk_percent = disk.percent + + # Network I/O + network = psutil.net_io_counters() + bytes_sent = network.bytes_sent + bytes_recv = network.bytes_recv + + # Process-specific metrics (if monitoring a specific process) + process_metrics = self._get_process_metrics() + + # Log metrics + logger.info( + f"PERF: CPU={cpu_percent:.1f}% | " + f"MEM={memory_percent:.1f}% ({memory_mb:.0f}MB) | " + f"DISK={disk_percent:.1f}% | " + f"NET_SENT={bytes_sent/1024/1024:.1f}MB | " + f"NET_RECV={bytes_recv/1024/1024:.1f}MB" + ) + + if process_metrics: + logger.info(f"PROCESS: {process_metrics}") + + except Exception as e: + logger.error(f"Error collecting metrics: {e}") + + def _get_process_metrics(self): + """Get metrics for specific processes (server/client)""" + try: + processes = [] + for proc in psutil.process_iter(['pid', 'name', 'memory_info', 'cpu_percent']): + try: + if 'python' in proc.info['name'].lower(): + # Check if it's our server or client process + cmdline = ' '.join(proc.cmdline()) + if 'server' in cmdline.lower() or 'client' in cmdline.lower(): + memory_mb = proc.info['memory_info'].rss / 1024 / 1024 + cpu_percent = proc.info['cpu_percent'] + processes.append({ + 'pid': proc.info['pid'], + 'name': proc.info['name'], + 'memory_mb': memory_mb, + 'cpu_percent': cpu_percent, + 'cmdline': cmdline[:100] + '...' if len(cmdline) > 100 else cmdline + }) + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + + if processes: + return ' | '.join([ + f"{p['name']}({p['pid']}): {p['memory_mb']:.0f}MB, {p['cpu_percent']:.1f}%" + for p in processes + ]) + except Exception as e: + logger.error(f"Error getting process metrics: {e}") + + return None + +def main(): + """Main function to run the performance monitor""" + print("Starting performance monitor...") + print(f"Log file: {PERF_LOG_FILE}") + + monitor = PerformanceMonitor(interval=30) # Check every 30 seconds + + try: + monitor.start() + + # Keep running until interrupted + while True: + time.sleep(1) + + except KeyboardInterrupt: + print("\nStopping performance monitor...") + monitor.stop() + print("Performance monitor stopped") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 0def651..1efea04 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,7 @@ seaborn>=0.12.0 pandas>=1.5.0 matplotlib>=3.6.0 contextily>=1.3.0 -folium \ No newline at end of file +folium +psycopg2-binary>=2.9.0 +sqlalchemy>=2.0.0 +psutil>=5.9.0 \ No newline at end of file diff --git a/start_postgres.bat b/start_postgres.bat new file mode 100644 index 0000000..f88c043 --- /dev/null +++ b/start_postgres.bat @@ -0,0 +1,65 @@ +@echo off +REM PostgreSQL Startup Script for Arrest Data Application + +echo === PostgreSQL Startup Script === +echo. + +REM Check if Docker is running +docker info >nul 2>&1 +if errorlevel 1 ( + echo ❌ Docker is not running. Please start Docker and try again. + pause + exit /b 1 +) + +echo βœ… Docker is running + +REM Check if docker-compose is available +docker-compose --version >nul 2>&1 +if errorlevel 1 ( + echo ❌ docker-compose is not installed. Please install Docker Compose. + pause + exit /b 1 +) + +echo βœ… Docker Compose is available + +REM Start PostgreSQL container +echo. +echo πŸš€ Starting PostgreSQL container... +docker-compose up -d postgres + +REM Wait for PostgreSQL to be ready +echo ⏳ Waiting for PostgreSQL to be ready... +timeout /t 10 /nobreak >nul + +REM Check if PostgreSQL is running +docker-compose ps postgres | findstr "Up" >nul +if errorlevel 1 ( + echo ❌ PostgreSQL container failed to start + echo Check logs with: docker-compose logs postgres + pause + exit /b 1 +) else ( + echo βœ… PostgreSQL container is running +) + +REM Check if we can connect to PostgreSQL +echo πŸ” Testing PostgreSQL connection... +python -c "import psycopg2; conn = psycopg2.connect(host='localhost', port=5432, database='arrest_data', user='arrest_user', password='arrest_password'); conn.close(); print('βœ… PostgreSQL connection successful')" 2>nul +if errorlevel 1 ( + echo ❌ Cannot connect to PostgreSQL + pause + exit /b 1 +) else ( + echo βœ… PostgreSQL is ready for connections +) + +echo. +echo πŸŽ‰ Setup complete! You can now start your application: +echo python run_server.py +echo. +echo πŸ“Š To view PostgreSQL logs: docker-compose logs postgres +echo πŸ›‘ To stop PostgreSQL: docker-compose down +echo πŸ”„ To restart PostgreSQL: docker-compose restart postgres +pause \ No newline at end of file diff --git a/start_postgres.sh b/start_postgres.sh new file mode 100644 index 0000000..69a2466 --- /dev/null +++ b/start_postgres.sh @@ -0,0 +1,72 @@ +#!/bin/bash + +# PostgreSQL Startup Script for Arrest Data Application + +echo "=== PostgreSQL Startup Script ===" +echo "" + +# Check if Docker is running +if ! docker info > /dev/null 2>&1; then + echo "❌ Docker is not running. Please start Docker and try again." + exit 1 +fi + +echo "βœ… Docker is running" + +# Check if docker-compose is available +if ! command -v docker-compose &> /dev/null; then + echo "❌ docker-compose is not installed. Please install Docker Compose." + exit 1 +fi + +echo "βœ… Docker Compose is available" + +# Start PostgreSQL container +echo "" +echo "πŸš€ Starting PostgreSQL container..." +docker-compose up -d postgres + +# Wait for PostgreSQL to be ready +echo "⏳ Waiting for PostgreSQL to be ready..." +sleep 10 + +# Check if PostgreSQL is running +if docker-compose ps postgres | grep -q "Up"; then + echo "βœ… PostgreSQL container is running" +else + echo "❌ PostgreSQL container failed to start" + echo "Check logs with: docker-compose logs postgres" + exit 1 +fi + +# Check if we can connect to PostgreSQL +echo "πŸ” Testing PostgreSQL connection..." +if python -c " +import psycopg2 +try: + conn = psycopg2.connect( + host='localhost', + port=5432, + database='arrest_data', + user='arrest_user', + password='arrest_password' + ) + conn.close() + print('βœ… PostgreSQL connection successful') +except Exception as e: + print(f'❌ PostgreSQL connection failed: {e}') + exit(1) +"; then + echo "βœ… PostgreSQL is ready for connections" +else + echo "❌ Cannot connect to PostgreSQL" + exit 1 +fi + +echo "" +echo "πŸŽ‰ Setup complete! You can now start your application:" +echo " python run_server.py" +echo "" +echo "πŸ“Š To view PostgreSQL logs: docker-compose logs postgres" +echo "πŸ›‘ To stop PostgreSQL: docker-compose down" +echo "πŸ”„ To restart PostgreSQL: docker-compose restart postgres" \ No newline at end of file diff --git a/test_postgres.py b/test_postgres.py new file mode 100644 index 0000000..d481600 --- /dev/null +++ b/test_postgres.py @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +""" +Test script to verify PostgreSQL database functionality +""" + +import psycopg2 +import json +import logging +from datetime import datetime +import uuid + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# Database configuration +POSTGRES_CONFIG = { + 'host': 'localhost', + 'port': 5432, + 'database': 'arrest_data', + 'user': 'arrest_user', + 'password': 'arrest_password' +} + +def test_connection(): + """Test basic database connection""" + try: + conn = psycopg2.connect(**POSTGRES_CONFIG) + cursor = conn.cursor() + + # Test basic query + cursor.execute("SELECT version();") + version = cursor.fetchone() + logger.info(f"βœ… Connected to PostgreSQL: {version[0]}") + + cursor.close() + conn.close() + return True + except Exception as e: + logger.error(f"❌ Connection test failed: {e}") + return False + +def test_schema(): + """Test if the arrest_data schema exists""" + try: + conn = psycopg2.connect(**POSTGRES_CONFIG) + cursor = conn.cursor() + + # Check if schema exists + cursor.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'arrest_data';") + schema = cursor.fetchone() + + if schema: + logger.info("βœ… arrest_data schema exists") + else: + logger.error("❌ arrest_data schema not found") + return False + + cursor.close() + conn.close() + return True + except Exception as e: + logger.error(f"❌ Schema test failed: {e}") + return False + +def test_tables(): + """Test if all required tables exist""" + try: + conn = psycopg2.connect(**POSTGRES_CONFIG) + cursor = conn.cursor() + + # Set search path + cursor.execute("SET search_path TO arrest_data, public;") + + # Check if tables exist + tables = ['clients', 'sessions', 'queries', 'messages'] + for table in tables: + cursor.execute(f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'arrest_data' AND table_name = '{table}');") + exists = cursor.fetchone()[0] + if exists: + logger.info(f"βœ… Table '{table}' exists") + else: + logger.error(f"❌ Table '{table}' not found") + return False + + cursor.close() + conn.close() + return True + except Exception as e: + logger.error(f"❌ Tables test failed: {e}") + return False + +def test_basic_operations(): + """Test basic CRUD operations""" + try: + conn = psycopg2.connect(**POSTGRES_CONFIG) + cursor = conn.cursor() + + # Set search path + cursor.execute("SET search_path TO arrest_data, public;") + + # Test client registration with unique data + unique_suffix = str(uuid.uuid4())[:8] + test_client = { + 'name': f'Test User {unique_suffix}', + 'nickname': f'testuser_{unique_suffix}', + 'email': f'test_{unique_suffix}@example.com', + 'password': 'testpassword' + } + + # Insert test client + cursor.execute(""" + INSERT INTO clients (name, nickname, email, password) + VALUES (%s, %s, %s, %s) + RETURNING id; + """, (test_client['name'], test_client['nickname'], test_client['email'], test_client['password'])) + + result = cursor.fetchone() + if result: + client_id = result[0] + logger.info(f"βœ… Test client created with ID: {client_id}") + else: + logger.error("❌ Failed to create test client") + return False + + # Test session creation + cursor.execute(""" + INSERT INTO sessions (client_id, address, start_time) + VALUES (%s, %s, %s) + RETURNING id; + """, (client_id, '127.0.0.1', datetime.now().isoformat())) + + session_id = cursor.fetchone()[0] + logger.info(f"βœ… Test session created with ID: {session_id}") + + # Test query logging + test_params = {'test': 'data', 'number': 42} + cursor.execute(""" + INSERT INTO queries (client_id, session_id, query_type, parameters) + VALUES (%s, %s, %s, %s) + RETURNING id; + """, (client_id, session_id, 'test_query', json.dumps(test_params))) + + query_id = cursor.fetchone()[0] + logger.info(f"βœ… Test query logged with ID: {query_id}") + + # Test message creation + cursor.execute(""" + INSERT INTO messages (sender_type, sender_id, recipient_type, recipient_id, message) + VALUES (%s, %s, %s, %s, %s) + RETURNING id; + """, ('client', client_id, 'all', 0, 'Test message')) + + message_id = cursor.fetchone()[0] + logger.info(f"βœ… Test message created with ID: {message_id}") + + # Test data retrieval + cursor.execute("SELECT COUNT(*) FROM clients;") + client_count = cursor.fetchone()[0] + logger.info(f"βœ… Total clients in database: {client_count}") + + cursor.execute("SELECT COUNT(*) FROM sessions;") + session_count = cursor.fetchone()[0] + logger.info(f"βœ… Total sessions in database: {session_count}") + + cursor.execute("SELECT COUNT(*) FROM queries;") + query_count = cursor.fetchone()[0] + logger.info(f"βœ… Total queries in database: {query_count}") + + cursor.execute("SELECT COUNT(*) FROM messages;") + message_count = cursor.fetchone()[0] + logger.info(f"βœ… Total messages in database: {message_count}") + + # Clean up test data + cursor.execute("DELETE FROM messages WHERE id = %s", (message_id,)) + cursor.execute("DELETE FROM queries WHERE id = %s", (query_id,)) + cursor.execute("DELETE FROM sessions WHERE id = %s", (session_id,)) + cursor.execute("DELETE FROM clients WHERE id = %s", (client_id,)) + + conn.commit() + cursor.close() + conn.close() + + return True + except Exception as e: + logger.error(f"❌ Basic operations test failed: {e}") + return False + +def main(): + """Run all tests""" + logger.info("πŸ§ͺ Starting PostgreSQL database tests...") + + tests = [ + ("Connection", test_connection), + ("Schema", test_schema), + ("Tables", test_tables), + ("Basic Operations", test_basic_operations) + ] + + passed = 0 + total = len(tests) + + for test_name, test_func in tests: + logger.info(f"\n--- Testing {test_name} ---") + if test_func(): + passed += 1 + else: + logger.error(f"❌ {test_name} test failed") + + logger.info(f"\nπŸ“Š Test Results: {passed}/{total} tests passed") + + if passed == total: + logger.info("πŸŽ‰ All tests passed! PostgreSQL database is working correctly.") + return True + else: + logger.error("❌ Some tests failed. Please check the PostgreSQL setup.") + return False + +if __name__ == "__main__": + success = main() + exit(0 if success else 1) \ No newline at end of file