Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ serde_json.workspace = true
sysinfo.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-stream.workspace = true
tokio-stream = { workspace = true, features = ["sync"] }
tokio-util = { workspace = true, features = ["rt"] }
toml.workspace = true
tracing.workspace = true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
source: src/file_transfer/interface/status.rs
expression: data
---
AstarteObject {
inner: [
(
"id",
String(
"00000000-0000-0000-0000-000000000128",
),
),
(
"type",
String(
"device_to_server",
),
),
(
"bytes",
LongInteger(
1024,
),
),
(
"totalBytes",
LongInteger(
-1,
),
),
],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
source: src/file_transfer/interface/status.rs
expression: data
---
AstarteObject {
inner: [
(
"id",
String(
"00000000-0000-0000-0000-000000000128",
),
),
(
"type",
String(
"device_to_server",
),
),
(
"bytes",
LongInteger(
1024,
),
),
(
"totalBytes",
LongInteger(
4096,
),
),
],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
source: src/file_transfer/interface/status.rs
expression: data
---
AstarteObject {
inner: [
(
"id",
String(
"00000000-0000-0000-0000-000000000128",
),
),
(
"type",
String(
"server_to_device",
),
),
(
"code",
Integer(
0,
),
),
(
"message",
String(
"",
),
),
],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
source: src/file_transfer/interface/status.rs
expression: data
---
AstarteObject {
inner: [
(
"id",
String(
"00000000-0000-0000-0000-000000000128",
),
),
(
"type",
String(
"device_to_server",
),
),
(
"code",
Integer(
16,
),
),
(
"message",
String(
"file transfer request can't be handled currently",
),
),
],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
source: src/file_transfer/interface/status.rs
expression: data
---
AstarteObject {
inner: [
(
"id",
String(
"00000000-0000-0000-0000-000000000128",
),
),
(
"type",
String(
"server_to_device",
),
),
(
"code",
Integer(
22,
),
),
(
"message",
String(
"validation error encountered",
),
),
],
}
18 changes: 12 additions & 6 deletions src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ use astarte_device_sdk::client::RecvError;
use astarte_device_sdk::prelude::PropAccess;
use edgehog_store::db::{Handle, HandleError};
use eyre::{Context, Report};
use tokio::sync::Notify;
use tokio::sync::{Notify, watch};
use tokio::{sync::mpsc, task::JoinSet};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument, trace};

use crate::commands::execute_command;
use crate::file_transfer::interface::request::FileTransferRequest;
use crate::file_transfer::{self, FileTransfer};
use crate::file_transfer::{self, FileTransfer, ProgressTracker};
use crate::jobs::Queue;
use crate::telemetry::Telemetry;
use crate::telemetry::event::TelemetryEvent;
Expand Down Expand Up @@ -125,6 +125,7 @@ impl<C> Runtime<C> {
tasks,
jobs,
opts.store_directory.join("file-store"),
cancel.child_token(),
)
.wrap_err("could't initialize file transfer")?;

Expand Down Expand Up @@ -174,19 +175,24 @@ impl<C> Runtime<C> {
tasks: &mut JoinSet<eyre::Result<()>>,
jobs: Queue,
store_dir: std::path::PathBuf,
cancel: CancellationToken,
) -> eyre::Result<mpsc::Sender<FileTransferRequest>>
where
C: Client + Send + Sync + 'static,
{
let (tx, rx) = mpsc::channel(EVENT_BUFFER);
let (transfer_tx, transfer_rx) = mpsc::channel(EVENT_BUFFER);
let notify = Arc::new(Notify::new());

let (progress_tx, progress_rx) = watch::channel(None);

tasks.spawn(ProgressTracker::create(device.clone()).run(progress_rx, cancel.clone()));
tasks.spawn(
FileTransfer::create(jobs.clone(), store_dir, device.clone())?.run(Arc::clone(&notify)),
FileTransfer::create(jobs.clone(), store_dir, device.clone(), progress_tx)?
.run(Arc::clone(&notify), cancel),
);
tasks.spawn(file_transfer::Receiver::new(jobs, notify, device).run(rx));
tasks.spawn(file_transfer::Receiver::new(jobs, notify, device).run(transfer_rx));

Ok(tx)
Ok(transfer_tx)
}

#[cfg(feature = "containers")]
Expand Down
68 changes: 0 additions & 68 deletions src/file_transfer/file_system/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ impl<F> FileStorage<F> {
pub(crate) struct WriteHandle {
id: Uuid,
current_size: u64,
// TODO limit the size of the file
#[pin]
file: tokio::fs::File,
}
Expand Down Expand Up @@ -322,73 +321,6 @@ impl AsyncSeek for WriteHandle {
}
}

#[derive(Debug)]
#[pin_project]
pub(crate) struct Limit<W> {
remaining: u64,
#[pin]
inner: W,
}

impl<W> Limit<W> {
pub(crate) fn new(limit: u64, inner: W) -> Self {
Self {
remaining: limit,
inner,
}
}
}

impl<W> AsyncWrite for Limit<W>
where
W: AsyncWrite,
{
#[instrument(skip_all, ret)]
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
let buf_len =
u64::try_from(buf.len()).map_err(|e| io::Error::new(io::ErrorKind::FileTooLarge, e))?;

if self.remaining < buf_len {
return std::task::Poll::Ready(Err(io::Error::new(
io::ErrorKind::FileTooLarge,
"write exceeds file limit",
)));
}

let this = self.project();

let written = std::task::ready!(this.inner.poll_write(cx, buf))?;

// NOTE it must be guaranteed that `written <= buf.len()`
debug_assert!(written <= buf.len());
*this.remaining -= written as u64;

std::task::Poll::Ready(Ok(written))
}

fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let this = self.project();

this.inner.poll_flush(cx)
}

fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let this = self.project();

this.inner.poll_shutdown(cx)
}
}

#[cfg_attr(test, mockall::automock)]
pub(crate) trait Space {
/// Reserves the space for the file on the device.
Expand Down
Loading
Loading