Description
for msg in rx {
match msg? {
None => {
sock_rdr. read_line ( & mut poll) . await ?;
poll. clear ( ) ;
}
Some ( msg) => {
log:: trace!( "{msg}" ) ;
sock_wtr. write_all ( msg. as_bytes ( ) ) . await ?;
sock_wtr. write_all ( & [ b'\n' ] ) . await ?;
sock_wtr. flush ( ) . await ?;
}
}
}
loop {
let ( mut socket, addr) = match listener. accept ( ) . await {
Ok ( ( s, a) ) => ( s, a) ,
Err ( e) => {
log:: error!( "failed to accept connection: {:?}" , e) ;
continue ;
}
} ;
let ( sock_rdr, _) = socket. split ( ) ;
let mut sock_rdr = BufReader :: new ( sock_rdr) ;
log:: info!( "({}) new client" , & addr) ;
// "0" => PRODUCE
// "1" => CONSUME
let mut client_kind = String :: new ( ) ;
if let Err ( e) = sock_rdr. read_line ( & mut client_kind) . await {
log:: error!( "{e}" ) ;
continue ;
}
let client_kind = match client_kind. trim ( ) . parse :: < u8 > ( ) {
Ok ( c) => c,
Err ( e) => {
log:: error!( "{e}" ) ;
continue ;
}
} ;
let client_kind = match num:: FromPrimitive :: from_u8 ( client_kind) {
Some ( c) => c,
None => {
log:: error!( "failed to parse client kind {client_kind}" ) ;
continue ;
}
} ;
// \n
// topic_name
let mut topic = String :: new ( ) ;
if let Err ( e) = sock_rdr. read_line ( & mut topic) . await {
log:: error!( "{e}" ) ;
continue ;
}
let topic = topic. trim ( ) . to_string ( ) ;
// \n
let _ = match client_kind {
ClientKind :: Produce => self . handle_produce ( socket, addr, topic) . await ,
ClientKind :: Consume => self . handle_consume ( socket, addr, topic) . await ,
ClientKind :: Info => unreachable ! ( ) ,
} ;
}
literally unusable
Reactions are currently unavailable
You can’t perform that action at this time.
franz/src/server.rs
Lines 102 to 115 in 5fc5e9f
franz/src/server.rs
Lines 127 to 178 in 5fc5e9f
literally unusable