@@ -108,10 +108,13 @@ func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error)
108108//
109109// Topics that already exist will be updated.
110110func (c * TopicCreator ) CreateTopics (ctx context.Context , topics ... apmqueue.Topic ) error {
111- // TODO(axw) how should we record topics?
112- ctx , span := c .m .tracer .Start (ctx , "CreateTopics" , trace .WithAttributes (
113- semconv .MessagingSystemKey .String ("kafka" ),
114- ))
111+ ctx , span := c .m .tracer .Start (
112+ ctx ,
113+ "CreateTopics" ,
114+ trace .WithAttributes (
115+ semconv .MessagingSystemKey .String ("kafka" ),
116+ ),
117+ )
115118 defer span .End ()
116119
117120 namespacePrefix := c .m .cfg .namespacePrefix ()
@@ -127,12 +130,32 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
127130 return fmt .Errorf ("failed to list kafka topics: %w" , err )
128131 }
129132
130- // missingTopics contains topics which need to be created.
131- missingTopics := make ([]string , 0 , len (topicNames ))
132- // updatePartitions contains topics which partitions' need to be updated.
133- updatePartitions := make ([]string , 0 , len (topicNames ))
134- // existingTopics contains the existing topics, used by AlterTopicConfigs.
135- existingTopics := make ([]string , 0 , len (topicNames ))
133+ missingTopics , updatePartitions , existingTopics := c .categorizeTopics (existing , topicNames )
134+
135+ var updateErrors []error
136+ if err := c .createMissingTopics (ctx , span , missingTopics ); err != nil {
137+ updateErrors = append (updateErrors , err )
138+ }
139+
140+ if err := c .updateTopicPartitions (ctx , span , updatePartitions ); err != nil {
141+ updateErrors = append (updateErrors , err )
142+ }
143+
144+ if err := c .alterTopicConfigs (ctx , span , existingTopics ); err != nil {
145+ updateErrors = append (updateErrors , err )
146+ }
147+
148+ return errors .Join (updateErrors ... )
149+ }
150+
151+ func (c * TopicCreator ) categorizeTopics (
152+ existing kadm.TopicDetails ,
153+ topicNames []string ,
154+ ) (missingTopics , updatePartitions , existingTopics []string ) {
155+ missingTopics = make ([]string , 0 , len (topicNames ))
156+ updatePartitions = make ([]string , 0 , len (topicNames ))
157+ existingTopics = make ([]string , 0 , len (topicNames ))
158+
136159 for _ , wantTopic := range topicNames {
137160 if ! existing .Has (wantTopic ) {
138161 missingTopics = append (missingTopics , wantTopic )
@@ -144,6 +167,18 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
144167 }
145168 }
146169
170+ return missingTopics , updatePartitions , existingTopics
171+ }
172+
173+ func (c * TopicCreator ) createMissingTopics (
174+ ctx context.Context ,
175+ span trace.Span ,
176+ missingTopics []string ,
177+ ) error {
178+ if len (missingTopics ) == 0 {
179+ return nil
180+ }
181+
147182 responses , err := c .m .adminClient .CreateTopics (ctx ,
148183 int32 (c .partitionCount ),
149184 - 1 , // default.replication.factor
@@ -155,6 +190,9 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
155190 span .SetStatus (codes .Error , err .Error ())
156191 return fmt .Errorf ("failed to create kafka topics: %w" , err )
157192 }
193+
194+ namespacePrefix := c .m .cfg .namespacePrefix ()
195+
158196 loggerFields := []zap.Field {
159197 zap .Int ("partition_count" , c .partitionCount ),
160198 }
@@ -167,14 +205,14 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
167205 var updateErrors []error
168206 for _ , response := range responses .Sorted () {
169207 topicName := strings .TrimPrefix (response .Topic , namespacePrefix )
208+
170209 logger := c .m .cfg .Logger .With (loggerFields ... )
171210 if c .m .cfg .TopicLogFieldsFunc != nil {
172211 logger = logger .With (c .m .cfg .TopicLogFieldsFunc (topicName )... )
173212 }
213+
174214 if err := response .Err ; err != nil {
175215 if errors .Is (err , kerr .TopicAlreadyExists ) {
176- // NOTE(axw) apmotel currently does nothing with span events,
177- // hence we log as well as create a span event.
178216 logger .Debug ("kafka topic already exists" ,
179217 zap .String ("topic" , topicName ),
180218 )
@@ -197,92 +235,165 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
197235 }
198236 continue
199237 }
238+
200239 c .created .Add (context .Background (), 1 , metric .WithAttributeSet (
201240 attribute .NewSet (
202241 semconv .MessagingSystemKey .String ("kafka" ),
203242 attribute .String ("outcome" , "success" ),
204243 attribute .String ("topic" , topicName ),
205244 ),
206245 ))
246+
207247 logger .Info ("created kafka topic" , zap .String ("topic" , topicName ))
208248 }
209249
210- // Update the topic partitions.
211- if len (updatePartitions ) > 0 {
212- updateResp , err := c .m .adminClient .UpdatePartitions (ctx ,
213- c .partitionCount ,
214- updatePartitions ... ,
250+ if len (updateErrors ) > 0 {
251+ return errors .Join (updateErrors ... )
252+ }
253+
254+ return nil
255+ }
256+
257+ func (c * TopicCreator ) updateTopicPartitions (
258+ ctx context.Context ,
259+ span trace.Span ,
260+ updatePartitions []string ,
261+ ) error {
262+ if len (updatePartitions ) == 0 {
263+ return nil
264+ }
265+
266+ updateResp , err := c .m .adminClient .UpdatePartitions (
267+ ctx ,
268+ c .partitionCount ,
269+ updatePartitions ... ,
270+ )
271+ if err != nil {
272+ span .RecordError (err )
273+ span .SetStatus (codes .Error , err .Error ())
274+ return fmt .Errorf (
275+ "failed to update partitions for kafka topics: %v: %w" ,
276+ updatePartitions , err ,
277+ )
278+ }
279+
280+ namespacePrefix := c .m .cfg .namespacePrefix ()
281+
282+ loggerFields := []zap.Field {
283+ zap .Int ("partition_count" , c .partitionCount ),
284+ }
285+ if len (c .origTopicConfigs ) > 0 {
286+ loggerFields = append (loggerFields ,
287+ zap .Reflect ("topic_configs" , c .origTopicConfigs ),
215288 )
216- if err != nil {
289+ }
290+
291+ var updateErrors []error
292+ for _ , response := range updateResp .Sorted () {
293+ topicName := strings .TrimPrefix (response .Topic , namespacePrefix )
294+
295+ logger := c .m .cfg .Logger .With (loggerFields ... )
296+ if c .m .cfg .TopicLogFieldsFunc != nil {
297+ logger = logger .With (c .m .cfg .TopicLogFieldsFunc (topicName )... )
298+ }
299+
300+ if errors .Is (response .Err , kerr .InvalidRequest ) {
301+ // If UpdatePartitions partition count isn't greater than the
302+ // current number of partitions, each individual response
303+ // returns `INVALID_REQUEST`.
304+ continue
305+ }
306+
307+ if err := response .Err ; err != nil {
217308 span .RecordError (err )
218309 span .SetStatus (codes .Error , err .Error ())
219- return fmt .Errorf ("failed to update partitions for kafka topics: %v: %w" ,
220- updatePartitions , err ,
221- )
310+ updateErrors = append (updateErrors , fmt .Errorf (
311+ "failed to update partitions for topic %q: %w" ,
312+ topicName , err ,
313+ ))
314+ continue
222315 }
223- for _ , response := range updateResp .Sorted () {
224- topicName := strings .TrimPrefix (response .Topic , namespacePrefix )
225- logger := c .m .cfg .Logger .With (loggerFields ... )
226- if c .m .cfg .TopicLogFieldsFunc != nil {
227- logger = logger .With (c .m .cfg .TopicLogFieldsFunc (topicName )... )
228- }
229316
230- if errors .Is (response .Err , kerr .InvalidRequest ) {
231- // If UpdatePartitions partition count isn't greater than the
232- // current number of partitions, each individual response
233- // returns `INVALID_REQUEST`.
234- continue
235- }
236- if err := response .Err ; err != nil {
237- span .RecordError (err )
238- span .SetStatus (codes .Error , err .Error ())
239- updateErrors = append (updateErrors , fmt .Errorf (
240- "failed to update partitions for topic %q: %w" ,
241- topicName , err ,
242- ))
243- continue
244- }
245- logger .Info ("updated partitions for kafka topic" ,
246- zap .String ("topic" , topicName ),
247- )
248- }
317+ logger .Info (
318+ "updated partitions for kafka topic" ,
319+ zap .String ("topic" , topicName ),
320+ )
249321 }
250- if len (existingTopics ) > 0 && len (c .topicConfigs ) > 0 {
251- alterCfg := make ([]kadm.AlterConfig , 0 , len (c .topicConfigs ))
252- for k , v := range c .topicConfigs {
253- alterCfg = append (alterCfg , kadm.AlterConfig {Name : k , Value : v })
254- }
255- alterResp , err := c .m .adminClient .AlterTopicConfigs (ctx ,
256- alterCfg , existingTopics ... ,
322+
323+ if len (updateErrors ) > 0 {
324+ return errors .Join (updateErrors ... )
325+ }
326+
327+ return nil
328+ }
329+
330+ func (c * TopicCreator ) alterTopicConfigs (
331+ ctx context.Context ,
332+ span trace.Span ,
333+ existingTopics []string ,
334+ ) error {
335+ if len (existingTopics ) == 0 || len (c .topicConfigs ) == 0 {
336+ return nil
337+ }
338+
339+ // Remove cleanup.policy if it exists,
340+ // since this field cannot be altered.
341+ delete (c .topicConfigs , "cleanup.policy" )
342+
343+ alterCfg := make ([]kadm.AlterConfig , 0 , len (c .topicConfigs ))
344+ for k , v := range c .topicConfigs {
345+ alterCfg = append (alterCfg , kadm.AlterConfig {Name : k , Value : v })
346+ }
347+
348+ alterResp , err := c .m .adminClient .AlterTopicConfigs (ctx , alterCfg , existingTopics ... )
349+ if err != nil {
350+ span .RecordError (err )
351+ span .SetStatus (codes .Error , err .Error ())
352+ return fmt .Errorf (
353+ "failed to update configuration for kafka topics: %v: %w" ,
354+ existingTopics , err ,
355+ )
356+ }
357+
358+ namespacePrefix := c .m .cfg .namespacePrefix ()
359+
360+ loggerFields := []zap.Field {
361+ zap .Int ("partition_count" , c .partitionCount ),
362+ }
363+ if len (c .origTopicConfigs ) > 0 {
364+ loggerFields = append (loggerFields ,
365+ zap .Reflect ("topic_configs" , c .origTopicConfigs ),
257366 )
258- if err != nil {
367+ }
368+
369+ var updateErrors []error
370+ for _ , response := range alterResp {
371+ topicName := strings .TrimPrefix (response .Name , namespacePrefix )
372+
373+ logger := c .m .cfg .Logger .With (loggerFields ... )
374+ if c .m .cfg .TopicLogFieldsFunc != nil {
375+ logger = logger .With (c .m .cfg .TopicLogFieldsFunc (topicName )... )
376+ }
377+
378+ if err := response .Err ; err != nil {
259379 span .RecordError (err )
260380 span .SetStatus (codes .Error , err .Error ())
261- return fmt .Errorf (
262- "failed to update configuration for kafka topics: %v:%w" ,
263- existingTopics , err ,
264- )
381+ updateErrors = append (updateErrors , fmt .Errorf (
382+ "failed to alter configuration for topic %q: %w" ,
383+ topicName , err ,
384+ ))
385+ continue
265386 }
266- for _ , response := range alterResp {
267- topicName := strings .TrimPrefix (response .Name , namespacePrefix )
268- logger := c .m .cfg .Logger .With (loggerFields ... )
269- if c .m .cfg .TopicLogFieldsFunc != nil {
270- logger = logger .With (c .m .cfg .TopicLogFieldsFunc (topicName )... )
271- }
272387
273- if err := response .Err ; err != nil {
274- span .RecordError (err )
275- span .SetStatus (codes .Error , err .Error ())
276- updateErrors = append (updateErrors , fmt .Errorf (
277- "failed to alter configuration for topic %q: %w" ,
278- topicName , err ,
279- ))
280- continue
281- }
282- logger .Info ("altered configuration for kafka topic" ,
283- zap .String ("topic" , topicName ),
284- )
285- }
388+ logger .Info (
389+ "altered configuration for kafka topic" ,
390+ zap .String ("topic" , topicName ),
391+ )
286392 }
287- return errors .Join (updateErrors ... )
393+
394+ if len (updateErrors ) > 0 {
395+ return errors .Join (updateErrors ... )
396+ }
397+
398+ return nil
288399}
0 commit comments