diff --git a/index.ts b/index.ts index 51af19a..d1a7cc6 100644 --- a/index.ts +++ b/index.ts @@ -3,3 +3,5 @@ export * from "./lib/etcderror"; export * from "./lib/etcdkv"; export * from "./lib/etcdoptions"; export * from "./lib/keepalivetoken"; +export * from "./lib/transaction"; +export * from "./lib/etcdlock"; diff --git a/lib/etcd.ts b/lib/etcd.ts index 3202112..aa83c52 100644 --- a/lib/etcd.ts +++ b/lib/etcd.ts @@ -3,12 +3,12 @@ const extend = require("lodash.assignin"); const deasyncPromise = require("deasync-promise"); import { KeepAliveToken } from "./keepalivetoken"; +import { EtcdTransaction, EtcdCompare, EtcdOpRequest} from "./transaction"; import { EtcdKV } from "./etcdkv"; import { EtcdError } from "./etcderror"; import { EtcdOptions } from "./etcdoptions"; const etcdProto = grpc.load(__dirname + "/../protos/rpc.proto"); - /** * etcd client using the grpc protocol of etcd version 3 and later. */ @@ -44,10 +44,10 @@ export class Etcd { this.servers = servers; if (this.servers.length > 1) console.warn("etcd3: currently only the first server address is used"); - this.clients = { KV: new etcdProto.etcdserverpb.KV(this.servers[0], this.credentials), Lease: new etcdProto.etcdserverpb.Lease(this.servers[0], this.credentials), + Watch: new etcdProto.etcdserverpb.Watch(this.servers[0], this.credentials), }; } @@ -289,4 +289,18 @@ export class Etcd { this.keepAlives.push(token); return token; } + + createTransaction( + compare: Array, + success: Array, + failure: Array, + ): Promise { + let transaction = new EtcdTransaction(compare, success, failure); + + return this.callClient( + "KV", + "txn", + transaction.getOp() + ); + } } diff --git a/lib/etcdlock.ts b/lib/etcdlock.ts new file mode 100644 index 0000000..30a662f --- /dev/null +++ b/lib/etcdlock.ts @@ -0,0 +1,117 @@ +import { Etcd } from './etcd'; +import { + CompareCreated, CompareValue, CompareTypeDef, + PutRequest, RangeRequest, CompareVersion, DeleteRangeRequest +} from './transaction'; +import * as uuid from 'node-uuid'; + +const LOCK_ATTEMPS = 10; + +export class Lock { + etcdClient: Etcd; + key: string; + ttl: number = 60; + lease?: string; + uuid?: string; + version = 0; + + constructor ( + etcdClient: any, + key: string, + ttl: number = 60, + lease?: string + ) { + if (!etcdClient) { + throw new Error('No etcd client found'); + } + + this.etcdClient = etcdClient; + this.key = key; + this.ttl = ttl; + this.version = 0; + this.uuid = uuid.v4(); + } + + wait (t) { + return new Promise(function(resolve) { + setTimeout(resolve, t) + }); + } + + *accquire (waitTime = 1000): IterableIterator { + let success = false; + let attemps = LOCK_ATTEMPS; + let response; + + while (!success && attemps > 0) { + attemps -= 1; + if (!this.lease) { + this.lease = yield this.etcdClient.leaseGrant(this.ttl); + } + + let compare = [ + new CompareVersion( + CompareTypeDef.CompareResult.EQUAL, + new Buffer(this.key), + this.version + ) + ]; + + let createLock = [ + new PutRequest({ + key: new Buffer(this.key), + value: new Buffer(this.uuid), + lease: this.lease, + }) + ]; + + let getLock = [ + new RangeRequest({ + key: new Buffer(this.key), + }), + ]; + + response = yield this.etcdClient + .createTransaction(compare, createLock, getLock); + success = response.succeeded; + + if (!success) { + yield this.wait(waitTime); + } + } + + return success; + } + + *release (): IterableIterator { + let success = false; + + let compareOps = [ + new CompareValue( + CompareTypeDef.CompareResult.EQUAL, + new Buffer(this.key), + new Buffer(this.uuid), + ) + ]; + + let deleteOps = [ + new DeleteRangeRequest({ + key: new Buffer(this.key) + }) + ]; + + let result = yield this.etcdClient.createTransaction( + compareOps, deleteOps, [] + ); + + return result.succeeded; + } + + *refresh () : IterableIterator { + if (this.lease) { + return this.etcdClient.leaseKeepAlive(this.lease, this.ttl); + } + + throw new Error('No lease associated with lock. Is lock created?'); + } +} diff --git a/lib/transaction.ts b/lib/transaction.ts new file mode 100644 index 0000000..1dd4b38 --- /dev/null +++ b/lib/transaction.ts @@ -0,0 +1,223 @@ +const grpc = require("grpc"); +const etcdProto = grpc.load(__dirname + "/../protos/rpc.proto"); + +export module CompareTypeDef +{ + export enum CompareResult + { + EQUAL = 0, + GREATER = 1, + LESS = 2, + } + export enum CompareTarget { + VERSION = 0, + CREATE = 1, + MOD = 2, + VALUE = 3, + } +} + +export enum SortOrder { + NONE = 0, // default, no sorting + ASCEND = 1, // lowest target value first + DESCEND = 2, // highest target value first +} + +export enum SortTarget { + KEY = 0, + VERSION = 1, + CREATE = 2, + MOD = 3, + VALUE = 4, +} + +export class EtcdCompare { + compare: any; + + constructor ( + operator: CompareTypeDef.CompareResult, + targetType: CompareTypeDef.CompareTarget, + key: Buffer, + compareTo: any, + ) { + this.compare = {}; + this.compare.result = operator; + this.compare.target = targetType; + this.compare.key = key; + + switch(targetType) { + case CompareTypeDef.CompareTarget.CREATE: + this.compare.create_revision = compareTo; + break; + case CompareTypeDef.CompareTarget.VERSION: + this.compare.version = compareTo; + break; + case CompareTypeDef.CompareTarget.MOD: + this.compare.mod_revision = compareTo; + break; + case CompareTypeDef.CompareTarget.VALUE: + this.compare.value = compareTo; + break; + } + } + + getOp() { + return this.compare; + } +}; + +export class CompareValue extends EtcdCompare { + constructor ( + operator: CompareTypeDef.CompareResult, + key: Buffer, + compareTo: Buffer + ) { + super(operator, CompareTypeDef.CompareTarget.VALUE, key, compareTo); + } +} + +export class CompareLastRevision extends EtcdCompare { + constructor ( + operator: CompareTypeDef.CompareResult, + key: Buffer, + compareTo: Number + ) { + super( + operator, + CompareTypeDef.CompareTarget.MOD, + key, + compareTo + ); + } +} + +export class CompareCreated extends EtcdCompare { + constructor ( + operator: CompareTypeDef.CompareResult, + key: Buffer, + compareTo: Number + ) { + super( + operator, + CompareTypeDef.CompareTarget.CREATE, + key, + compareTo + ); + } +} + +export class CompareVersion extends EtcdCompare { + constructor ( + operator: CompareTypeDef.CompareResult, + key: Buffer, + compareTo: Number + ) { + super( + operator, + CompareTypeDef.CompareTarget.VERSION, + key, + compareTo + ); + } +} + +export class EtcdOpRequest { + type: any; + request: any; + + constructor () {} + + getType () { + return this.type; + } + + getOp(): any { + let op = {}; + if (!this.type) { + throw new Error('Operation type not defined'); + } + + op[this.type] = this.request; + return new etcdProto.etcdserverpb.RequestOp(op); + } + + getRequest () { + return this.request; + } +} + +export class DeleteRangeRequest extends EtcdOpRequest { + type = 'request_delete_range'; + request: { + key: Buffer, + range_end?: Number, + }; + + constructor (request) { + super(); + this.request = new etcdProto.etcdserverpb.DeleteRangeRequest(request); + } +} + +export class RangeRequest extends EtcdOpRequest { + type = 'request_range'; + request: { + key: Buffer, + range_end?: any, + limit?: number, + revision?: number, + sort_order?: SortOrder, + sort_target?: SortTarget, + serializable?: boolean, + keys_only?: boolean, + count_only?: boolean, + }; + + constructor (request: any) { + super(); + this.request = request; + } +} + +export class PutRequest extends EtcdOpRequest { + type = 'request_put'; + request: { + key: Buffer, + value: Buffer, + lease?: Number, + }; + + constructor (request) { + super(); + this.request = new etcdProto.etcdserverpb.PutRequest(request); + } +} + +export class EtcdTransaction { + transaction: any; + + constructor ( + compareOps: Array, + successOps?: Array, + failureOps?: Array + ) { + this.transaction = {}; + + this.transaction.compare = this.mapOps(compareOps); + this.transaction.success = this.mapOps(successOps); + this.transaction.failure = this.mapOps(failureOps); + } + + mapOps(ops) { + if (!ops) { + return []; + } + return ops.map(op => { + return op.getOp(); + }); + } + + getOp() { + return this.transaction; + } +} diff --git a/package.json b/package.json index b6e54e5..c8ededa 100644 --- a/package.json +++ b/package.json @@ -32,9 +32,11 @@ "dependencies": { "deasync-promise": "^1.0.1", "grpc": "^1.0.0", - "lodash.assignin": "^4.2.0" + "lodash.assignin": "^4.2.0", + "node-uuid": "^1.4.7" }, "devDependencies": { + "@types/node": "^7.0.5", "ava": "^0.16.0", "tap-xunit": "^1.4.0", "typedoc": "github:datenmetzgerx/typedoc#typescript-2-build", diff --git a/protos/kv.proto b/protos/kv.proto index 9a8004c..4e64b50 100644 --- a/protos/kv.proto +++ b/protos/kv.proto @@ -35,4 +35,7 @@ message Event { // A DELETE/EXPIRE event contains the deleted key with // its modification revision set to the revision of deletion. KeyValue kv = 2; + + // prev_kv holds the key-value pair before the event happens. + KeyValue prev_kv = 3; } diff --git a/protos/rpc.proto b/protos/rpc.proto index c99b756..27a87c1 100644 --- a/protos/rpc.proto +++ b/protos/rpc.proto @@ -4,6 +4,7 @@ package etcdserverpb; import "kv.proto"; import "auth.proto"; + service KV { // Range gets the keys in the range from the key-value store. rpc Range(RangeRequest) returns (RangeResponse) { @@ -97,8 +98,15 @@ service Lease { }; } + // LeaseTimeToLive retrieves lease information. + rpc LeaseTimeToLive(LeaseTimeToLiveRequest) returns (LeaseTimeToLiveResponse) { + option (google.api.http) = { + post: "/v3alpha/kv/lease/timetolive" + body: "*" + }; + } + // TODO(xiangli) List all existing Leases? - // TODO(xiangli) Get details information (expirations, leased keys, etc.) of a lease? } service Cluster { @@ -338,11 +346,12 @@ message RangeRequest { bytes key = 1; // range_end is the upper bound on the requested range [key, range_end). // If range_end is '\0', the range is all keys >= key. - // If the range_end is one bit larger than the given key, - // then the range requests get the all keys with the prefix (the given key). - // If both key and range_end are '\0', then range requests returns all keys. + // If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"), + // then the range request gets all keys prefixed with key. + // If both key and range_end are '\0', then the range request returns all keys. bytes range_end = 2; - // limit is a limit on the number of keys returned for the request. + // limit is a limit on the number of keys returned for the request. When limit is set to 0, + // it is treated as no limit. int64 limit = 3; // revision is the point-in-time of the key-value store to use for the range. // If revision is less or equal to zero, the range is over the newest key-value store. @@ -368,6 +377,22 @@ message RangeRequest { // count_only when set returns only the count of the keys in the range. bool count_only = 9; + + // min_mod_revision is the lower bound for returned key mod revisions; all keys with + // lesser mod revisions will be filtered away. + int64 min_mod_revision = 10; + + // max_mod_revision is the upper bound for returned key mod revisions; all keys with + // greater mod revisions will be filtered away. + int64 max_mod_revision = 11; + + // min_create_revision is the lower bound for returned key create revisions; all keys with + // lesser create trevisions will be filtered away. + int64 min_create_revision = 12; + + // max_create_revision is the upper bound for returned key create revisions; all keys with + // greater create revisions will be filtered away. + int64 max_create_revision = 13; } message RangeResponse { @@ -389,10 +414,24 @@ message PutRequest { // lease is the lease ID to associate with the key in the key-value store. A lease // value of 0 indicates no lease. int64 lease = 3; + + // If prev_kv is set, etcd gets the previous key-value pair before changing it. + // The previous key-value pair will be returned in the put response. + bool prev_kv = 4; + + // If ignore_value is set, etcd updates the key using its current value. + // Returns an error if the key does not exist. + bool ignore_value = 5; + + // If ignore_lease is set, etcd updates the key using its current lease. + // Returns an error if the key does not exist. + bool ignore_lease = 6; } message PutResponse { ResponseHeader header = 1; + // if prev_kv is set in the request, the previous key-value pair will be returned. + mvccpb.KeyValue prev_kv = 2; } message DeleteRangeRequest { @@ -400,14 +439,22 @@ message DeleteRangeRequest { bytes key = 1; // range_end is the key following the last key to delete for the range [key, range_end). // If range_end is not given, the range is defined to contain only the key argument. + // If range_end is one bit larger than the given key, then the range is all + // the all keys with the prefix (the given key). // If range_end is '\0', the range is all keys greater than or equal to the key argument. bytes range_end = 2; + + // If prev_kv is set, etcd gets the previous key-value pairs before deleting it. + // The previous key-value pairs will be returned in the delte response. + bool prev_kv = 3; } message DeleteRangeResponse { ResponseHeader header = 1; // deleted is the number of keys deleted by the delete range request. int64 deleted = 2; + // if prev_kv is set in the request, the previous key-value pairs will be returned. + repeated mvccpb.KeyValue prev_kvs = 3; } message RequestOp { @@ -433,6 +480,7 @@ message Compare { EQUAL = 0; GREATER = 1; LESS = 2; + NOT_EQUAL = 3; } enum CompareTarget { VERSION = 0; @@ -548,6 +596,8 @@ message WatchCreateRequest { // range_end is the end of the range [key, range_end) to watch. If range_end is not given, // only the key argument is watched. If range_end is equal to '\0', all keys greater than // or equal to the key argument are watched. + // If the range_end is one bit larger than the given key, + // then all keys with the prefix (the given key) will be watched. bytes range_end = 2; // start_revision is an optional revision to watch from (inclusive). No start_revision is "now". int64 start_revision = 3; @@ -556,6 +606,19 @@ message WatchCreateRequest { // wish to recover a disconnected watcher starting from a recent known revision. // The etcd server may decide how often it will send notifications based on current load. bool progress_notify = 4; + + enum FilterType { + // filter out put event. + NOPUT = 0; + // filter out delete event. + NODELETE = 1; + } + // filters filter the events at server side before it sends back to the watcher. + repeated FilterType filters = 5; + + // If prev_kv is set, created watcher gets the previous KV before the event happens. + // If the previous KV is already compacted, nothing will be returned. + bool prev_kv = 6; } message WatchCancelRequest { @@ -626,6 +689,25 @@ message LeaseKeepAliveResponse { int64 TTL = 3; } +message LeaseTimeToLiveRequest { + // ID is the lease ID for the lease. + int64 ID = 1; + // keys is true to query all the keys attached to this lease. + bool keys = 2; +} + +message LeaseTimeToLiveResponse { + ResponseHeader header = 1; + // ID is the lease ID from the keep alive request. + int64 ID = 2; + // TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. + int64 TTL = 3; + // GrantedTTL is the initial granted time in seconds upon lease creation/renewal. + int64 grantedTTL = 4; + // Keys is the list of keys attached to this lease. + repeated bytes keys = 5; +} + message Member { // ID is the member ID for this member. uint64 ID = 1; diff --git a/test/kv.ts b/test/kv.ts index 347d464..8f1e070 100644 --- a/test/kv.ts +++ b/test/kv.ts @@ -101,15 +101,6 @@ test("get() returnType value returns a string", (t) => { }); }); -test("get() returnType json returns a object", (t) => { - const etcd = t.context.etcd as Etcd; - return etcd.set(t.context.rkey, { "test": 34 }).then(() => { - return etcd.get(t.context.rkey, "json"); - }).then((value) => { - t.deepEqual(value, { "test": 34 }); - }); -}); - test("get() returnType buffer returns a Buffer", (t) => { const etcd = t.context.etcd as Etcd; return etcd.set(t.context.rkey, { "test": 34 }).then(() => { diff --git a/tsconfig.json b/tsconfig.json index d73a885..4b4dd26 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,9 +1,9 @@ { "compilerOptions": { "module": "commonjs", - "target": "es5", + "target": "es6", "noImplicitAny": false, "sourceMap": false, "declaration": true } -} \ No newline at end of file +} diff --git a/typings.json b/typings.json index d546eff..e329039 100644 --- a/typings.json +++ b/typings.json @@ -3,5 +3,8 @@ "es2015-array": "registry:env/es2015-array#1.0.0+20160526151700", "es2015-promise": "registry:env/es2015-promise#1.0.0+20160526151700", "node": "registry:env/node#4.0.0+20160918225031" + }, + "dependencies": { + "node-uuid": "registry:npm/node-uuid#1.4.7+20160723033700" } }