Skip to content

Commit d2f262f

Browse files
committed
**feat(api): Add Patronage and PDS Fleet API controllers with routes and configuration**
- Implemented `PatronageApiController` with subscription management endpoints (create, cancel, list, and summary). - Added `PdsFleetApiController` to handle PDS fleet operations, including heartbeat, submission, and node management. - Updated routes configuration to include new APIs for Patronage and PDS Fleet operations. - Enhanced `application.conf` with AT Protocol and PDS authentication settings.
1 parent 34e7057 commit d2f262f

File tree

4 files changed

+285
-0
lines changed

4 files changed

+285
-0
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package controllers
2+
3+
import actions.ApiSecurityAction
4+
import jakarta.inject.{Inject, Singleton}
5+
import play.api.Logging
6+
import play.api.libs.json.{Json, OFormat}
7+
import play.api.mvc.*
8+
import services.{PatronageService, WebhookEvent}
9+
10+
import java.util.UUID
11+
import scala.concurrent.ExecutionContext
12+
13+
@Singleton
14+
class PatronageApiController @Inject()(
15+
val controllerComponents: ControllerComponents,
16+
secureApi: ApiSecurityAction,
17+
patronageService: PatronageService
18+
)(implicit ec: ExecutionContext) extends BaseController with Logging {
19+
20+
case class CreateSubscriptionRequest(
21+
userId: UUID,
22+
tier: String,
23+
billingInterval: String,
24+
paymentProvider: String,
25+
providerSubscriptionId: Option[String] = None,
26+
providerCustomerId: Option[String] = None
27+
)
28+
object CreateSubscriptionRequest { implicit val format: OFormat[CreateSubscriptionRequest] = Json.format }
29+
30+
case class CancelSubscriptionRequest(userId: UUID)
31+
object CancelSubscriptionRequest { implicit val format: OFormat[CancelSubscriptionRequest] = Json.format }
32+
33+
def createSubscription(): Action[CreateSubscriptionRequest] =
34+
secureApi.jsonAction[CreateSubscriptionRequest].async { request =>
35+
val r = request.body
36+
patronageService.createSubscription(
37+
userId = r.userId,
38+
tier = r.tier,
39+
billingInterval = r.billingInterval,
40+
paymentProvider = r.paymentProvider,
41+
providerSubscriptionId = r.providerSubscriptionId,
42+
providerCustomerId = r.providerCustomerId
43+
).map {
44+
case Right(sub) => Created(Json.toJson(sub))
45+
case Left(error) => BadRequest(Json.obj("error" -> error))
46+
}
47+
}
48+
49+
def cancelSubscription(id: Int): Action[CancelSubscriptionRequest] =
50+
secureApi.jsonAction[CancelSubscriptionRequest].async { request =>
51+
patronageService.cancelSubscription(id, request.body.userId).map {
52+
case Right(_) => Ok(Json.obj("cancelled" -> true))
53+
case Left(error) => BadRequest(Json.obj("error" -> error))
54+
}
55+
}
56+
57+
def getSubscription(userId: UUID): Action[AnyContent] = secureApi.async { _ =>
58+
patronageService.getActiveSubscription(userId).map {
59+
case Some(sub) => Ok(Json.toJson(sub))
60+
case None => NotFound(Json.obj("error" -> "No active subscription"))
61+
}
62+
}
63+
64+
def getUserSubscriptions(userId: UUID): Action[AnyContent] = secureApi.async { _ =>
65+
patronageService.getUserSubscriptions(userId).map { subs =>
66+
Ok(Json.obj("subscriptions" -> subs, "total" -> subs.size))
67+
}
68+
}
69+
70+
def isPatron(userId: UUID): Action[AnyContent] = secureApi.async { _ =>
71+
patronageService.isPatron(userId).map { isPatron =>
72+
Ok(Json.obj("isPatron" -> isPatron))
73+
}
74+
}
75+
76+
def getPatronSummary: Action[AnyContent] = secureApi.async { _ =>
77+
patronageService.getPatronSummary.map(summary => Ok(Json.toJson(summary)))
78+
}
79+
80+
def expireOverdue(): Action[AnyContent] = secureApi.async { _ =>
81+
patronageService.expireOverdueSubscriptions().map { count =>
82+
Ok(Json.obj("expired" -> count))
83+
}
84+
}
85+
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package controllers
2+
3+
import actions.{ApiSecurityAction, PdsAuthAction}
4+
import jakarta.inject.{Inject, Singleton}
5+
import play.api.Logging
6+
import play.api.libs.json.{Json, OFormat}
7+
import play.api.mvc.*
8+
import services.{HeartbeatRequest, PdsFleetService, SubmissionProvenanceService}
9+
10+
import java.util.UUID
11+
import scala.concurrent.ExecutionContext
12+
13+
@Singleton
14+
class PdsFleetApiController @Inject()(
15+
val controllerComponents: ControllerComponents,
16+
pdsAuth: PdsAuthAction,
17+
secureApi: ApiSecurityAction,
18+
fleetService: PdsFleetService,
19+
submissionService: SubmissionProvenanceService
20+
)(implicit ec: ExecutionContext) extends BaseController with Logging {
21+
22+
// --- PDS-authenticated endpoints (called by edge nodes) ---
23+
24+
case class HeartbeatPayload(
25+
status: String,
26+
softwareVersion: Option[String] = None,
27+
loadMetrics: Option[play.api.libs.json.JsValue] = None,
28+
processingQueueSize: Option[Int] = None,
29+
errorMessage: Option[String] = None,
30+
lastCommitCid: Option[String] = None,
31+
lastCommitRev: Option[String] = None
32+
)
33+
object HeartbeatPayload { implicit val format: OFormat[HeartbeatPayload] = Json.format }
34+
35+
def heartbeat(): Action[HeartbeatPayload] = pdsAuth.jsonAction[HeartbeatPayload].async { request =>
36+
val node = request.pdsNode
37+
val payload = request.body
38+
val hbRequest = HeartbeatRequest(
39+
did = node.did,
40+
pdsUrl = node.pdsUrl,
41+
handle = node.handle,
42+
nodeName = node.nodeName,
43+
softwareVersion = payload.softwareVersion,
44+
status = payload.status,
45+
loadMetrics = payload.loadMetrics,
46+
processingQueueSize = payload.processingQueueSize,
47+
lastCommitCid = payload.lastCommitCid,
48+
lastCommitRev = payload.lastCommitRev,
49+
errorMessage = payload.errorMessage
50+
)
51+
52+
fleetService.processHeartbeat(hbRequest).map {
53+
case Right(updatedNode) => Ok(Json.toJson(updatedNode))
54+
case Left(error) => BadRequest(Json.obj("error" -> error))
55+
}
56+
}
57+
58+
case class SubmissionPayload(
59+
submissionType: String,
60+
proposedValue: String,
61+
biosampleId: Option[Int] = None,
62+
biosampleGuid: Option[UUID] = None,
63+
confidenceScore: Option[Double] = None,
64+
algorithmVersion: Option[String] = None,
65+
softwareVersion: Option[String] = None,
66+
payload: Option[play.api.libs.json.JsValue] = None,
67+
atUri: Option[String] = None,
68+
atCid: Option[String] = None
69+
)
70+
object SubmissionPayload { implicit val format: OFormat[SubmissionPayload] = Json.format }
71+
72+
def submitData(): Action[SubmissionPayload] = pdsAuth.jsonAction[SubmissionPayload].async { request =>
73+
val node = request.pdsNode
74+
val p = request.body
75+
76+
submissionService.recordSubmission(
77+
did = node.did,
78+
submissionType = p.submissionType,
79+
proposedValue = p.proposedValue,
80+
biosampleId = p.biosampleId,
81+
biosampleGuid = p.biosampleGuid,
82+
confidenceScore = p.confidenceScore,
83+
algorithmVersion = p.algorithmVersion,
84+
softwareVersion = p.softwareVersion,
85+
payload = p.payload,
86+
atUri = p.atUri,
87+
atCid = p.atCid
88+
).map {
89+
case Right(submission) => Created(Json.toJson(submission))
90+
case Left(error) => BadRequest(Json.obj("error" -> error))
91+
}
92+
}
93+
94+
// --- Admin-authenticated endpoints (X-API-Key secured) ---
95+
96+
def getFleetSummary: Action[AnyContent] = secureApi.async { _ =>
97+
fleetService.getFleetSummary.map(summary => Ok(Json.toJson(summary)))
98+
}
99+
100+
def listNodes(status: Option[String]): Action[AnyContent] = secureApi.async { _ =>
101+
fleetService.listNodes(status).map { nodes =>
102+
Ok(Json.obj("nodes" -> nodes, "total" -> nodes.size))
103+
}
104+
}
105+
106+
def getNode(did: String): Action[AnyContent] = secureApi.async { _ =>
107+
fleetService.getNode(did).map {
108+
case Some(node) => Ok(Json.toJson(node))
109+
case None => NotFound(Json.obj("error" -> s"Node not found: $did"))
110+
}
111+
}
112+
113+
def removeNode(did: String): Action[AnyContent] = secureApi.async { _ =>
114+
fleetService.removeNode(did).map {
115+
case Right(_) => Ok(Json.obj("removed" -> true))
116+
case Left(error) => NotFound(Json.obj("error" -> error))
117+
}
118+
}
119+
120+
def markStaleOffline(): Action[AnyContent] = secureApi.async { _ =>
121+
fleetService.markStaleNodesOffline().map { count =>
122+
Ok(Json.obj("markedOffline" -> count))
123+
}
124+
}
125+
126+
def getPendingSubmissions(submissionType: Option[String], limit: Int): Action[AnyContent] = secureApi.async { _ =>
127+
submissionService.getPendingSubmissions(submissionType, limit).map { submissions =>
128+
Ok(Json.obj("submissions" -> submissions, "total" -> submissions.size))
129+
}
130+
}
131+
132+
case class ReviewRequest(reviewedBy: String, notes: Option[String] = None)
133+
object ReviewRequest { implicit val format: OFormat[ReviewRequest] = Json.format }
134+
135+
def acceptSubmission(id: Int): Action[ReviewRequest] = secureApi.jsonAction[ReviewRequest].async { request =>
136+
submissionService.acceptSubmission(id, request.body.reviewedBy, request.body.notes).map {
137+
case Right(_) => Ok(Json.obj("accepted" -> true))
138+
case Left(error) => BadRequest(Json.obj("error" -> error))
139+
}
140+
}
141+
142+
def rejectSubmission(id: Int): Action[ReviewRequest] = secureApi.jsonAction[ReviewRequest].async { request =>
143+
submissionService.rejectSubmission(id, request.body.reviewedBy, request.body.notes).map {
144+
case Right(_) => Ok(Json.obj("rejected" -> true))
145+
case Left(error) => BadRequest(Json.obj("error" -> error))
146+
}
147+
}
148+
149+
def getNodeSubmissionSummary(did: String): Action[AnyContent] = secureApi.async { _ =>
150+
submissionService.getNodeSubmissionSummary(did).map {
151+
case Right(summary) => Ok(Json.toJson(summary))
152+
case Left(error) => NotFound(Json.obj("error" -> error))
153+
}
154+
}
155+
}

conf/application.conf

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,20 @@ contact {
8787
recipient.email = ${?CONTACT_RECIPIENT_EMAIL} # Can be overridden by environment variable
8888
}
8989

90+
# AT Protocol / PDS configuration
91+
atproto {
92+
client.timeout = 5000
93+
client.timeout = ${?ATPROTO_CLIENT_TIMEOUT}
94+
plc.directory = "https://plc.directory"
95+
plc.directory = ${?ATPROTO_PLC_DIRECTORY}
96+
}
97+
98+
# PDS Edge node authentication
99+
pds.auth {
100+
timestamp.window.seconds = 300
101+
timestamp.window.seconds = ${?PDS_AUTH_TIMESTAMP_WINDOW}
102+
}
103+
90104
pekko {
91105
loglevel = "DEBUG"
92106
stdout-loglevel = "DEBUG"

conf/routes

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,37 @@ GET /curator/genome-regions/:id/edit
310310
POST /curator/genome-regions/:id controllers.GenomeRegionsCuratorController.updateRegion(id: Int)
311311
DELETE /curator/genome-regions/:id controllers.GenomeRegionsCuratorController.deleteRegion(id: Int)
312312

313+
# =============================================
314+
# PDS Fleet Management API
315+
# =============================================
316+
# PDS-authenticated endpoints (signed by edge node private key)
317+
+nocsrf
318+
POST /api/v1/pds/heartbeat controllers.PdsFleetApiController.heartbeat()
319+
+nocsrf
320+
POST /api/v1/pds/submissions controllers.PdsFleetApiController.submitData()
321+
322+
# Admin fleet management (X-API-Key secured)
323+
GET /api/v1/pds/fleet/summary controllers.PdsFleetApiController.getFleetSummary
324+
GET /api/v1/pds/fleet/nodes controllers.PdsFleetApiController.listNodes(status: Option[String])
325+
GET /api/v1/pds/fleet/nodes/:did controllers.PdsFleetApiController.getNode(did: String)
326+
DELETE /api/v1/pds/fleet/nodes/:did controllers.PdsFleetApiController.removeNode(did: String)
327+
POST /api/v1/pds/fleet/mark-stale controllers.PdsFleetApiController.markStaleOffline()
328+
GET /api/v1/pds/submissions/pending controllers.PdsFleetApiController.getPendingSubmissions(type: Option[String], limit: Int ?= 100)
329+
POST /api/v1/pds/submissions/:id/accept controllers.PdsFleetApiController.acceptSubmission(id: Int)
330+
POST /api/v1/pds/submissions/:id/reject controllers.PdsFleetApiController.rejectSubmission(id: Int)
331+
GET /api/v1/pds/submissions/summary/:did controllers.PdsFleetApiController.getNodeSubmissionSummary(did: String)
332+
333+
# =============================================
334+
# Patronage API (X-API-Key secured)
335+
# =============================================
336+
POST /api/v1/patronage/subscriptions controllers.PatronageApiController.createSubscription()
337+
POST /api/v1/patronage/subscriptions/:id/cancel controllers.PatronageApiController.cancelSubscription(id: Int)
338+
GET /api/v1/patronage/subscriptions/user/:userId controllers.PatronageApiController.getUserSubscriptions(userId: java.util.UUID)
339+
GET /api/v1/patronage/subscriptions/active/:userId controllers.PatronageApiController.getSubscription(userId: java.util.UUID)
340+
GET /api/v1/patronage/is-patron/:userId controllers.PatronageApiController.isPatron(userId: java.util.UUID)
341+
GET /api/v1/patronage/summary controllers.PatronageApiController.getPatronSummary
342+
POST /api/v1/patronage/expire-overdue controllers.PatronageApiController.expireOverdue()
343+
313344
# --- API Routes (Handled by Tapir, including Swagger UI) ---
314345
POST /api/registerPDS controllers.PDSRegistrationController.registerPDS()
315346

0 commit comments

Comments
 (0)