forked from seveas/python-hpilo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhpilo.py
More file actions
1214 lines (1042 loc) · 50.7 KB
/
hpilo.py
File metadata and controls
1214 lines (1042 loc) · 50.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# (c) 2011-2012 Dennis Kaarsemaker <dennis@kaarsemaker.net>
# see COPYING for license details
import os
import platform
import random
import re
import socket
import subprocess
import sys
import warnings
import hpilo_fw
# Python 2/3 compatibility shims
PY3 = sys.version_info[0] >= 3
if PY3:
    import io as StringIO

    def b(x):
        """Turn a native string into ASCII-encoded bytes"""
        return bytes(x, 'ascii')

    class Bogus(Exception):
        """Stand-in exception class installed as socket.sslerror below"""

    # Gives the `except socket.sslerror` clauses in this file a name to
    # refer to on Python 3
    socket.sslerror = Bogus
    basestring = str
else:
    import cStringIO as StringIO

    def b(x):
        """Return the native string unchanged"""
        return x
try:
    import ssl
except ImportError:
    # Fallback for older python versions: a minimal stand-in that offers
    # only the two names this file uses from the real ssl module.
    class ssl:
        PROTOCOL_TLSv1 = 3

        @staticmethod
        def wrap_socket(sock, *args, **kwargs):
            # Extra arguments (such as ssl_version) are accepted but ignored
            return socket.ssl(sock)
# Prefer the C implementation of ElementTree where it lives under its own
# name. The xml.etree.cElementTree alias was removed in Python 3.9, so fall
# back to the plain ElementTree module as a last resort.
try:
    import xml.etree.cElementTree as etree
except ImportError:
    try:
        import cElementTree as etree
    except ImportError:
        import xml.etree.ElementTree as etree
# Which protocol to use. These are the values Ilo.protocol can take; when it
# is left unset, Ilo._detect_protocol picks one of them.
ILO_RAW = 1    # XML sent directly over the ssl connection, no HTTP framing
ILO_HTTP = 2   # XML sent as the body of an HTTP POST request
ILO_LOCAL = 3  # XML piped through the local hponcfg executable
# Names of every function that has been passed through @untested
_untested = []

def untested(meth):
    """Decorator that flags a method as untested.

    Sets `untested = True` on the function, records its name in the
    module-level `_untested` list and returns the function itself."""
    meth.untested = True
    # func_name is the Python 2 spelling, __name__ the fallback
    if not hasattr(meth, 'func_name'):
        name = meth.__name__
    else:
        name = meth.func_name
    _untested.append(name)
    return meth
class IloError(Exception):
    """Base class of the errors raised by this module. The optional
    numeric error code is available as the `errorcode` attribute."""
    def __init__(self, message, errorcode=None):
        Exception.__init__(self, message)
        self.errorcode = errorcode


class IloCommunicationError(IloError):
    """Raised when talking to the iLO (or launching hponcfg) fails"""


class IloLoginFailed(IloError):
    """Raised by the response parser when the status code or message of
    a response matches one of the values listed below"""
    possible_messages = ['User login name was not found', 'Login failed', 'Login credentials rejected']
    possible_codes = [0x005f, 0x000a]


class IloWarning(Warning):
    """Warning category used for messages that accompany a successful
    (status 0) response"""
class Ilo(object):
    """Represents an iLO/iLO2/iLO3/RILOE II management interface on a
    specific host. A new connection using the specified login, password and
    timeout will be made for each API call. The library will detect which
    protocol to use, but you can override this by setting protocol to
    ILO_RAW or ILO_HTTP. Use ILO_LOCAL to avoid using a network connection
    and use hponcfg instead. Username and password are ignored for ILO_LOCAL
    connections."""

    # XML declaration written in front of every request document
    XML_HEADER = b('<?xml version="1.0"?>\r\n')
    # Request header for ILO_HTTP; filled with the body length and an
    # optional extra header line (the session cookie)
    HTTP_HEADER = "POST /ribcl HTTP/1.1\r\nHost: localhost\r\nContent-Length: %d\r\nConnection: Close%s\r\n\r\n"
    # Request header for multipart uploads; filled with the body length and
    # the multipart boundary
    HTTP_UPLOAD_HEADER = "POST /cgi-bin/uploadRibclFiles HTTP/1.1\r\nHost: localhost\r\nConnection: Close\r\nContent-Length: %d\r\nContent-Type: multipart/form-data; boundary=%s\r\n\r\n"
    BLOCK_SIZE = 4096

    # Executable used for ILO_LOCAL connections
    hponcfg = "/sbin/hponcfg"
    if platform.system() == 'Windows':
        # Raw string: the backslashes are path separators, not escape
        # sequences (the value is unchanged)
        hponcfg = r'C:\Program Files\HP Lights-Out Configuration Utility\cpqlocfg.exe'
def __init__(self, hostname, login=None, password=None, timeout=60, port=443, protocol=None):
    """Store the connection parameters; nothing is contacted here.

    A login or password that is empty or None is replaced by
    'Administrator' or 'Password' respectively."""
    if not login:
        login = 'Administrator'
    if not password:
        password = 'Password'
    self.hostname, self.port = hostname, port
    self.login, self.password = login, password
    self.timeout = timeout
    self.protocol = protocol
    # Debug output is off by default, and there is no session cookie yet
    self.debug = 0
    self.cookie = None
def __str__(self):
    """Human readable description that names the configured hostname"""
    description = "iLO interface of %s" % self.hostname
    return description
def _debug(self, level, message):
    """Write message to stderr when self.debug is at least level.

    Bytes are decoded as latin-1 and PASSWORD="..." values are masked.
    Messages starting with a carriage return are flushed without a
    trailing newline; all other messages get a newline appended."""
    if message.__class__.__name__ == 'bytes':
        message = message.decode('latin-1')
    if self.debug < level:
        return
    masked = re.sub(r'PASSWORD=".*"', 'PASSWORD="********"', message)
    sys.stderr.write(masked)
    if not message.startswith('\r'):
        sys.stderr.write('\n')
    else:
        sys.stderr.flush()
def _request(self, xml, progress=None):
    """Serialize the given ElementTree.Element, send it and parse the reply.

    Returns a (header, result) tuple. header is whatever _communicate
    returned as header; result is None when no message had useful
    content, a single parsed message when there was exactly one, and a
    list of parsed messages otherwise."""
    if not self.protocol:
        self._detect_protocol()

    # Serialize the XML
    if not hasattr(etree, 'tostringlist'):
        payload = etree.tostring(xml)
    else:
        payload = b("\r\n").join(etree.tostringlist(xml)) + b('\r\n')

    header, remaining = self._communicate(payload, self.protocol, progress=progress)

    # The reply usually contains multiple XML documents back to back; cut
    # it at every following '<?xml' marker and parse the pieces one by one
    parsed = []
    while remaining:
        # Searching from offset 5 skips a marker at the very start
        split_at = remaining.find('<?xml', 5)
        if split_at == -1:
            chunk, remaining = remaining, None
        else:
            chunk, remaining = remaining[:split_at], remaining[split_at:]
        message = self._parse_message(chunk)
        # _parse_message returns None if a message has no useful content
        if message is not None:
            parsed.append(message)

    if not parsed:
        return header, None
    if len(parsed) == 1:
        return header, parsed[0]
    return header, parsed
