Skip to content
This repository was archived by the owner on May 6, 2020. It is now read-only.

Commit d703819

Browse files
author
Dmitry Voytik
committed
proxy: implement (re)store of proxy's state
Introduce the high availability feature of cc-proxy by implementing store/restore of proxy's state to/from disk. This feature depends on the ability of shim to reconnect to cc-proxy if connection is lost. Fixes #4. Signed-off-by: Dmitry Voytik <dmitry.voytik@huawei.com>
1 parent 05e19de commit d703819

4 files changed

Lines changed: 395 additions & 13 deletions

File tree

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ LOCALSTATEDIR := /var
1616

1717
SOURCES := $(shell find . 2>&1 | grep -E '.*\.(c|h|go)$$')
1818
PROXY_SOCKET := $(LOCALSTATEDIR)/run/clear-containers/proxy.sock
19+
STORE_STATE_DIR := $(LOCALSTATEDIR)/lib/clear-containers/proxy/
1920

2021
DESCRIBE := $(shell git describe 2> /dev/null || true)
2122
DESCRIBE_DIRTY := $(if $(shell git status --porcelain --untracked-files=no 2> /dev/null),${DESCRIBE}-dirty,${DESCRIBE})
@@ -53,7 +54,7 @@ all: cc-proxy $(UNIT_FILES)
5354

5455
cc-proxy: $(SOURCES) Makefile
5556
$(QUIET_GOBUILD)go build -i -o $@ -ldflags \
56-
"-X main.DefaultSocketPath=$(PROXY_SOCKET) -X main.Version=$(VERSION)"
57+
"-X main.DefaultSocketPath=$(PROXY_SOCKET) -X main.Version=$(VERSION) -X main.storeStateDir=$(STORE_STATE_DIR)"
5758

5859
#
5960
# Tests

proxy.go

Lines changed: 281 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"flag"
2121
"fmt"
2222
"io"
23+
"io/ioutil"
2324
"net"
2425
"net/http"
2526
_ "net/http/pprof"
@@ -36,6 +37,13 @@ import (
3637
"github.com/clearcontainers/proxy/api"
3738
)
3839

40+
// storeStateDir is the directory in which the proxy state files are kept.
// It is populated at link time with the value of
// $(LOCALSTATEDIR)/lib/clear-containers/proxy/; the value below is only the
// fallback used when the link-time value is not provided. File names are
// appended to it directly, so it has to end with a path separator.
var storeStateDir = "/var/lib/clear-containers/proxy/"

// Permissions used for the state directory and for the files stored in it.
const (
	proxyStateDirPerm   = 0755
	proxyStateFilesPerm = 0640
)
46+
3947
// tokenState tracks if an I/O token has been claimed by a shim.
4048
type tokenState int
4149

@@ -84,6 +92,20 @@ type proxy struct {
8492
wg sync.WaitGroup
8593
}
8694

95+
// proxyStateOnDisk is the JSON document used to (re)store the proxy state on
// disk.
type proxyStateOnDisk struct {
	// Version is the version of the proxy that wrote the state.
	Version string `json:"version"`
	// SocketPath is the socket path the proxy was using.
	SocketPath string `json:"socket_path"`
	// EnableVMConsole is the value of the proxy's VM console flag.
	EnableVMConsole bool `json:"enable_vm_console"`
	// ContainerIDs holds the container ID of every registered VM.
	ContainerIDs []string `json:"container_ids"`
}
102+
103+
// vmStateOnDisk is used to re(store) vm struct on disk
104+
type vmStateOnDisk struct {
105+
RegisterVM api.RegisterVM `json:"registerVM"`
106+
Tokens []string `json:"tokens"`
107+
}
108+
87109
type clientKind int
88110

89111
const (
@@ -129,6 +151,33 @@ func newClient(proxy *proxy, conn net.Conn) *client {
129151
}
130152
}
131153

