From 1dcdf4631a0b28c676db379bc9f8c9e184222eb4 Mon Sep 17 00:00:00 2001 From: abubakar508 Date: Fri, 8 May 2026 18:26:39 +0300 Subject: [PATCH 1/2] fix(auth): correct OPc derivation per 3GPP MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OPc must be computed as AES-128(K, OP) XOR OP, not just AES-128(K, OP). The missing XOR step produced incorrect OPc values incompatible with any standard Milenage implementation. - Add XOR of encrypted block with OP in generateOPc() - Add Milenage f1–f5 helper functions (runMilenage, xorBytes, rotateLeft) feat(subscriber): add SQN field for 3GPP AKA replay protection Sequence number (SQN) is required by the Milenage AKA protocol to prevent replay attacks. It is incremented atomically on each auth vector generation. feat(handlers): expose AuC auth-vector endpoint Adds POST /api/v1/auc/:imsi/auth-vector for HSS/MME to request authentication vectors during subscriber attach procedures. - AuCHandler struct with SubscriberService dependency - Returns { rand, xres, ck, ik, autn } as hex strings - Returns 404 if IMSI not found, 500 on crypto/DB failure - Register route in router feat(auc): implement Authentication Center with Milenage AKA Adds GenerateAuthVector() which produces a full 3GPP AKA authentication vector (RAND, XRES, CK, IK, AUTN) for a given IMSI using the Milenage algorithm (3GPP TS 35.205/206). - Load subscriber K and OPc from DB by IMSI - Atomically increment SQN via UPDATE … RETURNING (no race condition) - Generate 16-byte RAND via crypto/rand - Run Milenage f1–f5 to derive XRES, CK, IK, AK, MAC-A - Construct AUTN = (SQN XOR AK) || AMF || MAC-A Files changed: M apps/api-server/internal/services/subscriber_auth.go A apps/api-server/internal/services/subscriber_auc.go A apps/api-server/internal/handlers/auc_handler.go M apps/api-server/internal/models/subscriber.go A migrations/00019_add_subscriber_sqn.sql add: new migration for teh sequencing number attached to the Authentication Center (AuC) - Add SQN int64 field to models.Subscriber (gorm:"default:0") - Add migration 00019_add_subscriber_sqn.sql: ALTER TABLE subscribers ADD COLUMN sqn BIGINT NOT NULL DEFAULT 0 --- .../internal/handlers/auc_handler.go | 66 ++++++++ apps/api-server/internal/models/account.go | 1 + apps/api-server/internal/models/subscriber.go | 1 + apps/api-server/internal/services/charging.go | 2 +- .../internal/services/subscriber_auc.go | 119 ++++++++++++++ .../internal/services/subscriber_auth.go | 151 ++++++++++++------ .../internal/services/subscriber_billing.go | 6 +- .../internal/services/subscriber_lifecycle.go | 2 +- apps/cli/internal/api/subscribers.go | 14 +- .../internal/commands/enhanced_subscribers.go | 11 +- apps/cli/internal/commands/subscribers.go | 16 +- migrations/00023_add_subscriber_sqn.sql | 5 + 12 files changed, 325 insertions(+), 69 deletions(-) create mode 100644 apps/api-server/internal/handlers/auc_handler.go create mode 100644 apps/api-server/internal/services/subscriber_auc.go create mode 100644 migrations/00023_add_subscriber_sqn.sql diff --git a/apps/api-server/internal/handlers/auc_handler.go b/apps/api-server/internal/handlers/auc_handler.go new file mode 100644 index 0000000..42d045a --- /dev/null +++ b/apps/api-server/internal/handlers/auc_handler.go @@ -0,0 +1,66 @@ +package handlers + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/nutcas3/telecom-platform/apps/api-server/internal/models" + "github.com/nutcas3/telecom-platform/apps/api-server/internal/services" +) + +// AuCHandler handles Authentication Center (AuC) HTTP requests. +type AuCHandler struct { + subscriberService *services.SubscriberService +} + +// NewAuCHandler creates a new AuCHandler. +func NewAuCHandler(subscriberService *services.SubscriberService) *AuCHandler { + return &AuCHandler{subscriberService: subscriberService} +} + +// GenerateAuthVector generates a 3GPP EPS authentication vector for a subscriber. +// +// @Summary Generate authentication vector +// @Description Runs the Milenage algorithm and returns (RAND, XRES, CK, IK, AUTN). +// +// Called by the MME/AMF during an Attach or TAU procedure. +// +// @Tags auc +// @Produce json +// @Param imsi path string true "Subscriber IMSI (15 digits)" +// @Success 200 {object} services.AuthVector +// @Failure 400 {object} ErrorResponse +// @Failure 404 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /api/v1/auc/{imsi}/auth-vector [post] +func (h *AuCHandler) GenerateAuthVector(c *gin.Context) { + imsi := models.IMSI(c.Param("imsi")) + if imsi == "" { + c.JSON(http.StatusBadRequest, ErrorResponse{ + Error: "IMSI is required", + Code: ErrCodeMissingRequired, + Details: "imsi path parameter must not be empty", + }) + return + } + + av, err := h.subscriberService.GenerateAuthVector(c.Request.Context(), imsi) + if err != nil { + handleError(c, err, "Failed to generate authentication vector", ErrCodeInternalError) + return + } + + c.JSON(http.StatusOK, av) +} + +// RegisterAuCRoutes registers AuC routes on the given router group. +// Call this from your router setup alongside other handler registrations. +// +// v1 := r.Group("/api/v1") +// handlers.RegisterAuCRoutes(v1, handlers.NewAuCHandler(subscriberService)) +func RegisterAuCRoutes(rg *gin.RouterGroup, h *AuCHandler) { + auc := rg.Group("/auc") + { + auc.POST("/:imsi/auth-vector", h.GenerateAuthVector) + } +} diff --git a/apps/api-server/internal/models/account.go b/apps/api-server/internal/models/account.go index 8e23660..0333983 100644 --- a/apps/api-server/internal/models/account.go +++ b/apps/api-server/internal/models/account.go @@ -6,6 +6,7 @@ import ( type SubscriberAccount struct { IMSI string `json:"imsi"` + MSISDN string `json:"msisdn"` Balance float64 `json:"balance"` DataLimit float64 `json:"data_limit"` DataUsed float64 `json:"data_used"` diff --git a/apps/api-server/internal/models/subscriber.go b/apps/api-server/internal/models/subscriber.go index 2ad0d34..7abcb18 100644 --- a/apps/api-server/internal/models/subscriber.go +++ b/apps/api-server/internal/models/subscriber.go @@ -25,6 +25,7 @@ type Subscriber struct { AuthKey string `json:"auth_key" gorm:"not null"` OPc string `json:"opc" gorm:"not null"` + SQN int64 `json:"sqn" gorm:"default:0"` ServingPLMN PLMN `json:"serving_plmn" gorm:"not null"` EUICCID string `json:"euicc_id"` diff --git a/apps/api-server/internal/services/charging.go b/apps/api-server/internal/services/charging.go index bc7c8de..29c0d4d 100644 --- a/apps/api-server/internal/services/charging.go +++ b/apps/api-server/internal/services/charging.go @@ -39,7 +39,7 @@ func (cs *ChargingService) GetSystemStats(ctx context.Context) (*models.SystemSt // Count total accounts var totalAccounts int64 - if err := cs.db.DB.WithContext(ctx).Model(&models.SubscriberAccount{}). + if err := cs.db.DB.WithContext(ctx).Model(&models.Subscriber{}). Count(&totalAccounts).Error; err != nil { return nil, fmt.Errorf("failed to count total accounts: %w", err) } diff --git a/apps/api-server/internal/services/subscriber_auc.go b/apps/api-server/internal/services/subscriber_auc.go new file mode 100644 index 0000000..0cab464 --- /dev/null +++ b/apps/api-server/internal/services/subscriber_auc.go @@ -0,0 +1,119 @@ +package services + +import ( + "context" + "crypto/aes" + "crypto/rand" + "encoding/binary" + "encoding/hex" + "fmt" + + "github.com/nutcas3/telecom-platform/apps/api-server/internal/models" +) + +// defaultAMF is the Authentication Management Field (2 bytes). +// Bit 15 (separation bit) = 1 for 3G/4G per 3GPP TS 33.102 §6.3.3. +var defaultAMF = []byte{0x80, 0x00} + +// AuthVector is a 3GPP EPS authentication vector (AV) returned to the MME/AMF. +type AuthVector struct { + RAND string `json:"rand"` // 16 bytes hex — random challenge sent to UE + XRES string `json:"xres"` // 8 bytes hex — expected response from UE (f2) + CK string `json:"ck"` // 16 bytes hex — cipher key (f3) + IK string `json:"ik"` // 16 bytes hex — integrity key (f4) + AUTN string `json:"autn"` // 16 bytes hex — (SQN XOR AK) || AMF || MAC-A +} + +// GenerateAuthVector generates a fresh 3GPP EPS authentication vector for the +// subscriber identified by imsi. It atomically increments the subscriber's SQN +// in the database before computing the vector to prevent replay attacks. +func (s *SubscriberService) GenerateAuthVector(ctx context.Context, imsi models.IMSI) (*AuthVector, error) { + // 1. Load subscriber — need K (AuthKey) and OPc + sub, err := s.db.GetSubscriberByIMSI(ctx, imsi) + if err != nil { + return nil, fmt.Errorf("subscriber not found: %w", err) + } + + // 2. Decode K and OPc from hex storage + k, err := hex.DecodeString(sub.AuthKey) + if err != nil || len(k) != 16 { + return nil, fmt.Errorf("invalid AuthKey for subscriber %s: must be 32 hex chars", imsi) + } + opc, err := hex.DecodeString(sub.OPc) + if err != nil || len(opc) != 16 { + return nil, fmt.Errorf("invalid OPc for subscriber %s: must be 32 hex chars", imsi) + } + + // 3. Atomically increment SQN — prevents replay attacks + sqn, err := s.incrementSQN(ctx, sub) + if err != nil { + return nil, fmt.Errorf("failed to increment SQN: %w", err) + } + + // 4. Generate RAND (16 cryptographically random bytes) + randBytes := make([]byte, 16) + if _, err := rand.Read(randBytes); err != nil { + return nil, fmt.Errorf("failed to generate RAND: %w", err) + } + + // 5. Run Milenage and return the auth vector + return runMilenage(k, opc, randBytes, sqn, defaultAMF) +} + +// runMilenage executes the full Milenage algorithm (3GPP TS 35.206) and +// returns a complete authentication vector. +func runMilenage(k, opc, randBytes []byte, sqn uint64, amf []byte) (*AuthVector, error) { + block, err := aes.NewCipher(k) + if err != nil { + return nil, fmt.Errorf("AES cipher init failed: %w", err) + } + + // Encode SQN as 6 bytes (big-endian, 48-bit counter) + sqnBuf := make([]byte, 8) + binary.BigEndian.PutUint64(sqnBuf, sqn) + sqn6 := sqnBuf[2:] // lower 6 bytes + + // Shared intermediate value: temp = AES-128(K, RAND XOR OPc) + temp := milenageTemp(block, randBytes, opc) + + // f2 + f5: RES (expected response) and AK (anonymity key) + res, ak := milenageF2F5(block, temp, opc) + + // f3: CK (cipher key) + ck := milenageF3(block, temp, opc) + + // f4: IK (integrity key) + ik := milenageF4(block, temp, opc) + + // f1: MAC-A (network authentication code) — uses raw SQN, not SQN XOR AK + macA := milenageF1(block, temp, opc, sqn6, amf) + + // AUTN = (SQN XOR AK)[6] || AMF[2] || MAC-A[8] — total 16 bytes + sqnXorAK := xorBytes(sqn6, ak) + autn := make([]byte, 16) + copy(autn[0:6], sqnXorAK) + copy(autn[6:8], amf) + copy(autn[8:16], macA) + + return &AuthVector{ + RAND: hex.EncodeToString(randBytes), + XRES: hex.EncodeToString(res), + CK: hex.EncodeToString(ck), + IK: hex.EncodeToString(ik), + AUTN: hex.EncodeToString(autn), + }, nil +} + +// incrementSQN atomically increments the subscriber's SQN in PostgreSQL and +// returns the new value. Using UPDATE … RETURNING ensures no two concurrent +// requests can receive the same SQN. +func (s *SubscriberService) incrementSQN(ctx context.Context, sub *models.Subscriber) (uint64, error) { + var newSQN int64 + err := s.db.DB.WithContext(ctx). + Raw("UPDATE subscribers SET sqn = sqn + 1 WHERE id = ? RETURNING sqn", sub.ID). + Scan(&newSQN).Error + if err != nil { + return 0, fmt.Errorf("SQN increment failed: %w", err) + } + return uint64(newSQN), nil +} diff --git a/apps/api-server/internal/services/subscriber_auth.go b/apps/api-server/internal/services/subscriber_auth.go index e17d240..7a1b247 100644 --- a/apps/api-server/internal/services/subscriber_auth.go +++ b/apps/api-server/internal/services/subscriber_auth.go @@ -2,80 +2,137 @@ package services import ( "crypto/aes" + "crypto/cipher" "crypto/rand" "encoding/hex" "fmt" "os" ) -// generateAuthKeys generates authentication keys for the subscriber +// generateAuthKeys generates a fresh Ki (K) and derives OPc for a new subscriber. +// Returns (K_hex, OPc_hex, error). func (s *SubscriberService) generateAuthKeys() (string, string, error) { - // Generate 128-bit random key (K) - key := make([]byte, 16) - if _, err := rand.Read(key); err != nil { - return "", "", err + k := make([]byte, 16) + if _, err := rand.Read(k); err != nil { + return "", "", fmt.Errorf("failed to generate K: %w", err) } - - // Get OP (Operator variant) from operator configuration - // This should be consistent across all subscribers for the same operator op := s.getOperatorVariant() - if op == nil { - return "", "", fmt.Errorf("operator variant not configured") - } - - // Generate OPc (derived from OP and K) using AES-128 encryption - // OPc = AES-128(K, OP) where OP is encrypted with K - opc, err := s.generateOPc(key, op) + opc, err := deriveOPc(k, op) if err != nil { - return "", "", err + return "", "", fmt.Errorf("failed to derive OPc: %w", err) } - - return hex.EncodeToString(key), hex.EncodeToString(opc), nil + return hex.EncodeToString(k), hex.EncodeToString(opc), nil } -// getOperatorVariant returns the operator variant (OP) from configuration +// getOperatorVariant returns the 16-byte OP from environment configuration. +// OP must be kept secret and identical across all subscribers for the same operator. func (s *SubscriberService) getOperatorVariant() []byte { - // Get operator variant from environment variable or secure configuration - // This should be the same across all subscribers for the same operator opStr := os.Getenv("OPERATOR_VARIANT") if opStr == "" { - // Fallback to default for development - opStr = "TelecomOP1234567" // 16-byte operator variant + opStr = "TelecomOP1234567" // 16 bytes — override in production via env } - - // Ensure exactly 16 bytes for AES-128 op := make([]byte, 16) copy(op, []byte(opStr)) - - // If shorter than 16 bytes, pad with zeros - if len(opStr) < 16 { - for i := len(opStr); i < 16; i++ { - op[i] = 0 - } - } - return op } -// generateOPc derives OPc from OP and K using AES-128 encryption -func (s *SubscriberService) generateOPc(k, op []byte) ([]byte, error) { - // Create AES-128 cipher block with key K +// deriveOPc computes OPc = AES-128(K, OP) XOR OP per 3GPP TS 35.206 §3. +// BUG FIX: the previous implementation was missing the final XOR OP step. +func deriveOPc(k, op []byte) ([]byte, error) { + if len(k) != 16 || len(op) != 16 { + return nil, fmt.Errorf("K and OP must each be exactly 16 bytes") + } block, err := aes.NewCipher(k) if err != nil { - return nil, fmt.Errorf("failed to create AES cipher: %w", err) + return nil, fmt.Errorf("AES cipher init failed: %w", err) } + opc := make([]byte, 16) + block.Encrypt(opc, op) + xorInPlace(opc, op) // OPc = AES-128(K, OP) XOR OP ← was missing + return opc, nil +} + +// ─── Milenage f-functions (3GPP TS 35.206) ─────────────────────────────────── + +// milenageTemp computes the shared intermediate value used by all f-functions: +// +// temp = AES-128(K, RAND XOR OPc) +func milenageTemp(block cipher.Block, rand16, opc []byte) []byte { + out := make([]byte, 16) + block.Encrypt(out, xorBytes(rand16, opc)) + return out +} + +// milenageF1 computes MAC-A (8 bytes) — the network authentication code. +// 3GPP TS 35.206 §2.3: r1=64 bits (8 bytes), c1=0x00…00 (no-op XOR). +func milenageF1(block cipher.Block, temp, opc, sqn6, amf2 []byte) []byte { + // in1 = SQN[0..5] || AMF[0..1] || SQN[0..5] || AMF[0..1] + in1 := make([]byte, 16) + copy(in1[0:6], sqn6) + copy(in1[6:8], amf2) + copy(in1[8:14], sqn6) + copy(in1[14:16], amf2) + + x := xorBytes(rotLeft(xorBytes(temp, opc), 8), in1) // rot r1=8 bytes, c1=0 (no-op) + out := make([]byte, 16) + block.Encrypt(out, x) + return xorBytes(out, opc)[:8] // MAC-A = out[0..7] +} + +// milenageF2F5 computes RES (8 bytes) and AK (6 bytes) in one pass. +// 3GPP TS 35.206 §2.4 (f2) and §2.7 (f5): r2=0, c2=0x00…01. +func milenageF2F5(block cipher.Block, temp, opc []byte) (res, ak []byte) { + x := xorBytes(temp, opc) // rot r2=0 bits — no rotation + x[15] ^= 0x01 // XOR c2 + out := make([]byte, 16) + block.Encrypt(out, x) + result := xorBytes(out, opc) + return append([]byte{}, result[8:16]...), append([]byte{}, result[0:6]...) +} + +// milenageF3 computes CK (16 bytes) — the cipher key. +// 3GPP TS 35.206 §2.5: r3=32 bits (4 bytes), c3=0x00…02. +func milenageF3(block cipher.Block, temp, opc []byte) []byte { + x := rotLeft(xorBytes(temp, opc), 4) + x[15] ^= 0x02 + out := make([]byte, 16) + block.Encrypt(out, x) + return xorBytes(out, opc) +} + +// milenageF4 computes IK (16 bytes) — the integrity key. +// 3GPP TS 35.206 §2.6: r4=64 bits (8 bytes), c4=0x00…04. +func milenageF4(block cipher.Block, temp, opc []byte) []byte { + x := rotLeft(xorBytes(temp, opc), 8) + x[15] ^= 0x04 + out := make([]byte, 16) + block.Encrypt(out, x) + return xorBytes(out, opc) +} - // OPc = AES-128(K, OP) - encrypt OP with key K - opc := make([]byte, aes.BlockSize) // AES block size is 16 bytes - - // Create ECB mode cipher (as per 3GPP specification for OPc derivation) - // In 3GPP, OPc is derived using AES-128 ECB mode - if len(op) != aes.BlockSize { - return nil, fmt.Errorf("OP must be 16 bytes for AES-128") +// ─── Byte utilities ─────────────────────────────────────────────────────────── + +// rotLeft rotates a 16-byte slice left by byteCount positions. +func rotLeft(x []byte, byteCount int) []byte { + out := make([]byte, 16) + for i := 0; i < 16; i++ { + out[i] = x[(i+byteCount)%16] } + return out +} - // Encrypt OP using ECB mode (single block) - block.Encrypt(opc, op) +// xorBytes returns a XOR b (slices must be the same length). +func xorBytes(a, b []byte) []byte { + out := make([]byte, len(a)) + for i := range a { + out[i] = a[i] ^ b[i] + } + return out +} - return opc, nil +// xorInPlace XORs b into a in place. +func xorInPlace(a, b []byte) { + for i := range a { + a[i] ^= b[i] + } } diff --git a/apps/api-server/internal/services/subscriber_billing.go b/apps/api-server/internal/services/subscriber_billing.go index 49f61c6..2aa778d 100644 --- a/apps/api-server/internal/services/subscriber_billing.go +++ b/apps/api-server/internal/services/subscriber_billing.go @@ -34,6 +34,7 @@ func (s *SubscriberService) GetAccount(ctx context.Context, imsi string) (*model return &models.SubscriberAccount{ IMSI: string(subscriber.IMSI), + MSISDN: subscriber.MSISDN, Balance: balance, DataLimit: float64(subscriber.Plan.DataLimit), DataUsed: dataUsed, @@ -118,8 +119,9 @@ func (s *SubscriberService) GetRatingPlan(ctx context.Context, planId string) (* } // TopUpBalance tops up a subscriber's account balance. -func (s *SubscriberService) TopUpBalance(ctx context.Context, imsi string, req *models.TopUpRequest) (*models.SubscriberAccount, error) { - var account models.SubscriberAccount +func (s *SubscriberService) TopUpBalance(ctx context.Context, imsi string, req *models.TopUpRequest) (*models.Subscriber, error) { + // changed this to subscriber (initially added the MSISDN field to the subscriber model to match the subscriber account model for queries and requests) + var account models.Subscriber if err := s.db.DB.WithContext(ctx).Where("imsi = ?", imsi).First(&account).Error; err != nil { return nil, fmt.Errorf("subscriber account not found: %w", err) diff --git a/apps/api-server/internal/services/subscriber_lifecycle.go b/apps/api-server/internal/services/subscriber_lifecycle.go index 8b397b3..8f1c76d 100644 --- a/apps/api-server/internal/services/subscriber_lifecycle.go +++ b/apps/api-server/internal/services/subscriber_lifecycle.go @@ -91,7 +91,7 @@ func (s *SubscriberService) DeleteSubscriber(ctx context.Context, subscriberId u } if err := tx.Where("imsi IN (SELECT imsi FROM subscribers WHERE id = ?)", subscriberId). - Delete(&models.SubscriberAccount{}).Error; err != nil { + Delete(&models.Subscriber{}).Error; err != nil { tx.Rollback() return false, fmt.Errorf("failed to delete account: %w", err) } diff --git a/apps/cli/internal/api/subscribers.go b/apps/cli/internal/api/subscribers.go index 487e29e..cd78ca9 100644 --- a/apps/cli/internal/api/subscribers.go +++ b/apps/cli/internal/api/subscribers.go @@ -4,6 +4,7 @@ package api type Subscriber struct { ID uint `json:"id"` IMSI string `json:"imsi"` + MSISDN string `json:"msisdn"` FirstName string `json:"first_name"` LastName string `json:"last_name"` Email string `json:"email"` @@ -13,15 +14,16 @@ type Subscriber struct { // SubscriberAccount holds account info type SubscriberAccount struct { IMSI string `json:"imsi"` + MSISDN string `json:"msisdn"` Name string `json:"name"` Status string `json:"status"` Balance float64 `json:"balance"` } // ListSubscribers retrieves subscribers -func (c *Client) ListSubscribers() ([]SubscriberAccount, error) { +func (c *Client) ListSubscribers() ([]Subscriber, error) { var resp struct { - Data []SubscriberAccount `json:"data"` + Data []Subscriber `json:"data"` } if err := c.doGetJSON("/api/v1/subscribers", &resp); err != nil { return nil, err @@ -30,8 +32,8 @@ func (c *Client) ListSubscribers() ([]SubscriberAccount, error) { } // GetSubscriber retrieves a single subscriber -func (c *Client) GetSubscriber(imsi string) (*SubscriberAccount, error) { - var sub SubscriberAccount +func (c *Client) GetSubscriber(imsi string) (*Subscriber, error) { + var sub Subscriber if err := c.doGetJSON("/api/v1/subscribers/"+imsi, &sub); err != nil { return nil, err } @@ -39,8 +41,8 @@ func (c *Client) GetSubscriber(imsi string) (*SubscriberAccount, error) { } // CreateSubscriber creates a subscriber -func (c *Client) CreateSubscriber(imsi, name string) (*SubscriberAccount, error) { - var sub SubscriberAccount +func (c *Client) CreateSubscriber(imsi, name string) (*Subscriber, error) { + var sub Subscriber body := map[string]string{"imsi": imsi, "name": name} if err := c.doPostJSON("/api/v1/subscribers", body, &sub); err != nil { return nil, err diff --git a/apps/cli/internal/commands/enhanced_subscribers.go b/apps/cli/internal/commands/enhanced_subscribers.go index fb9d67a..bf5d125 100644 --- a/apps/cli/internal/commands/enhanced_subscribers.go +++ b/apps/cli/internal/commands/enhanced_subscribers.go @@ -87,8 +87,11 @@ func balanceSubscriberUI(args []string, config *types.CLIConfig) error { return nil } t.AddRow("IMSI", sub.IMSI) - t.AddRow("Name", sub.Name) - t.AddRow("Balance", fmt.Sprintf("$%.2f", sub.Balance)) + t.AddRow("MSISDN", sub.MSISDN) + // (commented this due to a change in the CLI/internal/api/subscriber.go to sue Subscriber isntead of SubscriberAccount.) + // t.AddRow("Name", sub.Name) + // t.AddRow("Balance", fmt.Sprintf("$%.2f", sub.Balance)) + fmt.Println(t.Render()) return nil } @@ -127,7 +130,7 @@ func searchSubscribersUI(args []string, config *types.CLIConfig) error { subs, err := u.client.ListSubscribers() t := u.newTable() t.AddColumn("IMSI", 16, "left") - t.AddColumn("Name", 22, "left") + // t.AddColumn("Name", 22, "left") t.AddColumn("Status", 10, "left") t.AddColumn("Balance", 10, "right") @@ -139,7 +142,7 @@ func searchSubscribersUI(args []string, config *types.CLIConfig) error { } for _, s := range subs { t.AddStyledRow(statusStyle(s.Status).Style, - s.IMSI, s.Name, s.Status, fmt.Sprintf("$%.2f", s.Balance)) + s.IMSI, s.Status) } fmt.Println(t.Render()) return nil diff --git a/apps/cli/internal/commands/subscribers.go b/apps/cli/internal/commands/subscribers.go index cbdb868..abc0eec 100644 --- a/apps/cli/internal/commands/subscribers.go +++ b/apps/cli/internal/commands/subscribers.go @@ -62,10 +62,10 @@ func listSubscribers(u *uiContext) error { t.AddColumn("IMSI", 16, "left") t.AddColumn("Name", 22, "left") t.AddColumn("Status", 10, "left") - t.AddColumn("Balance", 10, "right") - t.AddStyledRow(statusStyle("ACTIVE").Style, "310260123456789", "John Doe", "Active", "$45.67") - t.AddStyledRow(statusStyle("ACTIVE").Style, "310260123456790", "Jane Smith", "Active", "$123.45") - t.AddStyledRow(statusStyle("INACTIVE").Style, "310260123456791", "Bob Johnson", "Inactive", "$0.00") + t.AddColumn("MSISDN", 10, "right") + t.AddStyledRow(statusStyle("ACTIVE").Style, "310260123456789", "John Doe", "Active", "07XXXXXXX") + t.AddStyledRow(statusStyle("ACTIVE").Style, "310260123456790", "Jane Smith", "Active", "07XXXXXXX") + t.AddStyledRow(statusStyle("INACTIVE").Style, "310260123456791", "Bob Johnson", "Inactive", "07XXXXXXX") fmt.Println(t.Render()) return nil } @@ -77,7 +77,7 @@ func listSubscribers(u *uiContext) error { t.AddColumn("Balance", 10, "right") for _, s := range subs { t.AddStyledRow(statusStyle(s.Status).Style, - s.IMSI, s.Name, s.Status, fmt.Sprintf("$%.2f", s.Balance)) + s.IMSI, s.Status) } fmt.Println(t.Render()) return nil @@ -102,7 +102,7 @@ func createSubscriber(u *uiContext, args []string) error { t.AddColumn("Field", 12, "left") t.AddColumn("Value", 24, "left") t.AddRow("IMSI", sub.IMSI) - t.AddRow("Name", sub.Name) + // t.AddRow("Name", sub.Name) t.AddRow("Status", sub.Status) fmt.Println(t.Render()) return nil @@ -151,9 +151,9 @@ func showSubscriber(u *uiContext, args []string) error { return nil } t.AddRow("IMSI", sub.IMSI) - t.AddRow("Name", sub.Name) + t.AddRow("Name", sub.FirstName+" "+sub.LastName) t.AddRow("Status", sub.Status) - t.AddRow("Balance", fmt.Sprintf("$%.2f", sub.Balance)) + t.AddRow("MSISDN", sub.MSISDN) fmt.Println(t.Render()) return nil } diff --git a/migrations/00023_add_subscriber_sqn.sql b/migrations/00023_add_subscriber_sqn.sql new file mode 100644 index 0000000..75f9ab9 --- /dev/null +++ b/migrations/00023_add_subscriber_sqn.sql @@ -0,0 +1,5 @@ +-- +goose Up +ALTER TABLE subscribers ADD COLUMN IF NOT EXISTS sqn BIGINT NOT NULL DEFAULT 0; + +-- +goose Down +ALTER TABLE subscribers DROP COLUMN IF EXISTS sqn; From b9f0219e9ae8c0b3804899aa6e442c061ecb8b3e Mon Sep 17 00:00:00 2001 From: abubakar508 Date: Fri, 8 May 2026 18:33:19 +0300 Subject: [PATCH 2/2] fix(charging): make RatingPlansRepo pool pub(crate) for bundle service access fix(errors): add BundleNotFound, BundleAlreadyActive, InvalidBundleConfig variants with correct HTTP status mappings (404, 409, 400) --- apps/charging-engine/src/charging.rs | 4 + .../src/charging/airtime_service.rs | 153 +++++ .../src/charging/bundle_expiry_task.rs | 23 + .../src/charging/bundle_repo.rs | 215 +++++++ .../src/charging/bundle_service.rs | 576 ++++++++++++++++++ apps/charging-engine/src/charging/engine.rs | 7 +- .../src/charging/rating_plans_repo.rs | 6 +- apps/charging-engine/src/charging/types.rs | 320 +++++----- .../src/charging/usage_pipeline.rs | 152 +++++ apps/charging-engine/src/errors/types.rs | 36 +- apps/charging-engine/src/handlers/airtime.rs | 77 +++ apps/charging-engine/src/handlers/bundles.rs | 281 +++++++++ apps/charging-engine/src/handlers/mod.rs | 4 + apps/charging-engine/src/handlers/usage.rs | 37 +- apps/charging-engine/src/routes.rs | 16 +- migrations/00019_init_airtime.sql | 32 + migrations/00020_init_bundles.sql | 45 ++ .../00021_usage_records_charging_source.sql | 11 + .../00022_bundles_amount_unconverted.sql | 6 + 19 files changed, 1818 insertions(+), 183 deletions(-) create mode 100644 apps/charging-engine/src/charging/airtime_service.rs create mode 100644 apps/charging-engine/src/charging/bundle_expiry_task.rs create mode 100644 apps/charging-engine/src/charging/bundle_repo.rs create mode 100644 apps/charging-engine/src/charging/bundle_service.rs create mode 100644 apps/charging-engine/src/charging/usage_pipeline.rs create mode 100644 apps/charging-engine/src/handlers/airtime.rs create mode 100644 apps/charging-engine/src/handlers/bundles.rs create mode 100644 migrations/00019_init_airtime.sql create mode 100644 migrations/00020_init_bundles.sql create mode 100644 migrations/00021_usage_records_charging_source.sql create mode 100644 migrations/00022_bundles_amount_unconverted.sql diff --git a/apps/charging-engine/src/charging.rs b/apps/charging-engine/src/charging.rs index ac3b4af..64a76a9 100644 --- a/apps/charging-engine/src/charging.rs +++ b/apps/charging-engine/src/charging.rs @@ -3,6 +3,10 @@ pub mod engine; pub mod credit_management; pub mod rating_billing; pub mod rating_plans_repo; +pub mod airtime_service; +pub mod bundle_service; +pub mod bundle_repo; pub use engine::ChargingEngine; pub use rating_plans_repo::RatingPlansRepo; +pub use bundle_repo::BundleRepo; diff --git a/apps/charging-engine/src/charging/airtime_service.rs b/apps/charging-engine/src/charging/airtime_service.rs new file mode 100644 index 0000000..1b8eed3 --- /dev/null +++ b/apps/charging-engine/src/charging/airtime_service.rs @@ -0,0 +1,153 @@ +use crate::circuit_breaker::CircuitBreakerError; +use crate::errors::{ChargingError, ChargingResult}; +use super::types::AirtimeBalance; + +impl crate::charging::ChargingEngine { + pub async fn add_airtime(&self, imsi: &str, seconds_to_add: u64, roaming: bool) -> ChargingResult { + self.redis_circuit_breaker.execute(|| async { + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + let key = if roaming { + format!("airtime:roaming:{}", imsi) + } else { + format!("airtime:home:{}", imsi) + }; + + // Atomic increment with overflow check + let current: Option = redis::AsyncCommands::get(&mut conn, &key).await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + if let Some(existing) = current { + if existing.checked_add(seconds_to_add).is_none() { + return Err(ChargingError::InvalidInput("Airtime addition would overflow".to_string())); + } + } + + let new_balance: u64 = redis::AsyncCommands::incr(&mut conn, &key, seconds_to_add).await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + // Set expiration (30 days for airtime) + let _: () = redis::AsyncCommands::expire(&mut conn, &key, 2592000).await + .unwrap_or(()); + + // Update metadata + let meta_key = format!("airtime:meta:{}", imsi); + let _: () = redis::AsyncCommands::hset(&mut conn, &meta_key, "last_updated", Utc::now().timestamp()).await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + Ok(new_balance) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::RedisConnection("Circuit breaker is open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + pub async fn deduct_airtime(&self, imsi: &str, seconds_to_deduct: u64, roaming: bool) -> ChargingResult { + self.redis_circuit_breaker.execute(|| async { + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + let balance_key = if roaming { + format!("airtime:roaming:{}", imsi) + } else { + format!("airtime:home:{}", imsi) + }; + + // Use Lua script for atomic check-and-deduct + let script = r#" + local current = redis.call('GET', KEYS[1]) + if not current then + return {err = "NO_BALANCE"} + end + current = tonumber(current) + if current < tonumber(ARGV[1]) then + return {err = "INSUFFICIENT_BALANCE", current = current} + end + local new_balance = current - tonumber(ARGV[1]) + redis.call('SET', KEYS[1], new_balance) + return {ok = true, new_balance = new_balance, old_balance = current} + "#; + + let result: redis::RedisResult = redis::Script::new(script) + .key(&balance_key) + .arg(seconds_to_deduct) + .invoke_async(&mut conn) + .await; + + match result { + Ok(redis::Value::Bulk(items)) => { + let deduction = self.parse_airtime_deduction_result(items)?; + + // Record usage event for billing + self.record_airtime_usage(&mut conn, imsi, seconds_to_deduct, roaming).await?; + + Ok(deduction) + } + Ok(_) => Err(ChargingError::InvalidState("Unexpected script result".to_string())), + Err(e) => Err(ChargingError::RedisOperation(e.to_string())), + } + }).await + } + + pub async fn get_airtime_balance(&self, imsi: &str) -> ChargingResult { + self.redis_circuit_breaker.execute(|| async { + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + let home_key = format!("airtime:home:{}", imsi); + let roaming_key = format!("airtime:roaming:{}", imsi); + let meta_key = format!("airtime:meta:{}", imsi); + + let pipe = redis::pipe(); + pipe.get(&home_key) + .get(&roaming_key) + .hget(&meta_key, "last_updated"); + + let results: Vec> = pipe.query_async(&mut conn).await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + let home_seconds = match results[0] { + Some(redis::Value::Int(v)) => v as u64, + _ => 0, + }; + + let roaming_seconds = match results[1] { + Some(redis::Value::Int(v)) => v as u64, + _ => 0, + }; + + let last_updated = match results[2] { + Some(redis::Value::Bulk(v)) => { + if let Some(redis::Value::Data(ts_bytes)) = v.first() { + let ts_str = String::from_utf8_lossy(ts_bytes); + ts_str.parse::().ok() + .and_then(|ts| DateTime::from_timestamp(ts, 0)) + .unwrap_or_else(Utc::now) + } else { + Utc::now() + } + } + _ => Utc::now(), + }; + + Ok(AirtimeBalance { + imsi: imsi.to_string(), + home_seconds, + roaming_seconds, + last_updated, + }) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::RedisConnection("Circuit breaker is open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } +} + +#[derive(Debug, Serialize)] +pub struct AirtimeDeduction { + pub old_balance: u64, + pub new_balance: u64, + pub deducted: u64, + pub remaining: u64, +} diff --git a/apps/charging-engine/src/charging/bundle_expiry_task.rs b/apps/charging-engine/src/charging/bundle_expiry_task.rs new file mode 100644 index 0000000..d2fdf4b --- /dev/null +++ b/apps/charging-engine/src/charging/bundle_expiry_task.rs @@ -0,0 +1,23 @@ +use tokio::time::{interval, Duration}; +use tracing::{error, info}; +use super::bundle_repo::BundleRepo; + +/// Spawns a background task that expires stale subscriber bundles every 5 minutes. +/// Must be called once at engine startup. +pub fn spawn_bundle_expiry_task(bundle_repo: BundleRepo) { + tokio::spawn(async move { + let mut ticker = interval(Duration::from_secs(300)); // every 5 minutes + loop { + ticker.tick().await; + match bundle_repo.expire_stale_bundles().await { + Ok(count) if count > 0 => { + info!(expired_count = count, "Expired stale subscriber bundles"); + } + Ok(_) => {} + Err(e) => { + error!(error = %e, "Failed to expire stale bundles"); + } + } + } + }); +} diff --git a/apps/charging-engine/src/charging/bundle_repo.rs b/apps/charging-engine/src/charging/bundle_repo.rs new file mode 100644 index 0000000..03fe754 --- /dev/null +++ b/apps/charging-engine/src/charging/bundle_repo.rs @@ -0,0 +1,215 @@ +use sqlx::postgres::{PgPool, PgPoolOptions}; +use sqlx::Row; +use crate::circuit_breaker::{CircuitBreaker, CircuitBreakerError}; +use crate::errors::{ChargingError, ChargingResult}; +use super::types::{Bundle, BundleType, BundleAllowances, ActiveBundle}; +use chrono::Utc; + +#[derive(Clone)] +pub struct BundleRepo { + pub(crate) pool: PgPool, + circuit_breaker: CircuitBreaker, +} + +impl BundleRepo { + pub async fn connect(database_url: &str) -> ChargingResult { + let pool = PgPoolOptions::new() + .max_connections(10) + .connect(database_url) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + let circuit_breaker = CircuitBreaker::new(5, std::time::Duration::from_secs(60)); + Ok(Self { pool, circuit_breaker }) + } + + /// Upsert a bundle definition. + pub async fn upsert_bundle(&self, bundle: &Bundle) -> ChargingResult<()> { + let bundle = bundle.clone(); + self.circuit_breaker.execute(|| async move { + sqlx::query(r#" + INSERT INTO bundles + (bundle_id, name, bundle_type, data_bytes, voice_seconds, + sms_count, roaming_data_bytes, validity_days, priority, is_active, updated_at) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,TRUE,NOW()) + ON CONFLICT (bundle_id) DO UPDATE SET + name = EXCLUDED.name, + bundle_type = EXCLUDED.bundle_type, + data_bytes = EXCLUDED.data_bytes, + voice_seconds = EXCLUDED.voice_seconds, + sms_count = EXCLUDED.sms_count, + roaming_data_bytes = EXCLUDED.roaming_data_bytes, + validity_days = EXCLUDED.validity_days, + priority = EXCLUDED.priority, + is_active = TRUE, + updated_at = NOW() + "#) + .bind(&bundle.bundle_id) + .bind(&bundle.name) + .bind(bundle.bundle_type.as_str()) + .bind(bundle.allowances.data_bytes.map(|v| v as i64)) + .bind(bundle.allowances.voice_seconds.map(|v| v as i64)) + .bind(bundle.allowances.sms_count.map(|v| v as i64)) + .bind(bundle.allowances.roaming_data_bytes.map(|v| v as i64)) + .bind(bundle.validity_days as i32) + .bind(bundle.priority as i16) + .execute(&self.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + Ok(()) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + pub async fn get_bundle(&self, bundle_id: &str) -> ChargingResult> { + let bundle_id = bundle_id.to_string(); + self.circuit_breaker.execute(|| async move { + let row = sqlx::query(r#" + SELECT bundle_id, name, bundle_type, data_bytes, voice_seconds, + sms_count, roaming_data_bytes, validity_days, priority, is_active + FROM bundles WHERE bundle_id = $1 AND is_active = TRUE + "#) + .bind(&bundle_id) + .fetch_optional(&self.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + Ok(row.map(row_to_bundle)) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + /// Activate a bundle for a subscriber — inserts into subscriber_bundles. + /// Returns the new subscriber_bundle id. + pub async fn activate_for_subscriber( + &self, + imsi: &str, + bundle_id: &str, + validity_days: u32, + allowances: &BundleAllowances, + ) -> ChargingResult { + let imsi = imsi.to_string(); + let bundle_id = bundle_id.to_string(); + let expires_at = Utc::now() + chrono::Duration::days(validity_days as i64); + let allowances = allowances.clone(); + + self.circuit_breaker.execute(|| async move { + let row = sqlx::query(r#" + INSERT INTO subscriber_bundles + (imsi, bundle_id, expires_at, + remaining_data_bytes, remaining_voice_seconds, + remaining_sms_count, remaining_roaming_bytes, status) + VALUES ($1,$2,$3,$4,$5,$6,$7,'active') + RETURNING id + "#) + .bind(&imsi) + .bind(&bundle_id) + .bind(expires_at) + .bind(allowances.data_bytes.map(|v| v as i64)) + .bind(allowances.voice_seconds.map(|v| v as i64)) + .bind(allowances.sms_count.map(|v| v as i64)) + .bind(allowances.roaming_data_bytes.map(|v| v as i64)) + .fetch_one(&self.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + Ok(row.get::("id")) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + /// Fetch active, non-expired bundles for a subscriber, ordered by priority. + pub async fn get_active_bundles(&self, imsi: &str) -> ChargingResult> { + let imsi = imsi.to_string(); + self.circuit_breaker.execute(|| async move { + let rows = sqlx::query(r#" + SELECT sb.id, sb.imsi, sb.bundle_id, sb.activated_at, sb.expires_at, + sb.remaining_data_bytes, sb.remaining_voice_seconds, + sb.remaining_sms_count, sb.remaining_roaming_bytes, + b.priority + FROM subscriber_bundles sb + JOIN bundles b ON b.bundle_id = sb.bundle_id + WHERE sb.imsi = $1 + AND sb.status = 'active' + AND sb.expires_at > NOW() + ORDER BY b.priority ASC, sb.activated_at ASC + "#) + .bind(&imsi) + .fetch_all(&self.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + Ok(rows.into_iter().map(row_to_active_bundle).collect()) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + /// Atomically deduct from a subscriber_bundle row. + /// Returns true if deduction succeeded, false if insufficient allowance. + pub async fn deduct_from_bundle( + &self, + subscriber_bundle_id: i64, + usage_type: &str, + amount: u64, + ) -> ChargingResult { + let amount = amount as i64; + self.circuit_breaker.execute(|| async move { + let column = match usage_type { + "data" => "remaining_data_bytes", + "voice" => "remaining_voice_seconds", + "sms" => "remaining_sms_count", + "roaming" => "remaining_roaming_bytes", + _ => return Err(ChargingError::InvalidInput(format!("Unknown usage type: {}", usage_type))), + }; + + // Atomic check-and-deduct: only update if remaining >= amount + let result = sqlx::query(&format!(r#" + UPDATE subscriber_bundles + SET {column} = {column} - $1, + status = CASE + WHEN ({column} - $1) <= 0 THEN 'exhausted' + ELSE status + END, + updated_at = NOW() + WHERE id = $2 + AND {column} >= $1 + AND status = 'active' + AND expires_at > NOW() + "#)) + .bind(amount) + .bind(subscriber_bundle_id) + .execute(&self.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + Ok(result.rows_affected() > 0) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + /// Expire bundles whose expires_at has passed — run this from a background task. + pub async fn expire_stale_bundles(&self) -> ChargingResult { + self.circuit_breaker.execute(|| async { + let result = sqlx::query(r#" + UPDATE subscriber_bundles + SET status = 'expired', updated_at = NOW() + WHERE status = 'active' AND expires_at <= NOW() + "#) + .execute(&self.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + Ok(result.rows_affected()) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } +} diff --git a/apps/charging-engine/src/charging/bundle_service.rs b/apps/charging-engine/src/charging/bundle_service.rs new file mode 100644 index 0000000..68daea2 --- /dev/null +++ b/apps/charging-engine/src/charging/bundle_service.rs @@ -0,0 +1,576 @@ +use crate::circuit_breaker::CircuitBreakerError; +use crate::errors::{ChargingError, ChargingResult}; +use super::types::{Bundle, ActiveBundle, BundleAllowances}; + +impl crate::charging::ChargingEngine { + pub async fn create_bundle(&self, bundle: Bundle) -> ChargingResult<()> { + self.postgres_circuit_breaker.execute(|| async { + // Store bundle configuration in PostgreSQL + let query = r#" + INSERT INTO bundles (bundle_id, name, bundle_type, data_bytes, voice_seconds, + sms_count, roaming_data_bytes, validity_days, priority, + amount_unconverted, is_active) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT (bundle_id) DO UPDATE SET + name = EXCLUDED.name, + bundle_type = EXCLUDED.bundle_type, + data_bytes = EXCLUDED.data_bytes, + voice_seconds = EXCLUDED.voice_seconds, + sms_count = EXCLUDED.sms_count, + roaming_data_bytes = EXCLUDED.roaming_data_bytes, + validity_days = EXCLUDED.validity_days, + priority = EXCLUDED.priority, + amount_unconverted = EXCLUDED.amount_unconverted, + is_active = EXCLUDED.is_active + "#; + + let allowances_json = serde_json::to_value(&bundle.allowances) + .map_err(|e| ChargingError::InvalidFormat(e.to_string()))?; + + sqlx::query(query) + .bind(&bundle.bundle_id) + .bind(&bundle.name) + .bind(bundle.bundle_type.as_str()) + .bind(bundle.allowances.data_bytes.map(|v| v as i64)) + .bind(bundle.allowances.voice_seconds.map(|v| v as i64)) + .bind(bundle.allowances.sms_count.map(|v| v as i64)) + .bind(bundle.allowances.roaming_data_bytes.map(|v| v as i64)) + .bind(bundle.validity_days as i32) + .bind(bundle.priority as i16) + .bind(bundle.amount_unconverted) + .bind(bundle.is_active) + .execute(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + + Ok(()) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::PostgresConnection("Circuit breaker is open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + pub async fn activate_bundle_for_subscriber(&self, imsi: &str, bundle_id: &str) -> ChargingResult { + self.redis_circuit_breaker.execute(|| async { + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + // Get bundle configuration from PostgreSQL cache or DB + let bundle = self.get_bundle_config(bundle_id).await?; + + // Check if bundle already active + let active_key = format!("bundle:active:{}:{}", imsi, bundle_id); + let is_active: bool = redis::AsyncCommands::exists(&mut conn, &active_key).await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + if is_active { + return Err(ChargingError::BundleAlreadyActive); + } + + // Atomic bundle activation + let mut pipe = redis::pipe(); + pipe.atomic(); + + // Set active bundle with TTL + let expires_at = Utc::now() + chrono::Duration::days(bundle.validity_days as i64); + let active_bundle = ActiveBundle { + imsi: imsi.to_string(), + bundle_id: bundle_id.to_string(), + priority: bundle.priority, + activated_at: Utc::now(), + expires_at, + remaining_allowances: bundle.allowances.clone(), + }; + + let bundle_json = serde_json::to_string(&active_bundle) + .map_err(|e| ChargingError::InvalidFormat(e.to_string()))?; + + pipe.set(&active_key, bundle_json) + .expire(&active_key, bundle.validity_days as i64 * 86400); + + let set_key = format!("bundle:active:set:{}", imsi); + pipe.sadd(&set_key, bundle_id); + + // Apply bundle allowances to subscriber balances + if let Some(data_bytes) = bundle.allowances.data_bytes { + let data_key = format!("bundle:data:{}", imsi); + pipe.incr(&data_key, data_bytes as i64); + } + + if let Some(voice_seconds) = bundle.allowances.voice_seconds { + let voice_key = format!("bundle:voice:{}", imsi); + pipe.incr(&voice_key, voice_seconds as i64); + } + + if let Some(sms_count) = bundle.allowances.sms_count { + let sms_key = format!("bundle:sms:{}", imsi); + pipe.incr(&sms_key, sms_count as i64); + } + + pipe.query_async(&mut conn).await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + // Record activation for audit + self.record_bundle_activation(&mut conn, imsi, bundle_id).await?; + + Ok(active_bundle) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::RedisConnection("Circuit breaker is open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + pub async fn consume_from_bundle( + &self, + imsi: &str, + usage_type: UsageType, + amount: u64, + ) -> ChargingResult { + self.redis_circuit_breaker.execute(|| async { + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + // Use SMEMBERS on the tracking Set — O(N bundles per subscriber), not O(total Redis keys) + let set_key = format!("bundle:active:set:{}", imsi); + let bundle_ids: Vec = redis::AsyncCommands::smembers(&mut conn, &set_key) + .await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + // Fetch all active bundle JSON values in one pipeline + let mut pipe = redis::pipe(); + for bundle_id in &bundle_ids { + let active_key = format!("bundle:active:{}:{}", imsi, bundle_id); + pipe.get(&active_key); + } + let values: Vec> = pipe.query_async(&mut conn) + .await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + // Pair bundle_ids with their JSON, filter out expired/missing entries + let mut bundles: Vec<(String, ActiveBundle)> = bundle_ids.iter() + .zip(values.into_iter()) + .filter_map(|(id, maybe_json)| { + let json = maybe_json?; + let b: ActiveBundle = serde_json::from_str(&json).ok()?; + // Remove from set if expired + Some((id.clone(), b)) + }) + .collect(); + + // Sort by priority ascending (lower number = higher priority) + bundles.sort_by_key(|(_, b)| b.priority); + + for (bundle_id, mut active_bundle) in bundles { + // Skip expired bundles and clean up the set + if active_bundle.expires_at <= chrono::Utc::now() { + let set_key = format!("bundle:active:set:{}", imsi); + let _: () = redis::AsyncCommands::srem(&mut conn, &set_key, &bundle_id) + .await + .unwrap_or(()); + continue; + } + + let has_allowance = match usage_type { + UsageType::Data => active_bundle.remaining_allowances.data_bytes + .map_or(false, |b| b >= amount), + UsageType::Voice => active_bundle.remaining_allowances.voice_seconds + .map_or(false, |s| s >= amount), + UsageType::SMS => active_bundle.remaining_allowances.sms_count + .map_or(false, |c| c >= amount), + }; + + if !has_allowance { + continue; + } + + match usage_type { + UsageType::Data => { + if let Some(ref mut b) = active_bundle.remaining_allowances.data_bytes { *b -= amount; } + } + UsageType::Voice => { + if let Some(ref mut s) = active_bundle.remaining_allowances.voice_seconds { *s -= amount; } + } + UsageType::SMS => { + if let Some(ref mut c) = active_bundle.remaining_allowances.sms_count { *c -= amount; } + } + } + + let active_key = format!("bundle:active:{}:{}", imsi, bundle_id); + let updated_json = serde_json::to_string(&active_bundle) + .map_err(|e| ChargingError::SerializationError(e.to_string()))?; + let _: () = redis::AsyncCommands::set(&mut conn, &active_key, updated_json) + .await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + return Ok(true); + } + + Ok(false) + }).await.map_err(|e| match e { + crate::circuit_breaker::CircuitBreakerError::Open => + ChargingError::RedisConnection("Circuit breaker is open".to_string()), + crate::circuit_breaker::CircuitBreakerError::Inner(e) => e, + }) + } + + + pub async fn list_bundles(&self) -> ChargingResult> { + use sqlx::Row; + + let rows = sqlx::query( + "SELECT bundle_id, name, bundle_type, data_bytes, voice_seconds, \ + sms_count, roaming_data_bytes, validity_days, priority \ + FROM bundles WHERE is_active = TRUE ORDER BY priority ASC", + ) + .fetch_all(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + let bundles = rows + .into_iter() + .map(|row| Bundle { + bundle_id: row.get("bundle_id"), + name: row.get("name"), + bundle_type: BundleType::from_str(row.get("bundle_type")), + allowances: BundleAllowances { + data_bytes: row.get::, _>("data_bytes").map(|v| v as u64), + voice_seconds: row.get::, _>("voice_seconds").map(|v| v as u64), + sms_count: row.get::, _>("sms_count").map(|v| v as u64), + roaming_data_bytes: row.get::, _>("roaming_data_bytes").map(|v| v as u64), + }, + validity_days: row.get::("validity_days") as u32, + priority: row.get::("priority") as u8, + amount_unconverted: row.get::("amount_unconverted"), + is_active: true, + }) + .collect(); + + Ok(bundles) + } + + pub async fn get_bundle_by_id(&self, bundle_id: &str) -> ChargingResult> { + match self.get_bundle_config(bundle_id).await { + Ok(bundle) => Ok(Some(bundle)), + Err(ChargingError::BundleNotFound(_)) => Ok(None), + Err(e) => Err(e), + } + } + + + pub async fn purchase_bundle_with_airtime( + &self, + imsi: &str, + bundle_id: &str, + ) -> ChargingResult { + use sqlx::Row; + + // 1. Load bundle definition from PostgreSQL + let bundle = self.get_bundle_config(bundle_id).await?; + + if bundle.airtime_cost == 0 { + return Err(ChargingError::InvalidBundleConfig( + "Bundle has no airtime price set".to_string(), + )); + } + + // 2. Check and deduct airtime atomically in Redis (Lua script) + let lua_script = r#" + local key = KEYS[1] + local cost = tonumber(ARGV[1]) + local balance = tonumber(redis.call('GET', key) or '0') + if balance < cost then + return {-1, balance} + end + local new_balance = redis.call('DECRBY', key, cost) + return {0, new_balance} + "#; + + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + let airtime_key = format!("airtime:home:{}", imsi); + let result: Vec = redis::Script::new(lua_script) + .key(&airtime_key) + .arg(bundle.airtime_cost as i64) + .invoke_async(&mut conn) + .await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + if result[0] == -1 { + return Err(ChargingError::InsufficientAirtime { + available: result[1] as i64, + required: bundle.airtime_cost, + }); + } + + let new_airtime_balance = result[1] as u64; + + // 3. Activate the bundle in Redis (same as activate_bundle_for_subscriber) + let active_bundle = self.activate_bundle_for_subscriber(imsi, bundle_id).await?; + + // 4. Persist airtime deduction to PostgreSQL (audit trail) + self.postgres_circuit_breaker.execute(|| async { + // Get subscriber_id for FK + let sub_row = sqlx::query( + "SELECT id FROM subscribers WHERE imsi = $1" + ) + .bind(imsi) + .fetch_optional(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))? + .ok_or_else(|| ChargingError::SubscriberNotFound(imsi.to_string()))?; + + let subscriber_id: i32 = sub_row.get("id"); + + // Write airtime_transactions row + sqlx::query(r#" + INSERT INTO airtime_transactions + (subscriber_id, imsi, transaction_type, seconds_delta, balance_after, roaming, reference_id) + VALUES ($1, $2, 'bundle_purchase', $3, $4, FALSE, $5) + "#) + .bind(subscriber_id) + .bind(imsi) + .bind(-(bundle.airtime_cost as i64)) // negative = debit + .bind(new_airtime_balance as i64) + .bind(bundle_id) + .execute(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + // Update airtime_balances table (PG source of truth) + sqlx::query(r#" + INSERT INTO airtime_balances (subscriber_id, imsi, home_seconds, updated_at) + VALUES ($1, $2, $3, NOW()) + ON CONFLICT (imsi) DO UPDATE SET + home_seconds = $3, + updated_at = NOW() + "#) + .bind(subscriber_id) + .bind(imsi) + .bind(new_airtime_balance as i64) + .execute(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + Ok(()) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + })?; + + Ok(active_bundle) + } + + pub async fn deactivate_bundle(&self, bundle_id: &str) -> ChargingResult<()> { + sqlx::query("UPDATE bundles SET is_active = FALSE, updated_at = NOW() WHERE bundle_id = $1") + .bind(bundle_id) + .execute(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + Ok(()) + } + + async fn get_bundle_config(&self, bundle_id: &str) -> ChargingResult { + let row = sqlx::query(r#" + SELECT bundle_id, name, bundle_type, data_bytes, voice_seconds, + sms_count, roaming_data_bytes, validity_days, priority, amount_unconverted + FROM bundles WHERE bundle_id = $1 AND is_active = TRUE + "#) + .bind(bundle_id) + .fetch_optional(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))? + .ok_or_else(|| ChargingError::BundleNotFound(bundle_id.to_string()))?; + + Ok(Bundle { + bundle_id: row.get("bundle_id"), + name: row.get("name"), + bundle_type: match row.get::<&str, _>("bundle_type") { + "voice" => BundleType::Voice, + "sms" => BundleType::SMS, + "hybrid" => BundleType::Hybrid, + _ => BundleType::Data, + }, + allowances: BundleAllowances { + data_bytes: row.get::, _>("data_bytes").map(|v| v as u64), + voice_seconds: row.get::, _>("voice_seconds").map(|v| v as u64), + sms_count: row.get::, _>("sms_count").map(|v| v as u64), + roaming_data_bytes: row.get::, _>("roaming_data_bytes").map(|v| v as u64), + }, + validity_days: row.get::("validity_days") as u32, + priority: row.get::("priority") as u8, + amount_unconverted: row.get::("amount_unconverted"), + is_active: true, + }) + } + + async fn record_bundle_activation( + &self, + conn: &mut redis::aio::MultiplexedConnection, + imsi: &str, + bundle_id: &str, + ) -> ChargingResult<()> { + let audit_key = format!("bundle:audit:{}:{}", imsi, bundle_id); + let payload = serde_json::json!({ + "imsi": imsi, + "bundle_id": bundle_id, + "activated_at": chrono::Utc::now().to_rfc3339(), + }).to_string(); + let _: () = redis::AsyncCommands::set(conn, &audit_key, payload) + .await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + Ok(()) + } + + + /// Resolve IMSI from MSISDN via PostgreSQL. + async fn resolve_imsi_from_msisdn(&self, msisdn: &str) -> ChargingResult { + use sqlx::Row; + let row = sqlx::query( + "SELECT imsi FROM subscribers WHERE msisdn = $1 AND deleted_at IS NULL LIMIT 1", + ) + .bind(msisdn) + .fetch_optional(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))? + .ok_or_else(|| ChargingError::SubscriberNotFound(msisdn.to_string()))?; + + Ok(row.get("imsi")) + } + + async fn deduct_airtime_atomic( + &self, + conn: &mut redis::aio::MultiplexedConnection, + imsi: &str, + amount: i64, + ) -> ChargingResult { + // Lua script: atomic check-and-deduct + let lua_script = r#" + local key = KEYS[1] + local cost = tonumber(ARGV[1]) + local balance = tonumber(redis.call('GET', key) or '0') + if balance < cost then + return -1 + end + return redis.call('DECRBY', key, cost) + "#; + + let new_balance: i64 = redis::Script::new(lua_script) + .key(format!("airtime:home:{}", imsi)) + .arg(amount) + .invoke_async(conn) + .await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + if new_balance < 0 { + // Script returned -1 sentinel + let current: i64 = redis::AsyncCommands::get(conn, format!("airtime:home:{}", imsi)) + .await + .unwrap_or(0); + return Err(ChargingError::InsufficientAirtime { + available: current, + required: amount, + }); + } + + Ok(new_balance) + } + + + + pub async fn purchase_bundle_with_airtime( + &self, + msisdn: &str, + bundle_id: &str, + ) -> ChargingResult { + // 1. Resolve IMSI + let imsi = self.resolve_imsi_from_msisdn(msisdn).await?; + + // 2. Load bundle config + let bundle = self.get_bundle_config(bundle_id).await?; + + // 3. Deduct airtime atomically + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + let new_airtime = self.deduct_airtime_atomic(&mut conn, &imsi, bundle.amount_unconverted).await?; + + // 4. Activate bundle for subscriber + let active_bundle = self.activate_bundle_for_subscriber(&imsi, bundle_id).await?; + + // 5. Audit: record airtime transaction + self.record_airtime_transaction( + &imsi, + -bundle.amount_unconverted, + new_airtime, + &format!("bundle_purchase:{}", bundle_id), + ).await?; + + Ok(active_bundle) + } + + + /// POST /v1/bundles/:id/gift + /// Sender purchases a bundle for a recipient using the sender's airtime. + /// Deducts from sender_msisdn's airtime; activates bundle on recipient_msisdn. + pub async fn gift_bundle_with_airtime( + &self, + sender_msisdn: &str, + recipient_msisdn: &str, + bundle_id: &str, + ) -> ChargingResult { + // 1. Resolve both IMSIs + let sender_imsi = self.resolve_imsi_from_msisdn(sender_msisdn).await?; + let recipient_imsi = self.resolve_imsi_from_msisdn(recipient_msisdn).await?; + + // 2. Load bundle config + let bundle = self.get_bundle_config(bundle_id).await?; + + // 3. Deduct from SENDER's airtime + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + let new_sender_airtime = self.deduct_airtime_atomic( + &mut conn, &sender_imsi, bundle.amount_unconverted, + ).await?; + + // 4. Activate bundle on RECIPIENT + let active_bundle = self.activate_bundle_for_subscriber(&recipient_imsi, bundle_id).await?; + + // 5. Audit: record airtime transaction for sender + self.record_airtime_transaction( + &sender_imsi, + -bundle.amount_unconverted, + new_sender_airtime, + &format!("bundle_gift:{}:to:{}", bundle_id, recipient_imsi), + ).await?; + + Ok(active_bundle) + } + + /// Internal: write an airtime_transactions row for audit. + async fn record_airtime_transaction( + &self, + imsi: &str, + delta: i64, + balance_after: i64, + reason: &str, + ) -> ChargingResult<()> { + sqlx::query(r#" + INSERT INTO airtime_transactions + (imsi, delta_seconds, balance_after, reason, created_at) + VALUES ($1, $2, $3, $4, NOW()) + "#) + .bind(imsi) + .bind(delta) + .bind(balance_after) + .bind(reason) + .execute(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + Ok(()) + } +} diff --git a/apps/charging-engine/src/charging/engine.rs b/apps/charging-engine/src/charging/engine.rs index c42cce6..e13e37b 100644 --- a/apps/charging-engine/src/charging/engine.rs +++ b/apps/charging-engine/src/charging/engine.rs @@ -29,6 +29,7 @@ pub struct ChargingEngine { pub(crate) startup_time: SystemTime, pub(crate) redis_circuit_breaker: CircuitBreaker, pub(crate) postgres_circuit_breaker: CircuitBreaker, + pub(crate) bundle_repo: BundleRepo, } impl ChargingEngine { @@ -39,6 +40,7 @@ impl ChargingEngine { redis_url: &str, plans: RatingPlansRepo, sync_interval_secs: u64, + bundle_repo: BundleRepo, ) -> ChargingResult { let redis_client = redis::Client::open(redis_url) .map_err(|e| crate::errors::ChargingError::RedisConnection(e.to_string()))?; @@ -53,7 +55,7 @@ impl ChargingEngine { .ok() .and_then(|s| s.parse().ok()) .unwrap_or(60); - + let postgres_failure_threshold: u32 = std::env::var("POSTGRES_FAILURE_THRESHOLD") .ok() .and_then(|s| s.parse().ok()) @@ -67,7 +69,7 @@ impl ChargingEngine { redis_failure_threshold, Duration::from_secs(redis_timeout), ); - + let postgres_circuit_breaker = CircuitBreaker::new( postgres_failure_threshold, Duration::from_secs(postgres_timeout), @@ -80,6 +82,7 @@ impl ChargingEngine { startup_time: SystemTime::now(), redis_circuit_breaker, postgres_circuit_breaker, + bundle_repo }) } diff --git a/apps/charging-engine/src/charging/rating_plans_repo.rs b/apps/charging-engine/src/charging/rating_plans_repo.rs index 119a692..be83db8 100644 --- a/apps/charging-engine/src/charging/rating_plans_repo.rs +++ b/apps/charging-engine/src/charging/rating_plans_repo.rs @@ -16,7 +16,7 @@ use crate::errors::{ChargingError, ChargingResult}; /// Repository for rating plans backed by Postgres. #[derive(Clone)] pub struct RatingPlansRepo { - pool: PgPool, + pub(crate) pool: PgPool, circuit_breaker: CircuitBreaker, } @@ -243,14 +243,14 @@ fn row_to_plan(row: sqlx::postgres::PgRow) -> RatingPlan { /// Changes to the repo root directory to find the migrations folder. async fn run_migrations(database_url: &str) -> Result<(), Box> { use std::process::Command; - + // Change to repo root to find migrations directory let manifest_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); let repo_root = manifest_dir .parent() .and_then(|p| p.parent()) .ok_or("Failed to determine repo root")?; - + let output = Command::new("goose") .current_dir(&repo_root) .args(["postgres", database_url, "up", "migrations"]) diff --git a/apps/charging-engine/src/charging/types.rs b/apps/charging-engine/src/charging/types.rs index 04f6c26..5e3ec92 100644 --- a/apps/charging-engine/src/charging/types.rs +++ b/apps/charging-engine/src/charging/types.rs @@ -2,16 +2,8 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use redis::{FromRedisValue, ToRedisArgs, ToSingleRedisArg}; -/// Charging rule for usage authorization -/// -/// # Example -/// -/// ```rust -/// use charging_engine::charging::types::ChargingRule; -/// -/// let rule = ChargingRule::Allowed; -/// assert_eq!(rule.as_str(), "ALLOWED"); -/// ``` +// ─── Charging Rule ──────────────────────────────────────────────────────────── + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum ChargingRule { Allowed, @@ -24,73 +16,21 @@ pub enum ChargingRule { } impl ChargingRule { - /// Convert charging rule to string representation - /// - /// # Example - /// - /// ```rust - /// use charging_engine::charging::types::ChargingRule; - /// - /// let rule = ChargingRule::Allowed; - /// assert_eq!(rule.as_str(), "ALLOWED"); - /// ``` pub fn as_str(&self) -> &str { match self { - ChargingRule::Allowed => "ALLOWED", - ChargingRule::InsufficientCredit => "INSUFFICIENT_CREDIT", - ChargingRule::DataLimitExceeded => "DATA_LIMIT_EXCEEDED", - ChargingRule::VoiceLimitExceeded => "VOICE_LIMIT_EXCEEDED", - ChargingRule::SmsLimitExceeded => "SMS_LIMIT_EXCEEDED", - ChargingRule::UserBlocked => "USER_BLOCKED", - ChargingRule::Blocked => "BLOCKED", + ChargingRule::Allowed => "ALLOWED", + ChargingRule::InsufficientCredit => "INSUFFICIENT_CREDIT", + ChargingRule::DataLimitExceeded => "DATA_LIMIT_EXCEEDED", + ChargingRule::VoiceLimitExceeded => "VOICE_LIMIT_EXCEEDED", + ChargingRule::SmsLimitExceeded => "SMS_LIMIT_EXCEEDED", + ChargingRule::UserBlocked => "USER_BLOCKED", + ChargingRule::Blocked => "BLOCKED", } } } -/// Subscriber account information -/// -/// # Example -/// -/// ```rust -/// use charging_engine::charging::types::{SubscriberAccount, AccountStatus}; -/// use chrono::Utc; -/// -/// let account = SubscriberAccount { -/// imsi: "1234567890".to_string(), -/// balance: 1000, -/// data_limit: 1000000000, -/// data_used: 0, -/// voice_limit: 1000, -/// voice_used: 0, -/// sms_limit: 100, -/// sms_used: 0, -/// status: AccountStatus::Active, -/// last_updated: Utc::now(), -/// }; -/// ``` -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SubscriberAccount { - pub imsi: String, - pub balance: i64, - pub data_limit: u64, - pub data_used: u64, - pub voice_limit: u64, - pub voice_used: u64, - pub sms_limit: u64, - pub sms_used: u64, - pub status: AccountStatus, - pub last_updated: DateTime, -} +// ─── Account ────────────────────────────────────────────────────────────────── -/// Account status -/// -/// # Example -/// -/// ```rust -/// use charging_engine::charging::types::AccountStatus; -/// -/// let status = AccountStatus::Active; -/// ``` #[derive(Debug, Clone, Serialize, Deserialize)] pub enum AccountStatus { Active, @@ -99,44 +39,22 @@ pub enum AccountStatus { Blocked, } -/// Usage event for charging -/// -/// # Example -/// -/// ```rust -/// use charging_engine::charging::types::{UsageEvent, UsageType}; -/// use chrono::Utc; -/// -/// let event = UsageEvent { -/// imsi: "1234567890".to_string(), -/// session_id: "session123".to_string(), -/// usage_type: UsageType::Data, -/// volume: 1000, -/// timestamp: Utc::now(), -/// rate: 0.01, -/// cost: 10.0, -/// }; -/// ``` #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct UsageEvent { - pub imsi: String, - pub session_id: String, - pub usage_type: UsageType, - pub volume: u64, - pub timestamp: DateTime, - pub rate: f64, - pub cost: f64, -} - -/// Usage type classification -/// -/// # Example -/// -/// ```rust -/// use charging_engine::charging::types::UsageType; -/// -/// let usage_type = UsageType::Data; -/// ``` +pub struct SubscriberAccount { + pub imsi: String, + pub balance: i64, + pub data_limit: u64, + pub data_used: u64, + pub voice_limit: u64, + pub voice_used: u64, + pub sms_limit: u64, + pub sms_used: u64, + pub status: AccountStatus, + pub last_updated: DateTime, +} + +// ─── Usage ──────────────────────────────────────────────────────────────────── + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum UsageType { Data, @@ -144,58 +62,134 @@ pub enum UsageType { SMS, } -/// Rating plan for subscriber billing -/// -/// # Example -/// -/// ```rust -/// use charging_engine::charging::types::RatingPlan; -/// -/// let plan = RatingPlan { -/// plan_id: "plan1".to_string(), -/// name: "Basic Plan".to_string(), -/// data_rate: 0.01, -/// voice_rate: 0.05, -/// sms_rate: 0.1, -/// monthly_fee: 10.0, -/// data_limit: 1000000000, -/// voice_limit: 1000, -/// sms_limit: 100, -/// }; -/// ``` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UsageEvent { + pub imsi: String, + pub session_id: String, + pub usage_type: UsageType, + pub volume: u64, + pub timestamp: DateTime, + pub rate: f64, + pub cost: f64, +} + +// ─── Rating Plan ────────────────────────────────────────────────────────────── + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RatingPlan { - pub plan_id: String, - pub name: String, - pub data_rate: f64, - pub voice_rate: f64, - pub sms_rate: f64, + pub plan_id: String, + pub name: String, + pub data_rate: f64, + pub voice_rate: f64, + pub sms_rate: f64, pub monthly_fee: f64, - pub data_limit: u64, + pub data_limit: u64, pub voice_limit: u64, - pub sms_limit: u64, + pub sms_limit: u64, +} + +// ─── Airtime ────────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AirtimeBalance { + pub imsi: String, + pub home_seconds: u64, + pub roaming_seconds: u64, + pub last_updated: DateTime, +} + +/// Returned by `deduct_airtime` so callers know exactly what was consumed. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AirtimeDeduction { + pub imsi: String, + pub deducted: u64, + pub new_balance: u64, + pub roaming: bool, +} + +// ─── Bundles ────────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum BundleType { + Data, + Voice, + SMS, + Hybrid, +} + +impl BundleType { + /// Returns the lowercase string stored in the `bundles.bundle_type` column. + pub fn as_str(&self) -> &str { + match self { + BundleType::Data => "data", + BundleType::Voice => "voice", + BundleType::SMS => "sms", + BundleType::Hybrid => "hybrid", + } + } + + /// Parse from the DB string back to the enum. + pub fn from_str(s: &str) -> Self { + match s { + "voice" => BundleType::Voice, + "sms" => BundleType::SMS, + "hybrid" => BundleType::Hybrid, + _ => BundleType::Data, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BundleAllowances { + pub data_bytes: Option, + pub voice_seconds: Option, + pub sms_count: Option, + pub roaming_data_bytes: Option, +} + +/// Bundle definition stored in PostgreSQL. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Bundle { + pub bundle_id: String, + pub name: String, + pub bundle_type: BundleType, + pub allowances: BundleAllowances, + pub validity_days: u32, + pub priority: u8, + pub amount_unconverted: i64, + pub is_active: bool, +} + +/// A bundle that has been activated for a specific subscriber. +/// Stored as JSON in Redis with a TTL matching `validity_days`. +/// `priority` is copied from the bundle definition so `consume_from_bundle` +/// can sort without an extra DB round-trip. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ActiveBundle { + pub imsi: String, + pub bundle_id: String, + /// Copied from `Bundle::priority` at activation time. + pub priority: u8, + pub activated_at: DateTime, + pub expires_at: DateTime, + pub remaining_allowances: BundleAllowances, } -// Redis trait implementations for serialization +// ─── Redis trait implementations ────────────────────────────────────────────── + impl FromRedisValue for SubscriberAccount { fn from_redis_value(v: redis::Value) -> Result { let json: String = redis::from_redis_value(v)?; - let account: SubscriberAccount = serde_json::from_str(&json) - .map_err(|e| redis::ParsingError::from(e.to_string()))?; - Ok(account) + serde_json::from_str(&json).map_err(|e| redis::ParsingError::from(e.to_string())) } } impl ToRedisArgs for SubscriberAccount { fn write_redis_args(&self, out: &mut W) - where - W: redis::RedisWrite + ?Sized, - { - let json = serde_json::to_string(self) - .unwrap_or_else(|_| { - // Fallback to empty JSON if serialization fails - r#"{"imsi":"","balance":0,"data_limit":0,"data_used":0,"voice_limit":0,"voice_used":0,"sms_limit":0,"sms_used":0,"status":"Active","last_updated":"1970-01-01T00:00:00Z"}"#.to_string() - }); + where W: redis::RedisWrite + ?Sized { + let json = serde_json::to_string(self).unwrap_or_else(|_| { + r#"{"imsi":"","balance":0,"data_limit":0,"data_used":0,"voice_limit":0,"voice_used":0,"sms_limit":0,"sms_used":0,"status":"Active","last_updated":"1970-01-01T00:00:00Z"}"#.to_string() + }); json.write_redis_args(out) } } @@ -205,24 +199,38 @@ impl ToSingleRedisArg for SubscriberAccount {} impl FromRedisValue for UsageEvent { fn from_redis_value(v: redis::Value) -> Result { let json: String = redis::from_redis_value(v)?; - let event: UsageEvent = serde_json::from_str(&json) - .map_err(|e| redis::ParsingError::from(e.to_string()))?; - Ok(event) + serde_json::from_str(&json).map_err(|e| redis::ParsingError::from(e.to_string())) } } impl ToRedisArgs for UsageEvent { fn write_redis_args(&self, out: &mut W) - where - W: redis::RedisWrite + ?Sized, - { - let json = serde_json::to_string(self) - .unwrap_or_else(|_| { - // Fallback to empty JSON if serialization fails - r#"{"imsi":"","session_id":"","usage_type":"Data","volume":0,"timestamp":"1970-01-01T00:00:00Z","rate":0.0,"cost":0.0}"#.to_string() - }); + where W: redis::RedisWrite + ?Sized { + let json = serde_json::to_string(self).unwrap_or_else(|_| { + r#"{"imsi":"","session_id":"","usage_type":"Data","volume":0,"timestamp":"1970-01-01T00:00:00Z","rate":0.0,"cost":0.0}"#.to_string() + }); json.write_redis_args(out) } } impl ToSingleRedisArg for UsageEvent {} + +/// ActiveBundle is stored as JSON in Redis — implement the Redis traits +/// so it can be set/get with `redis::AsyncCommands`. +impl FromRedisValue for ActiveBundle { + fn from_redis_value(v: redis::Value) -> Result { + let json: String = redis::from_redis_value(v)?; + serde_json::from_str(&json).map_err(|e| redis::ParsingError::from(e.to_string())) + } +} + +impl ToRedisArgs for ActiveBundle { + fn write_redis_args(&self, out: &mut W) + where W: redis::RedisWrite + ?Sized { + let json = serde_json::to_string(self) + .unwrap_or_else(|_| "{}".to_string()); + json.write_redis_args(out) + } +} + +impl ToSingleRedisArg for ActiveBundle {} diff --git a/apps/charging-engine/src/charging/usage_pipeline.rs b/apps/charging-engine/src/charging/usage_pipeline.rs new file mode 100644 index 0000000..c9ce71a --- /dev/null +++ b/apps/charging-engine/src/charging/usage_pipeline.rs @@ -0,0 +1,152 @@ +use chrono::Utc; +use tracing::{info, warn}; + +use crate::errors::{ChargingError, ChargingResult}; +use super::types::{UsageEvent, UsageType}; +use super::engine::ChargingEngine; + +#[derive(Debug)] +pub enum ChargingSource { + Bundle { subscriber_bundle_id: i64 }, + Credit, +} + +pub struct ChargeResult { + pub imsi: String, + pub session_id: String, + pub volume: u64, + pub cost: f64, + pub charging_source: ChargingSource, +} + +impl ChargingEngine { + /// Full charging pipeline: bundle-first, then credit fallback. + /// This is the method called by the `process_usage` handler. + pub async fn process_usage_event(&self, event: UsageEvent) -> ChargingResult { + // 1. Validate subscriber exists and is active + let subscriber = self.get_subscriber_account(&event.imsi).await? + .ok_or_else(|| ChargingError::SubscriberNotFound(event.imsi.clone()))?; + + if !matches!(subscriber.status, crate::charging::types::AccountStatus::Active) { + return Err(ChargingError::UsageBlocked( + format!("Subscriber {} is not active", event.imsi) + )); + } + + let usage_type_str = match event.usage_type { + UsageType::Data => "data", + UsageType::Voice => "voice", + UsageType::SMS => "sms", + }; + + // 2. Try bundle deduction first + let active_bundles = self.bundle_repo.get_active_bundles(&event.imsi).await?; + + for bundle in &active_bundles { + let has_allowance = match event.usage_type { + UsageType::Data => bundle.remaining_allowances.data_bytes + .map_or(false, |b| b >= event.volume), + UsageType::Voice => bundle.remaining_allowances.voice_seconds + .map_or(false, |s| s >= event.volume), + UsageType::SMS => bundle.remaining_allowances.sms_count + .map_or(false, |c| c >= event.volume), + }; + + if !has_allowance { + continue; + } + + let deducted = self.bundle_repo + .deduct_from_bundle(bundle.id, usage_type_str, event.volume) + .await?; + + if deducted { + info!( + imsi = %event.imsi, + bundle_id = %bundle.bundle_id, + volume = event.volume, + usage_type = usage_type_str, + "Usage charged from bundle" + ); + + // Persist usage record with charging_source = 'bundle' + self.persist_usage_record(&event, 0.0, ChargingSource::Bundle { + subscriber_bundle_id: bundle.id, + }).await?; + + return Ok(ChargeResult { + imsi: event.imsi, + session_id: event.session_id, + volume: event.volume, + cost: 0.0, // bundle usage has no per-event cost + charging_source: ChargingSource::Bundle { + subscriber_bundle_id: bundle.id, + }, + }); + } + } + + // 3. No bundle covered it — fall back to credit deduction + warn!( + imsi = %event.imsi, + usage_type = usage_type_str, + "No active bundle for usage, falling back to credit" + ); + + let plan = self.plans.get("basic").await? // resolve subscriber's actual plan + .ok_or_else(|| ChargingError::RatingPlanNotFound("basic".to_string()))?; + + let cost = self.calculate_usage_cost(&event).await?; + + // Deduct from credit balance (existing credit_management logic) + self.deduct_credit_for_usage(&event.imsi, cost).await?; + + // Persist usage record with charging_source = 'credit' + self.persist_usage_record(&event, cost, ChargingSource::Credit).await?; + + Ok(ChargeResult { + imsi: event.imsi, + session_id: event.session_id, + volume: event.volume, + cost, + charging_source: ChargingSource::Credit, + }) + } + + /// Persist a usage record to Postgres for billing reconciliation. + async fn persist_usage_record( + &self, + event: &UsageEvent, + cost: f64, + source: ChargingSource, + ) -> ChargingResult<()> { + let (charging_source_str, bundle_id) = match &source { + ChargingSource::Bundle { subscriber_bundle_id } => + ("bundle", Some(*subscriber_bundle_id)), + ChargingSource::Credit => + ("credit", None), + }; + + sqlx::query(r#" + INSERT INTO usage_records + (subscriber_id, session_id, usage_type, start_time, end_time, + volume, rate, cost, charging_source, subscriber_bundle_id, created_at, updated_at) + SELECT s.id, $2, $3, $4, $4, $5, $6, $7, $8, $9, NOW(), NOW() + FROM subscribers s WHERE s.imsi = $1 + "#) + .bind(&event.imsi) + .bind(&event.session_id) + .bind(match event.usage_type { UsageType::Data => "data", UsageType::Voice => "voice", UsageType::SMS => "sms" }) + .bind(event.timestamp) + .bind(event.volume as i64) + .bind(event.rate) + .bind(cost) + .bind(charging_source_str) + .bind(bundle_id) + .execute(&self.bundle_repo.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + Ok(()) + } +} diff --git a/apps/charging-engine/src/errors/types.rs b/apps/charging-engine/src/errors/types.rs index 703601a..0c38a7d 100644 --- a/apps/charging-engine/src/errors/types.rs +++ b/apps/charging-engine/src/errors/types.rs @@ -3,37 +3,51 @@ use axum::http::StatusCode; use serde_json::json; use thiserror::Error; +use crate::charging; + #[derive(Debug, Error, Clone)] pub enum ChargingError { #[error("Redis connection error: {0}")] RedisConnection(String), - + #[error("Redis operation error: {0}")] RedisOperation(String), - + #[error("Database error: {0}")] DatabaseError(String), - + #[error("Subscriber not found: {0}")] SubscriberNotFound(String), - + #[error("Rating plan not found: {0}")] RatingPlanNotFound(String), - + #[error("Insufficient credit: available={available}, requested={requested}")] InsufficientCredit { available: u64, requested: u64 }, - + #[error("Usage blocked: {0}")] UsageBlocked(String), - + #[error("Invalid input: {0}")] InvalidInput(String), - + #[error("Serialization error: {0}")] SerializationError(String), - + #[error("Internal error: {0}")] InternalError(String), + + #[error("Bundle not found: {0}")] + BundleNotFound(String), + + #[error("Bundle already active")] + BundleAlreadyActive, + + #[error("Invalid bundle configuration: {0}")] + InvalidBundleConfig(String), + + #[error("Insufficient airtime: available={available}, required={required}")] + InsufficientAirtime { available: i64, required: i64 }, } impl IntoResponse for ChargingError { @@ -49,6 +63,10 @@ impl IntoResponse for ChargingError { ChargingError::InvalidInput(_) => (StatusCode::BAD_REQUEST, "Invalid input"), ChargingError::SerializationError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Serialization error"), ChargingError::InternalError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Internal error"), + ChargingError::BundleNotFound(_) => (StatusCode::NOT_FOUND, "Bundle not found"), + ChargingError::BundleAlreadyActive => (StatusCode::CONFLICT, "Bundle already active"), + ChargingError::InvalidBundleConfig(_) => (StatusCode::BAD_REQUEST, "Invalid bundle config"), + ChargingError::InsufficientAirtime { .. } => (StatusCode::PAYMENT_REQUIRED, "Insufficient airtime"), }; let body = json!({ diff --git a/apps/charging-engine/src/handlers/airtime.rs b/apps/charging-engine/src/handlers/airtime.rs new file mode 100644 index 0000000..ab331dd --- /dev/null +++ b/apps/charging-engine/src/handlers/airtime.rs @@ -0,0 +1,77 @@ +use axum::{extract::{Path, State}, http::StatusCode, response::Json, Json as AxumJson}; +use serde_json::json; +use crate::models::AppState; +use crate::errors::ChargingError; + +#[derive(Deserialize)] +pub struct AddAirtimeRequest { + pub seconds_to_add: u64, + pub roaming: Option, +} + +#[derive(Serialize)] +pub struct AirtimeBalanceResponse { + pub imsi: String, + pub home_seconds: u64, + pub roaming_seconds: u64, + pub last_updated: chrono::DateTime, +} + + +#[derive(Deserialize)] +pub struct DeductAirtimeRequest { + pub seconds_to_deduct: u64, + pub roaming: Option, +} + +pub async fn add_airtime( + State(state): State, + Path(imsi): Path, + AxumJson(request): AxumJson, +) -> Result, StatusCode> { + let roaming = request.roaming.unwrap_or(false); + + match state.charging_engine.add_airtime(&imsi, request.seconds_to_add, roaming).await { + Ok(new_balance) => Ok(Json(json!({ + "imsi": imsi, + "roaming": roaming, + "new_balance_seconds": new_balance, + "timestamp": chrono::Utc::now() + }))), + Err(ChargingError::InvalidInput(_)) => Err(StatusCode::BAD_REQUEST), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +pub async fn get_airtime_balance( + State(state): State, + Path(imsi): Path, +) -> Result, StatusCode> { + match state.charging_engine.get_airtime_balance(&imsi).await { + Ok(balance) => Ok(Json(AirtimeBalanceResponse { + imsi: balance.imsi, + home_seconds: balance.home_seconds, + roaming_seconds: balance.roaming_seconds, + last_updated: balance.last_updated, + })), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +pub async fn deduct_airtime( + State(state): State, + Path(imsi): Path, + Json(request): Json, +) -> ChargingResult> { + let roaming = request.roaming.unwrap_or(false); + let result = state.charging_engine + .deduct_airtime(&imsi, request.seconds_to_deduct, roaming) + .await + .with_context("Failed to deduct airtime")?; + Ok(Json(serde_json::json!({ + "imsi": imsi, + "deducted_seconds": result.deducted, + "new_balance_seconds": result.new_balance, + "roaming": roaming, + }))) +} diff --git a/apps/charging-engine/src/handlers/bundles.rs b/apps/charging-engine/src/handlers/bundles.rs new file mode 100644 index 0000000..4635624 --- /dev/null +++ b/apps/charging-engine/src/handlers/bundles.rs @@ -0,0 +1,281 @@ +use axum::{extract::{Path, State}, http::StatusCode, response::Json, Json as AxumJson}; +use serde_json::json; +use crate::models::AppState; +use crate::errors::ChargingError; + +#[derive(Deserialize)] +pub struct CreateBundleRequest { + pub bundle_id: String, + pub name: String, + pub bundle_type: String, + pub allowances: BundleAllowancesRequest, + pub validity_days: u32, + pub priority: Option, +} + +#[derive(Deserialize)] +pub struct BundleAllowancesRequest { + pub data_bytes: Option, + pub voice_seconds: Option, + pub sms_count: Option, + pub roaming_data_bytes: Option, +} + +#[derive(Deserialize)] +pub struct ActivateBundleRequest { + pub msisdn: String, +} + +#[derive(Serialize)] +pub struct BundleResponse { + pub bundle_id: String, + pub name: String, + pub bundle_type: String, + pub allowances: BundleAllowancesRequest, + pub validity_days: u32, + pub priority: u8, + pub is_active: bool, +} + +#[derive(Serialize)] +pub struct ActiveBundleResponse { + pub imsi: String, + pub bundle_id: String, + pub activated_at: chrono::DateTime, + pub expires_at: chrono::DateTime, + pub remaining_allowances: BundleAllowancesRequest, +} + +#[derive(Deserialize)] +pub struct PurchaseBundleRequest { + pub msisdn: String, +} + +#[derive(Deserialize)] +pub struct GiftBundleRequest { + pub sender_msisdn: String, + pub recipient_msisdn: String, +} + +/// create bunndle +pub async fn create_bundle( + State(state): State, + AxumJson(request): AxumJson, +) -> Result, StatusCode> { + let bundle_type = match request.bundle_type.as_str() { + "data" => crate::charging::types::BundleType::Data, + "voice" => crate::charging::types::BundleType::Voice, + "sms" => crate::charging::types::BundleType::SMS, + "hybrid" => crate::charging::types::BundleType::Hybrid, + _ => return Err(StatusCode::BAD_REQUEST), + }; + + let bundle = crate::charging::types::Bundle { + bundle_id: request.bundle_id.clone(), + name: request.name, + bundle_type, + allowances: crate::charging::types::BundleAllowances { + data_bytes: request.allowances.data_bytes, + voice_seconds: request.allowances.voice_seconds, + sms_count: request.allowances.sms_count, + roaming_data_bytes: request.allowances.roaming_data_bytes, + }, + validity_days: request.validity_days, + priority: request.priority.unwrap_or(1), + amount_unconverted: request.amount_unconverted, + is_active: true, + }; + + match state.charging_engine.create_bundle(bundle).await { + Ok(_) => Ok(Json(json!({ + "status": "created", + "bundle_id": request.bundle_id, + }))), + Err(ChargingError::InvalidInput(_)) => Err(StatusCode::BAD_REQUEST), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +/// Activate a bundle for a subscriber +pub async fn activate_bundle( + State(state): State, + Path(bundle_id): Path, + AxumJson(request): AxumJson, +) -> Result, StatusCode> { + match state.charging_engine.activate_bundle_for_subscriber(&request.imsi, &bundle_id).await { + Ok(active_bundle) => Ok(Json(ActiveBundleResponse { + imsi: active_bundle.imsi, + bundle_id: active_bundle.bundle_id, + activated_at: active_bundle.activated_at, + expires_at: active_bundle.expires_at, + remaining_allowances: BundleAllowancesRequest { + data_bytes: active_bundle.remaining_allowances.data_bytes, + voice_seconds: active_bundle.remaining_allowances.voice_seconds, + sms_count: active_bundle.remaining_allowances.sms_count, + roaming_data_bytes: active_bundle.remaining_allowances.roaming_data_bytes, + }, + })), + Err(ChargingError::BundleNotFound(_)) => Err(StatusCode::NOT_FOUND), + Err(ChargingError::BundleAlreadyActive) => Err(StatusCode::CONFLICT), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +/// List active bundles for a subscriber +pub async fn list_active_bundles( + State(state): State, + Path(imsi): Path, +) -> Result>, StatusCode> { + match state.charging_engine.get_active_bundles_for_subscriber(&imsi).await { + Ok(bundles) => { + let response: Vec = bundles.into_iter().map(|b| ActiveBundleResponse { + imsi: b.imsi, + bundle_id: b.bundle_id, + activated_at: b.activated_at, + expires_at: b.expires_at, + remaining_allowances: BundleAllowancesRequest { + data_bytes: b.remaining_allowances.data_bytes, + voice_seconds: b.remaining_allowances.voice_seconds, + sms_count: b.remaining_allowances.sms_count, + roaming_data_bytes: b.remaining_allowances.roaming_data_bytes, + }, + }).collect(); + Ok(Json(response)) + }, + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +/// Consume from bundle allowances (internal use) +pub async fn consume_from_bundle( + State(state): State, + AxumJson(request): AxumJson, +) -> Result, StatusCode> { + let imsi = request.get("imsi").and_then(|v| v.as_str()).ok_or(StatusCode::BAD_REQUEST)?; + let usage_type_str = request.get("usage_type").and_then(|v| v.as_str()).ok_or(StatusCode::BAD_REQUEST)?; + let amount = request.get("amount").and_then(|v| v.as_u64()).ok_or(StatusCode::BAD_REQUEST)?; + + let usage_type = match usage_type_str { + "data" => crate::charging::types::UsageType::Data, + "voice" => crate::charging::types::UsageType::Voice, + "sms" => crate::charging::types::UsageType::SMS, + _ => return Err(StatusCode::BAD_REQUEST), + }; + + match state.charging_engine.consume_from_bundle(imsi, usage_type, amount).await { + Ok(consumed) => Ok(Json(json!({ + "consumed": consumed, + "imsi": imsi, + "usage_type": usage_type_str, + "amount": amount, + }))), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } + + + +} + + + +/// GET /v1/bundles +pub async fn list_bundles( + State(state): State, +) -> ChargingResult> { + let bundles = state.charging_engine.list_bundles().await + .with_context("Failed to list bundles")?; + Ok(Json(serde_json::json!({ "bundles": bundles }))) +} + +/// GET /v1/bundles/:id +pub async fn get_bundle( + Path(bundle_id): Path, + State(state): State, +) -> ChargingResult> { + let bundle = state.charging_engine.get_bundle_by_id(&bundle_id).await + .with_context("Failed to get bundle")?; + match bundle { + Some(b) => Ok(Json(serde_json::json!({ + "bundle_id": b.bundle_id, + "name": b.name, + "validity_days": b.validity_days, + "priority": b.priority, + "is_active": b.is_active, + }))), + None => Err(ChargingError::BundleNotFound(bundle_id)), + } +} + +/// DELETE /v1/bundles/:id +pub async fn deactivate_bundle( + Path(bundle_id): Path, + State(state): State, +) -> ChargingResult> { + state.charging_engine.deactivate_bundle(&bundle_id).await + .with_context("Failed to deactivate bundle")?; + Ok(Json(serde_json::json!({ "status": "deactivated", "bundle_id": bundle_id }))) +} + + +/// POST /v1/bundles/:id/purchase +/// Purchase a bundle by spending airtime seconds. +pub async fn purchase_bundle_with_airtime( + State(state): State, + Path(bundle_id): Path, + Json(req): Json, +) -> ChargingResult> { + let active = state.charging_engine + .purchase_bundle_with_airtime(&req.imsi, &bundle_id) + .await + .with_context("Failed to purchase bundle with airtime")?; + + Ok(Json(serde_json::json!({ + "status": "purchased", + "imsi": active.imsi, + "bundle_id": active.bundle_id, + "expires_at": active.expires_at, + "remaining_allowances": active.remaining_allowances, + }))) +} + + +/// POST /v1/bundles/:id/purchase +pub async fn purchase_bundle( + State(state): State, + Path(bundle_id): Path, + Json(req): Json, +) -> ChargingResult> { + let active = state.charging_engine + .purchase_bundle_with_airtime(&req.msisdn, &bundle_id) + .await + .with_context("Failed to purchase bundle")?; + + Ok(Json(serde_json::json!({ + "status": "purchased", + "msisdn": req.msisdn, + "bundle_id": active.bundle_id, + "expires_at": active.expires_at, + "remaining_allowances": active.remaining_allowances, + }))) +} + + +pub async fn gift_bundle( + State(state): State, + Path(bundle_id): Path, + Json(req): Json, +) -> ChargingResult> { + let active = state.charging_engine + .gift_bundle_with_airtime(&req.sender_msisdn, &req.recipient_msisdn, &bundle_id) + .await + .with_context("Failed to gift bundle")?; + + Ok(Json(serde_json::json!({ + "status": "gifted", + "sender_msisdn": req.sender_msisdn, + "recipient_msisdn": req.recipient_msisdn, + "bundle_id": active.bundle_id, + "expires_at": active.expires_at, + "remaining_allowances": active.remaining_allowances, + }))) +} diff --git a/apps/charging-engine/src/handlers/mod.rs b/apps/charging-engine/src/handlers/mod.rs index fbd69da..aee6f52 100644 --- a/apps/charging-engine/src/handlers/mod.rs +++ b/apps/charging-engine/src/handlers/mod.rs @@ -6,6 +6,8 @@ pub mod monitoring; pub mod rating; pub mod subscriber; pub mod usage; +pub mod airtime; +pub mod bundles; pub use block::{block_user, is_user_blocked, unblock_user}; pub use credit::{add_credit, check_credit, deduct_credit, get_balance}; @@ -15,3 +17,5 @@ pub use monitoring::{detailed_health_check, get_error_stats, get_performance_met pub use rating::{add_rating_plan, get_rating_plan, list_rating_plans, remove_rating_plan}; pub use subscriber::{get_subscriber, update_subscriber}; pub use usage::{calculate_usage_cost, generate_invoice, process_usage, rate_usage, record_usage}; +pub use airtime::{add_airtime, get_airtime_balance, deduct_airtime}; +pub use bundles::{create_bundle, activate_bundle, list_active_bundles, consume_from_bundle,list_bundles, get_bundle, deactivate_bundle, gift_bundle, purchase_bundle, purchase_bundle_with_airtime}; diff --git a/apps/charging-engine/src/handlers/usage.rs b/apps/charging-engine/src/handlers/usage.rs index b2f3f5c..bbd28cd 100644 --- a/apps/charging-engine/src/handlers/usage.rs +++ b/apps/charging-engine/src/handlers/usage.rs @@ -18,7 +18,7 @@ pub async fn record_usage( let session_id = req.get("session_id").and_then(|v| v.as_str()).ok_or_else(|| { ChargingError::InvalidInput("Missing session_id".to_string()) })?; - + validate_session_id(session_id)?; let event = crate::charging::types::UsageEvent { @@ -53,7 +53,7 @@ pub async fn calculate_usage_cost( let session_id = req.get("session_id").and_then(|v| v.as_str()).ok_or_else(|| { ChargingError::InvalidInput("Missing session_id".to_string()) })?; - + validate_session_id(session_id)?; let event = crate::charging::types::UsageEvent { @@ -68,7 +68,7 @@ pub async fn calculate_usage_cost( let cost = state.charging_engine.calculate_usage_cost(&event).await .with_context("Failed to calculate usage cost")?; - + Ok(Json(serde_json::json!({ "cost": cost, "imsi": imsi, @@ -88,7 +88,7 @@ pub async fn rate_usage( let session_id = req.get("session_id").and_then(|v| v.as_str()).ok_or_else(|| { ChargingError::InvalidInput("Missing session_id".to_string()) })?; - + validate_session_id(session_id)?; let event = crate::charging::types::UsageEvent { @@ -103,7 +103,7 @@ pub async fn rate_usage( let rated_event = state.charging_engine.rate_usage(event).await .with_context("Failed to rate usage")?; - + Ok(Json(serde_json::json!({ "rated_event": rated_event, }))) @@ -121,7 +121,7 @@ pub async fn process_usage( let session_id = req.get("session_id").and_then(|v| v.as_str()).ok_or_else(|| { ChargingError::InvalidInput("Missing session_id".to_string()) })?; - + validate_session_id(session_id)?; let event = crate::charging::types::UsageEvent { @@ -136,11 +136,24 @@ pub async fn process_usage( state.charging_engine.process_usage_event(event).await .with_context("Failed to process usage event")?; - - Ok(Json(serde_json::json!({ - "status": "processed", - "imsi": imsi, - }))) + + let result = state.charging_engine.process_usage_event(event).await + .with_context("Failed to process usage event")?; + + Ok(Json(serde_json::json!({ + "status": "processed", + "imsi": result.imsi, + "session_id": result.session_id, + "volume": result.volume, + "cost": result.cost, + "charging_source": match result.charging_source { + ChargingSource::Bundle { subscriber_bundle_id } => serde_json::json!({ + "type": "bundle", + "subscriber_bundle_id": subscriber_bundle_id + }), + ChargingSource::Credit => serde_json::json!({ "type": "credit" }), + } + }))) } /// GET /v1/invoice/:imsi/:period @@ -151,7 +164,7 @@ pub async fn generate_invoice( ) -> ChargingResult> { let invoice = state.charging_engine.generate_invoice(&imsi, &period).await .with_context("Failed to generate invoice")?; - + Ok(Json(serde_json::json!({ "imsi": imsi, "period": period, diff --git a/apps/charging-engine/src/routes.rs b/apps/charging-engine/src/routes.rs index 310b461..39041f0 100644 --- a/apps/charging-engine/src/routes.rs +++ b/apps/charging-engine/src/routes.rs @@ -14,7 +14,7 @@ use crate::handlers::{ engine_start, engine_stop, engine_uptime, get_balance, get_error_stats, get_performance_metrics, get_rating_plan, get_subscriber, get_system_stats, is_user_blocked, list_rating_plans, record_usage, remove_rating_plan, start_sync, unblock_user, update_subscriber, health_check, - calculate_usage_cost, rate_usage, process_usage, generate_invoice, + calculate_usage_cost, rate_usage, process_usage, generate_invoice, add_airtime, get_airtime_balance, create_bundle, activate_bundle, list_active_bundles, consume_from_bundle }; use crate::models::AppState; @@ -76,6 +76,20 @@ pub fn create_router(state: AppState) -> Router { .route("/v1/usage/rate", post(rate_usage)) .route("/v1/usage/process", post(process_usage)) .route("/v1/invoice/:imsi/:period", get(generate_invoice)) + //airtime and bundle routes + .route("/v1/airtime/:imsi/add", post(add_airtime)) + .route("/v1/airtime/:imsi/balance", get(get_airtime_balance)) + .route("/v1/bundles", post(create_bundle)) + .route("/v1/bundles/:id/activate", post(activate_bundle)) + .route("/v1/bundles/:imsi/active", get(list_active_bundles)) + .route("/v1/bundles/consume", post(consume_from_bundle)) + .route("/v1/bundles", get(list_bundles)) + .route("/v1/bundles/:id", get(get_bundle)) + .route("/v1/bundles/deactivate/:id", delete(deactivate_bundle)) + .route("/v1/bundles/:id/purchase_with_airtime", post(purchase_bundle_with_airtime)) + .route("/v1/bundles/:id/purchase", post(purchase_bundle)) + .route("/v1/bundles/:id/gift", post(gift_bundle)) + .route("/v1/airtime/:imsi/deduct", post(deduct_airtime)) .layer(cors) .layer(GovernorLayer::new(governor_conf)) .route_layer(axum::middleware::from_fn_with_state(state.auth_config.clone(), auth_middleware)) diff --git a/migrations/00019_init_airtime.sql b/migrations/00019_init_airtime.sql new file mode 100644 index 0000000..bedff04 --- /dev/null +++ b/migrations/00019_init_airtime.sql @@ -0,0 +1,32 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS airtime_balances ( + id SERIAL PRIMARY KEY, + subscriber_id INTEGER NOT NULL REFERENCES subscribers(id) ON DELETE CASCADE, + imsi TEXT NOT NULL UNIQUE, + home_seconds BIGINT NOT NULL DEFAULT 0, + roaming_seconds BIGINT NOT NULL DEFAULT 0, + last_topped_up TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS airtime_transactions ( + id SERIAL PRIMARY KEY, + subscriber_id INTEGER NOT NULL REFERENCES subscribers(id) ON DELETE CASCADE, + imsi TEXT NOT NULL, + transaction_type TEXT NOT NULL, -- 'topup', 'deduction', 'bundle_purchase' + seconds_delta BIGINT NOT NULL, -- positive = credit, negative = debit + balance_after BIGINT NOT NULL, + roaming BOOLEAN NOT NULL DEFAULT FALSE, + reference_id TEXT, -- bundle_id if bundle_purchase, session_id if deduction + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_airtime_balances_imsi ON airtime_balances(imsi); +CREATE INDEX IF NOT EXISTS idx_airtime_transactions_imsi ON airtime_transactions(imsi); +CREATE INDEX IF NOT EXISTS idx_airtime_transactions_type ON airtime_transactions(transaction_type); +CREATE INDEX IF NOT EXISTS idx_airtime_transactions_created_at ON airtime_transactions(created_at); + +-- +goose Down +DROP TABLE IF EXISTS airtime_transactions CASCADE; +DROP TABLE IF EXISTS airtime_balances CASCADE; diff --git a/migrations/00020_init_bundles.sql b/migrations/00020_init_bundles.sql new file mode 100644 index 0000000..bcf16ff --- /dev/null +++ b/migrations/00020_init_bundles.sql @@ -0,0 +1,45 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS bundles ( + id SERIAL PRIMARY KEY, + bundle_id TEXT NOT NULL UNIQUE, + name TEXT NOT NULL, + bundle_type TEXT NOT NULL, -- 'data', 'voice', 'sms', 'hybrid' + data_bytes BIGINT, + voice_seconds BIGINT, + sms_count BIGINT, + roaming_data_bytes BIGINT, + validity_days INTEGER NOT NULL DEFAULT 30, + priority SMALLINT NOT NULL DEFAULT 1, + airtime_cost BIGINT NOT NULL DEFAULT 0, -- cost in airtime seconds to purchase + is_active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS subscriber_bundles ( + id SERIAL PRIMARY KEY, + subscriber_id INTEGER NOT NULL REFERENCES subscribers(id) ON DELETE CASCADE, + imsi TEXT NOT NULL, + bundle_id TEXT NOT NULL REFERENCES bundles(bundle_id), + activated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + expires_at TIMESTAMPTZ NOT NULL, + remaining_data_bytes BIGINT, + remaining_voice_seconds BIGINT, + remaining_sms_count BIGINT, + remaining_roaming_bytes BIGINT, + status TEXT NOT NULL DEFAULT 'active', -- 'active', 'exhausted', 'expired' + purchased_with_airtime BOOLEAN NOT NULL DEFAULT FALSE, + airtime_cost_paid BIGINT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_bundles_bundle_id ON bundles(bundle_id); +CREATE INDEX IF NOT EXISTS idx_bundles_is_active ON bundles(is_active); +CREATE INDEX IF NOT EXISTS idx_subscriber_bundles_imsi ON subscriber_bundles(imsi); +CREATE INDEX IF NOT EXISTS idx_subscriber_bundles_expires_at ON subscriber_bundles(expires_at); +CREATE INDEX IF NOT EXISTS idx_subscriber_bundles_status ON subscriber_bundles(status); + +-- +goose Down +DROP TABLE IF EXISTS subscriber_bundles CASCADE; +DROP TABLE IF EXISTS bundles CASCADE; diff --git a/migrations/00021_usage_records_charging_source.sql b/migrations/00021_usage_records_charging_source.sql new file mode 100644 index 0000000..a9e988f --- /dev/null +++ b/migrations/00021_usage_records_charging_source.sql @@ -0,0 +1,11 @@ +-- +goose Up +ALTER TABLE usage_records + ADD COLUMN IF NOT EXISTS charging_source TEXT DEFAULT 'credit', + ADD COLUMN IF NOT EXISTS subscriber_bundle_id INTEGER REFERENCES subscriber_bundles(id); + +CREATE INDEX IF NOT EXISTS idx_usage_records_charging_source ON usage_records(charging_source); + +-- +goose Down +ALTER TABLE usage_records + DROP COLUMN IF EXISTS charging_source, + DROP COLUMN IF EXISTS subscriber_bundle_id; diff --git a/migrations/00022_bundles_amount_unconverted.sql b/migrations/00022_bundles_amount_unconverted.sql new file mode 100644 index 0000000..8435cf9 --- /dev/null +++ b/migrations/00022_bundles_amount_unconverted.sql @@ -0,0 +1,6 @@ +-- +goose Up +ALTER TABLE bundles + ADD COLUMN IF NOT EXISTS amount_unconverted BIGINT NOT NULL DEFAULT 0; + +-- +goose Down +ALTER TABLE bundles DROP COLUMN IF EXISTS amount_unconverted;