def _detect_protocol(self):
    """Work out which protocol to use and store it in self.protocol"""
    # Use hponcfg when 'connecting' to localhost
    if self.hostname == 'localhost':
        self.protocol = ILO_LOCAL
        return

    # Do a bogus request, using the HTTP protocol. If no header comes
    # back (see the special case in _communicate), we should be using
    # the raw protocol
    probe = b('<RIBCL VERSION="2.0"></RIBCL>')
    header, data = self._communicate(probe, ILO_HTTP)
    if not header:
        self.protocol = ILO_RAW
    else:
        self.protocol = ILO_HTTP
def _upload_file(self, filename, progress):
    """Upload a file as a multipart/form-data POST and remember the cookie
    from the response in self.cookie.

    progress may be a callable; it is called with a status string after
    every chunk of a large part has been written. When it is not
    callable, the same strings go to the level 2 debug output.

    Raises IloCommunicationError when nothing could be read back or when
    the response carries no Set-Cookie header."""
    # Close the file explicitly instead of relying on garbage collection
    fd = open(filename, 'rb')
    try:
        firmware = fd.read()
    finally:
        fd.close()

    # Pick a boundary that does not occur in the payload
    while True:
        boundary = b('------hpiLO3t' + str(random.randint(100000,1000000)) + 'z')
        if boundary not in firmware:
            break

    parts = [
        b("--") + boundary + b("""\r\nContent-Disposition: form-data; name="fileType"\r\n\r\n"""),
        b("\r\n--") + boundary + b('''\r\nContent-Disposition: form-data; name="fwimgfile"; filename="''') + b(filename) + b('''"\r\nContent-Type: application/octet-stream\r\n\r\n'''),
        firmware,
        b("\r\n--") + boundary + b("--\r\n"),
    ]
    total_bytes = sum([len(x) for x in parts])
    sock = self._get_socket()

    http_header = self.HTTP_UPLOAD_HEADER % (total_bytes, boundary.decode('ascii'))
    self._debug(2, http_header)
    sock.write(b(http_header))

    for part in parts:
        if len(part) < 2048:
            self._debug(2, part)
            sock.write(part)
        else:
            # Large parts are written in chunks so progress can be reported
            sent = 0
            pkglen = 2048
            fwlen = len(part)
            while sent < fwlen:
                written = sock.write(part[sent:sent+pkglen])
                sent += written
                if callable(progress):
                    progress("\r\033[KSent %d/%d bytes (%d%%)" % (sent, fwlen, 100.0*sent/fwlen))
                else:
                    self._debug(2, "\r\033[KSent %d/%d bytes (%d%%)" % (sent, fwlen, 100.0*sent/fwlen))
            if callable(progress):
                progress("")
            else:
                self._debug(2, "")

    data = ''
    try:
        while True:
            d = sock.read()
            data += d.decode('latin-1')
            if not d:
                break
    except socket.sslerror: # Connection closed
        e = sys.exc_info()[1]
        if not data:
            raise IloCommunicationError("Communication with %s:%d failed: %s" % (self.hostname, self.port, str(e)))

    self._debug(1, "Received %d bytes" % len(data))
    self._debug(2, data)

    found = re.search('Set-Cookie: *(.*)', data)
    if found is None:
        raise IloCommunicationError("No cookie received from %s:%d after uploading %s" % (self.hostname, self.port, filename))
    # group(1) is the captured string (groups(1) would be a tuple); strip()
    # drops the '\r' of the line ending, which '.' also matches
    self.cookie = found.group(1).strip()
    self._debug(2, "Cookie: %s" % self.cookie)
def _get_socket(self):
    """Set up the channel used to talk to the iLO.

    For ILO_LOCAL this launches hponcfg and returns the Popen object,
    with write/read attributes pointing at its stdin/stdout. Otherwise
    it returns an ssl-wrapped TCP socket connected to hostname:port.

    Raises IloCommunicationError when hponcfg cannot be started, the
    connection cannot be made or the ssl session cannot be established."""
    if self.protocol == ILO_LOCAL:
        self._debug(1, "Launching hponcfg")
        try:
            sp = subprocess.Popen([self.hponcfg, '--input', '--xmlverbose'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=None)
        except OSError:
            e = sys.exc_info()[1]
            raise IloCommunicationError("Cannot run %s: %s" % (self.hponcfg, str(e)))
        # Make the process look like a socket to the callers
        sp.write = sp.stdin.write
        sp.read = sp.stdout.read
        return sp

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(self.timeout)
    self._debug(1, "Connecting to %s:%d" % (self.hostname, self.port))
    try:
        sock.connect((self.hostname, self.port))
    except socket.timeout:
        # Do not leak the descriptor of a socket that never connected
        sock.close()
        raise IloCommunicationError("Timeout connecting to %s:%d" % (self.hostname, self.port))
    except socket.error:
        e = sys.exc_info()[1]
        sock.close()
        raise IloCommunicationError("Error connecting to %s:%d: %s" % (self.hostname, self.port, str(e)))
    try:
        return ssl.wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLSv1)
    except socket.sslerror:
        e = sys.exc_info()[1]
        sock.close()
        # Not every exception object has a message attribute
        reason = getattr(e, 'message', None) or str(e)
        raise IloCommunicationError("Cannot establish ssl session with %s:%d: %s" % (self.hostname, self.port, reason))
