-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathdb_sqlite.py
More file actions
1012 lines (829 loc) · 34.7 KB
/
db_sqlite.py
File metadata and controls
1012 lines (829 loc) · 34.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#! /usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author: Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
# https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.
# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.rst
"""Library for accessing SQLite databases.
This is one typical use case of this library (taken from `disk-io`):
>>> conn = lib.base.coe(lib.db_sqlite.connect(filename='disk-io.db'))
>>> lib.base.coe(lib.db_sqlite.create_table(conn, definition, drop_table_first=False))
>>> lib.base.coe(lib.db_sqlite.create_index(conn, 'name')) # optional
>>> lib.base.coe(lib.db_sqlite.insert(conn, data))
>>> lib.base.coe(lib.db_sqlite.cut(conn, _max=args.COUNT*len(disks)))
>>> lib.base.coe(lib.db_sqlite.commit(conn))
>>> result = lib.base.coe(lib.db_sqlite.select(conn,
'SELECT * FROM perfdata WHERE name = :name ORDER BY timestamp DESC LIMIT 2',
{'name': disk}))
>>> lib.db_sqlite.close(conn)
"""
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2025042102'
import csv
import hashlib
import os
import re
import sqlite3
from . import disk
from . import txt
def __filter_str(s, charclass='a-zA-Z0-9_'):
    """
    Strip every character from a string that is not in the allowed character class.

    The result is meant to be safe for use as an identifier such as a table or
    index name.

    ### Parameters
    - **s** (`str`):
      The string to clean up.
    - **charclass** (`str`, optional):
      Body of a regex character class listing the characters to keep.
      Defaults to `'a-zA-Z0-9_'` (letters, digits and underscore).

    ### Returns
    - **str**:
      `s` with all characters outside `charclass` removed.

    ### Example
    >>> __filter_str('user@example.ch')
    'userexamplech'
    >>> __filter_str('project-123', charclass='a-zA-Z0-9')
    'project123'
    """
    # Negate the class: everything that is NOT allowed gets replaced by nothing.
    disallowed = re.compile('[^' + charclass + ']')
    return disallowed.sub('', s)
def __sha1sum(string):
    """
    Return the SHA-1 checksum of a string as hexadecimal text.

    ### Parameters
    - **string** (`str`):
      The value to hash. It is passed through `txt.to_bytes()` before hashing.

    ### Returns
    - **str**:
      The hexadecimal SHA-1 digest (40 characters).

    ### Example
    >>> __sha1sum('linuxfabrik')
    '74301e766db4a4006ec1fbd6e031760e7e322223'
    """
    payload = txt.to_bytes(string)
    digest = hashlib.sha1(payload)
    return digest.hexdigest()
def close(conn):
    """
    Close a database connection without raising.

    Nothing is committed here: changes that were not saved with `commit()`
    before calling this function are lost.

    ### Parameters
    - **conn** (`sqlite3.Connection` or compatible):
      The connection to close.

    ### Returns
    - **bool**:
      `True` if `conn.close()` returned normally, `False` if it raised any
      exception (the exception itself is swallowed).

    ### Example
    >>> close(conn)
    True
    """
    try:
        conn.close()
    except Exception:
        # Best effort: report the failure through the return value only.
        return False
    return True
def commit(conn):
    """
    Persist all pending changes of the current connection.

    ### Parameters
    - **conn** (`sqlite3.Connection` or compatible):
      The connection whose pending changes should be committed.

    ### Returns
    - **tuple** (`bool`, `str or None`):
      - `(True, None)` if `conn.commit()` returned normally.
      - `(False, 'Commit failed: <error>')` if it raised any exception.

    ### Example
    >>> success, error = commit(conn)
    >>> if not success:
    >>>     print(error)
    """
    try:
        conn.commit()
    except Exception as e:
        return False, f'Commit failed: {e}'
    return True, None
