-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsecurityutils.py
More file actions
288 lines (230 loc) · 9.61 KB
/
securityutils.py
File metadata and controls
288 lines (230 loc) · 9.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
'''
OVERVIEW:
This script is used to collect and work with Security Rules associated with Azure Networks.
REQUIREMENTS:
- Azure Subscription
- Virtual Machine deployment with Network Interface
- Access to the resource group the virtual machine lives in.
- Pip install
azure-common
azure-mgmt
azure-cli
azure-cli-core
'''
import argparse
import sys
from azure.common.client_factory import get_client_from_cli_profile
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.network.v2017_03_01.models import NetworkSecurityGroup
from azure.mgmt.network.v2017_03_01.models import SecurityRule, SecurityRuleDirection, SecurityRuleProtocol, SecurityRuleAccess
from azure.mgmt.resource.resources import ResourceManagementClient
def getServicePrincipal(sp_client_id, sp_secret, sp_tenant):
    '''
    Build a ServicePrincipalCredentials object from service principal details.

    Params:
        sp_client_id : Service Principal Client Id
        sp_secret : Service Principal Client Secret
        sp_tenant : Azure Tenant for Service Principal
    Returns:
        The ServicePrincipalCredentials instance constructed from the arguments.
    '''
    # Collect the keyword arguments first so the constructor call stays short.
    credential_args = {
        'client_id': sp_client_id,
        'secret': sp_secret,
        'tenant': sp_tenant,
    }
    return ServicePrincipalCredentials(**credential_args)
def getNetworkClient(spCred=None, subscription_id=None):
    '''
    Obtain an instance of NetworkManagementClient.

    Python keeps only the last definition of a name, so both ways of building
    the client are handled by this single function:
      - called with no arguments, the client is created from the logged in
        CLI profile via get_client_from_cli_profile;
      - called with credentials and a subscription, the client is created
        directly from them.

    Params:
        spCred : Instance of ServicePrincipalCredentials (optional)
        subscription_id : Azure Subscription ID (optional)
    Returns:
        A NetworkManagementClient instance.
    Raises:
        ValueError : if only one of spCred / subscription_id is supplied.
    '''
    if spCred is None and subscription_id is None:
        return get_client_from_cli_profile(NetworkManagementClient)

    # A half-specified call is ambiguous; fail loudly instead of guessing.
    if spCred is None or subscription_id is None:
        raise ValueError("spCred and subscription_id must be supplied together")

    return NetworkManagementClient(
        spCred,
        subscription_id
    )
def getResourceClient(spCred=None, subscription_id=None):
    '''
    Obtain an instance of ResourceManagementClient.

    Python keeps only the last definition of a name, so both ways of building
    the client are handled by this single function:
      - called with no arguments, the client is created from the logged in
        CLI profile via get_client_from_cli_profile;
      - called with credentials and a subscription, the client is created
        directly from them.

    Params:
        spCred : Instance of ServicePrincipalCredentials (optional)
        subscription_id : Azure Subscription ID (optional)
    Returns:
        A ResourceManagementClient instance.
    Raises:
        ValueError : if only one of spCred / subscription_id is supplied.
    '''
    if spCred is None and subscription_id is None:
        return get_client_from_cli_profile(ResourceManagementClient)

    # A half-specified call is ambiguous; fail loudly instead of guessing.
    if spCred is None or subscription_id is None:
        raise ValueError("spCred and subscription_id must be supplied together")

    return ResourceManagementClient(
        spCred,
        subscription_id
    )
def getNetworkInterfaces(network_client, resource_group):
    '''
    Collect every network interface of a resource group into a list.

    Params:
        network_client : Client exposing network_interfaces.list(resource_group_name=...)
        resource_group : Azure Resource Group in the subscription
    Returns:
        A list holding each item produced by the listing call, in order.
        (Presumably NetworkInterface models - confirm against the SDK version in use.)
    '''
    # The listing call returns an iterable; materialise it so callers get a plain list.
    return list(network_client.network_interfaces.list(resource_group_name=resource_group))
def getEffectiveRules(network_client, resource_group, network_interface):
    '''
    Get the effective rules, grouped by network security group, for an interface.

    Convenience wrapper: reads the interface's name and delegates to
    getEffectiveRulesByName.

    Params:
        network_client : Instance of NetworkManagementClient
        resource_group : Azure Resource Group in the subscription
        network_interface : Object with a .name attribute identifying the interface
    Returns:
        Whatever getEffectiveRulesByName returns for that interface name.
    '''
    interface_name = network_interface.name
    return getEffectiveRulesByName(network_client, resource_group, interface_name)