154+
func (proxy *proxy) restoreTokens(vm *vm, tokens []string) error {
155+
if vm == nil {
156+
return fmt.Errorf("vm parameter must be not nil")
157+
}
158+
159+
for _, token := range tokens {
160+
token, err := vm.AllocateTokenAs(Token(token))
161+
if err != nil {
162+
return err
163+
}
164+
proxy.Lock()
165+
proxy.tokenToVM[token] = &tokenInfo{
166+
state: tokenStateAllocated,
167+
vm: vm,
168+
}
169+
proxy.Unlock()
170+
171+
session := vm.findSessionByToken(token)
172+
if session == nil {
173+
return fmt.Errorf("unknown token %s", token)
174+
}
175+
// Signal that the process is already started
176+
close(session.processStarted)
177+
}
178+
return nil
179+
}
180+
132181
func (proxy *proxy) allocateTokens(vm *vm, numIOStreams int) (*api.IOResponse, error) {
133182
url := url.URL{
134183
Scheme: "unix",
@@ -193,6 +242,229 @@ func (proxy *proxy) releaseToken(token Token) (*tokenInfo, error) {
193242
return info, nil
194243
}
195244

245+
// returns false if it's a clean start (i.e. no state is stored) or restoring failed
246+
func (proxy *proxy) restoreState() bool {
247+
proxyStateFilePath := storeStateDir + "proxy_state.json"
248+
if _, err := os.Stat(storeStateDir); os.IsNotExist(err) {
249+
err := os.MkdirAll(storeStateDir, proxyStateDirPerm)
250+
if err != nil {
251+
proxyLog.Errorf("Couldn't create directory %s: %v",
252+
storeStateDir, err)
253+
}
254+
return false
255+
}
256+
257+
fdata, err := ioutil.ReadFile(proxyStateFilePath)
258+
if err != nil {
259+
proxyLog.Errorf("Couldn't recover from %s: %v", proxyStateFilePath, err)
260+
return false
261+
}
262+
263+
var proxyState proxyStateOnDisk
264+
err = json.Unmarshal(fdata, &proxyState)
265+
if err != nil {
266+
proxyLog.Errorf("Couldn't unmarshal %s: %v", proxyStateFilePath, err)
267+
return false
268+
}
269+
proxyLog.Debugf("proxy: %+v", proxyState)
270+
271+
if len(proxyState.ContainerIDs) == 0 {
272+
return false
273+
}
274+
proxyLog.Warn("Recovering proxy state from: ", proxyStateFilePath)
275+
if proxyState.Version != Version {
276+
proxyLog.Warnf("Stored state version (%s) mismatches proxy"+
277+
" version (%s). Aborting", proxyState.Version, Version)
278+
return false
279+
}
280+
281+
proxy.socketPath = proxyState.SocketPath
282+
proxy.enableVMConsole = proxyState.EnableVMConsole
283+
284+
for _, contID := range proxyState.ContainerIDs {
285+
go restoreVMState(proxy, contID)
286+
}
287+
288+
return true
289+
}
290+
291+
func (proxy *proxy) storeState() {
292+
proxyStateFilePath := storeStateDir + "proxy_state.json"
293+
proxy.Lock()
294+
defer proxy.Unlock()
295+
296+
// if there are 0 VMs then remove state from disk
297+
if (len(proxy.vms)) == 0 {
298+
if err := os.Remove(proxyStateFilePath); err != nil {
299+
proxyLog.Errorf("Couldn't remove %s: %v",
300+
proxyStateFilePath, err)
301+
}
302+
return
303+
}
304+
305+
proxyState := &proxyStateOnDisk{
306+
Version: Version,
307+
SocketPath: proxy.socketPath,
308+
EnableVMConsole: proxy.enableVMConsole,
309+
ContainerIDs: make([]string, 0, len(proxy.vms)),
310+
}
311+
for cid := range proxy.vms {
312+
proxyState.ContainerIDs = append(proxyState.ContainerIDs, cid)
313+
}
314+
315+
data, err := json.MarshalIndent(proxyState, "", "\t")
316+
if err != nil {
317+
proxyLog.Errorf("Couldn't marshal proxy state %+v", proxyState)
318+
}
319+
err = ioutil.WriteFile(proxyStateFilePath, data, proxyStateFilesPerm)
320+
if err != nil {
321+
proxyLog.Errorf("Couldn't store proxy state to %s: %v",
322+
proxyStateFilePath, err)
323+
}
324+
}
325+
326+
func vmStateFilePath(id string) string {
327+
return storeStateDir + "vm_" + id + ".json"
328+
}
329+
330+
func storeVMState(vm *vm, tokens []string) {
331+
odVM := vmStateOnDisk{
332+
RegisterVM: api.RegisterVM{
333+
ContainerID: vm.containerID,
334+
CtlSerial: vm.hyperHandler.GetCtlSockPath(),
335+
IoSerial: vm.hyperHandler.GetIoSockPath(),
336+
Console: vm.console.socketPath,
337+
},
338+
Tokens: tokens,
339+
}
340+
o, err := json.MarshalIndent(&odVM, "", "\t")
341+
if err != nil {
342+
proxyLog.WithField("vm", vm.containerID).Warnf(
343+
"Couldn't marshal VM state: %v", err)
344+
return
345+
}
346+
storeFile := vmStateFilePath(vm.containerID)
347+
err = ioutil.WriteFile(storeFile, o, proxyStateFilesPerm)
348+
if err != nil {
349+
proxyLog.WithField("vm", vm.containerID).Warnf(
350+
"Couldn't store VM state to %s: %v", storeFile, err)
351+
}
352+
}
353+
354+
func delVMAndState(proxy *proxy, vm *vm) {
355+
if proxy == nil {
356+
proxyLog.Error("proxy parameter must be not nil")
357+
return
358+
}
359+
if vm == nil {
360+
proxyLog.Error("vm parameter must be not nil")
361+
return
362+
}
363+
proxyLog.Infof("Removing on-disk state of %s", vm.containerID)
364+
proxy.Lock()
365+
delete(proxy.vms, vm.containerID)
366+
proxy.Unlock()
367+
proxy.storeState()
368+
storeFile := vmStateFilePath(vm.containerID)
369+
if err := os.Remove(storeFile); err != nil {
370+
proxyLog.WithField("vm", vm.containerID).Warnf(
371+
"Couldn't remove file %s: %v", storeFile, err)
372+
}
373+
}
374+
375+
func readVMState(containerID string) *vmStateOnDisk {
376+
if containerID == "" {
377+
proxyLog.Errorf("containerID parameter must be not empty")
378+
return nil
379+
}
380+
vmStateFilePath := vmStateFilePath(containerID)
381+
fdata, err := ioutil.ReadFile(vmStateFilePath)
382+
if err != nil {
383+
proxyLog.Errorf("Couldn't read %s: %v", vmStateFilePath, err)
384+
return nil
385+
}
386+
387+
var vmState vmStateOnDisk
388+
err = json.Unmarshal(fdata, &vmState)
389+
if err != nil {
390+
proxyLog.Errorf("Couldn't unmarshal %s: %v", vmStateFilePath, err)
391+
return nil
392+
}
393+
proxyLog.Debugf("restoring vm state: %+v", vmState)
394+
return &vmState
395+
}
396+
397+
func restoreTokens(proxy *proxy, vmState *vmStateOnDisk, vm *vm) {
398+
if err := proxy.restoreTokens(vm, vmState.Tokens); err != nil {
399+
proxyLog.Errorf("Failed to restore tokens: %v", err)
400+
return
401+
}
402+
403+
for _, token := range vmState.Tokens {
404+
session := vm.findSessionByToken(Token(token))
405+
if session == nil {
406+
proxyLog.Errorf("Session must be not nil")
407+
delVMAndState(proxy, vm)
408+
return
409+
}
410+
if err := session.WaitForShim(); err != nil {
411+
proxyLog.Errorf("Failed to re-connect with shim: %v", err)
412+
delVMAndState(proxy, vm)
413+
return
414+
}
415+
}
416+
}
417+
418+
func restoreVMState(proxy *proxy, containerID string) {
419+
if proxy == nil {
420+
proxyLog.Errorf("proxy parameter must be not nil")
421+
return
422+
}
423+
424+
vmState := readVMState(containerID)
425+
if vmState == nil {
426+
return
427+
}
428+
429+
regVM := vmState.RegisterVM
430+
if regVM.ContainerID == "" || regVM.CtlSerial == "" || regVM.IoSerial == "" {
431+
proxyLog.Errorf("wrong VM parameters")
432+
return
433+
}
434+
435+
proxy.Lock()
436+
if _, ok := proxy.vms[regVM.ContainerID]; ok {
437+
proxy.Unlock()
438+
proxyLog.Errorf("%s: container already registered", regVM.ContainerID)
439+
return
440+
}
441+
vm := newVM(regVM.ContainerID, regVM.CtlSerial, regVM.IoSerial)
442+
proxy.vms[regVM.ContainerID] = vm
443+
proxy.Unlock()
444+
445+
proxyLog.Infof("restoreVMState(containerId=%s,ctlSerial=%s,ioSerial=%s,console=%s)",
446+
regVM.ContainerID, regVM.CtlSerial, regVM.IoSerial, regVM.Console)
447+
448+
if regVM.Console != "" && proxy.enableVMConsole {
449+
vm.setConsole(regVM.Console)
450+
}
451+
452+
restoreTokens(proxy, vmState, vm)
453+
if err := vm.Reconnect(true); err != nil {
454+
proxyLog.Errorf("Failed to connect: %v", err)
455+
delVMAndState(proxy, vm)
456+
return
457+
}
458+
459+
// We start one goroutine per-VM to monitor the qemu process
460+
proxy.wg.Add(1)
461+
go func() {
462+
<-vm.OnVMLost()
463+
vm.Close()
464+
proxy.wg.Done()
465+
}()
466+
}
467+
196468
// "RegisterVM"
197469
func registerVM(data []byte, userData interface{}, response *handlerResponse) {
198470
client := userData.(*client)
@@ -247,6 +519,8 @@ func registerVM(data []byte, userData interface{}, response *handlerResponse) {
247519
}
248520

249521
client.vm = vm
522+
storeVMState(vm, io.Tokens)
523+
proxy.storeState()
250524

251525
if proxyKSM != nil {
252526
proxyKSM.kick()
@@ -323,9 +597,7 @@ func unregisterVM(data []byte, userData interface{}, response *handlerResponse)
323597

324598
client.log.Info("UnregisterVM()")
325599

326-
proxy.Lock()
327-
delete(proxy.vms, vm.containerID)
328-
proxy.Unlock()
600+
delVMAndState(proxy, vm)
329601

330602
client.vm = nil
331603
}
@@ -617,12 +889,13 @@ func (proxy *proxy) init() error {
617889
var l net.Listener
618890
var err error
619891

620-
// flags
621-
proxy.enableVMConsole = logrus.GetLevel() == logrus.DebugLevel
892+
if !proxy.restoreState() {
893+
// flags
894+
proxy.enableVMConsole = logrus.GetLevel() == logrus.DebugLevel
622895

623-
// Open the proxy socket
624-
if proxy.socketPath, err = getSocketPath(); err != nil {
625-
return fmt.Errorf("couldn't get a rigth socket path: %v", err)
896+
if proxy.socketPath, err = getSocketPath(); err != nil {
897+
return fmt.Errorf("couldn't get a right socket path: %v", err)
898+
}
626899
}
627900
fds := listenFds()
628901

0 commit comments

Comments
 (0)