def compute_load(conn, sensorcol, datacols, count, table='perfdata'):
    """
    Calculate per-second rates from timestamped rows stored per sensor.

    For every distinct value found in `sensorcol`, the rows of that sensor are
    read newest first (ordered by the `timestamp` column) and two rates are
    computed for each requested data column:

    - `<column>1`: difference between the newest and the second newest row,
      divided by the difference of their timestamps.
    - `<column>n`: difference between the newest row and the row at position
      `count - 1`, divided by the difference of their timestamps.

    ### Parameters
    - **conn** (`sqlite3.Connection`):
      An active database connection object.
    - **sensorcol** (`str`):
      Column that identifies the sensor (e.g. `'interface'`). It is inserted
      into the SQL text as is, so it must come from trusted code.
    - **datacols** (`list` of `str`):
      Columns to compute rates for (e.g. `['tx_bytes', 'rx_bytes']`).
      Columns that do not exist in the fetched rows are skipped.
    - **count** (`int`):
      Number of rows that make up the `n` interval.
    - **table** (`str`, optional):
      Table holding the data. Defaults to `'perfdata'`. The name is reduced
      to `a-zA-Z0-9_` and double-quoted in the SQL.

    ### Returns
    - **tuple** (`bool`, `list or bool or str`):
      - `(True, [ {...}, ... ])`: one dict per sensor containing `sensorcol`
        plus the `<column>1` / `<column>n` values.
      - `(True, False)`: no sensors found, or at least one sensor has fewer
        than `max(count, 2)` rows.
      - `(False, <error message>)`: a `select()` call failed.

    ### Notes
    - A rate is reported as `0` when the corresponding timestamp difference
      is `0`.
    - At least two rows per sensor are always required, because every rate is
      a difference between two rows.

    ### Example
    >>> compute_load(conn, sensorcol='interface', datacols=['tx_bytes', 'rx_bytes'], count=5, table='perfdata')
    (True, [{'interface': 'mgmt1', 'tx_bytes1': 6906, 'rx_bytes1': 10418, 'tx_bytesn': 7442, 'rx_bytesn': 10871}, ...])
    """
    table = __filter_str(table)
    # Every delta needs two rows, so never accept fewer than two even if
    # `count` is 0 or 1 (indexing perfdata[1] would raise IndexError).
    min_rows = max(count, 2)

    success, sensors = select(
        conn,
        f'SELECT DISTINCT {sensorcol} FROM "{table}" ORDER BY {sensorcol} ASC;',
    )
    if not success:
        return False, sensors
    if not sensors:
        return True, False

    load = []
    for sensor in sensors:
        sensor_name = sensor[sensorcol]
        success, perfdata = select(
            conn,
            f'SELECT * FROM "{table}" WHERE {sensorcol} = :{sensorcol} ORDER BY timestamp DESC;',
            data={sensorcol: sensor_name},
        )
        if not success:
            return False, perfdata
        if len(perfdata) < min_rows:
            # Not enough history yet for this sensor.
            return True, False

        newest, previous, oldest = perfdata[0], perfdata[1], perfdata[count - 1]
        load1_delta = newest['timestamp'] - previous['timestamp']
        loadn_delta = newest['timestamp'] - oldest['timestamp']

        tmp = {sensorcol: sensor_name}
        for key in datacols:
            if key not in newest:
                continue
            # Guard against division by zero when both rows share a timestamp.
            tmp[f'{key}1'] = (newest[key] - previous[key]) / load1_delta if load1_delta else 0
            tmp[f'{key}n'] = (newest[key] - oldest[key]) / loadn_delta if loadn_delta else 0
        load.append(tmp)
    return True, load
def connect(path='', filename=''):
    """
    Open (or create) a SQLite database file and return the connection.

    ### Parameters
    - **path** (`str`, optional):
      Directory of the database file. If empty, the directory returned by
      `disk.get_tmpdir()` is used.
    - **filename** (`str`, optional):
      Name of the database file. If empty,
      `'linuxfabrik-monitoring-plugins-sqlite.db'` is used.

    ### Returns
    - **tuple** (`bool`, `Connection or str`):
      - `(True, conn)` on success.
      - `(False, <error message>)` if connecting or configuring the
        connection raised an exception.

    ### Notes
    - `row_factory` is set to `sqlite3.Row` and `text_factory` to `str`.
    - The module's `regexp()` function is registered as the two-argument SQL
      function `REGEXP`.

    ### Example
    >>> success, conn = connect()
    >>> if not success:
    >>>     print(conn)
    """
    # Resolve the absolute location of the database file.
    directory = path if path else disk.get_tmpdir()
    basename = filename if filename else 'linuxfabrik-monitoring-plugins-sqlite.db'
    db = os.path.join(directory, basename)

    try:
        conn = sqlite3.connect(db)
        conn.row_factory = sqlite3.Row
        conn.text_factory = str
        conn.create_function('REGEXP', 2, regexp)
    except Exception as e:
        return False, f'Connecting to DB {db} failed, Error: {e}'
    return True, conn
