diff --git a/Makefile b/Makefile index 371815e..f3bb5ce 100644 --- a/Makefile +++ b/Makefile @@ -189,3 +189,27 @@ stop-dependencies: .PHONY: nuke-dependencies nuke-dependencies: $(DOCKER_COMPOSE) down --timeout 60 --volumes --remove-orphans + +# Compatibility tests +COMPAT_DIR = tests/compatibility +COMPAT_PARALLEL ?= 1 + +.PHONY: test-compatibility +test-compatibility: test-compatibility-pull + cd $(COMPAT_DIR) && GOFLAGS="" go test \ + -tags testcompatibility \ + -timeout 60m \ + -parallel $(COMPAT_PARALLEL) \ + ./matrix/ + +.PHONY: test-compatibility-pull +test-compatibility-pull: + DOCKER_CLI_HINTS=false docker pull -q temporalio/auto-setup:1.29.2 + DOCKER_CLI_HINTS=false docker pull -q temporalio/server:1.27.4 + DOCKER_CLI_HINTS=false docker pull -q temporalio/admin-tools:1.27 + DOCKER_CLI_HINTS=false docker pull -q postgres:15 + DOCKER_CLI_HINTS=false docker pull -q postgres:12 + +.PHONY: test-compatibility-tidy +test-compatibility-tidy: + cd $(COMPAT_DIR) && GOFLAGS="" go mod tidy diff --git a/tests/compatibility/go.mod b/tests/compatibility/go.mod new file mode 100644 index 0000000..80aff75 --- /dev/null +++ b/tests/compatibility/go.mod @@ -0,0 +1,69 @@ +module github.com/temporalio/s2s-proxy/tests/compatibility + +go 1.25.0 + +require ( + github.com/docker/go-connections v0.5.0 + github.com/stretchr/testify v1.11.1 + github.com/testcontainers/testcontainers-go v0.36.0 + go.temporal.io/api v1.46.0 + go.temporal.io/server v1.27.4 + google.golang.org/grpc v1.72.2 + google.golang.org/protobuf v1.36.8 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + dario.cat/mergo v1.0.1 // indirect + github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect + github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/containerd/log v0.1.0 // indirect + github.com/containerd/platforms v0.2.1 // indirect + github.com/cpuguy83/dockercfg v0.3.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + 
github.com/distribution/reference v0.6.0 // indirect + github.com/docker/docker v28.0.1+incompatible // indirect + github.com/docker/go-units v0.5.0 // indirect + github.com/ebitengine/purego v0.8.2 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/magiconair/properties v1.8.9 // indirect + github.com/moby/docker-image-spec v1.3.1 // indirect + github.com/moby/patternmatcher v0.6.0 // indirect + github.com/moby/sys/sequential v0.5.0 // indirect + github.com/moby/sys/user v0.1.0 // indirect + github.com/moby/sys/userns v0.1.0 // indirect + github.com/moby/term v0.5.0 // indirect + github.com/morikuni/aec v1.0.0 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/shirou/gopsutil/v4 v4.25.1 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect + go.opentelemetry.io/otel v1.35.0 // indirect + go.opentelemetry.io/otel/metric v1.35.0 // indirect + go.opentelemetry.io/otel/trace v1.35.0 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/net v0.37.0 // indirect + 
golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect +) + +replace github.com/temporalio/s2s-proxy => ../.. diff --git a/tests/compatibility/go.sum b/tests/compatibility/go.sum new file mode 100644 index 0000000..5cb4ad1 --- /dev/null +++ b/tests/compatibility/go.sum @@ -0,0 +1,192 @@ +dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= +dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= +github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= +github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= +github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= +github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A= +github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw= +github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA= +github.com/cpuguy83/dockercfg v0.3.2/go.mod 
h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= +github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= +github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= +github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= +github.com/docker/docker v28.0.1+incompatible h1:FCHjSRdXhNRFjlHMTv4jUNlIBbTeRjrWfeFuJp7jpo0= +github.com/docker/docker v28.0.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= +github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/ebitengine/purego v0.8.2 h1:jPPGWs2sZ1UgOSgD2bClL0MJIqu58nOmIcBuXr62z1I= +github.com/ebitengine/purego v0.8.2/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= 
+github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod 
h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= +github.com/magiconair/properties v1.8.9 h1:nWcCbLq1N2v/cpNsy5WvQ37Fb+YElfq20WJ/a8RkpQM= +github.com/magiconair/properties v1.8.9/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= +github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= +github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk= +github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc= +github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc= +github.com/moby/sys/sequential v0.5.0/go.mod h1:tH2cOOs5V9MlPiXcQzRC+eEyab644PWKGRYaaV5ZZlo= +github.com/moby/sys/user v0.1.0 h1:WmZ93f5Ux6het5iituh9x2zAG7NFY9Aqi49jjE1PaQg= +github.com/moby/sys/user v0.1.0/go.mod h1:fKJhFOnsCN6xZ5gSfbM6zaHGgDJMrqt9/reuj4T7MmU= +github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g= +github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28= +github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= +github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= +github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= +github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= +github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= 
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/shirou/gopsutil/v4 v4.25.1 h1:QSWkTc+fu9LTAWfkZwZ6j8MSUk4A2LV7rbH0ZqmLjXs= +github.com/shirou/gopsutil/v4 v4.25.1/go.mod h1:RoUCUpndaJFtT+2zsZzzmhvbfGoDCJ7nFXKJf8GqJbI= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/testcontainers/testcontainers-go v0.36.0 h1:YpffyLuHtdp5EUsI5mT4sRw8GZhO/5ozyDT1xWGXt00= +github.com/testcontainers/testcontainers-go v0.36.0/go.mod h1:yk73GVJ0KUZIHUtFna6MO7QS144qYpoY8lEEtU9Hed0= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus 
v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 h1:sbiXRNDSWJOTobXh5HyQKjq6wUC5tNybqjIqDpAY4CU= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0/go.mod h1:69uWxva0WgAA/4bu2Yy70SLDBwZXuQ6PbBpbsa5iZrQ= +go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= +go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 h1:1fTNlAIJZGWLP5FVu0fikVry1IsiUnXjf7QFvoNN3Xw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= +go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= +go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= +go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= +go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= +go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod 
h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= +go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= +go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= +go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= +go.temporal.io/api v1.46.0 h1:O1efPDB6O2B8uIeCDIa+3VZC7tZMvYsMZYQapSbHvCg= +go.temporal.io/api v1.46.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/server v1.27.4 h1:ztU0HJJZ4etpupPAG3BcFyvQ1rDvSUX9yBzj0KO71gM= +go.temporal.io/server v1.27.4/go.mod h1:6v5/iYPKipHW9dw4B48IeYoLlr2WsBHESXDQ4D8F3ys= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= +golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/sync 
v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y= +golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= +golang.org/x/time v0.11.0/go.mod 
h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463 h1:hE3bRWtU6uceqlh4fhrSnUyjKHMKB9KrTLLG+bc0ddM= +google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463/go.mod h1:U90ffi8eUL9MwPcrJylN5+Mk2v3vuPDptd5yyNUiRR8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 h1:e0AIkUUhxyBKh6ssZNrAMeqhA7RKUj42346d1y02i2g= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.72.2 h1:TdbGzwb82ty4OusHWepvFWGLgIbNo1/SUynEN0ssqv8= +google.golang.org/grpc v1.72.2/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c 
h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= diff --git a/tests/compatibility/matrix/matrix_test.go b/tests/compatibility/matrix/matrix_test.go new file mode 100644 index 0000000..14ca5c4 --- /dev/null +++ b/tests/compatibility/matrix/matrix_test.go @@ -0,0 +1,36 @@ +//go:build testcompatibility + +package matrix + +import ( + "testing" + + "github.com/temporalio/s2s-proxy/tests/compatibility/specifications" + "github.com/temporalio/s2s-proxy/tests/compatibility/specifications/temporal_1_27_4_postgres12" + "github.com/temporalio/s2s-proxy/tests/compatibility/specifications/temporal_1_29_2_postgres15" +) + +func TestTopology_1_29_2_same(t *testing.T) { + t.Parallel() + run(t, []specifications.ClusterSpec{ + temporal_1_29_2_postgres15.New(), + temporal_1_29_2_postgres15.New(), + }) +} + +func TestTopology_1_29_2_triplet(t *testing.T) { + t.Parallel() + run(t, []specifications.ClusterSpec{ + temporal_1_29_2_postgres15.New(), + temporal_1_29_2_postgres15.New(), + temporal_1_29_2_postgres15.New(), + }) +} + +func TestTopology_1_29_2_vs_1_27_4(t *testing.T) { + t.Parallel() + run(t, []specifications.ClusterSpec{ + temporal_1_29_2_postgres15.New(), + temporal_1_27_4_postgres12.New(), + }) +} diff --git a/tests/compatibility/matrix/run.go b/tests/compatibility/matrix/run.go new file mode 100644 index 0000000..0ded280 --- /dev/null +++ b/tests/compatibility/matrix/run.go @@ -0,0 +1,63 @@ +//go:build testcompatibility + +package matrix + +import ( + "fmt" + 
"math/rand" + "testing" + + "github.com/temporalio/s2s-proxy/tests/compatibility/specifications" + "github.com/temporalio/s2s-proxy/tests/compatibility/suite" + "github.com/temporalio/s2s-proxy/tests/compatibility/topology" +) + +// run runs the full test suite for a single topology of cluster specs. +func run(t *testing.T, specs []specifications.ClusterSpec) { + t.Helper() + + top := topology.NewTopology(t, newTopologyID(), specs) + + healthPassed := t.Run("Health", func(t *testing.T) { + for _, cluster := range top.Clusters() { + if !t.Run(cluster.Name(), func(t *testing.T) { suite.RunClusterHealthSuite(t, top, cluster) }) { + return + } + } + + for _, proxy := range top.Proxies() { + if !t.Run(proxy.Name(), func(t *testing.T) { suite.RunProxyHealthSuite(t, top, proxy) }) { + return + } + } + }) + + // If health checks fail, we skip the rest of the tests to avoid misleading failures. + // If health / Proxy tests fail, this is likely an indication of cluster configuration issues. + if !healthPassed { + return + } + + t.Run("Connectivity", func(t *testing.T) { + for _, proxy := range top.Proxies() { + t.Run(proxy.Name(), func(t *testing.T) { + suite.RunConnectivitySuite(t, top, proxy) + }) + } + }) + + t.Run("Replication", func(t *testing.T) { + for _, cluster := range top.Clusters() { + t.Run(cluster.Name(), func(t *testing.T) { + suite.RunReplicationSuite(t, top, cluster) + }) + } + }) +} + +// newTopologyID generates a random topology ID for test isolation. +// Used to separate parallel topology runs in the global Docker runtime. +// For example, to ensure that the "cluster-a" in "topology-1" and "topology-2" are isolated. 
+func newTopologyID() string { + return fmt.Sprintf("%08x", rand.Uint32()) //nolint:gosec +} diff --git a/tests/compatibility/specifications/temporal_1_27_4_postgres12/config.go b/tests/compatibility/specifications/temporal_1_27_4_postgres12/config.go new file mode 100644 index 0000000..ad0d892 --- /dev/null +++ b/tests/compatibility/specifications/temporal_1_27_4_postgres12/config.go @@ -0,0 +1,29 @@ +//go:build testcompatibility + +package temporal_1_27_4_postgres12 + +import ( + _ "embed" + + "github.com/temporalio/s2s-proxy/tests/compatibility/specifications" +) + +//go:embed server_config.yaml +var serverConfigYAML string + +//go:embed server_setup_schema.sh +var serverSetupSchemaTemplate string + +func New() specifications.ClusterSpec { + return specifications.ClusterSpec{ + Server: specifications.ServerSpec{ + Image: "temporalio/server:1.27.4", + AdminToolsImage: "temporalio/admin-tools:1.27", + ConfigTemplate: serverConfigYAML, + SetupSchemaTemplate: serverSetupSchemaTemplate, + }, + Database: specifications.DatabaseSpec{ + Image: "postgres:12", + }, + } +} diff --git a/tests/compatibility/specifications/temporal_1_27_4_postgres12/server_config.yaml b/tests/compatibility/specifications/temporal_1_27_4_postgres12/server_config.yaml new file mode 100644 index 0000000..bfdf954 --- /dev/null +++ b/tests/compatibility/specifications/temporal_1_27_4_postgres12/server_config.yaml @@ -0,0 +1,91 @@ +log: + stdout: true + level: "warn" + +persistence: + defaultStore: default + visibilityStore: visibility + numHistoryShards: 1 + datastores: + default: + sql: + pluginName: "postgres12" + databaseName: "temporal" + connectAddr: "[[.DatabaseAlias]]:5432" + connectProtocol: "tcp" + user: "temporal" + password: "temporal" + maxConns: 20 + maxIdleConns: 20 + maxConnLifetime: "1h" + visibility: + sql: + pluginName: "postgres12" + databaseName: "temporal_visibility" + connectAddr: "[[.DatabaseAlias]]:5432" + connectProtocol: "tcp" + user: "temporal" + password: "temporal" + 
maxConns: 10 + maxIdleConns: 10 + maxConnLifetime: "1h" + +global: + membership: + maxJoinDuration: 30s + broadcastAddress: "{{ default .Env.TEMPORAL_BROADCAST_ADDRESS "" }}" + +services: + frontend: + rpc: + grpcPort: 7233 + membershipPort: 6933 + bindOnIP: "0.0.0.0" + httpPort: 7243 + matching: + rpc: + grpcPort: 7235 + membershipPort: 6935 + bindOnIP: "0.0.0.0" + history: + rpc: + grpcPort: 7234 + membershipPort: 6934 + bindOnIP: "0.0.0.0" + worker: + rpc: + grpcPort: 7239 + membershipPort: 6939 + bindOnIP: "0.0.0.0" + +clusterMetadata: + enableGlobalNamespace: true + failoverVersionIncrement: 10 + masterClusterName: "[[.ClusterName]]" + currentClusterName: "[[.ClusterName]]" + clusterInformation: + [[.ClusterName]]: + enabled: true + initialFailoverVersion: [[.InitialFailoverVersion]] + rpcName: "frontend" + rpcAddress: "[[.NetworkAlias]]:7233" + +dcRedirectionPolicy: + policy: "all-apis-forwarding" + +dynamicConfigClient: + filepath: "config/dynamicconfig/docker.yaml" + pollInterval: "10s" + +archival: + history: + state: "disabled" + visibility: + state: "disabled" + +namespaceDefaults: + archival: + history: + state: "disabled" + visibility: + state: "disabled" diff --git a/tests/compatibility/specifications/temporal_1_27_4_postgres12/server_setup_schema.sh b/tests/compatibility/specifications/temporal_1_27_4_postgres12/server_setup_schema.sh new file mode 100644 index 0000000..5398979 --- /dev/null +++ b/tests/compatibility/specifications/temporal_1_27_4_postgres12/server_setup_schema.sh @@ -0,0 +1,12 @@ +set -e +export SQL_PASSWORD=temporal +T="temporal-sql-tool --plugin postgres12 --ep [[.DatabaseAlias]] -u temporal -p 5432" + +$T --db temporal setup-schema -v 0.0 +$T --db temporal update-schema -d /etc/temporal/schema/postgresql/v12/temporal/versioned + +$T --db temporal_visibility create +$T --db temporal_visibility setup-schema -v 0.0 +$T --db temporal_visibility update-schema -d /etc/temporal/schema/postgresql/v12/visibility/versioned + +echo 
"temporal schema setup complete" diff --git a/tests/compatibility/specifications/temporal_1_29_2_postgres15/config.go b/tests/compatibility/specifications/temporal_1_29_2_postgres15/config.go new file mode 100644 index 0000000..c2aba40 --- /dev/null +++ b/tests/compatibility/specifications/temporal_1_29_2_postgres15/config.go @@ -0,0 +1,29 @@ +//go:build testcompatibility + +package temporal_1_29_2_postgres15 + +import ( + _ "embed" + + "github.com/temporalio/s2s-proxy/tests/compatibility/specifications" +) + +//go:embed server_config.yaml +var serverConfigTemplate string + +//go:embed server_setup_schema.sh +var setupSchemaTemplate string + +func New() specifications.ClusterSpec { + return specifications.ClusterSpec{ + Server: specifications.ServerSpec{ + Image: "temporalio/server:1.29.2", + AdminToolsImage: "temporalio/admin-tools:1.29", + ConfigTemplate: serverConfigTemplate, + SetupSchemaTemplate: setupSchemaTemplate, + }, + Database: specifications.DatabaseSpec{ + Image: "postgres:15", + }, + } +} diff --git a/tests/compatibility/specifications/temporal_1_29_2_postgres15/server_config.yaml b/tests/compatibility/specifications/temporal_1_29_2_postgres15/server_config.yaml new file mode 100644 index 0000000..bfdf954 --- /dev/null +++ b/tests/compatibility/specifications/temporal_1_29_2_postgres15/server_config.yaml @@ -0,0 +1,91 @@ +log: + stdout: true + level: "warn" + +persistence: + defaultStore: default + visibilityStore: visibility + numHistoryShards: 1 + datastores: + default: + sql: + pluginName: "postgres12" + databaseName: "temporal" + connectAddr: "[[.DatabaseAlias]]:5432" + connectProtocol: "tcp" + user: "temporal" + password: "temporal" + maxConns: 20 + maxIdleConns: 20 + maxConnLifetime: "1h" + visibility: + sql: + pluginName: "postgres12" + databaseName: "temporal_visibility" + connectAddr: "[[.DatabaseAlias]]:5432" + connectProtocol: "tcp" + user: "temporal" + password: "temporal" + maxConns: 10 + maxIdleConns: 10 + maxConnLifetime: "1h" + 
+global: + membership: + maxJoinDuration: 30s + broadcastAddress: "{{ default .Env.TEMPORAL_BROADCAST_ADDRESS "" }}" + +services: + frontend: + rpc: + grpcPort: 7233 + membershipPort: 6933 + bindOnIP: "0.0.0.0" + httpPort: 7243 + matching: + rpc: + grpcPort: 7235 + membershipPort: 6935 + bindOnIP: "0.0.0.0" + history: + rpc: + grpcPort: 7234 + membershipPort: 6934 + bindOnIP: "0.0.0.0" + worker: + rpc: + grpcPort: 7239 + membershipPort: 6939 + bindOnIP: "0.0.0.0" + +clusterMetadata: + enableGlobalNamespace: true + failoverVersionIncrement: 10 + masterClusterName: "[[.ClusterName]]" + currentClusterName: "[[.ClusterName]]" + clusterInformation: + [[.ClusterName]]: + enabled: true + initialFailoverVersion: [[.InitialFailoverVersion]] + rpcName: "frontend" + rpcAddress: "[[.NetworkAlias]]:7233" + +dcRedirectionPolicy: + policy: "all-apis-forwarding" + +dynamicConfigClient: + filepath: "config/dynamicconfig/docker.yaml" + pollInterval: "10s" + +archival: + history: + state: "disabled" + visibility: + state: "disabled" + +namespaceDefaults: + archival: + history: + state: "disabled" + visibility: + state: "disabled" diff --git a/tests/compatibility/specifications/temporal_1_29_2_postgres15/server_setup_schema.sh b/tests/compatibility/specifications/temporal_1_29_2_postgres15/server_setup_schema.sh new file mode 100644 index 0000000..5398979 --- /dev/null +++ b/tests/compatibility/specifications/temporal_1_29_2_postgres15/server_setup_schema.sh @@ -0,0 +1,12 @@ +set -e +export SQL_PASSWORD=temporal +T="temporal-sql-tool --plugin postgres12 --ep [[.DatabaseAlias]] -u temporal -p 5432" + +$T --db temporal setup-schema -v 0.0 +$T --db temporal update-schema -d /etc/temporal/schema/postgresql/v12/temporal/versioned + +$T --db temporal_visibility create +$T --db temporal_visibility setup-schema -v 0.0 +$T --db temporal_visibility update-schema -d /etc/temporal/schema/postgresql/v12/visibility/versioned + +echo "temporal schema setup complete" diff --git 
a/tests/compatibility/specifications/types.go b/tests/compatibility/specifications/types.go new file mode 100644 index 0000000..61e8049 --- /dev/null +++ b/tests/compatibility/specifications/types.go @@ -0,0 +1,22 @@ +//go:build testcompatibility + +package specifications + +// ClusterSpec describes the Temporal Cluster configuration. +type ClusterSpec struct { + Server ServerSpec + Database DatabaseSpec +} + +// ServerSpec describes the Temporal Cluster's Server configuration. +type ServerSpec struct { + Image string // e.g. "temporalio/server:1.29.2" + AdminToolsImage string // e.g. "temporalio/admin-tools:1.29" + ConfigTemplate string // YAML template, [[ ]] delimiters for substitution + SetupSchemaTemplate string // Shell script template, [[ ]] delimiters for substitution +} + +// DatabaseSpec describes the Temporal Cluster's Database configuration. +type DatabaseSpec struct { + Image string // Image is the Docker image for the database container, e.g. "postgres:15". +} diff --git a/tests/compatibility/suite/ops.go b/tests/compatibility/suite/ops.go new file mode 100644 index 0000000..95e545f --- /dev/null +++ b/tests/compatibility/suite/ops.go @@ -0,0 +1,180 @@ +//go:build testcompatibility + +package suite + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + commonv1 "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + replicationv1 "go.temporal.io/api/replication/v1" + workflowservicev1 "go.temporal.io/api/workflowservice/v1" + "google.golang.org/protobuf/types/known/durationpb" + + "github.com/temporalio/s2s-proxy/tests/compatibility/topology" +) + +// Ops provides a slew of test operations for making changes and assertions across the topology +type Ops struct { + t testing.TB + top topology.Topology +} + +// Active returns the cluster where ns is active. 
+func (ops Ops) Active(ns string) topology.ClusterHandle { + for _, c := range ops.top.Clusters() { + ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout) + resp, err := c.FrontendClient().DescribeNamespace(ctx, &workflowservicev1.DescribeNamespaceRequest{Namespace: ns}) + cancel() + if err != nil { + continue + } + if resp.GetReplicationConfig().GetActiveClusterName() == c.Name() { + return c + } + } + return nil +} + +// Passives returns the clusters where ns is not active. +func (ops Ops) Passives(ns string) []topology.ClusterHandle { + active := ops.Active(ns) + if active == nil { + return nil + } + var passives []topology.ClusterHandle + for _, c := range ops.top.Clusters() { + if c.Name() != active.Name() { + passives = append(passives, c) + } + } + return passives +} + +// RegisterNamespace registers a global namespace with the provided name on target. +func (ops Ops) RegisterNamespace( + ctx context.Context, + target topology.ClusterHandle, + base string, +) string { + + // Step 1. Create a deduped namespace name (in case of parallel test runs). + ns := fmt.Sprintf("%s-%d", base, time.Now().UnixNano()%1_000_000) + + // Step 2. Register the namespace on target, including all clusters in the replication config. 
+ var clusterConfigs []*replicationv1.ClusterReplicationConfig + for _, c := range ops.top.Clusters() { + clusterConfigs = append(clusterConfigs, &replicationv1.ClusterReplicationConfig{ + ClusterName: c.Name(), + }) + } + require.New(ops.t).Eventually(func() bool { + nsCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + defer cancel() + _, err := target.FrontendClient().RegisterNamespace(nsCtx, &workflowservicev1.RegisterNamespaceRequest{ + Namespace: ns, + IsGlobalNamespace: true, + WorkflowExecutionRetentionPeriod: durationpb.New(72 * time.Hour), + Clusters: clusterConfigs, + ActiveClusterName: target.Name(), + }) + if err != nil { + ops.t.Logf("RegisterNamespace attempt failed (expected if metadata not yet propagated): %v", err) + } + return err == nil + }, registerNamespaceTimeout, registerNamespaceInterval, + "register global namespace %s on %s", ns, target.Name()) + return ns +} + +// WaitForNamespace polls cluster until ns is visible. +func (ops Ops) WaitForNamespace(cluster topology.ClusterHandle, ns string) { + require.New(ops.t).Eventually(func() bool { + ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout) + defer cancel() + resp, err := cluster.FrontendClient().DescribeNamespace(ctx, + &workflowservicev1.DescribeNamespaceRequest{Namespace: ns}) + return err == nil && resp.GetNamespaceInfo().GetName() == ns + }, replicationTimeout, replicationInterval, + "namespace %s should replicate to %s", ns, cluster.Name()) +} + +// WaitForWorkflowVisible polls each passive cluster until workflowID is visible in ns. 
+func (ops Ops) WaitForWorkflowVisible(ctx context.Context, passives []topology.ClusterHandle, ns, wfID string) { + for _, passive := range passives { + passive := passive + require.New(ops.t).Eventually(func() bool { + return isWorkflowVisibleOn(ctx, passive, ns, wfID) + }, replicationTimeout, replicationInterval, + "workflow should replicate to %s", passive.Name()) + } +} + +// WaitForWorkflowTerminated asserts Eventually that workflowID has TERMINATED status on each cluster in passives. +func (ops Ops) WaitForWorkflowTerminated(ctx context.Context, passives []topology.ClusterHandle, ns, wfID string) { + for _, passive := range passives { + passive := passive + require.New(ops.t).Eventually(func() bool { + return isWorkflowTerminatedOn(ctx, passive, ns, wfID) + }, replicationTimeout, replicationInterval, + "TERMINATED should replicate to %s", passive.Name()) + } +} + +// TerminateWorkflow terminates wfID on cluster. +func (ops Ops) TerminateWorkflow(ctx context.Context, cluster topology.ClusterHandle, ns, wfID string) { + termCtx, cancel := context.WithTimeout(ctx, writeTimeout) + defer cancel() + _, err := cluster.FrontendClient().TerminateWorkflowExecution(termCtx, &workflowservicev1.TerminateWorkflowExecutionRequest{ + Namespace: ns, + WorkflowExecution: &commonv1.WorkflowExecution{WorkflowId: wfID}, + Reason: "compatibility test complete", + }) + require.New(ops.t).NoError(err, "terminate workflow on %s", cluster.Name()) +} + +// SetActive issues UpdateNamespace to fail ns over to target, then polls Active() until target confirms it is the active cluster. 
+func (ops Ops) SetActive(ns string, target topology.ClusterHandle) { + failCtx, cancel := context.WithTimeout(context.Background(), writeTimeout) + defer cancel() + _, err := target.FrontendClient().UpdateNamespace(failCtx, &workflowservicev1.UpdateNamespaceRequest{ + Namespace: ns, + ReplicationConfig: &replicationv1.NamespaceReplicationConfig{ + ActiveClusterName: target.Name(), + }, + }) + require.New(ops.t).NoError(err, "SetActive: UpdateNamespace to %s", target.Name()) + + require.New(ops.t).Eventually(func() bool { + active := ops.Active(ns) + return active != nil && active.Name() == target.Name() + }, replicationTimeout, replicationInterval, + "namespace %s should become active on %s", ns, target.Name()) +} + +func isWorkflowVisibleOn(ctx context.Context, cluster topology.ClusterHandle, ns, wfID string) bool { + rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + defer cancel() + resp, err := cluster.FrontendClient().DescribeWorkflowExecution(rCtx, &workflowservicev1.DescribeWorkflowExecutionRequest{ + Namespace: ns, + Execution: &commonv1.WorkflowExecution{WorkflowId: wfID}, + }) + return err == nil && resp.GetWorkflowExecutionInfo() != nil +} + +func isWorkflowTerminatedOn(ctx context.Context, cluster topology.ClusterHandle, ns, wfID string) bool { + rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + defer cancel() + resp, err := cluster.FrontendClient().DescribeWorkflowExecution(rCtx, &workflowservicev1.DescribeWorkflowExecutionRequest{ + Namespace: ns, + Execution: &commonv1.WorkflowExecution{WorkflowId: wfID}, + }) + if err != nil { + return false + } + return resp.GetWorkflowExecutionInfo().GetStatus() == enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED +} diff --git a/tests/compatibility/suite/suite.go b/tests/compatibility/suite/suite.go new file mode 100644 index 0000000..56a39a2 --- /dev/null +++ b/tests/compatibility/suite/suite.go @@ -0,0 +1,19 @@ +//go:build testcompatibility + +package suite + +import ( + "github.com/stretchr/testify/suite" 
+ + "github.com/temporalio/s2s-proxy/tests/compatibility/topology" +) + +// BaseSuite is the shared base for all compatibility suites, providing access to the topology and topology operations. +type BaseSuite struct { + suite.Suite + top topology.Topology +} + +func (s *BaseSuite) Ops() Ops { + return Ops{t: s.T(), top: s.top} +} diff --git a/tests/compatibility/suite/suite_cluster_health.go b/tests/compatibility/suite/suite_cluster_health.go new file mode 100644 index 0000000..5a7674a --- /dev/null +++ b/tests/compatibility/suite/suite_cluster_health.go @@ -0,0 +1,43 @@ +//go:build testcompatibility + +package suite + +import ( + "context" + "testing" + + "github.com/stretchr/testify/suite" + workflowservicev1 "go.temporal.io/api/workflowservice/v1" + + "github.com/temporalio/s2s-proxy/tests/compatibility/topology" +) + +type ClusterHealthSuite struct { + BaseSuite + cluster topology.ClusterHandle +} + +func (s *ClusterHealthSuite) TestGetSystemInfo() { + ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout) + defer cancel() + + // Step 1: GetSystemInfo - verifies gRPC connectivity and basic Temporal responsiveness. + s.T().Logf("checking cluster %s at %s", s.cluster.Name(), s.cluster.HostAddr()) + sysResp, err := s.cluster.FrontendClient().GetSystemInfo(ctx, &workflowservicev1.GetSystemInfoRequest{}) + s.Require().NoError(err, "GetSystemInfo on %s", s.cluster.Name()) + s.T().Logf("cluster %s: Temporal server version=%s", s.cluster.Name(), sysResp.GetServerVersion()) +} + +func (s *ClusterHealthSuite) TestListNamespaces() { + ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout) + defer cancel() + + // Step 1: ListNamespaces - verifies the namespace endpoint is healthy. 
+ _, err := s.cluster.FrontendClient().ListNamespaces(ctx, &workflowservicev1.ListNamespacesRequest{}) + s.Require().NoError(err, "ListNamespaces on %s", s.cluster.Name()) +} + +func RunClusterHealthSuite(t *testing.T, top topology.Topology, cluster topology.ClusterHandle) { + s := &ClusterHealthSuite{BaseSuite: BaseSuite{top: top}, cluster: cluster} + suite.Run(t, s) +} diff --git a/tests/compatibility/suite/suite_connectivity.go b/tests/compatibility/suite/suite_connectivity.go new file mode 100644 index 0000000..cdb0d0b --- /dev/null +++ b/tests/compatibility/suite/suite_connectivity.go @@ -0,0 +1,45 @@ +//go:build testcompatibility + +package suite + +import ( + "context" + "testing" + + "github.com/stretchr/testify/suite" + workflowservicev1 "go.temporal.io/api/workflowservice/v1" + + "github.com/temporalio/s2s-proxy/tests/compatibility/topology" +) + +type ConnectivitySuite struct { + BaseSuite + proxy topology.ProxyHandle +} + +func (s *ConnectivitySuite) TestProxyForwarding() { + ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout) + defer cancel() + + // Step 1: GetSystemInfo via proxy - verifies the proxy forwards gRPC requests to the backend cluster. + resp, err := s.proxy.FrontendClient().GetSystemInfo(ctx, &workflowservicev1.GetSystemInfoRequest{}) + s.Require().NoError(err) + + // Step 2: Verify capabilities - confirms a live Temporal response was returned through the proxy. + s.Require().NotNil(resp.GetCapabilities()) +} + +func (s *ConnectivitySuite) TestListNamespacesViaProxy() { + ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout) + defer cancel() + + // Step 1: ListNamespaces via proxy - verifies the namespace endpoint is reachable through the proxy. 
+ resp, err := s.proxy.FrontendClient().ListNamespaces(ctx, &workflowservicev1.ListNamespacesRequest{}) + s.Require().NoError(err) + s.Require().NotNil(resp) +} + +func RunConnectivitySuite(t *testing.T, top topology.Topology, proxy topology.ProxyHandle) { + s := &ConnectivitySuite{BaseSuite: BaseSuite{top: top}, proxy: proxy} + suite.Run(t, s) +} diff --git a/tests/compatibility/suite/suite_proxy_health.go b/tests/compatibility/suite/suite_proxy_health.go new file mode 100644 index 0000000..01dda36 --- /dev/null +++ b/tests/compatibility/suite/suite_proxy_health.go @@ -0,0 +1,37 @@ +//go:build testcompatibility + +package suite + +import ( + "context" + "testing" + + "github.com/stretchr/testify/suite" + workflowservicev1 "go.temporal.io/api/workflowservice/v1" + + "github.com/temporalio/s2s-proxy/tests/compatibility/topology" +) + +type ProxyHealthSuite struct { + BaseSuite + proxy topology.ProxyHandle +} + +func (s *ProxyHealthSuite) TestGetSystemInfo() { + ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout) + defer cancel() + + // Step 1: GetSystemInfo - verifies gRPC connectivity through the proxy. + s.T().Logf("checking %s at %s", s.proxy.Name(), s.proxy.HostAddr()) + resp, err := s.proxy.FrontendClient().GetSystemInfo(ctx, &workflowservicev1.GetSystemInfoRequest{}) + s.Require().NoError(err, "GetSystemInfo via %s", s.proxy.Name()) + + // Step 2: Verify capabilities - confirms the proxy is forwarding a live Temporal response. 
+ s.Require().NotNil(resp.GetCapabilities(), "%s: GetSystemInfo returned nil capabilities", s.proxy.Name()) + s.T().Logf("%s: reachable, server version=%s", s.proxy.Name(), resp.GetServerVersion()) +} + +func RunProxyHealthSuite(t *testing.T, top topology.Topology, proxy topology.ProxyHandle) { + s := &ProxyHealthSuite{BaseSuite: BaseSuite{top: top}, proxy: proxy} + suite.Run(t, s) +} diff --git a/tests/compatibility/suite/suite_replication.go b/tests/compatibility/suite/suite_replication.go new file mode 100644 index 0000000..bcd1830 --- /dev/null +++ b/tests/compatibility/suite/suite_replication.go @@ -0,0 +1,86 @@ +//go:build testcompatibility + +package suite + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/suite" + commonv1 "go.temporal.io/api/common/v1" + taskqueuev1 "go.temporal.io/api/taskqueue/v1" + workflowservicev1 "go.temporal.io/api/workflowservice/v1" + + "github.com/temporalio/s2s-proxy/tests/compatibility/topology" +) + +type ReplicationSuite struct { + BaseSuite + active topology.ClusterHandle +} + +func (s *ReplicationSuite) TestNamespaceReplication() { + ctx := context.Background() + + // Step 1: Register a global namespace with s.active as the initial active cluster. + ns := s.Ops().RegisterNamespace(ctx, s.active, "compatibility-ns") + + // Step 2: Validate that the active cluster for this namespace is the one we passed in. + active := s.Ops().Active(ns) + s.Require().NotNil(active) + s.Require().Equal(s.active.Name(), active.Name()) + + // Step 3: Verify namespace has replicated to all passive clusters. + for _, passive := range s.Ops().Passives(ns) { + s.Ops().WaitForNamespace(passive, ns) + } +} + +func (s *ReplicationSuite) TestWorkflowReplication() { + ctx := context.Background() + + // Step 1: Register a global namespace with s.active as the initial active cluster. 
+ ns := s.Ops().RegisterNamespace(ctx, s.active, "compatibility-wf-ns") + + // Step 2: Validate that the active cluster for this namespace is the one we passed in. + active := s.Ops().Active(ns) + s.Require().NotNil(active) + s.Require().Equal(s.active.Name(), active.Name()) + + // Step 3: Wait for namespace replication to all passive clusters. + passives := s.Ops().Passives(ns) + for _, c := range passives { + s.Ops().WaitForNamespace(c, ns) + } + + // Step 4: Start a workflow on the active cluster. + workflowID := fmt.Sprintf("compatibility-wf-%s-%d", s.active.Name(), time.Now().UnixNano()) + startCtx, cancel := context.WithTimeout(ctx, writeTimeout) + defer cancel() + _, err := s.active.FrontendClient().StartWorkflowExecution(startCtx, + &workflowservicev1.StartWorkflowExecutionRequest{ + Namespace: ns, + WorkflowId: workflowID, + WorkflowType: &commonv1.WorkflowType{Name: "compatibility-wf"}, + TaskQueue: &taskqueuev1.TaskQueue{Name: "compatibility-tq"}, + RequestId: workflowID + "-start", + }) + s.Require().NoError(err, "start workflow on %s", s.active.Name()) + + // Step 5: Verify workflow history has replicated to all passive clusters. + s.Ops().WaitForWorkflowVisible(ctx, passives, ns, workflowID) + + // Step 6: Terminate the workflow on the active cluster. + s.Ops().TerminateWorkflow(ctx, s.active, ns, workflowID) + + // Step 7: Verify termination has replicated to all passive clusters. + s.Ops().WaitForWorkflowTerminated(ctx, passives, ns, workflowID) +} + +// RunReplicationSuite runs the ReplicationSuite with the given active cluster against the provided Env. 
+func RunReplicationSuite(t *testing.T, top topology.Topology, active topology.ClusterHandle) { + s := &ReplicationSuite{BaseSuite: BaseSuite{top: top}, active: active} + suite.Run(t, s) +} diff --git a/tests/compatibility/suite/timeouts.go b/tests/compatibility/suite/timeouts.go new file mode 100644 index 0000000..91e7db2 --- /dev/null +++ b/tests/compatibility/suite/timeouts.go @@ -0,0 +1,19 @@ +//go:build testcompatibility + +package suite + +import "time" + +const ( + replicationTimeout = 60 * time.Second + replicationInterval = 2 * time.Second + + registerNamespaceTimeout = 2 * time.Minute + registerNamespaceInterval = 3 * time.Second + + rpcTimeout = 10 * time.Second + + writeTimeout = 15 * time.Second + + healthCheckTimeout = 30 * time.Second +) diff --git a/tests/compatibility/topology/addr.go b/tests/compatibility/topology/addr.go new file mode 100644 index 0000000..d39abfd --- /dev/null +++ b/tests/compatibility/topology/addr.go @@ -0,0 +1,17 @@ +//go:build testcompatibility + +package topology + +import "fmt" + +// listenAddr returns a bind address for the given port, e.g. "0.0.0.0:6233". +func listenAddr(port int) string { return fmt.Sprintf("0.0.0.0:%d", port) } + +// exposedPort returns the testcontainers exposed-port string, e.g. "6233/tcp". +func exposedPort(port int) string { return fmt.Sprintf("%d/tcp", port) } + +// containerAddr returns a Docker-network address, e.g. "proxy-a:6233". +func containerAddr(alias string, port int) string { return fmt.Sprintf("%s:%d", alias, port) } + +// hostAddr returns a host-side dial address for a mapped port, e.g. "127.0.0.1:32768". 
+func hostAddr(port string) string { return "127.0.0.1:" + port } diff --git a/tests/compatibility/topology/cluster.go b/tests/compatibility/topology/cluster.go new file mode 100644 index 0000000..c5b5e38 --- /dev/null +++ b/tests/compatibility/topology/cluster.go @@ -0,0 +1,238 @@ +//go:build testcompatibility + +package topology + +import ( + "bytes" + "context" + "errors" + "fmt" + "strings" + "text/template" + "time" + + "github.com/docker/go-connections/nat" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" + workflowservicev1 "go.temporal.io/api/workflowservice/v1" + adminservicev1 "go.temporal.io/server/api/adminservice/v1" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/temporalio/s2s-proxy/tests/compatibility/specifications" +) + +const ( + serverGRPCPort = 7233 + schemaSetupTimeout = 90 * time.Second + serverConfigMountPath = "/etc/temporal/config/config_template.yaml" +) + +type clusterHandle struct { + name string + hostAddr string + internalAddr string + + conn *grpc.ClientConn + + db databaseHandle + container testcontainers.Container + logConsumer *logConsumer +} + +func (c *clusterHandle) Name() string { return c.name } +func (c *clusterHandle) HostAddr() string { return c.hostAddr } +func (c *clusterHandle) InternalAddr() string { return c.internalAddr } + +func (c *clusterHandle) FrontendClient() workflowservicev1.WorkflowServiceClient { + return workflowservicev1.NewWorkflowServiceClient(c.conn) +} + +func (c *clusterHandle) AdminClient() adminservicev1.AdminServiceClient { + return adminservicev1.NewAdminServiceClient(c.conn) +} + +func (c *clusterHandle) Logs() []string { + if c.logConsumer == nil { + return nil + } + return c.logConsumer.Logs() +} + +// Stop the cluster. +func (c *clusterHandle) Stop(ctx context.Context) error { + + // Step 1. Close the gRPC connection. 
+ var errs []error + if c.conn != nil { + if err := c.conn.Close(); err != nil { + errs = append(errs, err) + } + } + + // Step 2. Terminate the Temporal Container. + if c.container != nil { + if err := c.container.Terminate(ctx); err != nil { + errs = append(errs, fmt.Errorf("terminate server: %w", err)) + } + } + + // Step 3. Stop the database container. + if c.db != nil { + if err := c.db.Stop(ctx); err != nil { + errs = append(errs, fmt.Errorf("stop database: %w", err)) + } + } + if len(errs) > 0 { + return fmt.Errorf("cluster %s stop: %w", c.name, errors.Join(errs...)) + } + return nil +} + +// templateData is used for rendering templated files. +// Add attributes here to use in templating (eg server config, server setup). +type templateData struct { + ClusterName string + DatabaseAlias string + NetworkAlias string + InitialFailoverVersion int +} + +func renderTemplate(name, tmpl string, data any) string { + t := template.Must( + // Use [[ ]] as our substitution delimiters to avoid clashes with yaml's {{}} substitution + template.New(name).Delims("[[", "]]").Parse(tmpl), + ) + var buf bytes.Buffer + if err := t.Execute(&buf, data); err != nil { + panic(fmt.Sprintf("render %s: %v", name, err)) + } + return buf.String() +} + +func runSchemaSetup(ctx context.Context, adminToolsImage, script string, network Network) error { + req := testcontainers.ContainerRequest{ + Image: adminToolsImage, + Networks: []string{network.Name}, + Entrypoint: []string{"/bin/sh", "-c"}, + Cmd: []string{script}, + WaitingFor: wait.ForLog("temporal schema setup complete").WithStartupTimeout(schemaSetupTimeout), + } + + c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + return fmt.Errorf("schema setup container: %w", err) + } + + return c.Terminate(ctx) +} + +// makeContainerName returns a unique Docker container name for this topology run. 
+func makeContainerName(topologyID, kind, label string) string { + return fmt.Sprintf("s2sproxy-compat-%s-%s-%s", topologyID, kind, label) +} + +// startCluster starts the Temporal Cluster (including dependencies). +func startCluster( + ctx context.Context, + spec specifications.ClusterSpec, + net Network, + label string, // label is the positional key ("a", "b", ...) + topologyID string, // unique identifier for this topology instance to dedupe docker resources across parallel test runs +) (ClusterHandle, error) { + name := "cluster-" + label + networkAlias := "temporal-" + label + + // initialFailoverVersion uses label to derive monotonically increasing version numbers across clusters, e.g. "a" -> 1, "b" -> 2, etc. + initialFailoverVersion := int(label[0]-'a') + 1 + + // Step 1. Start the database container. + dbContainerName := makeContainerName(topologyID, "db", label) + db, err := startDatabase(ctx, spec.Database.Image, net, dbContainerName, "db-"+label) + if err != nil { + return nil, fmt.Errorf("start database for %s: %w", name, err) + } + + // Step 2. Prepare cluster configuration. + dbAlias := db.Name() + + env := make(map[string]string) + for k, v := range db.Env() { + env[k] = v + } + env["BIND_ON_IP"] = "0.0.0.0" + + data := templateData{ + ClusterName: name, + DatabaseAlias: dbAlias, + NetworkAlias: networkAlias, + InitialFailoverVersion: initialFailoverVersion, + } + + configContent := renderTemplate("server-config", spec.Server.ConfigTemplate, data) + schemaScript := renderTemplate("schema-setup", spec.Server.SetupSchemaTemplate, data) + + // Step 3. Run schema setup, then start the Temporal server container. 
+ serverContainerName := makeContainerName(topologyID, "temporal", label) + if err := runSchemaSetup(ctx, spec.Server.AdminToolsImage, schemaScript, net); err != nil { + _ = db.Stop(ctx) + return nil, fmt.Errorf("schema setup for %s: %w", networkAlias, err) + } + + serverReq := testcontainers.ContainerRequest{ + Name: serverContainerName, + Image: spec.Server.Image, + Env: env, + Files: []testcontainers.ContainerFile{ + { + Reader: strings.NewReader(configContent), + ContainerFilePath: serverConfigMountPath, + FileMode: 0644, + }, + }, + ExposedPorts: []string{exposedPort(serverGRPCPort)}, + Networks: []string{net.Name}, + NetworkAliases: map[string][]string{net.Name: {networkAlias}}, + WaitingFor: wait.ForListeningPort(nat.Port(exposedPort(serverGRPCPort))).WithStartupTimeout(3 * time.Minute), + } + c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: serverReq, + Started: true, + }) + if err != nil { + _ = db.Stop(ctx) + return nil, fmt.Errorf("start temporal %s: %w", networkAlias, err) + } + + mappedPort, err := c.MappedPort(ctx, nat.Port(exposedPort(serverGRPCPort))) + if err != nil { + _ = c.Terminate(ctx) + _ = db.Stop(ctx) + return nil, fmt.Errorf("get mapped port for temporal %s: %w", networkAlias, err) + } + + lc := attachLogConsumer(ctx, c) + hostAddr := hostAddr(mappedPort.Port()) + internalAddr := containerAddr(networkAlias, serverGRPCPort) + + // Step 4. Create a gRPC client connection to the Temporal server. 
+ conn, err := grpc.NewClient(hostAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + _ = c.Terminate(ctx) + _ = db.Stop(ctx) + return nil, fmt.Errorf("dial gRPC for %s: %w", name, err) + } + + return &clusterHandle{ + name: name, + hostAddr: hostAddr, + internalAddr: internalAddr, + conn: conn, + db: db, + container: c, + logConsumer: lc, + }, nil +} diff --git a/tests/compatibility/topology/database.go b/tests/compatibility/topology/database.go new file mode 100644 index 0000000..6cfabe6 --- /dev/null +++ b/tests/compatibility/topology/database.go @@ -0,0 +1,89 @@ +//go:build testcompatibility + +package topology + +import ( + "context" + "fmt" + + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +// databaseHandle represents a running database for a Temporal cluster. +type databaseHandle interface { + Name() string // short network alias, e.g. "db-a" + InternalAddr() string // "alias:port" on the Docker network + Env() map[string]string // Temporal env vars (DB, DB_PORT, POSTGRES_SEEDS, etc.) + Stop(ctx context.Context) error +} + +const ( + postgresUser = "temporal" + postgresPassword = "temporal" + postgresDBName = "temporal" + postgresDBType = "postgres12" + postgresPort = "5432" + postgresAuthMethod = "trust" // scram-sha-256 (default in postgres 15+) is not supported by older lib/pq +) + +// startDatabase starts a postgres database container. 
+func startDatabase(ctx context.Context, dbImage string, net Network, containerName, networkAlias string) (databaseHandle, error) { + req := testcontainers.ContainerRequest{ + Name: containerName, + Image: dbImage, + Env: map[string]string{ + "POSTGRES_USER": postgresUser, + "POSTGRES_PASSWORD": postgresPassword, + "POSTGRES_DB": postgresDBName, + "POSTGRES_HOST_AUTH_METHOD": postgresAuthMethod, + }, + Networks: []string{net.Name}, + NetworkAliases: map[string][]string{net.Name: {networkAlias}}, + WaitingFor: wait.ForLog("database system is ready to accept connections"). + WithOccurrence(2), + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + return nil, fmt.Errorf("start postgres %s: %w", containerName, err) + } + return &postgresHandle{ + container: container, + networkAlias: networkAlias, + }, nil +} + +// postgresHandle implements databaseHandle for a running Postgres container. +type postgresHandle struct { + container testcontainers.Container + networkAlias string +} + +// Name returns the short network alias, e.g. "db-a". +func (h *postgresHandle) Name() string { return h.networkAlias } + +// InternalAddr returns "alias:port" for use on the Docker network. +func (h *postgresHandle) InternalAddr() string { return h.networkAlias + ":" + postgresPort } + +// Env returns environment variables consumed by Temporal to connect to this database. +func (h *postgresHandle) Env() map[string]string { + return map[string]string{ + "DB": postgresDBType, + "DB_PORT": postgresPort, + "POSTGRES_SEEDS": h.networkAlias, + "POSTGRES_USER": postgresUser, + "POSTGRES_PASSWORD": postgresPassword, + } +} + +// Stop terminates the postgres container. 
+func (h *postgresHandle) Stop(ctx context.Context) error { + if err := h.container.Terminate(ctx); err != nil { + return fmt.Errorf("terminate postgres: %w", err) + } + return nil +} diff --git a/tests/compatibility/topology/interfaces.go b/tests/compatibility/topology/interfaces.go new file mode 100644 index 0000000..7dff423 --- /dev/null +++ b/tests/compatibility/topology/interfaces.go @@ -0,0 +1,67 @@ +//go:build testcompatibility + +package topology + +import ( + "context" + "testing" + + workflowservicev1 "go.temporal.io/api/workflowservice/v1" + adminservicev1 "go.temporal.io/server/api/adminservice/v1" +) + +// Network is a Docker bridge network scoped to one test combination. +type Network struct { + Name string +} + +// ConfigFile pairs rendered file content with its container mount path. +type ConfigFile struct { + Content string + MountPath string +} + +// ClusterHandle is what tests use to interact with a running Temporal cluster. +type ClusterHandle interface { + // Name returns the Temporal cluster name, e.g. "cluster-a". + Name() string + // HostAddr returns "127.0.0.1:" - the test process dials this. + HostAddr() string + // InternalAddr returns "alias:7233" - proxy containers use this on the Docker network. + InternalAddr() string + // FrontendClient returns a gRPC client for the WorkflowService. + FrontendClient() workflowservicev1.WorkflowServiceClient + // AdminClient returns a gRPC client for the AdminService. + AdminClient() adminservicev1.AdminServiceClient + // Logs returns buffered log lines from the Temporal container. + Logs() []string + // Stop terminates all containers backing this cluster. + Stop(ctx context.Context) error +} + +// ProxyHandle is a handle to a single running proxy container. +type ProxyHandle interface { + // Name returns the proxy name, e.g. "proxy-a" or "proxy-b". + Name() string + // InternalAddrs returns Docker-network addresses for all cluster connections. 
+ InternalAddrs() []string + // HostAddr returns "127.0.0.1:" - the test process dials this. + HostAddr() string + // FrontendClient returns a gRPC client for the WorkflowService via this proxy. + FrontendClient() workflowservicev1.WorkflowServiceClient + // Logs returns buffered log lines from the proxy container. + Logs() []string + // Stop terminates the proxy container. + Stop(ctx context.Context) error +} + +// Topology is the full test topology that suites operate on. +type Topology interface { + Clusters() []ClusterHandle + Proxies() []ProxyHandle + T() *testing.T + // DumpLogs emits all container logs to t.Log(). Callers decide when to invoke it. + DumpLogs() + // Teardown stops all containers and removes the Docker network. + Teardown(ctx context.Context) +} diff --git a/tests/compatibility/topology/logs.go b/tests/compatibility/topology/logs.go new file mode 100644 index 0000000..4e87e02 --- /dev/null +++ b/tests/compatibility/topology/logs.go @@ -0,0 +1,39 @@ +//go:build testcompatibility + +package topology + +import ( + "context" + "sync" + + "github.com/testcontainers/testcontainers-go" +) + +// logConsumer buffers log lines for later retrieval via Logs(). +type logConsumer struct { + mu sync.Mutex + lines []string +} + +func (c *logConsumer) Accept(l testcontainers.Log) { + c.mu.Lock() + defer c.mu.Unlock() + c.lines = append(c.lines, string(l.Content)) +} + +// Logs returns all buffered log lines. +func (c *logConsumer) Logs() []string { + c.mu.Lock() + defer c.mu.Unlock() + result := make([]string, len(c.lines)) + copy(result, c.lines) + return result +} + +// attachLogConsumer attaches a new logConsumer to a running container and starts log streaming. 
+func attachLogConsumer(ctx context.Context, c testcontainers.Container) *logConsumer { + lc := &logConsumer{} + c.FollowOutput(lc) + _ = c.StartLogProducer(ctx) + return lc +} diff --git a/tests/compatibility/topology/network.go b/tests/compatibility/topology/network.go new file mode 100644 index 0000000..afb7e88 --- /dev/null +++ b/tests/compatibility/topology/network.go @@ -0,0 +1,19 @@ +//go:build testcompatibility + +package topology + +import ( + "context" + "fmt" + + "github.com/testcontainers/testcontainers-go/network" +) + +// newNetwork creates a new Docker network. +func newNetwork(ctx context.Context) (Network, error) { + net, err := network.New(ctx) + if err != nil { + return Network{}, fmt.Errorf("create docker network: %w", err) + } + return Network{Name: net.Name}, nil +} diff --git a/tests/compatibility/topology/proxy.go b/tests/compatibility/topology/proxy.go new file mode 100644 index 0000000..2784287 --- /dev/null +++ b/tests/compatibility/topology/proxy.go @@ -0,0 +1,178 @@ +//go:build testcompatibility + +package topology + +import ( + "context" + "errors" + "fmt" + "math/rand" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + "sync" + "time" + + "github.com/docker/go-connections/nat" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" + workflowservicev1 "go.temporal.io/api/workflowservice/v1" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + proxyPProfPort = 6060 + proxyReadinessTimeout = 30 * time.Second +) + +// proxyHandle implements ProxyHandle for a single proxy container. 
+type proxyHandle struct { + name string + container testcontainers.Container + internalAddrs []string + hostAddr string + conn *grpc.ClientConn + logConsumer *logConsumer +} + +func (p *proxyHandle) Name() string { return p.name } +func (p *proxyHandle) InternalAddrs() []string { return p.internalAddrs } +func (p *proxyHandle) HostAddr() string { return p.hostAddr } + +func (p *proxyHandle) FrontendClient() workflowservicev1.WorkflowServiceClient { + return workflowservicev1.NewWorkflowServiceClient(p.conn) +} + +func (p *proxyHandle) Logs() []string { + if p.logConsumer == nil { + return nil + } + return p.logConsumer.Logs() +} + +func (p *proxyHandle) Stop(ctx context.Context) error { + var errs []error + + // Step 1. Close the gRPC connection. + if p.conn != nil { + if err := p.conn.Close(); err != nil { + errs = append(errs, err) + } + } + + // Step 2. Terminate the proxy container. + if p.container != nil { + if err := p.container.Terminate(ctx); err != nil { + errs = append(errs, fmt.Errorf("terminate %s: %w", p.name, err)) + } + } + if len(errs) > 0 { + return fmt.Errorf("%s stop: %w", p.name, errors.Join(errs...)) + } + return nil +} + +// repoRoot finds the repository root by climbing from the current file's location. 
+func repoRoot() (string, error) { + _, filename, _, ok := runtime.Caller(0) + if !ok { + return "", fmt.Errorf("could not determine caller file") + } + // filename is .../tests/compatibility/topology/proxy.go + // climb: topology/ -> compatibility/ -> tests/ -> repo root + dir := filepath.Dir(filename) + root := filepath.Join(dir, "..", "..", "..") + abs, err := filepath.Abs(root) + if err != nil { + return "", fmt.Errorf("abs repo root: %w", err) + } + return abs, nil +} + +var ( + proxyBuildOnce sync.Once + proxyImageTag string + proxyBuildErr error +) + +func ensureProxyImage() (string, error) { + // We want to ensure the proxy image is cached per test run to reduce test run time + // We can't use commit sha or similar as a cache key due to local changes and multiple runs per commit during development + // So we just use a random string! + proxyBuildOnce.Do(func() { + runID := fmt.Sprintf("%04x", rand.Uint32()&0xffff) //nolint:gosec + proxyImageTag, proxyBuildErr = buildProxyImage(context.Background(), runID) + }) + return proxyImageTag, proxyBuildErr +} + +// buildProxyImage builds the proxy Docker image and returns the image tag. +func buildProxyImage(ctx context.Context, runID string) (string, error) { + root, err := repoRoot() + if err != nil { + return "", err + } + tag := fmt.Sprintf("temporalio/s2s-proxy:compat-test-%s-%s", runtime.GOARCH, runID) + cmd := exec.CommandContext(ctx, "docker", "build", + "--platform", "linux/"+runtime.GOARCH, "-t", tag, root) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + return "", fmt.Errorf("build proxy image: %w", err) + } + return tag, nil +} + +// startProxy starts a single proxy container and returns a ProxyHandle. 
+func startProxy(ctx context.Context, spec proxySpec, topologyID, label string) (ProxyHandle, error) { + containerName := makeContainerName(topologyID, "proxy", label) + exposedPorts := append(append([]string(nil), spec.exposedPorts...), exposedPort(proxyPProfPort)) + req := testcontainers.ContainerRequest{ + Name: containerName, + Image: spec.image, + Env: map[string]string{"CONFIG_YML": spec.configFile.MountPath}, + Files: []testcontainers.ContainerFile{ + {Reader: strings.NewReader(spec.configFile.Content), ContainerFilePath: spec.configFile.MountPath, FileMode: 0644}, + }, + ExposedPorts: exposedPorts, + Networks: []string{spec.network.Name}, + NetworkAliases: map[string][]string{spec.network.Name: {spec.networkAlias}}, + WaitingFor: wait.ForHTTP("/debug/pprof/"). + WithPort(nat.Port(exposedPort(proxyPProfPort))). + WithStartupTimeout(proxyReadinessTimeout), + } + c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + return nil, fmt.Errorf("start proxy %s: %w", spec.networkAlias, err) + } + + mappedPort, err := c.MappedPort(ctx, nat.Port(spec.exposedPorts[0])) + if err != nil { + _ = c.Terminate(ctx) + return nil, fmt.Errorf("get mapped port for proxy %s: %w", spec.networkAlias, err) + } + + logs := attachLogConsumer(ctx, c) + addr := hostAddr(mappedPort.Port()) + + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + _ = c.Terminate(ctx) + return nil, fmt.Errorf("dial proxy %s: %w", spec.networkAlias, err) + } + + return &proxyHandle{ + name: spec.name, + container: c, + internalAddrs: spec.internalAddrs, + hostAddr: addr, + conn: conn, + logConsumer: logs, + }, nil +} diff --git a/tests/compatibility/topology/proxy_generator.go b/tests/compatibility/topology/proxy_generator.go new file mode 100644 index 0000000..3d11858 --- /dev/null +++ b/tests/compatibility/topology/proxy_generator.go @@ -0,0 +1,169 
//go:build testcompatibility

