Skip to content

Commit c3138ee

Browse files
committed
Implemented shared queues, added an example and tests.
Signed-off-by: Hiram Chirino <hiram@hiramchirino.com>
1 parent 87055a2 commit c3138ee

File tree

16 files changed

+694
-30
lines changed

16 files changed

+694
-30
lines changed

src/main/java/io/roastedroot/proxywasm/impl/Imports.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.roastedroot.proxywasm.v1.LogLevel;
1313
import io.roastedroot.proxywasm.v1.MapType;
1414
import io.roastedroot.proxywasm.v1.MetricType;
15+
import io.roastedroot.proxywasm.v1.QueueName;
1516
import io.roastedroot.proxywasm.v1.StreamType;
1617
import io.roastedroot.proxywasm.v1.WasmException;
1718
import io.roastedroot.proxywasm.v1.WasmResult;
@@ -1002,4 +1003,77 @@ int proxySetSharedData(int keyDataPtr, int keySize, int valueDataPtr, int valueS
10021003
return e.result().getValue();
10031004
}
10041005
}
1006+
1007+
@WasmExport
1008+
int proxyRegisterSharedQueue(int queueNameDataPtr, int queueNameSize, int returnQueueId) {
1009+
try {
1010+
// Get queue name from memory
1011+
String queueName = string(readMemory(queueNameDataPtr, queueNameSize));
1012+
1013+
var vmId = handler.getProperty("vm_id");
1014+
1015+
// Register shared queue using handler
1016+
int queueId = handler.registerSharedQueue(new QueueName(vmId, queueName));
1017+
putUint32(returnQueueId, queueId);
1018+
return WasmResult.OK.getValue();
1019+
1020+
} catch (WasmException e) {
1021+
return e.result().getValue();
1022+
}
1023+
}
1024+
1025+
@WasmExport
1026+
int proxyResolveSharedQueue(
1027+
int vmIdDataPtr,
1028+
int vmIdSize,
1029+
int queueNameDataPtr,
1030+
int queueNameSize,
1031+
int returnQueueId) {
1032+
try {
1033+
// Get vm id from memory
1034+
String vmId = string(readMemory(vmIdDataPtr, vmIdSize));
1035+
// Get queue name from memory
1036+
String queueName = string(readMemory(queueNameDataPtr, queueNameSize));
1037+
1038+
// Resolve shared queue using handler
1039+
int queueId = handler.resolveSharedQueue(new QueueName(vmId, queueName));
1040+
putUint32(returnQueueId, queueId);
1041+
return WasmResult.OK.getValue();
1042+
1043+
} catch (WasmException e) {
1044+
return e.result().getValue();
1045+
}
1046+
}
1047+
1048+
@WasmExport
1049+
int proxyEnqueueSharedQueue(int queueId, int valueDataPtr, int valueSize) {
1050+
try {
1051+
// Get value from memory
1052+
byte[] value = readMemory(valueDataPtr, valueSize);
1053+
1054+
// Enqueue shared queue using handler
1055+
WasmResult result = handler.enqueueSharedQueue(queueId, value);
1056+
return result.getValue();
1057+
1058+
} catch (WasmException e) {
1059+
return e.result().getValue();
1060+
}
1061+
}
1062+
1063+
@WasmExport
1064+
int proxyDequeueSharedQueue(int queueId, int returnValueData, int returnValueSize) {
1065+
try {
1066+
// Dequeue shared queue using handler
1067+
byte[] value = handler.dequeueSharedQueue(queueId);
1068+
if (value == null) {
1069+
return WasmResult.EMPTY.getValue();
1070+
}
1071+
1072+
copyIntoInstance(value, returnValueData, returnValueSize);
1073+
return WasmResult.OK.getValue();
1074+
1075+
} catch (WasmException e) {
1076+
return e.result().getValue();
1077+
}
1078+
}
10051079
}

src/main/java/io/roastedroot/proxywasm/v1/ChainedHandler.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,4 +306,24 @@ public SharedData getSharedData(String key) throws WasmException {
306306
public WasmResult setSharedData(String key, byte[] value, int cas) {
307307
return next().setSharedData(key, value, cas);
308308
}
309+
310+
@Override
311+
public int registerSharedQueue(QueueName queueName) throws WasmException {
312+
return next().registerSharedQueue(queueName);
313+
}
314+
315+
@Override
316+
public int resolveSharedQueue(QueueName queueName) throws WasmException {
317+
return next().resolveSharedQueue(queueName);
318+
}
319+
320+
@Override
321+
public byte[] dequeueSharedQueue(int queueId) throws WasmException {
322+
return next().dequeueSharedQueue(queueId);
323+
}
324+
325+
@Override
326+
public WasmResult enqueueSharedQueue(int queueId, byte[] value) {
327+
return next().enqueueSharedQueue(queueId, value);
328+
}
309329
}

src/main/java/io/roastedroot/proxywasm/v1/Handler.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,4 +440,20 @@ default SharedData getSharedData(String key) throws WasmException {
440440
default WasmResult setSharedData(String key, byte[] value, int cas) {
441441
return WasmResult.UNIMPLEMENTED;
442442
}
443+
444+
default int registerSharedQueue(QueueName name) throws WasmException {
445+
throw new WasmException(WasmResult.UNIMPLEMENTED);
446+
}
447+
448+
default int resolveSharedQueue(QueueName name) throws WasmException {
449+
throw new WasmException(WasmResult.UNIMPLEMENTED);
450+
}
451+
452+
default byte[] dequeueSharedQueue(int queueId) throws WasmException {
453+
throw new WasmException(WasmResult.UNIMPLEMENTED);
454+
}
455+
456+
default WasmResult enqueueSharedQueue(int queueId, byte[] value) {
457+
return WasmResult.UNIMPLEMENTED;
458+
}
443459
}

