Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/openapi/openapi.json

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions api/src/application/ports/files_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ pub trait FilesRepository: Send + Sync {
doc_id: Uuid,
filename: &str,
) -> anyhow::Result<Option<(String, Option<String>)>>;

async fn list_storage_paths_for_document(&self, doc_id: Uuid) -> anyhow::Result<Vec<String>>;
async fn list_storage_paths_for_document_tx(
&self,
tx: &mut Transaction<'_, Postgres>,
doc_id: Uuid,
) -> anyhow::Result<Vec<String>>;

async fn list_files_for_document(&self, doc_id: Uuid) -> anyhow::Result<Vec<FileRecord>>;

async fn list_storage_paths_for_workspace(
&self,
workspace_id: Uuid,
Expand All @@ -51,3 +55,13 @@ pub trait FilesRepository: Send + Sync {

async fn delete_by_id(&self, file_id: Uuid) -> anyhow::Result<()>;
}

/// One file row associated with a document, as returned by the
/// `FilesRepository` listing methods (e.g. `list_files_for_document`).
#[derive(Debug, Clone)]
pub struct FileRecord {
    /// Primary key of the file row.
    pub id: Uuid,
    /// Filename as recorded in the files table.
    pub filename: String,
    /// Content type recorded at upload time, if any.
    /// NOTE(review): presumably a MIME type — confirm against the insert path.
    pub content_type: Option<String>,
    /// File size in bytes (callers insert `bytes.len() as i64`).
    pub size: i64,
    /// Blob path relative to the storage/uploads root.
    pub storage_path: String,
    /// Hex-encoded content digest; producers in `documents.rs` compute SHA-256.
    pub content_hash: String,
}
272 changes: 270 additions & 2 deletions api/src/application/services/documents.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::fmt::Write;
use std::path::Path;
use std::sync::Arc;

use sha2::{Digest, Sha256};
use sqlx::{Pool, Postgres, Transaction};
use tracing::{error, warn};
use uuid::Uuid;
Expand Down Expand Up @@ -185,6 +188,100 @@ impl DocumentService {
Ok(doc)
}

/// Duplicates document `source_id` into `workspace_id` on behalf of `actor_id`.
///
/// Steps: verify the actor may view the source, copy its realtime content and
/// attachment bytes, create the new document, then write content and
/// attachments onto it. If any step after creation fails, the freshly created
/// document is deleted again (best effort) and the original error is returned.
///
/// Parameters:
/// * `permissions` — actor's permission set, forwarded to create/delete flows.
/// * `title` — optional override title; blank/whitespace overrides fall back
///   to "<source title> (Copy)" via `duplicate_title`.
/// * `parent_id` — `Some(p)` places the copy explicitly (`Some(None)`
///   presumably means "at the root" — confirm with callers); `None` inherits
///   the source's parent, falling back to its archived parent.
///
/// Errors: `Forbidden` when view access is denied, `NotFound` when the source
/// is missing or belongs to another workspace, `BadRequest` for folders.
pub async fn duplicate_document(
    &self,
    workspace_id: Uuid,
    source_id: Uuid,
    actor_id: Uuid,
    permissions: &PermissionSet,
    title: Option<String>,
    parent_id: Option<Option<Uuid>>,
) -> Result<DomainDocument, ServiceError> {
    let actor = Actor::User(actor_id);
    // Any access failure is surfaced uniformly as Forbidden.
    access::require_view(
        self.access_repo.as_ref(),
        self.share_access.as_ref(),
        &actor,
        source_id,
    )
    .await
    .map_err(|_| ServiceError::Forbidden)?;

    let source = self
        .document_repo
        .get_by_id(source_id)
        .await
        .map_err(ServiceError::from)?
        .ok_or(ServiceError::NotFound)?;
    // Cross-workspace ids are treated as nonexistent rather than forbidden.
    if source.workspace_id != workspace_id {
        return Err(ServiceError::NotFound);
    }
    if source.doc_type == "folder" {
        return Err(ServiceError::BadRequest("cannot_duplicate_folder"));
    }

    // Explicit placement wins; otherwise keep the copy next to the source
    // (or next to where it lived before archiving).
    let target_parent = match parent_id {
        Some(explicit) => explicit,
        None => source.parent_id.or(source.archived_parent_id),
    };

    // Missing realtime content duplicates as an empty document.
    let source_content = self
        .realtime
        .get_content(&source_id.to_string())
        .await
        .map_err(ServiceError::from)?
        .unwrap_or_default();

    // Snapshot attachment bytes BEFORE creating the copy so a later failure
    // leaves nothing half-written against the new document.
    let attachments = self.snapshot_attachments(source.id).await?;
    let new_title = duplicate_title(&source.title, title);
    let new_doc = self
        .create_for_user(
            workspace_id,
            actor_id,
            permissions,
            &new_title,
            target_parent,
            &source.doc_type,
            source.created_by_plugin.as_deref(),
        )
        .await?;

    // Run the post-create steps inside one future so a single error path
    // below can roll the new document back.
    let result = async {
        let updated_doc = self
            .update_content(&actor, new_doc.id, &source_content)
            .await?;

        self.copy_attachments(&updated_doc, &attachments, actor_id)
            .await?;

        Ok::<_, ServiceError>(updated_doc)
    }
    .await;

    match result {
        Ok(doc) => Ok(doc),
        Err(err) => {
            // Best-effort cleanup: permissions are bypassed (enforce=false)
            // because the actor already proved they could create the doc.
            if let Err(clean_err) = self
                .delete_for_user_internal(
                    workspace_id,
                    new_doc.id,
                    Some(actor_id),
                    permissions,
                    false,
                )
                .await
            {
                // Cleanup failure is logged but never masks the real error.
                warn!(
                    document_id = %new_doc.id,
                    error = ?clean_err,
                    "duplicate_cleanup_failed"
                );
            }
            Err(err)
        }
    }
}

pub async fn get_for_actor(
&self,
actor: &Actor,
Expand All @@ -207,6 +304,18 @@ impl DocumentService {
doc_id: Uuid,
actor_id: Option<Uuid>,
permissions: &PermissionSet,
) -> Result<bool, ServiceError> {
self.delete_for_user_internal(workspace_id, doc_id, actor_id, permissions, true)
.await
}

async fn delete_for_user_internal(
&self,
workspace_id: Uuid,
doc_id: Uuid,
actor_id: Option<Uuid>,
permissions: &PermissionSet,
enforce_permissions: bool,
) -> Result<bool, ServiceError> {
let mut tx = self.begin_transaction().await?;
let root_meta = self
Expand All @@ -215,15 +324,23 @@ impl DocumentService {
.await
.map_err(ServiceError::from)?
.ok_or(ServiceError::NotFound)?;
ensure_can_delete(permissions, &root_meta.doc_type)?;
if enforce_permissions {
ensure_can_delete(permissions, &root_meta.doc_type)?;
}
let delete_plan = self
.build_delete_plan(&mut tx, doc_id, workspace_id, root_meta.clone())
.await?;
if delete_plan.is_empty() {
tx.rollback().await.map_err(map_sqlx_error)?;
return Ok(false);
}
let permission_snapshot = permissions.to_vec();
let permission_snapshot = if enforce_permissions {
permissions.to_vec()
} else {
// Cleanup flows (e.g., duplicate rollback) bypass user permissions so storage delete
// jobs always have authority to remove docs and attachments.
PermissionSet::all().to_vec()
};
let uc = DeleteDocument {
repo: self.document_repo.as_ref(),
};
Expand Down Expand Up @@ -766,6 +883,112 @@ impl DocumentService {
.map_err(ServiceError::from)
}

/// Reads every attachment of `doc_id` fully into memory.
///
/// Attachments whose backing blob is missing from storage are logged and
/// skipped rather than failing the whole snapshot. Returns one
/// `AttachmentSnapshot` per readable file, including its SHA-256 hex hash.
async fn snapshot_attachments(
    &self,
    doc_id: Uuid,
) -> Result<Vec<AttachmentSnapshot>, ServiceError> {
    let records = self
        .files_repo
        .list_files_for_document(doc_id)
        .await
        .map_err(ServiceError::from)?;

    let mut captured = Vec::new();
    for record in records {
        let absolute = self.storage.absolute_from_relative(&record.storage_path);

        // A dangling DB row (blob gone) is tolerated: warn and move on.
        let present = self
            .storage
            .exists(&absolute)
            .await
            .map_err(ServiceError::from)?;
        if !present {
            warn!(
                document_id = %doc_id,
                storage_path = %record.storage_path,
                "duplicate_attachment_missing"
            );
            continue;
        }

        let data = self
            .storage
            .read_bytes(&absolute)
            .await
            .map_err(ServiceError::from)?;
        captured.push(AttachmentSnapshot {
            // Hash first; `data` is moved into the snapshot afterwards.
            content_hash: hash_bytes(&data),
            filename: record.filename,
            content_type: record.content_type,
            bytes: data,
        });
    }
    Ok(captured)
}

/// Writes previously snapshotted attachments onto `target_doc`.
///
/// For each snapshot: the filename is reduced to its final path component,
/// the bytes are written under `<doc dir>/attachments/`, a file row is
/// inserted, and — when the storage path maps into the workspace repo —
/// an `attachment.ingest_upsert` event is recorded.
async fn copy_attachments(
    &self,
    target_doc: &DomainDocument,
    attachments: &[AttachmentSnapshot],
    actor_id: Uuid,
) -> Result<(), ServiceError> {
    // Avoid building a doc dir for documents without attachments.
    if attachments.is_empty() {
        return Ok(());
    }

    let doc_dir = self
        .storage
        .build_doc_dir(target_doc.id)
        .await
        .map_err(ServiceError::from)?;

    for snapshot in attachments {
        // Keep only the last path component; if none is usable, fall back
        // to the stored filename verbatim.
        let safe_name = match Path::new(&snapshot.filename)
            .file_name()
            .and_then(|f| f.to_str())
        {
            Some(name) if !name.is_empty() => name.to_string(),
            _ => snapshot.filename.clone(),
        };

        let destination = doc_dir.join("attachments").join(&safe_name);
        self.storage
            .write_bytes(&destination, &snapshot.bytes)
            .await
            .map_err(ServiceError::from)?;

        // Normalize Windows separators so stored paths are always '/'.
        let storage_path = self
            .storage
            .relative_from_uploads(&destination)
            .replace('\\', "/");

        self.files_repo
            .insert_file(
                target_doc.id,
                &safe_name,
                snapshot.content_type.as_deref(),
                snapshot.bytes.len() as i64,
                &storage_path,
                &snapshot.content_hash,
            )
            .await
            .map_err(ServiceError::from)?;

        // Only emit the ingest event for paths inside the workspace repo.
        if let Some(repo_path) =
            repo_relative_from_storage(target_doc.workspace_id, &storage_path)
        {
            let payload = json!({
                "repo_path": repo_path,
                "storage_path": storage_path,
                "backend": "api",
                "size": snapshot.bytes.len() as i64,
                "content_hash": snapshot.content_hash,
                "workspace_id": target_doc.workspace_id.to_string(),
                "actor_id": actor_id.to_string(),
            });
            self.record_event(
                target_doc.workspace_id,
                target_doc.id,
                "attachment.ingest_upsert",
                Some(payload),
            )
            .await;
        }
    }
    Ok(())
}

async fn ensure_active_parent(
&self,
workspace_id: Uuid,
Expand Down Expand Up @@ -1118,6 +1341,14 @@ pub enum DocumentPatchOperation {
},
}

/// In-memory capture of one source-document attachment, produced by
/// `snapshot_attachments` and replayed onto the duplicate by
/// `copy_attachments`.
#[derive(Debug, Clone)]
struct AttachmentSnapshot {
    /// Filename from the files table; may still contain path separators
    /// (it is reduced to its final component when copied).
    filename: String,
    /// Content type from the files table, if one was recorded.
    content_type: Option<String>,
    /// Full file contents read from storage.
    bytes: Vec<u8>,
    /// Hex-encoded SHA-256 of `bytes` (see `hash_bytes`).
    content_hash: String,
}

fn apply_patch_operations(
initial: &str,
operations: &[DocumentPatchOperation],
Expand Down Expand Up @@ -1162,6 +1393,29 @@ fn splice_chars(
Ok(())
}

/// Picks the title for a duplicated document.
///
/// A non-blank `override_title` wins (trimmed). Otherwise the trimmed source
/// title gets a " (Copy)" suffix, with "Untitled (Copy)" when the source
/// title is blank.
fn duplicate_title(source_title: &str, override_title: Option<String>) -> String {
    // Caller-supplied title takes precedence when it has visible content.
    if let Some(candidate) = override_title {
        let cleaned = candidate.trim();
        if !cleaned.is_empty() {
            return cleaned.to_string();
        }
    }
    match source_title.trim() {
        "" => "Untitled (Copy)".to_string(),
        name => format!("{name} (Copy)"),
    }
}

fn hash_bytes(bytes: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(bytes);
let digest = hasher.finalize();
let mut out = String::with_capacity(64);
for byte in digest {
let _ = write!(&mut out, "{:02x}", byte);
}
out
}

fn path_depth(path: &str) -> usize {
path.split('/')
.filter(|segment| !segment.is_empty())
Expand Down Expand Up @@ -1206,6 +1460,20 @@ fn snapshot_diff_side_from_use_case(side: SnapshotDiffSide) -> SnapshotDiffSideD
}
}

/// Maps a storage path to its workspace-repo-relative form.
///
/// Strips any leading '/' and, when present, the `workspace_id` prefix plus
/// its trailing slashes. Returns `None` when nothing remains after stripping.
fn repo_relative_from_storage(workspace_id: Uuid, storage_path: &str) -> Option<String> {
    let normalized = storage_path.trim_start_matches('/');
    let workspace_prefix = workspace_id.to_string();
    // Paths outside the workspace prefix pass through unchanged.
    let repo_path = match normalized.strip_prefix(&workspace_prefix) {
        Some(rest) => rest.trim_start_matches('/'),
        None => normalized,
    };
    (!repo_path.is_empty()).then(|| repo_path.to_string())
}

fn workspace_repo_relative(workspace_id: Uuid, stored_path: Option<&str>) -> Option<String> {
let stored = stored_path?.trim_start_matches('/');
if stored.is_empty() {
Expand Down
4 changes: 2 additions & 2 deletions api/src/application/services/plugins/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl PluginExecutionService {
allowed_doc_id,
actor,
)
.await
.map_err(ServiceError::from)
.await
.map_err(ServiceError::from)
}
}
Loading