package topology

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// startPort is the base port for per-proxy sequential allocation.
const startPort = 10001

// proxyYAMLConfig mirrors the YAML shape expected by s2s-proxy.
type proxyYAMLConfig struct {
	ClusterConnections []clusterConnYAML `yaml:"clusterConnections"`
	Profiling          profilingYAML     `yaml:"profiling"`
}

// profilingYAML configures the proxy's pprof HTTP listener.
type profilingYAML struct {
	PprofAddress string `yaml:"pprofAddress"`
}

// clusterConnYAML describes one local<->remote cluster connection.
type clusterConnYAML struct {
	Name   string         `yaml:"name"`
	Local  clusterDefYAML `yaml:"local"`
	Remote clusterDefYAML `yaml:"remote"`
}

// clusterDefYAML is one side of a connection: plain TCP or a mux endpoint.
type clusterDefYAML struct {
	ConnectionType string      `yaml:"connectionType"`
	TcpClient      tcpInfoYAML `yaml:"tcpClient,omitempty"`
	TcpServer      tcpInfoYAML `yaml:"tcpServer,omitempty"`
	MuxCount       int         `yaml:"muxCount,omitempty"`
	MuxAddressInfo tcpInfoYAML `yaml:"muxAddressInfo,omitempty"`
}

// tcpInfoYAML wraps a single "host:port" address.
type tcpInfoYAML struct {
	Address string `yaml:"address,omitempty"`
}

