diff --git a/connectors/kafka/dest.go b/connectors/kafka/dest.go new file mode 100644 index 0000000..6d128ca --- /dev/null +++ b/connectors/kafka/dest.go @@ -0,0 +1,157 @@ +package kafka + +import ( + "context" + "errors" + "fmt" + "log/slog" + + "connectrpc.com/connect" + "github.com/IBM/sarama" + adiomv1 "github.com/adiom-data/dsync/gen/adiom/v1" + "github.com/adiom-data/dsync/gen/adiom/v1/adiomv1connect" + "google.golang.org/protobuf/proto" +) + +func DsyncMessageToNamespace(m *sarama.ConsumerMessage, _ map[string]struct{}) (string, error) { + for _, h := range m.Headers { + if string(h.Key) == "ns" { + return string(h.Value), nil + } + } + return "", fmt.Errorf("missing namespace header") +} + +func DsyncMessageToUpdate(m *sarama.ConsumerMessage, _ map[string]struct{}) (*adiomv1.Update, string, error) { + update := &adiomv1.Update{} + if err := proto.Unmarshal(m.Value, update); err != nil { + return nil, "", fmt.Errorf("err unmarshalling proto: %w", err) + } + for _, h := range m.Headers { + if string(h.Key) == "ns" { + return update, string(h.Value), nil + } + } + + return nil, "", fmt.Errorf("missing namespace header") +} + +type destConn struct { + adiomv1connect.UnimplementedConnectorServiceHandler + producer sarama.SyncProducer + namespaceToTopic map[string]string + defaultTopic string + dataType adiomv1.DataType +} + +func NewDestKafka(brokers []string, defaultTopic string, namespacesToTopic map[string]string, user string, password string, dataType adiomv1.DataType) (*destConn, error) { + cfg := sarama.NewConfig() + if user != "" { + cfg.Net.TLS.Enable = true + cfg.Net.SASL.Enable = true + cfg.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext) + cfg.Net.SASL.User = user + cfg.Net.SASL.Password = password + } + cfg.Producer.RequiredAcks = sarama.WaitForAll + cfg.Producer.Return.Successes = true + cfg.Producer.Partitioner = sarama.NewHashPartitioner + producer, err := sarama.NewSyncProducer(brokers, cfg) + if err != nil { + return nil, fmt.Errorf("err creating kafka producer: %w", err) + } + return &destConn{ + producer: producer, + namespaceToTopic: namespacesToTopic, + defaultTopic: defaultTopic, + dataType: dataType, + }, nil +} + +func (d *destConn) Teardown() { + _ = d.producer.Close() +} + +// GeneratePlan implements [adiomv1connect.ConnectorServiceHandler]. +func (d *destConn) GeneratePlan(context.Context, *connect.Request[adiomv1.GeneratePlanRequest]) (*connect.Response[adiomv1.GeneratePlanResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.ErrUnsupported) +} + +// GetInfo implements [adiomv1connect.ConnectorServiceHandler]. +func (d *destConn) GetInfo(context.Context, *connect.Request[adiomv1.GetInfoRequest]) (*connect.Response[adiomv1.GetInfoResponse], error) { + supported := []adiomv1.DataType{d.dataType} + return connect.NewResponse(&adiomv1.GetInfoResponse{ + Id: "kafka-dest", + DbType: "kafka-dest", + Capabilities: &adiomv1.Capabilities{ + Sink: &adiomv1.Capabilities_Sink{ + SupportedDataTypes: supported, + }, + }, + }), nil +} + +// GetNamespaceMetadata implements [adiomv1connect.ConnectorServiceHandler]. +func (d *destConn) GetNamespaceMetadata(context.Context, *connect.Request[adiomv1.GetNamespaceMetadataRequest]) (*connect.Response[adiomv1.GetNamespaceMetadataResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.ErrUnsupported) +} + +// ListData implements [adiomv1connect.ConnectorServiceHandler]. +func (d *destConn) ListData(context.Context, *connect.Request[adiomv1.ListDataRequest]) (*connect.Response[adiomv1.ListDataResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.ErrUnsupported) +} + +// StreamLSN implements [adiomv1connect.ConnectorServiceHandler]. +func (d *destConn) StreamLSN(context.Context, *connect.Request[adiomv1.StreamLSNRequest], *connect.ServerStream[adiomv1.StreamLSNResponse]) error { + return connect.NewError(connect.CodeUnimplemented, errors.ErrUnsupported) +} + +// StreamUpdates implements [adiomv1connect.ConnectorServiceHandler]. +func (d *destConn) StreamUpdates(context.Context, *connect.Request[adiomv1.StreamUpdatesRequest], *connect.ServerStream[adiomv1.StreamUpdatesResponse]) error { + return connect.NewError(connect.CodeUnimplemented, errors.ErrUnsupported) +} + +// WriteData implements [adiomv1connect.ConnectorServiceHandler]. +func (d *destConn) WriteData(context.Context, *connect.Request[adiomv1.WriteDataRequest]) (*connect.Response[adiomv1.WriteDataResponse], error) { + slog.Warn("kafka connector does not support initial sync, ignoring") + return connect.NewResponse(&adiomv1.WriteDataResponse{}), nil +} + +func key(id []*adiomv1.BsonValue) []byte { + var res []byte + for _, part := range id { + res = append(res, part.GetData()...) + } + return res +} + +// WriteUpdates implements [adiomv1connect.ConnectorServiceHandler]. +func (d *destConn) WriteUpdates(ctx context.Context, r *connect.Request[adiomv1.WriteUpdatesRequest]) (*connect.Response[adiomv1.WriteUpdatesResponse], error) { + topic := d.defaultTopic + ns := r.Msg.GetNamespace() + header := []sarama.RecordHeader{{Key: []byte("ns"), Value: []byte(ns)}} + if namespaceTopic, ok := d.namespaceToTopic[ns]; ok { + topic = namespaceTopic + } + var messages []*sarama.ProducerMessage + for _, update := range r.Msg.GetUpdates() { + b, err := proto.Marshal(update) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err marshalling proto: %w", err)) + } + msg := &sarama.ProducerMessage{ + Topic: topic, + Key: sarama.ByteEncoder(key(update.GetId())), + Value: sarama.ByteEncoder(b), + Headers: header, + Partition: -1, + } + messages = append(messages, msg) + } + if err := d.producer.SendMessages(messages); err != nil { + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err sending message: %w", err)) + } + return connect.NewResponse(&adiomv1.WriteUpdatesResponse{}), nil +} + +var _ adiomv1connect.ConnectorServiceHandler = &destConn{} diff --git a/connectors/kafka/kafka.go b/connectors/kafka/kafka.go new file mode 100644 index 0000000..0658c9a --- /dev/null +++ b/connectors/kafka/kafka.go @@ -0,0 +1,323 @@ +package kafka + +import ( + "bytes" + "context" + "encoding/gob" + "errors" + "log/slog" + + "connectrpc.com/connect" + "github.com/IBM/sarama" + adiomv1 "github.com/adiom-data/dsync/gen/adiom/v1" + "github.com/adiom-data/dsync/gen/adiom/v1/adiomv1connect" +) + +type MessageToNamespace func(*sarama.ConsumerMessage, map[string]struct{}) (string, error) +type MessageToUpdate func(*sarama.ConsumerMessage, map[string]struct{}) (*adiomv1.Update, string, error) + +var _ adiomv1connect.ConnectorServiceHandler = &kafkaConn{} + +type kafkaConn struct { + adiomv1connect.UnimplementedConnectorServiceHandler + brokers []string + topicToNamespaces map[string][]string + user, password string + offset int64 + dataType adiomv1.DataType + + messageToUpdate MessageToUpdate + messageToNamespace MessageToNamespace +} + +// GetInfo implements [adiomv1connect.ConnectorServiceHandler]. +func (k *kafkaConn) GetInfo(context.Context, *connect.Request[adiomv1.GetInfoRequest]) (*connect.Response[adiomv1.GetInfoResponse], error) { + supported := []adiomv1.DataType{k.dataType} + return connect.NewResponse(&adiomv1.GetInfoResponse{ + Id: "kafka-src", + DbType: "kafka-src", + Capabilities: &adiomv1.Capabilities{ + Source: &adiomv1.Capabilities_Source{ + SupportedDataTypes: supported, + LsnStream: true, + MultiNamespacePlan: true, + DefaultPlan: true, + }, + }, + }), nil +} + +// GetNamespaceMetadata implements [adiomv1connect.ConnectorServiceHandler]. +func (k *kafkaConn) GetNamespaceMetadata(context.Context, *connect.Request[adiomv1.GetNamespaceMetadataRequest]) (*connect.Response[adiomv1.GetNamespaceMetadataResponse], error) { + return connect.NewResponse(&adiomv1.GetNamespaceMetadataResponse{}), nil +} + +// ListData implements [adiomv1connect.ConnectorServiceHandler]. +func (k *kafkaConn) ListData(context.Context, *connect.Request[adiomv1.ListDataRequest]) (*connect.Response[adiomv1.ListDataResponse], error) { + slog.Warn("kafka connector does not support initial sync, ignoring") + return connect.NewResponse(&adiomv1.ListDataResponse{}), nil +} + +// WriteData implements [adiomv1connect.ConnectorServiceHandler]. +func (k *kafkaConn) WriteData(context.Context, *connect.Request[adiomv1.WriteDataRequest]) (*connect.Response[adiomv1.WriteDataResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.ErrUnsupported) +} + +// WriteUpdates implements [adiomv1connect.ConnectorServiceHandler]. +func (k *kafkaConn) WriteUpdates(context.Context, *connect.Request[adiomv1.WriteUpdatesRequest]) (*connect.Response[adiomv1.WriteUpdatesResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.ErrUnsupported) +} + +func NewKafkaConn(brokers []string, topicToNamespaces map[string][]string, messageToUpdate MessageToUpdate, messageToNamespace MessageToNamespace, user string, password string, offset int64, dataType adiomv1.DataType) *kafkaConn { + return &kafkaConn{ + brokers: brokers, + topicToNamespaces: topicToNamespaces, + messageToUpdate: messageToUpdate, + messageToNamespace: messageToNamespace, + user: user, + password: password, + offset: offset, + dataType: dataType, + } +} + +type streamCursor struct { + Topic string + Partition int32 + Offset int64 +} + +func (c *streamCursor) Encode() ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(c); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func DecodeStreamCursor(in []byte) (*streamCursor, error) { + if len(in) == 0 { + return &streamCursor{}, nil + } + var c streamCursor + br := bytes.NewReader(in) + dec := gob.NewDecoder(br) + if err := dec.Decode(&c); err != nil { + return nil, err + } + return &c, nil +} + +func (k *kafkaConn) NewConfig() *sarama.Config { + cfg := sarama.NewConfig() + if k.user != "" { + cfg.Net.TLS.Enable = true + cfg.Net.SASL.Enable = true + cfg.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext) + cfg.Net.SASL.User = k.user + cfg.Net.SASL.Password = k.password + } + return cfg +} + +// GeneratePlan implements adiomv1connect.ConnectorServiceHandler. +func (k *kafkaConn) GeneratePlan(ctx context.Context, r *connect.Request[adiomv1.GeneratePlanRequest]) (*connect.Response[adiomv1.GeneratePlanResponse], error) { + namespaceMap := map[string]struct{}{} + for _, n := range r.Msg.GetNamespaces() { + namespaceMap[n] = struct{}{} + } + + if !r.Msg.GetUpdates() { + return connect.NewResponse(&adiomv1.GeneratePlanResponse{}), nil + } + cfg := k.NewConfig() + client, err := sarama.NewClient(k.brokers, cfg) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + defer client.Close() + var updatePartitions []*adiomv1.UpdatesPartition + + for topic, namespaces := range k.topicToNamespaces { + var finalNamespaces []string + if len(namespaces) == 0 { + finalNamespaces = r.Msg.GetNamespaces() + } else if len(r.Msg.GetNamespaces()) == 0 { + finalNamespaces = namespaces + } else { + for _, ns := range namespaces { + if _, ok := namespaceMap[ns]; ok { + finalNamespaces = append(finalNamespaces, ns) + } + } + if len(finalNamespaces) == 0 { + continue + } + } + + partitions, err := client.Partitions(topic) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + + for _, partition := range partitions { + offset, err := client.GetOffset(topic, partition, k.offset) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + c := &streamCursor{ + Topic: topic, + Partition: partition, + Offset: offset, + } + cursor, err := c.Encode() + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + updatePartitions = append(updatePartitions, &adiomv1.UpdatesPartition{ + Namespaces: finalNamespaces, + Cursor: cursor, + }) + } + } + return connect.NewResponse(&adiomv1.GeneratePlanResponse{UpdatesPartitions: updatePartitions}), nil +} + +// StreamLSN implements adiomv1connect.ConnectorServiceHandler. +func (k *kafkaConn) StreamLSN(ctx context.Context, r *connect.Request[adiomv1.StreamLSNRequest], s *connect.ServerStream[adiomv1.StreamLSNResponse]) error { + namespaceMap := map[string]struct{}{} + for _, n := range r.Msg.GetNamespaces() { + namespaceMap[n] = struct{}{} + } + + cursor, err := DecodeStreamCursor(r.Msg.GetCursor()) + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + + cfg := k.NewConfig() + consumer, err := sarama.NewConsumer(k.brokers, cfg) + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + defer consumer.Close() + + pc, err := consumer.ConsumePartition(cursor.Topic, cursor.Partition, cursor.Offset) + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + defer pc.Close() + + var lsn uint64 + for { + select { + case <-ctx.Done(): + return nil + case message, ok := <-pc.Messages(): + if !ok { + return nil + } + + nextCursor := &streamCursor{ + Topic: cursor.Topic, + Partition: cursor.Partition, + Offset: message.Offset + 1, + } + encodedNextCursor, err := nextCursor.Encode() + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + ns, err := k.messageToNamespace(message, namespaceMap) + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + if _, ok := namespaceMap[ns]; !ok && len(namespaceMap) > 0 { + continue + } + if ns == "" { + continue + } + + lsn++ + // TODO: maybe batch it up + if err := s.Send(&adiomv1.StreamLSNResponse{ + NextCursor: encodedNextCursor, + Lsn: lsn, + }); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + return connect.NewError(connect.CodeInternal, err) + } + } + } +} + +// StreamUpdates implements adiomv1connect.ConnectorServiceHandler. +func (k *kafkaConn) StreamUpdates(ctx context.Context, r *connect.Request[adiomv1.StreamUpdatesRequest], s *connect.ServerStream[adiomv1.StreamUpdatesResponse]) error { + namespaceMap := map[string]struct{}{} + for _, n := range r.Msg.GetNamespaces() { + namespaceMap[n] = struct{}{} + } + + cursor, err := DecodeStreamCursor(r.Msg.GetCursor()) + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + + cfg := k.NewConfig() + consumer, err := sarama.NewConsumer(k.brokers, cfg) + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + defer consumer.Close() + + pc, err := consumer.ConsumePartition(cursor.Topic, cursor.Partition, cursor.Offset) + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + defer pc.Close() + + for { + select { + case <-ctx.Done(): + return nil + case message, ok := <-pc.Messages(): + if !ok { + return nil + } + + nextCursor := &streamCursor{ + Topic: cursor.Topic, + Partition: cursor.Partition, + Offset: message.Offset + 1, + } + encodedNextCursor, err := nextCursor.Encode() + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + + update, namespace, err := k.messageToUpdate(message, namespaceMap) + if err != nil { + return connect.NewError(connect.CodeInternal, err) + } + if _, ok := namespaceMap[namespace]; !ok && len(namespaceMap) > 0 { + continue + } + if update != nil { + // TODO: maybe batch it up + if err := s.Send(&adiomv1.StreamUpdatesResponse{ + Updates: []*adiomv1.Update{update}, + Namespace: namespace, + NextCursor: encodedNextCursor, + }); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + return connect.NewError(connect.CodeInternal, err) + } + } + } + } +} diff --git a/go.mod b/go.mod index 4a800fc..fbd7810 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,13 @@ module github.com/adiom-data/dsync -go 1.24 +go 1.24.0 toolchain go1.24.3 require ( connectrpc.com/connect v1.18.1 connectrpc.com/grpcreflect v1.3.0 + github.com/IBM/sarama v1.46.3 github.com/aws/aws-sdk-go-v2 v1.41.1 github.com/aws/aws-sdk-go-v2/config v1.29.17 github.com/aws/aws-sdk-go-v2/credentials v1.17.70 @@ -29,7 +30,7 @@ require ( github.com/rivo/tview v0.0.0-20250625164341-a4a78f1e05cb github.com/samber/slog-multi v1.4.1 github.com/shopspring/decimal v1.3.1 - github.com/stretchr/testify v1.10.0 + github.com/stretchr/testify v1.11.1 github.com/tryvium-travels/memongo v0.12.0 github.com/urfave/cli/v2 v2.27.7 github.com/weaviate/weaviate v1.31.3 @@ -37,7 +38,7 @@ require ( go.akshayshah.org/memhttp v0.1.0 go.mongodb.org/mongo-driver v1.17.4 golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b - golang.org/x/net v0.41.0 + golang.org/x/net v0.46.0 golang.org/x/time v0.12.0 google.golang.org/grpc v1.73.0 google.golang.org/protobuf v1.36.6 @@ -61,6 +62,9 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.3 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.34.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect github.com/gdamore/encoding v1.0.1 // indirect github.com/go-openapi/analysis v0.23.0 // indirect github.com/go-openapi/errors v0.22.1 // indirect @@ -72,10 +76,16 @@ require ( github.com/go-openapi/strfmt v0.23.0 // indirect github.com/go-openapi/swag v0.23.1 // indirect github.com/go-openapi/validate v0.24.0 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/mailru/easyjson v0.9.0 // indirect @@ -83,16 +93,18 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/samber/lo v1.51.0 // indirect github.com/samber/slog-common v0.19.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/stretchr/objx v0.5.2 // indirect golang.org/x/oauth2 v0.30.0 // indirect - golang.org/x/sys v0.33.0 // indirect - golang.org/x/term v0.32.0 // indirect + golang.org/x/sys v0.37.0 // indirect + golang.org/x/term v0.36.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) @@ -103,7 +115,7 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.7 // indirect github.com/golang/snappy v1.0.0 // indirect github.com/google/uuid v1.6.0 - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.1 // indirect github.com/montanaflynn/stats v0.7.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect @@ -111,8 +123,8 @@ require ( github.com/xdg-go/stringprep v1.0.4 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect - golang.org/x/crypto v0.39.0 // indirect - golang.org/x/sync v0.15.0 - golang.org/x/text v0.26.0 // indirect + golang.org/x/crypto v0.43.0 // indirect + golang.org/x/sync v0.17.0 + golang.org/x/text v0.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 2903b32..3e2870f 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ entgo.io/ent v0.14.3/go.mod h1:aDPE/OziPEu8+OWbzy4UlvWmD2/kbRuWfK2A40hcxJM= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg= github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/IBM/sarama v1.46.3 h1:njRsX6jNlnR+ClJ8XmkO+CM4unbrNr/2vB5KK6UA+IE= +github.com/IBM/sarama v1.46.3/go.mod h1:GTUYiF9DMOZVe3FwyGT+dtSPceGFIgA+sPc5u6CBwko= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= @@ -79,6 +81,14 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/gdamore/encoding v1.0.1 h1:YzKZckdBL6jVt2Gc+5p82qhrGiqMdG/eNs6Wy0u3Uhw= github.com/gdamore/encoding v1.0.1/go.mod h1:0Z0cMFinngz9kS1QfMjCP8TY7em3bZYeeklsSDPivEo= github.com/gdamore/tcell/v2 v2.8.1 h1:KPNxyqclpWpWQlPLx6Xui1pMk8S+7+R37h3g07997NU= @@ -167,6 +177,11 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= @@ -182,6 +197,18 @@ github.com/jackc/pgx/v5 v5.7.5 h1:JHGfMnQY+IEtGM63d+NGMjoRpysB2JBwDr5fsngwmJs= github.com/jackc/pgx/v5 v5.7.5/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= @@ -194,8 +221,8 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4= github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= +github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -239,6 +266,8 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/pgvector/pgvector-go v0.3.0 h1:Ij+Yt78R//uYqs3Zk35evZFvr+G0blW0OUN+Q2D1RWc= github.com/pgvector/pgvector-go v0.3.0/go.mod h1:duFy+PXWfW7QQd5ibqutBO4GxLsUZ9RVXhFZGIBsWSA= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -246,6 +275,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rivo/tview v0.0.0-20250625164341-a4a78f1e05cb h1:n7UJ8X9UnrTZBYXnd1kAIBc067SWyuPIrsocjketYW8= github.com/rivo/tview v0.0.0-20250625164341-a4a78f1e05cb/go.mod h1:cSfIYfhpSGCjp3r/ECJb+GKS7cGJnqV8vfjQPwoXyfY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -279,14 +310,20 @@ github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3 github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYmW5DyG0UqvY96Bu5QYsTLvCHdrgo= github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GHm1iF1pBvUfxfsH0Wmnc2VbpgvbI9ZWuIRs= @@ -355,11 +392,12 @@ golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaE golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= -golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= -golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= +golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= +golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -370,17 +408,19 @@ golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= -golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= -golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= +golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -393,8 +433,8 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -415,8 +455,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= -golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= +golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -427,8 +467,8 @@ golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= -golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg= -golang.org/x/term v0.32.0/go.mod h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ= +golang.org/x/term v0.36.0 h1:zMPR+aF8gfksFprF/Nc/rd1wRS1EI6nDBGyWAvDzx2Q= +golang.org/x/term v0.36.0/go.mod h1:Qu394IJq6V6dCBRgwqshf3mPF85AqzYEzofzRdZkWss= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -441,8 +481,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= -golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= +golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/internal/app/options/connectorflags.go b/internal/app/options/connectorflags.go index e75ac6d..9a919d1 100644 --- a/internal/app/options/connectorflags.go +++ b/internal/app/options/connectorflags.go @@ -10,9 +10,11 @@ import ( "strings" "time" + "github.com/IBM/sarama" "github.com/adiom-data/dsync/connectors/airbyte" "github.com/adiom-data/dsync/connectors/cosmos" "github.com/adiom-data/dsync/connectors/dynamodb" + "github.com/adiom-data/dsync/connectors/kafka" "github.com/adiom-data/dsync/connectors/mongo" "github.com/adiom-data/dsync/connectors/null" "github.com/adiom-data/dsync/connectors/postgres" @@ -21,6 +23,7 @@ import ( "github.com/adiom-data/dsync/connectors/s3vector" "github.com/adiom-data/dsync/connectors/testconn" "github.com/adiom-data/dsync/connectors/vector" + adiomv1 "github.com/adiom-data/dsync/gen/adiom/v1" "github.com/adiom-data/dsync/gen/adiom/v1/adiomv1connect" "github.com/urfave/cli/v2" "github.com/urfave/cli/v2/altsrc" @@ -600,6 +603,59 @@ func GetRegisteredConnectors() []RegisteredConnector { return airbyte.NewSink(dockerImage, c.String("config"), c.String("catalog")), nil }), }, + { + Name: "kafka-dst", + IsConnector: func(s string) bool { + return strings.EqualFold(s, "kafka-dst") + }, + Create: CreateHelper("kafka-dst", "kafka-dst", []cli.Flag{ + altsrc.NewStringSliceFlag(&cli.StringSliceFlag{ + Name: "brokers", + }), + altsrc.NewStringFlag(&cli.StringFlag{ + Name: "default-topic", + }), + altsrc.NewStringSliceFlag(&cli.StringSliceFlag{ + Name: "namespace-topic", + }), + altsrc.NewStringFlag(&cli.StringFlag{ + Name: "sasl-user", + }), + altsrc.NewStringFlag(&cli.StringFlag{ + Name: "sasl-password", + }), + altsrc.NewStringFlag(&cli.StringFlag{ + Name: "data-type", + Value: adiomv1.DataType_DATA_TYPE_MONGO_BSON.String(), + }), + }, func(c *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) { + brokers := c.StringSlice("brokers") + defaultTopic := c.String("default-topic") + namespaceTopic := c.StringSlice("namespace-topic") + user := c.String("sasl-user") + password := c.String("sasl-password") + dataType := adiomv1.DataType(adiomv1.DataType_value[c.String("data-type")]) + + tm := map[string]string{} + for _, m := range namespaceTopic { + ns, topic, ok := strings.Cut(m, ":") + if !ok { + return nil, fmt.Errorf("invalid topic mapping %v", m) + } + tm[ns] = topic + } + return kafka.NewDestKafka(brokers, defaultTopic, tm, user, password, dataType) + }), + }, + { + Name: "kafka-src", + IsConnector: func(s string) bool { + return strings.EqualFold(s, "kafka-src") + }, + Create: CreateHelper("kafka-src", "kafka-src", KafkaSrcFlags(""), func(c *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) { + return ParseKafkaSrcFlags("", c) + }), + }, { Name: "grpc", IsConnector: func(s string) bool { @@ -616,6 +672,74 @@ func GetRegisteredConnectors() []RegisteredConnector { } } +func ParseKafkaSrcFlags(prefix string, c *cli.Context) (adiomv1connect.ConnectorServiceHandler, error) { + prefixDash := prefix + if prefix != "" { + prefixDash = prefix + "-" + } + + brokers := c.StringSlice(prefixDash + "brokers") + topics := c.StringSlice(prefixDash + "topics") + topicMappings := c.StringSlice(prefixDash + "topic-mappings") + user := c.String(prefixDash + "sasl-user") + password := c.String(prefixDash + "sasl-password") + kafkaOffset := c.Int64(prefixDash + "offset") + dataType := adiomv1.DataType(adiomv1.DataType_value[c.String(prefixDash+"data-type")]) + + tm := map[string][]string{} + for _, topic := range topics { + tm[topic] = nil + } + for _, m := range topicMappings { + topic, ns, ok := strings.Cut(m, ":") + if !ok { + return nil, fmt.Errorf("invalid topic mapping %v", m) + } + tm[topic] = append(tm[topic], ns) + } + return kafka.NewKafkaConn(brokers, tm, kafka.DsyncMessageToUpdate, kafka.DsyncMessageToNamespace, user, password, kafkaOffset, dataType), nil +} + +func KafkaSrcFlags(prefix string) []cli.Flag { + prefixDash := prefix + if prefix != "" { + prefixDash = prefix + "-" + } + return []cli.Flag{ + altsrc.NewStringSliceFlag(&cli.StringSliceFlag{ + Name: prefixDash + "brokers", + Category: prefix, + }), + altsrc.NewStringSliceFlag(&cli.StringSliceFlag{ + Name: prefixDash + "topics", + Category: prefix, + }), + altsrc.NewStringSliceFlag(&cli.StringSliceFlag{ + Name: prefixDash + "topic-mappings", + Category: prefix, + }), + altsrc.NewStringFlag(&cli.StringFlag{ + Name: prefixDash + "sasl-user", + Category: prefix, + }), + altsrc.NewStringFlag(&cli.StringFlag{ + Name: prefixDash + "sasl-password", + Category: prefix, + }), + altsrc.NewInt64Flag(&cli.Int64Flag{ + Name: prefixDash + "offset", + Usage: "Custom offset for kafka (a time, or -2 for oldest)", + Value: sarama.OffsetNewest, + Category: prefix, + }), + altsrc.NewStringFlag(&cli.StringFlag{ + Name: prefixDash + "data-type", + Value: adiomv1.DataType_DATA_TYPE_MONGO_BSON.String(), + Category: prefix, + }), + } +} + func WeaviateFlags() []cli.Flag { return []cli.Flag{ altsrc.NewStringFlag(&cli.StringFlag{