def create_index(conn, column_list, table='perfdata', unique=False, delete_db_on_operational_error=True):
    """
    Create an index over one or more columns, if it does not exist yet.

    ### Parameters
    - **conn** (`sqlite3.Connection`):
      An active database connection object.
    - **column_list** (`str`):
      Comma-separated columns to index, e.g. `'col1, col2'`. Inserted into
      the SQL text as is.
    - **table** (`str`, optional):
      Table to index. Defaults to `'perfdata'`. Reduced to `a-zA-Z0-9_`.
    - **unique** (`bool`, optional):
      Create a `UNIQUE` index if `True`. Defaults to `False`.
    - **delete_db_on_operational_error** (`bool`, optional):
      If `True` (default), `rm_db()` is called when the statement raises
      `sqlite3.OperationalError`.

    ### Returns
    - **tuple** (`bool`, `bool or str`):
      - `(True, True)` on success.
      - `(False, <error message>)` on failure.

    ### Notes
    - The index is named `idx_<sha1sum>`, where the checksum is taken over the
      sanitized table name concatenated with `column_list`.

    ### Example
    >>> create_index(conn, 'hostname, service')
    (True, True)
    >>> create_index(conn, 'timestamp', table='logs', unique=True)
    (True, True)
    """
    table = __filter_str(table)
    index_name = f"idx_{__sha1sum(table + column_list)}"
    sql = (
        f'CREATE UNIQUE INDEX IF NOT EXISTS {index_name} ON "{table}" ({column_list});'
        if unique else
        f'CREATE INDEX IF NOT EXISTS {index_name} ON "{table}" ({column_list});'
    )

    cursor = conn.cursor()
    try:
        cursor.execute(sql)
    except sqlite3.OperationalError as e:
        if delete_db_on_operational_error:
            rm_db(conn)
        return False, f'Operational Error: {e}, Query: {sql}'
    except Exception as e:
        return False, f'Query failed: {sql}, Error: {e}'
    return True, True
def create_table(conn, definition, table='perfdata', drop_table_first=False):
    """
    Create a table from a column definition, unless it already exists.

    ### Parameters
    - **conn** (`sqlite3.Connection`):
      An active database connection object.
    - **definition** (`str`):
      Column definitions, e.g. `'col1 TEXT, col2 INTEGER NOT NULL'`. Inserted
      into the SQL text as is.
    - **table** (`str`, optional):
      Table to create. Defaults to `'perfdata'`. Reduced to `a-zA-Z0-9_`.
    - **drop_table_first** (`bool`, optional):
      If `True`, `drop_table()` is called for the table before it is
      created. Defaults to `False`.

    ### Returns
    - **tuple** (`bool`, `bool or str`):
      - `(True, True)` on success.
      - `(False, <error message>)` if creating the table failed, or the
        result of `drop_table()` if dropping failed.

    ### Example
    >>> create_table(conn, 'a TEXT, b TEXT, c INTEGER NOT NULL', table='test')
    Resulting SQL:
        CREATE TABLE IF NOT EXISTS "test" (a TEXT, b TEXT, c INTEGER NOT NULL);
    """
    table = __filter_str(table)

    if drop_table_first:
        dropped, outcome = drop_table(conn, table)
        if not dropped:
            return dropped, outcome

    sql = f'CREATE TABLE IF NOT EXISTS "{table}" ({definition});'
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
    except Exception as e:
        return False, f'Query failed: {sql}, Error: {e}'
    return True, True