// proxySpec is the fully-resolved configuration for a proxy container.
type proxySpec struct {
	name          string    // proxy name, e.g. "proxy-cluster-a"
	networkAlias  string    // docker-network alias for this container
	image         string    // proxy image tag (from ensureProxyImage)
	network       Network   // docker network to join
	configFile    ConfigFile // rendered YAML config plus its mount path
	exposedPorts  []string  // per-connection listen/mux ports; first entry is the frontend port
	internalAddrs []string  // docker-network listen addresses, one per connection
}

// buildProxySpecs computes full-mesh proxy specs for N clusters.
//
// For N clusters there are N*(N-1)/2 cluster pairs. Each pair (i, j) where i < j
// assigns proxy-i as the mux-client and proxy-j as the mux-server. Ports are
// allocated sequentially per proxy starting from startPort.
func buildProxySpecs(clusters []ClusterHandle, imageTag string, net Network) []proxySpec {
	total := len(clusters)

	// pairConn is one proxy's view of a single cluster pair.
	type pairConn struct {
		remoteName    string // name of the remote cluster
		listenPort    int    // in-container gRPC listen port for this connection
		internalAddr  string // docker-network address of that listener
		connType      string // "mux-client" or "mux-server"
		muxAddr       string // mux dial target (client) or mux listen address (server)
		muxListenPort int    // 0 for mux-client (not a server, not exposed)
		muxCount      int
	}

	// Collect connections for each proxy index.
	connsByProxy := make([][]pairConn, total)

	// Each proxy gets its own port counter; uniqueness within a container is sufficient.
	nextPort := make([]int, total)
	for i := range nextPort {
		nextPort[i] = startPort
	}

	for i := 0; i < total; i++ {
		for j := i + 1; j < total; j++ {
			aliasI := "proxy-" + clusters[i].Name()
			aliasJ := "proxy-" + clusters[j].Name()

			// One listen port per side; the mux-server side (j) consumes a second
			// port for its mux listener.
			iPort := nextPort[i]
			nextPort[i]++
			jPort := nextPort[j]
			nextPort[j]++
			muxPort := nextPort[j]
			nextPort[j]++

			// proxy-i: mux-client for this pair — dials proxy-j's mux listener.
			connsByProxy[i] = append(connsByProxy[i], pairConn{
				remoteName:   clusters[j].Name(),
				listenPort:   iPort,
				internalAddr: containerAddr(aliasI, iPort),
				connType:     "mux-client",
				muxAddr:      containerAddr(aliasJ, muxPort),
				muxCount:     1,
			})

			// proxy-j: mux-server for this pair — listens on muxPort.
			connsByProxy[j] = append(connsByProxy[j], pairConn{
				remoteName:    clusters[i].Name(),
				listenPort:    jPort,
				internalAddr:  containerAddr(aliasJ, jPort),
				connType:      "mux-server",
				muxAddr:       listenAddr(muxPort),
				muxListenPort: muxPort,
				muxCount:      1,
			})
		}
	}

	specs := make([]proxySpec, total)
	for i, cluster := range clusters {
		alias := "proxy-" + cluster.Name()
		conns := connsByProxy[i]

		// Build exposed ports and internal addrs.
		// The listen port of the first connection ends up first in exposedPorts;
		// startProxy relies on that ordering to pick the frontend port.
		var exposedPorts []string
		internalAddrs := make([]string, len(conns))
		for k, conn := range conns {
			exposedPorts = append(exposedPorts, exposedPort(conn.listenPort))
			if conn.muxListenPort != 0 {
				exposedPorts = append(exposedPorts, exposedPort(conn.muxListenPort))
			}
			internalAddrs[k] = conn.internalAddr
		}

		// Render proxy YAML config inline.
		var clusterConns []clusterConnYAML
		for _, conn := range conns {
			clusterConns = append(clusterConns, clusterConnYAML{
				Name: conn.remoteName,
				Local: clusterDefYAML{
					ConnectionType: "tcp",
					TcpClient:      tcpInfoYAML{Address: cluster.InternalAddr()},
					TcpServer:      tcpInfoYAML{Address: listenAddr(conn.listenPort)},
				},
				Remote: clusterDefYAML{
					ConnectionType: conn.connType,
					MuxCount:       conn.muxCount,
					MuxAddressInfo: tcpInfoYAML{Address: conn.muxAddr},
				},
			})
		}
		cfg := proxyYAMLConfig{
			ClusterConnections: clusterConns,
			Profiling:          profilingYAML{PprofAddress: listenAddr(proxyPProfPort)},
		}
		data, err := yaml.Marshal(cfg)
		if err != nil {
			// Marshalling a fully in-memory struct should never fail; panic keeps
			// the function signature simple for test-only code.
			panic(fmt.Sprintf("marshal proxy config for %s: %v", alias, err))
		}

		specs[i] = proxySpec{
			name:          alias,
			networkAlias:  alias,
			image:         imageTag,
			network:       net,
			configFile:    ConfigFile{Content: string(data), MountPath: "/etc/proxy/config.yaml"},
			exposedPorts:  exposedPorts,
			internalAddrs: internalAddrs,
		}
	}

	return specs
}

