Skip to content
This repository was archived by the owner on Feb 6, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ export * from "./lib/etcderror";
export * from "./lib/etcdkv";
export * from "./lib/etcdoptions";
export * from "./lib/keepalivetoken";
export * from "./lib/transaction";
export * from "./lib/etcdlock";
18 changes: 16 additions & 2 deletions lib/etcd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ const extend = require("lodash.assignin");
const deasyncPromise = require("deasync-promise");

import { KeepAliveToken } from "./keepalivetoken";
import { EtcdTransaction, EtcdCompare, EtcdOpRequest} from "./transaction";
import { EtcdKV } from "./etcdkv";
import { EtcdError } from "./etcderror";
import { EtcdOptions } from "./etcdoptions";

const etcdProto = grpc.load(__dirname + "/../protos/rpc.proto");

/**
* etcd client using the grpc protocol of etcd version 3 and later.
*/
Expand Down Expand Up @@ -44,10 +44,10 @@ export class Etcd {
this.servers = servers;
if (this.servers.length > 1)
console.warn("etcd3: currently only the first server address is used");

this.clients = {
KV: new etcdProto.etcdserverpb.KV(this.servers[0], this.credentials),
Lease: new etcdProto.etcdserverpb.Lease(this.servers[0], this.credentials),
Watch: new etcdProto.etcdserverpb.Watch(this.servers[0], this.credentials),
};
}

Expand Down Expand Up @@ -289,4 +289,18 @@ export class Etcd {
this.keepAlives.push(token);
return token;
}

createTransaction(
compare: Array<EtcdCompare>,
success: Array<EtcdOpRequest>,
failure: Array<EtcdOpRequest>,
): Promise<any> {
let transaction = new EtcdTransaction(compare, success, failure);

return this.callClient(
"KV",
"txn",
transaction.getOp()
);
}
}
117 changes: 117 additions & 0 deletions lib/etcdlock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import { Etcd } from './etcd';
import {
CompareCreated, CompareValue, CompareTypeDef,
PutRequest, RangeRequest, CompareVersion, DeleteRangeRequest
} from './transaction';
import * as uuid from 'node-uuid';

const LOCK_ATTEMPS = 10;

export class Lock {
etcdClient: Etcd;
key: string;
ttl: number = 60;
lease?: string;
uuid?: string;
version = 0;

constructor (
etcdClient: any,
key: string,
ttl: number = 60,
lease?: string
) {
if (!etcdClient) {
throw new Error('No etcd client found');
}

this.etcdClient = etcdClient;
this.key = key;
this.ttl = ttl;
this.version = 0;
this.uuid = uuid.v4();
}

wait (t) {
return new Promise(function(resolve) {
setTimeout(resolve, t)
});
}

*accquire (waitTime = 1000): IterableIterator<any> {
let success = false;
let attemps = LOCK_ATTEMPS;
let response;

while (!success && attemps > 0) {
attemps -= 1;
if (!this.lease) {
this.lease = yield this.etcdClient.leaseGrant(this.ttl);
}

let compare = [
new CompareVersion(
CompareTypeDef.CompareResult.EQUAL,
new Buffer(this.key),
this.version
)
];

let createLock = [
new PutRequest({
key: new Buffer(this.key),
value: new Buffer(this.uuid),
lease: this.lease,
})
];

let getLock = [
new RangeRequest({
key: new Buffer(this.key),
}),
];

response = yield this.etcdClient
.createTransaction(compare, createLock, getLock);
success = response.succeeded;

if (!success) {
yield this.wait(waitTime);
}
}

return success;
}

*release (): IterableIterator<any> {
let success = false;

let compareOps = [
new CompareValue(
CompareTypeDef.CompareResult.EQUAL,
new Buffer(this.key),
new Buffer(this.uuid),
)
];

let deleteOps = [
new DeleteRangeRequest({
key: new Buffer(this.key)
})
];

let result = yield this.etcdClient.createTransaction(
compareOps, deleteOps, []
);

return result.succeeded;
}

*refresh () : IterableIterator<any> {
if (this.lease) {
return this.etcdClient.leaseKeepAlive(this.lease, this.ttl);
}

throw new Error('No lease associated with lock. Is lock created?');
}
}
223 changes: 223 additions & 0 deletions lib/transaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
const grpc = require("grpc");
const etcdProto = grpc.load(__dirname + "/../protos/rpc.proto");