def _communicate(self, xml, protocol, progress=None):
    """Send serialized XML using the given protocol and read the reply.

    xml is the request as bytes. A '$EMBED:<filename>$' marker in it is
    replaced on the wire by the content of that file, sent in chunks.
    progress may be a callable; it receives upload status strings and,
    while the reply is read, the parsed INFORM messages.

    Returns a (header, data) tuple: header is a dict of lowercased HTTP
    response headers for a 'HTTP/1.1 200' reply to an ILO_HTTP request
    and None otherwise; data is the response body as text.

    Raises IloCommunicationError when the connection is closed before
    any data has been received."""
    sock = self._get_socket()
    msglen = len(self.XML_HEADER + xml)
    if protocol == ILO_HTTP:
        extra_header = ''
        if self.cookie:
            extra_header = "\r\nCookie: %s" % self.cookie
        http_header = self.HTTP_HEADER % (msglen, extra_header)
        msglen += len(http_header)
    self._debug(1, "Sending XML request, %d bytes" % msglen)

    if protocol == ILO_HTTP:
        self._debug(2, http_header)
        sock.write(b(http_header))

    self._debug(2, self.XML_HEADER + xml)

    # XML header and data need to arrive in 2 distinct packets
    if self.protocol != ILO_LOCAL:
        sock.write(self.XML_HEADER)
    if b('$EMBED') in xml:
        pre, name, post = re.compile(b(r'(.*)\$EMBED:(.*)\$(.*)'), re.DOTALL).match(xml).groups()
        sock.write(pre)
        sent = 0
        fwlen = os.path.getsize(name)
        # Close the file explicitly instead of relying on garbage collection
        fd = open(name, 'rb')
        try:
            fw = fd.read()
        finally:
            fd.close()
        pkglen = 2048
        while sent < fwlen:
            written = sock.write(fw[sent:sent+pkglen])
            sent += written
            if callable(progress):
                progress("\r\033[KSent %d/%d bytes (%d%%)" % (sent, fwlen, 100.0*sent/fwlen))
            else:
                self._debug(2, "\r\033[KSent %d/%d bytes (%d%%)" % (sent, fwlen, 100.0*sent/fwlen))
        if callable(progress):
            progress("")
        else:
            self._debug(2, "")
        sock.write(post.strip())
    else:
        sock.write(xml)

    # And grab the data
    if self.protocol == ILO_LOCAL:
        # hponcfg doesn't return data until stdin is closed
        sock.stdin.close()
    data = ''
    try:
        while True:
            d = sock.read().decode('latin-1')
            data += d
            if not d:
                break
            if callable(progress) and d.strip().endswith('</RIBCL>'):
                # Hand every complete document in this chunk to the callback
                d = d[d.find('<?xml'):]
                while '<?xml' in d:
                    end = d.find('<?xml', 5)
                    if end == -1:
                        progress(self._parse_message(d, include_inform=True))
                        break
                    else:
                        progress(self._parse_message(d[:end], include_inform=True))
                        d = d[end:]
    except socket.sslerror: # Connection closed
        e = sys.exc_info()[1]
        if not data:
            raise IloCommunicationError("Communication with %s:%d failed: %s" % (self.hostname, self.port, str(e)))

    self._debug(1, "Received %d bytes" % len(data))

    # Strip out garbage from hponcfg
    if self.protocol == ILO_LOCAL:
        data = data[data.find('<'):data.rfind('>')+1]

    # Do we have HTTP?
    header_ = ''
    if protocol == ILO_HTTP and data.startswith('HTTP/1.1 200'):
        header, data = data.split('\r\n\r\n', 1)
        header_ = header
        header = [x.split(':', 1) for x in header.split('\r\n')[1:]]
        header = dict([(x[0].lower(), x[1].strip()) for x in header])
        # .get(): a response without Transfer-Encoding is not chunked
        if header.get('transfer-encoding') == 'chunked':
            _data, data = data, ''
            while _data:
                clen, _data = _data.split('\r\n', 1)
                clen = int(clen, 16)
                if clen == 0:
                    break
                data += _data[:clen]
                # Skip the chunk and the CRLF that terminates it
                _data = _data[clen+2:]
    else:
        # Either the raw protocol is in use, or this is a 'HTTP/1.1 404'
        # from an iLO2 or older, which doesn't do HTTP for XML requests.
        # The latter is only triggered by the protocol detection.
        header = None

    self._debug(2, "%s\r\n\r\n%s" % (header_, data))
    return header, data
def _root_element(self, element, **attrs):
    """Build the RIBCL/LOGIN/<element> skeleton of a request.

    Returns a (root, innermost) tuple; the keyword arguments become
    attributes of the innermost element."""
    root = etree.Element('RIBCL', VERSION="2.0")
    credentials = {'USER_LOGIN': self.login, 'PASSWORD': self.password}
    login = etree.SubElement(root, 'LOGIN', **credentials)
    return root, etree.SubElement(login, element, **attrs)
def _parse_message(self, data, include_inform=False):
    """Parse one iLO response document and filter out useless messages.

    Returns the parsed root element when the document contains a payload
    element, the text of an INFORM element when include_inform is set,
    and None when nothing useful is left. Raises IloLoginFailed or
    IloError for RESPONSE elements with a non-zero status and issues an
    IloWarning for status 0 responses whose message is not 'No error'."""
    # Bug in some ilo versions causes malformed XML
    broken_root = '<RIBCL VERSION="2.22"/>'
    if broken_root in data:
        data = data.replace(broken_root, '<RIBCL VERSION="2.22">')
    # Put quotes around attribute values that were sent without them
    if re.search(r'''=+ *[^"'\n=]''', data):
        data = re.sub(r'''= *([^"'\n]+?) *\n''', r'="\1"', data)
    data = data.strip()
    if not data:
        return None

    message = etree.fromstring(data)
    if message.tag != 'RIBCL':
        # This shouldn't be reached as all messages are RIBCL messages. But who knows!
        return message

    for child in message:
        if child.tag == 'INFORM':
            if not include_inform:
                continue
            # Filter useless message:
            if 'should be updated' in child.text:
                return None
            return child.text

        if child.tag != 'RESPONSE':
            # And this type of message is the actual payload.
            return message

        status = int(child.get('STATUS'), 16)
        text = child.get('MESSAGE')
        if status == 0:
            # RESPONSE with status 0 adds no value, beyond a possible warning
            if text != 'No error':
                warnings.warn(text, IloWarning)
            continue

        # These are interesting, something went wrong
        if 'syntax error' in text and not self.protocol:
            # This is triggered when doing protocol detection, ignore
            continue
        if status in IloLoginFailed.possible_codes or \
           text in IloLoginFailed.possible_messages:
            raise IloLoginFailed(text, status)
        raise IloError(text, status)

    return None
def _element_children_to_dict(self, element):
    """Convert the children of element into a dict keyed by lowercased tag
    name, or into a list when several children all share one tag.

    The value of a child is its VALUE (or value) attribute. Children that
    also have a UNIT attribute yield a (value, unit) tuple."""
    tags = [child.tag.lower() for child in element]
    # Identical tags can't be used as dict keys, so return a list then
    as_list = len(tags) != 1 and len(set(tags)) == 1
    if as_list:
        retval = []
    else:
        retval = {}

    for child in element:
        value = child.get('VALUE', child.get('value', None))
        unit = child.get('UNIT', None)
        if value is None:
            # HP is not best friends with consistency. Sometimes there are
            # attributes, sometimes child tags and sometimes text nodes. Oh
            # well, deal with it :)
            if list(child):
                value = self._element_to_dict(child)
            elif child.text:
                value = child.text
            elif child.attrib:
                value = self._element_to_dict(child)

        value = self._coerce(value)
        if unit:
            value = (value, unit)

        if as_list:
            retval.append(value)
        else:
            retval[child.tag.lower()] = value
    return retval
def _element_to_dict(self, element):
    """Convert the attributes of element into a dict with lowercased names
    as keys and coerced attribute values as values"""
    return dict((name.lower(), self._coerce(value))
                for name, value in element.attrib.items())
def _coerce(self, val):
    """Coerce a string into a friendlier type: surrounding double quotes
    are removed, all-digit strings become integers and Y/N become
    booleans. Anything that is not a string is returned as is."""
    if not isinstance(val, basestring):
        return val
    if val.startswith('"') and val.endswith('"'):
        val = val[1:-1]
    if val.isdigit():
        return int(val)
    return {'Y': True, 'N': False}.get(val, val)