// ---- file: tests/compatibility/topology/topology.go ----

//go:build testcompatibility

package topology

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	adminservicev1 "go.temporal.io/server/api/adminservice/v1"

	"github.com/temporalio/s2s-proxy/tests/compatibility/specifications"
)

const (
	// cleanupTimeout bounds the t.Cleanup teardown of all containers.
	cleanupTimeout = 90 * time.Second
	// addRemoteClusterRequestTimeout bounds a single AddOrUpdateRemoteCluster call.
	addRemoteClusterRequestTimeout = 15 * time.Second
	// addRemoteClusterRetryTimeout/-Interval drive the Eventually retry loop.
	addRemoteClusterRetryTimeout  = 180 * time.Second
	addRemoteClusterRetryInterval = 3 * time.Second
)

// topologyImpl is the concrete Topology backed by live containers.
type topologyImpl struct {
	t        *testing.T
	clusters []ClusterHandle
	proxies  []ProxyHandle
}

func (top *topologyImpl) Clusters() []ClusterHandle { return top.clusters }
func (top *topologyImpl) Proxies() []ProxyHandle    { return top.proxies }
func (top *topologyImpl)
T() *testing.T { return top.t }

// DumpLogs emits all buffered cluster and proxy container logs to t.Log,
// prefixing each line with its container's name.
func (top *topologyImpl) DumpLogs() {
	for _, cluster := range top.clusters {
		for _, line := range cluster.Logs() {
			top.t.Logf("[%s] %s", cluster.Name(), line)
		}
	}
	for _, proxy := range top.proxies {
		for _, line := range proxy.Logs() {
			top.t.Logf("[%s] %s", proxy.Name(), line)
		}
	}
}

