From dce9fcbf03595ba4804da87d65521751cfccbf88 Mon Sep 17 00:00:00 2001 From: ashwin275 <110539449+ashwin275@users.noreply.github.com> Date: Thu, 19 Mar 2026 01:15:15 +0530 Subject: [PATCH] feat:v2 apply endpoint --- Cargo.lock | 7 + Cargo.toml | 1 + src/config.rs | 6 + src/db/job.rs | 52 ++++++- src/db/match_score.rs | 54 +------ src/db/profiles.rs | 99 ++++++++++++- src/http/routes/job.rs | 6 +- src/models/job_apply.rs | 5 + src/services/job_apply.rs | 274 +++++++++++++++++++++++++++++++++--- src/services/match_score.rs | 2 +- src/utils/external_apis.rs | 41 ++++++ src/utils/match_score.rs | 12 +- src/utils/mod.rs | 1 + 13 files changed, 477 insertions(+), 83 deletions(-) create mode 100644 src/utils/external_apis.rs diff --git a/Cargo.lock b/Cargo.lock index a1b4848..0627e7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -190,6 +190,7 @@ dependencies = [ "tracing", "tracing-appender", "tracing-subscriber", + "urlencoding", "uuid", ] @@ -3022,6 +3023,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8_iter" version = "1.0.4" diff --git a/Cargo.toml b/Cargo.toml index 31fda2d..f4ad089 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,3 +33,4 @@ deadpool-redis = "0.22.0" strsim = "0.11.1" hmac = "0.12.1" rand = "0.10.0" +urlencoding = "2.1.3" diff --git a/src/config.rs b/src/config.rs index 7856723..41b4498 100644 --- a/src/config.rs +++ b/src/config.rs @@ -140,6 +140,11 @@ pub struct BackendServiceConfig { pub api_key: String, } #[derive(Debug, Serialize, Deserialize, Clone)] +pub struct GeoCodingConfig { + pub base_url: String, + pub api_key: String, +} +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct NotificationServiceConfig { pub base_url: String, pub ns_secret: String, @@ -150,6 +155,7 @@ pub struct NotificationServiceConfig { pub struct ServicesConfig { pub seeker: BackendServiceConfig, pub notification: NotificationServiceConfig, + pub geo_coding: GeoCodingConfig, } #[derive(Debug, Serialize, Deserialize, Clone)] diff --git a/src/db/job.rs b/src/db/job.rs index 570e473..998825d 100644 --- a/src/db/job.rs +++ b/src/db/job.rs @@ -1,6 +1,22 @@ use chrono::{DateTime, Utc}; use serde_json::Value; -use sqlx::{query, Error, PgPool}; +use sqlx::{query, query_as, Error, FromRow, PgPool}; +use uuid::Uuid; +#[derive(Debug, FromRow)] +pub struct JobRow { + pub id: Uuid, + pub hash: String, + pub metadata: Option, + pub beckn_structure: Option, +} + +#[derive(sqlx::FromRow, Debug)] +pub struct JobLookup { + pub job_id: String, + pub provider_id: Option, + pub bpp_id: Option, + pub bpp_uri: Option, +} pub struct NewJob { pub job_id: String, @@ -144,3 +160,37 @@ pub async fn deactivate_stale_jobs( Ok(result.rows_affected()) } +pub async fn fetch_job_by_id(pool: &PgPool, job_id: Uuid) -> Result { + query_as::<_, JobRow>( + r#" + SELECT + id, + hash, + metadata, + beckn_structure + FROM jobs + WHERE id = $1 + "#, + ) + .bind(job_id) + .fetch_one(pool) + .await +} + +pub async fn fetch_job_by_job_id(pool: &PgPool, job_id: &str) -> Result { + query_as::<_, JobLookup>( + r#" + SELECT + job_id, + provider_id, + bpp_id, + bpp_uri + FROM jobs + WHERE job_id = $1 + AND is_active = true + "#, + ) + .bind(job_id) + .fetch_one(pool) + .await +} diff --git a/src/db/match_score.rs b/src/db/match_score.rs index c5162cc..2620e08 100644 --- a/src/db/match_score.rs +++ b/src/db/match_score.rs @@ -1,23 +1,8 @@ +use crate::db::{job::JobRow, profiles::ProfileRow}; use serde_json::Value; use sqlx::{query, query_as, query_scalar, FromRow, PgPool}; use uuid::Uuid; -#[derive(Debug, FromRow)] -pub struct JobRow { - pub id: Uuid, - pub hash: String, - pub metadata: Option, - pub beckn_structure: Option, -} - -#[derive(Debug, FromRow)] -pub struct ProfileRow { - pub id: Uuid, - pub hash: String, - pub metadata: Option, - pub beckn_structure: Option, -} - #[derive(Debug, FromRow, Clone)] pub struct JobLiteRow { pub id: Uuid, @@ -150,43 +135,6 @@ pub async fn upsert_match_score( Ok(()) } -pub async fn fetch_job_by_id(pool: &PgPool, job_id: Uuid) -> Result { - query_as::<_, JobRow>( - r#" - SELECT - id, - hash, - metadata, - beckn_structure - FROM jobs - WHERE id = $1 - "#, - ) - .bind(job_id) - .fetch_one(pool) - .await -} - -pub async fn fetch_profile_by_id( - pool: &PgPool, - profile_id: Uuid, -) -> Result { - query_as::<_, ProfileRow>( - r#" - SELECT - id, - hash, - metadata, - beckn_structure - FROM profiles - WHERE id = $1 - "#, - ) - .bind(profile_id) - .fetch_one(pool) - .await -} - pub async fn fetch_all_jobs(pool: &PgPool) -> Result, sqlx::Error> { sqlx::query_as::<_, JobRow>( r#" diff --git a/src/db/profiles.rs b/src/db/profiles.rs index a6f751e..90cd0f0 100644 --- a/src/db/profiles.rs +++ b/src/db/profiles.rs @@ -1,14 +1,31 @@ use crate::models::search::Pagination; +use crate::services::profiles::sync_profile_by_id; +use crate::state::AppState; +use axum::http::StatusCode; use chrono::{DateTime, Utc}; -use serde_json::Value; -use sqlx::{query, query_scalar, Error, PgPool, Row}; +use serde_json::{json, Value}; +use sqlx::{query, query_as, query_scalar, Error, FromRow, PgPool, Row}; +use std::sync::Arc; use tracing::info; - +use uuid::Uuid; pub struct PaginatedItems { pub items: Vec, pub total: i64, } +#[derive(Debug, FromRow)] +pub struct ProfileRow { + pub id: Uuid, + pub hash: String, + pub metadata: Option, + pub beckn_structure: Option, +} + +#[derive(FromRow, Debug)] +pub struct ProfileLookup { + pub profile_id: String, + pub metadata: serde_json::Value, +} pub struct NewProfile { pub profile_id: String, pub user_id: String, @@ -186,3 +203,79 @@ pub async fn fetch_beckn_profile_items( Ok(PaginatedItems { items, total }) } + +pub async fn fetch_profile_by_id( + pool: &PgPool, + profile_id: Uuid, +) -> Result { + query_as::<_, ProfileRow>( + r#" + SELECT + id, + hash, + metadata, + beckn_structure + FROM profiles + WHERE id = $1 + "#, + ) + .bind(profile_id) + .fetch_one(pool) + .await +} + +pub async fn get_or_sync_profile( + state: &Arc, + profile_id: &str, +) -> Result { + let profile = query_as::<_, ProfileLookup>( + r#" + SELECT profile_id, metadata + FROM profiles + WHERE profile_id = $1 + "#, + ) + .bind(profile_id) + .fetch_optional(&state.db_pool) + .await + .map_err(|e| { + tracing::error!("DB error: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + json!({"error": "Database error"}), + ) + })?; + + if let Some(p) = profile { + return Ok(p); + } + + if let Err(e) = sync_profile_by_id(state, profile_id).await { + tracing::error!("❌ Sync failed for {}: {:?}", profile_id, e); + } + let profile = query_as::<_, ProfileLookup>( + r#" + SELECT profile_id, metadata + FROM profiles + WHERE profile_id = $1 + "#, + ) + .bind(profile_id) + .fetch_optional(&state.db_pool) + .await + .map_err(|e| { + tracing::error!("DB error: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + json!({"error": "Database error"}), + ) + })?; + + if let Some(p) = profile { + return Ok(p); + } + Err(( + StatusCode::NOT_FOUND, + json!({"error": "Invalid or missing profile_id"}), + )) +} diff --git a/src/http/routes/job.rs b/src/http/routes/job.rs index d5129bf..a2a8a7f 100644 --- a/src/http/routes/job.rs +++ b/src/http/routes/job.rs @@ -1,4 +1,4 @@ -use crate::services::job_apply::{handle_job_applications, handle_job_apply}; +use crate::services::job_apply::{handle_job_applications, handle_job_apply, handle_job_apply_v2}; use crate::services::job_draft::{ create_user_draft_application, delete_user_draft_application, get_user_draft_applications, update_user_draft_application, @@ -29,5 +29,9 @@ pub fn routes(app_state: Arc) -> Router { "/v1/job-applications/drafts/{id}", delete(delete_user_draft_application), ) + // ==================== + // V2 APIs + // ==================== + .route("/v2/apply", post(handle_job_apply_v2)) .with_state(app_state) } diff --git a/src/models/job_apply.rs b/src/models/job_apply.rs index 9d151df..cf9d41f 100644 --- a/src/models/job_apply.rs +++ b/src/models/job_apply.rs @@ -6,6 +6,11 @@ pub struct JobRequest { pub context: MinimalContext, pub message: InitMessage, } +#[derive(Debug, Deserialize, Serialize)] +pub struct JobApplyV2Request { + pub job_id: String, + pub profile_id: String, +} pub type DraftRequest = JobRequest; pub type JobApplyRequest = JobRequest; diff --git a/src/services/job_apply.rs b/src/services/job_apply.rs index 561e6c1..b587a7f 100644 --- a/src/services/job_apply.rs +++ b/src/services/job_apply.rs @@ -1,11 +1,13 @@ -use crate::db::job_applications::{ - get_job_applications, store_job_applications, NewJobApplication, +use crate::db::{ + job::{fetch_job_by_job_id, JobLookup}, + job_applications::{get_job_applications, store_job_applications, NewJobApplication}, + profiles::{get_or_sync_profile, ProfileLookup}, }; use crate::models::webhook::{Ack, AckResponse, AckStatus, WebhookPayload}; use crate::services::payload_generator::build_beckn_payload; -use crate::utils::http_client::post_json; +use crate::utils::{external_apis::call_google_geocode, http_client::post_json}; use crate::{ - models::job_apply::{JobApplicationsQuery, JobApplyRequest}, + models::job_apply::{JobApplicationsQuery, JobApplyRequest, JobApplyV2Request}, state::AppState, }; use axum::{ @@ -15,7 +17,7 @@ use axum::{ Json, }; use serde::Serialize; -use serde_json::json; +use serde_json::{json, Value}; use tokio::sync::oneshot::channel; use tokio::time::{timeout, Duration}; use tracing::info; @@ -31,6 +33,15 @@ pub async fn handle_job_apply( State(app_state): State>, Json(req): Json, ) -> Result { + match process_job_apply(&app_state, &req).await { + Ok(res) => Ok((StatusCode::OK, Json(res))), + Err((status, err)) => Err((status, Json(err)).into_response()), + } +} +pub async fn process_job_apply( + app_state: &Arc, + req: &JobApplyRequest, +) -> Result { let user_id = req .message .order @@ -56,40 +67,43 @@ pub async fn handle_job_apply( tracing::error!("DB error: {:?}", e); return Err(( StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({"error": "Database error"})), - ) - .into_response()); + json!({"error": "Database error"}), + )); } }; + if let Some(application) = existing.into_iter().next() { - let response_body = json!({ + return Ok(json!({ "message": "User has already applied for this job", "application": application - }); - return Ok((StatusCode::OK, axum::Json(response_body))); + })); } - let _on_init = match call_and_wait_for_action(&app_state, &req, "init").await { + let _on_init = match call_and_wait_for_action(app_state, req, "init").await { Ok(data) => data, - Err((status, err)) => return Err((status, err).into_response()), + Err((status, err)) => return Err((status, json!(err.0))), }; - let on_confirm = match call_and_wait_for_action(&app_state, &req, "confirm").await { + + let on_confirm = match call_and_wait_for_action(app_state, req, "confirm").await { Ok(data) => data, - Err((status, err)) => return Err((status, err).into_response()), + Err((status, err)) => return Err((status, json!(err.0))), }; let transaction_id = on_confirm["context"]["transaction_id"] .as_str() .unwrap_or_default() .to_string(); + let bpp_id = on_confirm["context"]["bpp_id"] .as_str() .unwrap_or_default() .to_string(); + let bpp_uri = on_confirm["context"]["bpp_uri"] .as_str() .unwrap_or_default() .to_string(); + let user_id = on_confirm["message"]["order"]["fulfillments"] .get(0) .and_then(|f| f["customer"]["person"]["id"].as_str()) @@ -123,17 +137,16 @@ pub async fn handle_job_apply( status: Some("APPLIED".to_string()), metadata: Some(on_confirm.clone()), }; - // Store in DB + if let Err(e) = store_job_applications(&app_state.db_pool, new_application).await { tracing::error!("❌ Failed to store job application: {:?}", e); return Err(( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": "Failed to store job application"})), - ) - .into_response()); + json!({"error": "Failed to store job application"}), + )); } - Ok((StatusCode::OK, Json(on_confirm))) + Ok(on_confirm) } async fn call_and_wait_for_action( @@ -308,3 +321,224 @@ pub async fn handle_job_applications( } } } + +pub async fn handle_job_apply_v2( + State(app_state): State>, + Json(req): Json, +) -> Result { + let profile = match get_or_sync_profile(&app_state, &req.profile_id).await { + Ok(p) => p, + Err((status, err)) => { + return Err((status, Json(err)).into_response()); + } + }; + let job = match fetch_job_by_job_id(&app_state.db_pool, &req.job_id).await { + Ok(j) => j, + Err(e) => { + tracing::error!("❌ Failed to fetch job: {:?}", e); + return Err(( + StatusCode::NOT_FOUND, + Json(json!({"error": "Job not found"})), + ) + .into_response()); + } + }; + let payload = build_job_apply_payload(&app_state, &job, &profile).await; + + let job_apply_req: JobApplyRequest = match serde_json::from_value(payload) { + Ok(req) => req, + Err(e) => { + tracing::error!("❌ Failed to parse JobApplyRequest: {:?}", e); + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "Invalid payload structure"})), + ) + .into_response()); + } + }; + + let result = match process_job_apply(&app_state, &job_apply_req).await { + Ok(res) => res, + Err((status, err)) => { + return Err((status, Json(err)).into_response()); + } + }; + Ok((StatusCode::OK, Json(result))) +} + +pub async fn build_job_apply_payload( + app_state: &Arc, + job: &JobLookup, + profile: &ProfileLookup, +) -> Value { + let transaction_id = Uuid::new_v4().to_string(); + + let profile_id = &profile.profile_id; + let metadata = &profile.metadata; + + let who_i_am = metadata.get("whoIAm"); + + let name = who_i_am + .and_then(|w| w.get("name")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + + let age = who_i_am + .and_then(|w| w.get("age")) + .and_then(|v| v.as_i64()) + .map(|a| a.to_string()) + .unwrap_or_default(); + + let gender = who_i_am + .and_then(|w| w.get("gender")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + + let phone = who_i_am + .and_then(|w| w.get("phone")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + + let address = who_i_am + .and_then(|w| w.get("location")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + + let geo_data = if !address.is_empty() { + match call_google_geocode(app_state, address).await { + Ok(data) => Some(data), + Err(e) => { + tracing::error!("❌ Geocode failed: {:?}", e); + None + } + } + } else { + None + }; + let geo_location = geo_data + .as_ref() + .and_then(|g| g.get("results")) + .and_then(|r| r.get(0)) + .and_then(|r| r.get("geometry")) + .and_then(|g| g.get("location")); + + let lat = geo_location + .and_then(|l| l.get("lat")) + .and_then(|v| v.as_f64()) + .unwrap_or(0.0); + + let lng = geo_location + .and_then(|l| l.get("lng")) + .and_then(|v| v.as_f64()) + .unwrap_or(0.0); + + let mut city_name = ""; + let mut state_name = ""; + let mut state_code = ""; + let mut country_name = ""; + let mut country_code = ""; + + if let Some(components) = geo_data + .as_ref() + .and_then(|g| g.get("results")) + .and_then(|r| r.get(0)) + .and_then(|r| r.get("address_components")) + .and_then(|v| v.as_array()) + { + for comp in components { + let types = comp.get("types").and_then(|t| t.as_array()); + + let long_name = comp.get("long_name").and_then(|v| v.as_str()).unwrap_or(""); + let short_name = comp + .get("short_name") + .and_then(|v| v.as_str()) + .unwrap_or(""); + + if let Some(types) = types { + let has_type = |t: &str| types.iter().any(|x| x.as_str() == Some(t)); + + if city_name.is_empty() + && (has_type("locality") + || has_type("sublocality") + || has_type("administrative_area_level_2")) + { + city_name = long_name; + } + if state_name.is_empty() && has_type("administrative_area_level_1") { + state_name = long_name; + state_code = short_name; + } + if country_name.is_empty() && has_type("country") { + country_name = long_name; + country_code = short_name; + } + } + } + } + + let formatted_address = geo_data + .as_ref() + .and_then(|g| g.get("results")) + .and_then(|r| r.get(0)) + .and_then(|r| r.get("formatted_address")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + + json!({ + "context": { + "bpp_id": job.bpp_id.clone().unwrap_or_default(), + "bpp_uri": job.bpp_uri.clone().unwrap_or_default(), + "transaction_id": transaction_id + }, + "message": { + "order": { + "provider": { + "id": job.provider_id.clone().unwrap_or_default() + }, + "items": [ + { + "id": job.job_id, + "fulfillment_ids": [profile_id] + } + ], + "fulfillments": [ + { + "id": profile_id, + "customer": { + "person": { + "id": profile_id, + "name": name, + "age": age, + "gender": gender, + "metadata": metadata + }, + "contact": { + "phone": phone, + "email": "" + }, + "location": { + "gps": { + "lat": lat, + "lng": lng + }, + "address": formatted_address, + "city": { + "name": city_name, + "code": "" + }, + "state": { + "name": state_name, + "code": state_code + }, + "country": { + "name": country_name, + "code": country_code + } + } + } + } + ] + } + } + }) +} diff --git a/src/services/match_score.rs b/src/services/match_score.rs index b7e3444..f9946c6 100644 --- a/src/services/match_score.rs +++ b/src/services/match_score.rs @@ -2,7 +2,7 @@ use crate::utils::empeding::{ compute_empeding_match_score, job_text_for_embedding, profile_text_for_embedding, }; use crate::{ - db::match_score::{JobRow, ProfileRow}, + db::{job::JobRow, profiles::ProfileRow}, state::AppState, }; use serde_json::Value; diff --git a/src/utils/external_apis.rs b/src/utils/external_apis.rs new file mode 100644 index 0000000..8408746 --- /dev/null +++ b/src/utils/external_apis.rs @@ -0,0 +1,41 @@ +use crate::{state::AppState, utils::http_client::get_json}; +use anyhow::Result; +use deadpool_redis::redis::AsyncCommands; +use serde_json::Value; +use std::sync::Arc; +use urlencoding::encode; + +pub async fn call_google_geocode(app_state: &Arc, address: &str) -> Result { + let base_url = &app_state.config.services.geo_coding.base_url; + let api_key = &app_state.config.services.geo_coding.api_key; + + let normalized_address = address.trim().to_lowercase(); + + let cache_key = format!("geo:google:in:{}", normalized_address); + + let mut conn = app_state.redis_pool.get().await?; + + if let Ok(Some(cached)) = conn.get::<_, Option>(&cache_key).await { + tracing::info!("🟢 Google Geocode cache HIT"); + let parsed: Value = serde_json::from_str(&cached)?; + return Ok(parsed); + } + + tracing::info!("🟡 Google Geocode cache MISS → calling API"); + + let url = format!( + "{}/geocode/json?address={}®ion=IN&key={}", + base_url, + encode(address), + api_key + ); + + let headers = reqwest::header::HeaderMap::new(); + let data = get_json(&url, headers).await?; + + let _: () = conn + .set_ex(&cache_key, serde_json::to_string(&data)?, 60 * 60 * 24 * 30) + .await?; + + Ok(data) +} diff --git a/src/utils/match_score.rs b/src/utils/match_score.rs index 3e204bc..86099f3 100644 --- a/src/utils/match_score.rs +++ b/src/utils/match_score.rs @@ -1,7 +1,11 @@ -use crate::db::match_score::{ - fetch_all_jobs, fetch_all_profiles, fetch_job_by_id, fetch_missing_matches, fetch_new_jobs, - fetch_new_profiles, fetch_profile_by_id, fetch_stale_matches, upsert_match_score, JobLiteRow, - JobRow, ProfileLiteRow, ProfileRow, StaleMatchRow, +use crate::db::{ + job::{fetch_job_by_id, JobRow}, + match_score::{ + fetch_all_jobs, fetch_all_profiles, fetch_missing_matches, fetch_new_jobs, + fetch_new_profiles, fetch_stale_matches, upsert_match_score, JobLiteRow, ProfileLiteRow, + StaleMatchRow, + }, + profiles::{fetch_profile_by_id, ProfileRow}, }; use crate::services::match_score::compute_match_score; use crate::state::AppState; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 5452019..3ac1094 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,6 +1,7 @@ pub mod batching; pub mod cron; pub mod empeding; +pub mod external_apis; pub mod hash; pub mod http_client; pub mod logging;