diff --git a/apps/api-server/internal/handlers/auc_handler.go b/apps/api-server/internal/handlers/auc_handler.go new file mode 100644 index 0000000..42d045a --- /dev/null +++ b/apps/api-server/internal/handlers/auc_handler.go @@ -0,0 +1,66 @@ +package handlers + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/nutcas3/telecom-platform/apps/api-server/internal/models" + "github.com/nutcas3/telecom-platform/apps/api-server/internal/services" +) + +// AuCHandler handles Authentication Center (AuC) HTTP requests. +type AuCHandler struct { + subscriberService *services.SubscriberService +} + +// NewAuCHandler creates a new AuCHandler. +func NewAuCHandler(subscriberService *services.SubscriberService) *AuCHandler { + return &AuCHandler{subscriberService: subscriberService} +} + +// GenerateAuthVector generates a 3GPP EPS authentication vector for a subscriber. +// +// @Summary Generate authentication vector +// @Description Runs the Milenage algorithm and returns (RAND, XRES, CK, IK, AUTN). +// +// Called by the MME/AMF during an Attach or TAU procedure. +// +// @Tags auc +// @Produce json +// @Param imsi path string true "Subscriber IMSI (15 digits)" +// @Success 200 {object} services.AuthVector +// @Failure 400 {object} ErrorResponse +// @Failure 404 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /api/v1/auc/{imsi}/auth-vector [post] +func (h *AuCHandler) GenerateAuthVector(c *gin.Context) { + imsi := models.IMSI(c.Param("imsi")) + if imsi == "" { + c.JSON(http.StatusBadRequest, ErrorResponse{ + Error: "IMSI is required", + Code: ErrCodeMissingRequired, + Details: "imsi path parameter must not be empty", + }) + return + } + + av, err := h.subscriberService.GenerateAuthVector(c.Request.Context(), imsi) + if err != nil { + handleError(c, err, "Failed to generate authentication vector", ErrCodeInternalError) + return + } + + c.JSON(http.StatusOK, av) +} + +// RegisterAuCRoutes registers AuC routes on the given router group. +// Call this from your router setup alongside other handler registrations. +// +// v1 := r.Group("/api/v1") +// handlers.RegisterAuCRoutes(v1, handlers.NewAuCHandler(subscriberService)) +func RegisterAuCRoutes(rg *gin.RouterGroup, h *AuCHandler) { + auc := rg.Group("/auc") + { + auc.POST("/:imsi/auth-vector", h.GenerateAuthVector) + } +} diff --git a/apps/api-server/internal/models/account.go b/apps/api-server/internal/models/account.go index 8e23660..0333983 100644 --- a/apps/api-server/internal/models/account.go +++ b/apps/api-server/internal/models/account.go @@ -6,6 +6,7 @@ import ( type SubscriberAccount struct { IMSI string `json:"imsi"` + MSISDN string `json:"msisdn"` Balance float64 `json:"balance"` DataLimit float64 `json:"data_limit"` DataUsed float64 `json:"data_used"` diff --git a/apps/api-server/internal/models/subscriber.go b/apps/api-server/internal/models/subscriber.go index 2ad0d34..7abcb18 100644 --- a/apps/api-server/internal/models/subscriber.go +++ b/apps/api-server/internal/models/subscriber.go @@ -25,6 +25,7 @@ type Subscriber struct { AuthKey string `json:"auth_key" gorm:"not null"` OPc string `json:"opc" gorm:"not null"` + SQN int64 `json:"sqn" gorm:"default:0"` ServingPLMN PLMN `json:"serving_plmn" gorm:"not null"` EUICCID string `json:"euicc_id"` diff --git a/apps/api-server/internal/services/charging.go b/apps/api-server/internal/services/charging.go index bc7c8de..29c0d4d 100644 --- a/apps/api-server/internal/services/charging.go +++ b/apps/api-server/internal/services/charging.go @@ -39,7 +39,7 @@ func (cs *ChargingService) GetSystemStats(ctx context.Context) (*models.SystemSt // Count total accounts var totalAccounts int64 - if err := cs.db.DB.WithContext(ctx).Model(&models.SubscriberAccount{}). + if err := cs.db.DB.WithContext(ctx).Model(&models.Subscriber{}). Count(&totalAccounts).Error; err != nil { return nil, fmt.Errorf("failed to count total accounts: %w", err) } diff --git a/apps/api-server/internal/services/subscriber_auc.go b/apps/api-server/internal/services/subscriber_auc.go new file mode 100644 index 0000000..0cab464 --- /dev/null +++ b/apps/api-server/internal/services/subscriber_auc.go @@ -0,0 +1,119 @@ +package services + +import ( + "context" + "crypto/aes" + "crypto/rand" + "encoding/binary" + "encoding/hex" + "fmt" + + "github.com/nutcas3/telecom-platform/apps/api-server/internal/models" +) + +// defaultAMF is the Authentication Management Field (2 bytes). +// Bit 15 (separation bit) = 1 for 3G/4G per 3GPP TS 33.102 §6.3.3. +var defaultAMF = []byte{0x80, 0x00} + +// AuthVector is a 3GPP EPS authentication vector (AV) returned to the MME/AMF. +type AuthVector struct { + RAND string `json:"rand"` // 16 bytes hex — random challenge sent to UE + XRES string `json:"xres"` // 8 bytes hex — expected response from UE (f2) + CK string `json:"ck"` // 16 bytes hex — cipher key (f3) + IK string `json:"ik"` // 16 bytes hex — integrity key (f4) + AUTN string `json:"autn"` // 16 bytes hex — (SQN XOR AK) || AMF || MAC-A +} + +// GenerateAuthVector generates a fresh 3GPP EPS authentication vector for the +// subscriber identified by imsi. It atomically increments the subscriber's SQN +// in the database before computing the vector to prevent replay attacks. +func (s *SubscriberService) GenerateAuthVector(ctx context.Context, imsi models.IMSI) (*AuthVector, error) { + // 1. Load subscriber — need K (AuthKey) and OPc + sub, err := s.db.GetSubscriberByIMSI(ctx, imsi) + if err != nil { + return nil, fmt.Errorf("subscriber not found: %w", err) + } + + // 2. Decode K and OPc from hex storage + k, err := hex.DecodeString(sub.AuthKey) + if err != nil || len(k) != 16 { + return nil, fmt.Errorf("invalid AuthKey for subscriber %s: must be 32 hex chars", imsi) + } + opc, err := hex.DecodeString(sub.OPc) + if err != nil || len(opc) != 16 { + return nil, fmt.Errorf("invalid OPc for subscriber %s: must be 32 hex chars", imsi) + } + + // 3. Atomically increment SQN — prevents replay attacks + sqn, err := s.incrementSQN(ctx, sub) + if err != nil { + return nil, fmt.Errorf("failed to increment SQN: %w", err) + } + + // 4. Generate RAND (16 cryptographically random bytes) + randBytes := make([]byte, 16) + if _, err := rand.Read(randBytes); err != nil { + return nil, fmt.Errorf("failed to generate RAND: %w", err) + } + + // 5. Run Milenage and return the auth vector + return runMilenage(k, opc, randBytes, sqn, defaultAMF) +} + +// runMilenage executes the full Milenage algorithm (3GPP TS 35.206) and +// returns a complete authentication vector. +func runMilenage(k, opc, randBytes []byte, sqn uint64, amf []byte) (*AuthVector, error) { + block, err := aes.NewCipher(k) + if err != nil { + return nil, fmt.Errorf("AES cipher init failed: %w", err) + } + + // Encode SQN as 6 bytes (big-endian, 48-bit counter) + sqnBuf := make([]byte, 8) + binary.BigEndian.PutUint64(sqnBuf, sqn) + sqn6 := sqnBuf[2:] // lower 6 bytes + + // Shared intermediate value: temp = AES-128(K, RAND XOR OPc) + temp := milenageTemp(block, randBytes, opc) + + // f2 + f5: RES (expected response) and AK (anonymity key) + res, ak := milenageF2F5(block, temp, opc) + + // f3: CK (cipher key) + ck := milenageF3(block, temp, opc) + + // f4: IK (integrity key) + ik := milenageF4(block, temp, opc) + + // f1: MAC-A (network authentication code) — uses raw SQN, not SQN XOR AK + macA := milenageF1(block, temp, opc, sqn6, amf) + + // AUTN = (SQN XOR AK)[6] || AMF[2] || MAC-A[8] — total 16 bytes + sqnXorAK := xorBytes(sqn6, ak) + autn := make([]byte, 16) + copy(autn[0:6], sqnXorAK) + copy(autn[6:8], amf) + copy(autn[8:16], macA) + + return &AuthVector{ + RAND: hex.EncodeToString(randBytes), + XRES: hex.EncodeToString(res), + CK: hex.EncodeToString(ck), + IK: hex.EncodeToString(ik), + AUTN: hex.EncodeToString(autn), + }, nil +} + +// incrementSQN atomically increments the subscriber's SQN in PostgreSQL and +// returns the new value. Using UPDATE … RETURNING ensures no two concurrent +// requests can receive the same SQN. +func (s *SubscriberService) incrementSQN(ctx context.Context, sub *models.Subscriber) (uint64, error) { + var newSQN int64 + err := s.db.DB.WithContext(ctx). + Raw("UPDATE subscribers SET sqn = sqn + 1 WHERE id = ? RETURNING sqn", sub.ID). + Scan(&newSQN).Error + if err != nil { + return 0, fmt.Errorf("SQN increment failed: %w", err) + } + return uint64(newSQN), nil +} diff --git a/apps/api-server/internal/services/subscriber_auth.go b/apps/api-server/internal/services/subscriber_auth.go index e17d240..7a1b247 100644 --- a/apps/api-server/internal/services/subscriber_auth.go +++ b/apps/api-server/internal/services/subscriber_auth.go @@ -2,80 +2,137 @@ package services import ( "crypto/aes" + "crypto/cipher" "crypto/rand" "encoding/hex" "fmt" "os" ) -// generateAuthKeys generates authentication keys for the subscriber +// generateAuthKeys generates a fresh Ki (K) and derives OPc for a new subscriber. +// Returns (K_hex, OPc_hex, error). func (s *SubscriberService) generateAuthKeys() (string, string, error) { - // Generate 128-bit random key (K) - key := make([]byte, 16) - if _, err := rand.Read(key); err != nil { - return "", "", err + k := make([]byte, 16) + if _, err := rand.Read(k); err != nil { + return "", "", fmt.Errorf("failed to generate K: %w", err) } - - // Get OP (Operator variant) from operator configuration - // This should be consistent across all subscribers for the same operator op := s.getOperatorVariant() - if op == nil { - return "", "", fmt.Errorf("operator variant not configured") - } - - // Generate OPc (derived from OP and K) using AES-128 encryption - // OPc = AES-128(K, OP) where OP is encrypted with K - opc, err := s.generateOPc(key, op) + opc, err := deriveOPc(k, op) if err != nil { - return "", "", err + return "", "", fmt.Errorf("failed to derive OPc: %w", err) } - - return hex.EncodeToString(key), hex.EncodeToString(opc), nil + return hex.EncodeToString(k), hex.EncodeToString(opc), nil } -// getOperatorVariant returns the operator variant (OP) from configuration +// getOperatorVariant returns the 16-byte OP from environment configuration. +// OP must be kept secret and identical across all subscribers for the same operator. func (s *SubscriberService) getOperatorVariant() []byte { - // Get operator variant from environment variable or secure configuration - // This should be the same across all subscribers for the same operator opStr := os.Getenv("OPERATOR_VARIANT") if opStr == "" { - // Fallback to default for development - opStr = "TelecomOP1234567" // 16-byte operator variant + opStr = "TelecomOP1234567" // 16 bytes — override in production via env } - - // Ensure exactly 16 bytes for AES-128 op := make([]byte, 16) copy(op, []byte(opStr)) - - // If shorter than 16 bytes, pad with zeros - if len(opStr) < 16 { - for i := len(opStr); i < 16; i++ { - op[i] = 0 - } - } - return op } -// generateOPc derives OPc from OP and K using AES-128 encryption -func (s *SubscriberService) generateOPc(k, op []byte) ([]byte, error) { - // Create AES-128 cipher block with key K +// deriveOPc computes OPc = AES-128(K, OP) XOR OP per 3GPP TS 35.206 §3. +// BUG FIX: the previous implementation was missing the final XOR OP step. +func deriveOPc(k, op []byte) ([]byte, error) { + if len(k) != 16 || len(op) != 16 { + return nil, fmt.Errorf("K and OP must each be exactly 16 bytes") + } block, err := aes.NewCipher(k) if err != nil { - return nil, fmt.Errorf("failed to create AES cipher: %w", err) + return nil, fmt.Errorf("AES cipher init failed: %w", err) } + opc := make([]byte, 16) + block.Encrypt(opc, op) + xorInPlace(opc, op) // OPc = AES-128(K, OP) XOR OP ← was missing + return opc, nil +} + +// ─── Milenage f-functions (3GPP TS 35.206) ─────────────────────────────────── + +// milenageTemp computes the shared intermediate value used by all f-functions: +// +// temp = AES-128(K, RAND XOR OPc) +func milenageTemp(block cipher.Block, rand16, opc []byte) []byte { + out := make([]byte, 16) + block.Encrypt(out, xorBytes(rand16, opc)) + return out +} + +// milenageF1 computes MAC-A (8 bytes) — the network authentication code. +// 3GPP TS 35.206 §2.3: r1=64 bits (8 bytes), c1=0x00…00 (no-op XOR). +func milenageF1(block cipher.Block, temp, opc, sqn6, amf2 []byte) []byte { + // in1 = SQN[0..5] || AMF[0..1] || SQN[0..5] || AMF[0..1] + in1 := make([]byte, 16) + copy(in1[0:6], sqn6) + copy(in1[6:8], amf2) + copy(in1[8:14], sqn6) + copy(in1[14:16], amf2) + + x := xorBytes(rotLeft(xorBytes(temp, opc), 8), in1) // rot r1=8 bytes, c1=0 (no-op) + out := make([]byte, 16) + block.Encrypt(out, x) + return xorBytes(out, opc)[:8] // MAC-A = out[0..7] +} + +// milenageF2F5 computes RES (8 bytes) and AK (6 bytes) in one pass. +// 3GPP TS 35.206 §2.4 (f2) and §2.7 (f5): r2=0, c2=0x00…01. +func milenageF2F5(block cipher.Block, temp, opc []byte) (res, ak []byte) { + x := xorBytes(temp, opc) // rot r2=0 bits — no rotation + x[15] ^= 0x01 // XOR c2 + out := make([]byte, 16) + block.Encrypt(out, x) + result := xorBytes(out, opc) + return append([]byte{}, result[8:16]...), append([]byte{}, result[0:6]...) +} + +// milenageF3 computes CK (16 bytes) — the cipher key. +// 3GPP TS 35.206 §2.5: r3=32 bits (4 bytes), c3=0x00…02. +func milenageF3(block cipher.Block, temp, opc []byte) []byte { + x := rotLeft(xorBytes(temp, opc), 4) + x[15] ^= 0x02 + out := make([]byte, 16) + block.Encrypt(out, x) + return xorBytes(out, opc) +} + +// milenageF4 computes IK (16 bytes) — the integrity key. +// 3GPP TS 35.206 §2.6: r4=64 bits (8 bytes), c4=0x00…04. +func milenageF4(block cipher.Block, temp, opc []byte) []byte { + x := rotLeft(xorBytes(temp, opc), 8) + x[15] ^= 0x04 + out := make([]byte, 16) + block.Encrypt(out, x) + return xorBytes(out, opc) +} - // OPc = AES-128(K, OP) - encrypt OP with key K - opc := make([]byte, aes.BlockSize) // AES block size is 16 bytes - - // Create ECB mode cipher (as per 3GPP specification for OPc derivation) - // In 3GPP, OPc is derived using AES-128 ECB mode - if len(op) != aes.BlockSize { - return nil, fmt.Errorf("OP must be 16 bytes for AES-128") +// ─── Byte utilities ─────────────────────────────────────────────────────────── + +// rotLeft rotates a 16-byte slice left by byteCount positions. +func rotLeft(x []byte, byteCount int) []byte { + out := make([]byte, 16) + for i := 0; i < 16; i++ { + out[i] = x[(i+byteCount)%16] } + return out +} - // Encrypt OP using ECB mode (single block) - block.Encrypt(opc, op) +// xorBytes returns a XOR b (slices must be the same length). +func xorBytes(a, b []byte) []byte { + out := make([]byte, len(a)) + for i := range a { + out[i] = a[i] ^ b[i] + } + return out +} - return opc, nil +// xorInPlace XORs b into a in place. +func xorInPlace(a, b []byte) { + for i := range a { + a[i] ^= b[i] + } } diff --git a/apps/api-server/internal/services/subscriber_billing.go b/apps/api-server/internal/services/subscriber_billing.go index 49f61c6..2aa778d 100644 --- a/apps/api-server/internal/services/subscriber_billing.go +++ b/apps/api-server/internal/services/subscriber_billing.go @@ -34,6 +34,7 @@ func (s *SubscriberService) GetAccount(ctx context.Context, imsi string) (*model return &models.SubscriberAccount{ IMSI: string(subscriber.IMSI), + MSISDN: subscriber.MSISDN, Balance: balance, DataLimit: float64(subscriber.Plan.DataLimit), DataUsed: dataUsed, @@ -118,8 +119,9 @@ func (s *SubscriberService) GetRatingPlan(ctx context.Context, planId string) (* } // TopUpBalance tops up a subscriber's account balance. -func (s *SubscriberService) TopUpBalance(ctx context.Context, imsi string, req *models.TopUpRequest) (*models.SubscriberAccount, error) { - var account models.SubscriberAccount +func (s *SubscriberService) TopUpBalance(ctx context.Context, imsi string, req *models.TopUpRequest) (*models.Subscriber, error) { + // changed this to subscriber (initially added the MSISDN field to the subscriber model to match the subscriber account model for queries and requests) + var account models.Subscriber if err := s.db.DB.WithContext(ctx).Where("imsi = ?", imsi).First(&account).Error; err != nil { return nil, fmt.Errorf("subscriber account not found: %w", err) diff --git a/apps/api-server/internal/services/subscriber_lifecycle.go b/apps/api-server/internal/services/subscriber_lifecycle.go index 8b397b3..8f1c76d 100644 --- a/apps/api-server/internal/services/subscriber_lifecycle.go +++ b/apps/api-server/internal/services/subscriber_lifecycle.go @@ -91,7 +91,7 @@ func (s *SubscriberService) DeleteSubscriber(ctx context.Context, subscriberId u } if err := tx.Where("imsi IN (SELECT imsi FROM subscribers WHERE id = ?)", subscriberId). - Delete(&models.SubscriberAccount{}).Error; err != nil { + Delete(&models.Subscriber{}).Error; err != nil { tx.Rollback() return false, fmt.Errorf("failed to delete account: %w", err) } diff --git a/apps/charging-engine/src/charging.rs b/apps/charging-engine/src/charging.rs index ac3b4af..64a76a9 100644 --- a/apps/charging-engine/src/charging.rs +++ b/apps/charging-engine/src/charging.rs @@ -3,6 +3,10 @@ pub mod engine; pub mod credit_management; pub mod rating_billing; pub mod rating_plans_repo; +pub mod airtime_service; +pub mod bundle_service; +pub mod bundle_repo; pub use engine::ChargingEngine; pub use rating_plans_repo::RatingPlansRepo; +pub use bundle_repo::BundleRepo; diff --git a/apps/charging-engine/src/charging/airtime_service.rs b/apps/charging-engine/src/charging/airtime_service.rs new file mode 100644 index 0000000..1b8eed3 --- /dev/null +++ b/apps/charging-engine/src/charging/airtime_service.rs @@ -0,0 +1,153 @@ +use crate::circuit_breaker::CircuitBreakerError; +use crate::errors::{ChargingError, ChargingResult}; +use super::types::AirtimeBalance; + +impl crate::charging::ChargingEngine { + pub async fn add_airtime(&self, imsi: &str, seconds_to_add: u64, roaming: bool) -> ChargingResult { + self.redis_circuit_breaker.execute(|| async { + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + let key = if roaming { + format!("airtime:roaming:{}", imsi) + } else { + format!("airtime:home:{}", imsi) + }; + + // Atomic increment with overflow check + let current: Option = redis::AsyncCommands::get(&mut conn, &key).await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + if let Some(existing) = current { + if existing.checked_add(seconds_to_add).is_none() { + return Err(ChargingError::InvalidInput("Airtime addition would overflow".to_string())); + } + } + + let new_balance: u64 = redis::AsyncCommands::incr(&mut conn, &key, seconds_to_add).await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + // Set expiration (30 days for airtime) + let _: () = redis::AsyncCommands::expire(&mut conn, &key, 2592000).await + .unwrap_or(()); + + // Update metadata + let meta_key = format!("airtime:meta:{}", imsi); + let _: () = redis::AsyncCommands::hset(&mut conn, &meta_key, "last_updated", Utc::now().timestamp()).await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + Ok(new_balance) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::RedisConnection("Circuit breaker is open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + pub async fn deduct_airtime(&self, imsi: &str, seconds_to_deduct: u64, roaming: bool) -> ChargingResult { + self.redis_circuit_breaker.execute(|| async { + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + let balance_key = if roaming { + format!("airtime:roaming:{}", imsi) + } else { + format!("airtime:home:{}", imsi) + }; + + // Use Lua script for atomic check-and-deduct + let script = r#" + local current = redis.call('GET', KEYS[1]) + if not current then + return {err = "NO_BALANCE"} + end + current = tonumber(current) + if current < tonumber(ARGV[1]) then + return {err = "INSUFFICIENT_BALANCE", current = current} + end + local new_balance = current - tonumber(ARGV[1]) + redis.call('SET', KEYS[1], new_balance) + return {ok = true, new_balance = new_balance, old_balance = current} + "#; + + let result: redis::RedisResult = redis::Script::new(script) + .key(&balance_key) + .arg(seconds_to_deduct) + .invoke_async(&mut conn) + .await; + + match result { + Ok(redis::Value::Bulk(items)) => { + let deduction = self.parse_airtime_deduction_result(items)?; + + // Record usage event for billing + self.record_airtime_usage(&mut conn, imsi, seconds_to_deduct, roaming).await?; + + Ok(deduction) + } + Ok(_) => Err(ChargingError::InvalidState("Unexpected script result".to_string())), + Err(e) => Err(ChargingError::RedisOperation(e.to_string())), + } + }).await + } + + pub async fn get_airtime_balance(&self, imsi: &str) -> ChargingResult { + self.redis_circuit_breaker.execute(|| async { + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + let home_key = format!("airtime:home:{}", imsi); + let roaming_key = format!("airtime:roaming:{}", imsi); + let meta_key = format!("airtime:meta:{}", imsi); + + let pipe = redis::pipe(); + pipe.get(&home_key) + .get(&roaming_key) + .hget(&meta_key, "last_updated"); + + let results: Vec> = pipe.query_async(&mut conn).await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + let home_seconds = match results[0] { + Some(redis::Value::Int(v)) => v as u64, + _ => 0, + }; + + let roaming_seconds = match results[1] { + Some(redis::Value::Int(v)) => v as u64, + _ => 0, + }; + + let last_updated = match results[2] { + Some(redis::Value::Bulk(v)) => { + if let Some(redis::Value::Data(ts_bytes)) = v.first() { + let ts_str = String::from_utf8_lossy(ts_bytes); + ts_str.parse::().ok() + .and_then(|ts| DateTime::from_timestamp(ts, 0)) + .unwrap_or_else(Utc::now) + } else { + Utc::now() + } + } + _ => Utc::now(), + }; + + Ok(AirtimeBalance { + imsi: imsi.to_string(), + home_seconds, + roaming_seconds, + last_updated, + }) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::RedisConnection("Circuit breaker is open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } +} + +#[derive(Debug, Serialize)] +pub struct AirtimeDeduction { + pub old_balance: u64, + pub new_balance: u64, + pub deducted: u64, + pub remaining: u64, +} diff --git a/apps/charging-engine/src/charging/bundle_expiry_task.rs b/apps/charging-engine/src/charging/bundle_expiry_task.rs new file mode 100644 index 0000000..d2fdf4b --- /dev/null +++ b/apps/charging-engine/src/charging/bundle_expiry_task.rs @@ -0,0 +1,23 @@ +use tokio::time::{interval, Duration}; +use tracing::{error, info}; +use super::bundle_repo::BundleRepo; + +/// Spawns a background task that expires stale subscriber bundles every 5 minutes. +/// Must be called once at engine startup. +pub fn spawn_bundle_expiry_task(bundle_repo: BundleRepo) { + tokio::spawn(async move { + let mut ticker = interval(Duration::from_secs(300)); // every 5 minutes + loop { + ticker.tick().await; + match bundle_repo.expire_stale_bundles().await { + Ok(count) if count > 0 => { + info!(expired_count = count, "Expired stale subscriber bundles"); + } + Ok(_) => {} + Err(e) => { + error!(error = %e, "Failed to expire stale bundles"); + } + } + } + }); +} diff --git a/apps/charging-engine/src/charging/bundle_repo.rs b/apps/charging-engine/src/charging/bundle_repo.rs new file mode 100644 index 0000000..03fe754 --- /dev/null +++ b/apps/charging-engine/src/charging/bundle_repo.rs @@ -0,0 +1,215 @@ +use sqlx::postgres::{PgPool, PgPoolOptions}; +use sqlx::Row; +use crate::circuit_breaker::{CircuitBreaker, CircuitBreakerError}; +use crate::errors::{ChargingError, ChargingResult}; +use super::types::{Bundle, BundleType, BundleAllowances, ActiveBundle}; +use chrono::Utc; + +#[derive(Clone)] +pub struct BundleRepo { + pub(crate) pool: PgPool, + circuit_breaker: CircuitBreaker, +} + +impl BundleRepo { + pub async fn connect(database_url: &str) -> ChargingResult { + let pool = PgPoolOptions::new() + .max_connections(10) + .connect(database_url) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + let circuit_breaker = CircuitBreaker::new(5, std::time::Duration::from_secs(60)); + Ok(Self { pool, circuit_breaker }) + } + + /// Upsert a bundle definition. + pub async fn upsert_bundle(&self, bundle: &Bundle) -> ChargingResult<()> { + let bundle = bundle.clone(); + self.circuit_breaker.execute(|| async move { + sqlx::query(r#" + INSERT INTO bundles + (bundle_id, name, bundle_type, data_bytes, voice_seconds, + sms_count, roaming_data_bytes, validity_days, priority, is_active, updated_at) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,TRUE,NOW()) + ON CONFLICT (bundle_id) DO UPDATE SET + name = EXCLUDED.name, + bundle_type = EXCLUDED.bundle_type, + data_bytes = EXCLUDED.data_bytes, + voice_seconds = EXCLUDED.voice_seconds, + sms_count = EXCLUDED.sms_count, + roaming_data_bytes = EXCLUDED.roaming_data_bytes, + validity_days = EXCLUDED.validity_days, + priority = EXCLUDED.priority, + is_active = TRUE, + updated_at = NOW() + "#) + .bind(&bundle.bundle_id) + .bind(&bundle.name) + .bind(bundle.bundle_type.as_str()) + .bind(bundle.allowances.data_bytes.map(|v| v as i64)) + .bind(bundle.allowances.voice_seconds.map(|v| v as i64)) + .bind(bundle.allowances.sms_count.map(|v| v as i64)) + .bind(bundle.allowances.roaming_data_bytes.map(|v| v as i64)) + .bind(bundle.validity_days as i32) + .bind(bundle.priority as i16) + .execute(&self.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + Ok(()) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + pub async fn get_bundle(&self, bundle_id: &str) -> ChargingResult> { + let bundle_id = bundle_id.to_string(); + self.circuit_breaker.execute(|| async move { + let row = sqlx::query(r#" + SELECT bundle_id, name, bundle_type, data_bytes, voice_seconds, + sms_count, roaming_data_bytes, validity_days, priority, is_active + FROM bundles WHERE bundle_id = $1 AND is_active = TRUE + "#) + .bind(&bundle_id) + .fetch_optional(&self.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + Ok(row.map(row_to_bundle)) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + /// Activate a bundle for a subscriber — inserts into subscriber_bundles. + /// Returns the new subscriber_bundle id. + pub async fn activate_for_subscriber( + &self, + imsi: &str, + bundle_id: &str, + validity_days: u32, + allowances: &BundleAllowances, + ) -> ChargingResult { + let imsi = imsi.to_string(); + let bundle_id = bundle_id.to_string(); + let expires_at = Utc::now() + chrono::Duration::days(validity_days as i64); + let allowances = allowances.clone(); + + self.circuit_breaker.execute(|| async move { + let row = sqlx::query(r#" + INSERT INTO subscriber_bundles + (imsi, bundle_id, expires_at, + remaining_data_bytes, remaining_voice_seconds, + remaining_sms_count, remaining_roaming_bytes, status) + VALUES ($1,$2,$3,$4,$5,$6,$7,'active') + RETURNING id + "#) + .bind(&imsi) + .bind(&bundle_id) + .bind(expires_at) + .bind(allowances.data_bytes.map(|v| v as i64)) + .bind(allowances.voice_seconds.map(|v| v as i64)) + .bind(allowances.sms_count.map(|v| v as i64)) + .bind(allowances.roaming_data_bytes.map(|v| v as i64)) + .fetch_one(&self.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + Ok(row.get::("id")) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + /// Fetch active, non-expired bundles for a subscriber, ordered by priority. + pub async fn get_active_bundles(&self, imsi: &str) -> ChargingResult> { + let imsi = imsi.to_string(); + self.circuit_breaker.execute(|| async move { + let rows = sqlx::query(r#" + SELECT sb.id, sb.imsi, sb.bundle_id, sb.activated_at, sb.expires_at, + sb.remaining_data_bytes, sb.remaining_voice_seconds, + sb.remaining_sms_count, sb.remaining_roaming_bytes, + b.priority + FROM subscriber_bundles sb + JOIN bundles b ON b.bundle_id = sb.bundle_id + WHERE sb.imsi = $1 + AND sb.status = 'active' + AND sb.expires_at > NOW() + ORDER BY b.priority ASC, sb.activated_at ASC + "#) + .bind(&imsi) + .fetch_all(&self.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + Ok(rows.into_iter().map(row_to_active_bundle).collect()) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + /// Atomically deduct from a subscriber_bundle row. + /// Returns true if deduction succeeded, false if insufficient allowance. + pub async fn deduct_from_bundle( + &self, + subscriber_bundle_id: i64, + usage_type: &str, + amount: u64, + ) -> ChargingResult { + let amount = amount as i64; + self.circuit_breaker.execute(|| async move { + let column = match usage_type { + "data" => "remaining_data_bytes", + "voice" => "remaining_voice_seconds", + "sms" => "remaining_sms_count", + "roaming" => "remaining_roaming_bytes", + _ => return Err(ChargingError::InvalidInput(format!("Unknown usage type: {}", usage_type))), + }; + + // Atomic check-and-deduct: only update if remaining >= amount + let result = sqlx::query(&format!(r#" + UPDATE subscriber_bundles + SET {column} = {column} - $1, + status = CASE + WHEN ({column} - $1) <= 0 THEN 'exhausted' + ELSE status + END, + updated_at = NOW() + WHERE id = $2 + AND {column} >= $1 + AND status = 'active' + AND expires_at > NOW() + "#)) + .bind(amount) + .bind(subscriber_bundle_id) + .execute(&self.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + Ok(result.rows_affected() > 0) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + /// Expire bundles whose expires_at has passed — run this from a background task. + pub async fn expire_stale_bundles(&self) -> ChargingResult { + self.circuit_breaker.execute(|| async { + let result = sqlx::query(r#" + UPDATE subscriber_bundles + SET status = 'expired', updated_at = NOW() + WHERE status = 'active' AND expires_at <= NOW() + "#) + .execute(&self.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + Ok(result.rows_affected()) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } +} diff --git a/apps/charging-engine/src/charging/bundle_service.rs b/apps/charging-engine/src/charging/bundle_service.rs new file mode 100644 index 0000000..68daea2 --- /dev/null +++ b/apps/charging-engine/src/charging/bundle_service.rs @@ -0,0 +1,576 @@ +use crate::circuit_breaker::CircuitBreakerError; +use crate::errors::{ChargingError, ChargingResult}; +use super::types::{Bundle, ActiveBundle, BundleAllowances}; + +impl crate::charging::ChargingEngine { + pub async fn create_bundle(&self, bundle: Bundle) -> ChargingResult<()> { + self.postgres_circuit_breaker.execute(|| async { + // Store bundle configuration in PostgreSQL + let query = r#" + INSERT INTO bundles (bundle_id, name, bundle_type, data_bytes, voice_seconds, + sms_count, roaming_data_bytes, validity_days, priority, + amount_unconverted, is_active) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT (bundle_id) DO UPDATE SET + name = EXCLUDED.name, + bundle_type = EXCLUDED.bundle_type, + data_bytes = EXCLUDED.data_bytes, + voice_seconds = EXCLUDED.voice_seconds, + sms_count = EXCLUDED.sms_count, + roaming_data_bytes = EXCLUDED.roaming_data_bytes, + validity_days = EXCLUDED.validity_days, + priority = EXCLUDED.priority, + amount_unconverted = EXCLUDED.amount_unconverted, + is_active = EXCLUDED.is_active + "#; + + let allowances_json = serde_json::to_value(&bundle.allowances) + .map_err(|e| ChargingError::InvalidFormat(e.to_string()))?; + + sqlx::query(query) + .bind(&bundle.bundle_id) + .bind(&bundle.name) + .bind(bundle.bundle_type.as_str()) + .bind(bundle.allowances.data_bytes.map(|v| v as i64)) + .bind(bundle.allowances.voice_seconds.map(|v| v as i64)) + .bind(bundle.allowances.sms_count.map(|v| v as i64)) + .bind(bundle.allowances.roaming_data_bytes.map(|v| v as i64)) + .bind(bundle.validity_days as i32) + .bind(bundle.priority as i16) + .bind(bundle.amount_unconverted) + .bind(bundle.is_active) + .execute(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + + Ok(()) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::PostgresConnection("Circuit breaker is open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + pub async fn activate_bundle_for_subscriber(&self, imsi: &str, bundle_id: &str) -> ChargingResult { + self.redis_circuit_breaker.execute(|| async { + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + // Get bundle configuration from PostgreSQL cache or DB + let bundle = self.get_bundle_config(bundle_id).await?; + + // Check if bundle already active + let active_key = format!("bundle:active:{}:{}", imsi, bundle_id); + let is_active: bool = redis::AsyncCommands::exists(&mut conn, &active_key).await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + if is_active { + return Err(ChargingError::BundleAlreadyActive); + } + + // Atomic bundle activation + let mut pipe = redis::pipe(); + pipe.atomic(); + + // Set active bundle with TTL + let expires_at = Utc::now() + chrono::Duration::days(bundle.validity_days as i64); + let active_bundle = ActiveBundle { + imsi: imsi.to_string(), + bundle_id: bundle_id.to_string(), + priority: bundle.priority, + activated_at: Utc::now(), + expires_at, + remaining_allowances: bundle.allowances.clone(), + }; + + let bundle_json = serde_json::to_string(&active_bundle) + .map_err(|e| ChargingError::InvalidFormat(e.to_string()))?; + + pipe.set(&active_key, bundle_json) + .expire(&active_key, bundle.validity_days as i64 * 86400); + + let set_key = format!("bundle:active:set:{}", imsi); + pipe.sadd(&set_key, bundle_id); + + // Apply bundle allowances to subscriber balances + if let Some(data_bytes) = bundle.allowances.data_bytes { + let data_key = format!("bundle:data:{}", imsi); + pipe.incr(&data_key, data_bytes as i64); + } + + if let Some(voice_seconds) = bundle.allowances.voice_seconds { + let voice_key = format!("bundle:voice:{}", imsi); + pipe.incr(&voice_key, voice_seconds as i64); + } + + if let Some(sms_count) = bundle.allowances.sms_count { + let sms_key = format!("bundle:sms:{}", imsi); + pipe.incr(&sms_key, sms_count as i64); + } + + pipe.query_async(&mut conn).await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + // Record activation for audit + self.record_bundle_activation(&mut conn, imsi, bundle_id).await?; + + Ok(active_bundle) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::RedisConnection("Circuit breaker is open".to_string()), + CircuitBreakerError::Inner(e) => e, + }) + } + + pub async fn consume_from_bundle( + &self, + imsi: &str, + usage_type: UsageType, + amount: u64, + ) -> ChargingResult { + self.redis_circuit_breaker.execute(|| async { + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + // Use SMEMBERS on the tracking Set — O(N bundles per subscriber), not O(total Redis keys) + let set_key = format!("bundle:active:set:{}", imsi); + let bundle_ids: Vec = redis::AsyncCommands::smembers(&mut conn, &set_key) + .await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + // Fetch all active bundle JSON values in one pipeline + let mut pipe = redis::pipe(); + for bundle_id in &bundle_ids { + let active_key = format!("bundle:active:{}:{}", imsi, bundle_id); + pipe.get(&active_key); + } + let values: Vec> = pipe.query_async(&mut conn) + .await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + // Pair bundle_ids with their JSON, filter out expired/missing entries + let mut bundles: Vec<(String, ActiveBundle)> = bundle_ids.iter() + .zip(values.into_iter()) + .filter_map(|(id, maybe_json)| { + let json = maybe_json?; + let b: ActiveBundle = serde_json::from_str(&json).ok()?; + // Remove from set if expired + Some((id.clone(), b)) + }) + .collect(); + + // Sort by priority ascending (lower number = higher priority) + bundles.sort_by_key(|(_, b)| b.priority); + + for (bundle_id, mut active_bundle) in bundles { + // Skip expired bundles and clean up the set + if active_bundle.expires_at <= chrono::Utc::now() { + let set_key = format!("bundle:active:set:{}", imsi); + let _: () = redis::AsyncCommands::srem(&mut conn, &set_key, &bundle_id) + .await + .unwrap_or(()); + continue; + } + + let has_allowance = match usage_type { + UsageType::Data => active_bundle.remaining_allowances.data_bytes + .map_or(false, |b| b >= amount), + UsageType::Voice => active_bundle.remaining_allowances.voice_seconds + .map_or(false, |s| s >= amount), + UsageType::SMS => active_bundle.remaining_allowances.sms_count + .map_or(false, |c| c >= amount), + }; + + if !has_allowance { + continue; + } + + match usage_type { + UsageType::Data => { + if let Some(ref mut b) = active_bundle.remaining_allowances.data_bytes { *b -= amount; } + } + UsageType::Voice => { + if let Some(ref mut s) = active_bundle.remaining_allowances.voice_seconds { *s -= amount; } + } + UsageType::SMS => { + if let Some(ref mut c) = active_bundle.remaining_allowances.sms_count { *c -= amount; } + } + } + + let active_key = format!("bundle:active:{}:{}", imsi, bundle_id); + let updated_json = serde_json::to_string(&active_bundle) + .map_err(|e| ChargingError::SerializationError(e.to_string()))?; + let _: () = redis::AsyncCommands::set(&mut conn, &active_key, updated_json) + .await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + return Ok(true); + } + + Ok(false) + }).await.map_err(|e| match e { + crate::circuit_breaker::CircuitBreakerError::Open => + ChargingError::RedisConnection("Circuit breaker is open".to_string()), + crate::circuit_breaker::CircuitBreakerError::Inner(e) => e, + }) + } + + + pub async fn list_bundles(&self) -> ChargingResult> { + use sqlx::Row; + + let rows = sqlx::query( + "SELECT bundle_id, name, bundle_type, data_bytes, voice_seconds, \ + sms_count, roaming_data_bytes, validity_days, priority \ + FROM bundles WHERE is_active = TRUE ORDER BY priority ASC", + ) + .fetch_all(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + let bundles = rows + .into_iter() + .map(|row| Bundle { + bundle_id: row.get("bundle_id"), + name: row.get("name"), + bundle_type: BundleType::from_str(row.get("bundle_type")), + allowances: BundleAllowances { + data_bytes: row.get::, _>("data_bytes").map(|v| v as u64), + voice_seconds: row.get::, _>("voice_seconds").map(|v| v as u64), + sms_count: row.get::, _>("sms_count").map(|v| v as u64), + roaming_data_bytes: row.get::, _>("roaming_data_bytes").map(|v| v as u64), + }, + validity_days: row.get::("validity_days") as u32, + priority: row.get::("priority") as u8, + amount_unconverted: row.get::("amount_unconverted"), + is_active: true, + }) + .collect(); + + Ok(bundles) + } + + pub async fn get_bundle_by_id(&self, bundle_id: &str) -> ChargingResult> { + match self.get_bundle_config(bundle_id).await { + Ok(bundle) => Ok(Some(bundle)), + Err(ChargingError::BundleNotFound(_)) => Ok(None), + Err(e) => Err(e), + } + } + + + pub async fn purchase_bundle_with_airtime( + &self, + imsi: &str, + bundle_id: &str, + ) -> ChargingResult { + use sqlx::Row; + + // 1. Load bundle definition from PostgreSQL + let bundle = self.get_bundle_config(bundle_id).await?; + + if bundle.airtime_cost == 0 { + return Err(ChargingError::InvalidBundleConfig( + "Bundle has no airtime price set".to_string(), + )); + } + + // 2. Check and deduct airtime atomically in Redis (Lua script) + let lua_script = r#" + local key = KEYS[1] + local cost = tonumber(ARGV[1]) + local balance = tonumber(redis.call('GET', key) or '0') + if balance < cost then + return {-1, balance} + end + local new_balance = redis.call('DECRBY', key, cost) + return {0, new_balance} + "#; + + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + let airtime_key = format!("airtime:home:{}", imsi); + let result: Vec = redis::Script::new(lua_script) + .key(&airtime_key) + .arg(bundle.airtime_cost as i64) + .invoke_async(&mut conn) + .await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + if result[0] == -1 { + return Err(ChargingError::InsufficientAirtime { + available: result[1] as i64, + required: bundle.airtime_cost, + }); + } + + let new_airtime_balance = result[1] as u64; + + // 3. Activate the bundle in Redis (same as activate_bundle_for_subscriber) + let active_bundle = self.activate_bundle_for_subscriber(imsi, bundle_id).await?; + + // 4. Persist airtime deduction to PostgreSQL (audit trail) + self.postgres_circuit_breaker.execute(|| async { + // Get subscriber_id for FK + let sub_row = sqlx::query( + "SELECT id FROM subscribers WHERE imsi = $1" + ) + .bind(imsi) + .fetch_optional(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))? + .ok_or_else(|| ChargingError::SubscriberNotFound(imsi.to_string()))?; + + let subscriber_id: i32 = sub_row.get("id"); + + // Write airtime_transactions row + sqlx::query(r#" + INSERT INTO airtime_transactions + (subscriber_id, imsi, transaction_type, seconds_delta, balance_after, roaming, reference_id) + VALUES ($1, $2, 'bundle_purchase', $3, $4, FALSE, $5) + "#) + .bind(subscriber_id) + .bind(imsi) + .bind(-(bundle.airtime_cost as i64)) // negative = debit + .bind(new_airtime_balance as i64) + .bind(bundle_id) + .execute(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + // Update airtime_balances table (PG source of truth) + sqlx::query(r#" + INSERT INTO airtime_balances (subscriber_id, imsi, home_seconds, updated_at) + VALUES ($1, $2, $3, NOW()) + ON CONFLICT (imsi) DO UPDATE SET + home_seconds = $3, + updated_at = NOW() + "#) + .bind(subscriber_id) + .bind(imsi) + .bind(new_airtime_balance as i64) + .execute(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + Ok(()) + }).await.map_err(|e| match e { + CircuitBreakerError::Open => ChargingError::DatabaseError("Circuit breaker open".to_string()), + CircuitBreakerError::Inner(e) => e, + })?; + + Ok(active_bundle) + } + + pub async fn deactivate_bundle(&self, bundle_id: &str) -> ChargingResult<()> { + sqlx::query("UPDATE bundles SET is_active = FALSE, updated_at = NOW() WHERE bundle_id = $1") + .bind(bundle_id) + .execute(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + Ok(()) + } + + async fn get_bundle_config(&self, bundle_id: &str) -> ChargingResult { + let row = sqlx::query(r#" + SELECT bundle_id, name, bundle_type, data_bytes, voice_seconds, + sms_count, roaming_data_bytes, validity_days, priority, amount_unconverted + FROM bundles WHERE bundle_id = $1 AND is_active = TRUE + "#) + .bind(bundle_id) + .fetch_optional(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))? + .ok_or_else(|| ChargingError::BundleNotFound(bundle_id.to_string()))?; + + Ok(Bundle { + bundle_id: row.get("bundle_id"), + name: row.get("name"), + bundle_type: match row.get::<&str, _>("bundle_type") { + "voice" => BundleType::Voice, + "sms" => BundleType::SMS, + "hybrid" => BundleType::Hybrid, + _ => BundleType::Data, + }, + allowances: BundleAllowances { + data_bytes: row.get::, _>("data_bytes").map(|v| v as u64), + voice_seconds: row.get::, _>("voice_seconds").map(|v| v as u64), + sms_count: row.get::, _>("sms_count").map(|v| v as u64), + roaming_data_bytes: row.get::, _>("roaming_data_bytes").map(|v| v as u64), + }, + validity_days: row.get::("validity_days") as u32, + priority: row.get::("priority") as u8, + amount_unconverted: row.get::("amount_unconverted"), + is_active: true, + }) + } + + async fn record_bundle_activation( + &self, + conn: &mut redis::aio::MultiplexedConnection, + imsi: &str, + bundle_id: &str, + ) -> ChargingResult<()> { + let audit_key = format!("bundle:audit:{}:{}", imsi, bundle_id); + let payload = serde_json::json!({ + "imsi": imsi, + "bundle_id": bundle_id, + "activated_at": chrono::Utc::now().to_rfc3339(), + }).to_string(); + let _: () = redis::AsyncCommands::set(conn, &audit_key, payload) + .await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + Ok(()) + } + + + /// Resolve IMSI from MSISDN via PostgreSQL. + async fn resolve_imsi_from_msisdn(&self, msisdn: &str) -> ChargingResult { + use sqlx::Row; + let row = sqlx::query( + "SELECT imsi FROM subscribers WHERE msisdn = $1 AND deleted_at IS NULL LIMIT 1", + ) + .bind(msisdn) + .fetch_optional(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))? + .ok_or_else(|| ChargingError::SubscriberNotFound(msisdn.to_string()))?; + + Ok(row.get("imsi")) + } + + async fn deduct_airtime_atomic( + &self, + conn: &mut redis::aio::MultiplexedConnection, + imsi: &str, + amount: i64, + ) -> ChargingResult { + // Lua script: atomic check-and-deduct + let lua_script = r#" + local key = KEYS[1] + local cost = tonumber(ARGV[1]) + local balance = tonumber(redis.call('GET', key) or '0') + if balance < cost then + return -1 + end + return redis.call('DECRBY', key, cost) + "#; + + let new_balance: i64 = redis::Script::new(lua_script) + .key(format!("airtime:home:{}", imsi)) + .arg(amount) + .invoke_async(conn) + .await + .map_err(|e| ChargingError::RedisOperation(e.to_string()))?; + + if new_balance < 0 { + // Script returned -1 sentinel + let current: i64 = redis::AsyncCommands::get(conn, format!("airtime:home:{}", imsi)) + .await + .unwrap_or(0); + return Err(ChargingError::InsufficientAirtime { + available: current, + required: amount, + }); + } + + Ok(new_balance) + } + + + + pub async fn purchase_bundle_with_airtime( + &self, + msisdn: &str, + bundle_id: &str, + ) -> ChargingResult { + // 1. Resolve IMSI + let imsi = self.resolve_imsi_from_msisdn(msisdn).await?; + + // 2. Load bundle config + let bundle = self.get_bundle_config(bundle_id).await?; + + // 3. Deduct airtime atomically + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + let new_airtime = self.deduct_airtime_atomic(&mut conn, &imsi, bundle.amount_unconverted).await?; + + // 4. Activate bundle for subscriber + let active_bundle = self.activate_bundle_for_subscriber(&imsi, bundle_id).await?; + + // 5. Audit: record airtime transaction + self.record_airtime_transaction( + &imsi, + -bundle.amount_unconverted, + new_airtime, + &format!("bundle_purchase:{}", bundle_id), + ).await?; + + Ok(active_bundle) + } + + + /// POST /v1/bundles/:id/gift + /// Sender purchases a bundle for a recipient using the sender's airtime. + /// Deducts from sender_msisdn's airtime; activates bundle on recipient_msisdn. + pub async fn gift_bundle_with_airtime( + &self, + sender_msisdn: &str, + recipient_msisdn: &str, + bundle_id: &str, + ) -> ChargingResult { + // 1. Resolve both IMSIs + let sender_imsi = self.resolve_imsi_from_msisdn(sender_msisdn).await?; + let recipient_imsi = self.resolve_imsi_from_msisdn(recipient_msisdn).await?; + + // 2. Load bundle config + let bundle = self.get_bundle_config(bundle_id).await?; + + // 3. Deduct from SENDER's airtime + let mut conn = self.redis_client.get_multiplexed_async_connection() + .await.map_err(|e| ChargingError::RedisConnection(e.to_string()))?; + + let new_sender_airtime = self.deduct_airtime_atomic( + &mut conn, &sender_imsi, bundle.amount_unconverted, + ).await?; + + // 4. Activate bundle on RECIPIENT + let active_bundle = self.activate_bundle_for_subscriber(&recipient_imsi, bundle_id).await?; + + // 5. Audit: record airtime transaction for sender + self.record_airtime_transaction( + &sender_imsi, + -bundle.amount_unconverted, + new_sender_airtime, + &format!("bundle_gift:{}:to:{}", bundle_id, recipient_imsi), + ).await?; + + Ok(active_bundle) + } + + /// Internal: write an airtime_transactions row for audit. + async fn record_airtime_transaction( + &self, + imsi: &str, + delta: i64, + balance_after: i64, + reason: &str, + ) -> ChargingResult<()> { + sqlx::query(r#" + INSERT INTO airtime_transactions + (imsi, delta_seconds, balance_after, reason, created_at) + VALUES ($1, $2, $3, $4, NOW()) + "#) + .bind(imsi) + .bind(delta) + .bind(balance_after) + .bind(reason) + .execute(&self.plans.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + Ok(()) + } +} diff --git a/apps/charging-engine/src/charging/engine.rs b/apps/charging-engine/src/charging/engine.rs index c42cce6..e13e37b 100644 --- a/apps/charging-engine/src/charging/engine.rs +++ b/apps/charging-engine/src/charging/engine.rs @@ -29,6 +29,7 @@ pub struct ChargingEngine { pub(crate) startup_time: SystemTime, pub(crate) redis_circuit_breaker: CircuitBreaker, pub(crate) postgres_circuit_breaker: CircuitBreaker, + pub(crate) bundle_repo: BundleRepo, } impl ChargingEngine { @@ -39,6 +40,7 @@ impl ChargingEngine { redis_url: &str, plans: RatingPlansRepo, sync_interval_secs: u64, + bundle_repo: BundleRepo, ) -> ChargingResult { let redis_client = redis::Client::open(redis_url) .map_err(|e| crate::errors::ChargingError::RedisConnection(e.to_string()))?; @@ -53,7 +55,7 @@ impl ChargingEngine { .ok() .and_then(|s| s.parse().ok()) .unwrap_or(60); - + let postgres_failure_threshold: u32 = std::env::var("POSTGRES_FAILURE_THRESHOLD") .ok() .and_then(|s| s.parse().ok()) @@ -67,7 +69,7 @@ impl ChargingEngine { redis_failure_threshold, Duration::from_secs(redis_timeout), ); - + let postgres_circuit_breaker = CircuitBreaker::new( postgres_failure_threshold, Duration::from_secs(postgres_timeout), @@ -80,6 +82,7 @@ impl ChargingEngine { startup_time: SystemTime::now(), redis_circuit_breaker, postgres_circuit_breaker, + bundle_repo }) } diff --git a/apps/charging-engine/src/charging/rating_plans_repo.rs b/apps/charging-engine/src/charging/rating_plans_repo.rs index 119a692..be83db8 100644 --- a/apps/charging-engine/src/charging/rating_plans_repo.rs +++ b/apps/charging-engine/src/charging/rating_plans_repo.rs @@ -16,7 +16,7 @@ use crate::errors::{ChargingError, ChargingResult}; /// Repository for rating plans backed by Postgres. #[derive(Clone)] pub struct RatingPlansRepo { - pool: PgPool, + pub(crate) pool: PgPool, circuit_breaker: CircuitBreaker, } @@ -243,14 +243,14 @@ fn row_to_plan(row: sqlx::postgres::PgRow) -> RatingPlan { /// Changes to the repo root directory to find the migrations folder. async fn run_migrations(database_url: &str) -> Result<(), Box> { use std::process::Command; - + // Change to repo root to find migrations directory let manifest_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); let repo_root = manifest_dir .parent() .and_then(|p| p.parent()) .ok_or("Failed to determine repo root")?; - + let output = Command::new("goose") .current_dir(&repo_root) .args(["postgres", database_url, "up", "migrations"]) diff --git a/apps/charging-engine/src/charging/types.rs b/apps/charging-engine/src/charging/types.rs index 04f6c26..5e3ec92 100644 --- a/apps/charging-engine/src/charging/types.rs +++ b/apps/charging-engine/src/charging/types.rs @@ -2,16 +2,8 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use redis::{FromRedisValue, ToRedisArgs, ToSingleRedisArg}; -/// Charging rule for usage authorization -/// -/// # Example -/// -/// ```rust -/// use charging_engine::charging::types::ChargingRule; -/// -/// let rule = ChargingRule::Allowed; -/// assert_eq!(rule.as_str(), "ALLOWED"); -/// ``` +// ─── Charging Rule ──────────────────────────────────────────────────────────── + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum ChargingRule { Allowed, @@ -24,73 +16,21 @@ pub enum ChargingRule { } impl ChargingRule { - /// Convert charging rule to string representation - /// - /// # Example - /// - /// ```rust - /// use charging_engine::charging::types::ChargingRule; - /// - /// let rule = ChargingRule::Allowed; - /// assert_eq!(rule.as_str(), "ALLOWED"); - /// ``` pub fn as_str(&self) -> &str { match self { - ChargingRule::Allowed => "ALLOWED", - ChargingRule::InsufficientCredit => "INSUFFICIENT_CREDIT", - ChargingRule::DataLimitExceeded => "DATA_LIMIT_EXCEEDED", - ChargingRule::VoiceLimitExceeded => "VOICE_LIMIT_EXCEEDED", - ChargingRule::SmsLimitExceeded => "SMS_LIMIT_EXCEEDED", - ChargingRule::UserBlocked => "USER_BLOCKED", - ChargingRule::Blocked => "BLOCKED", + ChargingRule::Allowed => "ALLOWED", + ChargingRule::InsufficientCredit => "INSUFFICIENT_CREDIT", + ChargingRule::DataLimitExceeded => "DATA_LIMIT_EXCEEDED", + ChargingRule::VoiceLimitExceeded => "VOICE_LIMIT_EXCEEDED", + ChargingRule::SmsLimitExceeded => "SMS_LIMIT_EXCEEDED", + ChargingRule::UserBlocked => "USER_BLOCKED", + ChargingRule::Blocked => "BLOCKED", } } } -/// Subscriber account information -/// -/// # Example -/// -/// ```rust -/// use charging_engine::charging::types::{SubscriberAccount, AccountStatus}; -/// use chrono::Utc; -/// -/// let account = SubscriberAccount { -/// imsi: "1234567890".to_string(), -/// balance: 1000, -/// data_limit: 1000000000, -/// data_used: 0, -/// voice_limit: 1000, -/// voice_used: 0, -/// sms_limit: 100, -/// sms_used: 0, -/// status: AccountStatus::Active, -/// last_updated: Utc::now(), -/// }; -/// ``` -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SubscriberAccount { - pub imsi: String, - pub balance: i64, - pub data_limit: u64, - pub data_used: u64, - pub voice_limit: u64, - pub voice_used: u64, - pub sms_limit: u64, - pub sms_used: u64, - pub status: AccountStatus, - pub last_updated: DateTime, -} +// ─── Account ────────────────────────────────────────────────────────────────── -/// Account status -/// -/// # Example -/// -/// ```rust -/// use charging_engine::charging::types::AccountStatus; -/// -/// let status = AccountStatus::Active; -/// ``` #[derive(Debug, Clone, Serialize, Deserialize)] pub enum AccountStatus { Active, @@ -99,44 +39,22 @@ pub enum AccountStatus { Blocked, } -/// Usage event for charging -/// -/// # Example -/// -/// ```rust -/// use charging_engine::charging::types::{UsageEvent, UsageType}; -/// use chrono::Utc; -/// -/// let event = UsageEvent { -/// imsi: "1234567890".to_string(), -/// session_id: "session123".to_string(), -/// usage_type: UsageType::Data, -/// volume: 1000, -/// timestamp: Utc::now(), -/// rate: 0.01, -/// cost: 10.0, -/// }; -/// ``` #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct UsageEvent { - pub imsi: String, - pub session_id: String, - pub usage_type: UsageType, - pub volume: u64, - pub timestamp: DateTime, - pub rate: f64, - pub cost: f64, -} - -/// Usage type classification -/// -/// # Example -/// -/// ```rust -/// use charging_engine::charging::types::UsageType; -/// -/// let usage_type = UsageType::Data; -/// ``` +pub struct SubscriberAccount { + pub imsi: String, + pub balance: i64, + pub data_limit: u64, + pub data_used: u64, + pub voice_limit: u64, + pub voice_used: u64, + pub sms_limit: u64, + pub sms_used: u64, + pub status: AccountStatus, + pub last_updated: DateTime, +} + +// ─── Usage ──────────────────────────────────────────────────────────────────── + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum UsageType { Data, @@ -144,58 +62,134 @@ pub enum UsageType { SMS, } -/// Rating plan for subscriber billing -/// -/// # Example -/// -/// ```rust -/// use charging_engine::charging::types::RatingPlan; -/// -/// let plan = RatingPlan { -/// plan_id: "plan1".to_string(), -/// name: "Basic Plan".to_string(), -/// data_rate: 0.01, -/// voice_rate: 0.05, -/// sms_rate: 0.1, -/// monthly_fee: 10.0, -/// data_limit: 1000000000, -/// voice_limit: 1000, -/// sms_limit: 100, -/// }; -/// ``` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UsageEvent { + pub imsi: String, + pub session_id: String, + pub usage_type: UsageType, + pub volume: u64, + pub timestamp: DateTime, + pub rate: f64, + pub cost: f64, +} + +// ─── Rating Plan ────────────────────────────────────────────────────────────── + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RatingPlan { - pub plan_id: String, - pub name: String, - pub data_rate: f64, - pub voice_rate: f64, - pub sms_rate: f64, + pub plan_id: String, + pub name: String, + pub data_rate: f64, + pub voice_rate: f64, + pub sms_rate: f64, pub monthly_fee: f64, - pub data_limit: u64, + pub data_limit: u64, pub voice_limit: u64, - pub sms_limit: u64, + pub sms_limit: u64, +} + +// ─── Airtime ────────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AirtimeBalance { + pub imsi: String, + pub home_seconds: u64, + pub roaming_seconds: u64, + pub last_updated: DateTime, +} + +/// Returned by `deduct_airtime` so callers know exactly what was consumed. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AirtimeDeduction { + pub imsi: String, + pub deducted: u64, + pub new_balance: u64, + pub roaming: bool, +} + +// ─── Bundles ────────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum BundleType { + Data, + Voice, + SMS, + Hybrid, +} + +impl BundleType { + /// Returns the lowercase string stored in the `bundles.bundle_type` column. + pub fn as_str(&self) -> &str { + match self { + BundleType::Data => "data", + BundleType::Voice => "voice", + BundleType::SMS => "sms", + BundleType::Hybrid => "hybrid", + } + } + + /// Parse from the DB string back to the enum. + pub fn from_str(s: &str) -> Self { + match s { + "voice" => BundleType::Voice, + "sms" => BundleType::SMS, + "hybrid" => BundleType::Hybrid, + _ => BundleType::Data, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BundleAllowances { + pub data_bytes: Option, + pub voice_seconds: Option, + pub sms_count: Option, + pub roaming_data_bytes: Option, +} + +/// Bundle definition stored in PostgreSQL. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Bundle { + pub bundle_id: String, + pub name: String, + pub bundle_type: BundleType, + pub allowances: BundleAllowances, + pub validity_days: u32, + pub priority: u8, + pub amount_unconverted: i64, + pub is_active: bool, +} + +/// A bundle that has been activated for a specific subscriber. +/// Stored as JSON in Redis with a TTL matching `validity_days`. +/// `priority` is copied from the bundle definition so `consume_from_bundle` +/// can sort without an extra DB round-trip. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ActiveBundle { + pub imsi: String, + pub bundle_id: String, + /// Copied from `Bundle::priority` at activation time. + pub priority: u8, + pub activated_at: DateTime, + pub expires_at: DateTime, + pub remaining_allowances: BundleAllowances, } -// Redis trait implementations for serialization +// ─── Redis trait implementations ────────────────────────────────────────────── + impl FromRedisValue for SubscriberAccount { fn from_redis_value(v: redis::Value) -> Result { let json: String = redis::from_redis_value(v)?; - let account: SubscriberAccount = serde_json::from_str(&json) - .map_err(|e| redis::ParsingError::from(e.to_string()))?; - Ok(account) + serde_json::from_str(&json).map_err(|e| redis::ParsingError::from(e.to_string())) } } impl ToRedisArgs for SubscriberAccount { fn write_redis_args(&self, out: &mut W) - where - W: redis::RedisWrite + ?Sized, - { - let json = serde_json::to_string(self) - .unwrap_or_else(|_| { - // Fallback to empty JSON if serialization fails - r#"{"imsi":"","balance":0,"data_limit":0,"data_used":0,"voice_limit":0,"voice_used":0,"sms_limit":0,"sms_used":0,"status":"Active","last_updated":"1970-01-01T00:00:00Z"}"#.to_string() - }); + where W: redis::RedisWrite + ?Sized { + let json = serde_json::to_string(self).unwrap_or_else(|_| { + r#"{"imsi":"","balance":0,"data_limit":0,"data_used":0,"voice_limit":0,"voice_used":0,"sms_limit":0,"sms_used":0,"status":"Active","last_updated":"1970-01-01T00:00:00Z"}"#.to_string() + }); json.write_redis_args(out) } } @@ -205,24 +199,38 @@ impl ToSingleRedisArg for SubscriberAccount {} impl FromRedisValue for UsageEvent { fn from_redis_value(v: redis::Value) -> Result { let json: String = redis::from_redis_value(v)?; - let event: UsageEvent = serde_json::from_str(&json) - .map_err(|e| redis::ParsingError::from(e.to_string()))?; - Ok(event) + serde_json::from_str(&json).map_err(|e| redis::ParsingError::from(e.to_string())) } } impl ToRedisArgs for UsageEvent { fn write_redis_args(&self, out: &mut W) - where - W: redis::RedisWrite + ?Sized, - { - let json = serde_json::to_string(self) - .unwrap_or_else(|_| { - // Fallback to empty JSON if serialization fails - r#"{"imsi":"","session_id":"","usage_type":"Data","volume":0,"timestamp":"1970-01-01T00:00:00Z","rate":0.0,"cost":0.0}"#.to_string() - }); + where W: redis::RedisWrite + ?Sized { + let json = serde_json::to_string(self).unwrap_or_else(|_| { + r#"{"imsi":"","session_id":"","usage_type":"Data","volume":0,"timestamp":"1970-01-01T00:00:00Z","rate":0.0,"cost":0.0}"#.to_string() + }); json.write_redis_args(out) } } impl ToSingleRedisArg for UsageEvent {} + +/// ActiveBundle is stored as JSON in Redis — implement the Redis traits +/// so it can be set/get with `redis::AsyncCommands`. +impl FromRedisValue for ActiveBundle { + fn from_redis_value(v: redis::Value) -> Result { + let json: String = redis::from_redis_value(v)?; + serde_json::from_str(&json).map_err(|e| redis::ParsingError::from(e.to_string())) + } +} + +impl ToRedisArgs for ActiveBundle { + fn write_redis_args(&self, out: &mut W) + where W: redis::RedisWrite + ?Sized { + let json = serde_json::to_string(self) + .unwrap_or_else(|_| "{}".to_string()); + json.write_redis_args(out) + } +} + +impl ToSingleRedisArg for ActiveBundle {} diff --git a/apps/charging-engine/src/charging/usage_pipeline.rs b/apps/charging-engine/src/charging/usage_pipeline.rs new file mode 100644 index 0000000..c9ce71a --- /dev/null +++ b/apps/charging-engine/src/charging/usage_pipeline.rs @@ -0,0 +1,152 @@ +use chrono::Utc; +use tracing::{info, warn}; + +use crate::errors::{ChargingError, ChargingResult}; +use super::types::{UsageEvent, UsageType}; +use super::engine::ChargingEngine; + +#[derive(Debug)] +pub enum ChargingSource { + Bundle { subscriber_bundle_id: i64 }, + Credit, +} + +pub struct ChargeResult { + pub imsi: String, + pub session_id: String, + pub volume: u64, + pub cost: f64, + pub charging_source: ChargingSource, +} + +impl ChargingEngine { + /// Full charging pipeline: bundle-first, then credit fallback. + /// This is the method called by the `process_usage` handler. + pub async fn process_usage_event(&self, event: UsageEvent) -> ChargingResult { + // 1. Validate subscriber exists and is active + let subscriber = self.get_subscriber_account(&event.imsi).await? + .ok_or_else(|| ChargingError::SubscriberNotFound(event.imsi.clone()))?; + + if !matches!(subscriber.status, crate::charging::types::AccountStatus::Active) { + return Err(ChargingError::UsageBlocked( + format!("Subscriber {} is not active", event.imsi) + )); + } + + let usage_type_str = match event.usage_type { + UsageType::Data => "data", + UsageType::Voice => "voice", + UsageType::SMS => "sms", + }; + + // 2. Try bundle deduction first + let active_bundles = self.bundle_repo.get_active_bundles(&event.imsi).await?; + + for bundle in &active_bundles { + let has_allowance = match event.usage_type { + UsageType::Data => bundle.remaining_allowances.data_bytes + .map_or(false, |b| b >= event.volume), + UsageType::Voice => bundle.remaining_allowances.voice_seconds + .map_or(false, |s| s >= event.volume), + UsageType::SMS => bundle.remaining_allowances.sms_count + .map_or(false, |c| c >= event.volume), + }; + + if !has_allowance { + continue; + } + + let deducted = self.bundle_repo + .deduct_from_bundle(bundle.id, usage_type_str, event.volume) + .await?; + + if deducted { + info!( + imsi = %event.imsi, + bundle_id = %bundle.bundle_id, + volume = event.volume, + usage_type = usage_type_str, + "Usage charged from bundle" + ); + + // Persist usage record with charging_source = 'bundle' + self.persist_usage_record(&event, 0.0, ChargingSource::Bundle { + subscriber_bundle_id: bundle.id, + }).await?; + + return Ok(ChargeResult { + imsi: event.imsi, + session_id: event.session_id, + volume: event.volume, + cost: 0.0, // bundle usage has no per-event cost + charging_source: ChargingSource::Bundle { + subscriber_bundle_id: bundle.id, + }, + }); + } + } + + // 3. No bundle covered it — fall back to credit deduction + warn!( + imsi = %event.imsi, + usage_type = usage_type_str, + "No active bundle for usage, falling back to credit" + ); + + let plan = self.plans.get("basic").await? // resolve subscriber's actual plan + .ok_or_else(|| ChargingError::RatingPlanNotFound("basic".to_string()))?; + + let cost = self.calculate_usage_cost(&event).await?; + + // Deduct from credit balance (existing credit_management logic) + self.deduct_credit_for_usage(&event.imsi, cost).await?; + + // Persist usage record with charging_source = 'credit' + self.persist_usage_record(&event, cost, ChargingSource::Credit).await?; + + Ok(ChargeResult { + imsi: event.imsi, + session_id: event.session_id, + volume: event.volume, + cost, + charging_source: ChargingSource::Credit, + }) + } + + /// Persist a usage record to Postgres for billing reconciliation. + async fn persist_usage_record( + &self, + event: &UsageEvent, + cost: f64, + source: ChargingSource, + ) -> ChargingResult<()> { + let (charging_source_str, bundle_id) = match &source { + ChargingSource::Bundle { subscriber_bundle_id } => + ("bundle", Some(*subscriber_bundle_id)), + ChargingSource::Credit => + ("credit", None), + }; + + sqlx::query(r#" + INSERT INTO usage_records + (subscriber_id, session_id, usage_type, start_time, end_time, + volume, rate, cost, charging_source, subscriber_bundle_id, created_at, updated_at) + SELECT s.id, $2, $3, $4, $4, $5, $6, $7, $8, $9, NOW(), NOW() + FROM subscribers s WHERE s.imsi = $1 + "#) + .bind(&event.imsi) + .bind(&event.session_id) + .bind(match event.usage_type { UsageType::Data => "data", UsageType::Voice => "voice", UsageType::SMS => "sms" }) + .bind(event.timestamp) + .bind(event.volume as i64) + .bind(event.rate) + .bind(cost) + .bind(charging_source_str) + .bind(bundle_id) + .execute(&self.bundle_repo.pool) + .await + .map_err(|e| ChargingError::DatabaseError(e.to_string()))?; + + Ok(()) + } +} diff --git a/apps/charging-engine/src/errors/types.rs b/apps/charging-engine/src/errors/types.rs index 703601a..0c38a7d 100644 --- a/apps/charging-engine/src/errors/types.rs +++ b/apps/charging-engine/src/errors/types.rs @@ -3,37 +3,51 @@ use axum::http::StatusCode; use serde_json::json; use thiserror::Error; +use crate::charging; + #[derive(Debug, Error, Clone)] pub enum ChargingError { #[error("Redis connection error: {0}")] RedisConnection(String), - + #[error("Redis operation error: {0}")] RedisOperation(String), - + #[error("Database error: {0}")] DatabaseError(String), - + #[error("Subscriber not found: {0}")] SubscriberNotFound(String), - + #[error("Rating plan not found: {0}")] RatingPlanNotFound(String), - + #[error("Insufficient credit: available={available}, requested={requested}")] InsufficientCredit { available: u64, requested: u64 }, - + #[error("Usage blocked: {0}")] UsageBlocked(String), - + #[error("Invalid input: {0}")] InvalidInput(String), - + #[error("Serialization error: {0}")] SerializationError(String), - + #[error("Internal error: {0}")] InternalError(String), + + #[error("Bundle not found: {0}")] + BundleNotFound(String), + + #[error("Bundle already active")] + BundleAlreadyActive, + + #[error("Invalid bundle configuration: {0}")] + InvalidBundleConfig(String), + + #[error("Insufficient airtime: available={available}, required={required}")] + InsufficientAirtime { available: i64, required: i64 }, } impl IntoResponse for ChargingError { @@ -49,6 +63,10 @@ impl IntoResponse for ChargingError { ChargingError::InvalidInput(_) => (StatusCode::BAD_REQUEST, "Invalid input"), ChargingError::SerializationError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Serialization error"), ChargingError::InternalError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Internal error"), + ChargingError::BundleNotFound(_) => (StatusCode::NOT_FOUND, "Bundle not found"), + ChargingError::BundleAlreadyActive => (StatusCode::CONFLICT, "Bundle already active"), + ChargingError::InvalidBundleConfig(_) => (StatusCode::BAD_REQUEST, "Invalid bundle config"), + ChargingError::InsufficientAirtime { .. } => (StatusCode::PAYMENT_REQUIRED, "Insufficient airtime"), }; let body = json!({ diff --git a/apps/charging-engine/src/handlers/airtime.rs b/apps/charging-engine/src/handlers/airtime.rs new file mode 100644 index 0000000..ab331dd --- /dev/null +++ b/apps/charging-engine/src/handlers/airtime.rs @@ -0,0 +1,77 @@ +use axum::{extract::{Path, State}, http::StatusCode, response::Json, Json as AxumJson}; +use serde_json::json; +use crate::models::AppState; +use crate::errors::ChargingError; + +#[derive(Deserialize)] +pub struct AddAirtimeRequest { + pub seconds_to_add: u64, + pub roaming: Option, +} + +#[derive(Serialize)] +pub struct AirtimeBalanceResponse { + pub imsi: String, + pub home_seconds: u64, + pub roaming_seconds: u64, + pub last_updated: chrono::DateTime, +} + + +#[derive(Deserialize)] +pub struct DeductAirtimeRequest { + pub seconds_to_deduct: u64, + pub roaming: Option, +} + +pub async fn add_airtime( + State(state): State, + Path(imsi): Path, + AxumJson(request): AxumJson, +) -> Result, StatusCode> { + let roaming = request.roaming.unwrap_or(false); + + match state.charging_engine.add_airtime(&imsi, request.seconds_to_add, roaming).await { + Ok(new_balance) => Ok(Json(json!({ + "imsi": imsi, + "roaming": roaming, + "new_balance_seconds": new_balance, + "timestamp": chrono::Utc::now() + }))), + Err(ChargingError::InvalidInput(_)) => Err(StatusCode::BAD_REQUEST), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +pub async fn get_airtime_balance( + State(state): State, + Path(imsi): Path, +) -> Result, StatusCode> { + match state.charging_engine.get_airtime_balance(&imsi).await { + Ok(balance) => Ok(Json(AirtimeBalanceResponse { + imsi: balance.imsi, + home_seconds: balance.home_seconds, + roaming_seconds: balance.roaming_seconds, + last_updated: balance.last_updated, + })), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +pub async fn deduct_airtime( + State(state): State, + Path(imsi): Path, + Json(request): Json, +) -> ChargingResult> { + let roaming = request.roaming.unwrap_or(false); + let result = state.charging_engine + .deduct_airtime(&imsi, request.seconds_to_deduct, roaming) + .await + .with_context("Failed to deduct airtime")?; + Ok(Json(serde_json::json!({ + "imsi": imsi, + "deducted_seconds": result.deducted, + "new_balance_seconds": result.new_balance, + "roaming": roaming, + }))) +} diff --git a/apps/charging-engine/src/handlers/bundles.rs b/apps/charging-engine/src/handlers/bundles.rs new file mode 100644 index 0000000..4635624 --- /dev/null +++ b/apps/charging-engine/src/handlers/bundles.rs @@ -0,0 +1,281 @@ +use axum::{extract::{Path, State}, http::StatusCode, response::Json, Json as AxumJson}; +use serde_json::json; +use crate::models::AppState; +use crate::errors::ChargingError; + +#[derive(Deserialize)] +pub struct CreateBundleRequest { + pub bundle_id: String, + pub name: String, + pub bundle_type: String, + pub allowances: BundleAllowancesRequest, + pub validity_days: u32, + pub priority: Option, +} + +#[derive(Deserialize)] +pub struct BundleAllowancesRequest { + pub data_bytes: Option, + pub voice_seconds: Option, + pub sms_count: Option, + pub roaming_data_bytes: Option, +} + +#[derive(Deserialize)] +pub struct ActivateBundleRequest { + pub msisdn: String, +} + +#[derive(Serialize)] +pub struct BundleResponse { + pub bundle_id: String, + pub name: String, + pub bundle_type: String, + pub allowances: BundleAllowancesRequest, + pub validity_days: u32, + pub priority: u8, + pub is_active: bool, +} + +#[derive(Serialize)] +pub struct ActiveBundleResponse { + pub imsi: String, + pub bundle_id: String, + pub activated_at: chrono::DateTime, + pub expires_at: chrono::DateTime, + pub remaining_allowances: BundleAllowancesRequest, +} + +#[derive(Deserialize)] +pub struct PurchaseBundleRequest { + pub msisdn: String, +} + +#[derive(Deserialize)] +pub struct GiftBundleRequest { + pub sender_msisdn: String, + pub recipient_msisdn: String, +} + +/// create bunndle +pub async fn create_bundle( + State(state): State, + AxumJson(request): AxumJson, +) -> Result, StatusCode> { + let bundle_type = match request.bundle_type.as_str() { + "data" => crate::charging::types::BundleType::Data, + "voice" => crate::charging::types::BundleType::Voice, + "sms" => crate::charging::types::BundleType::SMS, + "hybrid" => crate::charging::types::BundleType::Hybrid, + _ => return Err(StatusCode::BAD_REQUEST), + }; + + let bundle = crate::charging::types::Bundle { + bundle_id: request.bundle_id.clone(), + name: request.name, + bundle_type, + allowances: crate::charging::types::BundleAllowances { + data_bytes: request.allowances.data_bytes, + voice_seconds: request.allowances.voice_seconds, + sms_count: request.allowances.sms_count, + roaming_data_bytes: request.allowances.roaming_data_bytes, + }, + validity_days: request.validity_days, + priority: request.priority.unwrap_or(1), + amount_unconverted: request.amount_unconverted, + is_active: true, + }; + + match state.charging_engine.create_bundle(bundle).await { + Ok(_) => Ok(Json(json!({ + "status": "created", + "bundle_id": request.bundle_id, + }))), + Err(ChargingError::InvalidInput(_)) => Err(StatusCode::BAD_REQUEST), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +/// Activate a bundle for a subscriber +pub async fn activate_bundle( + State(state): State, + Path(bundle_id): Path, + AxumJson(request): AxumJson, +) -> Result, StatusCode> { + match state.charging_engine.activate_bundle_for_subscriber(&request.imsi, &bundle_id).await { + Ok(active_bundle) => Ok(Json(ActiveBundleResponse { + imsi: active_bundle.imsi, + bundle_id: active_bundle.bundle_id, + activated_at: active_bundle.activated_at, + expires_at: active_bundle.expires_at, + remaining_allowances: BundleAllowancesRequest { + data_bytes: active_bundle.remaining_allowances.data_bytes, + voice_seconds: active_bundle.remaining_allowances.voice_seconds, + sms_count: active_bundle.remaining_allowances.sms_count, + roaming_data_bytes: active_bundle.remaining_allowances.roaming_data_bytes, + }, + })), + Err(ChargingError::BundleNotFound(_)) => Err(StatusCode::NOT_FOUND), + Err(ChargingError::BundleAlreadyActive) => Err(StatusCode::CONFLICT), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +/// List active bundles for a subscriber +pub async fn list_active_bundles( + State(state): State, + Path(imsi): Path, +) -> Result>, StatusCode> { + match state.charging_engine.get_active_bundles_for_subscriber(&imsi).await { + Ok(bundles) => { + let response: Vec = bundles.into_iter().map(|b| ActiveBundleResponse { + imsi: b.imsi, + bundle_id: b.bundle_id, + activated_at: b.activated_at, + expires_at: b.expires_at, + remaining_allowances: BundleAllowancesRequest { + data_bytes: b.remaining_allowances.data_bytes, + voice_seconds: b.remaining_allowances.voice_seconds, + sms_count: b.remaining_allowances.sms_count, + roaming_data_bytes: b.remaining_allowances.roaming_data_bytes, + }, + }).collect(); + Ok(Json(response)) + }, + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } +} + +/// Consume from bundle allowances (internal use) +pub async fn consume_from_bundle( + State(state): State, + AxumJson(request): AxumJson, +) -> Result, StatusCode> { + let imsi = request.get("imsi").and_then(|v| v.as_str()).ok_or(StatusCode::BAD_REQUEST)?; + let usage_type_str = request.get("usage_type").and_then(|v| v.as_str()).ok_or(StatusCode::BAD_REQUEST)?; + let amount = request.get("amount").and_then(|v| v.as_u64()).ok_or(StatusCode::BAD_REQUEST)?; + + let usage_type = match usage_type_str { + "data" => crate::charging::types::UsageType::Data, + "voice" => crate::charging::types::UsageType::Voice, + "sms" => crate::charging::types::UsageType::SMS, + _ => return Err(StatusCode::BAD_REQUEST), + }; + + match state.charging_engine.consume_from_bundle(imsi, usage_type, amount).await { + Ok(consumed) => Ok(Json(json!({ + "consumed": consumed, + "imsi": imsi, + "usage_type": usage_type_str, + "amount": amount, + }))), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } + + + +} + + + +/// GET /v1/bundles +pub async fn list_bundles( + State(state): State, +) -> ChargingResult> { + let bundles = state.charging_engine.list_bundles().await + .with_context("Failed to list bundles")?; + Ok(Json(serde_json::json!({ "bundles": bundles }))) +} + +/// GET /v1/bundles/:id +pub async fn get_bundle( + Path(bundle_id): Path, + State(state): State, +) -> ChargingResult> { + let bundle = state.charging_engine.get_bundle_by_id(&bundle_id).await + .with_context("Failed to get bundle")?; + match bundle { + Some(b) => Ok(Json(serde_json::json!({ + "bundle_id": b.bundle_id, + "name": b.name, + "validity_days": b.validity_days, + "priority": b.priority, + "is_active": b.is_active, + }))), + None => Err(ChargingError::BundleNotFound(bundle_id)), + } +} + +/// DELETE /v1/bundles/:id +pub async fn deactivate_bundle( + Path(bundle_id): Path, + State(state): State, +) -> ChargingResult> { + state.charging_engine.deactivate_bundle(&bundle_id).await + .with_context("Failed to deactivate bundle")?; + Ok(Json(serde_json::json!({ "status": "deactivated", "bundle_id": bundle_id }))) +} + + +/// POST /v1/bundles/:id/purchase +/// Purchase a bundle by spending airtime seconds. +pub async fn purchase_bundle_with_airtime( + State(state): State, + Path(bundle_id): Path, + Json(req): Json, +) -> ChargingResult> { + let active = state.charging_engine + .purchase_bundle_with_airtime(&req.imsi, &bundle_id) + .await + .with_context("Failed to purchase bundle with airtime")?; + + Ok(Json(serde_json::json!({ + "status": "purchased", + "imsi": active.imsi, + "bundle_id": active.bundle_id, + "expires_at": active.expires_at, + "remaining_allowances": active.remaining_allowances, + }))) +} + + +/// POST /v1/bundles/:id/purchase +pub async fn purchase_bundle( + State(state): State, + Path(bundle_id): Path, + Json(req): Json, +) -> ChargingResult> { + let active = state.charging_engine + .purchase_bundle_with_airtime(&req.msisdn, &bundle_id) + .await + .with_context("Failed to purchase bundle")?; + + Ok(Json(serde_json::json!({ + "status": "purchased", + "msisdn": req.msisdn, + "bundle_id": active.bundle_id, + "expires_at": active.expires_at, + "remaining_allowances": active.remaining_allowances, + }))) +} + + +pub async fn gift_bundle( + State(state): State, + Path(bundle_id): Path, + Json(req): Json, +) -> ChargingResult> { + let active = state.charging_engine + .gift_bundle_with_airtime(&req.sender_msisdn, &req.recipient_msisdn, &bundle_id) + .await + .with_context("Failed to gift bundle")?; + + Ok(Json(serde_json::json!({ + "status": "gifted", + "sender_msisdn": req.sender_msisdn, + "recipient_msisdn": req.recipient_msisdn, + "bundle_id": active.bundle_id, + "expires_at": active.expires_at, + "remaining_allowances": active.remaining_allowances, + }))) +} diff --git a/apps/charging-engine/src/handlers/mod.rs b/apps/charging-engine/src/handlers/mod.rs index fbd69da..aee6f52 100644 --- a/apps/charging-engine/src/handlers/mod.rs +++ b/apps/charging-engine/src/handlers/mod.rs @@ -6,6 +6,8 @@ pub mod monitoring; pub mod rating; pub mod subscriber; pub mod usage; +pub mod airtime; +pub mod bundles; pub use block::{block_user, is_user_blocked, unblock_user}; pub use credit::{add_credit, check_credit, deduct_credit, get_balance}; @@ -15,3 +17,5 @@ pub use monitoring::{detailed_health_check, get_error_stats, get_performance_met pub use rating::{add_rating_plan, get_rating_plan, list_rating_plans, remove_rating_plan}; pub use subscriber::{get_subscriber, update_subscriber}; pub use usage::{calculate_usage_cost, generate_invoice, process_usage, rate_usage, record_usage}; +pub use airtime::{add_airtime, get_airtime_balance, deduct_airtime}; +pub use bundles::{create_bundle, activate_bundle, list_active_bundles, consume_from_bundle,list_bundles, get_bundle, deactivate_bundle, gift_bundle, purchase_bundle, purchase_bundle_with_airtime}; diff --git a/apps/charging-engine/src/handlers/usage.rs b/apps/charging-engine/src/handlers/usage.rs index b2f3f5c..bbd28cd 100644 --- a/apps/charging-engine/src/handlers/usage.rs +++ b/apps/charging-engine/src/handlers/usage.rs @@ -18,7 +18,7 @@ pub async fn record_usage( let session_id = req.get("session_id").and_then(|v| v.as_str()).ok_or_else(|| { ChargingError::InvalidInput("Missing session_id".to_string()) })?; - + validate_session_id(session_id)?; let event = crate::charging::types::UsageEvent { @@ -53,7 +53,7 @@ pub async fn calculate_usage_cost( let session_id = req.get("session_id").and_then(|v| v.as_str()).ok_or_else(|| { ChargingError::InvalidInput("Missing session_id".to_string()) })?; - + validate_session_id(session_id)?; let event = crate::charging::types::UsageEvent { @@ -68,7 +68,7 @@ pub async fn calculate_usage_cost( let cost = state.charging_engine.calculate_usage_cost(&event).await .with_context("Failed to calculate usage cost")?; - + Ok(Json(serde_json::json!({ "cost": cost, "imsi": imsi, @@ -88,7 +88,7 @@ pub async fn rate_usage( let session_id = req.get("session_id").and_then(|v| v.as_str()).ok_or_else(|| { ChargingError::InvalidInput("Missing session_id".to_string()) })?; - + validate_session_id(session_id)?; let event = crate::charging::types::UsageEvent { @@ -103,7 +103,7 @@ pub async fn rate_usage( let rated_event = state.charging_engine.rate_usage(event).await .with_context("Failed to rate usage")?; - + Ok(Json(serde_json::json!({ "rated_event": rated_event, }))) @@ -121,7 +121,7 @@ pub async fn process_usage( let session_id = req.get("session_id").and_then(|v| v.as_str()).ok_or_else(|| { ChargingError::InvalidInput("Missing session_id".to_string()) })?; - + validate_session_id(session_id)?; let event = crate::charging::types::UsageEvent { @@ -136,11 +136,24 @@ pub async fn process_usage( state.charging_engine.process_usage_event(event).await .with_context("Failed to process usage event")?; - - Ok(Json(serde_json::json!({ - "status": "processed", - "imsi": imsi, - }))) + + let result = state.charging_engine.process_usage_event(event).await + .with_context("Failed to process usage event")?; + + Ok(Json(serde_json::json!({ + "status": "processed", + "imsi": result.imsi, + "session_id": result.session_id, + "volume": result.volume, + "cost": result.cost, + "charging_source": match result.charging_source { + ChargingSource::Bundle { subscriber_bundle_id } => serde_json::json!({ + "type": "bundle", + "subscriber_bundle_id": subscriber_bundle_id + }), + ChargingSource::Credit => serde_json::json!({ "type": "credit" }), + } + }))) } /// GET /v1/invoice/:imsi/:period @@ -151,7 +164,7 @@ pub async fn generate_invoice( ) -> ChargingResult> { let invoice = state.charging_engine.generate_invoice(&imsi, &period).await .with_context("Failed to generate invoice")?; - + Ok(Json(serde_json::json!({ "imsi": imsi, "period": period, diff --git a/apps/charging-engine/src/routes.rs b/apps/charging-engine/src/routes.rs index 310b461..39041f0 100644 --- a/apps/charging-engine/src/routes.rs +++ b/apps/charging-engine/src/routes.rs @@ -14,7 +14,7 @@ use crate::handlers::{ engine_start, engine_stop, engine_uptime, get_balance, get_error_stats, get_performance_metrics, get_rating_plan, get_subscriber, get_system_stats, is_user_blocked, list_rating_plans, record_usage, remove_rating_plan, start_sync, unblock_user, update_subscriber, health_check, - calculate_usage_cost, rate_usage, process_usage, generate_invoice, + calculate_usage_cost, rate_usage, process_usage, generate_invoice, add_airtime, get_airtime_balance, create_bundle, activate_bundle, list_active_bundles, consume_from_bundle }; use crate::models::AppState; @@ -76,6 +76,20 @@ pub fn create_router(state: AppState) -> Router { .route("/v1/usage/rate", post(rate_usage)) .route("/v1/usage/process", post(process_usage)) .route("/v1/invoice/:imsi/:period", get(generate_invoice)) + //airtime and bundle routes + .route("/v1/airtime/:imsi/add", post(add_airtime)) + .route("/v1/airtime/:imsi/balance", get(get_airtime_balance)) + .route("/v1/bundles", post(create_bundle)) + .route("/v1/bundles/:id/activate", post(activate_bundle)) + .route("/v1/bundles/:imsi/active", get(list_active_bundles)) + .route("/v1/bundles/consume", post(consume_from_bundle)) + .route("/v1/bundles", get(list_bundles)) + .route("/v1/bundles/:id", get(get_bundle)) + .route("/v1/bundles/deactivate/:id", delete(deactivate_bundle)) + .route("/v1/bundles/:id/purchase_with_airtime", post(purchase_bundle_with_airtime)) + .route("/v1/bundles/:id/purchase", post(purchase_bundle)) + .route("/v1/bundles/:id/gift", post(gift_bundle)) + .route("/v1/airtime/:imsi/deduct", post(deduct_airtime)) .layer(cors) .layer(GovernorLayer::new(governor_conf)) .route_layer(axum::middleware::from_fn_with_state(state.auth_config.clone(), auth_middleware)) diff --git a/apps/cli/internal/api/subscribers.go b/apps/cli/internal/api/subscribers.go index 487e29e..cd78ca9 100644 --- a/apps/cli/internal/api/subscribers.go +++ b/apps/cli/internal/api/subscribers.go @@ -4,6 +4,7 @@ package api type Subscriber struct { ID uint `json:"id"` IMSI string `json:"imsi"` + MSISDN string `json:"msisdn"` FirstName string `json:"first_name"` LastName string `json:"last_name"` Email string `json:"email"` @@ -13,15 +14,16 @@ type Subscriber struct { // SubscriberAccount holds account info type SubscriberAccount struct { IMSI string `json:"imsi"` + MSISDN string `json:"msisdn"` Name string `json:"name"` Status string `json:"status"` Balance float64 `json:"balance"` } // ListSubscribers retrieves subscribers -func (c *Client) ListSubscribers() ([]SubscriberAccount, error) { +func (c *Client) ListSubscribers() ([]Subscriber, error) { var resp struct { - Data []SubscriberAccount `json:"data"` + Data []Subscriber `json:"data"` } if err := c.doGetJSON("/api/v1/subscribers", &resp); err != nil { return nil, err @@ -30,8 +32,8 @@ func (c *Client) ListSubscribers() ([]SubscriberAccount, error) { } // GetSubscriber retrieves a single subscriber -func (c *Client) GetSubscriber(imsi string) (*SubscriberAccount, error) { - var sub SubscriberAccount +func (c *Client) GetSubscriber(imsi string) (*Subscriber, error) { + var sub Subscriber if err := c.doGetJSON("/api/v1/subscribers/"+imsi, &sub); err != nil { return nil, err } @@ -39,8 +41,8 @@ func (c *Client) GetSubscriber(imsi string) (*SubscriberAccount, error) { } // CreateSubscriber creates a subscriber -func (c *Client) CreateSubscriber(imsi, name string) (*SubscriberAccount, error) { - var sub SubscriberAccount +func (c *Client) CreateSubscriber(imsi, name string) (*Subscriber, error) { + var sub Subscriber body := map[string]string{"imsi": imsi, "name": name} if err := c.doPostJSON("/api/v1/subscribers", body, &sub); err != nil { return nil, err diff --git a/apps/cli/internal/commands/enhanced_subscribers.go b/apps/cli/internal/commands/enhanced_subscribers.go index fb9d67a..bf5d125 100644 --- a/apps/cli/internal/commands/enhanced_subscribers.go +++ b/apps/cli/internal/commands/enhanced_subscribers.go @@ -87,8 +87,11 @@ func balanceSubscriberUI(args []string, config *types.CLIConfig) error { return nil } t.AddRow("IMSI", sub.IMSI) - t.AddRow("Name", sub.Name) - t.AddRow("Balance", fmt.Sprintf("$%.2f", sub.Balance)) + t.AddRow("MSISDN", sub.MSISDN) + // (commented this due to a change in the CLI/internal/api/subscriber.go to sue Subscriber isntead of SubscriberAccount.) + // t.AddRow("Name", sub.Name) + // t.AddRow("Balance", fmt.Sprintf("$%.2f", sub.Balance)) + fmt.Println(t.Render()) return nil } @@ -127,7 +130,7 @@ func searchSubscribersUI(args []string, config *types.CLIConfig) error { subs, err := u.client.ListSubscribers() t := u.newTable() t.AddColumn("IMSI", 16, "left") - t.AddColumn("Name", 22, "left") + // t.AddColumn("Name", 22, "left") t.AddColumn("Status", 10, "left") t.AddColumn("Balance", 10, "right") @@ -139,7 +142,7 @@ func searchSubscribersUI(args []string, config *types.CLIConfig) error { } for _, s := range subs { t.AddStyledRow(statusStyle(s.Status).Style, - s.IMSI, s.Name, s.Status, fmt.Sprintf("$%.2f", s.Balance)) + s.IMSI, s.Status) } fmt.Println(t.Render()) return nil diff --git a/apps/cli/internal/commands/subscribers.go b/apps/cli/internal/commands/subscribers.go index cbdb868..abc0eec 100644 --- a/apps/cli/internal/commands/subscribers.go +++ b/apps/cli/internal/commands/subscribers.go @@ -62,10 +62,10 @@ func listSubscribers(u *uiContext) error { t.AddColumn("IMSI", 16, "left") t.AddColumn("Name", 22, "left") t.AddColumn("Status", 10, "left") - t.AddColumn("Balance", 10, "right") - t.AddStyledRow(statusStyle("ACTIVE").Style, "310260123456789", "John Doe", "Active", "$45.67") - t.AddStyledRow(statusStyle("ACTIVE").Style, "310260123456790", "Jane Smith", "Active", "$123.45") - t.AddStyledRow(statusStyle("INACTIVE").Style, "310260123456791", "Bob Johnson", "Inactive", "$0.00") + t.AddColumn("MSISDN", 10, "right") + t.AddStyledRow(statusStyle("ACTIVE").Style, "310260123456789", "John Doe", "Active", "07XXXXXXX") + t.AddStyledRow(statusStyle("ACTIVE").Style, "310260123456790", "Jane Smith", "Active", "07XXXXXXX") + t.AddStyledRow(statusStyle("INACTIVE").Style, "310260123456791", "Bob Johnson", "Inactive", "07XXXXXXX") fmt.Println(t.Render()) return nil } @@ -77,7 +77,7 @@ func listSubscribers(u *uiContext) error { t.AddColumn("Balance", 10, "right") for _, s := range subs { t.AddStyledRow(statusStyle(s.Status).Style, - s.IMSI, s.Name, s.Status, fmt.Sprintf("$%.2f", s.Balance)) + s.IMSI, s.Status) } fmt.Println(t.Render()) return nil @@ -102,7 +102,7 @@ func createSubscriber(u *uiContext, args []string) error { t.AddColumn("Field", 12, "left") t.AddColumn("Value", 24, "left") t.AddRow("IMSI", sub.IMSI) - t.AddRow("Name", sub.Name) + // t.AddRow("Name", sub.Name) t.AddRow("Status", sub.Status) fmt.Println(t.Render()) return nil @@ -151,9 +151,9 @@ func showSubscriber(u *uiContext, args []string) error { return nil } t.AddRow("IMSI", sub.IMSI) - t.AddRow("Name", sub.Name) + t.AddRow("Name", sub.FirstName+" "+sub.LastName) t.AddRow("Status", sub.Status) - t.AddRow("Balance", fmt.Sprintf("$%.2f", sub.Balance)) + t.AddRow("MSISDN", sub.MSISDN) fmt.Println(t.Render()) return nil } diff --git a/migrations/00019_init_airtime.sql b/migrations/00019_init_airtime.sql new file mode 100644 index 0000000..bedff04 --- /dev/null +++ b/migrations/00019_init_airtime.sql @@ -0,0 +1,32 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS airtime_balances ( + id SERIAL PRIMARY KEY, + subscriber_id INTEGER NOT NULL REFERENCES subscribers(id) ON DELETE CASCADE, + imsi TEXT NOT NULL UNIQUE, + home_seconds BIGINT NOT NULL DEFAULT 0, + roaming_seconds BIGINT NOT NULL DEFAULT 0, + last_topped_up TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS airtime_transactions ( + id SERIAL PRIMARY KEY, + subscriber_id INTEGER NOT NULL REFERENCES subscribers(id) ON DELETE CASCADE, + imsi TEXT NOT NULL, + transaction_type TEXT NOT NULL, -- 'topup', 'deduction', 'bundle_purchase' + seconds_delta BIGINT NOT NULL, -- positive = credit, negative = debit + balance_after BIGINT NOT NULL, + roaming BOOLEAN NOT NULL DEFAULT FALSE, + reference_id TEXT, -- bundle_id if bundle_purchase, session_id if deduction + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_airtime_balances_imsi ON airtime_balances(imsi); +CREATE INDEX IF NOT EXISTS idx_airtime_transactions_imsi ON airtime_transactions(imsi); +CREATE INDEX IF NOT EXISTS idx_airtime_transactions_type ON airtime_transactions(transaction_type); +CREATE INDEX IF NOT EXISTS idx_airtime_transactions_created_at ON airtime_transactions(created_at); + +-- +goose Down +DROP TABLE IF EXISTS airtime_transactions CASCADE; +DROP TABLE IF EXISTS airtime_balances CASCADE; diff --git a/migrations/00020_init_bundles.sql b/migrations/00020_init_bundles.sql new file mode 100644 index 0000000..bcf16ff --- /dev/null +++ b/migrations/00020_init_bundles.sql @@ -0,0 +1,45 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS bundles ( + id SERIAL PRIMARY KEY, + bundle_id TEXT NOT NULL UNIQUE, + name TEXT NOT NULL, + bundle_type TEXT NOT NULL, -- 'data', 'voice', 'sms', 'hybrid' + data_bytes BIGINT, + voice_seconds BIGINT, + sms_count BIGINT, + roaming_data_bytes BIGINT, + validity_days INTEGER NOT NULL DEFAULT 30, + priority SMALLINT NOT NULL DEFAULT 1, + airtime_cost BIGINT NOT NULL DEFAULT 0, -- cost in airtime seconds to purchase + is_active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS subscriber_bundles ( + id SERIAL PRIMARY KEY, + subscriber_id INTEGER NOT NULL REFERENCES subscribers(id) ON DELETE CASCADE, + imsi TEXT NOT NULL, + bundle_id TEXT NOT NULL REFERENCES bundles(bundle_id), + activated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + expires_at TIMESTAMPTZ NOT NULL, + remaining_data_bytes BIGINT, + remaining_voice_seconds BIGINT, + remaining_sms_count BIGINT, + remaining_roaming_bytes BIGINT, + status TEXT NOT NULL DEFAULT 'active', -- 'active', 'exhausted', 'expired' + purchased_with_airtime BOOLEAN NOT NULL DEFAULT FALSE, + airtime_cost_paid BIGINT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_bundles_bundle_id ON bundles(bundle_id); +CREATE INDEX IF NOT EXISTS idx_bundles_is_active ON bundles(is_active); +CREATE INDEX IF NOT EXISTS idx_subscriber_bundles_imsi ON subscriber_bundles(imsi); +CREATE INDEX IF NOT EXISTS idx_subscriber_bundles_expires_at ON subscriber_bundles(expires_at); +CREATE INDEX IF NOT EXISTS idx_subscriber_bundles_status ON subscriber_bundles(status); + +-- +goose Down +DROP TABLE IF EXISTS subscriber_bundles CASCADE; +DROP TABLE IF EXISTS bundles CASCADE; diff --git a/migrations/00021_usage_records_charging_source.sql b/migrations/00021_usage_records_charging_source.sql new file mode 100644 index 0000000..a9e988f --- /dev/null +++ b/migrations/00021_usage_records_charging_source.sql @@ -0,0 +1,11 @@ +-- +goose Up +ALTER TABLE usage_records + ADD COLUMN IF NOT EXISTS charging_source TEXT DEFAULT 'credit', + ADD COLUMN IF NOT EXISTS subscriber_bundle_id INTEGER REFERENCES subscriber_bundles(id); + +CREATE INDEX IF NOT EXISTS idx_usage_records_charging_source ON usage_records(charging_source); + +-- +goose Down +ALTER TABLE usage_records + DROP COLUMN IF EXISTS charging_source, + DROP COLUMN IF EXISTS subscriber_bundle_id; diff --git a/migrations/00022_bundles_amount_unconverted.sql b/migrations/00022_bundles_amount_unconverted.sql new file mode 100644 index 0000000..8435cf9 --- /dev/null +++ b/migrations/00022_bundles_amount_unconverted.sql @@ -0,0 +1,6 @@ +-- +goose Up +ALTER TABLE bundles + ADD COLUMN IF NOT EXISTS amount_unconverted BIGINT NOT NULL DEFAULT 0; + +-- +goose Down +ALTER TABLE bundles DROP COLUMN IF EXISTS amount_unconverted; diff --git a/migrations/00023_add_subscriber_sqn.sql b/migrations/00023_add_subscriber_sqn.sql new file mode 100644 index 0000000..75f9ab9 --- /dev/null +++ b/migrations/00023_add_subscriber_sqn.sql @@ -0,0 +1,5 @@ +-- +goose Up +ALTER TABLE subscribers ADD COLUMN IF NOT EXISTS sqn BIGINT NOT NULL DEFAULT 0; + +-- +goose Down +ALTER TABLE subscribers DROP COLUMN IF EXISTS sqn;