Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/streaming/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
myfile
45 changes: 45 additions & 0 deletions examples/streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# gRPC streaming Example

This example builds a plugin & client which can stream a large amount of data
between them while staying below reasonable message size limits of the gRPC
protocol.

> Note: [hashicorp/go-plugin sets an upper limit on message size](https://github.com/hashicorp/go-plugin/blob/d0d30899ca2d91b0869cb73db95afca180e769cf/grpc_client.go#L39-L41). At time of writing, that value is `math.MaxInt32` bytes, or approximately 2GB.

## To execute

Build the plugin

```
go build -o ./plugin/streamer ./plugin
```

Launch the client:

```
go run main.go myfile
```

The client will first write data to the streamer plugin, and then the client will read that
data back from the plugin. The plugin writes the data it receives in a file called `myfile`,
due to the argument passed to the client above.

## To re-generate protobuf definitions

Install protobuf tooling

```
brew install protobuf@29
```

```
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.6
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3.0
```

generate files

```
cd proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative streamer.proto
```
87 changes: 87 additions & 0 deletions examples/streaming/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright IBM Corp. 2016, 2025
// SPDX-License-Identifier: MPL-2.0

package main

import (
"context"
"fmt"
"log"
"os"
"os/exec"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/go-plugin/examples/streaming/shared"
"google.golang.org/grpc"
)

func main() {
if len(os.Args) != 2 {
log.Fatal("expected path to file as an argument")
}
path := os.Args[1]

logger := hclog.New(&hclog.LoggerOptions{
Level: hclog.Trace,
Output: os.Stderr,
JSONFormat: true,
})

msgSizeLimit := 1000
chunkSize := 10

client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: plugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "BASIC_PLUGIN",
MagicCookieValue: "hello",
},
Plugins: map[string]plugin.Plugin{
"streamer": &shared.StreamerPlugin{},
},
Cmd: exec.Command("./plugin/streamer"),
AllowedProtocols: []plugin.Protocol{
plugin.ProtocolGRPC,
},
Logger: logger,
GRPCDialOptions: []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(msgSizeLimit)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(msgSizeLimit)),
},
})
defer client.Kill()

logger.Debug("launching a client")

rpcClient, err := client.Client()
if err != nil {
log.Fatal(err)
}

raw, err := rpcClient.Dispense("streamer")
if err != nil {
log.Fatal(err)
}

ctx := context.Background()

streamer := raw.(shared.Streamer)
err = streamer.Configure(ctx, path, int64(chunkSize))
if err != nil {
log.Fatal(err)
}

err = streamer.Write(ctx, []byte("Lorem ipsum dolor sit amet"))
if err != nil {
log.Fatal(err)
}

logger.Debug("writing finished")

b, err := streamer.Read(ctx)
if err != nil {
log.Fatal(err)
}
logger.Debug(fmt.Sprintf("received %d bytes", len(b)), "bytes", string(b))
}
1 change: 1 addition & 0 deletions examples/streaming/plugin/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
streamer
89 changes: 89 additions & 0 deletions examples/streaming/plugin/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright IBM Corp. 2016, 2025
// SPDX-License-Identifier: MPL-2.0

package main

import (
"context"
"errors"
"io"
"os"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/go-plugin/examples/streaming/shared"
)

type FileStreamer struct {
logger hclog.Logger
path string
}

func (fs *FileStreamer) Configure(ctx context.Context, path string, _ int64) error {
fs.path = path
return nil
}

func (fs *FileStreamer) Read(ctx context.Context) ([]byte, error) {
fs.logger.Debug("FileStreamer: Read", "path", fs.path)
f, err := os.OpenFile(fs.path, os.O_RDONLY, 0644)
if err != nil {
return nil, err
}
defer func() {
cErr := f.Close()
err = errors.Join(err, cErr)
}()
return io.ReadAll(f)
}

func (fs *FileStreamer) Write(ctx context.Context, b []byte) error {
fs.logger.Debug("FileStreamer: Write", "path", fs.path)
f, err := os.OpenFile(fs.path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer func() {
cErr := f.Close()
err = errors.Join(err, cErr)
}()

n, err := f.Write(b)
if err != nil {
return err
}
fs.logger.Debug("FileStreamer: Write finished", "bytes written", n)
return nil
}

var handshakeConfig = plugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "BASIC_PLUGIN",
MagicCookieValue: "hello",
}

func main() {
logger := hclog.New(&hclog.LoggerOptions{
Level: hclog.Trace,
Output: os.Stderr,
JSONFormat: true,
})

streamer := &FileStreamer{
logger: logger,
}
var pluginMap = map[string]plugin.Plugin{
"streamer": &shared.StreamerPlugin{
Impl: streamer,
},
}

logger.Debug("plugin launched, about to be served")

plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: handshakeConfig,
Plugins: pluginMap,
GRPCServer: plugin.DefaultGRPCServer,
Logger: logger,
})
}
Loading