-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathget_schemas.py
More file actions
122 lines (115 loc) · 4.93 KB
/
get_schemas.py
File metadata and controls
122 lines (115 loc) · 4.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import glob
import json
import os
import re
import sys
import textwrap
# Column types that are stored unchanged in the generated schema. Any other
# type (after alias normalisation) is reported and stored as 'string'.
valid_types = {
    'datetime': True,
    'string': True,
    'int': True,
    'boolean': True,
    'long': True,
    'bool': True,
    'dynamic': True,
    'real': True,
    'guid': True,
    'double': True,
    'object': True,
    'enum': True,
    'decimal': True,
    'timespan': True,
}

# Type spellings found in the documentation, rewritten before validation.
_TYPE_ALIASES = {
    'bigint': 'long',
    'list': 'string',  # some tables refer to non-existing type 'list'
    'nullablebool': 'boolean',
    'integer': 'int',
    'array': 'string',
    'object': 'dynamic',  # object types should map to dynamic in KQL
    'enum': 'string',  # enum types should map to string
}

# Matches e.g. [!INCLUDE [awscloudtrail](../includes/awscloudtrail-include.md)]
# and captures the relative path between the parentheses.
_INCLUDE_RE = re.compile(r'\[!INCLUDE \[.*?\]\((.*?)\)\]')

# A markdown delimiter-row cell: '---', ':---', '---:' or ':---:'.
_DELIMITER_CELL_RE = re.compile(r':?-+:?')

# Lower-cased line prefixes that switch column parsing on.
_TABLE_START_PREFIXES = ('## columns', '| column name', '|column name')


# Extract tables from markdown files in Microsoft documentation.
def get_table_details(fn, base_dir):
    """Parse one markdown table description into a column -> type mapping.

    The file is read as UTF-8. The content of every ``[!INCLUDE ...]`` file
    it references is appended to the end of the text before parsing (includes
    inside included files are not expanded).

    Args:
        fn: Path of the markdown file to parse.
        base_dir: Absolute directory that must contain the grandparent
            directory of every included file.

    Returns:
        A ``(table_name, details)`` tuple. ``table_name`` is the second
        whitespace-separated token of the first line starting with ``'# '``,
        or ``None`` when there is no such line. ``details`` maps each column
        name to its normalised type.

    Raises:
        Exception: If an include resolves outside ``base_dir``.
        SystemExit: If an include path contains ``'reusable-content'``.
    """
    with open(fn, encoding='utf-8') as handle:
        data = handle.read()

    # The include list is computed once, from the main file only.
    for include_fn in _INCLUDE_RE.findall(data):
        if 'reusable-content' in include_fn:
            # NOTE(review): this stops the whole run with exit status 0, which
            # looks like a leftover debugging stop - confirm the intent.
            # Diagnostics go to stderr so stdout stays reserved for the JSON.
            print(include_fn, file=sys.stderr)
            sys.exit()
        include_path = os.path.abspath(os.path.join(os.path.dirname(fn), include_fn))
        parsed_dir = os.path.dirname(os.path.dirname(include_path)) + os.sep
        if not parsed_dir.startswith(base_dir + os.sep):
            raise Exception(f"Include path {parsed_dir} is not in {base_dir}")
        with open(include_path, encoding='utf-8') as handle:
            data += handle.read() + '\n'

    inside_table = False
    table_name = None
    details = {}
    for raw_line in data.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        line = line.replace('`', '')
        if not table_name and line.startswith('# '):
            table_name = line.split()[1]
        if line.lower().startswith(_TABLE_START_PREFIXES):
            inside_table = True
            continue
        # Any other heading ends the current column table.
        if line.startswith('#'):
            inside_table = False
        if not inside_table or not line.startswith('|'):
            continue

        # '| Name | type | text |' -> ['', 'Name', 'type', 'text', '']
        cells = line.replace(' ', '').replace('\t', '').split('|')
        if len(cells) < 4:
            continue
        column_name = cells[1]
        # Skip header rows, empty cells and delimiter rows. The full-match
        # also catches alignment delimiters such as ':---', which would
        # otherwise be stored as a column.
        if (
            not column_name
            or column_name == 'Column'
            or column_name.startswith('--')
            or _DELIMITER_CELL_RE.fullmatch(column_name)
        ):
            continue

        column_type = cells[2].lower()
        column_type = _TYPE_ALIASES.get(column_type, column_type)
        if column_type not in valid_types:
            print(
                f"Warning: Unknown column type '{column_type}' in {fn}, mapping to 'string'",
                file=sys.stderr,
            )
            column_type = 'string'  # Use string as a safe fallback for unknown types
        details[column_name] = column_type
    return table_name, details
