Skip to content
This repository was archived by the owner on May 6, 2020. It is now read-only.

Commit abbbdb5

Browse files
author
Dmitry Voytik
committed
proxy: implement (re)store of proxy's state
Introduce the high availability feature of cc-proxy by implementing store/restore of proxy's state to/from disk. This feature depends on the ability of shim to reconnect to cc-proxy if connection is lost. Fixes #4. Signed-off-by: Dmitry Voytik <dmitry.voytik@huawei.com>
1 parent d7a4dd8 commit abbbdb5

3 files changed

Lines changed: 286 additions & 12 deletions

File tree

proxy.go

Lines changed: 238 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"flag"
2121
"fmt"
2222
"io"
23+
"io/ioutil"
2324
"net"
2425
"net/http"
2526
_ "net/http/pprof"
@@ -124,6 +125,33 @@ func newClient(proxy *proxy, conn net.Conn) *client {
124125
}
125126
}
126127

128+
func (proxy *proxy) restoreTokens(vm *vm, tokens []string) error {
129+
if len(tokens) == 0 {
130+
return nil
131+
}
132+
133+
for _, token := range tokens {
134+
token, err := vm.AllocateTokenAs(Token(token))
135+
if err != nil {
136+
return err
137+
}
138+
proxy.Lock()
139+
proxy.tokenToVM[token] = &tokenInfo{
140+
state: tokenStateAllocated,
141+
vm: vm,
142+
}
143+
proxy.Unlock()
144+
145+
session := vm.findSessionByToken(token)
146+
if session == nil {
147+
return fmt.Errorf("unknown token %s", token)
148+
}
149+
// Signal that the process is already started
150+
close(session.processStarted)
151+
}
152+
return nil
153+
}
154+
127155
func (proxy *proxy) allocateTokens(vm *vm, numIOStreams int) (*api.IOResponse, error) {
128156
url := url.URL{
129157
Scheme: "unix",
@@ -188,6 +216,207 @@ func (proxy *proxy) releaseToken(token Token) (*tokenInfo, error) {
188216
return info, nil
189217
}
190218

219+
// storeStateDir is the directory holding the proxy's persisted state
// (proxy_state.json and one vm_<id>.json per VM). File names are appended to
// it by plain string concatenation, so it must end with a path separator.
// It is a variable rather than a constant so tests can point it elsewhere.
var storeStateDir = "/var/lib/clear-containers/proxy/"
220+
221+
// proxyStateOnDisk is the JSON document written to proxy_state.json by
// storeState and read back by restoreState.
type proxyStateOnDisk struct {
	// SocketPath mirrors proxy.socketPath.
	SocketPath string `json:"socket_path"`
	// EnableVMConsole mirrors proxy.enableVMConsole.
	EnableVMConsole bool `json:"enable_vm_console"`
	// ContainerIDs lists the keys of proxy.vms; each one has its own
	// per-VM state file that restoreVMState reads.
	ContainerIDs []string `json:"container_ids"`
}
226+
227+
// returns false if it's a clean start (i.e. no state is stored) or restoring failed
228+
func (proxy *proxy) restoreState() bool {
229+
proxyStateFilePath := storeStateDir + "proxy_state.json"
230+
if _, err := os.Stat(storeStateDir); os.IsNotExist(err) {
231+
if err := os.MkdirAll(storeStateDir, 0755); err != nil {
232+
proxyLog.Errorf("Couldn't create directory %s: %v",
233+
storeStateDir, err)
234+
}
235+
return false
236+
}
237+
if _, err := os.Stat(proxyStateFilePath); os.IsNotExist(err) {
238+
return false
239+
}
240+
241+
fdata, err := ioutil.ReadFile(proxyStateFilePath)
242+
if err != nil {
243+
proxyLog.Errorf("Couldn't recover from %s: %v", proxyStateFilePath, err)
244+
return false
245+
}
246+
247+
var proxyState proxyStateOnDisk
248+
err = json.Unmarshal(fdata, &proxyState)
249+
if err != nil {
250+
proxyLog.Errorf("Couldn't unmarshal %s: %v", proxyStateFilePath, err)
251+
return false
252+
}
253+
proxyLog.Debugf("proxy: %+v", proxyState)
254+
255+
if len(proxyState.ContainerIDs) == 0 {
256+
return false
257+
}
258+
proxyLog.Warn("Recovering proxy state from: ", proxyStateFilePath)
259+
proxy.socketPath = proxyState.SocketPath
260+
proxy.enableVMConsole = proxyState.EnableVMConsole
261+
262+
for _, contID := range proxyState.ContainerIDs {
263+
go restoreVMState(proxy, contID)
264+
}
265+
266+
return true
267+
}
268+
269+
func (proxy *proxy) storeState() error {
270+
proxyStateFilePath := storeStateDir + "proxy_state.json"
271+
proxy.Lock()
272+
defer proxy.Unlock()
273+
274+
// if there are 0 VMs then remove state from disk
275+
if (len(proxy.vms)) == 0 {
276+
if err := os.Remove(proxyStateFilePath); err != nil {
277+
proxyLog.Errorf("Can not remove %s: %v", proxyStateFilePath, err)
278+
return err
279+
}
280+
return nil
281+
}
282+
283+
proxyState := &proxyStateOnDisk{
284+
SocketPath: proxy.socketPath,
285+
EnableVMConsole: proxy.enableVMConsole,
286+
ContainerIDs: make([]string, 0, len(proxy.vms)),
287+
}
288+
for cid := range proxy.vms {
289+
proxyState.ContainerIDs = append(proxyState.ContainerIDs, cid)
290+
}
291+
292+
data, err := json.MarshalIndent(proxyState, "", "\t")
293+
if err != nil {
294+
return err
295+
}
296+
ioutil.WriteFile(proxyStateFilePath, data, 0644)
297+
298+
return nil
299+
}
300+
301+
// vmStateOnDisk is the JSON representation of a single VM's state, written
// by storeVMState to the file named by vmStateFilePath and read back by
// restoreVMState.
type vmStateOnDisk struct {
	// RegisterVM carries the container ID, the control and I/O serial
	// socket paths and the console socket path of the VM.
	RegisterVM api.RegisterVM `json:"registerVM"`
	// Tokens are the I/O tokens handed to storeVMState; restoreVMState
	// passes them to restoreTokens.
	Tokens []string `json:"tokens"`
}
306+
307+
func vmStateFilePath(id string) string {
308+
return storeStateDir + "vm_" + id + ".json"
309+
}
310+
311+
func storeVMState(vm *vm, tokens []string) {
312+
odVM := vmStateOnDisk{
313+
RegisterVM: api.RegisterVM{
314+
ContainerID: vm.containerID,
315+
CtlSerial: vm.hyperHandler.GetCtlSockPath(),
316+
IoSerial: vm.hyperHandler.GetIoSockPath(),
317+
Console: vm.console.socketPath,
318+
},
319+
Tokens: tokens,
320+
}
321+
o, err := json.MarshalIndent(&odVM, "", "\t")
322+
if err != nil {
323+
proxyLog.WithField("vm", vm.containerID).Warn("Couldn't marshal VM state")
324+
}
325+
storeFile := vmStateFilePath(vm.containerID)
326+
if err = ioutil.WriteFile(storeFile, o, 0644); err != nil {
327+
proxyLog.WithField("vm", vm.containerID).Warn("Couldn't store VM state to ",
328+
storeFile)
329+
}
330+
}
331+
332+
func delVMAndState(proxy *proxy, vm *vm) {
333+
proxyLog.Infof("Removing on-disk state of %s", vm.containerID)
334+
proxy.Lock()
335+
delete(proxy.vms, vm.containerID)
336+
proxy.Unlock()
337+
proxy.storeState()
338+
storeFile := vmStateFilePath(vm.containerID)
339+
if err := os.Remove(storeFile); err != nil {
340+
proxyLog.WithField("vm", vm.containerID).Warn("Couldn't remove file ",
341+
storeFile)
342+
}
343+
}
344+
345+
func restoreVMState(proxy *proxy, containerID string) {
346+
vmStateFilePath := vmStateFilePath(containerID)
347+
proxy.Lock()
348+
fdata, err := ioutil.ReadFile(vmStateFilePath)
349+
proxy.Unlock()
350+
if err != nil {
351+
proxyLog.Errorf("Couldn't read %s: %v", vmStateFilePath, err)
352+
return
353+
}
354+
355+
var vmState vmStateOnDisk
356+
err = json.Unmarshal(fdata, &vmState)
357+
if err != nil {
358+
proxyLog.Errorf("Can not unmarshal %s: %v", vmStateFilePath, err)
359+
return
360+
}
361+
proxyLog.Debugf("restored vm state: %+v", vmState)
362+
363+
regVM := vmState.RegisterVM
364+
365+
if regVM.ContainerID == "" || regVM.CtlSerial == "" || regVM.IoSerial == "" {
366+
proxyLog.Errorf("wrong VM parameters")
367+
return
368+
}
369+
370+
proxy.Lock()
371+
if _, ok := proxy.vms[regVM.ContainerID]; ok {
372+
proxy.Unlock()
373+
proxyLog.Errorf("%s: container already registered", regVM.ContainerID)
374+
return
375+
}
376+
vm := newVM(regVM.ContainerID, regVM.CtlSerial, regVM.IoSerial)
377+
proxy.vms[regVM.ContainerID] = vm
378+
proxy.Unlock()
379+
380+
proxyLog.Infof("restoreVMState(containerId=%s,ctlSerial=%s,ioSerial=%s,console=%s)",
381+
regVM.ContainerID, regVM.CtlSerial, regVM.IoSerial, regVM.Console)
382+
383+
if regVM.Console != "" && proxy.enableVMConsole {
384+
vm.setConsole(regVM.Console)
385+
}
386+
387+
if err := proxy.restoreTokens(vm, vmState.Tokens); err != nil {
388+
proxyLog.Errorf("Failed to restore tokens: %v", err)
389+
return
390+
}
391+
392+
for _, token := range vmState.Tokens {
393+
session := vm.findSessionByToken(Token(token))
394+
if session == nil {
395+
proxyLog.Errorf("Session must be not nil")
396+
delVMAndState(proxy, vm)
397+
return
398+
}
399+
if err := session.WaitForShim(); err != nil {
400+
proxyLog.Errorf("Failed to re-connect with shim: %v", err)
401+
delVMAndState(proxy, vm)
402+
return
403+
}
404+
}
405+
if err := vm.Reconnect(true); err != nil {
406+
proxyLog.Errorf("Failed to connect: %v", err)
407+
delVMAndState(proxy, vm)
408+
return
409+
}
410+
411+
// We start one goroutine per-VM to monitor the qemu process
412+
proxy.wg.Add(1)
413+
go func() {
414+
<-vm.OnVMLost()
415+
vm.Close()
416+
proxy.wg.Done()
417+
}()
418+
}
419+
191420
// "RegisterVM"
192421
func registerVM(data []byte, userData interface{}, response *handlerResponse) {
193422
client := userData.(*client)
@@ -242,6 +471,8 @@ func registerVM(data []byte, userData interface{}, response *handlerResponse) {
242471
}
243472

244473
client.vm = vm
474+
storeVMState(vm, io.Tokens)
475+
proxy.storeState()
245476

246477
// We start one goroutine per-VM to monitor the qemu process
247478
proxy.wg.Add(1)
@@ -314,9 +545,7 @@ func unregisterVM(data []byte, userData interface{}, response *handlerResponse)
314545

315546
client.log.Info("UnregisterVM()")
316547

317-
proxy.Lock()
318-
delete(proxy.vms, vm.containerID)
319-
proxy.Unlock()
548+
delVMAndState(proxy, vm)
320549

321550
client.vm = nil
322551
}
@@ -598,12 +827,13 @@ func (proxy *proxy) init() error {
598827
var l net.Listener
599828
var err error
600829

601-
// flags
602-
proxy.enableVMConsole = logrus.GetLevel() == logrus.DebugLevel
830+
if !proxy.restoreState() {
831+
// flags
832+
proxy.enableVMConsole = logrus.GetLevel() == logrus.DebugLevel
603833

604-
// Open the proxy socket
605-
if proxy.socketPath, err = getSocketPath(); err != nil {
606-
return fmt.Errorf("couldn't get a rigth socket path: %v", err)
834+
if proxy.socketPath, err = getSocketPath(); err != nil {
835+
return fmt.Errorf("couldn't get a rigth socket path: %v", err)
836+
}
607837
}
608838
fds := listenFds()
609839

proxy_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package main
1616

1717
import (
1818
"encoding/json"
19+
"io/ioutil"
1920
"net"
2021
"os"
2122
"strings"
@@ -900,3 +901,29 @@ func TestHyperstartResponse(t *testing.T) {
900901

901902
rig.Stop()
902903
}
904+
905+
func TestStoreRestore(t *testing.T) {
906+
storeStateDir = "/tmp/clearcontainers/proxy/"
907+
rig := newTestRig(t)
908+
rig.Start()
909+
910+
// clean up a possible state
911+
os.RemoveAll(storeStateDir)
912+
assert.Equal(t, rig.proxy.restoreState(), false)
913+
914+
rig.RegisterVM()
915+
rig.Stop()
916+
// the state must be present on the disk
917+
files, err := ioutil.ReadDir(storeStateDir)
918+
assert.Nil(t, err)
919+
assert.Equal(t, len(files), 2)
920+
assert.Equal(t, files[0].Name(), "proxy_state.json")
921+
922+
rig.Start()
923+
assert.Equal(t, rig.proxy.restoreState(), true)
924+
assert.Nil(t, rig.Client.UnregisterVM(testContainerID))
925+
// the state must be absent on the disk
926+
files, err = ioutil.ReadDir(storeStateDir)
927+
assert.Nil(t, err)
928+
assert.Equal(t, len(files), 0)
929+
}

vm.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,10 @@ func (vm *vm) consoleToLog() {
273273
}
274274

275275
func (vm *vm) Connect() error {
276+
return vm.Reconnect(false)
277+
}
278+
279+
func (vm *vm) Reconnect(reconnect bool) error {
276280
if vm.console.socketPath != "" {
277281
var err error
278282

@@ -289,7 +293,12 @@ func (vm *vm) Connect() error {
289293
return err
290294
}
291295

292-
if err := vm.hyperHandler.WaitForReady(); err != nil {
296+
if reconnect {
297+
if !vm.hyperHandler.IsStarted() {
298+
vm.hyperHandler.CloseSockets()
299+
return errors.New("failed to reconnect to the agent")
300+
}
301+
} else if err := vm.hyperHandler.WaitForReady(); err != nil {
293302
vm.hyperHandler.CloseSockets()
294303
return err
295304
}
@@ -589,6 +598,11 @@ func (session *ioSession) SendSignal(signal syscall.Signal) error {
589598
}
590599

591600
func (vm *vm) AllocateToken() (Token, error) {
601+
return vm.AllocateTokenAs("")
602+
}
603+
604+
// If token == "" then a new token is generated; otherwise the provided token is reused.
605+
func (vm *vm) AllocateTokenAs(token Token) (Token, error) {
592606
vm.Lock()
593607
defer vm.Unlock()
594608

@@ -598,9 +612,12 @@ func (vm *vm) AllocateToken() (Token, error) {
598612
ioBase := vm.nextIoBase
599613
vm.nextIoBase += uint64(nStreams)
600614

601-
token, err := GenerateToken(32)
602-
if err != nil {
603-
return nilToken, err
615+
if token == "" {
616+
var err error
617+
token, err = GenerateToken(32)
618+
if err != nil {
619+
return nilToken, err
620+
}
604621
}
605622

606623
session := &ioSession{

0 commit comments

Comments
 (0)