Skip to content

Commit 44696bf

Browse files
committed
**feat(metadata): Add PDS fleet management, heartbeat tracking, and submission provenance**
- Introduced schema for `pds_node`, `pds_heartbeat_log`, and `pds_fleet_config` tables to enable fleet management, status tracking, and configuration. - Added `pds_submission` table for tracking submission provenance, including haplogroup calls and variant proposals. - Implemented `PdsFleetService` for fleet operations like node registration, heartbeat processing, and fleet summaries. - Created models, repositories, and services for managing PDS metadata entities and interactions. - Developed unit tests to validate service logic, repository operations, and heartbeat handling.
1 parent 9663937 commit 44696bf

16 files changed

Lines changed: 1508 additions & 0 deletions

app/models/dal/MetadataSchema.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,18 @@ package models.dal
22

33
import models.PDSRegistration
44
import models.dal.MyPostgresProfile.api.*
5+
import models.dal.domain.pds.*
56
import slick.lifted.ProvenShape
67

78
import java.time.ZonedDateTime
89

910
object MetadataSchema {
1011

12+
val pdsNodes = TableQuery[PdsNodeTable]
13+
val pdsHeartbeatLogs = TableQuery[PdsHeartbeatLogTable]
14+
val pdsFleetConfigs = TableQuery[PdsFleetConfigTable]
15+
val pdsSubmissions = TableQuery[PdsSubmissionTable]
16+
1117
class PDSRegistrationsTable(tag: Tag) extends Table[PDSRegistration](tag, "pds_registrations") {
1218
def did = column[String]("did", O.PrimaryKey)
1319

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package models.dal.domain.pds
2+
3+
import models.dal.MyPostgresProfile.api.*
4+
import models.domain.pds.PdsFleetConfig
5+
6+
import java.time.LocalDateTime
7+
8+
class PdsFleetConfigTable(tag: Tag) extends Table[PdsFleetConfig](tag, "pds_fleet_config") {
9+
def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
10+
def configKey = column[String]("config_key")
11+
def configValue = column[String]("config_value")
12+
def description = column[Option[String]]("description")
13+
def updatedBy = column[Option[String]]("updated_by")
14+
def updatedAt = column[LocalDateTime]("updated_at")
15+
16+
def * = (id.?, configKey, configValue, description, updatedBy, updatedAt).mapTo[PdsFleetConfig]
17+
18+
def uniqueKey = index("idx_pds_fleet_config_key_unique", configKey, unique = true)
19+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package models.dal.domain.pds
2+
3+
import models.dal.MyPostgresProfile.api.*
4+
import models.domain.pds.PdsHeartbeatLog
5+
import play.api.libs.json.JsValue
6+
7+
import java.time.LocalDateTime
8+
9+
class PdsHeartbeatLogTable(tag: Tag) extends Table[PdsHeartbeatLog](tag, "pds_heartbeat_log") {
10+
def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
11+
def pdsNodeId = column[Int]("pds_node_id")
12+
def status = column[String]("status")
13+
def softwareVersion = column[Option[String]]("software_version")
14+
def loadMetrics = column[Option[JsValue]]("load_metrics")
15+
def processingQueueSize = column[Option[Int]]("processing_queue_size")
16+
def errorMessage = column[Option[String]]("error_message")
17+
def recordedAt = column[LocalDateTime]("recorded_at")
18+
19+
def * = (
20+
id.?, pdsNodeId, status, softwareVersion, loadMetrics, processingQueueSize, errorMessage, recordedAt
21+
).mapTo[PdsHeartbeatLog]
22+
23+
def nodeFk = foreignKey("pds_heartbeat_log_node_fk", pdsNodeId, TableQuery[PdsNodeTable])(_.id)
24+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package models.dal.domain.pds
2+
3+
import models.dal.MyPostgresProfile.api.*
4+
import models.domain.pds.PdsNode
5+
import play.api.libs.json.JsValue
6+
7+
import java.time.LocalDateTime
8+
9+
class PdsNodeTable(tag: Tag) extends Table[PdsNode](tag, "pds_node") {
10+
def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
11+
def did = column[String]("did")
12+
def pdsUrl = column[String]("pds_url")
13+
def handle = column[Option[String]]("handle")
14+
def nodeName = column[Option[String]]("node_name")
15+
def softwareVersion = column[Option[String]]("software_version")
16+
def status = column[String]("status")
17+
def capabilities = column[JsValue]("capabilities")
18+
def lastHeartbeat = column[Option[LocalDateTime]]("last_heartbeat")
19+
def lastCommitCid = column[Option[String]]("last_commit_cid")
20+
def lastCommitRev = column[Option[String]]("last_commit_rev")
21+
def ipAddress = column[Option[String]]("ip_address")
22+
def osInfo = column[Option[String]]("os_info")
23+
def createdAt = column[LocalDateTime]("created_at")
24+
def updatedAt = column[LocalDateTime]("updated_at")
25+
26+
def * = (
27+
id.?, did, pdsUrl, handle, nodeName, softwareVersion, status, capabilities,
28+
lastHeartbeat, lastCommitCid, lastCommitRev, ipAddress, osInfo, createdAt, updatedAt
29+
).mapTo[PdsNode]
30+
31+
def uniqueDid = index("idx_pds_node_did_unique", did, unique = true)
32+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package models.dal.domain.pds
2+
3+
import models.dal.MyPostgresProfile.api.*
4+
import models.domain.pds.PdsSubmission
5+
import play.api.libs.json.JsValue
6+
7+
import java.time.LocalDateTime
8+
import java.util.UUID
9+
10+
class PdsSubmissionTable(tag: Tag) extends Table[PdsSubmission](tag, "pds_submission") {
11+
def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
12+
def pdsNodeId = column[Int]("pds_node_id")
13+
def submissionType = column[String]("submission_type")
14+
def biosampleId = column[Option[Int]]("biosample_id")
15+
def biosampleGuid = column[Option[UUID]]("biosample_guid")
16+
def proposedValue = column[String]("proposed_value")
17+
def confidenceScore = column[Option[Double]]("confidence_score")
18+
def algorithmVersion = column[Option[String]]("algorithm_version")
19+
def softwareVersion = column[Option[String]]("software_version")
20+
def payload = column[Option[JsValue]]("payload")
21+
def status = column[String]("status")
22+
def reviewedBy = column[Option[String]]("reviewed_by")
23+
def reviewedAt = column[Option[LocalDateTime]]("reviewed_at")
24+
def reviewNotes = column[Option[String]]("review_notes")
25+
def atUri = column[Option[String]]("at_uri")
26+
def atCid = column[Option[String]]("at_cid")
27+
def createdAt = column[LocalDateTime]("created_at")
28+
29+
def * = (
30+
id.?, pdsNodeId, submissionType, biosampleId, biosampleGuid, proposedValue,
31+
confidenceScore, algorithmVersion, softwareVersion, payload, status,
32+
reviewedBy, reviewedAt, reviewNotes, atUri, atCid, createdAt
33+
).mapTo[PdsSubmission]
34+
35+
def nodeFk = foreignKey("pds_submission_node_fk", pdsNodeId, TableQuery[PdsNodeTable])(_.id)
36+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package models.domain.pds
2+
3+
import play.api.libs.json.{Json, JsValue, OFormat}
4+
5+
import java.time.LocalDateTime
6+
7+
case class PdsNode(
8+
id: Option[Int] = None,
9+
did: String,
10+
pdsUrl: String,
11+
handle: Option[String] = None,
12+
nodeName: Option[String] = None,
13+
softwareVersion: Option[String] = None,
14+
status: String = "UNKNOWN",
15+
capabilities: JsValue = Json.obj(),
16+
lastHeartbeat: Option[LocalDateTime] = None,
17+
lastCommitCid: Option[String] = None,
18+
lastCommitRev: Option[String] = None,
19+
ipAddress: Option[String] = None,
20+
osInfo: Option[String] = None,
21+
createdAt: LocalDateTime = LocalDateTime.now(),
22+
updatedAt: LocalDateTime = LocalDateTime.now()
23+
)
24+
25+
object PdsNode {
26+
implicit val format: OFormat[PdsNode] = Json.format[PdsNode]
27+
28+
val ValidStatuses: Set[String] = Set("ONLINE", "OFFLINE", "BUSY", "ERROR", "UNKNOWN")
29+
}
30+
31+
case class PdsHeartbeatLog(
32+
id: Option[Int] = None,
33+
pdsNodeId: Int,
34+
status: String,
35+
softwareVersion: Option[String] = None,
36+
loadMetrics: Option[JsValue] = None,
37+
processingQueueSize: Option[Int] = Some(0),
38+
errorMessage: Option[String] = None,
39+
recordedAt: LocalDateTime = LocalDateTime.now()
40+
)
41+
42+
object PdsHeartbeatLog {
43+
implicit val format: OFormat[PdsHeartbeatLog] = Json.format[PdsHeartbeatLog]
44+
}
45+
46+
case class PdsFleetConfig(
47+
id: Option[Int] = None,
48+
configKey: String,
49+
configValue: String,
50+
description: Option[String] = None,
51+
updatedBy: Option[String] = None,
52+
updatedAt: LocalDateTime = LocalDateTime.now()
53+
)
54+
55+
object PdsFleetConfig {
56+
implicit val format: OFormat[PdsFleetConfig] = Json.format[PdsFleetConfig]
57+
}
58+
59+
case class PdsFleetSummary(
60+
totalNodes: Int,
61+
onlineNodes: Int,
62+
offlineNodes: Int,
63+
busyNodes: Int,
64+
errorNodes: Int,
65+
unknownNodes: Int,
66+
targetVersion: Option[String],
67+
nodesOnTargetVersion: Int,
68+
nodesOutdated: Int
69+
)
70+
71+
object PdsFleetSummary {
72+
implicit val format: OFormat[PdsFleetSummary] = Json.format[PdsFleetSummary]
73+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package models.domain.pds
2+
3+
import play.api.libs.json.{JsValue, Json, OFormat}
4+
5+
import java.time.LocalDateTime
6+
import java.util.UUID
7+
8+
case class PdsSubmission(
9+
id: Option[Int] = None,
10+
pdsNodeId: Int,
11+
submissionType: String,
12+
biosampleId: Option[Int] = None,
13+
biosampleGuid: Option[UUID] = None,
14+
proposedValue: String,
15+
confidenceScore: Option[Double] = None,
16+
algorithmVersion: Option[String] = None,
17+
softwareVersion: Option[String] = None,
18+
payload: Option[JsValue] = None,
19+
status: String = "PENDING",
20+
reviewedBy: Option[String] = None,
21+
reviewedAt: Option[LocalDateTime] = None,
22+
reviewNotes: Option[String] = None,
23+
atUri: Option[String] = None,
24+
atCid: Option[String] = None,
25+
createdAt: LocalDateTime = LocalDateTime.now()
26+
)
27+
28+
object PdsSubmission {
29+
implicit val format: OFormat[PdsSubmission] = Json.format[PdsSubmission]
30+
31+
val ValidTypes: Set[String] = Set("HAPLOGROUP_CALL", "VARIANT_CALL", "BRANCH_PROPOSAL", "PRIVATE_VARIANT", "STR_PROFILE")
32+
val ValidStatuses: Set[String] = Set("PENDING", "ACCEPTED", "REJECTED", "SUPERSEDED")
33+
}
34+
35+
case class SubmissionSummary(
36+
pdsNodeId: Int,
37+
did: String,
38+
totalSubmissions: Int,
39+
pendingCount: Int,
40+
acceptedCount: Int,
41+
rejectedCount: Int,
42+
acceptanceRate: Double
43+
)
44+
45+
object SubmissionSummary {
46+
implicit val format: OFormat[SubmissionSummary] = Json.format[SubmissionSummary]
47+
}

app/modules/BaseModule.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,5 +178,22 @@ class BaseModule extends AbstractModule {
178178
bind(classOf[GroupProjectMemberRepository])
179179
.to(classOf[GroupProjectMemberRepositoryImpl])
180180
.asEagerSingleton()
181+
182+
// PDS Fleet Management
183+
bind(classOf[PdsNodeRepository])
184+
.to(classOf[PdsNodeRepositoryImpl])
185+
.asEagerSingleton()
186+
187+
bind(classOf[PdsHeartbeatLogRepository])
188+
.to(classOf[PdsHeartbeatLogRepositoryImpl])
189+
.asEagerSingleton()
190+
191+
bind(classOf[PdsFleetConfigRepository])
192+
.to(classOf[PdsFleetConfigRepositoryImpl])
193+
.asEagerSingleton()
194+
195+
bind(classOf[PdsSubmissionRepository])
196+
.to(classOf[PdsSubmissionRepositoryImpl])
197+
.asEagerSingleton()
181198
}
182199
}

0 commit comments

Comments
 (0)