def cut(conn, table='perfdata', _max=5, delete_db_on_operational_error=True):
    """
    Keep only the newest rows of a table, judged by `rowid`.

    All rows except the `_max` rows with the highest `rowid` are deleted.

    ### Parameters
    - **conn** (`sqlite3.Connection`):
      An active database connection object.
    - **table** (`str`, optional):
      Table to prune. Defaults to `'perfdata'`. The name is reduced to
      `a-zA-Z0-9_` and double-quoted in the SQL, like in `create_table()`
      and `insert()`, so that names which look like SQL keywords or start
      with a digit work here as well.
    - **_max** (`int`, optional):
      Number of newest rows to keep. Defaults to `5`.
    - **delete_db_on_operational_error** (`bool`, optional):
      If `True` (default), `rm_db()` is called when the statement raises
      `sqlite3.OperationalError`.

    ### Returns
    - **tuple** (`bool`, `bool or str`):
      - `(True, True)` on success.
      - `(False, <error message>)` on failure.

    ### Notes
    - The inner query orders by `rowid DESC` and uses
      `LIMIT -1 OFFSET :_max` to select everything after the first `_max`
      rows; those rows are deleted.
    - `_max` is passed as a bound parameter.

    ### Example
    >>> cut(conn, table='logs', _max=1000)
    (True, True)
    """
    table = __filter_str(table)
    # The table name is quoted: an unquoted keyword-like name would raise
    # OperationalError here and, by default, wipe the whole database file.
    sql = (
        '\n'
        f'DELETE FROM "{table}"\n'
        'WHERE rowid IN (\n'
        f'SELECT rowid FROM "{table}"\n'
        'ORDER BY rowid DESC\n'
        'LIMIT -1 OFFSET :_max\n'
        ');\n'
    )
    c = conn.cursor()
    try:
        c.execute(sql, {'_max': _max})
    except sqlite3.OperationalError as e:
        if delete_db_on_operational_error:
            rm_db(conn)
        return False, f'Operational Error: {e}, Query: {sql}'
    except Exception as e:
        return False, f'Query failed: {sql}, Error: {e}'
    return True, True
def delete(conn, sql, data=None, delete_db_on_operational_error=True):
    """
    Run a DELETE statement and report how many rows were affected.

    ### Parameters
    - **conn** (`sqlite3.Connection`):
      An active database connection object.
    - **sql** (`str`):
      The DELETE statement. Use `:key` placeholders for bound parameters.
    - **data** (`dict`, optional):
      Values for the placeholders. `None` (default) is treated as an empty
      dict; when `data` is empty the statement is executed without
      parameters.
    - **delete_db_on_operational_error** (`bool`, optional):
      If `True` (default), `rm_db()` is called when the statement raises
      `sqlite3.OperationalError`.

    ### Returns
    - **tuple** (`bool`, `int or str`):
      - `(True, <rowcount>)` on success.
      - `(False, <error message>)` on failure.

    ### Notes
    - A statement without a WHERE clause deletes every row of the table.

    ### Example
    >>> sql = 'DELETE FROM logs WHERE timestamp < :cutoff'
    >>> delete(conn, sql, {'cutoff': 1700000000})
    (True, 42)
    """
    data = {} if data is None else data
    cursor = conn.cursor()
    try:
        # Only hand parameters to execute() when there are any.
        call_args = (sql, data) if data else (sql,)
        affected = cursor.execute(*call_args).rowcount
    except sqlite3.OperationalError as e:
        if delete_db_on_operational_error:
            rm_db(conn)
        return False, f'Operational Error: {e}, Query: {sql}, Data: {data}'
    except Exception as e:
        return False, f'Query failed: {sql}, Error: {e}, Data: {data}'
    return True, affected
def drop_table(conn, table='perfdata'):
    """
    Remove a table from the database if it exists.

    ### Parameters
    - **conn** (`sqlite3.Connection`):
      An active database connection object.
    - **table** (`str`, optional):
      Table to drop. Defaults to `'perfdata'`. Reduced to `a-zA-Z0-9_`.

    ### Returns
    - **tuple** (`bool`, `bool or str`):
      - `(True, True)` on success.
      - `(False, <error message>)` if the statement raised an exception.

    ### Notes
    - Uses `DROP TABLE IF EXISTS`, so a missing table is not an error.

    ### Example
    >>> drop_table(conn, table='logs')
    (True, True)
    """
    safe_name = __filter_str(table)
    sql = f'DROP TABLE IF EXISTS "{safe_name}";'
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
    except Exception as e:
        return False, f'Query failed: {sql}, Error: {e}'
    return True, True