export module CompareTypeDef
{
export enum CompareResult
{
EQUAL = 0,
GREATER = 1,
LESS = 2,
}
export enum CompareTarget {
VERSION = 0,
CREATE = 1,
MOD = 2,
VALUE = 3,
}
}

export enum SortOrder {
NONE = 0, // default, no sorting
ASCEND = 1, // lowest target value first
DESCEND = 2, // highest target value first
}

export enum SortTarget {
KEY = 0,
VERSION = 1,
CREATE = 2,
MOD = 3,
VALUE = 4,
}

export class EtcdCompare {
compare: any;

constructor (
operator: CompareTypeDef.CompareResult,
targetType: CompareTypeDef.CompareTarget,
key: Buffer,
compareTo: any,
) {
this.compare = {};
this.compare.result = operator;
this.compare.target = targetType;
this.compare.key = key;

switch(targetType) {
case CompareTypeDef.CompareTarget.CREATE:
this.compare.create_revision = compareTo;
break;
case CompareTypeDef.CompareTarget.VERSION:
this.compare.version = compareTo;
break;
case CompareTypeDef.CompareTarget.MOD:
this.compare.mod_revision = compareTo;
break;
case CompareTypeDef.CompareTarget.VALUE:
this.compare.value = compareTo;
break;
}
}

getOp() {
return this.compare;
}
};

export class CompareValue extends EtcdCompare {
constructor (
operator: CompareTypeDef.CompareResult,
key: Buffer,
compareTo: Buffer
) {
super(operator, CompareTypeDef.CompareTarget.VALUE, key, compareTo);
}
}

export class CompareLastRevision extends EtcdCompare {
constructor (
operator: CompareTypeDef.CompareResult,
key: Buffer,
compareTo: Number
) {
super(
operator,
CompareTypeDef.CompareTarget.MOD,
key,
compareTo
);
}
}

export class CompareCreated extends EtcdCompare {
constructor (
operator: CompareTypeDef.CompareResult,
key: Buffer,
compareTo: Number
) {
super(
operator,
CompareTypeDef.CompareTarget.CREATE,
key,
compareTo
);
}
}

export class CompareVersion extends EtcdCompare {
constructor (
operator: CompareTypeDef.CompareResult,
key: Buffer,
compareTo: Number
) {
super(
operator,
CompareTypeDef.CompareTarget.VERSION,
key,
compareTo
);
}
}

export class EtcdOpRequest {
type: any;
request: any;

constructor () {}

getType () {
return this.type;
}

getOp(): any {
let op = {};
if (!this.type) {
throw new Error('Operation type not defined');
}

op[this.type] = this.request;
return new etcdProto.etcdserverpb.RequestOp(op);
}

getRequest () {
return this.request;
}
}

export class DeleteRangeRequest extends EtcdOpRequest {
type = 'request_delete_range';
request: {
key: Buffer,
range_end?: Number,
};

constructor (request) {
super();
this.request = new etcdProto.etcdserverpb.DeleteRangeRequest(request);
}
}

export class RangeRequest extends EtcdOpRequest {
type = 'request_range';
request: {
key: Buffer,
range_end?: any,
limit?: number,
revision?: number,
sort_order?: SortOrder,
sort_target?: SortTarget,
serializable?: boolean,
keys_only?: boolean,
count_only?: boolean,
};

constructor (request: any) {
super();
this.request = request;
}
}

export class PutRequest extends EtcdOpRequest {
type = 'request_put';
request: {
key: Buffer,
value: Buffer,
lease?: Number,
};

constructor (request) {
super();
this.request = new etcdProto.etcdserverpb.PutRequest(request);
}
}

export class EtcdTransaction {
transaction: any;

constructor (
compareOps: Array<EtcdCompare>,
successOps?: Array<EtcdOpRequest>,
failureOps?: Array<EtcdOpRequest>
) {
this.transaction = {};

this.transaction.compare = this.mapOps(compareOps);
this.transaction.success = this.mapOps(successOps);
this.transaction.failure = this.mapOps(failureOps);
}

mapOps(ops) {
if (!ops) {
return [];
}
return ops.map(op => {
return op.getOp();
});
}

getOp() {
return this.transaction;
}
}
Loading