// Teardown stops proxies first, then clusters, each group concurrently.
// Stop errors are intentionally ignored: teardown is best-effort.
// NOTE(review): the Docker network itself is not removed here - presumably it is
// reaped by testcontainers (Ryuk); confirm against the testcontainers setup.
func (top *topologyImpl) Teardown(ctx context.Context) {

	// Step 1. Stop proxies (concurrently).
	var pwg sync.WaitGroup
	for _, proxy := range top.proxies {
		proxy := proxy
		pwg.Add(1)
		go func() {
			defer pwg.Done()
			_ = proxy.Stop(ctx)
		}()
	}
	pwg.Wait()

	// Step 2. Stop clusters (concurrently).
	var cwg sync.WaitGroup
	for _, cluster := range top.clusters {
		cluster := cluster
		cwg.Add(1)
		go func() {
			defer cwg.Done()
			_ = cluster.Stop(ctx)
		}()
	}
	cwg.Wait()
}

// NewTopology starts all clusters, proxies and network, connects them together, and returns a Topology.
// On any startup failure it calls t.Fatalf; teardown (and log dumping on failure)
// is registered via t.Cleanup before anything is started.
func NewTopology(t *testing.T, topologyID string, specs []specifications.ClusterSpec) Topology {
	t.Helper()
	ctx := context.Background()

	top := &topologyImpl{t: t}
	t.Cleanup(func() {
		// A fresh context: ctx may already be done when cleanup runs.
		cleanupCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
		defer cancel()
		if t.Failed() {
			top.DumpLogs()
		}
		top.Teardown(cleanupCtx)
	})

	// Step 1. Network: Create a docker network.
	net, err := newNetwork(ctx)
	if err != nil {
		t.Fatalf("create docker network: %v", err)
	}

	// Step 2a. Clusters: Start all clusters in parallel.
	type clusterResult struct {
		idx     int
		cluster ClusterHandle
		err     error
	}
	ch := make(chan clusterResult, len(specs))

	for i, spec := range specs {
		i, spec := i, spec
		// NOTE(review): single-letter labels assume at most 26 clusters.
		label := string(rune('a' + i)) // "a", "b", ...
		go func() {
			c, startErr := startCluster(ctx, spec, net, label, topologyID)
			ch <- clusterResult{idx: i, cluster: c, err: startErr}
		}()
	}

	// Step 2b. Clusters: Wait for all clusters to start.
	clusterHandles := make([]ClusterHandle, len(specs))
	for range specs {
		result := <-ch
		if result.err != nil {
			t.Fatalf("start cluster-%s: %v", string(rune('a'+result.idx)), result.err)
		}
		clusterHandles[result.idx] = result.cluster
	}
	top.clusters = clusterHandles

	// Step 3a. Proxies: Build proxy image (cached once per process).
	imageTag, err := ensureProxyImage()
	if err != nil {
		t.Fatalf("build proxy image: %v", err)
	}

	// Step 3b. Proxies: Build proxy specs (full-mesh, fully rendered).
	proxySpecs := buildProxySpecs(clusterHandles, imageTag, net)

	// Step 3c. Proxies: Start all proxies in parallel (one proxy per cluster).
	type proxyChanResult struct {
		idx    int
		handle ProxyHandle
		err    error
	}
	pch := make(chan proxyChanResult, len(clusterHandles))
	for i := range clusterHandles {
		i := i
		go func() {
			handle, startErr := startProxy(ctx, proxySpecs[i], topologyID, string(rune('a'+i)))
			pch <- proxyChanResult{idx: i, handle: handle, err: startErr}
		}()
	}

	// Step 3d. Proxies: Wait for all proxies to start.
	proxyHandles := make([]ProxyHandle, len(clusterHandles))
	for range clusterHandles {
		result := <-pch
		if result.err != nil {
			t.Fatalf("start %s: %v", proxySpecs[result.idx].name, result.err)
		}
		proxyHandles[result.idx] = result.handle
	}
	top.proxies = proxyHandles

	// Step 4. Connectivity: Register each remote cluster via proxies.
	// Each cluster registers every remote through its own proxy's docker-network
	// addresses (one address per remote connection). Retried because the proxy/mux
	// path may not be ready immediately after container start.
	req := require.New(t)

	for i, cluster := range top.clusters {
		cluster := cluster
		proxy := top.proxies[i]
		for _, proxyAddr := range proxy.InternalAddrs() {
			proxyAddr := proxyAddr
			req.Eventually(func() bool {
				reqCtx, cancel := context.WithTimeout(ctx, addRemoteClusterRequestTimeout)
				defer cancel()
				_, err := cluster.AdminClient().AddOrUpdateRemoteCluster(reqCtx, &adminservicev1.AddOrUpdateRemoteClusterRequest{
					FrontendAddress:               proxyAddr,
					EnableRemoteClusterConnection: true,
				})
				if err != nil {
					// Log each failed attempt so a timeout is diagnosable.
					t.Logf("AddOrUpdateRemoteCluster %s via %s: %v", cluster.Name(), proxyAddr, err)
				}
				return err == nil
			}, addRemoteClusterRetryTimeout, addRemoteClusterRetryInterval, "configure %s via %s: timed out", cluster.Name(), proxyAddr)
		}
	}

	return top
}