def get_colnames(col_definition):
    """
    Extract the column names from a SQL column definition string.

    The definition is split at commas that are not enclosed in parentheses,
    and the first whitespace-separated word of every non-empty part is taken
    as the column name. Commas inside parentheses, such as in
    `DECIMAL(10,2)` or `CHECK (x IN (1, 2))`, therefore do not produce bogus
    column names.

    ### Parameters
    - **col_definition** (`str`):
      Column definitions in SQL format, e.g.
      `'col1 TEXT, col2 INTEGER NOT NULL'`.

    ### Returns
    - **list** (`list` of `str`):
      The column names in definition order. Empty parts (e.g. caused by a
      trailing comma) are ignored.

    ### Notes
    - Types and constraints following the name are ignored.
    - Quoted names that contain whitespace are not supported; only the first
      word is returned.

    ### Example
    >>> get_colnames('date TEXT PRIMARY KEY, count FLOAT, name TEXT')
    ['date', 'count', 'name']
    >>> get_colnames('price DECIMAL(10,2), name TEXT')
    ['price', 'name']
    """
    parts = []
    depth = 0   # current nesting level of parentheses
    start = 0   # start index of the part currently being scanned
    for pos, char in enumerate(col_definition):
        if char == '(':
            depth += 1
        elif char == ')':
            # Never go below zero so a stray ')' cannot disable splitting.
            depth = max(depth - 1, 0)
        elif char == ',' and depth == 0:
            parts.append(col_definition[start:pos])
            start = pos + 1
    parts.append(col_definition[start:])
    return [part.split()[0] for part in parts if part.strip()]
def get_tables(conn):
    """
    Return the names of all tables that are not SQLite-internal.

    The names are read from `sqlite_master`, restricted to entries of type
    `table` whose name does not match `sqlite_%`.

    ### Parameters
    - **conn** (`sqlite3.Connection`):
      An active database connection object.

    ### Returns
    - **tuple** (`bool`, `list or str`):
      - `(True, [<table name>, ...])` on success.
      - The unchanged failure result of `select()` otherwise.

    ### Example
    >>> success, tables = get_tables(conn)
    >>> if success:
    >>>     print(tables)  # ['users', 'orders', 'logs']
    """
    query = "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';"
    ok, rows = select(conn, query, as_dict=False)
    if ok:
        # Each row has a single column: the table name.
        return True, [row[0] for row in rows]
    return ok, rows
def import_csv(conn, filename, table='data', fieldnames=None, skip_header=False, delimiter=',', quotechar='"', newline='', chunksize=1000):
    """
    Import a CSV file into a freshly created SQLite table.

    The destination table is dropped and recreated from `fieldnames`, then
    every non-empty CSV row is inserted. Column names come from `fieldnames`,
    never from the CSV header.

    ### Parameters
    - **conn** (`sqlite3.Connection`):
      An active database connection object.
    - **filename** (`str`):
      Path to the CSV file.
    - **table** (`str`, optional):
      Destination table. Defaults to `'data'`. If `None`, the filename
      reduced to `a-zA-Z0-9_` is used.
    - **fieldnames** (`str`):
      SQL column definition, e.g. `'col1 TEXT, col2 FLOAT'`. Required; the
      `None` default only exists for signature compatibility.
    - **skip_header** (`bool`, optional):
      Skip the first CSV row. Defaults to `False`.
    - **delimiter** (`str`, optional):
      CSV field delimiter. Defaults to `','`.
    - **quotechar** (`str`, optional):
      CSV quote character. Defaults to `'"'`.
    - **newline** (`str`, optional):
      Passed to `open()`. Defaults to `''`.
    - **chunksize** (`int`, optional):
      Commit after this many inserted rows. Defaults to `1000`.

    ### Returns
    - **tuple** (`bool`, `bool or str`):
      - `(True, True)` if the file was read and all commits succeeded.
      - `(False, <error message>)` if `fieldnames` is missing, the table
        could not be created, a commit failed, or reading the file failed.

    ### Notes
    - `fieldnames` is validated before the existing table is dropped.
    - Rows consisting only of empty/whitespace fields are skipped.
    - If a row has more fields than columns, the extra fields are ignored
      (`zip()` truncates).
    - A failing insert of a single row does not abort the import; a failing
      commit does, because then the imported data was not persisted.

    ### Example
    >>> import_csv(
    ...     conn,
    ...     'examples/EXAMPLE01.csv',
    ...     table='data',
    ...     fieldnames='date TEXT PRIMARY KEY, count FLOAT, name TEXT',
    ...     skip_header=True,
    ... )
    (True, True)
    """
    # Validate first: without a column definition the existing table would be
    # dropped and the function would then crash while parsing `None`.
    if not fieldnames:
        return False, 'No column definition given in `fieldnames`, nothing to import.'

    if table is None:
        table = __filter_str(filename)

    success, result = create_table(conn, fieldnames, table=table, drop_table_first=True)
    if not success:
        return success, result

    colnames = get_colnames(fieldnames)
    try:
        with open(filename, newline=newline) as csvfile:
            reader = csv.reader(csvfile, delimiter=delimiter, quotechar=quotechar)
            if skip_header:
                next(reader, None)
            imported = 0
            for csv_row in reader:
                if all(field.strip() == '' for field in csv_row):
                    continue
                insert(conn, dict(zip(colnames, csv_row)), table)
                imported += 1
                if imported % chunksize == 0:
                    success, result = commit(conn)
                    if not success:
                        return False, result
        success, result = commit(conn)
        if not success:
            return False, result
        return True, True
    except csv.Error as e:
        return False, f'CSV error in file {filename}, line {reader.line_num}: {e}'
    except IOError as e:
        return False, f'I/O error "{e.strerror}" while opening or reading {filename}'
    except Exception as e:
        return False, f'Unknown error opening or reading {filename}:\n{e}'