def getEffectiveRulesByName(network_client, resource_group, network_interface_name):
    '''
    Get the effective rules, grouped by network security group, for an interface name.

    Params:
        network_client : Instance of NetworkManagementClient
        resource_group : Azure Resource Group in the subscription
        network_interface_name : Name of the network interface
    Returns:
        A list with one dict per effective network security group:
            "resource_group"         : resource group segment of the NSG id
            "network_security_group" : NSG name segment of the NSG id
            "rules"                  : list of rule names (last '/' segment only)
    Raises:
        ValueError : if an NSG id lacks a 'resourceGroups' or
                     'networkSecurityGroups' path segment.
    '''
    poller = network_client.network_interfaces.list_effective_network_security_groups(
        resource_group_name=resource_group,
        network_interface_name=network_interface_name
    )
    # The call is long running; result() blocks until the listing is available.
    effective_groups = poller.result()

    summaries = []
    # Treat a missing (None) group list or rule list as empty instead of
    # failing with a TypeError while iterating.
    for effective_group in (effective_groups.value or []):
        id_parts = effective_group.network_security_group.id.split('/')
        resource_group_index = id_parts.index('resourceGroups')
        nsg_index = id_parts.index('networkSecurityGroups')
        effective_rules = effective_group.effective_security_rules or []
        summaries.append({
            "resource_group": id_parts[resource_group_index + 1],
            "network_security_group": id_parts[nsg_index + 1],
            # Rule names are path-like; only the final segment is kept.
            "rules": [rule.name.split('/')[-1] for rule in effective_rules],
        })
    return summaries
def loadInboundSecurityRules(network_client, resource_group, security_group):
    '''
    Load the inbound rules associated with a security group.

    Params:
        network_client : Instance of NetworkManagementClient
        resource_group : Azure Resource Group in the subscription
        security_group : Network security group in the Azure Resource Group
    Returns:
        A list of the listed rules whose direction equals 'Inbound', in listing
        order. If iterating the listing raises, the rules gathered up to that
        point are returned and the error is printed (best effort).
    '''
    listed_rules = network_client.security_rules.list(
        resource_group_name=resource_group,
        network_security_group_name=security_group
    )
    inbound_rules = []
    try:
        # Plain iteration works with any iterable and ends cleanly on
        # exhaustion, without depending on a Python 2 style .next() method.
        for rule in listed_rules:
            if rule.direction == 'Inbound':
                inbound_rules.append(rule)
    except Exception as ex:
        # Best effort: keep what was collected, but surface what went wrong.
        print("Unknown exception", ex)
    return inbound_rules
def getSecurityRule(network_client, resource_group, security_group, nsg_rule_name):
    '''
    Search a network security group for a rule with the specified name.

    Params:
        network_client : Instance of NetworkManagementClient
        resource_group : Azure Resource Group in the subscription
        security_group : Network security group in the Azure Resource Group
        nsg_rule_name : Network security rule name to search for
    Returns:
        The first listed rule whose name equals nsg_rule_name, or None when no
        rule matches. If iterating the listing raises, the error is printed and
        None is returned (best effort).
    '''
    listed_rules = network_client.security_rules.list(
        resource_group_name=resource_group,
        network_security_group_name=security_group
    )
    try:
        # Plain iteration works with any iterable and ends cleanly on
        # exhaustion, without depending on a Python 2 style .next() method.
        for rule in listed_rules:
            if rule.name == nsg_rule_name:
                return rule
    except Exception as ex:
        # Best effort: report the failure instead of hiding it entirely.
        print("Unknown exception", ex)
    return None
def _portSpecCovers(port_spec, port):
    '''
    Decide whether a single port specification applies to a port.

    A specification applies when it is '*', when it equals the port, or when it
    is a "low-high" range that contains the port. None or unparsable
    specifications never apply.
    '''
    if port_spec is None:
        return False
    spec = str(port_spec).strip()
    wanted = str(port).strip()
    if spec == '*' or spec == wanted:
        return True
    low, dash, high = spec.partition('-')
    if not dash:
        return False
    try:
        return int(low) <= int(wanted) <= int(high)
    except ValueError:
        # Non-numeric bounds or a non-numeric requested port: not a match.
        return False


