Skip to content
This repository was archived by the owner on Aug 29, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion monolake-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize};
// Default iouring/epoll entries: 32k
const DEFAULT_ENTRIES: u32 = 32768;

pub const FALLBACK_PARALLELISM: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) };
pub const FALLBACK_PARALLELISM: NonZeroUsize = NonZeroUsize::new(1).unwrap();

/// Configuration structure for a service, combining listener and server configs.
///
Expand Down
2 changes: 1 addition & 1 deletion monolake-services/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[package]
name = "monolake-services"
version = "0.3.2"
edition = "2024"
description = "MonoLake Services Implementation"

authors.workspace = true
categories.workspace = true
edition.workspace = true
keywords.workspace = true
license.workspace = true
repository.workspace = true
Expand Down
15 changes: 7 additions & 8 deletions monolake-services/src/common/cancel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ impl Canceller {
let handler = unsafe { &mut *self.handler.get() };
if !handler.cancelled {
handler.cancelled = true;
let waiters: LinkedList<Waker> =
std::mem::replace(&mut handler.waiters, LinkedList::new());
let waiters: LinkedList<Waker> = std::mem::take(&mut handler.waiters);
for waker in waiters.into_iter() {
waker.wake();
}
Expand Down Expand Up @@ -121,12 +120,12 @@ impl Future for Waiter {

impl Drop for Waiter {
fn drop(&mut self) {
if let Some(index) = unsafe { *self.index.get() } {
if let Some(handler) = self.handler.upgrade() {
let handler = unsafe { &mut *handler.get() };
if !handler.cancelled {
handler.waiters.remove(index);
}
if let Some(index) = unsafe { *self.index.get() }
&& let Some(handler) = self.handler.upgrade()
{
let handler = unsafe { &mut *handler.get() };
if !handler.cancelled {
handler.waiters.remove(index);
}
}
}
Expand Down
21 changes: 7 additions & 14 deletions monolake-services/src/common/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use std::marker::PhantomData;
use certain_map::Handler;
use monolake_core::{context::PeerAddr, listener::AcceptedAddr};
use service_async::{
layer::{layer_fn, FactoryLayer},
AsyncMakeService, MakeService, ParamSet, Service,
layer::{FactoryLayer, layer_fn},
};

/// A service to insert Context into the request processing pipeline, compatible with `certain_map`.
Expand Down Expand Up @@ -81,10 +81,10 @@ where
// directly(here `Transformed` is not bound but `Response` and `Error` are).
for<'a> CXStore::Hdr<'a>: ParamSet<PeerAddr>,
for<'a> T: Service<
(R, <CXStore::Hdr<'a> as ParamSet<PeerAddr>>::Transformed),
Response = Resp,
Error = Err,
>,
(R, <CXStore::Hdr<'a> as ParamSet<PeerAddr>>::Transformed),
Response = Resp,
Error = Err,
>,
{
type Response = Resp;
type Error = Err;
Expand Down Expand Up @@ -113,10 +113,7 @@ impl<CXStore, F: MakeService> MakeService for ContextService<CXStore, F> {
fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
Ok(ContextService {
ctx: PhantomData,
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner))?,
})
}
}
Expand All @@ -131,11 +128,7 @@ impl<CXStore, F: AsyncMakeService> AsyncMakeService for ContextService<CXStore,
) -> Result<Self::Service, Self::Error> {
Ok(ContextService {
ctx: PhantomData,
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.await
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner)).await?,
})
}
}
13 changes: 3 additions & 10 deletions monolake-services/src/common/delay.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::time::Duration;

use service_async::{
layer::{layer_fn, FactoryLayer},
AsyncMakeService, MakeService, Param, Service,
layer::{FactoryLayer, layer_fn},
};

#[derive(Clone)]
Expand Down Expand Up @@ -46,10 +46,7 @@ impl<F: MakeService> MakeService for DelayService<F> {
fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
Ok(DelayService {
delay: self.delay,
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner))?,
})
}
}
Expand All @@ -64,11 +61,7 @@ impl<F: AsyncMakeService> AsyncMakeService for DelayService<F> {
) -> Result<Self::Service, Self::Error> {
Ok(DelayService {
delay: self.delay,
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.await
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner)).await?,
})
}
}
13 changes: 3 additions & 10 deletions monolake-services/src/common/erase.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use service_async::{
layer::{layer_fn, FactoryLayer},
AsyncMakeService, MakeService, Service,
layer::{FactoryLayer, layer_fn},
};

