@@ -4,35 +4,46 @@ import (
44 "context"
55 "encoding/json"
66 "fmt"
7- "log"
87 "os"
98
109 "github.com/aws/aws-sdk-go-v2/config"
1110 "github.com/aws/aws-sdk-go-v2/service/dynamodb"
1211 "github.com/aws/aws-sdk-go-v2/service/sqs"
1312 "github.com/umlcloudcomputing/immersion/dataparser/db"
1413 "github.com/umlcloudcomputing/immersion/dataparser/models"
14+ "github.com/umlcloudcomputing/immersion/dataparser/util"
1515)
1616
17- func parseMessage (dbClient * dynamodb.Client , message string ) {
17+ func filterMessage (dbClient * dynamodb.Client , message string ) {
1818 var parsedMessage models.Message [any ]
1919 err := json .Unmarshal ([]byte (message ), & parsedMessage )
20- if err != nil {
21- log .Fatal (err )
22- }
20+ util .CheckError (err )
2321
2422 fmt .Printf ("Parsed a message with the %s header\n " , parsedMessage .Header )
2523
24+ ctx := context .TODO ()
2625 switch parsedMessage .Header {
26+ case "club_information" :
27+ var informationData models.Message [models.ClubInformation ]
28+ err = json .Unmarshal ([]byte (message ), & informationData .Body )
29+ util .CheckError (err )
30+
31+ db .InsertItem (ctx , dbClient , db .CacheTable , informationData )
2732 case "onboarding" :
2833 var onboardingData models.Message [[]models.Onboarding ]
2934 err = json .Unmarshal ([]byte (message ), & onboardingData )
30- if err != nil {
31- log .Fatal (err )
35+ util .CheckError (err )
36+
37+ for _ , i := range onboardingData .Body {
38+ db .InsertItem (ctx , dbClient , db .OnboardingTable , i )
3239 }
40+ case "event" :
41+ var eventData models.Message [[]models.Event ]
42+ err = json .Unmarshal ([]byte (message ), & eventData )
43+ util .CheckError (err )
3344
34- for _ , v := range onboardingData .Body {
35- db .InsertOnboarding ( context . TODO () , dbClient , v )
45+ for _ , i := range eventData .Body {
46+ db .InsertItem ( ctx , dbClient , db . EventTable , i )
3647 }
3748 }
3849}
@@ -41,9 +52,7 @@ func main() {
4152 queueUrl := os .Getenv ("QUEUE_URL" )
4253
4354 cfg , err := config .LoadDefaultConfig (context .TODO ())
44- if err != nil {
45- log .Fatal (err )
46- }
55+ util .CheckError (err )
4756
4857 sqsClient := sqs .NewFromConfig (cfg )
4958 dbClient := dynamodb .NewFromConfig (cfg )
@@ -53,32 +62,26 @@ func main() {
5362 MaxNumberOfMessages : 5 , // play around with these
5463 WaitTimeSeconds : 5 ,
5564 })
56- if err != nil {
57- log .Fatal (err )
58- }
65+ util .CheckError (err )
5966
6067 // print and delete all messages in the queue
6168 for len (response .Messages ) > 0 {
6269 for _ , v := range response .Messages {
6370
64- parseMessage (dbClient , * v .Body )
71+ filterMessage (dbClient , * v .Body )
6572 _ , err = sqsClient .DeleteMessage (context .TODO (), & sqs.DeleteMessageInput {
6673 QueueUrl : & queueUrl ,
6774 ReceiptHandle : v .ReceiptHandle ,
6875 })
69- if err != nil {
70- log .Fatal (err )
71- }
76+ util .CheckError (err )
7277 }
7378
7479 response , err = sqsClient .ReceiveMessage (context .TODO (), & sqs.ReceiveMessageInput {
7580 QueueUrl : & queueUrl ,
7681 MaxNumberOfMessages : 5 ,
7782 WaitTimeSeconds : 5 ,
7883 })
79- if err != nil {
80- log .Fatal (err )
81- }
84+ util .CheckError (err )
8285 }
8386
8487 fmt .Println ("Parsing complete, waiting to be killed." )
0 commit comments