def splitRules(rule_list, destination_port):
    '''
    Takes a list of SecurityRule objects and splits those that apply to a
    destination port into allow and deny dictionaries of dict[priority] = rule.

    A rule applies when its destination_port_range, or any entry of its
    destination_port_ranges, is '*', equals the port, or is a "low-high" range
    containing the port. A destination_port_ranges of None is treated as empty.
    Rules whose access is 'Allow' go to the allow dictionary; every other rule
    goes to the deny dictionary. If two rules in the same dictionary share a
    priority, the later one in rule_list wins.

    Params:
        rule_list : List of SecurityRule objects
        destination_port : Port for access (int or str)
    Returns:
        Tuple (allow, deny) of dictionaries keyed by rule priority.
    '''
    allow = {}
    deny = {}
    for rule in rule_list:
        port_specs = [rule.destination_port_range]
        port_specs.extend(rule.destination_port_ranges or [])
        if any(_portSpecCovers(spec, destination_port) for spec in port_specs):
            target = allow if rule.access == 'Allow' else deny
            target[rule.priority] = rule
    return allow, deny
def isRuleIpBased(security_rule):
    '''
    Takes an instance of SecurityRule and determines if it's IP based.

    Security rule is IP based IF:
        - source_address_prefix is NOT equal to 'AzureLoadBalancer' or 'VirtualNetwork'
        - source_address_prefix is not None or source_address_prefixes is not empty

    A source_address_prefixes of None is treated as empty.

    Params:
        security_rule : Object with source_address_prefix and source_address_prefixes
    Returns:
        True when the rule is IP based, otherwise False.
    '''
    prefix = security_rule.source_address_prefix
    if prefix == 'AzureLoadBalancer' or prefix == 'VirtualNetwork':
        return False
    # bool() copes with both None and an empty list, unlike len().
    return prefix is not None or bool(security_rule.source_address_prefixes)
def isIpPresent(security_rule, ip_addr):
    '''
    Takes an instance of SecurityRule and determines if an IP is in it.

    The IP is present when it equals source_address_prefix or is an element of
    source_address_prefixes. A source_address_prefixes of None is treated as
    empty.

    Params:
        security_rule : Object with source_address_prefix and source_address_prefixes
        ip_addr : IP address (string) to look for
    Returns:
        True when the IP is present in the rule, otherwise False.
    '''
    if security_rule.source_address_prefix == ip_addr:
        return True
    return ip_addr in (security_rule.source_address_prefixes or ())
def updateSecurityRule(network_client, existing_rule, resource_group, nsg_name, requested_ip):
    '''
    Updates an existing security rule to add in a new IP address.

    The IP is added to the rule's source_address_prefixes list (created when
    the rule has none, and not added twice), then the rule is submitted with
    create_or_update and the result is printed.

    Params:
        network_client : Instance of NetworkManagementClient
        existing_rule : Instance of SecurityRule to modify (mutated in place; must not be None)
        resource_group : Azure Resource Group in the subscription
        nsg_name : Network security group in the Azure Resource Group
        requested_ip : External IP Address
    Returns:
        The rule object returned by the completed create_or_update operation.
    '''
    # A rule may carry no prefixes list at all; start one rather than
    # failing on None.append.
    if existing_rule.source_address_prefixes is None:
        existing_rule.source_address_prefixes = []
    if requested_ip not in existing_rule.source_address_prefixes:
        existing_rule.source_address_prefixes.append(requested_ip)

    async_security_rule = network_client.security_rules.create_or_update(
        resource_group,
        nsg_name,
        existing_rule.name,
        existing_rule
    )
    # result() blocks until the long running operation completes.
    updated_security_rule = async_security_rule.result()
    print(updated_security_rule)
    return updated_security_rule
def createSecurityRule(network_client, resource_group, nsg_name, nsg_rule_name, priority, port, requested_ip):
    '''
    Create (or overwrite) a security rule in a given network security group.

    The submitted rule allows inbound TCP traffic to the selected destination
    port, from any source port, for the specified source IP address. The
    completed result is printed; nothing is returned.

    Params:
        network_client : Instance of NetworkManagementClient
        resource_group : Azure Resource Group in the subscription
        nsg_name : Network security group in the Azure Resource Group
        nsg_rule_name : Name of the network security rule to create or update
        priority : Rule priority; converted with int()
                   (NOTE(review): Azure documents 100 - 4096 as the valid range - confirm)
        port : Destination port for access; converted with str()
        requested_ip : External IP Address used as the source address prefix
    '''
    print("Create or update rule ", nsg_rule_name)

    rule_parameters = dict(
        access=SecurityRuleAccess.allow,
        description='New Test security rule',
        destination_address_prefix='*',
        destination_port_range=str(port),
        direction=SecurityRuleDirection.inbound,
        priority=int(priority),
        protocol=SecurityRuleProtocol.tcp,
        source_port_range='*',
        source_address_prefix=requested_ip,
    )

    poller = network_client.security_rules.create_or_update(
        resource_group, nsg_name, nsg_rule_name, rule_parameters
    )
    # result() blocks until the long running operation completes.
    created_rule = poller.result()
    print(created_rule)