Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/roadrunner-server/beanstalk/v6

go 1.26

toolchain go1.26.0
toolchain go1.26.3

require (
github.com/beanstalkd/go-beanstalk v0.2.0
Expand Down
30 changes: 16 additions & 14 deletions tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,36 @@ module tests

go 1.26

toolchain go1.26.0
toolchain go1.26.3

Comment thread
rustatian marked this conversation as resolved.
require (
connectrpc.com/connect v1.20.0
github.com/Shopify/toxiproxy/v2 v2.12.0
github.com/beanstalkd/go-beanstalk v0.2.0
github.com/google/uuid v1.6.0
github.com/roadrunner-server/api-go/v6 v6.0.0-beta.4
github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12
github.com/roadrunner-server/api-plugins/v6 v6.0.0-beta.2
github.com/roadrunner-server/beanstalk/v6 v6.0.0
github.com/roadrunner-server/config/v6 v6.0.0-beta.3
github.com/roadrunner-server/endure/v2 v2.6.2
github.com/roadrunner-server/goridge/v4 v4.0.0-beta.1
github.com/roadrunner-server/informer/v6 v6.0.0-beta.1
github.com/roadrunner-server/jobs/v6 v6.0.0-beta.6
github.com/roadrunner-server/informer/v6 v6.0.0-beta.2
github.com/roadrunner-server/jobs/v6 v6.0.0-beta.7
github.com/roadrunner-server/logger/v6 v6.0.0-beta.3
github.com/roadrunner-server/resetter/v6 v6.0.0-beta.2
github.com/roadrunner-server/rpc/v6 v6.0.0-beta.3
github.com/roadrunner-server/resetter/v6 v6.0.0-beta.3
github.com/roadrunner-server/rpc/v6 v6.0.0-beta.4
github.com/roadrunner-server/server/v6 v6.0.0-beta.5
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
golang.org/x/net v0.55.0
google.golang.org/genproto v0.0.0-20260504160031-60b97b32f348
google.golang.org/protobuf v1.36.11
)

replace github.com/roadrunner-server/beanstalk/v6 => ../

require (
connectrpc.com/grpcreflect v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand All @@ -51,6 +54,7 @@ require (
github.com/prometheus/procfs v0.20.1 // indirect
github.com/roadrunner-server/errors v1.5.0 // indirect
github.com/roadrunner-server/events v1.0.1 // indirect
github.com/roadrunner-server/goridge/v4 v4.0.0-beta.2 // indirect
github.com/roadrunner-server/pool/v2 v2.0.0-beta.1 // indirect
github.com/roadrunner-server/priority_queue v1.0.6 // indirect
github.com/roadrunner-server/tcplisten v1.5.2 // indirect
Expand All @@ -61,8 +65,8 @@ require (
github.com/spf13/pflag v1.0.10 // indirect
github.com/spf13/viper v1.21.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/tklauser/go-sysconf v0.3.16 // indirect
github.com/tklauser/numcpus v0.11.0 // indirect
github.com/tklauser/go-sysconf v0.4.0 // indirect
github.com/tklauser/numcpus v0.12.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/propagators/jaeger v1.43.0 // indirect
Expand All @@ -72,12 +76,10 @@ require (
go.uber.org/zap v1.28.0 // indirect
go.yaml.in/yaml/v2 v2.4.4 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/net v0.54.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.44.0 // indirect
golang.org/x/sys v0.45.0 // indirect
golang.org/x/text v0.37.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260504160031-60b97b32f348 // indirect
google.golang.org/grpc v1.81.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260511170946-3700d4141b60 // indirect
google.golang.org/grpc v1.81.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
52 changes: 28 additions & 24 deletions tests/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
connectrpc.com/connect v1.20.0 h1:6TNDAB+WeNd2uolWNlYczB5E0KNNaVMNUEx8JEUsPmQ=
connectrpc.com/connect v1.20.0/go.mod h1:A2ygJrukXwWy32vkCAAHNVguZrqZ+jeZ9rGRnGR4dN4=
connectrpc.com/grpcreflect v1.3.0 h1:Y4V+ACf8/vOb1XOc251Qun7jMB75gCUNw6llvB9csXc=
connectrpc.com/grpcreflect v1.3.0/go.mod h1:nfloOtCS8VUQOQ1+GTdFzVg2CJo4ZGaat8JIovCtDYs=
github.com/Shopify/toxiproxy/v2 v2.12.0 h1:d1x++lYZg/zijXPPcv7PH0MvHMzEI5aX/YuUi/Sw+yg=
github.com/Shopify/toxiproxy/v2 v2.12.0/go.mod h1:R9Z38Pw6k2cGZWXHe7tbxjGW9azmY1KbDQJ1kd+h7Tk=
github.com/beanstalkd/go-beanstalk v0.2.0 h1:6UOJugnu47uNB2jJO/lxyDgeD1Yds7owYi1USELqexA=
Expand Down Expand Up @@ -56,8 +60,8 @@ github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTU
github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw=
github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEycfc=
github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo=
github.com/roadrunner-server/api-go/v6 v6.0.0-beta.4 h1:wX8IezPUeeBJzlzaBEFSZBE5Bc/Le1Uf/GdFRdFO3HQ=
github.com/roadrunner-server/api-go/v6 v6.0.0-beta.4/go.mod h1:jI30i64yCAxJh7KHc8e1B8NgDcvcnSTI1OIK8lTE+Y0=
github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12 h1:FcRcCvW9OfQvH45SFsI21VoHpOOov56OvOSnO4UKvXs=
github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12/go.mod h1:prGWJ2GoF5YD5PIG7Tb6VKulU3bWoFwr9DCwgxheb80=
github.com/roadrunner-server/api-plugins/v6 v6.0.0-beta.2 h1:GqsZzWQ5jMXRF1O/b8IqFz9PLpS7Ui0K4OyACLql2MI=
github.com/roadrunner-server/api-plugins/v6 v6.0.0-beta.2/go.mod h1:2v4yUK5Kvbvq8C3IkDoBkuamq9h+7i/JLjyf7k1j5JM=
github.com/roadrunner-server/config/v6 v6.0.0-beta.3 h1:G0EUzJ6Yw4UnleM6BhnOBbYPXKDHRmCJiGhC3nXDBwI=
Expand All @@ -68,22 +72,22 @@ github.com/roadrunner-server/errors v1.5.0 h1:unG7LKIZrSzkCCF3YLRLA5VyqE0KKomofX
github.com/roadrunner-server/errors v1.5.0/go.mod h1:g9fo/T2C13cWRDR9PW1r0ZAOSQfNhWAZawyfkGiaHuI=
github.com/roadrunner-server/events v1.0.1 h1:waCkKhxhzdK3VcI1xG22l+h+0J+Nfdpxjhyy01Un+kI=
github.com/roadrunner-server/events v1.0.1/go.mod h1:WZRqoEVaFm209t52EuoT7ISUtvX6BrCi6bI/7pjkVC0=
github.com/roadrunner-server/goridge/v4 v4.0.0-beta.1 h1:dO1wKnuMr4xMmH6DY2ZaZ6FWS+Vo50+C7fuAcyO/xBk=
github.com/roadrunner-server/goridge/v4 v4.0.0-beta.1/go.mod h1:+gKla9HAyYlk0TsC9VktwtOL63aimsWT3oPsuCLh4/o=
github.com/roadrunner-server/informer/v6 v6.0.0-beta.1 h1:xAd8/KtliP+lpuVdDNB8b6YtkcYEDCV7QDy5dkrVnI0=
github.com/roadrunner-server/informer/v6 v6.0.0-beta.1/go.mod h1:9PoEMzJ5dAh3tUsxpv9uX8Kpo28PUR3hkh6JCD6olBc=
github.com/roadrunner-server/jobs/v6 v6.0.0-beta.6 h1:qjU8lhLxvnYuLdkAbvFM0kjnvQ6+UWmALR5wswdWfqM=
github.com/roadrunner-server/jobs/v6 v6.0.0-beta.6/go.mod h1:+ERXbdsujwQKhlwbbLyqR0ZZgHKnG5nogbITnmMReOI=
github.com/roadrunner-server/goridge/v4 v4.0.0-beta.2 h1:MgH6oiSgcl+vphsQ6JpyedkXQ/DPf8zVpn0z7rdBp10=
github.com/roadrunner-server/goridge/v4 v4.0.0-beta.2/go.mod h1:Wv9CBO9VIU92e5iZIuehLHKakXgMkOzxoT4/oHDjIUA=
github.com/roadrunner-server/informer/v6 v6.0.0-beta.2 h1:tJsNgbQ28mK5CdQCpU+BY6ScWP884nhpGYfwJalZOlU=
github.com/roadrunner-server/informer/v6 v6.0.0-beta.2/go.mod h1:nDn5jjR1ZI1Xhz01j32bSs4PHv78IE96rcqdFr/ZvgU=
github.com/roadrunner-server/jobs/v6 v6.0.0-beta.7 h1:RNb8fkVk2GO02E3FFyCmYz6WFkeXrBfV/daK84WXAD0=
github.com/roadrunner-server/jobs/v6 v6.0.0-beta.7/go.mod h1:kHTJXgjMe/7t9vJN4imEO3RlB7crnnp59X8TmgiEOqw=
github.com/roadrunner-server/logger/v6 v6.0.0-beta.3 h1:eoJKXAUSyykDfVX6eTUhmAn6Y8pS/LyI5fDP4H+G5rQ=
github.com/roadrunner-server/logger/v6 v6.0.0-beta.3/go.mod h1:MwHb3AbltHYtu7nRpml5NeYu7O+W8rCpDBeNTTEoE1M=
github.com/roadrunner-server/pool/v2 v2.0.0-beta.1 h1:jpYXFtdD6QGAdAGPgMxrNi3j1CegCRpb2y+A+3GnXFA=
github.com/roadrunner-server/pool/v2 v2.0.0-beta.1/go.mod h1:Bo1wT7RtL3eyQHXBUohNhtj/yAmRt6Rq8smuBg5pWkY=
github.com/roadrunner-server/priority_queue v1.0.6 h1:x8bcMyjWs2Z4ySbO9BTP8Dzy2prCuazJY9HHrVTmUVY=
github.com/roadrunner-server/priority_queue v1.0.6/go.mod h1:aJ2D9s18+OGpFfNgwoIduraaFYBGv4FKElnpzqO+TBI=
github.com/roadrunner-server/resetter/v6 v6.0.0-beta.2 h1:j3boFQdHx0wJSQtqNnayp4HYafE6QJ5plkuWi7tEVng=
github.com/roadrunner-server/resetter/v6 v6.0.0-beta.2/go.mod h1:M3VQ25p/qeaaf8raLJqo1NAIMNxvlQ6EDhDlIbJTSZs=
github.com/roadrunner-server/rpc/v6 v6.0.0-beta.3 h1:hvVEDIMB9MKI8uWX++MrBzHRzq404ygU0fDs6U2V/3Y=
github.com/roadrunner-server/rpc/v6 v6.0.0-beta.3/go.mod h1:BpDpd2/UceDdsDJNP0iMfmegbXthxiZM4MU6GOJoSXo=
github.com/roadrunner-server/resetter/v6 v6.0.0-beta.3 h1:+hkbf/kXpvFjx4LfkuH8dvR07rBwrStmcqJaffsfL0g=
github.com/roadrunner-server/resetter/v6 v6.0.0-beta.3/go.mod h1:zV+MfVo6jtvrop+04HNcr4z3b/22qyKXukK29kzagYc=
github.com/roadrunner-server/rpc/v6 v6.0.0-beta.4 h1:Qj2nrHIWOHE9Tys+FBG2IdoPtzgIUh6juQ5wXLGGDMw=
github.com/roadrunner-server/rpc/v6 v6.0.0-beta.4/go.mod h1:k5KT3fpnJVd27m0HbGGBiTPXlWI6eJdd6C+ohp5IE0U=
github.com/roadrunner-server/server/v6 v6.0.0-beta.5 h1:poCPSHc768UtMqJRgW3rZT5pDboOnwOD4qc28hhXJ4I=
github.com/roadrunner-server/server/v6 v6.0.0-beta.5/go.mod h1:SbODuCzC2gcbFhAmJDWvjf34pPrUWP5NxxVsTRQDuZ4=
github.com/roadrunner-server/tcplisten v1.5.2 h1:nn8yXYrhRDkfQ9AAu4V075uT4fZRmOnpxkawgE+bWPA=
Expand All @@ -106,10 +110,10 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/tklauser/go-sysconf v0.3.16 h1:frioLaCQSsF5Cy1jgRBrzr6t502KIIwQ0MArYICU0nA=
github.com/tklauser/go-sysconf v0.3.16/go.mod h1:/qNL9xxDhc7tx3HSRsLWNnuzbVfh3e7gh/BmM179nYI=
github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9RXw=
github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ=
github.com/tklauser/go-sysconf v0.4.0 h1:7H0uAN+7RkwWRaxhYXDLqa5V3LPrJeV8wmD9dRUgPQU=
github.com/tklauser/go-sysconf v0.4.0/go.mod h1:8mTNWyog7H+MpKijp4VmKJAd2bbYQ2zuUwkYRbUArPI=
github.com/tklauser/numcpus v0.12.0 h1:NR85qdvHA9pFse3x3weVZ0r0ST8R6l5RHbZrlRaqob4=
github.com/tklauser/numcpus v0.12.0/go.mod h1:ABHeXzJnr/qqwguhClkZKT1/8VABcYrsyUiUGobwWJg=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
Expand All @@ -136,24 +140,24 @@ go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ=
go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/net v0.54.0 h1:2zJIZAxAHV/OHCDTCOHAYehQzLfSXuf/5SoL/Dv6w/w=
golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ=
golang.org/x/net v0.55.0 h1:bcvxaJn3e1U6InsFWt1JUq1aSjnRxLzT2rtD2KfkDF8=
golang.org/x/net v0.55.0/go.mod h1:L5U2KuzuOe1lY7Z+aWVIKK6qEeJXnXV9yzGA+WCHJww=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY=
golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
google.golang.org/genproto v0.0.0-20260504160031-60b97b32f348 h1:JjVGDZYWkJWZcxveJGzfkXC5myDVWAd4dZdgbzrDUv8=
google.golang.org/genproto v0.0.0-20260504160031-60b97b32f348/go.mod h1:95PqD4xM+AdOcBGsmgfaofXsiA37uXDtDufVbntT3TU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260504160031-60b97b32f348 h1:pfIbyB44sWzHiCpRqIen67ZQnVXSfIxWrqUMk1qwODE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260504160031-60b97b32f348/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
google.golang.org/grpc v1.81.0 h1:W3G9N3KQf3BU+YuCtGKJk0CmxQNbAISICD/9AORxLIw=
google.golang.org/grpc v1.81.0/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260511170946-3700d4141b60 h1:seT2EwLWM78plQ7wcDfuWBc/4FAEAXDDiaSol4ku4qo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260511170946-3700d4141b60/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
google.golang.org/grpc v1.81.1 h1:VnnIIZ88UzOOKLukQi+ImGz8O1Wdp8nAGGnvOfEIWQQ=
google.golang.org/grpc v1.81.1/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
137 changes: 51 additions & 86 deletions tests/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,78 +2,57 @@ package helpers

import (
"bytes"
"context"
"crypto/tls"
"net"
"net/http"
"net/rpc"
"slices"
"testing"
"time"

"connectrpc.com/connect"
"github.com/google/uuid"
jobsProto "github.com/roadrunner-server/api-go/v6/jobs/v2"
"github.com/roadrunner-server/api-go/v6/jobs/v2/jobsV2connect"
jobState "github.com/roadrunner-server/api-plugins/v6/jobs"
goridgeRpc "github.com/roadrunner-server/goridge/v4/pkg/rpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/http2"
"google.golang.org/protobuf/types/known/emptypb"
)

const (
push string = "jobs.Push"
pause string = "jobs.Pause"
destroy string = "jobs.Destroy"
resume string = "jobs.Resume"
stat string = "jobs.Stat"
)

func callPipelinesRPC(t *testing.T, method, address string, pipes ...string) {
func NewJobsClient(t *testing.T, address string) jobsV2connect.JobsServiceClient {
t.Helper()
conn, err := (&net.Dialer{}).DialContext(t.Context(), "tcp", address)
require.NoError(t, err)
defer func() { _ = conn.Close() }()
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
defer func() { _ = client.Close() }()

pipe := &jobsProto.Pipelines{Pipelines: make([]string, len(pipes))}

for i := range len(pipes) {
pipe.GetPipelines()[i] = pipes[i]
}

er := &jobsProto.JobsHandlerResponse{}
err = client.Call(method, pipe, er)
require.NoError(t, err)
httpc := &http.Client{Transport: &http2.Transport{
AllowHTTP: true,
DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
return new(net.Dialer).DialContext(ctx, network, addr)
},
}}
t.Cleanup(httpc.CloseIdleConnections)
return jobsV2connect.NewJobsServiceClient(httpc, "http://"+address)
Comment thread
rustatian marked this conversation as resolved.
}

func ResumePipes(address string, pipes ...string) func(t *testing.T) {
return func(t *testing.T) {
callPipelinesRPC(t, resume, address, pipes...)
client := NewJobsClient(t, address)
_, err := client.Resume(t.Context(), connect.NewRequest(&jobsProto.Pipelines{Pipelines: slices.Clone(pipes)}))
require.NoError(t, err)
}
}

func PushToPipe(pipeline string, autoAck bool, address string) func(t *testing.T) {
return func(t *testing.T) {
conn, err := (&net.Dialer{}).DialContext(t.Context(), "tcp", address)
require.NoError(t, err)
defer func() { _ = conn.Close() }()
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
defer func() { _ = client.Close() }()

req := &jobsProto.PushBatchRequest{Jobs: []*jobsProto.Job{createDummyJob(pipeline, autoAck)}}

er := &jobsProto.JobsHandlerResponse{}
err = client.Call(push, req, er)
client := NewJobsClient(t, address)
_, err := client.Push(t.Context(), connect.NewRequest(&jobsProto.PushRequest{Job: createDummyJob(pipeline, autoAck)}))
require.NoError(t, err)
}
}

func PushToPipeDelayed(address string, pipeline string, delay int64) func(t *testing.T) {
return func(t *testing.T) {
conn, err := (&net.Dialer{}).DialContext(t.Context(), "tcp", address)
assert.NoError(t, err)
defer func() { _ = conn.Close() }()
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
defer func() { _ = client.Close() }()

req := &jobsProto.PushBatchRequest{Jobs: []*jobsProto.Job{&jobsProto.Job{
client := NewJobsClient(t, address)
req := &jobsProto.PushRequest{Job: &jobsProto.Job{
Job: "some/php/namespace",
Id: uuid.NewString(),
Payload: []byte(`{"hello":"world"}`),
Expand All @@ -83,10 +62,8 @@ func PushToPipeDelayed(address string, pipeline string, delay int64) func(t *tes
Pipeline: pipeline,
Delay: delay,
},
}}}

er := &jobsProto.JobsHandlerResponse{}
err = client.Call(push, req, er)
}}
_, err := client.Push(t.Context(), connect.NewRequest(req))
assert.NoError(t, err)
}
}
Expand All @@ -108,60 +85,48 @@ func createDummyJob(pipeline string, autoAck bool) *jobsProto.Job {

func PausePipelines(address string, pipes ...string) func(t *testing.T) {
return func(t *testing.T) {
callPipelinesRPC(t, pause, address, pipes...)
client := NewJobsClient(t, address)
_, err := client.Pause(t.Context(), connect.NewRequest(&jobsProto.Pipelines{Pipelines: slices.Clone(pipes)}))
require.NoError(t, err)
}
}

func DestroyPipelines(address string, pipes ...string) func(t *testing.T) {
return func(t *testing.T) {
conn, err := (&net.Dialer{}).DialContext(t.Context(), "tcp", address)
assert.NoError(t, err)
defer func() { _ = conn.Close() }()
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
defer func() { _ = client.Close() }()

pipe := &jobsProto.Pipelines{Pipelines: make([]string, len(pipes))}

for i := range len(pipes) {
pipe.GetPipelines()[i] = pipes[i]
}
client := NewJobsClient(t, address)
req := &jobsProto.Pipelines{Pipelines: slices.Clone(pipes)}

// Retry the destroy 10× with 1s gaps; if all attempts fail, return
// without asserting. Some negative tests intentionally destroy
// non-existent pipelines and rely on this silent-after-retry pattern.
for range 10 {
er := &jobsProto.JobsHandlerResponse{}
err = client.Call(destroy, pipe, er)
if err != nil {
time.Sleep(time.Second)
continue
_, err := client.Destroy(t.Context(), connect.NewRequest(req))
if err == nil {
return
}
assert.NoError(t, err)
break
time.Sleep(time.Second)
}
}
}

func Stats(address string, state *jobState.State) func(t *testing.T) {
return func(t *testing.T) {
conn, err := (&net.Dialer{}).DialContext(t.Context(), "tcp", address)
require.NoError(t, err)
defer func() { _ = conn.Close() }()
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
defer func() { _ = client.Close() }()

st := &jobsProto.Stats{}
er := &jobsProto.JobsHandlerResponse{}
client := NewJobsClient(t, address)

err = client.Call(stat, er, st)
resp, err := client.GetStats(t.Context(), connect.NewRequest(&emptypb.Empty{}))
require.NoError(t, err)
require.NotNil(t, st)

state.Queue = st.Stats[0].Queue
state.Pipeline = st.Stats[0].Pipeline
state.Driver = st.Stats[0].Driver
state.Active = st.Stats[0].Active
state.Delayed = st.Stats[0].Delayed
state.Reserved = st.Stats[0].Reserved
state.Ready = st.Stats[0].Ready
state.Priority = st.Stats[0].Priority
require.NotNil(t, resp)
require.NotEmpty(t, resp.Msg.GetStats())

st := resp.Msg.GetStats()[0]
state.Queue = st.GetQueue()
state.Pipeline = st.GetPipeline()
state.Driver = st.GetDriver()
state.Active = st.GetActive()
state.Delayed = st.GetDelayed()
state.Reserved = st.GetReserved()
state.Ready = st.GetReady()
state.Priority = st.GetPriority()
}
}

Expand Down
Loading
Loading