#[derive(Debug)]
Expand All @@ -15,10 +15,7 @@ impl<T: MakeService> MakeService for EraseResp<T> {
#[inline]
fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
Ok(EraseResp {
svc: self
.svc
.make_via_ref(old.map(|o| &o.svc))
.map_err(Into::into)?,
svc: self.svc.make_via_ref(old.map(|o| &o.svc))?,
})
}
}
Expand All @@ -33,11 +30,7 @@ impl<T: AsyncMakeService> AsyncMakeService for EraseResp<T> {
old: Option<&Self::Service>,
) -> Result<Self::Service, Self::Error> {
Ok(EraseResp {
svc: self
.svc
.make_via_ref(old.map(|o| &o.svc))
.await
.map_err(Into::into)?,
svc: self.svc.make_via_ref(old.map(|o| &o.svc)).await?,
})
}
}
Expand Down
35 changes: 7 additions & 28 deletions monolake-services/src/common/map.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use service_async::{
layer::{layer_fn, FactoryLayer},
AsyncMakeService, MakeService, Service,
layer::{FactoryLayer, layer_fn},
};

pub struct Map<S, FN> {
Expand Down Expand Up @@ -90,11 +90,7 @@ impl<F: AsyncMakeService, FN: Clone> AsyncMakeService for Map<F, FN> {
old: Option<&Self::Service>,
) -> Result<Self::Service, Self::Error> {
Ok(Map {
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.await
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner)).await?,
rewrite_f: self.rewrite_f.clone(),
})
}
Expand All @@ -109,11 +105,7 @@ impl<F: AsyncMakeService, FN: Clone> AsyncMakeService for MapErr<F, FN> {
old: Option<&Self::Service>,
) -> Result<Self::Service, Self::Error> {
Ok(MapErr {
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.await
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner)).await?,
rewrite_f: self.rewrite_f.clone(),
})
}
Expand All @@ -128,11 +120,7 @@ impl<F: AsyncMakeService, FN: Clone> AsyncMakeService for FnSvc<F, FN> {
old: Option<&Self::Service>,
) -> Result<Self::Service, Self::Error> {
Ok(FnSvc {
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.await
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner)).await?,
rewrite_f: self.rewrite_f.clone(),
})
}
Expand All @@ -144,10 +132,7 @@ impl<F: MakeService, FN: Clone> MakeService for Map<F, FN> {

fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
Ok(Map {
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner))?,
rewrite_f: self.rewrite_f.clone(),
})
}
Expand All @@ -159,10 +144,7 @@ impl<F: MakeService, FN: Clone> MakeService for MapErr<F, FN> {

fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
Ok(MapErr {
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner))?,
rewrite_f: self.rewrite_f.clone(),
})
}
Expand All @@ -174,10 +156,7 @@ impl<F: MakeService, FN: Clone> MakeService for FnSvc<F, FN> {

fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
Ok(FnSvc {
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner))?,
rewrite_f: self.rewrite_f.clone(),
})
}
Expand Down
2 changes: 1 addition & 1 deletion monolake-services/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub mod selector;
pub mod timeout;

// TODO: remove following re-exports
pub use cancel::{linked_list, Canceller, CancellerDropper, Waiter};
pub use cancel::{Canceller, CancellerDropper, Waiter, linked_list};
pub use context::ContextService;
pub use delay::{Delay, DelayService};
pub use detect::{Detect, DetectService, FixedLengthDetector, PrefixDetector};
Expand Down
13 changes: 3 additions & 10 deletions monolake-services/src/common/panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ use std::{fmt::Debug, panic::AssertUnwindSafe};