def merge_additional_columns(tables, env_name, path='additional_columns.json'):
    """Add manually maintained columns to ``tables`` in place.

    The JSON file at ``path`` is expected to map an environment name to
    ``{table_name: {field_name: field_type}}``. Tables missing from
    ``tables`` are created; fields already present are overwritten.

    Args:
        tables: Mapping of table name to ``{column: type}``; mutated in place.
        env_name: Top-level key to read from the JSON file.
        path: Location of the JSON file. Defaults to
            ``'additional_columns.json'`` in the current working directory.

    Raises:
        KeyError: If ``env_name`` is not a key of the JSON document.
    """
    # A context manager closes the file even when parsing or lookup fails.
    with open(path, encoding='utf-8') as handle:
        additional_columns = json.load(handle)[env_name]
    for table_name, extra_fields in additional_columns.items():
        tables.setdefault(table_name, {}).update(extra_fields)
# Per-environment settings, keyed by environment name:
#   dir_name        - directory that is searched for table markdown files
#   base_dir        - directory passed to get_table_details as the include root
#   glob            - filename pattern applied inside dir_name
#   help            - command shown when dir_name does not exist
#   magic_functions - optional list copied into the output for the environment
environments = {
    'm365': {
        'dir_name': 'defender-docs/defender-xdr',
        'base_dir': 'defender-docs',
        'glob': '*-table.md',
        'help': textwrap.dedent("""
            git clone --depth=1 https://github.com/MicrosoftDocs/defender-docs
        """),
        'magic_functions': [
            'FileProfile',
            'DeviceFromIP',
        ],
    },
    'sentinel': {
        'dir_name': 'azure-reference-other/azure-monitor-ref/tables',
        'base_dir': 'azure-reference-other',
        'glob': '*.md',
        # The checkout has to run inside the fresh clone, hence `git -C`;
        # `&&` keeps it from running at all when the clone fails.
        'help': textwrap.dedent("""
            git clone https://github.com/MicrosoftDocs/azure-reference-other && git -C azure-reference-other checkout 97f4433e37c4a95922407dcc4c3014c4badb6881
        """),
    },
}
def main():
    """Build the schema for every configured environment and print it as JSON.

    For each entry in ``environments`` the markdown files matching its glob
    are parsed in sorted order, the extra columns from
    ``merge_additional_columns`` are merged in, and the result is stored as
    ``{'tables': ..., 'magic_functions': ...}``. The combined document is
    written to stdout.

    Raises:
        SystemExit: With status 1 when an environment's ``dir_name`` does not
            exist; the message, including the help command, goes to stderr.
    """
    environment_details = {}
    for env_name, env_details in environments.items():
        if not os.path.exists(env_details['dir_name']):
            # stdout is reserved for the JSON document, so report on stderr.
            print(
                f"ERROR: {env_details['dir_name']} does not exist. To create it, run:\n{env_details['help'].strip()}",
                file=sys.stderr,
            )
            sys.exit(1)

        base_dir = os.path.abspath(env_details['base_dir'])
        glob_pattern = os.path.join(env_details['dir_name'], env_details['glob'])

        tables = {}
        for table_fn in sorted(glob.glob(glob_pattern)):
            # NOTE(review): table_name is None for files without a '# '
            # heading; such files all land under one None key - confirm this
            # is acceptable.
            table_name, columns = get_table_details(table_fn, base_dir)
            tables[table_name] = columns
        merge_additional_columns(tables, env_name)

        environment_details[env_name] = dict(
            tables=tables,
            magic_functions=env_details.get('magic_functions', []),
        )
    print(json.dumps(environment_details, indent=2))


if __name__ == '__main__':
    main()