def insert(conn, data, table='perfdata', delete_db_on_operational_error=True):
    """
    Insert one row into a table.

    ### Parameters
    - **conn** (`sqlite3.Connection`):
      An active database connection object.
    - **data** (`dict`):
      Maps column names to the values to store. The keys are used both as
      the column list and as the names of the bound parameters.
    - **table** (`str`, optional):
      Destination table. Defaults to `'perfdata'`. Reduced to `a-zA-Z0-9_`.
    - **delete_db_on_operational_error** (`bool`, optional):
      If `True` (default), `rm_db()` is called when the statement raises
      `sqlite3.OperationalError`.

    ### Returns
    - **tuple** (`bool`, `bool or str`):
      - `(True, True)` on success.
      - `(False, <error message>)` on failure.

    ### Notes
    - Values are passed as bound parameters; column names are inserted into
      the SQL text as is.

    ### Example
    >>> insert(conn, {'hostname': 'server1', 'service': 'http', 'status': 0}, table='status')
    (True, True)
    """
    table = __filter_str(table)
    keys = ','.join(data)
    binds = ','.join([f':{key}' for key in data])
    sql = f'INSERT INTO "{table}" ({keys}) VALUES ({binds});'

    cursor = conn.cursor()
    try:
        cursor.execute(sql, data)
    except sqlite3.OperationalError as e:
        if delete_db_on_operational_error:
            rm_db(conn)
        return False, f'Operational Error: {e}, Query: {sql}, Data: {data}'
    except Exception as e:
        return False, f'Query failed: {sql}, Error: {e}, Data: {data}'
    return True, True
def regexp(expr, item):
    """
    Python implementation of the SQL `REGEXP` operator.

    `connect()` registers this function on the connection with
    `create_function('REGEXP', 2, regexp)`, which makes queries like
    `SELECT * FROM table WHERE column REGEXP 'pattern'` work.

    ### Parameters
    - **expr** (`str`):
      The regular expression (Python `re` syntax).
    - **item** (`str`, `int`, `float` or `None`):
      The value to test. `None` (SQL NULL) never matches. Values that are
      neither `str` nor `bytes`, such as numbers from INTEGER or REAL
      columns, are converted with `str()` before matching instead of raising
      `TypeError`.

    ### Returns
    - **bool**:
      `True` if the pattern is found anywhere in the value (`re.search`
      semantics), `False` otherwise.

    ### Example
    >>> regexp('^abc', 'abcdef')
    True
    >>> regexp('xyz$', 'abcdef')
    False
    >>> regexp('^12', 123)
    True
    """
    if item is None:
        return False
    if not isinstance(item, (str, bytes)):
        # An exception raised here would abort the whole SQL statement, so
        # match numeric column values against their text form instead.
        item = str(item)
    return re.search(expr, item) is not None