def _raw(self, *tags):
    """Send a request made of nested tags and return the reply as XML text.

    Every argument is a (tagname, attributes) tuple. The first one is
    created through _root_element, each following one is nested inside
    the previous one. The message returned by _request is serialized
    back into a native string."""
    root, inner = self._root_element(tags[0][0], **(tags[0][1]))
    for t in tags[1:]:
        inner = etree.SubElement(inner, t[0], **t[1])
    header, message = self._request(root)
    # tostring gives bytes on Python 3, where writing the tree to a
    # StringIO fails with a TypeError; decode to get a native string
    ret = etree.tostring(message)
    if not isinstance(ret, str):
        ret = ret.decode('ascii')
    return ret
def _info_tag(self, infotype, tagname, returntags=None, attrib={}):
root, inner = self._root_element(infotype, MODE='read')
etree.SubElement(inner, tagname, **attrib)
header, message = self._request(root)
# Use tagname as returntags if returntags isn't specificed
if isinstance(returntags, basestring):
returntags = [returntags]
elif returntags is None:
returntags = [tagname]
for tag in returntags:
if message.find(tag) is None:
continue
message = message.find(tag)
if list(message):
return self._element_children_to_dict(message)
else:
return self._element_to_dict(message)
raise IloError("Expected tag '%s' not found" % "' or '".join(returntags))
def _info_tag2(self, infotype, tagname, returntag=None, key=None):
    """Send a read request for tagname inside an infotype block and convert
    every child of the returned tag (returntag, or tagname when that is
    not given) into a dict of its attributes.

    Returns a list of those dicts or, when key is given, a dict that
    maps each entry's value for key to the entry."""
    root, inner = self._root_element(infotype, MODE='read')
    etree.SubElement(inner, tagname)
    header, message = self._request(root)

    container = message.find(returntag or tagname)
    entries = [self._element_to_dict(elt) for elt in container]
    if key:
        return dict((entry[key], entry) for entry in entries)
    return entries
def _control_tag(self, controltype, tagname, returntag=None, attrib={}, elements=[], text=None):
    """Send a write request for tagname inside a controltype block.

    attrib becomes the attributes of the tagname element, elements are
    appended to it as children and text, when given, becomes its text.
    The default arguments are never modified.

    Returns None when the reply has no useful content, or the stripped
    text of the returned tag (returntag, or tagname when that is not
    given). Raises IloError when the reply has content but the returned
    tag is missing or holds no text; such replies are not understood."""
    root, inner = self._root_element(controltype, MODE='write')
    inner = etree.SubElement(inner, tagname, **attrib)
    if text:
        inner.text = text
    for element in elements:
        inner.append(element)
    header, message = self._request(root)
    if message is None:
        return None

    message = message.find(returntag or tagname)
    # A missing tag or an empty text node both fall through to the error
    # below instead of failing with an AttributeError
    if message is not None:
        content = (message.text or '').strip()
        if content:
            return content
    raise IloError("You've reached unknown territories, please report a bug")
def activate_license(self, key):
    """Activate an iLO advanced license using the given license key"""
    activation = etree.Element('ACTIVATE', KEY=key)
    return self._control_tag('RIB_INFO', 'LICENSE', elements=[activation])
def add_user(self, user_login, user_name, password, admin_priv=False,
             remote_cons_priv=True, reset_server_priv=False,
             virtual_media_priv=False, config_ilo_priv=True):
    """Add a new user to the iLO interface with the specified name,
    password and permissions. Permission attributes are interpreted as
    booleans: true values are sent as 'Yes', false values as 'No'."""
    # Spell the privileges out instead of digging them out of locals(), so
    # the elements and their order do not depend on interpreter internals
    privileges = (
        ('ADMIN_PRIV', admin_priv),
        ('REMOTE_CONS_PRIV', remote_cons_priv),
        ('RESET_SERVER_PRIV', reset_server_priv),
        ('VIRTUAL_MEDIA_PRIV', virtual_media_priv),
        ('CONFIG_ILO_PRIV', config_ilo_priv),
    )
    elements = []
    for tag, granted in privileges:
        if granted:
            value = 'Yes'
        else:
            value = 'No'
        elements.append(etree.Element(tag, VALUE=value))
    return self._control_tag('USER_INFO', 'ADD_USER', elements=elements,
        attrib={'USER_LOGIN': user_login, 'USER_NAME': user_name, 'PASSWORD': password})
@untested
def ahs_clear_data(self):
    """FIXME: I have no relevant hardware. Please report sample output

    Sends AHS_CLEAR_DATA inside a RIB_INFO write block and returns the
    raw reply."""
    request = (('RIB_INFO', {'MODE': 'WRITE'}), ('AHS_CLEAR_DATA', {}))
    return self._raw(*request)
def cert_fqdn(self, use_fqdn):
    """Configure whether certificate requests use the fqdn or the short
    hostname. True and False are sent as 'Yes' and 'No', any other value
    is sent as its string representation."""
    yes_no = {True: 'Yes', False: 'No'}
    value = str(yes_no.get(use_fqdn, use_fqdn))
    return self._control_tag('RIB_INFO', 'CERT_FQDN', attrib={'VALUE': value})
def certificate_signing_request(self):
    """Ask the iLO for a certificate signing request and return it"""
    csr = self._control_tag('RIB_INFO', 'CERTIFICATE_SIGNING_REQUEST')
    return csr
def clear_ilo_event_log(self):
    """Clear the event log of the iLO itself (RIB_INFO / CLEAR_EVENTLOG)"""
    outcome = self._control_tag('RIB_INFO', 'CLEAR_EVENTLOG')
    return outcome
def clear_server_event_log(self):
    """Clear the event log of the server (SERVER_INFO / CLEAR_IML)"""
    outcome = self._control_tag('SERVER_INFO', 'CLEAR_IML')
    return outcome
def clear_server_power_on_time(self):
    """Reset the power on time counter of the server"""
    outcome = self._control_tag('SERVER_INFO', 'CLEAR_SERVER_POWER_ON_TIME')
    return outcome
def computer_lock_config(self, computer_lock=None, computer_lock_key=None):
    """Configure the computer lock settings.

    Passing computer_lock_key forces computer_lock to "custom". Raises
    ValueError when neither argument holds a value."""
    if computer_lock_key:
        computer_lock = "custom"
    elif not computer_lock:
        raise ValueError("A value must be specified for computer_lock")

    elements = [etree.Element('COMPUTER_LOCK', VALUE=computer_lock)]
    if computer_lock_key:
        key_element = etree.Element('COMPUTER_LOCK_KEY', VALUE=computer_lock_key)
        elements.append(key_element)
    return self._control_tag('RIB_INFO', 'COMPUTER_LOCK_CONFIG', elements=elements)
def delete_user(self, user_login):
    """Delete the user with the given login name from the iLO"""
    target = {'USER_LOGIN': user_login}
    return self._control_tag('USER_INFO', 'DELETE_USER', attrib=target)
