-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: hdp.py
More file actions
214 lines (195 loc) · 8.58 KB
/
hdp.py
File metadata and controls
214 lines (195 loc) · 8.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#!/usr/bin/python
#coding=utf-8
# Copyright (c) 2019 smithemail@163.com. All rights reserved.
# Author:smithemail@163.com
# Time :2019-09-23
import os
import sys
import json
import logging
import traceback
import ipaddress
import subprocess
from optparse import OptionParser
class HdpDocker(object):
    """Manage a Hadoop (HDP) cluster running inside Docker containers.

    Builds a worker image from the local Dockerfile, creates a user-defined
    Docker network on ``subnet``, starts one master container plus
    ``cluster_size - 1`` slave containers with static IPs, and wires them
    together (hosts file, SSH keys, Hadoop config) so the master can run
    ``start-all.sh``.
    """

    def __init__(self, subnet, cluster_size):
        # CIDR subnet used for the docker network, e.g. '172.17.0.1/16'.
        self.subnet = subnet
        # Total number of containers, master included.
        self.cluster_size = cluster_size
        # Maps static IP -> container hostname (master + slaves).
        self.ip_to_host = {}
        self.master_ip = ''
        self.master_host = 'hdpmaster'
        self.hdpdocker_images = 'hdpworker'
        self.hdp_network = 'hdp_network'
        self._initialize()

    def _initialize(self):
        """Assign static IPs to hostnames; the first usable IP is the master.

        Skips ips[0] (the network address) and ips[1] (conventionally the
        docker gateway), then hands out one address per cluster member.
        """
        ips = [str(ip) for ip in ipaddress.IPv4Network(self.subnet, strict=False)]
        idx = 1
        for ip, _ in zip(ips[2:], range(self.cluster_size)):
            if not self.master_ip:
                self.master_ip = ip
                self.ip_to_host[self.master_ip] = self.master_host
                continue
            self.ip_to_host[ip] = 'hdpslave' + str(idx)
            idx += 1
        return True

    def _cluster_exec(self, command, include_master=False):
        """Run *command* (via ``docker exec``) on every slave container.

        When *include_master* is True the master container is included too.
        """
        for ip, host in self.ip_to_host.items():
            if not include_master and ip == self.master_ip:
                continue
            docker_cmd = 'docker exec %s bash -c "%s"' % (host, command)
            self._exec_command(docker_cmd)
        return True

    def _master_exec(self, command):
        """Run *command* inside the master container; return its stdout."""
        docker_cmd = 'docker exec %s bash -c "%s"' % (self.master_host, command)
        return self._exec_command(docker_cmd)

    def _exec_command(self, command):
        """Run a shell command on the host, log its output, return stdout.

        Exits the whole program (status 1) on any execution failure — this
        tool treats every step as mandatory.
        """
        try:
            command = command.strip()
            p = subprocess.Popen(command, shell=True, encoding='utf8',
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = p.communicate()
            if out.strip():
                logging.debug(out.strip())
            if err.strip():
                logging.debug(err.strip())
        except Exception:
            # Fixed typo in the original message ('conmand').
            logging.error('execute command fail: %s', command)
            traceback.print_exc()
            sys.exit(1)
        logging.debug('exec cmd success, %s', command)
        return out

    def _create_docker_images(self):
        """Build the worker image from the Dockerfile in the current dir."""
        cmd = 'docker build -t %s .' % self.hdpdocker_images
        self._exec_command(cmd)
        return True

    def _gen_hosts(self):
        """Write the ip<TAB>hostname mapping to the local 'hosts' file."""
        with open('hosts', 'w') as writer:
            for ip, host in self.ip_to_host.items():
                writer.write('%s\t%s\n' % (ip, host))
        return True

    def _configure_cluster(self):
        """Distribute hosts/SSH keys/Hadoop config to every container."""
        self._gen_hosts()
        # /smith is the host cwd bind-mounted into every container.
        cmd = "cat /smith/hosts >> /etc/hosts \
                && service ssh start \
                && ssh-keygen -t rsa -N '' -f ~/.ssh/id_rsa \
                && cp -f /smith/core-site.xml /root/hadoop/etc/hadoop \
                && cp -f /smith/mapred-site.xml /root/hadoop/etc/hadoop \
                && cp -f /smith/hdfs-site.xml /root/hadoop/etc/hadoop \
                && cp -f /smith/yarn-site.xml /root/hadoop/etc/hadoop \
                && cp -f /smith/yarn-env.sh /root/hadoop/etc/hadoop \
                && awk -F'\t' '{print $2}' /smith/hosts > /root/hadoop/etc/hadoop/workers \
                && cat /smith/env >> /root/hadoop/etc/hadoop/hadoop-env.sh"
        self._cluster_exec(cmd, True)
        # Publish the master's public key through the shared mount, then
        # authorize it on every node so start-all.sh can ssh around.
        master_cmd = "cp /root/.ssh/id_rsa.pub /smith/hdp_master_idrsa.pub"
        self._master_exec(master_cmd)
        slave_cmd = "cat /smith/hdp_master_idrsa.pub >> /root/.ssh/authorized_keys"
        self._cluster_exec(slave_cmd, True)
        cmd = 'rm -f hdp_master_idrsa.pub'
        self._exec_command(cmd)
        # Pre-seed known_hosts so the first ssh is non-interactive.
        for host in self.ip_to_host.values():
            cmd = 'ssh-keyscan -H %s >> ~/.ssh/known_hosts' % host
            self._master_exec(cmd)
        return True

    def create_cluster(self):
        """Build image, create network and containers, then start Hadoop."""
        logging.info('create docker images: %s ...', self.hdpdocker_images)
        self._create_docker_images()
        logging.info('create docker network: %s ...', self.subnet)
        docker_cmd = 'docker network create --subnet %s %s'\
                     % (self.subnet, self.hdp_network)
        self._exec_command(docker_cmd)
        logging.info('create cluster master: %s ...', self.master_host)
        cwd = os.getcwd()
        docker_cmd = 'docker run -itd -v %s:/smith --net %s --ip %s -h %s\
                      --name %s --privileged %s'
        # Only the master publishes the YARN web UI port (8088).
        master_cmd = docker_cmd % (cwd, self.hdp_network, self.master_ip,
                                   self.master_host, self.master_host,
                                   ' -p 8088:8088 ' + self.hdpdocker_images)
        self._exec_command(master_cmd)
        for ip, host in self.ip_to_host.items():
            if ip == self.master_ip:
                continue
            logging.info('create cluster slave: %s ...', host)
            slave_cmd = docker_cmd % (cwd, self.hdp_network, ip, host,
                                      host, self.hdpdocker_images)
            self._exec_command(slave_cmd)
        logging.info('configure cluster ...')
        self._configure_cluster()
        logging.info('start cluster ...')
        self.start_cluster(True)

    def start_cluster(self, init_start=False):
        """Start the cluster; with *init_start* also format the namenode.

        On a restart (init_start=False) the existing containers listed in
        the local 'hosts' file are started again first.
        """
        if init_start:
            cmd = 'source /root/.bashrc && hdfs namenode -format && start-all.sh'
            self._master_exec(cmd)
            return True
        with open('hosts', 'r') as reader:
            for line in reader:
                contain_name = line.strip().split('\t')[1].strip()
                cmd = 'docker start %s' % contain_name
                self._exec_command(cmd)
                logging.info('contain %s start ...', contain_name)
        logging.info('contains sshd service start...')
        cmd = 'service ssh start'
        self._cluster_exec(cmd, True)
        logging.info('hadoop service start...')
        cmd = 'source /root/.bashrc && start-all.sh'
        self._master_exec(cmd)

    def status_cluster(self):
        """Log the HDFS dfsadmin report from the master."""
        cmd = 'source /root/.bashrc && hdfs dfsadmin -report'
        out = self._master_exec(cmd)
        logging.info(out)
        return True

    def stop_cluster(self):
        """Stop every container listed in the local 'hosts' file."""
        with open('hosts', 'r') as reader:
            for line in reader:
                contain_name = line.strip().split('\t')[1].strip()
                logging.info('stop contain %s ...', contain_name)
                cmd = 'docker stop %s' % (contain_name)
                self._exec_command(cmd)

    def clean_cluster(self):
        """Remove all cluster containers and their docker network(s)."""
        with open('hosts', 'r') as reader:
            # Grab the master's network map before the container is removed.
            cmd = 'docker inspect %s -f "{{json .NetworkSettings.Networks }}"'\
                  % (self.master_host)
            json_string = self._exec_command(cmd)
            for line in reader:
                contain_name = line.split('\t')[1].strip()
                logging.info('removing contain %s ...', contain_name)
                cmd = 'docker rm -f %s' % (contain_name)
                self._exec_command(cmd)
        try:
            networks = json.loads(json_string).keys()
        except json.JSONDecodeError:
            logging.warning('parse network string error, json:%s', json_string.strip())
            return False
        except Exception as e:
            # logging.warn is deprecated; use logging.warning.
            logging.warning('find network error: %s', str(e))
            return False
        for network in networks:
            logging.info('removing network %s ...', network)
            cmd = 'docker network rm %s' % network
            self._exec_command(cmd)
        return True

    def run(self, command):
        """Dispatch *command* (create/start/status/stop/clean) to a method."""
        process = {
            'create': self.create_cluster,
            'start' : self.start_cluster,
            'status': self.status_cluster,
            'stop'  : self.stop_cluster,
            'clean' : self.clean_cluster
        }
        if command not in process:
            logging.error('not find relate operation of %s', command)
            return False
        return process[command]()
def main():
    """Parse command-line options and dispatch to an HdpDocker instance."""
    logging.basicConfig(level='INFO',
                        format='[%(levelname)s %(asctime)s] %(message)s')
    parser = OptionParser()
    parser.add_option('-c', '--cmd', type='string', dest='command', default='status',
                      help='操作命令: create, start, status, stop, clean, 默认:%default')
    parser.add_option('--subnet', type='string', dest='subnet',
                      default='172.17.0.1/16', help='用于部署的子网, 默认:%default')
    parser.add_option('--size', type='int', dest='cluster_size', default=4,
                      help='集群数量, 默认:%default')
    option, _ = parser.parse_args()
    HdpDocker(option.subnet, option.cluster_size).run(option.command)


if __name__ == '__main__':
    main()