use futures::FutureExt;
use service_async::{
layer::{layer_fn, FactoryLayer},
AsyncMakeService, MakeService, Service,
layer::{FactoryLayer, layer_fn},
};

pub struct CatchPanicService<S> {
Expand Down Expand Up @@ -110,10 +110,7 @@ impl<F: MakeService> MakeService for CatchPanicService<F> {

fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
Ok(CatchPanicService {
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner))?,
})
}
}
Expand All @@ -127,11 +124,7 @@ impl<F: AsyncMakeService> AsyncMakeService for CatchPanicService<F> {
old: Option<&Self::Service>,
) -> Result<Self::Service, Self::Error> {
Ok(CatchPanicService {
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.await
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner)).await?,
})
}
}
13 changes: 3 additions & 10 deletions monolake-services/src/common/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use std::time::Duration;

use monoio::time::timeout;
use service_async::{
layer::{layer_fn, FactoryLayer},
AsyncMakeService, MakeService, Param, Service,
layer::{FactoryLayer, layer_fn},
};

/// Service that adds timeout functionality to an inner service.
Expand Down Expand Up @@ -79,10 +79,7 @@ impl<F: MakeService> MakeService for TimeoutService<F> {
fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
Ok(TimeoutService {
timeout: self.timeout,
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner))?,
})
}
}
Expand All @@ -97,11 +94,7 @@ impl<F: AsyncMakeService> AsyncMakeService for TimeoutService<F> {
) -> Result<Self::Service, Self::Error> {
Ok(TimeoutService {
timeout: self.timeout,
inner: self
.inner
.make_via_ref(old.map(|o| &o.inner))
.await
.map_err(Into::into)?,
inner: self.inner.make_via_ref(old.map(|o| &o.inner)).await?,
})
}
}
28 changes: 14 additions & 14 deletions monolake-services/src/http/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ use std::{convert::Infallible, fmt::Debug, time::Duration};

use bytes::Bytes;
use certain_map::{Attach, Fork};
use futures::{stream::FuturesUnordered, StreamExt};
use futures::{StreamExt, stream::FuturesUnordered};
use http::StatusCode;
use monoio::io::{sink::SinkExt, stream::Stream, AsyncReadRent, AsyncWriteRent, Split, Splitable};
use monoio::io::{AsyncReadRent, AsyncWriteRent, Split, Splitable, sink::SinkExt, stream::Stream};
use monoio_http::{
common::{
body::{Body, HttpBody, StreamHint},
Expand All @@ -82,13 +82,13 @@ use monoio_http::{
h2::server::SendResponse,
};
use monolake_core::{
AnyError,
context::PeerAddr,
http::{HttpAccept, HttpHandler},
AnyError,
};
use service_async::{
layer::{layer_fn, FactoryLayer},
AsyncMakeService, MakeService, Param, ParamRef, Service,
layer::{FactoryLayer, layer_fn},
};
use tracing::{error, info, warn};

Expand Down Expand Up @@ -121,11 +121,11 @@ impl<H> HttpCoreService<H> {
CXStore: 'static,
for<'a> CXState: Attach<CXStore>,
for<'a> H: HttpHandler<
<CXState as Attach<CXStore>>::Hdr<'a>,
HttpBody,
Body = HttpBody,
Error = Err,
>,
<CXState as Attach<CXStore>>::Hdr<'a>,
HttpBody,
Body = HttpBody,
Error = Err,
>,
Err: Into<AnyError> + Debug,
S: Split + AsyncReadRent + AsyncWriteRent,
{
Expand Down Expand Up @@ -283,11 +283,11 @@ impl<H> HttpCoreService<H> {
CXStore: 'static,
for<'a> CXState: Attach<CXStore>,
for<'a> H: HttpHandler<
<CXState as Attach<CXStore>>::Hdr<'a>,
HttpBody,
Body = HttpBody,
Error = Err,
>,
<CXState as Attach<CXStore>>::Hdr<'a>,
HttpBody,
Body = HttpBody,
Error = Err,
>,
Err: Into<AnyError> + Debug,
S: Split + AsyncReadRent + AsyncWriteRent + Unpin + 'static,
{
Expand Down
Loading
Loading