def eject_virtual_floppy(self):
    """Eject the virtual floppy (RIB_INFO / EJECT_VIRTUAL_FLOPPY)"""
    outcome = self._control_tag('RIB_INFO', 'EJECT_VIRTUAL_FLOPPY')
    return outcome
def eject_virtual_media(self, device="cdrom"):
    """Eject the virtual media attached to the specified device. The
    device name is sent in upper case."""
    target = {"DEVICE": device.upper()}
    return self._control_tag('RIB_INFO', 'EJECT_VIRTUAL_MEDIA', attrib=target)
def factory_defaults(self):
    """Reset the iLO to its factory default settings"""
    outcome = self._control_tag('RIB_INFO', 'FACTORY_DEFAULTS')
    return outcome
@untested
def get_ahs_status(self):
    """FIXME: I have no relevant hardware. Please report sample output

    Sends GET_AHS_STATUS inside a RIB_INFO read block and returns the
    raw reply."""
    request = (('RIB_INFO', {'MODE': 'READ'}), ('GET_AHS_STATUS', {}))
    return self._raw(*request)
def get_all_users(self):
    """Get a list of all login names, leaving out empty ones"""
    users = self._info_tag2('USER_INFO', 'GET_ALL_USERS', key='value')
    return [login for login in users if login]
def get_all_user_info(self):
    """Get basic and authorization info of all users, as a dict keyed by
    login name"""
    users = self._info_tag2('USER_INFO', 'GET_ALL_USER_INFO', key='user_login')
    return users
def get_cert_subject_info(self):
    """Get the subject information of the ssl certificate, taken from the
    CSR_CERT_SETTINGS tag of the reply"""
    subject = self._info_tag('RIB_INFO', 'GET_CERT_SUBJECT_INFO', 'CSR_CERT_SETTINGS')
    return subject
def get_dir_config(self):
    """Get the directory authentication configuration"""
    config = self._info_tag('DIR_INFO', 'GET_DIR_CONFIG')
    return config
def get_embedded_health(self):
    """Get server health information as a dict keyed by the lowercased
    tag of every category in the GET_EMBEDDED_HEALTH_DATA reply.

    A category without children maps to None. A category whose first
    child has no children of its own maps to a dict of the merged
    attributes per child tag. Any other category maps to a list with one
    entry per child, turned into a dict keyed by 'location' or 'label'
    when the first entry has such a key."""
    root, attach = self._root_element('SERVER_INFO', MODE='read')
    etree.SubElement(attach, 'GET_EMBEDDED_HEALTH')
    header, message = self._request(root)

    health = {}
    for category in message.find('GET_EMBEDDED_HEALTH_DATA'):
        name = category.tag.lower()
        entries = list(category)
        if not entries:
            health[name] = None
            continue

        if not list(entries[0]):
            # Flat category: merge the attributes of same-named children
            merged = {}
            for elt in entries:
                merged.setdefault(elt.tag.lower(), {}).update(self._element_to_dict(elt))
            health[name] = merged
            continue

        rows = [self._element_children_to_dict(x) for x in entries]
        for index_key in ('location', 'label'):
            if index_key in rows[0]:
                rows = dict((row[index_key], row) for row in rows)
                break
        health[name] = rows
    return health
def get_fw_version(self):
    """Get the firmware version information of the iLO"""
    version = self._info_tag('RIB_INFO', 'GET_FW_VERSION')
    return version
def get_global_settings(self):
    """Get the global settings of the iLO"""
    settings = self._info_tag('RIB_INFO', 'GET_GLOBAL_SETTINGS')
    return settings
def get_host_data(self, decoded_only=True):
    """Get the SMBIOS records that describe the host, as a list of dicts.

    By default records without child fields are skipped; pass
    :attr:`decoded_only=False` to get all records. When the field names
    of a record are unique, every field is stored under its own name;
    otherwise the fields are kept as a list under the 'fields' key."""
    root, attach = self._root_element('SERVER_INFO', MODE='read')
    etree.SubElement(attach, 'GET_HOST_DATA')
    header, message = self._request(root)

    records = []
    for record in message.find('GET_HOST_DATA').findall('SMBIOS_RECORD'):
        children = list(record)
        if decoded_only and not children:
            continue
        entry = self._element_to_dict(record)
        fields = [self._element_to_dict(child) for child in children]
        entry['fields'] = fields
        names = [field['name'] for field in fields]
        if len(set(names)) == len(names):
            # No duplicate names, so the fields can be flattened
            for field in fields:
                entry[field['name']] = field['value']
            del entry['fields']
        records.append(entry)
    return records
def get_host_power_saver_status(self):
    """Get the configuration of the ProLiant power regulator, taken from
    the GET_HOST_POWER_SAVER tag of the reply"""
    status = self._info_tag('SERVER_INFO', 'GET_HOST_POWER_SAVER_STATUS', 'GET_HOST_POWER_SAVER')
    return status
def get_host_power_status(self):
    """Get the power state of the server: the 'host_power' value of the
    GET_HOST_POWER reply"""
    return self._info_tag('SERVER_INFO', 'GET_HOST_POWER_STATUS', 'GET_HOST_POWER')['host_power']
def get_host_pwr_micro_ver(self):
    """Get the version of the power micro firmware"""
    reply = self._info_tag('SERVER_INFO', 'GET_HOST_PWR_MICRO_VER')
    pwr_micro = reply['pwr_micro']
    return pwr_micro['version']
def get_ilo_event_log(self):
    """Get the full event log of the iLO as a list of entries"""
    events = self._info_tag2('RIB_INFO', 'GET_EVENT_LOG', 'EVENT_LOG')
    return events
def get_language(self):
    """Get the language that is set as default"""
    language = self._info_tag('RIB_INFO', 'GET_LANGUAGE')
    return language
def get_all_languages(self):
    """Get the list of installed languages - broken because iLO returns
    invalid XML"""
    languages = self._info_tag('RIB_INFO', 'GET_ALL_LANGUAGES')
    return languages
def get_network_settings(self):
    """Get the network settings of the iLO"""
    settings = self._info_tag('RIB_INFO', 'GET_NETWORK_SETTINGS')
    return settings
def get_oa_info(self):
    """Get information about the OA of the enclosing chassis"""
    oa_info = self._info_tag('BLADESYSTEM_INFO', 'GET_OA_INFO')
    return oa_info
def get_one_time_boot(self):
    """Get the one time boot state of the host.

    The reply is looked up under ONE_TIME_BOOT or GET_ONE_TIME_BOOT, and
    a 'boot_type' that contains a 'device' entry is replaced by that
    entry, to hide an inconsistency between iLO 2 and 3."""
    data = self._info_tag('SERVER_INFO', 'GET_ONE_TIME_BOOT', ('ONE_TIME_BOOT', 'GET_ONE_TIME_BOOT'))
    boot_type = data['boot_type']
    if 'device' in boot_type:
        data['boot_type'] = boot_type['device']
    return data
