Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ test.log
/tmp/*
/examples/tmp/*
/bin/serve/docker/prometheus/data
/bin/serve/docker/nats/certs/*.pem
1 change: 1 addition & 0 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
golang 1.21.7
golangci-lint 1.64.5
52 changes: 52 additions & 0 deletions bin/coverage.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/usr/bin/env bash

set -e

# Default values
should_open=false
package_path="./..."

# Function to display usage
usage() {
echo "Usage: $0 [-p|--package <package_path>] [-o|--open] [-h|--help]"
echo " -p, --package Specify the package path to test (default: all packages)"
echo " -o, --open Open the coverage report in the default browser"
echo " -h, --help Display this help message"
}

# Parse arguments
while [[ "$#" -gt 0 ]]; do
case $1 in
-p|--package) package_path="$2"; shift ;;
-o|--open) should_open=true ;;
-h|--help)
usage
exit 0
;;
*)
echo "Unknown parameter passed: $1"
usage
exit 1
;;
esac
shift
done

# TODO: work through lint issues and enable golanci-lint
# lint
# golangci-lint run $package_path

# unit test
go test $package_path -coverpkg=$package_path -race -covermode=atomic -coverprofile=coverage.out
go tool cover -html=coverage.out -o ~/Desktop/coverage.html

# report
echo ""
echo "==="
echo "Coverage report is generated at ~/Desktop/coverage.html"
echo "==="

# open
if [ "$should_open" = true ]; then
open ~/Desktop/coverage.html
fi
2 changes: 2 additions & 0 deletions bin/serve/docker.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/usr/bin/env bash

./bin/serve/gen-certs.sh

pushd ./bin/serve/docker

make start
Expand Down
15 changes: 15 additions & 0 deletions bin/serve/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
version: '3'
services:

nats:
image: docker.io/nats:2.9.20
ports:
- "4222:4222"
- "6222:6222"
- "8222:8222"
volumes:
- "./nats/certs/server.pem:/certs/server.pem"
- "./nats/certs/server-key.pem:/certs/server-key.pem"
command:
- "-tls"
- "-tlscert=/certs/server.pem"
- "-tlskey=/certs/server-key.pem"
- "-js"

# grafana:
# build:
# context: ./grafana
Expand Down
66 changes: 66 additions & 0 deletions bin/serve/gen-certs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env bash

set -e

if [ -z "$(which mkcert)" ]; then
echo "mkcert not installed. Installing....."
HOMEBREW_NO_AUTO_UPDATE=1 brew install mkcert
fi

check_and_generate_certificate() {
if [ ! -f "$(mkcert -CAROOT)/rootCA.pem" ]; then
cat << EOF
#####################################################################
# ERROR: The mkcert root CA certificate has not been generated. #
# Please run the following command to install it: #
# #
# mkcert --install #
# #
# This is required for nats to start properly. #
#####################################################################
EOF
return 1
fi


# Check if server.pem exists and if it is a directory
if [ -d ./server.pem ]; then
echo "server.pem is a directory. Deleting certs contents."
rm -rf ./*
echo "Directory cleaned up."
fi

# Check if the server.pem file exists
if [ ! -f ./server.pem ]; then
echo "server.pem does not exist. Generating a new certificate."
mkcert -cert-file server.pem -key-file server-key.pem localhost 127.0.0.1 ::1
return 0
fi

# Get the certificate expiration date
enddate=$(openssl x509 -enddate -noout -in ./server.pem | cut -d= -f2)

# Convert the expiration date to a Unix timestamp
enddate_timestamp=$(date -j -f "%b %d %T %Y %Z" "$enddate" "+%Y%m%d%H%M%S")

# Get today's date as a Unix timestamp
current_timestamp=$(date "+%Y%m%d%H%M%S")

# Compare the dates
if [ "$enddate_timestamp" -le "$current_timestamp" ]; then
echo "Certificate has expired or is expiring today. Generating a new one."
mkcert -cert-file server.pem -key-file server-key.pem localhost 127.0.0.1 ::1
return 0
fi

echo "Certificate is still valid."
}

mkdir -p ./bin/serve/docker/nats/certs

pushd ./bin/serve/docker/nats/certs

check_and_generate_certificate

popd

9 changes: 6 additions & 3 deletions bin/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ go install github.com/jstemmer/go-junit-report/v2@latest

list=`go list ./... | grep -v mocks | grep -v docs | grep -v errors | grep -v examples`

go test -v -coverpkg=./... -race -covermode=atomic -coverprofile=coverage.out 2>&1 $list > test.log
go test -v -coverpkg=./... -race -covermode=atomic -coverprofile=coverage.out $list 2>&1 | tee test.log
test_exit=${PIPESTATUS[0]}

cat test.log
cat test.log | go-junit-report > junit.xml

cat test.log | go-junit-report -set-exit-code > junit.xml
if [ $test_exit -ne 0 ]; then
exit $test_exit
fi

coverage=$(go tool cover -func coverage.out | grep total | awk '{print $3}')

Expand Down
8 changes: 4 additions & 4 deletions circuitbreaker/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func (cb *CircuitBreaker) Allow() bool {
allowed = true
}

logger.Debug("circuit breaker allow check", logger.Fields{
"state": cb.state.String(),
cb.config.Logger.Debug("circuit breaker allow check", logger.Fields{
"state": cb.state,
"allowed": allowed,
"attempts": cb.attempts,
"max_calls": cb.config.HalfOpenMaxCalls,
Expand All @@ -123,13 +123,13 @@ func (cb *CircuitBreaker) RecordStart() bool {

switch cb.state {
case StateOpen:
logger.Debug("attempt rejected - circuit open", logger.Fields{
cb.config.Logger.Debug("attempt rejected - circuit open", logger.Fields{
"state": cb.state.String(),
})
return false
case StateHalfOpen:
if cb.attempts >= cb.config.HalfOpenMaxCalls {
logger.Debug("attempt rejected - max half-open calls reached", logger.Fields{
cb.config.Logger.Debug("attempt rejected - max half-open calls reached", logger.Fields{
"attempts": cb.attempts,
"max_calls": cb.config.HalfOpenMaxCalls,
})
Expand Down
43 changes: 11 additions & 32 deletions messaging/natsjscm/natsjscm.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,43 +73,15 @@ func NewConnectionManager(config ConnectionConfig) (Connector, error) {

// Add connection event handlers
options = append(options,
nats.ReconnectHandler(func(_ *nats.Conn) {
cm.handleReconnect()
nats.DisconnectHandler(func(_ *nats.Conn) {
cm.retryConnection()
}),
)

cm.config.Options = options
return cm, nil
}

func (cm *ConnectionManager) handleReconnect() {
cm.mu.Lock()
defer cm.mu.Unlock()

cm.log.Debugf("NATS connection reconnected")

// Recreate JetStream context after reconnect
if cm.nc == nil {
return
}

js, err := cm.createJetStreamContext(cm.nc)
if err != nil {
cm.log.Errorf("Failed to recreate JetStream context %s", err)
go cm.retryConnection()
return
}

cm.js = js
cm.log.Debugf("JetStream context recreated")
}

// createJetStreamContext creates a new JetStream context with the current configuration
func (cm *ConnectionManager) createJetStreamContext(nc *nats.Conn) (jetstream.JetStream, error) {
// Create JetStream context
return jetstream.New(nc)
}

// retryConnection attempts to reconnect to NATS periodically
func (cm *ConnectionManager) retryConnection() {
for {
Expand All @@ -129,7 +101,7 @@ func (cm *ConnectionManager) retryConnection() {
continue
}

js, err := cm.createJetStreamContext(nc)
js, err := jetstream.New(nc)
if err != nil {
cm.log.Errorf("Failed to recreate JetStream context: %s", err)
nc.Close()
Expand All @@ -148,23 +120,28 @@ func (cm *ConnectionManager) retryConnection() {

// Connect establishes a connection to NATS if not already connected
func (cm *ConnectionManager) Connect() error {
cm.log.Debugf("Starting Connect")

cm.mu.Lock()
defer cm.mu.Unlock()

cm.log.Debugf("Checking NATS connection")
// If already connected, increment reference count
if cm.nc != nil && cm.nc.IsConnected() {
cm.refs++
return nil
}

cm.log.Debugf("Connecting to NATS")
// Connect to NATS
nc, err := nats.Connect(cm.config.URL, cm.config.Options...)
if err != nil {
return fmt.Errorf("failed to connect to NATS: %w", err)
}

cm.log.Debugf("Getting JetStream context")
// Create JetStream context
js, err := cm.createJetStreamContext(nc)
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
return fmt.Errorf("failed to create JetStream context: %w", err)
Expand All @@ -174,6 +151,8 @@ func (cm *ConnectionManager) Connect() error {
cm.js = js
cm.refs = 1

cm.log.Debugf("Connected to NATS")

return nil
}

Expand Down
28 changes: 2 additions & 26 deletions messaging/natsjscm/natsjscm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,11 @@ import (

"github.com/nats-io/nats.go"
"github.com/simiancreative/simiango/messaging/natsjscm"
"github.com/simiancreative/simiango/mocks/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// Mock Logger for testing
type mockLogger struct {
debugMessages []string
errorMessages []string
}

func newMockLogger() *mockLogger {
return &mockLogger{
debugMessages: []string{},
errorMessages: []string{},
}
}

func (m *mockLogger) Debugf(format string, args ...interface{}) {
m.debugMessages = append(m.debugMessages, fmt.Sprintf(format, args...))
}

func (m *mockLogger) Errorf(format string, args ...interface{}) {
m.errorMessages = append(m.errorMessages, fmt.Sprintf(format, args...))
}

func MockServer(args ...int) func() {
port := 0
if len(args) > 0 {
Expand Down Expand Up @@ -105,7 +85,7 @@ func TestNewConnectionManager(t *testing.T) {
config: natsjscm.ConnectionConfig{
URL: "nats://localhost:4222",
ReconnectWait: 5 * time.Second,
Logger: newMockLogger(),
Logger: &logger.MockLogger{},
},
expectError: false,
},
Expand Down Expand Up @@ -302,14 +282,10 @@ func TestEnsureStream(t *testing.T) {
func TestConnectionEvents(t *testing.T) {
shutdown := MockServer()

// Create a mock logger to capture log messages
logger := newMockLogger()

// Create a connection manager with event handlers
cm, err := natsjscm.NewConnectionManager(natsjscm.ConnectionConfig{
URL: os.Getenv("NATS_HOST"),
ReconnectWait: 100 * time.Millisecond,
Logger: logger,
})
require.NoError(t, err)

Expand Down
Loading
Loading