diff --git a/go.mod b/go.mod index 169a4e6..b360cb9 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/roadrunner-server/beanstalk/v6 go 1.26 -toolchain go1.26.0 +toolchain go1.26.3 require ( github.com/beanstalkd/go-beanstalk v0.2.0 diff --git a/tests/go.mod b/tests/go.mod index 617790f..7586e85 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -2,33 +2,36 @@ module tests go 1.26 -toolchain go1.26.0 +toolchain go1.26.3 require ( + connectrpc.com/connect v1.20.0 github.com/Shopify/toxiproxy/v2 v2.12.0 github.com/beanstalkd/go-beanstalk v0.2.0 github.com/google/uuid v1.6.0 - github.com/roadrunner-server/api-go/v6 v6.0.0-beta.4 + github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12 github.com/roadrunner-server/api-plugins/v6 v6.0.0-beta.2 github.com/roadrunner-server/beanstalk/v6 v6.0.0 github.com/roadrunner-server/config/v6 v6.0.0-beta.3 github.com/roadrunner-server/endure/v2 v2.6.2 - github.com/roadrunner-server/goridge/v4 v4.0.0-beta.1 - github.com/roadrunner-server/informer/v6 v6.0.0-beta.1 - github.com/roadrunner-server/jobs/v6 v6.0.0-beta.6 + github.com/roadrunner-server/informer/v6 v6.0.0-beta.2 + github.com/roadrunner-server/jobs/v6 v6.0.0-beta.7 github.com/roadrunner-server/logger/v6 v6.0.0-beta.3 - github.com/roadrunner-server/resetter/v6 v6.0.0-beta.2 - github.com/roadrunner-server/rpc/v6 v6.0.0-beta.3 + github.com/roadrunner-server/resetter/v6 v6.0.0-beta.3 + github.com/roadrunner-server/rpc/v6 v6.0.0-beta.4 github.com/roadrunner-server/server/v6 v6.0.0-beta.5 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel/sdk v1.43.0 go.opentelemetry.io/otel/trace v1.43.0 + golang.org/x/net v0.55.0 google.golang.org/genproto v0.0.0-20260504160031-60b97b32f348 + google.golang.org/protobuf v1.36.11 ) replace github.com/roadrunner-server/beanstalk/v6 => ../ require ( + connectrpc.com/grpcreflect v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -51,6 +54,7 @@ require ( github.com/prometheus/procfs v0.20.1 // indirect github.com/roadrunner-server/errors v1.5.0 // indirect github.com/roadrunner-server/events v1.0.1 // indirect + github.com/roadrunner-server/goridge/v4 v4.0.0-beta.2 // indirect github.com/roadrunner-server/pool/v2 v2.0.0-beta.1 // indirect github.com/roadrunner-server/priority_queue v1.0.6 // indirect github.com/roadrunner-server/tcplisten v1.5.2 // indirect @@ -61,8 +65,8 @@ require ( github.com/spf13/pflag v1.0.10 // indirect github.com/spf13/viper v1.21.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect - github.com/tklauser/go-sysconf v0.3.16 // indirect - github.com/tklauser/numcpus v0.11.0 // indirect + github.com/tklauser/go-sysconf v0.4.0 // indirect + github.com/tklauser/numcpus v0.12.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/propagators/jaeger v1.43.0 // indirect @@ -72,12 +76,10 @@ require ( go.uber.org/zap v1.28.0 // indirect go.yaml.in/yaml/v2 v2.4.4 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect - golang.org/x/net v0.54.0 // indirect golang.org/x/sync v0.20.0 // indirect - golang.org/x/sys v0.44.0 // indirect + golang.org/x/sys v0.45.0 // indirect golang.org/x/text v0.37.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20260504160031-60b97b32f348 // indirect - google.golang.org/grpc v1.81.0 // indirect - google.golang.org/protobuf v1.36.11 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260511170946-3700d4141b60 // indirect + google.golang.org/grpc v1.81.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/tests/go.sum b/tests/go.sum index c93c9f2..2da007b 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -1,3 +1,7 @@ +connectrpc.com/connect v1.20.0 h1:6TNDAB+WeNd2uolWNlYczB5E0KNNaVMNUEx8JEUsPmQ= +connectrpc.com/connect v1.20.0/go.mod h1:A2ygJrukXwWy32vkCAAHNVguZrqZ+jeZ9rGRnGR4dN4= +connectrpc.com/grpcreflect v1.3.0 h1:Y4V+ACf8/vOb1XOc251Qun7jMB75gCUNw6llvB9csXc= +connectrpc.com/grpcreflect v1.3.0/go.mod h1:nfloOtCS8VUQOQ1+GTdFzVg2CJo4ZGaat8JIovCtDYs= github.com/Shopify/toxiproxy/v2 v2.12.0 h1:d1x++lYZg/zijXPPcv7PH0MvHMzEI5aX/YuUi/Sw+yg= github.com/Shopify/toxiproxy/v2 v2.12.0/go.mod h1:R9Z38Pw6k2cGZWXHe7tbxjGW9azmY1KbDQJ1kd+h7Tk= github.com/beanstalkd/go-beanstalk v0.2.0 h1:6UOJugnu47uNB2jJO/lxyDgeD1Yds7owYi1USELqexA= @@ -56,8 +60,8 @@ github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTU github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEycfc= github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo= -github.com/roadrunner-server/api-go/v6 v6.0.0-beta.4 h1:wX8IezPUeeBJzlzaBEFSZBE5Bc/Le1Uf/GdFRdFO3HQ= -github.com/roadrunner-server/api-go/v6 v6.0.0-beta.4/go.mod h1:jI30i64yCAxJh7KHc8e1B8NgDcvcnSTI1OIK8lTE+Y0= +github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12 h1:FcRcCvW9OfQvH45SFsI21VoHpOOov56OvOSnO4UKvXs= +github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12/go.mod h1:prGWJ2GoF5YD5PIG7Tb6VKulU3bWoFwr9DCwgxheb80= github.com/roadrunner-server/api-plugins/v6 v6.0.0-beta.2 h1:GqsZzWQ5jMXRF1O/b8IqFz9PLpS7Ui0K4OyACLql2MI= github.com/roadrunner-server/api-plugins/v6 v6.0.0-beta.2/go.mod h1:2v4yUK5Kvbvq8C3IkDoBkuamq9h+7i/JLjyf7k1j5JM= github.com/roadrunner-server/config/v6 v6.0.0-beta.3 h1:G0EUzJ6Yw4UnleM6BhnOBbYPXKDHRmCJiGhC3nXDBwI= @@ -68,22 +72,22 @@ github.com/roadrunner-server/errors v1.5.0 h1:unG7LKIZrSzkCCF3YLRLA5VyqE0KKomofX github.com/roadrunner-server/errors v1.5.0/go.mod h1:g9fo/T2C13cWRDR9PW1r0ZAOSQfNhWAZawyfkGiaHuI= github.com/roadrunner-server/events v1.0.1 h1:waCkKhxhzdK3VcI1xG22l+h+0J+Nfdpxjhyy01Un+kI= github.com/roadrunner-server/events v1.0.1/go.mod h1:WZRqoEVaFm209t52EuoT7ISUtvX6BrCi6bI/7pjkVC0= -github.com/roadrunner-server/goridge/v4 v4.0.0-beta.1 h1:dO1wKnuMr4xMmH6DY2ZaZ6FWS+Vo50+C7fuAcyO/xBk= -github.com/roadrunner-server/goridge/v4 v4.0.0-beta.1/go.mod h1:+gKla9HAyYlk0TsC9VktwtOL63aimsWT3oPsuCLh4/o= -github.com/roadrunner-server/informer/v6 v6.0.0-beta.1 h1:xAd8/KtliP+lpuVdDNB8b6YtkcYEDCV7QDy5dkrVnI0= -github.com/roadrunner-server/informer/v6 v6.0.0-beta.1/go.mod h1:9PoEMzJ5dAh3tUsxpv9uX8Kpo28PUR3hkh6JCD6olBc= -github.com/roadrunner-server/jobs/v6 v6.0.0-beta.6 h1:qjU8lhLxvnYuLdkAbvFM0kjnvQ6+UWmALR5wswdWfqM= -github.com/roadrunner-server/jobs/v6 v6.0.0-beta.6/go.mod h1:+ERXbdsujwQKhlwbbLyqR0ZZgHKnG5nogbITnmMReOI= +github.com/roadrunner-server/goridge/v4 v4.0.0-beta.2 h1:MgH6oiSgcl+vphsQ6JpyedkXQ/DPf8zVpn0z7rdBp10= +github.com/roadrunner-server/goridge/v4 v4.0.0-beta.2/go.mod h1:Wv9CBO9VIU92e5iZIuehLHKakXgMkOzxoT4/oHDjIUA= +github.com/roadrunner-server/informer/v6 v6.0.0-beta.2 h1:tJsNgbQ28mK5CdQCpU+BY6ScWP884nhpGYfwJalZOlU= +github.com/roadrunner-server/informer/v6 v6.0.0-beta.2/go.mod h1:nDn5jjR1ZI1Xhz01j32bSs4PHv78IE96rcqdFr/ZvgU= +github.com/roadrunner-server/jobs/v6 v6.0.0-beta.7 h1:RNb8fkVk2GO02E3FFyCmYz6WFkeXrBfV/daK84WXAD0= +github.com/roadrunner-server/jobs/v6 v6.0.0-beta.7/go.mod h1:kHTJXgjMe/7t9vJN4imEO3RlB7crnnp59X8TmgiEOqw= github.com/roadrunner-server/logger/v6 v6.0.0-beta.3 h1:eoJKXAUSyykDfVX6eTUhmAn6Y8pS/LyI5fDP4H+G5rQ= github.com/roadrunner-server/logger/v6 v6.0.0-beta.3/go.mod h1:MwHb3AbltHYtu7nRpml5NeYu7O+W8rCpDBeNTTEoE1M= github.com/roadrunner-server/pool/v2 v2.0.0-beta.1 h1:jpYXFtdD6QGAdAGPgMxrNi3j1CegCRpb2y+A+3GnXFA= github.com/roadrunner-server/pool/v2 v2.0.0-beta.1/go.mod h1:Bo1wT7RtL3eyQHXBUohNhtj/yAmRt6Rq8smuBg5pWkY= github.com/roadrunner-server/priority_queue v1.0.6 h1:x8bcMyjWs2Z4ySbO9BTP8Dzy2prCuazJY9HHrVTmUVY= github.com/roadrunner-server/priority_queue v1.0.6/go.mod h1:aJ2D9s18+OGpFfNgwoIduraaFYBGv4FKElnpzqO+TBI= -github.com/roadrunner-server/resetter/v6 v6.0.0-beta.2 h1:j3boFQdHx0wJSQtqNnayp4HYafE6QJ5plkuWi7tEVng= -github.com/roadrunner-server/resetter/v6 v6.0.0-beta.2/go.mod h1:M3VQ25p/qeaaf8raLJqo1NAIMNxvlQ6EDhDlIbJTSZs= -github.com/roadrunner-server/rpc/v6 v6.0.0-beta.3 h1:hvVEDIMB9MKI8uWX++MrBzHRzq404ygU0fDs6U2V/3Y= -github.com/roadrunner-server/rpc/v6 v6.0.0-beta.3/go.mod h1:BpDpd2/UceDdsDJNP0iMfmegbXthxiZM4MU6GOJoSXo= +github.com/roadrunner-server/resetter/v6 v6.0.0-beta.3 h1:+hkbf/kXpvFjx4LfkuH8dvR07rBwrStmcqJaffsfL0g= +github.com/roadrunner-server/resetter/v6 v6.0.0-beta.3/go.mod h1:zV+MfVo6jtvrop+04HNcr4z3b/22qyKXukK29kzagYc= +github.com/roadrunner-server/rpc/v6 v6.0.0-beta.4 h1:Qj2nrHIWOHE9Tys+FBG2IdoPtzgIUh6juQ5wXLGGDMw= +github.com/roadrunner-server/rpc/v6 v6.0.0-beta.4/go.mod h1:k5KT3fpnJVd27m0HbGGBiTPXlWI6eJdd6C+ohp5IE0U= github.com/roadrunner-server/server/v6 v6.0.0-beta.5 h1:poCPSHc768UtMqJRgW3rZT5pDboOnwOD4qc28hhXJ4I= github.com/roadrunner-server/server/v6 v6.0.0-beta.5/go.mod h1:SbODuCzC2gcbFhAmJDWvjf34pPrUWP5NxxVsTRQDuZ4= github.com/roadrunner-server/tcplisten v1.5.2 h1:nn8yXYrhRDkfQ9AAu4V075uT4fZRmOnpxkawgE+bWPA= @@ -106,10 +110,10 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= -github.com/tklauser/go-sysconf v0.3.16 h1:frioLaCQSsF5Cy1jgRBrzr6t502KIIwQ0MArYICU0nA= -github.com/tklauser/go-sysconf v0.3.16/go.mod h1:/qNL9xxDhc7tx3HSRsLWNnuzbVfh3e7gh/BmM179nYI= -github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9RXw= -github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ= +github.com/tklauser/go-sysconf v0.4.0 h1:7H0uAN+7RkwWRaxhYXDLqa5V3LPrJeV8wmD9dRUgPQU= +github.com/tklauser/go-sysconf v0.4.0/go.mod h1:8mTNWyog7H+MpKijp4VmKJAd2bbYQ2zuUwkYRbUArPI= +github.com/tklauser/numcpus v0.12.0 h1:NR85qdvHA9pFse3x3weVZ0r0ST8R6l5RHbZrlRaqob4= +github.com/tklauser/numcpus v0.12.0/go.mod h1:ABHeXzJnr/qqwguhClkZKT1/8VABcYrsyUiUGobwWJg= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= @@ -136,24 +140,24 @@ go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ= go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= -golang.org/x/net v0.54.0 h1:2zJIZAxAHV/OHCDTCOHAYehQzLfSXuf/5SoL/Dv6w/w= -golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ= +golang.org/x/net v0.55.0 h1:bcvxaJn3e1U6InsFWt1JUq1aSjnRxLzT2rtD2KfkDF8= +golang.org/x/net v0.55.0/go.mod h1:L5U2KuzuOe1lY7Z+aWVIKK6qEeJXnXV9yzGA+WCHJww= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ= -golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY= +golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38= gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= google.golang.org/genproto v0.0.0-20260504160031-60b97b32f348 h1:JjVGDZYWkJWZcxveJGzfkXC5myDVWAd4dZdgbzrDUv8= google.golang.org/genproto v0.0.0-20260504160031-60b97b32f348/go.mod h1:95PqD4xM+AdOcBGsmgfaofXsiA37uXDtDufVbntT3TU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260504160031-60b97b32f348 h1:pfIbyB44sWzHiCpRqIen67ZQnVXSfIxWrqUMk1qwODE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260504160031-60b97b32f348/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= -google.golang.org/grpc v1.81.0 h1:W3G9N3KQf3BU+YuCtGKJk0CmxQNbAISICD/9AORxLIw= -google.golang.org/grpc v1.81.0/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260511170946-3700d4141b60 h1:seT2EwLWM78plQ7wcDfuWBc/4FAEAXDDiaSol4ku4qo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260511170946-3700d4141b60/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= +google.golang.org/grpc v1.81.1 h1:VnnIIZ88UzOOKLukQi+ImGz8O1Wdp8nAGGnvOfEIWQQ= +google.golang.org/grpc v1.81.1/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/tests/helpers/helpers.go b/tests/helpers/helpers.go index 1a9eec1..3cecbc9 100644 --- a/tests/helpers/helpers.go +++ b/tests/helpers/helpers.go @@ -2,78 +2,57 @@ package helpers import ( "bytes" + "context" + "crypto/tls" "net" "net/http" - "net/rpc" + "slices" "testing" "time" + "connectrpc.com/connect" "github.com/google/uuid" jobsProto "github.com/roadrunner-server/api-go/v6/jobs/v2" + "github.com/roadrunner-server/api-go/v6/jobs/v2/jobsV2connect" jobState "github.com/roadrunner-server/api-plugins/v6/jobs" - goridgeRpc "github.com/roadrunner-server/goridge/v4/pkg/rpc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/net/http2" + "google.golang.org/protobuf/types/known/emptypb" ) -const ( - push string = "jobs.Push" - pause string = "jobs.Pause" - destroy string = "jobs.Destroy" - resume string = "jobs.Resume" - stat string = "jobs.Stat" -) - -func callPipelinesRPC(t *testing.T, method, address string, pipes ...string) { +func NewJobsClient(t *testing.T, address string) jobsV2connect.JobsServiceClient { t.Helper() - conn, err := (&net.Dialer{}).DialContext(t.Context(), "tcp", address) - require.NoError(t, err) - defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - defer func() { _ = client.Close() }() - - pipe := &jobsProto.Pipelines{Pipelines: make([]string, len(pipes))} - - for i := range len(pipes) { - pipe.GetPipelines()[i] = pipes[i] - } - - er := &jobsProto.JobsHandlerResponse{} - err = client.Call(method, pipe, er) - require.NoError(t, err) + httpc := &http.Client{Transport: &http2.Transport{ + AllowHTTP: true, + DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) { + return new(net.Dialer).DialContext(ctx, network, addr) + }, + }} + t.Cleanup(httpc.CloseIdleConnections) + return jobsV2connect.NewJobsServiceClient(httpc, "http://"+address) } func ResumePipes(address string, pipes ...string) func(t *testing.T) { return func(t *testing.T) { - callPipelinesRPC(t, resume, address, pipes...) + client := NewJobsClient(t, address) + _, err := client.Resume(t.Context(), connect.NewRequest(&jobsProto.Pipelines{Pipelines: slices.Clone(pipes)})) + require.NoError(t, err) } } func PushToPipe(pipeline string, autoAck bool, address string) func(t *testing.T) { return func(t *testing.T) { - conn, err := (&net.Dialer{}).DialContext(t.Context(), "tcp", address) - require.NoError(t, err) - defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - defer func() { _ = client.Close() }() - - req := &jobsProto.PushBatchRequest{Jobs: []*jobsProto.Job{createDummyJob(pipeline, autoAck)}} - - er := &jobsProto.JobsHandlerResponse{} - err = client.Call(push, req, er) + client := NewJobsClient(t, address) + _, err := client.Push(t.Context(), connect.NewRequest(&jobsProto.PushRequest{Job: createDummyJob(pipeline, autoAck)})) require.NoError(t, err) } } func PushToPipeDelayed(address string, pipeline string, delay int64) func(t *testing.T) { return func(t *testing.T) { - conn, err := (&net.Dialer{}).DialContext(t.Context(), "tcp", address) - assert.NoError(t, err) - defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - defer func() { _ = client.Close() }() - - req := &jobsProto.PushBatchRequest{Jobs: []*jobsProto.Job{&jobsProto.Job{ + client := NewJobsClient(t, address) + req := &jobsProto.PushRequest{Job: &jobsProto.Job{ Job: "some/php/namespace", Id: uuid.NewString(), Payload: []byte(`{"hello":"world"}`), @@ -83,10 +62,8 @@ func PushToPipeDelayed(address string, pipeline string, delay int64) func(t *tes Pipeline: pipeline, Delay: delay, }, - }}} - - er := &jobsProto.JobsHandlerResponse{} - err = client.Call(push, req, er) + }} + _, err := client.Push(t.Context(), connect.NewRequest(req)) assert.NoError(t, err) } } @@ -108,60 +85,48 @@ func createDummyJob(pipeline string, autoAck bool) *jobsProto.Job { func PausePipelines(address string, pipes ...string) func(t *testing.T) { return func(t *testing.T) { - callPipelinesRPC(t, pause, address, pipes...) + client := NewJobsClient(t, address) + _, err := client.Pause(t.Context(), connect.NewRequest(&jobsProto.Pipelines{Pipelines: slices.Clone(pipes)})) + require.NoError(t, err) } } func DestroyPipelines(address string, pipes ...string) func(t *testing.T) { return func(t *testing.T) { - conn, err := (&net.Dialer{}).DialContext(t.Context(), "tcp", address) - assert.NoError(t, err) - defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - defer func() { _ = client.Close() }() - - pipe := &jobsProto.Pipelines{Pipelines: make([]string, len(pipes))} - - for i := range len(pipes) { - pipe.GetPipelines()[i] = pipes[i] - } + client := NewJobsClient(t, address) + req := &jobsProto.Pipelines{Pipelines: slices.Clone(pipes)} + // Retry the destroy 10× with 1s gaps; if all attempts fail, return + // without asserting. Some negative tests intentionally destroy + // non-existent pipelines and rely on this silent-after-retry pattern. for range 10 { - er := &jobsProto.JobsHandlerResponse{} - err = client.Call(destroy, pipe, er) - if err != nil { - time.Sleep(time.Second) - continue + _, err := client.Destroy(t.Context(), connect.NewRequest(req)) + if err == nil { + return } - assert.NoError(t, err) - break + time.Sleep(time.Second) } } } func Stats(address string, state *jobState.State) func(t *testing.T) { return func(t *testing.T) { - conn, err := (&net.Dialer{}).DialContext(t.Context(), "tcp", address) - require.NoError(t, err) - defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - defer func() { _ = client.Close() }() - - st := &jobsProto.Stats{} - er := &jobsProto.JobsHandlerResponse{} + client := NewJobsClient(t, address) - err = client.Call(stat, er, st) + resp, err := client.GetStats(t.Context(), connect.NewRequest(&emptypb.Empty{})) require.NoError(t, err) - require.NotNil(t, st) - - state.Queue = st.Stats[0].Queue - state.Pipeline = st.Stats[0].Pipeline - state.Driver = st.Stats[0].Driver - state.Active = st.Stats[0].Active - state.Delayed = st.Stats[0].Delayed - state.Reserved = st.Stats[0].Reserved - state.Ready = st.Stats[0].Ready - state.Priority = st.Stats[0].Priority + require.NotNil(t, resp) + require.NotEmpty(t, resp.Msg.GetStats()) + + st := resp.Msg.GetStats()[0] + state.Queue = st.GetQueue() + state.Pipeline = st.GetPipeline() + state.Driver = st.GetDriver() + state.Active = st.GetActive() + state.Delayed = st.GetDelayed() + state.Reserved = st.GetReserved() + state.Ready = st.GetReady() + state.Priority = st.GetPriority() } } diff --git a/tests/jobs_beanstalk_test.go b/tests/jobs_beanstalk_test.go index 8b2cf9f..b07af27 100644 --- a/tests/jobs_beanstalk_test.go +++ b/tests/jobs_beanstalk_test.go @@ -3,8 +3,6 @@ package tests import ( "context" "log/slog" - "net" - "net/rpc" "os" "os/signal" "slices" @@ -13,6 +11,10 @@ import ( "testing" "time" + "tests/helpers" + mocklogger "tests/mock" + + "connectrpc.com/connect" "github.com/beanstalkd/go-beanstalk" "github.com/google/uuid" jobsProto "github.com/roadrunner-server/api-go/v6/jobs/v2" @@ -20,7 +22,6 @@ import ( beanstalkPlugin "github.com/roadrunner-server/beanstalk/v6" "github.com/roadrunner-server/config/v6" "github.com/roadrunner-server/endure/v2" - goridgeRpc "github.com/roadrunner-server/goridge/v4/pkg/rpc" "github.com/roadrunner-server/informer/v6" "github.com/roadrunner-server/jobs/v6" "github.com/roadrunner-server/logger/v6" @@ -32,8 +33,6 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.opentelemetry.io/otel/trace" - "tests/helpers" - mocklogger "tests/mock" ) func TestBeanstalkInit(t *testing.T) { @@ -970,13 +969,8 @@ func TestBeanstalkOTEL(t *testing.T) { func declareBeanstalkPipe(address string) func(t *testing.T) { return func(t *testing.T) { - conn, err := (&net.Dialer{}).DialContext(t.Context(), "tcp", address) - require.NoError(t, err) - defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - defer func() { _ = client.Close() }() - - pipe := &jobsProto.DeclareRequest{Pipeline: map[string]string{ + client := helpers.NewJobsClient(t, address) + req := &jobsProto.DeclareRequest{Pipeline: map[string]string{ "driver": "beanstalk", "name": "test-3", "tube": uuid.NewString(), @@ -984,9 +978,7 @@ func declareBeanstalkPipe(address string) func(t *testing.T) { "priority": "3", "tube_priority": "10", }} - - er := &jobsProto.JobsHandlerResponse{} - err = client.Call("jobs.Declare", pipe, er) + _, err := client.Declare(t.Context(), connect.NewRequest(req)) require.NoError(t, err) } }