def get_persistent_boot(self):
    """Get the boot order of the host, taken from the PERSISTENT_BOOT tag
    of the reply"""
    boot_order = self._info_tag('SERVER_INFO', 'GET_PERSISTENT_BOOT', 'PERSISTENT_BOOT')
    return boot_order
def get_power_cap(self):
    """Get the power cap setting: the 'power_cap' value of the reply"""
    return self._info_tag('SERVER_INFO', 'GET_POWER_CAP')['power_cap']
def get_power_readings(self):
    """Get the current, min, max and average power readings"""
    readings = self._info_tag('SERVER_INFO', 'GET_POWER_READINGS')
    return readings
def get_pwreg(self):
    """Get the power and power alert threshold settings"""
    settings = self._info_tag('SERVER_INFO', 'GET_PWREG')
    return settings
def get_server_auto_pwr(self):
    """Get the automatic power on delay setting: the 'server_auto_pwr'
    value of the reply"""
    return self._info_tag('SERVER_INFO', 'GET_SERVER_AUTO_PWR')['server_auto_pwr']
def get_server_event_log(self):
    """Get the IML log of the server as a list of entries"""
    events = self._info_tag2('SERVER_INFO', 'GET_EVENT_LOG', 'EVENT_LOG')
    return events
def get_server_name(self):
    """Get the name of the server this iLO is managing: the 'value' of
    the SERVER_NAME tag of the reply"""
    return self._info_tag('SERVER_INFO', 'GET_SERVER_NAME', 'SERVER_NAME')['value']
def get_server_power_on_time(self):
    """Get the number of minutes the server has been powered on, as an
    integer"""
    reply = self._info_tag('SERVER_INFO', 'GET_SERVER_POWER_ON_TIME', 'SERVER_POWER_ON_MINUTES')
    minutes = int(reply['value'])
    return minutes
def get_snmp_im_settings(self):
    """Get the SNMP settings: where the iLO sends traps to and which
    traps it sends"""
    settings = self._info_tag('RIB_INFO', 'GET_SNMP_IM_SETTINGS')
    return settings
def get_sso_settings(self):
    """Get the HP SIM Single Sign-On settings as a dict.

    Records with a VALUE attribute map their lowercased tag to that
    (coerced) value. For all other records the attributes of same-named
    records are merged into one dict per tag."""
    root, attach = self._root_element('SSO_INFO', MODE='read')
    etree.SubElement(attach, 'GET_SSO_SETTINGS')
    header, message = self._request(root)

    retval = {}
    for record in message.find('GET_SSO_SETTINGS'):
        name = record.tag.lower()
        attributes = record.attrib
        if 'VALUE' in attributes:
            retval[name] = self._coerce(attributes['VALUE'])
        else:
            coerced = dict([(k.lower(), self._coerce(v)) for k, v in attributes.items()])
            retval.setdefault(name, {}).update(coerced)
    return retval
def get_twofactor_settings(self):
    """Get the two-factor authentication settings"""
    settings = self._info_tag('RIB_INFO', 'GET_TWOFACTOR_SETTINGS')
    return settings
def get_uid_status(self):
    """Get the status of the UID light: the 'uid' value of the reply"""
    return self._info_tag('SERVER_INFO', 'GET_UID_STATUS')['uid']
def get_user(self, user_login):
    """Get the user info of the user with the given login name"""
    selector = {'USER_LOGIN': user_login}
    return self._info_tag('USER_INFO', 'GET_USER', attrib=selector)
def get_vm_status(self, device="CDROM"):
    """Get the status of a virtual media device. Valid devices are FLOPPY
    and CDROM; the name is passed on exactly as given."""
    selector = {'DEVICE': device}
    return self._info_tag('RIB_INFO', 'GET_VM_STATUS', attrib=selector)
def hotkey_config(self, ctrl_t=None, ctrl_u=None, ctrl_v=None, ctrl_w=None,
                  ctrl_x=None, ctrl_y=None):
    """Change a set of shortcuts. Only the hotkeys for which a value is
    given are included in the request."""
    hotkeys = (
        ('ctrl_t', ctrl_t), ('ctrl_u', ctrl_u), ('ctrl_v', ctrl_v),
        ('ctrl_w', ctrl_w), ('ctrl_x', ctrl_x), ('ctrl_y', ctrl_y),
    )
    elements = []
    for name, keys in hotkeys:
        if keys is not None:
            elements.append(etree.Element(name.upper(), VALUE=keys))
    return self._control_tag('RIB_INFO', 'HOTKEY_CONFIG', elements=elements)
def import_certificate(self, certificate):
    """Import a signed SSL certificate; the certificate is sent as the
    text of the IMPORT_CERTIFICATE tag"""
    outcome = self._control_tag('RIB_INFO', 'IMPORT_CERTIFICATE', text=certificate)
    return outcome
def import_ssh_key(self, user_login, ssh_key):
    """Imports an SSH key for the specified user. The value of ssh_key
    should be the content of an id_dsa.pub file.

    Raises ValueError when ssh_key does not look like a DSA public key:
    no space in it, an algorithm other than ssh-dss or key data that
    cannot be base64-decoded."""
    # Imported here so this method does not depend on a file-level import
    import base64

    # Basic sanity checking
    if ' ' not in ssh_key:
        raise ValueError("Invalid SSH key")
    algo, key = ssh_key.split(' ', 2)[:2]
    if algo != 'ssh-dss':
        raise ValueError("Invalid SSH key, only DSA keys are supported")
    try:
        # Only str objects on Python 2 have decode('base64'); b64decode
        # is available on Python 2 and 3 alike
        base64.b64decode(key)
    except Exception:
        raise ValueError("Invalid SSH key")
    key = "-----BEGIN SSH KEY-----\r\n%s\r\n%s\r\n%s\r\n-----END SSH KEY-----\r\n" % (algo, key, user_login)
    return self._control_tag('RIB_INFO', 'IMPORT_SSH_KEY', text=key)
# Not sure how this would work, and I have no relevant hardware
#def insert_virtual_floppy(self, device, image_location):
# """Insert a virtual floppy"""
# return self._control_tag('RIB_INFO', 'INSERT_VIRTUAL_FLOPPY', attrib={'IMAGE_LOCATION': image_location})
@untested
def delete_ssh_key(self, user_login):
    """Delete the SSH key of a user, by sending a MOD_USER request that
    contains a DEL_USERS_SSH_KEY element"""
    removal = etree.Element('DEL_USERS_SSH_KEY')
    return self._control_tag('USER_INFO', 'MOD_USER',
                             attrib={'USER_LOGIN': user_login}, elements=[removal])
def insert_virtual_media(self, device, image_url):
    """Insert a virtual floppy or CDROM. Note that you will also need to
    use :func:`set_vm_status` to connect the media. The device name is
    sent in upper case."""
    media = {'DEVICE': device.upper(), 'IMAGE_URL': image_url}
    return self._control_tag('RIB_INFO', 'INSERT_VIRTUAL_MEDIA', attrib=media)
