Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
429 changes: 271 additions & 158 deletions Cargo.lock

Large diffs are not rendered by default.

130 changes: 63 additions & 67 deletions drivers/drmem-drv-ntp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,81 +428,77 @@ impl driver::API for Instance {
}
}

fn run<'a>(
&'a mut self,
async fn run(
&mut self,
devices: Arc<Mutex<Self::HardwareType>>,
) -> impl Future<Output = Infallible> + Send + 'a {
async move {
// Record the peer's address in the "cfg" field of the
// span.
) -> Infallible {
// Record the peer's address in the "cfg" field of the span.

{
let addr = self
.sock
.peer_addr()
.map(|v| format!("{}", v))
.unwrap_or_else(|_| String::from("**unknown**"));
{
let addr = self
.sock
.peer_addr()
.map(|v| format!("{}", v))
.unwrap_or_else(|_| String::from("**unknown**"));

Span::current().record("cfg", addr.as_str());
}
Span::current().record("cfg", addr.as_str());
}

// Set `info` to an initial, unmatchable value. `None` would
// be preferrable here but, if DrMem had a problem at startup
// getting the NTP state, it wouldn't print the warning(s).

// Set `info` to an initial, unmatchable value. `None`
// would be preferrable here but, if DrMem had a problem
// at startup getting the NTP state, it wouldn't print the
// warning(s).

let mut info = Some(server::Info::bad_value());
let mut interval = time::interval(Duration::from_millis(20_000));

let mut devices = devices.lock().await;

loop {
interval.tick().await;

if let Some(id) = self.get_synced_host().await {
debug!("synced to host ID: {:#04x}", id);

let host_info = self.get_host_info(id).await;

match host_info {
Some(ref tmp) => {
if info != host_info {
debug!(
"host: {}, offset: {} ms, delay: {} ms",
tmp.get_host(),
tmp.get_offset(),
tmp.get_delay()
);
devices
.d_source
.report_update(tmp.get_host().clone())
.await;
devices
.d_offset
.report_update(tmp.get_offset())
.await;
devices
.d_delay
.report_update(tmp.get_delay())
.await;
devices.d_state.report_update(true).await;
info = host_info;
}
continue;
let mut info = Some(server::Info::bad_value());
let mut interval = time::interval(Duration::from_millis(20_000));

let mut devices = devices.lock().await;

loop {
interval.tick().await;

if let Some(id) = self.get_synced_host().await {
debug!("synced to host ID: {:#04x}", id);

let host_info = self.get_host_info(id).await;

match host_info {
Some(ref tmp) => {
if info != host_info {
debug!(
"host: {}, offset: {} ms, delay: {} ms",
tmp.get_host(),
tmp.get_offset(),
tmp.get_delay()
);
devices
.d_source
.report_update(tmp.get_host().clone())
.await;
devices
.d_offset
.report_update(tmp.get_offset())
.await;
devices
.d_delay
.report_update(tmp.get_delay())
.await;
devices.d_state.report_update(true).await;
info = host_info;
}
None => {
if info.is_some() {
warn!("no synced host information found");
info = None;
devices.d_state.report_update(false).await;
}
continue;
}
None => {
if info.is_some() {
warn!("no synced host information found");
info = None;
devices.d_state.report_update(false).await;
}
}
} else if info.is_some() {
warn!("we're not synced to any host");
info = None;
devices.d_state.report_update(false).await;
}
} else if info.is_some() {
warn!("we're not synced to any host");
info = None;
devices.d_state.report_update(false).await;
}
}
}
Expand Down
95 changes: 46 additions & 49 deletions drivers/drmem-drv-sump/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,67 +353,64 @@ impl driver::API for Instance {
}
}

fn run<'a>(
&'a mut self,
async fn run(
&mut self,
devices: Arc<Mutex<Self::HardwareType>>,
) -> impl Future<Output = Infallible> + Send + 'a {
async move {
// Record the peer's address in the "cfg" field of the
// span.
) -> Infallible {
// Record the peer's address in the "cfg" field of the span.

{
let addr = self
.rx
.peer_addr()
.map(|v| format!("{}", v))
.unwrap_or_else(|_| String::from("**unknown**"));
{
let addr = self
.rx
.peer_addr()
.map(|v| format!("{}", v))
.unwrap_or_else(|_| String::from("**unknown**"));

Span::current().record("cfg", addr.as_str());
}
Span::current().record("cfg", addr.as_str());
}

let mut devices = devices.lock().await;
let mut devices = devices.lock().await;

devices.d_service.report_update(true).await;
devices.d_service.report_update(true).await;

loop {
match self.get_reading().await {
Ok((stamp, true)) => {
if self.state.on_event(stamp) {
devices.d_state.report_update(true).await;
}
loop {
match self.get_reading().await {
Ok((stamp, true)) => {
if self.state.on_event(stamp) {
devices.d_state.report_update(true).await;
}
}

Ok((stamp, false)) => {
let gpm = self.gpm;

if let Some((cycle, duty, in_flow)) =
self.state.off_event(stamp, gpm)
{
debug!(
"cycle: {}, duty: {:.1}%, inflow: {:.2} gpm",
Instance::elapsed(cycle),
duty,
in_flow
);

devices.d_state.report_update(false).await;
devices.d_duty.report_update(duty).await;
devices.d_inflow.report_update(in_flow).await;
devices
.d_duration
.report_update(
((cycle as f64) / 600.0).round() / 100.0,
)
.await;
}
}
Ok((stamp, false)) => {
let gpm = self.gpm;

if let Some((cycle, duty, in_flow)) =
self.state.off_event(stamp, gpm)
{
debug!(
"cycle: {}, duty: {:.1}%, inflow: {:.2} gpm",
Instance::elapsed(cycle),
duty,
in_flow
);

Err(e) => {
devices.d_state.report_update(false).await;
devices.d_service.report_update(false).await;
panic!("couldn't read sump state -- {:?}", e);
devices.d_duty.report_update(duty).await;
devices.d_inflow.report_update(in_flow).await;
devices
.d_duration
.report_update(
((cycle as f64) / 600.0).round() / 100.0,
)
.await;
}
}

Err(e) => {
devices.d_state.report_update(false).await;
devices.d_service.report_update(false).await;
panic!("couldn't read sump state -- {:?}", e);
}
}
}
}
Expand Down
58 changes: 28 additions & 30 deletions drivers/drmem-drv-tplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ impl Instance {
}
}

async fn main_loop<'a>(
async fn main_loop(
&mut self,
s: &mut TcpStream,
devices: &mut MutexGuard<'_, <Instance as driver::API>::HardwareType>,
Expand Down Expand Up @@ -570,45 +570,43 @@ impl driver::API for Instance {

// Main run loop for the driver.

fn run<'a>(
&'a mut self,
async fn run(
&mut self,
devices: Arc<Mutex<Self::HardwareType>>,
) -> impl Future<Output = Infallible> + Send + 'a {
async move {
// Lock the mutex for the life of the driver. There is no
// other task that wants access to these device handles.
// An Arc<Mutex<>> is the only way I know of sharing a
// mutable value with async tasks.
) -> Infallible {
// Lock the mutex for the life of the driver. There is no
// other task that wants access to these device handles. An
// Arc<Mutex<>> is the only way I know of sharing a mutable
// value with async tasks.

let mut devices = devices.lock().await;
let mut devices = devices.lock().await;

// Record the devices's address in the "cfg" field of the
// span.
// Record the devices's address in the "cfg" field of the
// span.

Span::current().record("cfg", self.addr.to_string());
Span::current().record("cfg", self.addr.to_string());

loop {
// First, connect to the device. We'll leave the TCP
// connection open so we're ready for the next
// transaction. Tests have shown that the HS220
// handles multiple client connections.
loop {
// First, connect to the device. We'll leave the TCP
// connection open so we're ready for the next
// transaction. Tests have shown that the HS220 handles
// multiple client connections.

match Instance::connect(&self.addr).await {
Ok(mut s) => {
self.main_loop(&mut s, &mut devices).await;
}
Err(e) => {
warn!("couldn't connect : '{}'", e);
}
match Instance::connect(&self.addr).await {
Ok(mut s) => {
self.main_loop(&mut s, &mut devices).await;
}
Err(e) => {
warn!("couldn't connect : '{}'", e);
}
}

self.sync_error_state(&mut devices.error, true).await;
self.sync_error_state(&mut devices.error, true).await;

// Log the error and then sleep for 10 seconds.
// Hopefully the device will be available then.
// Log the error and then sleep for 10 seconds. Hopefully
// the device will be available then.

tokio::time::sleep(tokio::time::Duration::from_secs(10)).await
}
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await
}
}
}
Expand Down
Loading