src/main/java/io/roastedroot/proxywasm/v1/ProxyWasm.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,10 @@ public void sendHttpCallResponse(
278278
this.httpCallResponseBody = null;
279279
}
280280

281+
public void sendOnQueueReady(int queueId) {
282+
this.exports.proxyOnQueueReady(pluginContext.id(), queueId);
283+
}
284+
281285
public int contextId() {
282286
return pluginContext.id();
283287
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.roastedroot.proxywasm.v1;
2+
3+
import java.util.Objects;
4+
5+
public class QueueName {
6+
private final String vmId;
7+
private final String name;
8+
9+
public QueueName(String vmId, String name) {
10+
this.vmId = vmId;
11+
this.name = name;
12+
}
13+
14+
@Override
15+
public boolean equals(Object o) {
16+
if (o == null || getClass() != o.getClass()) {
17+
return false;
18+
}
19+
QueueName queue = (QueueName) o;
20+
return Objects.equals(vmId, queue.vmId) && Objects.equals(name, queue.name);
21+
}
22+
23+
@Override
24+
public int hashCode() {
25+
return Objects.hash(vmId, name);
26+
}
27+
28+
public String vmId() {
29+
return vmId;
30+
}
31+
32+
public String name() {
33+
return name;
34+
}
35+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
## Attribution
2+
3+
This example originally came from:
4+
https://github.com/proxy-wasm/proxy-wasm-go-sdk/blob/ab4161dcf9246a828008b539a82a1556cf0f2e24/examples/shared_queue
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module github.com/proxy-wasm/proxy-wasm-go-sdk/examples/shared_queue
2+
3+
go 1.24
4+
5+
require github.com/proxy-wasm/proxy-wasm-go-sdk v0.0.0-20250212164326-ab4161dcf924
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
2+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
4+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
5+
github.com/proxy-wasm/proxy-wasm-go-sdk v0.0.0-20250212164326-ab4161dcf924 h1:wTcK6gcyTKJMeDka69AMjZYvisdI8CBXzTEfZ+2pOxI=
6+
github.com/proxy-wasm/proxy-wasm-go-sdk v0.0.0-20250212164326-ab4161dcf924/go.mod h1:9mBRvh8I6Td6sg3CwEY+zGFE4DKaIoieCaca1kQnDBE=
7+
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
8+
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
9+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
10+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2020-2024 Tetrate
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"fmt"
19+
20+
"github.com/proxy-wasm/proxy-wasm-go-sdk/proxywasm"
21+
"github.com/proxy-wasm/proxy-wasm-go-sdk/proxywasm/types"
22+
)
23+
24+
func main() {}
25+
func init() {
26+
proxywasm.SetVMContext(&vmContext{})
27+
}
28+
29+
// vmContext implements types.VMContext.
30+
type vmContext struct {
31+
// Embed the default VM context here,
32+
// so that we don't need to reimplement all the methods.
33+
types.DefaultVMContext
34+
}
35+
36+
// NewPluginContext implements types.VMContext.
37+
func (*vmContext) NewPluginContext(contextID uint32) types.PluginContext {
38+
return &receiverPluginContext{contextID: contextID}
39+
}
40+
41+
// receiverPluginContext implements types.PluginContext.
42+
type receiverPluginContext struct {
43+
// Embed the default plugin context here,
44+
// so that we don't need to reimplement all the methods.
45+
contextID uint32
46+
types.DefaultPluginContext
47+
queueName string
48+
}
49+
50+
// OnPluginStart implements types.PluginContext.
51+
func (ctx *receiverPluginContext) OnPluginStart(pluginConfigurationSize int) types.OnPluginStartStatus {
52+
// Get Plugin configuration.
53+
config, err := proxywasm.GetPluginConfiguration()
54+
if err != nil {
55+
panic(fmt.Sprintf("failed to get plugin config: %v", err))
56+
}
57+
58+
// Treat the config as the queue name for receiving.
59+
ctx.queueName = string(config)
60+
61+
queueID, err := proxywasm.RegisterSharedQueue(ctx.queueName)
62+
if err != nil {
63+
panic("failed register queue")
64+
}
65+
proxywasm.LogInfof("queue \"%s\" registered as queueID=%d by contextID=%d", ctx.queueName, queueID, ctx.contextID)
66+
return types.OnPluginStartStatusOK
67+
}
68+
69+
// OnQueueReady implements types.PluginContext.
70+
func (ctx *receiverPluginContext) OnQueueReady(queueID uint32) {
71+
data, err := proxywasm.DequeueSharedQueue(queueID)
72+
switch err {
73+
case types.ErrorStatusEmpty:
74+
return
75+
case nil:
76+
proxywasm.LogInfof("(contextID=%d) dequeued data from %s(queueID=%d): %s", ctx.contextID, ctx.queueName, queueID, string(data))
77+
default:
78+
proxywasm.LogCriticalf("error retrieving data from queue %d: %v", queueID, err)
79+
}
80+
}
2.38 MB
Binary file not shown.

0 commit comments

Comments
 (0)