def replace(conn, data, table='perfdata', delete_db_on_operational_error=True):
    """
    Insert a row, replacing an existing row that conflicts with it.

    Issues a SQLite `REPLACE INTO` statement: it behaves like `INSERT`, but
    when a UNIQUE or PRIMARY KEY constraint would be violated, the existing
    row is removed before the new one is inserted.

    ### Parameters
    - **conn** (`sqlite3.Connection`):
      An active database connection object.
    - **data** (`dict`):
      Maps column names to the values to store. The keys are used both as
      the column list and as the names of the bound parameters.
    - **table** (`str`, optional):
      Destination table. Defaults to `'perfdata'`. Reduced to `a-zA-Z0-9_`.
    - **delete_db_on_operational_error** (`bool`, optional):
      If `True` (default), `rm_db()` is called when the statement raises
      `sqlite3.OperationalError`.

    ### Returns
    - **tuple** (`bool`, `bool or str`):
      - `(True, True)` on success.
      - `(False, <error message>)` on failure.

    ### Notes
    - Values are passed as bound parameters; column names are inserted into
      the SQL text as is.

    ### Example
    >>> replace(conn, {'hostname': 'server1', 'service': 'http', 'status': 0}, table='status')
    (True, True)
    """
    table = __filter_str(table)
    columns = list(data.keys())
    keys = ','.join(columns)
    binds = ','.join([f':{key}' for key in columns])
    sql = f'REPLACE INTO "{table}" ({keys}) VALUES ({binds});'

    cur = conn.cursor()
    try:
        cur.execute(sql, data)
    except sqlite3.OperationalError as e:
        if delete_db_on_operational_error:
            rm_db(conn)
        return False, f'Operational Error: {e}, Query: {sql}, Data: {data}'
    except Exception as e:
        return False, f'Query failed: {sql}, Error: {e}, Data: {data}'
    return True, True
def rm_db(conn):
    """
    Close a connection and delete the database file behind it.

    The file name is looked up with `PRAGMA database_list`. Only the entry
    named `main` is considered; if it has a file name, the connection is
    closed with `close()` and the file is removed with `disk.rm_file()`.
    If the `main` entry has no file name, nothing is closed or removed.

    ### Parameters
    - **conn** (`sqlite3.Connection`):
      An active database connection object.

    ### Returns
    - **bool**:
      Always `True`; the results of `close()` and `disk.rm_file()` are not
      evaluated.

    ### Example
    >>> rm_db(conn)
    True
    """
    # Each PRAGMA row is (id, name, filename).
    main_files = [
        row[2]
        for row in conn.execute('PRAGMA database_list')
        if row[1] == 'main' and row[2]
    ]
    if main_files:
        close(conn)
        disk.rm_file(main_files[0])
    return True
def select(conn, sql, data=None, fetchone=False, as_dict=True, delete_db_on_operational_error=True):
"""
Execute a SELECT query against a SQLite database.
This function runs a SQL SELECT statement and retrieves zero or more rows of data.
It supports optional parameter binding, returning results either as dictionaries
or as default SQLite row objects.
### Parameters
- **conn** (`sqlite3.Connection`):
An active database connection object.
- **sql** (`str`):
The SQL SELECT statement to execute.
Use placeholders (`:key`) for parameterized queries.
- **data** (`dict`, optional):
Dictionary of parameters to bind to the SQL query.
Defaults to an empty dict (no parameters).
- **fetchone** (`bool`, optional):
If `True`, fetch only the first row.
If `False` (default), fetch all rows.
- **as_dict** (`bool`, optional):
If `True`, return results as a list of dictionaries.
If `False`, return raw SQLite row objects. Defaults to `True`.
- **delete_db_on_operational_error** (`bool`, optional):
If `True`, deletes the database file when an `OperationalError` occurs.
Defaults to `True`.
### Returns
- **tuple** (`bool`, `list or dict or str`):
- First element (`bool`): `True` if the query succeeded, `False` if it failed.
- Second element (`list`, `dict`, or `str`):
- A list of rows, or a single row if `fetchone=True`.
- Error message (`str`) on failure.
### Notes
- Results are returned as dictionaries if `as_dict=True`.
- If no results are found when `fetchone=True`, returns an empty list `[]`.
- On schema-related `OperationalError`, the database file can optionally be deleted.
### Example
>>> sql = 'SELECT hostname, service FROM status WHERE status = :status'
>>> data = {'status': 0}
>>> success, rows = select(conn, sql, data)
>>> if success:
>>> for row in rows:
>>> print(row['hostname'], row['service'])
>>> else:
>>> print(rows)
"""
if data is None:
data = {}
c = conn.cursor()
try:
if data:
c.execute(sql, data)
else:
c.execute(sql)
rows = c.fetchall()
if as_dict:
rows = [dict(row) for row in rows]