def mod_global_settings(self, session_timeout=None, f8_prompt_enabled=None,
        f8_login_required=None, lock_configuration=None,
        http_port=None, https_port=None, ssh_port=None, ssh_status=None,
        vmedia_disable=None, virtual_media_port=None, remote_console_port=None,
        min_password=None, enfoce_aes=None, authentication_failure_logging=None,
        rbsu_post_ip=None, remote_console_encryption=None, remote_keyboard_model=None,
        terminal_services_port=None, high_performance_mouse=None,
        shared_console_enable=None, shared_console_port=None,
        remote_console_acquire=None, brownout_recovery=None, enforce_aes=None):
    """Modify iLO global settings, only values that are specified will be changed.

    Booleans are sent as 'Yes'/'No', every other value as its string
    representation. `enfoce_aes` is a misspelled alias of `enforce_aes`
    that is kept for backward compatibility; both end up in the
    ENFORCE_AES element and `enforce_aes` wins when both are given."""
    # Must stay the first statement: snapshot of the arguments only
    settings = dict(locals())
    del settings['self']
    legacy_aes = settings.pop('enfoce_aes')
    if settings['enforce_aes'] is None:
        settings['enforce_aes'] = legacy_aes

    elements = []
    for name in settings:
        value = settings[name]
        if value is None:
            continue
        # Only real booleans are translated: 0 and 1 compare equal to
        # False and True but are legitimate numeric values here
        if isinstance(value, bool):
            value = {True: 'Yes', False: 'No'}[value]
        elements.append(etree.Element(name.upper(), VALUE=str(value)))
    return self._control_tag('RIB_INFO', 'MOD_GLOBAL_SETTINGS', elements=elements)
def mod_network_settings(self, enable_nic=None, reg_ddns_server=None,
        ping_gateway=None, dhcp_domain_name=None, speed_autoselect=None,
        nic_speed=None, full_duplex=None, dhcp_enable=None,
        ip_address=None, subnet_mask=None, gateway_ip_address=None,
        dns_name=None, domain_name=None, dhcp_gateway=None,
        dhcp_dns_server=None, dhcp_wins_server=None, dhcp_static_route=None,
        reg_wins_server=None, prim_dns_server=None, sec_dns_server=None,
        ter_dns_server=None, prim_wins_server=None, sec_wins_server=None,
        static_route_1=None, static_route_2=None, static_route_3=None,
        dhcp_sntp_settings=None, sntp_server1=None, sntp_server2=None,
        timezone=None, enclosure_ip_enable=None, web_agent_ip_address=None,
        shared_network_port=None, vlan_enabled=None, vlan_id=None,
        shared_network_port_vlan=None, shared_network_port_vlan_id=None):
    """Configure the network settings for the iLO card.

    Every argument that is not None becomes one child element of
    ``MOD_NETWORK_SETTINGS`` whose tag is the upper-cased argument name and
    whose ``VALUE`` attribute is the argument converted with ``str()``.
    Real booleans are sent as ``Yes``/``No``; integers such as ``vlan_id=1``
    are sent as numbers.

    Returns the result of :func:`_control_tag`.
    """
    # Snapshot the arguments before any other local variable exists
    settings = dict(locals())
    del settings['self']
    elements = []
    for name, value in settings.items():
        if value is None:
            continue
        # Only genuine booleans map to Yes/No: 1 == True and 0 == False in
        # a dict lookup, which would corrupt numeric settings
        if isinstance(value, bool):
            value = 'Yes' if value else 'No'
        elements.append(etree.Element(name.upper(), VALUE=str(value)))
    return self._control_tag('RIB_INFO', 'MOD_NETWORK_SETTINGS', elements=elements)
def mod_dir_config(self, dir_authentication_enabled=None,
        dir_local_user_acct=None, dir_server_address=None,
        dir_server_port=None, dir_object_dn=None, dir_object_password=None,
        dir_user_context_1=None, dir_user_context_2=None,
        dir_user_context_3=None, dir_user_context_4=None,
        dir_user_context_5=None, dir_user_context_6=None,
        dir_user_context_7=None, dir_user_context_8=None,
        dir_user_context_9=None, dir_user_context_10=None,
        dir_user_context_11=None, dir_user_context_12=None,
        dir_user_context_13=None, dir_user_context_14=None,
        dir_user_context_15=None, dir_enable_grp_acct=None,
        dir_kerberos_enabled=None, dir_kerberos_realm=None,
        dir_kerberos_kdc_address=None, dir_kerberos_kdc_port=None,
        dir_kerberos_keytab=None,
        dir_grpacct1_name=None, dir_grpacct1_sid=None,
        dir_grpacct1_priv=None, dir_grpacct2_name=None,
        dir_grpacct2_sid=None, dir_grpacct2_priv=None,
        dir_grpacct3_name=None, dir_grpacct3_sid=None,
        dir_grpacct3_priv=None, dir_grpacct4_name=None,
        dir_grpacct4_sid=None, dir_grpacct4_priv=None,
        dir_grpacct5_name=None, dir_grpacct5_sid=None,
        dir_grpacct5_priv=None, dir_grpacct6_name=None,
        dir_grpacct6_sid=None, dir_grpacct6_priv=None):
    """Modify iLO directory configuration, only values that are specified
    will be changed.

    Every argument that is not None becomes one child element of
    ``MOD_DIR_CONFIG`` whose tag is the upper-cased argument name and whose
    ``VALUE`` attribute is the argument converted with ``str()``. Real
    booleans are sent as ``Yes``/``No``; integers (e.g. ports) are sent as
    numbers.

    ``dir_kerberos_keytab`` is the exception: it is sent as the text of a
    ``DIR_KERBEROS_KEYTAB`` element (without a ``VALUE`` attribute), which is
    appended after all other elements.

    Returns the result of :func:`_control_tag`.
    """
    # Snapshot the arguments before any other local variable exists
    settings = dict(locals())
    del settings['self']
    # The keytab travels as element text, so keep it out of the generic loop
    keytab = settings.pop('dir_kerberos_keytab')
    elements = []
    for name, value in settings.items():
        if value is None:
            continue
        # Only genuine booleans map to Yes/No: 1 == True and 0 == False in
        # a dict lookup, which would corrupt numeric settings
        if isinstance(value, bool):
            value = 'Yes' if value else 'No'
        elements.append(etree.Element(name.upper(), VALUE=str(value)))
    if keytab is not None:
        keytab_el = etree.Element('DIR_KERBEROS_KEYTAB')
        keytab_el.text = keytab
        elements.append(keytab_el)
    return self._control_tag('DIR_INFO', 'MOD_DIR_CONFIG', elements=elements)
def mod_snmp_im_settings(self, web_agent_ip_address=None, snmp_address_1=None,
        snmp_address_2=None, snmp_address_3=None, os_traps=None,
        snmp_passthrough_status=None, rib_traps=None, cim_security_mask=None):
    """Configure the SNMP and Insight Manager integration settings.

    Every argument that is not None becomes one child element of
    ``MOD_SNMP_IM_SETTINGS`` whose tag is the upper-cased argument name and
    whose ``VALUE`` attribute is the argument converted with ``str()``.
    Real booleans are sent as ``Yes``/``No``; integers such as
    ``cim_security_mask=1`` are sent as numbers.

    Returns the result of :func:`_control_tag`.
    """
    # Snapshot the arguments before any other local variable exists
    settings = dict(locals())
    del settings['self']
    elements = []
    for name, value in settings.items():
        if value is None:
            continue
        # Only genuine booleans map to Yes/No: 1 == True and 0 == False in
        # a dict lookup, which would corrupt numeric settings
        if isinstance(value, bool):
            value = 'Yes' if value else 'No'
        elements.append(etree.Element(name.upper(), VALUE=str(value)))
    return self._control_tag('RIB_INFO', 'MOD_SNMP_IM_SETTINGS', elements=elements)
def mod_user(self, user_login, user_name=None, password=None,
        admin_priv=None, remote_cons_priv=None, reset_server_priv=None,
        virtual_media_priv=None, config_ilo_priv=None):
    """Change attributes of the user identified by ``user_login``.

    Only arguments that are not None are sent. ``user_name`` and
    ``password`` are passed through unchanged as the ``VALUE`` attribute;
    every ``*_priv`` argument is reduced to ``Yes`` or ``No`` according to
    its truthiness. Returns the result of :func:`_control_tag`.
    """
    # Must run before any other local is bound so only arguments are seen
    given = locals()
    changes = []
    for field in ('user_name', 'password'):
        text = given[field]
        if text is not None:
            changes.append(etree.Element(field.upper(), VALUE=text))
    privilege_names = [key for key in given.keys() if key.endswith('_priv')]
    for field in privilege_names:
        flag = given[field]
        if flag is None:
            continue
        answer = 'Yes' if flag else 'No'
        changes.append(etree.Element(field.upper(), VALUE=answer))
    target = {'USER_LOGIN': user_login}
    return self._control_tag('USER_INFO', 'MOD_USER', attrib=target, elements=changes)
def press_pwr_btn(self):
    """Press the power button.

    Issues ``PRESS_PWR_BTN`` under ``SERVER_INFO`` through
    :func:`_control_tag` and returns its result.
    """
    command = 'PRESS_PWR_BTN'
    return self._control_tag('SERVER_INFO', command)
def hold_pwr_btn(self):
    """Press and hold the power button.

    Issues ``HOLD_PWR_BTN`` under ``SERVER_INFO`` through
    :func:`_control_tag` and returns its result.
    """
    command = 'HOLD_PWR_BTN'
    return self._control_tag('SERVER_INFO', command)
def cold_boot_server(self):
    """Force a cold boot of the server.

    Issues ``COLD_BOOT_SERVER`` under ``SERVER_INFO`` through
    :func:`_control_tag` and returns its result.
    """
    command = 'COLD_BOOT_SERVER'
    return self._control_tag('SERVER_INFO', command)
def warm_boot_server(self):
    """Force a warm boot of the server.

    Issues ``WARM_BOOT_SERVER`` under ``SERVER_INFO`` through
    :func:`_control_tag` and returns its result.
    """
    command = 'WARM_BOOT_SERVER'
    return self._control_tag('SERVER_INFO', command)
def reset_rib(self):
    """Reset the iLO/RILOE board.

    Issues ``RESET_RIB`` under ``RIB_INFO`` through :func:`_control_tag`
    and returns its result.
    """
    command = 'RESET_RIB'
    return self._control_tag('RIB_INFO', command)
def reset_server(self):
    """Power cycle the server.

    Issues ``RESET_SERVER`` under ``SERVER_INFO`` through
    :func:`_control_tag` and returns its result.
    """
    command = 'RESET_SERVER'
    return self._control_tag('SERVER_INFO', command)
@untested
def set_ahs_status(self, status):
    """Enable or disable AHS logging.

    ``status`` is looked up in a True/False table and sent as ``Enable`` or
    ``Disable`` in the ``VALUE`` attribute of ``SET_AHS_STATUS``. A value
    that equals neither True nor False raises ``KeyError``. Returns the
    result of :func:`_control_tag`.
    """
    value_for = {True: 'Enable', False: 'Disable'}
    setting = {'VALUE': value_for[status]}
    return self._control_tag('RIB_INFO', 'SET_AHS_STATUS', attrib=setting)
def set_language(self, lang_id):
    """Set the default language.

    ``lang_id`` is sent unchanged as the ``LANG_ID`` attribute of
    ``SET_LANGUAGE``. Returns the result of :func:`_control_tag`.
    """
    language = {'LANG_ID': lang_id}
    return self._control_tag('RIB_INFO', 'SET_LANGUAGE', attrib=language)
def set_host_power(self, host_power=True):
    """Turn host power on or off.

    A truthy ``host_power`` is sent as ``Yes``, a falsy one as ``No``, in
    the ``HOST_POWER`` attribute of ``SET_HOST_POWER``. Returns the result
    of :func:`_control_tag`.
    """
    if host_power:
        answer = 'Yes'
    else:
        answer = 'No'
    return self._control_tag('SERVER_INFO', 'SET_HOST_POWER', attrib={'HOST_POWER': answer})
def set_host_power_saver(self, host_power_saver):
    """Set the configuration of the ProLiant power regulator.

    ``host_power_saver`` is converted with ``str()`` and sent as the
    ``HOST_POWER_SAVER`` attribute of ``SET_HOST_POWER_SAVER``. Returns the
    result of :func:`_control_tag`.
    """
    mode = str(host_power_saver)
    return self._control_tag('SERVER_INFO', 'SET_HOST_POWER_SAVER', attrib={'HOST_POWER_SAVER': mode})
def set_one_time_boot(self, device):
    """Set one time boot device.

    ``device`` is upper-cased and sent as the ``VALUE`` attribute of
    ``SET_ONE_TIME_BOOT``. Returns the result of :func:`_control_tag`.
    """
    boot_target = {'VALUE': device.upper()}
    return self._control_tag('SERVER_INFO', 'SET_ONE_TIME_BOOT', attrib=boot_target)
def set_persistent_boot(self, devices):
    """Set persistent boot order.

    ``devices`` is either a comma-separated string (``"cdrom,hdd"``) or a
    list/tuple of device names. Surrounding whitespace is stripped from each
    name, empty entries (e.g. from a trailing comma) are skipped, and the
    names are upper-cased. One ``DEVICE`` element is sent per name, in the
    order given. Returns the result of :func:`_control_tag`.
    """
    # Anything with .split() is treated as the comma-separated string form
    if hasattr(devices, 'split'):
        devices = devices.split(',')
    names = [name.strip() for name in devices]
    elements = [etree.Element('DEVICE', VALUE=name.upper()) for name in names if name]
    return self._control_tag('SERVER_INFO', 'SET_PERSISTENT_BOOT', elements=elements)
def set_pwreg(self, type, threshold=None, duration=None):
"""Set the power alert threshold"""
elements = [etree.Element('PWRALERT', TYPE=type)]
if type.lower() != "disabled":
elements.append(etree.Element('PWRALERT_SETTINGS', THRESHOLD=str(threshold), DURATION=str(duration)))