diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 31000a2..8e48689 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -16,6 +16,8 @@ jobs: steps: - uses: actions/checkout@v3 + - name: Init submodule + run: git submodule update --init --recursive - name: Build run: cargo build --verbose - name: Run tests diff --git a/.gitignore b/.gitignore index 4470988..0708668 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ target/ -Cargo.lock \ No newline at end of file +Cargo.lock +db.polo +db.polo.wal \ No newline at end of file diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..d471444 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "evenio"] + path = evenio + url = https://github.com/jidoc01/evenio.git diff --git a/Cargo.toml b/Cargo.toml index 213b006..32142fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,26 @@ -[workspace] +[package] +name = "RustyDO" +version = "0.1.0" +edition = "2021" +authors = ["Jung Hyun Kim "] +readme = "README.md" +repository = "https://github.com/jidoc01/RustyDO" -members = [ - "server", - "crypt", -] \ No newline at end of file +[dependencies] +#evenio = { version = "0.3.0", features = ["rayon"] } +evenio = { path = "./evenio", features = ["rayon"] } +tokio = { version = "1.36.0", features = ["full"] } +polodb_core = "4.4.0" +serde = "1.0.196" +pwhash = "1" +anyhow = "1.0.79" +bytes = "1.5.0" +byteorder = "1.5.0" +encoding = "0.2.33" +lazy_static = "1.4.0" +ignore-result = "0.2.0" +ctrlc = "3.4.2" +rayon = "1.9.0" +tracing-subscriber = "0.3.18" +log = "0.4.21" +rand = "0.8.5" diff --git a/evenio b/evenio new file mode 160000 index 0000000..9e7eedd --- /dev/null +++ b/evenio @@ -0,0 +1 @@ +Subproject commit 9e7eedd184ad0440637b65bc10ebec85e6be95b4 diff --git a/old/Cargo.toml b/old/Cargo.toml new file mode 100644 index 0000000..213b006 --- /dev/null +++ b/old/Cargo.toml @@ -0,0 +1,6 @@ +[workspace] + +members = [ + "server", + "crypt", +] \ No newline at end of file diff --git a/old/config.toml b/old/config.toml new file mode 100644 index 0000000..0a7837f --- /dev/null +++ b/old/config.toml @@ -0,0 +1,34 @@ +[server] +# A password which is used when servers communicate with each other. +# If it is empty, then it is filled with a random string to prevent attack. +password = "" +use_auto_account= true +# TODO +max_users = 1000 +# TODO +tick_per_second = 20 + + +[user] +initial_level = 1 +initial_money = 999999999 + + +[message] +notice = ''' +[디지몬 온라인 v1.5 프리서버 CBT] + +안녕하세요, 테이머 여러분. +즐거운 CBT를 위해 몇 가지 안내드릴 사항이 있어, 아래와 같이 공지드립니다. + +· 게임머니는 충분히 많이 지급되오니 아이템을 적극 사용해 주세요. +· 일부 기능들은 미구현입니다: 게시판, 랭킹, 사용자 간 DM. + 해당 기능들은 인게임과 무관한 관계로, 차후 개발 예정입니다. +· 인게임에서의 일부 구현 (동글몬 및 아이템 드롭)은, 당시의 정보가 부족한 관계로 + 아직 완전히 재현되지 않았습니다. 충분한 정보가 모이는 대로 재현될 것입니다. +· 10분 간 아무런 행동을 취하지 않으면 접속 종료됩니다. + +감사합니다. + +지덕. +''' diff --git a/crypt/Cargo.toml b/old/crypt/Cargo.toml similarity index 100% rename from crypt/Cargo.toml rename to old/crypt/Cargo.toml diff --git a/crypt/src/lib.rs b/old/crypt/src/lib.rs similarity index 100% rename from crypt/src/lib.rs rename to old/crypt/src/lib.rs diff --git a/crypt/src/rc4.rs b/old/crypt/src/rc4.rs similarity index 100% rename from crypt/src/rc4.rs rename to old/crypt/src/rc4.rs diff --git a/server/Cargo.toml b/old/server/Cargo.toml similarity index 100% rename from server/Cargo.toml rename to old/server/Cargo.toml diff --git a/server/src/main.rs b/old/server/src/main.rs similarity index 100% rename from server/src/main.rs rename to old/server/src/main.rs diff --git a/server/src/prelude.rs b/old/server/src/prelude.rs similarity index 100% rename from server/src/prelude.rs rename to old/server/src/prelude.rs diff --git a/server/src/server/login/component.rs b/old/server/src/server/login/component.rs similarity index 100% rename from server/src/server/login/component.rs rename to old/server/src/server/login/component.rs diff --git a/server/src/server/login/conn.rs b/old/server/src/server/login/conn.rs similarity index 100% rename from server/src/server/login/conn.rs rename to old/server/src/server/login/conn.rs diff --git a/server/src/server/login/entity/game.rs b/old/server/src/server/login/entity/game.rs similarity index 100% rename from server/src/server/login/entity/game.rs rename to old/server/src/server/login/entity/game.rs diff --git a/server/src/server/login/entity/mod.rs b/old/server/src/server/login/entity/mod.rs similarity index 100% rename from server/src/server/login/entity/mod.rs rename to old/server/src/server/login/entity/mod.rs diff --git a/server/src/server/login/entity/room.rs b/old/server/src/server/login/entity/room.rs similarity index 100% rename from server/src/server/login/entity/room.rs rename to old/server/src/server/login/entity/room.rs diff --git a/server/src/server/login/entity/session.rs b/old/server/src/server/login/entity/session.rs similarity index 100% rename from server/src/server/login/entity/session.rs rename to old/server/src/server/login/entity/session.rs diff --git a/server/src/server/login/entity/timer.rs b/old/server/src/server/login/entity/timer.rs similarity index 100% rename from server/src/server/login/entity/timer.rs rename to old/server/src/server/login/entity/timer.rs diff --git a/server/src/server/login/handler/after_login.rs b/old/server/src/server/login/handler/after_login.rs similarity index 100% rename from server/src/server/login/handler/after_login.rs rename to old/server/src/server/login/handler/after_login.rs diff --git a/server/src/server/login/handler/before_login.rs b/old/server/src/server/login/handler/before_login.rs similarity index 100% rename from server/src/server/login/handler/before_login.rs rename to old/server/src/server/login/handler/before_login.rs diff --git a/server/src/server/login/handler/mod.rs b/old/server/src/server/login/handler/mod.rs similarity index 100% rename from server/src/server/login/handler/mod.rs rename to old/server/src/server/login/handler/mod.rs diff --git a/server/src/server/login/handler/on_game.rs b/old/server/src/server/login/handler/on_game.rs similarity index 100% rename from server/src/server/login/handler/on_game.rs rename to old/server/src/server/login/handler/on_game.rs diff --git a/server/src/server/login/handler/on_lobby.rs b/old/server/src/server/login/handler/on_lobby.rs similarity index 100% rename from server/src/server/login/handler/on_lobby.rs rename to old/server/src/server/login/handler/on_lobby.rs diff --git a/server/src/server/login/handler/on_room.rs b/old/server/src/server/login/handler/on_room.rs similarity index 100% rename from server/src/server/login/handler/on_room.rs rename to old/server/src/server/login/handler/on_room.rs diff --git a/server/src/server/login/handler/packet.rs b/old/server/src/server/login/handler/packet.rs similarity index 100% rename from server/src/server/login/handler/packet.rs rename to old/server/src/server/login/handler/packet.rs diff --git a/server/src/server/login/mod.rs b/old/server/src/server/login/mod.rs similarity index 100% rename from server/src/server/login/mod.rs rename to old/server/src/server/login/mod.rs diff --git a/server/src/server/login/timer.rs b/old/server/src/server/login/timer.rs similarity index 100% rename from server/src/server/login/timer.rs rename to old/server/src/server/login/timer.rs diff --git a/server/src/server/mod.rs b/old/server/src/server/mod.rs similarity index 100% rename from server/src/server/mod.rs rename to old/server/src/server/mod.rs diff --git a/server/src/server/status/mod.rs b/old/server/src/server/status/mod.rs similarity index 100% rename from server/src/server/status/mod.rs rename to old/server/src/server/status/mod.rs diff --git a/server/src/util/config.rs b/old/server/src/util/config.rs similarity index 100% rename from server/src/util/config.rs rename to old/server/src/util/config.rs diff --git a/server/src/util/db.rs b/old/server/src/util/db.rs similarity index 100% rename from server/src/util/db.rs rename to old/server/src/util/db.rs diff --git a/server/src/util/hash.rs b/old/server/src/util/hash.rs similarity index 100% rename from server/src/util/hash.rs rename to old/server/src/util/hash.rs diff --git a/server/src/util/mod.rs b/old/server/src/util/mod.rs similarity index 100% rename from server/src/util/mod.rs rename to old/server/src/util/mod.rs diff --git a/server/src/util/object.rs b/old/server/src/util/object.rs similarity index 100% rename from server/src/util/object.rs rename to old/server/src/util/object.rs diff --git a/server/src/util/preader.rs b/old/server/src/util/preader.rs similarity index 100% rename from server/src/util/preader.rs rename to old/server/src/util/preader.rs diff --git a/server/src/util/pwriter.rs b/old/server/src/util/pwriter.rs similarity index 100% rename from server/src/util/pwriter.rs rename to old/server/src/util/pwriter.rs diff --git a/src/constants.rs b/src/constants.rs new file mode 100644 index 0000000..e2e0575 --- /dev/null +++ b/src/constants.rs @@ -0,0 +1,7 @@ + + + +pub const HEADER_SIZE: usize = 9; +pub const TAIL_SIZE: usize = 3; + +pub const BODY_MAGIC_STAMP: u32 = 0x12345678; \ No newline at end of file diff --git a/src/encrypt/mod.rs b/src/encrypt/mod.rs new file mode 100644 index 0000000..c6e0178 --- /dev/null +++ b/src/encrypt/mod.rs @@ -0,0 +1,73 @@ +mod rc4; + +use crate::*; + +use crate::constants::HEADER_SIZE; + +const BODY_CRYPTO_SIZE: usize = 16; + +lazy_static::lazy_static! { + static ref BODY_CRYPTO_KEY: Vec = { + let mut key = b"\xf6\xef\x8b\xa1\x5c".to_vec(); + let mut salt = b"\x00".repeat(BODY_CRYPTO_SIZE - key.len()); + key.append(&mut salt); + key + }; +} + +#[inline] +pub fn encrypt_body>(mut body: T) { + rc4::transfer(&mut body, BODY_CRYPTO_KEY.as_slice()); +} + +#[inline] +pub fn encrypt_header>(mut header: T) { + let header = header.as_mut(); + + let len = header.len(); + if len != HEADER_SIZE { + panic!("Invalid header size: {}", len); + } + + let key = header[HEADER_SIZE - 1]; + assert!(key < 8, "Invalid key: {}", key); + + (0 .. len - 1) + .into_par_iter() + .map(|i| { + (header[i] >> key) | (header[(i + 8 - 1) % 8] << (8 - key)) + }) + .enumerate() + .collect::>() + .iter() + .for_each(|(i, v)| { + header[*i] = *v; + }); +} + +#[inline] +pub fn decrypt_header>(mut header: T) { + let header = header.as_mut(); + + let len = header.len(); + if len != HEADER_SIZE { + panic!("Invalid header size: {}", len); + } + + let key = header[HEADER_SIZE - 1]; + if key >= 8 { + // TODO: kick the client who is using an invalid key. + return; + } + + (0 .. len - 1) + .map(|i| { + (header[i] << key) | (header[(i + 1) % 8] >> (8 - key)) + }) + .enumerate() + .collect::>() + .iter() + .for_each(|(i, v)| { + header[*i] = *v; + }); +} diff --git a/src/encrypt/rc4.rs b/src/encrypt/rc4.rs new file mode 100644 index 0000000..90dd7ec --- /dev/null +++ b/src/encrypt/rc4.rs @@ -0,0 +1,34 @@ +#[inline] +pub fn transfer (mut data: T, key: U) where T: AsMut<[u8]>, U: AsRef<[u8]>{ + let mut state = gen_state(key.as_ref()); + transfer_aux(data.as_mut(), &mut state); +} + +#[inline] +fn gen_state(key: &[u8]) -> [u8; 256] { + let mut state = [0u8; 256]; + for i in 0..256 { + state[i] = i as u8; + } + + let mut j = 0usize; + for i in 0..256 { + j = (j + state[i] as usize + key[i % key.len()] as usize) % 256; + state.swap(i, j); + } + + state +} + +#[inline] +fn transfer_aux(data: &mut [u8], state: &mut [u8; 256]) { + let mut i = 0usize; + let mut j = 0usize; + for byte in data.iter_mut() { + i = (i + 1) % 256; + j = (j + state[i] as usize) % 256; + state.swap(i, j); + let k = state[(state[i] as usize + state[j] as usize) % 256]; + *byte ^= k; + } +} \ No newline at end of file diff --git a/src/login/bulletin/board.rs b/src/login/bulletin/board.rs new file mode 100644 index 0000000..bbcc49c --- /dev/null +++ b/src/login/bulletin/board.rs @@ -0,0 +1,55 @@ +use crate::login::*; + +/* +const BULLETIN_TEXT: &str = "Welcome to the server!"; +const TICKER_TEXT: &str = "Welcome to the server!"; + +#[derive(Resource)] +pub struct ResBulletinBoard { + bulletin_text: String, + ticker_text: String, + bulletin_pkt: Arc, + ticker_pkt: Arc, +} + +impl ResBulletinBoard { + pub fn bulletin_pkt(&self) -> Arc { + self.bulletin_pkt.clone() + } + + pub fn ticker_pkt(&self) -> Arc { + self.ticker_pkt.clone() + } + + pub fn update_bulletin_text(&mut self, text: String) { + self.bulletin_pkt = Arc::new(build_bulletin_info_packet(&text)); + self.bulletin_text = text; + } + + pub fn update_ticker_text(&mut self, text: String) { + self.ticker_pkt = Arc::new(build_ticker_message_packet(&text)); + self.ticker_text = text; + } +} + +impl Default for ResBulletinBoard { + fn default() -> Self { + Self { + bulletin_text: BULLETIN_TEXT.to_string(), + ticker_text: TICKER_TEXT.to_string(), + bulletin_pkt: Arc::new(build_bulletin_info_packet(BULLETIN_TEXT)), + ticker_pkt: Arc::new(build_ticker_message_packet(TICKER_TEXT)), + } + } +} + +fn build_bulletin_info_packet(text: &str) -> impl OutPacketBuildable { + //todo!(); + SetEncData {} +} + +fn build_ticker_message_packet(text: &str) -> impl OutPacketBuildable { + //todo!(); + SetEncData {} +} +*/ \ No newline at end of file diff --git a/src/login/bulletin/greet.rs b/src/login/bulletin/greet.rs new file mode 100644 index 0000000..582c2f7 --- /dev/null +++ b/src/login/bulletin/greet.rs @@ -0,0 +1,20 @@ +use crate::login::*; + +/* + +pub fn system_greet_client_on_bulletin( + q: Query<&ClientSessionJobSender, Added>, + board: Res +) { + if q.is_empty() { return } + let bulletin_info_pkt = board.bulletin_pkt(); + let ticker_pkt = board.ticker_pkt(); + q + .par_iter() + .for_each(move |sender| { + sender.send_shared_packet(bulletin_info_pkt.clone()); + sender.send_shared_packet(ticker_pkt.clone()); + }); +} + +*/ \ No newline at end of file diff --git a/src/login/bulletin/mod.rs b/src/login/bulletin/mod.rs new file mode 100644 index 0000000..ab5e386 --- /dev/null +++ b/src/login/bulletin/mod.rs @@ -0,0 +1,8 @@ +mod greet; +mod board; + +use crate::login::*; + + +pub fn init(world_helper: &mut WorldHelper) { +} diff --git a/src/login/component.rs b/src/login/component.rs new file mode 100644 index 0000000..551e8e1 --- /dev/null +++ b/src/login/component.rs @@ -0,0 +1,56 @@ +use crate::{login::*, storage::account::Account}; + +use super::{ClientJob, ClientSessionJob}; + +#[derive(Component)] +pub struct ClientJobReceiver(pub UnboundedReceiver); + +#[derive(Component)] +pub struct ClientSessionJobSender(pub UnboundedSender); + +#[derive(Component)] +pub struct ClientAddr(pub SocketAddr); + +#[derive(Component, Clone, PartialEq, Hash, PartialOrd, Eq, Ord)] +pub struct ClientUid(pub u16); + +#[derive(Component, PartialEq, Hash, PartialOrd, Eq, Ord)] +pub struct ClientId(pub String); + +#[derive(Component)] +pub struct ClientName(pub String); + +#[derive(Component)] +pub struct ClientLevel(pub u32); + +#[derive(Component)] +pub struct ClientAccount(pub Account); + +#[derive(Component)] +pub struct ClientDisconnecting; + +#[derive(Component)] +pub struct ClientOnBulletinBoard; + +#[derive(Component)] +pub struct ClientOnLobby; + +#[derive(Component)] +pub struct ClientOnRoom; + +#[derive(Component)] +pub struct ClientOnGame; + +impl ClientSessionJobSender { + pub fn send(&self, msg: ClientSessionJob) { + self.0.send(msg).ignore(); + } + + pub fn send_packet(&self, pkt: T) { + self.send(ClientSessionJob::SendPacket(Arc::new(pkt))); + } + + pub fn send_shared_packet(&self, pkt: Arc) { + self.send(ClientSessionJob::SendPacket(pkt)); + } +} diff --git a/src/login/event/disconnect.rs b/src/login/event/disconnect.rs new file mode 100644 index 0000000..b640cbe --- /dev/null +++ b/src/login/event/disconnect.rs @@ -0,0 +1,47 @@ +use std::borrow::Borrow; + +use crate::{login::*, storage::{account::Account, Storage, }}; + +use super::DisconnectEvent; + +pub fn handle_when_disconnecting ( + receiver: Receiver, &mut ClientJobReceiver>, + mut sender: Sender, +) { + debug!("handle_when_disconnecting"); + let e = receiver.event.entity; + sender.send(DisconnectEvent { entity: e }); + receiver.query.0.close(); +} + +pub fn handle_disconnect_event_before_login ( + receiver: Receiver>, + mut despawner: Sender, +) { + debug!("handle_disconnect_event_before_login"); + let e = receiver.event.entity; + despawner.despawn(e); + debug!("despawned entity {:?}", e); +} + +pub fn handle_disconnect_event_in_bulletin ( + receiver: Receiver, + mut despawner: Sender, + Single(storage): Single<&mut Storage>, +) { + let e = receiver.event.entity; + let account = &receiver.query.0.0; + let filter = doc! { "id": account.id.clone() }; + storage.update_one_with_replacement::(filter, account.clone()); + despawner.despawn(e); +} + +pub fn handle_disconnect_event_in_lobby( + receiver: Receiver, + mut sender: Sender<(Remove, Insert, DisconnectEvent)>, +) { + let e = receiver.event.entity; + sender.remove::(e); + sender.insert(e, ClientOnBulletinBoard); + sender.send(DisconnectEvent { entity: e }); // re-send the disconnect event +} \ No newline at end of file diff --git a/src/login/event/enter_lobby.rs b/src/login/event/enter_lobby.rs new file mode 100644 index 0000000..1e0f406 --- /dev/null +++ b/src/login/event/enter_lobby.rs @@ -0,0 +1,15 @@ +use crate::login::*; + +use super::EnterLobbyEvent; + +/// From the bulletin board, we let the client to enter the lobby. +pub fn handle_enter_lobby_event_from_bulletin( + receiver: Receiver, + mut remover: Sender>, + mut adder: Sender>, +) { + let e = receiver.event.entity; + remover.remove::(e); + adder.insert(e, ClientOnLobby); +} + diff --git a/src/login/event/login.rs b/src/login/event/login.rs new file mode 100644 index 0000000..25bc31d --- /dev/null +++ b/src/login/event/login.rs @@ -0,0 +1,54 @@ +use std::collections::HashSet; + +use crate::{login::*, packet::outgoing::{LoginMessage, LoginMessageKind}, storage::{account::Account, Storage}}; + +use evenio::{component::RemoveComponent, rayon::iter::{IntoParallelIterator, IntoParallelRefIterator}}; +use pwhash::bcrypt; + +use super::LoginEvent; + +pub fn handle_login_event( + r: Receiver)>, + online_ids: Fetcher<&ClientId>, + Single(storage): Single<&Storage>, + mut after_login_adder: Sender<(Insert, Insert, Insert)>, +) { + debug!("handle_login_event: {}", r.event.id); + let mut online_id_set = get_online_id_set(&online_ids); + let LoginEvent { entity, id, pw } = r.event; + let sender = r.query.0; + let Some(account) = storage.find_one::(doc!{ "id": id }) else { + let pkt = LoginMessage(LoginMessageKind::InvalidId); + sender.send_packet(pkt); + return; + }; + if account.pw != encrypt_password(pw) { + let pkt = LoginMessage(LoginMessageKind::InvalidAccountInfo); + sender.send_packet(pkt); + return; + } + if online_id_set.contains(id) { + let pkt = LoginMessage(LoginMessageKind::Online); + sender.send_packet(pkt); + return; + } + online_id_set.insert((*id).clone()); + after_login_adder.insert(*entity, ClientId(id.clone())); + after_login_adder.insert(*entity, ClientAccount(account.clone())); + after_login_adder.insert(*entity, ClientOnBulletinBoard); +} + +fn get_online_id_set(online_ids: &Fetcher<&ClientId>) -> HashSet { + let mut set = HashSet::new(); + online_ids + .iter() + .for_each(|x| { set.insert(x.0.clone()); }); + set +} + +fn encrypt_password(s: &str) -> String { + match bcrypt::hash(s) { + Ok(s) => s, + Err(_) => "".into(), + } +} \ No newline at end of file diff --git a/src/login/event/mod.rs b/src/login/event/mod.rs new file mode 100644 index 0000000..85e2673 --- /dev/null +++ b/src/login/event/mod.rs @@ -0,0 +1,64 @@ +mod login; +mod disconnect; +mod enter_lobby; + +use crate::*; + +use self::{disconnect::{handle_disconnect_event_before_login, handle_disconnect_event_in_bulletin, handle_disconnect_event_in_lobby, handle_when_disconnecting}, login::handle_login_event, packet::incoming::InPacket}; + +use super::packet::PacketEvent; + +#[derive(Event)] +pub struct LoginEvent { + #[event(target)] + pub entity: EntityId, + pub id: String, + pub pw: String +} + +#[derive(Event)] +struct DisconnectEvent { + #[event(target)] + pub entity: EntityId, +} + +#[derive(Event)] +pub struct EnterLobbyEvent { + #[event(target)] + pub entity: EntityId, +} + +pub fn init(world_helper: &mut WorldHelper) { + world_helper + .add_event::() + .add_event::() + .add_event::(); + world_helper + .add_system(handle_login_event) + .add_system(handle_when_disconnecting) + .add_system(handle_disconnect_event_before_login) + .add_system(handle_disconnect_event_in_bulletin) + .add_system(handle_disconnect_event_in_lobby) + .add_system(handle_packet_event) + ; +} + +fn handle_packet_event ( + // a targeted event must be along at least one query. + receiver: Receiver, + mut sender: Sender, +) { + let e = receiver.event.entity; + match &receiver.event.pkt { + InPacket::LoginRequest { id, pw } => { + sender.send(LoginEvent { + entity: e, + id: id.into(), + pw: pw.into(), + }); + }, + _ => { + // ? + } + } +} \ No newline at end of file diff --git a/src/login/game/mod.rs b/src/login/game/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/login/greet/mod.rs b/src/login/greet/mod.rs new file mode 100644 index 0000000..866ae6d --- /dev/null +++ b/src/login/greet/mod.rs @@ -0,0 +1,7 @@ +mod new_client; +mod on_login; + +use crate::*; + +pub fn init(world_helper: &mut WorldHelper) { +} \ No newline at end of file diff --git a/src/login/greet/new_client.rs b/src/login/greet/new_client.rs new file mode 100644 index 0000000..9356bc6 --- /dev/null +++ b/src/login/greet/new_client.rs @@ -0,0 +1,48 @@ +use crate::login::*; + +pub fn greet_new_client( + receiver: Receiver, &ClientSessionJobSender>, + mut sender: Sender>, + uid_fetcher: Fetcher<&ClientUid>, +) { + todo!(); + /* + let used_uids = { + let mut s = HashSet::new(); + uid_fetcher + .iter() + .for_each(|uid| { + s.insert((*uid).clone()); + }); + s + }; + receiver + .iter() + .for_each(|sender| { + // try to allocate a client uid (u16). + let maybe_uid: Option = (1..=u16::MAX) + .find_map(|i| { + if used_uids.contains_key(&ClientUid(i)) == false { + Some(ClientUid(i)) + } + else { + None + } + }); + let Some(uid) = maybe_uid else { + // cannot allocate a new uid. + // disconnect this client. + sender.send(ClientSessionJob::Disconnect); + event_writer.send(DisconnectEvent { entity }); + return; + }; + commands + .entity(entity) + .insert(uid); + // TODO: send an enc data using enc_data + let pkt = SetEncData { + }; + sender.send_packet(pkt); + }); + */ +} \ No newline at end of file diff --git a/src/login/greet/on_login.rs b/src/login/greet/on_login.rs new file mode 100644 index 0000000..328e7db --- /dev/null +++ b/src/login/greet/on_login.rs @@ -0,0 +1,20 @@ + +use crate::*; +use crate::login::component::*; + +use self::{packet::outgoing::{AccountInfo, OutPacketBuildable }, storage::account::Account}; + +pub fn system_greet_client_on_login( + q: Receiver, (&ClientAccount, &ClientSessionJobSender)> +) { + let (ClientAccount(account), sender) = q.query; + let pkt = build_account_info_packet(&account); + sender.send_packet(pkt); +} + +fn build_account_info_packet(account: &Account) -> impl OutPacketBuildable { + // FIXME + let id = &account.id; + let name = &account.name; + AccountInfo +} \ No newline at end of file diff --git a/src/login/lobby/cache.rs b/src/login/lobby/cache.rs new file mode 100644 index 0000000..ab80dcf --- /dev/null +++ b/src/login/lobby/cache.rs @@ -0,0 +1,22 @@ +use crate::login::*; + +/// This includes users who are in the lobby or room. +#[derive(Default)] +pub struct LobbyCache { + users: HashSet, + user_list_packet: Vec, +} + +impl LobbyCache { + pub fn add_user(&mut self, user: EntityId) { + self.users.insert(user); + } + + pub fn remove_user(&mut self, user: EntityId) { + self.users.remove(&user); + } + + pub fn get_users(&self) -> Vec<&EntityId> { + self.users.iter().collect() + } +} diff --git a/src/login/lobby/greet.rs b/src/login/lobby/greet.rs new file mode 100644 index 0000000..73e888a --- /dev/null +++ b/src/login/lobby/greet.rs @@ -0,0 +1,33 @@ + +use evenio::handler::Local; + +use crate::login::*; + +use super::cache::LobbyCache; + +/// Note that this is not handling events (see the definition of receiver). +pub fn handle_enter_lobby( + receiver: Receiver>, + mut cache: Local, +) { + let e = receiver.event.entity; + // 1. add the newcomer to the user list of the lobby. + { + cache.add_user(e); + } + + // 2. send necessary packets to the newcomer. + { + todo!(); + } +} + +pub fn handle_leave_lobby( + receiver: Receiver>, + mut cache: Local, +) { + let e = receiver.event.entity; + { + cache.remove_user(e); + } +} \ No newline at end of file diff --git a/src/login/lobby/mod.rs b/src/login/lobby/mod.rs new file mode 100644 index 0000000..76b7efa --- /dev/null +++ b/src/login/lobby/mod.rs @@ -0,0 +1,12 @@ +mod cache; +mod greet; + +use crate::login::*; + +use self::greet::{handle_enter_lobby, handle_leave_lobby}; + +pub fn init(world_helper: &mut WorldHelper) { + world_helper + .add_system(handle_enter_lobby) + .add_system(handle_leave_lobby); +} \ No newline at end of file diff --git a/src/login/mod.rs b/src/login/mod.rs new file mode 100644 index 0000000..8d945b9 --- /dev/null +++ b/src/login/mod.rs @@ -0,0 +1,34 @@ +mod component; +mod bulletin; +mod lobby; +mod room; +mod game; +mod net; +mod event; +mod greet; +mod packet; + +pub use component::*; +pub use crate::prelude::*; + +use crate::{packet::{incoming::InPacket, outgoing::{OutPacketBuildable, }}, world::WorldHelper}; + +/// A message to be sent to a client. +#[derive(Debug)] +pub enum ClientJob { + OnReceive(InPacket), + OnDisconnected +} + +/// A message to be sent to a client session. +pub enum ClientSessionJob { + SendPacket(Arc), + Disconnect, +} + +pub fn init(world_helper: &mut WorldHelper) { + net::init(world_helper); + event::init(world_helper); + greet::init(world_helper); + bulletin::init(world_helper); +} diff --git a/src/login/net/client_job.rs b/src/login/net/client_job.rs new file mode 100644 index 0000000..688f018 --- /dev/null +++ b/src/login/net/client_job.rs @@ -0,0 +1,54 @@ +use evenio::rayon::iter::{IntoParallelIterator, IntoParallelRefMutIterator}; + +use crate::*; + +use self::{packet::incoming::InPacket, world::TaskExecutable}; + +use super::{component::*, packet::PacketEvent, ClientJob}; + +/// The maximum number of client jobs to be processed per tick. +/// To assure consistency, we limit it to 1 so that we can expect that +/// a client handles only one job per tick. +/// +/// TODO: we could scale this number up for cases where we need to handle a large +/// number of packets in a short time (e.g. in-game movement, etc). +const MAX_CLIENT_JOB_PER_TICK: usize = 4; + +/// This only generates the `ClientJobTick` event. +pub struct ClientJobTickerTask; + +#[derive(Event)] +pub struct ClientJobTick; + +impl TaskExecutable for ClientJobTickerTask { + const DURATION: Duration = fps_to_duration(30); + fn init(&mut self, _: &mut World) {} + fn execute(&mut self, world: &mut World) { + world.send(ClientJobTick); + } +} + +/// Handle messages from ClientSessions. +/// We ignore the clients who are already being disconnected. +pub fn handle_client_job( + _: Receiver, + mut fetcher: Fetcher<(EntityId, &mut ClientJobReceiver, Not<&ClientDisconnecting>)>, + mut sender: Sender<(PacketEvent, Insert)>, +) { + fetcher + .iter_mut() + .for_each(|(e, rx, _)| { + let mut count = 0; + while let Ok(msg) = rx.0.try_recv() { // TODO: use a global queue. + debug!("Received a client job {:?} for entity {:?}", msg, e); + match msg { + ClientJob::OnReceive(pkt) => sender.send(PacketEvent { entity: e, pkt }), + ClientJob::OnDisconnected => sender.insert(e, ClientDisconnecting) + } + count += 1; + if count >= MAX_CLIENT_JOB_PER_TICK { + break; + } + } + }); +} diff --git a/src/login/net/client_session.rs b/src/login/net/client_session.rs new file mode 100644 index 0000000..a64dee5 --- /dev/null +++ b/src/login/net/client_session.rs @@ -0,0 +1,69 @@ +use crate::{login::*, packet::{build_packet, packet_receiver::PacketReceiver}}; + +pub async fn run_client_session_async( + mut stream: TcpStream, + client_tx: UnboundedSender, + mut client_session_rx: UnboundedReceiver +) { + let mut recv_buf = [0u8; 1024]; + let mut packet_receiver = PacketReceiver::default(); + + loop { + select! { + res = stream.read(&mut recv_buf) => { + match res { + Ok(n) if n > 0 => on_receive(&recv_buf[0..n], &mut packet_receiver, &client_tx), + _ => { on_disconn(&client_tx); break; } + } + }, + msg = client_session_rx.recv() => { + let Some(msg) = msg else { + // TODO + continue; + }; + if !on_msg_async(&mut stream, msg).await { + break; + } + } + } + } + stream.shutdown().await.ignore(); + client_session_rx.close(); +} + +fn on_disconn(client_tx: &UnboundedSender) { + debug!("Client session disconnected"); + client_tx.send(ClientJob::OnDisconnected).ignore(); +} + +fn on_receive(received: &[u8], packet_receiver: &mut PacketReceiver, sender: &UnboundedSender) { + debug!("Client session received {} bytes", received.len()); + packet_receiver.push(received); + + // TODO: no need to allocate a new buffer for the body. + // Instead we can slice the packet buffer w/ proper lifetime. + let Ok(Some(body)) = packet_receiver.try_fetch_body() else { + // TODO: block the IP + debug!("we should block this IP"); + return; + }; + debug!("body: {:?}", body); + let pkt = InPacket::parse(body); + sender.send(ClientJob::OnReceive(pkt)).ignore(); +} + +async fn on_msg_async(stream: &mut TcpStream, msg: ClientSessionJob) -> bool { + match msg { + ClientSessionJob::SendPacket(pkt) => { + let Ok(data) = build_packet(pkt.as_ref()) else { + // TODO: why? + return true; + }; + stream.write(&data).await.ignore(); + true + }, + ClientSessionJob::Disconnect => { + false + } + } +} \ No newline at end of file diff --git a/src/login/net/mod.rs b/src/login/net/mod.rs new file mode 100644 index 0000000..c9f7e7f --- /dev/null +++ b/src/login/net/mod.rs @@ -0,0 +1,73 @@ +mod client_session; +mod client_job; + +use std::time::Duration; + +use tokio::{io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, runtime::Handle, select, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, }; +use self::{client_job::{handle_client_job, ClientJobTickerTask}, client_session::run_client_session_async}; + +use crate::{login::*, world::TaskExecutable}; + +const SERVER_PORT: u16 = 9874; +const MAX_ACCEPT_PER_TICK: usize = 2; +const FPS: u32 = 10; // 10 acceptances per second + +struct AcceptInfo { + stream: TcpStream, + addr: std::net::SocketAddr, +} + +pub fn init(world_helper: &mut WorldHelper) { + let rx = block_on(listen_and_get_rx()); + world_helper + .add_task(AcceptHandlerTask::new(rx)) + .add_task(ClientJobTickerTask); + world_helper + .add_system(handle_client_job); +} + +async fn listen_and_get_rx() -> UnboundedReceiver { + let (tx, rx) = unbounded_channel(); + let listener = TcpListener::bind(format!("0.0.0.0:{}", SERVER_PORT)).await.unwrap(); + info!("Login server listening on port {}", SERVER_PORT); + tokio::spawn(async move { + loop { + let (stream, addr) = listener.accept().await.unwrap(); + tx.send(AcceptInfo { stream, addr }).unwrap(); + debug!("Accepted a connection from {}", addr); + } + }); + return rx; +} + +struct AcceptHandlerTask { + rx: UnboundedReceiver, +} + +impl AcceptHandlerTask { + fn new(rx: UnboundedReceiver) -> Self { + Self { rx } + } +} + +impl TaskExecutable for AcceptHandlerTask { + const DURATION: Duration = fps_to_duration(1); + fn init(&mut self, _: &mut World) {} + fn execute(&mut self, world: &mut World) { + let mut count = 0; + while let Ok(AcceptInfo { stream, addr }) = self.rx.try_recv() { + count += 1; + if count > MAX_ACCEPT_PER_TICK { + break; + } + + let entity = world.spawn(); + let (client_tx, client_rx) = unbounded_channel(); + let (client_session_tx, client_session_rx) = unbounded_channel(); + spawn_task(run_client_session_async(stream, client_tx, client_session_rx)); + world.insert(entity, ClientAddr(addr)); + world.insert(entity, ClientSessionJobSender(client_session_tx)); + world.insert(entity, ClientJobReceiver(client_rx)); + } + } +} \ No newline at end of file diff --git a/src/login/packet.rs b/src/login/packet.rs new file mode 100644 index 0000000..ddcdd4a --- /dev/null +++ b/src/login/packet.rs @@ -0,0 +1,31 @@ + +use crate::login::*; + +use crate::packet::incoming::InPacket; + +use self::event::LoginEvent; + +#[derive(Event)] +pub struct PacketEvent { + #[event(target)] + pub entity: EntityId, + pub pkt: InPacket, +} + +pub fn handle_packet_event( + receiver: Receiver, + mut login_event_sender: Sender, +) { + match &receiver.event.pkt { + InPacket::LoginRequest { id, pw } => { + login_event_sender.send(LoginEvent { + entity: receiver.event.entity, + id: id.into(), + pw: pw.into(), + }); + }, + _ => { + // ? + } + } +} diff --git a/src/login/room/mod.rs b/src/login/room/mod.rs new file mode 100644 index 0000000..af357f5 --- /dev/null +++ b/src/login/room/mod.rs @@ -0,0 +1,55 @@ +use crate::login::*; + +const MAX_ROOM_MEMBERS: usize = 8; + +/* +#[derive(Bundle)] +pub struct RoomBundle { + pub name: RoomName, + pub room_id: RoomId, + pub map_id: RoomMapId, + pub manager: RoomManager, + pub members: RoomMembers, + pub pw: RoomPassword, +} +*/ + +#[derive(Component)] +pub struct RoomName(String); + +#[derive(Component)] +pub struct RoomId(usize); + +#[derive(Component)] +pub struct RoomMapId(u8); + +#[derive(Component)] +pub struct RoomManager(EntityId); + +#[derive(Component)] +pub struct RoomMembers([EntityId; MAX_ROOM_MEMBERS]); + +#[derive(Component)] +pub struct RoomPassword(Option); + +/* +#[derive(Bundle)] +pub struct ClientOnRoomBundle { + pub color: ClientOnRoomTeamColor, + pub char: ClientOnRoomCharacter, + pub state: ClientOnRoomState +} +*/ + +#[derive(Component)] +pub struct ClientOnRoomTeamColor(u8); + +#[derive(Component)] +pub struct ClientOnRoomCharacter(u8); + +#[derive(Component)] +pub enum ClientOnRoomState { + Idle, + Ready, + Shopping, +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..5505bad --- /dev/null +++ b/src/main.rs @@ -0,0 +1,65 @@ +mod login; +mod status; +mod packet; +mod util; +mod storage; +mod prelude; +mod encrypt; +mod constants; +mod world; +#[cfg(test)] +mod test; + +use std::{net::UdpSocket, time::Instant}; + +use prelude::*; +use tracing_subscriber::filter::LevelFilter; +use world::WorldHelper; + +fn main() { + init(); + run(); +} + +fn init() { + init_tokio_runtime(); + #[cfg(debug_assertions)] + { + let level = LevelFilter::DEBUG; + tracing_subscriber::fmt().with_max_level(level).init(); + std::env::set_var("RUST_BACKTRACE", "1"); + } + debug!("Initialized"); +} + +fn run() { + let mut world_helper = create_world_helper(); + debug!("Running..."); + loop { + let curr_time = Instant::now(); + world_helper.execute(); + let elapsed = curr_time.elapsed(); + if elapsed < fps_to_duration(60) { + std::thread::sleep(Duration::from_millis(1)); + } + } +} + +fn create_world_helper() -> WorldHelper { + let mut world_helper = WorldHelper::new(); + + storage::init(&mut world_helper); + login::init(&mut world_helper); + status::init(&mut world_helper); + + world_helper +} + +/* +fn add_plugins(app: &mut App) { + app + .add_plugins(MinimalPlugins) + .add_plugins(TokioTasksPlugin::default()) /* to use tokio runtime */ + .add_plugins(LogPlugin::default()); +} +*/ \ No newline at end of file diff --git a/src/packet/incoming/handler/login.rs b/src/packet/incoming/handler/login.rs new file mode 100644 index 0000000..a5e168c --- /dev/null +++ b/src/packet/incoming/handler/login.rs @@ -0,0 +1,15 @@ +use crate::{packet::incoming::InPacket, util::reader::Reader}; +use super::InPacketHandler; + +pub struct LoginHandler; + +impl InPacketHandler for LoginHandler { + fn opcode(&self) -> u8 { 3 } + + fn parse(&self, reader: &mut Reader) -> anyhow::Result { + let id = reader.read_fixed_string(12+1)?; + let pw = reader.read_fixed_string(20+1)?; + let pkt = InPacket::LoginRequest { id, pw }; + Ok(pkt) + } +} \ No newline at end of file diff --git a/src/packet/incoming/handler/mod.rs b/src/packet/incoming/handler/mod.rs new file mode 100644 index 0000000..50f1e31 --- /dev/null +++ b/src/packet/incoming/handler/mod.rs @@ -0,0 +1,73 @@ +mod login; +mod request_notice; + +use crate::*; + +use crate::util::reader::Reader; + +use self::{login::LoginHandler, request_notice::RequestNoticeHandler}; + +use super::InPacket; + +macro_rules! register_handlers { + ($name:ident, [$($i:ident),*]) => { + lazy_static! { + static ref $name: HandlerMap = { + let mut mapping = HandlerMap::default(); + $({ mapping.register_handler($i); })* + mapping + }; + } + }; +} + +register_handlers!(HANDLER_MAP, [ + LoginHandler, + RequestNoticeHandler, + ServerStatusRequestHandler +]); + +pub trait InPacketHandler { + fn opcode(&self) -> u8; + fn parse(&self, reader: &mut Reader) -> anyhow::Result; +} + +pub struct HandlerMap(HashMap>); + +impl Default for HandlerMap { + fn default() -> Self { + Self(HashMap::new()) + } +} + +impl HandlerMap { + fn parse(&self, opcode: u8, reader: &mut Reader) -> anyhow::Result { + match self.0.get(&opcode) { + Some(handler) => handler.parse(reader), + None => Ok(InPacket::Unknown(opcode)) + } + } +} + +impl HandlerMap { + fn register_handler(&mut self, handler: T) { + self.0.insert(handler.opcode(), Box::new(handler)); + } +} + +pub fn try_parse(reader: &mut Reader) -> anyhow::Result { + let opcode = reader.read_u8()?; + reader.advance(7); + HANDLER_MAP.parse(opcode, reader) +} + +struct ServerStatusRequestHandler; + +impl InPacketHandler for ServerStatusRequestHandler { + fn opcode(&self) -> u8 { 1 } + + fn parse(&self, reader: &mut Reader) -> anyhow::Result { + let code = reader.read_u8()?; + Ok(InPacket::ServerStatusRequest { code }) + } +} \ No newline at end of file diff --git a/src/packet/incoming/handler/request_notice.rs b/src/packet/incoming/handler/request_notice.rs new file mode 100644 index 0000000..fed31f3 --- /dev/null +++ b/src/packet/incoming/handler/request_notice.rs @@ -0,0 +1,13 @@ +use crate::{packet::incoming::InPacket, util::reader::Reader}; +use super::InPacketHandler; + +pub struct RequestNoticeHandler; + +impl InPacketHandler for RequestNoticeHandler { + fn opcode(&self) -> u8 { 255 } // FIXME + + fn parse(&self, _reader: &mut Reader) -> anyhow::Result { + let pkt = InPacket::NoticeRequest; + Ok(pkt) + } +} diff --git a/src/packet/incoming/mod.rs b/src/packet/incoming/mod.rs new file mode 100644 index 0000000..9838c3a --- /dev/null +++ b/src/packet/incoming/mod.rs @@ -0,0 +1,30 @@ +mod handler; + +use anyhow::Error; + +use crate::util::reader::Reader; + +use self::handler::try_parse; + +#[derive(Debug)] +pub enum InPacket { + ServerStatusRequest { + code: u8, + }, + LoginRequest { + id: String, + pw: String, + }, + EnteringLobby, + NoticeRequest, + Unknown(u8), + ParsingError(Error), +} + +impl InPacket { + pub fn parse>(body: T) -> InPacket { + let mut reader = Reader::from_ref(body.as_ref()); + try_parse(&mut reader).unwrap_or_else(|e| InPacket::ParsingError(e)) + } +} + diff --git a/src/packet/mod.rs b/src/packet/mod.rs new file mode 100644 index 0000000..656b71b --- /dev/null +++ b/src/packet/mod.rs @@ -0,0 +1,52 @@ +use crate::{encrypt::{encrypt_body, encrypt_header}, Writer, BODY_MAGIC_STAMP, TAIL_SIZE}; + +use crate::*; + +use self::outgoing::OutPacketBuildable; + +pub mod incoming; +pub mod outgoing; +pub mod packet_receiver; + +pub fn build_packet(pkt: &dyn OutPacketBuildable) -> anyhow::Result> { + let mut out = Vec::new(); + let body = build_body(pkt)?; + let mut w = Writer::from_vec_mut(&mut out); + build_head(&mut w, body.len())?; + w.write_bytes(body)?; + build_tail(&mut w)?; + debug!("built packet: {:?}", out); + Ok(out) +} + +fn build_head(w: &mut Writer, body_len: usize) -> anyhow::Result<()> { + let crypto_seed = 7; // rand::random::() % 7 + 1; // 1 ~ 7 + w.write_u16(body_len as u16)?; + w.write_u16(0xb9)?; + w.write_u16(0x08)?; + w.write_u16(0x09)?; + w.write_u8(crypto_seed)?; + debug!("built head - 1: {:?}", w.get()); + encrypt_header(w.get_mut()); + debug!("built head - 2: {:?}", w.get()); + Ok(()) +} + +fn build_body(pkt: &dyn OutPacketBuildable) -> anyhow::Result> { + let mut v = Vec::new(); + { + let mut w= Writer::from_vec_mut(&mut v); + w.write_u8(pkt.opcode())?; + w.write_u8(0)?; + w.write_u16(0)?; + w.write_u32(BODY_MAGIC_STAMP)?; + pkt.try_build(&mut w)?; + } + encrypt_body(&mut v); + Ok(v) +} + +fn build_tail(w: &mut Writer) -> anyhow::Result<()> { + w.write_bytes(vec![0u8; TAIL_SIZE])?; + Ok(()) +} \ No newline at end of file diff --git a/src/packet/outgoing/login_message.rs b/src/packet/outgoing/login_message.rs new file mode 100644 index 0000000..225d410 --- /dev/null +++ b/src/packet/outgoing/login_message.rs @@ -0,0 +1,12 @@ +use crate::util::writer::Writer; + +use super::{LoginMessage, OutPacketBuildable}; + +impl OutPacketBuildable for LoginMessage { + fn opcode(&self) -> u8 { 0 } + fn try_build(&self, writer: &mut Writer) -> anyhow::Result<()> { + let kind = self.0.clone(); + writer.write_u32(kind as u32)?; + Ok(()) + } +} \ No newline at end of file diff --git a/src/packet/outgoing/mod.rs b/src/packet/outgoing/mod.rs new file mode 100644 index 0000000..f0f0945 --- /dev/null +++ b/src/packet/outgoing/mod.rs @@ -0,0 +1,60 @@ + +mod login_message; +mod set_account_info; + +use anyhow::Error; + +use crate::util::writer::Writer; + +pub trait OutPacketBuildable: Sync + Send + core::fmt::Debug { + fn opcode (&self) -> u8; + fn try_build(&self, writer: &mut Writer) -> anyhow::Result<()>; +} + +#[derive(Debug)] +pub struct LoginMessage(pub LoginMessageKind); + +#[derive(Debug, Clone)] +pub enum LoginMessageKind { + FullClients = 5001, + InvalidId = 7001, + NoResponse = 7003, + NoUserInfo = 7005, + Banned = 7007, + Online = 7010, + FullRoom = 7011, + FullRooms = 7012, + NoRoom = 7013, + LackOfLevel = 7014, + UnbalancedTeamNumber = 7016, + BoardNotReady = 7021, + SameNickname = 7027, + LackOfTicketForNameChange = 7028, + DuplicatedNickname = 7029, + InvalidAccountInfo = 8003, +} + +#[derive(Debug)] +pub struct AccountInfo; + +#[derive(Debug)] +pub struct ServerStatusResponse { + pub code: u8, +} +impl OutPacketBuildable for ServerStatusResponse { + fn opcode (&self) -> u8 { 2 } + fn try_build(&self, writer: &mut Writer) -> Result<(), Error> { + // FIXME + let avail = 200; + let max = 200; + writer.write_u8(self.code)?; + writer.write_u8(1)?; // the number of servers + writer.write_u16(401)?; // (1) server uid + writer.write_u16(avail)?; // (2) available + writer.write_u16(max)?; // (3) the max number of clients + Ok(()) + } +} + + + diff --git a/src/packet/outgoing/set_account_info.rs b/src/packet/outgoing/set_account_info.rs new file mode 100644 index 0000000..b0bc395 --- /dev/null +++ b/src/packet/outgoing/set_account_info.rs @@ -0,0 +1,12 @@ +use crate::util::writer::Writer; + +use super::{AccountInfo, OutPacketBuildable}; + +impl OutPacketBuildable for AccountInfo { + fn opcode (&self) -> u8 { + todo!() + } + fn try_build(&self, writer: &mut Writer) -> anyhow::Result<()> { + todo!() + } +} \ No newline at end of file diff --git a/src/packet/packet_receiver.rs b/src/packet/packet_receiver.rs new file mode 100644 index 0000000..b8d4b4c --- /dev/null +++ b/src/packet/packet_receiver.rs @@ -0,0 +1,58 @@ +use tracing_subscriber::field::debug; + +use crate::encrypt::{decrypt_header, encrypt_body}; + +use super::incoming::InPacket; + +use crate::prelude::*; + +/// A threshold of incoming body length. +/// TODO: decide a proper value. +const INCOMING_BODY_LEN_THRESHOLD: usize = 2048; + +#[derive(Default)] +pub struct PacketReceiver { + gathering: Vec, +} + +impl PacketReceiver { + pub fn push>(&mut self, data: T) { + self.gathering.extend(data.as_ref()); + } + + pub fn clear(&mut self) { + self.gathering.clear(); + } + + pub fn try_fetch_body(&mut self) -> anyhow::Result>> { + let gathering = &mut self.gathering; + + let header = { + if gathering.len() < HEADER_SIZE { + return Ok(None); + } + let mut header = gathering[..HEADER_SIZE].to_vec(); + decrypt_header(&mut header); + header + }; + debug!("header: {:?}", header); + + let body_len = { + let temp_bytes = [header[0], header[1]]; + u16::from_le_bytes(temp_bytes) as usize + }; + debug!("body_len: {}", body_len); + if body_len >= INCOMING_BODY_LEN_THRESHOLD { + anyhow::bail!("Invalid body length: {}", body_len); + } + let chunk_size = HEADER_SIZE + body_len + TAIL_SIZE; + if gathering.len() < chunk_size { + return Ok(None); + } + + let mut body = gathering[HEADER_SIZE..HEADER_SIZE + body_len].to_vec(); + encrypt_body(&mut body); + gathering.drain(..chunk_size); + Ok(Some(body)) + } +} \ No newline at end of file diff --git a/src/prelude.rs b/src/prelude.rs new file mode 100644 index 0000000..d012eea --- /dev/null +++ b/src/prelude.rs @@ -0,0 +1,60 @@ +pub use std::collections::HashMap; +use std::future::Future; +pub use polodb_core::bson::*; +use tokio::{runtime::Runtime, task::JoinHandle}; +//pub use bevy::prelude::*; +pub use std::sync::Arc; +pub use lazy_static::lazy_static; +pub use std::net::SocketAddr; +pub use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +pub use ignore_result::Ignore; +pub use tokio::{io::{AsyncReadExt, AsyncWriteExt}, net::TcpStream, select}; +pub use std::time::Duration; +pub use std::collections::HashSet; +pub use evenio::prelude::*; +pub use log::*; +pub use rayon::prelude::*; + +pub use crate::util::writer::Writer; +pub use crate::util::reader::Reader; +pub use crate::constants::*; + +pub const fn fps_to_duration(fps: u32) -> Duration { + let millis = 1000 / fps; /* TODO: maybe inaccurate */ + Duration::from_millis(millis as u64) +} + +pub fn block_on(f: F) ->F::Output{ + rt().block_on(f) +} + +pub fn spawn_task(f: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, { + rt().spawn(f) +} + +static mut RT: Option = None; + +fn set_rt(rt: Runtime) { + assert!(unsafe { RT.is_none() }); + unsafe { + RT = Some(rt); + } +} + +fn rt() -> &'static Runtime { + assert!(unsafe { RT.is_some() }); + unsafe { + RT.as_ref().unwrap() + } +} + +pub fn init_tokio_runtime() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + set_rt(rt); +} \ No newline at end of file diff --git a/src/status/mod.rs b/src/status/mod.rs new file mode 100644 index 0000000..1ac4752 --- /dev/null +++ b/src/status/mod.rs @@ -0,0 +1,19 @@ +mod net; + +use crate::world::WorldHelper; +pub use crate::prelude::*; + +use self::net::{handle_packet_received_event, handle_packet_sent_event, handle_server_tick_event, PacketReceivedEvent, PacketSendEvent, ServerSocket, StatusServerTickEvent, StatusServerTicker}; + +pub fn init(world_helper: &mut WorldHelper) { + world_helper + .add_event::() + .add_event::() + .add_event::(); + world_helper + .add_task(StatusServerTicker::default()); + world_helper + .add_system(handle_server_tick_event) + .add_system(handle_packet_received_event) + .add_system(handle_packet_sent_event); +} diff --git a/src/status/net.rs b/src/status/net.rs new file mode 100644 index 0000000..479f282 --- /dev/null +++ b/src/status/net.rs @@ -0,0 +1,150 @@ +use std::io::ErrorKind; + +use evenio::handler::Local; + +use crate::{encrypt::{encrypt_body, encrypt_header}, packet::{build_packet, incoming::InPacket, outgoing::{OutPacketBuildable, ServerStatusResponse}, packet_receiver::PacketReceiver}, status::*, world::TaskExecutable}; + +const STATUS_SERVER_PORT: u16 = 9874; +const MAX_RECV_PER_TICK: usize = 2; + +#[derive(Event)] +pub struct StatusServerTickEvent; + +#[derive(Event)] +pub struct PacketReceivedEvent { + pub addr: SocketAddr, + pub pkt: InPacket, +} + +#[derive(Event)] +pub struct PacketSendEvent { + pub addr: SocketAddr, + pub pkt: Box, +} + +pub fn handle_server_tick_event( + _: Receiver, + Single(server_socket): Single<&mut ServerSocket>, + mut sender: Sender, +) { + let mut packet_receiver = PacketReceiver::default(); + for _ in 0..MAX_RECV_PER_TICK { + match server_socket.socket.try_recv_from(&mut server_socket.receive_buffer) { + Ok((_, addr)) if server_socket.is_blocked(&addr) => { + debug!("Ignore a packet from a blocked IP: {}", addr); + }, + Ok((n, addr)) => { + info!("Received {} bytes from {}", n, addr); + let buf = &server_socket.receive_buffer[0..n]; + packet_receiver.clear(); + packet_receiver.push(buf); + // TODO: no need to allocate a new buffer for the body. + // Instead we can slice the packet buffer w/ proper lifetime. + let Ok(Some(body)) = packet_receiver.try_fetch_body() else { + // TODO: block the IP + server_socket.block_ip(addr); + continue; + }; + debug!("body: {:?}", body); + let pkt = InPacket::parse(body); + sender.send(PacketReceivedEvent { + addr, + pkt, + }); + }, + Err(e) if e.kind() == ErrorKind::WouldBlock => { // nothing to read + break; + }, + Err(e) => { + warn!("{}", e); + }, + _ => {} + } + } +} + +/// TODO: detect DoS & block the IP +pub fn handle_packet_received_event( + receiver: Receiver, + mut sender: Sender, +) { + let pkt = &receiver.event.pkt; + let addr = &receiver.event.addr; + debug!("Received a packet {:?} from {}", pkt, addr); + match pkt { + InPacket::ServerStatusRequest { code } => { + sender.send(PacketSendEvent { + addr: addr.clone(), + pkt: Box::new(ServerStatusResponse { code: *code }), + }); + }, + _ => { + debug!("Received an unhandled packet {:?} from {}", pkt, receiver.event.addr); + } + } +} + +pub fn handle_packet_sent_event( + receiver: Receiver, + Single(server_socket): Single<&ServerSocket>, +) { + match server_socket.send(receiver.event.addr, &receiver.event.pkt) { + Ok(_) => {}, + Err(e) => { + warn!("Sent a packet to {} failed: {}", receiver.event.addr, e); + }, + } +} + +#[derive(Component)] +pub struct ServerSocket { + socket: tokio::net::UdpSocket, + receive_buffer: [u8; 1024], + blocked_addrs: HashSet, // TODO: manage a ban list in the db +} + +impl ServerSocket { + pub fn new() -> Self { + let addr = format!("0.0.0.0:{}", STATUS_SERVER_PORT); + let socket = block_on(tokio::net::UdpSocket::bind(addr)).unwrap(); + info!("Status server listening on port {}", STATUS_SERVER_PORT); + Self { + socket, + receive_buffer: [0u8; 1024], + blocked_addrs: HashSet::new(), + } + } + + pub fn is_blocked(&self, addr: &SocketAddr) -> bool { + self.blocked_addrs.contains(addr) + } + + pub fn block_ip(&mut self, addr: SocketAddr) { + self.blocked_addrs.insert(addr); + } + + pub fn send(&self, addr: SocketAddr, pkt: &Box) -> anyhow::Result<()> { + let chunk = build_packet(pkt.as_ref())?; + self.socket.try_send_to(&chunk, addr)?; + Ok(()) + } +} + +#[derive(Default)] +pub struct StatusServerTicker; + +impl TaskExecutable for StatusServerTicker { + const DURATION: Duration = fps_to_duration(2); + + fn init(&mut self, world: &mut World) { + let e = world.spawn(); + world.insert(e, ServerSocket::new()); + } + + fn execute( + &mut self, + world: &mut World, + ) { + world.send(StatusServerTickEvent); + } +} \ No newline at end of file diff --git a/src/storage/account.rs b/src/storage/account.rs new file mode 100644 index 0000000..d941906 --- /dev/null +++ b/src/storage/account.rs @@ -0,0 +1,36 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Clone)] +pub struct Account { + pub id: String, + pub pw: String, + pub name: String, + pub bits: u32, + pub exps: [u32; 8], + pub items: [u8; 4], + pub setting: AccountSetting, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct AccountSetting { + pub macro_texts: [String; 8], + pub key_type: u8, +} + +impl Default for AccountSetting { + fn default() -> Self { + Self { + macro_texts: [ + "지덕".into(), + "쥐덕".into(), + "더덕".into(), + "철푸덕".into(), + "호더덕".into(), + "을지문덕".into(), + "도날드덕".into(), + "푸더더덕".into(), + ], + key_type: 1u8, + } + } +} \ No newline at end of file diff --git a/src/storage/board.rs b/src/storage/board.rs new file mode 100644 index 0000000..1b2e4e8 --- /dev/null +++ b/src/storage/board.rs @@ -0,0 +1,9 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct Article { + pub title: String, + pub num: usize, + pub author_id: String, + pub text: String, +} \ No newline at end of file diff --git a/src/storage/mod.rs b/src/storage/mod.rs new file mode 100644 index 0000000..a570df1 --- /dev/null +++ b/src/storage/mod.rs @@ -0,0 +1,67 @@ +pub mod account; +pub mod board; + +use std::{borrow::Borrow, cell::{RefCell, UnsafeCell}}; + +use crate::{prelude::*, world::WorldHelper}; +use polodb_core::{bson::Document, Database}; +use serde::{de::DeserializeOwned, Serialize}; + +const STORAGE_PATH: &str = "db.polo"; + +#[derive(Component)] +pub struct Storage { + db: Database, +} + +#[derive(Event)] +pub struct SaveEvent; + +fn get_type_id_string() -> String { + format!("{:?}", std::any::TypeId::of::()) +} + +impl Storage { + pub fn find_one(&self, filter: impl Into>) -> Option { + let coll_name = get_type_id_string::(); + let coll = self.db.collection::(&coll_name); + coll.find_one(filter).unwrap() + } + + pub fn update_one_with_query(&self, filter: impl Into, query: impl Into) { + let coll_name = get_type_id_string::(); + let coll = self.db.collection::(&coll_name); + coll.update_one(filter.into(), query.into()).unwrap(); + } + + /// This costs more than `update_one_with_query` since it replaces the whole document. + pub fn update_one_with_replacement(&self, filter: impl Into, update: impl Serialize) { + let coll_name = get_type_id_string::(); + let coll = self.db.collection::(&coll_name); + let doc = to_document(&update).unwrap(); + let query = doc! { + "$set": doc + }; + coll.update_one(filter.into(), query).unwrap(); + } + + pub fn insert_one(&self, doc: impl Borrow) { + let coll_name = get_type_id_string::(); + let coll = self.db.collection::(&coll_name); + coll.insert_one(doc).unwrap(); + } +} + +impl Default for Storage { + fn default() -> Self { + let db = Database::open_file(STORAGE_PATH).unwrap(); + Self { + db, + } + } +} + +pub fn init(world_helper: &mut WorldHelper) { + world_helper + .spawn_single(Storage::default()); +} \ No newline at end of file diff --git a/src/storage/save.rs b/src/storage/save.rs new file mode 100644 index 0000000..cc231ad --- /dev/null +++ b/src/storage/save.rs @@ -0,0 +1,5 @@ +pub fn system_save_to_storage( + mut storage: ResMut, + reader: EventReader, +) { +} \ No newline at end of file diff --git a/src/test.rs b/src/test.rs new file mode 100644 index 0000000..252fb7c --- /dev/null +++ b/src/test.rs @@ -0,0 +1,24 @@ + +use crate::prelude::*; + +#[derive(Event)] +struct Tick; + +#[derive(Component)] +struct Name(String); + +#[test] +fn empty_iter_test() { + let mut world = World::new(); + println!("start"); + world.add_handler(|_: Receiver, mut fetcher: Fetcher<(EntityId, &mut Name)>| { + fetcher.iter_mut().for_each(|(e, name)| { + println!("{}", name.0); + }); + }); + let e = world.spawn(); + world.insert(e, Name("hello".into())); + world.send(Tick); + world.despawn(e); + world.send(Tick); +} diff --git a/src/util/encoding.rs b/src/util/encoding.rs new file mode 100644 index 0000000..5f69376 --- /dev/null +++ b/src/util/encoding.rs @@ -0,0 +1,22 @@ +use encoding::{all::WINDOWS_949, codec::korean::Windows949Encoding, Encoding}; + +/// DO uses CP949 only. +const ENCODING: &Windows949Encoding = WINDOWS_949; + +pub fn encode_to_vec>(s: T) -> anyhow::Result> { + let input = s.as_ref(); + let trap = encoding::EncoderTrap::Strict; + match ENCODING.encode(input, trap) { + Ok(b) => Ok(b), + Err(e) => anyhow::bail!("encoding error") + } +} + +pub fn decode_to_string>(b: T) -> anyhow::Result { + let input = b.as_ref(); + let trap = encoding::DecoderTrap::Replace; + match ENCODING.decode(input, trap) { + Ok(s) => Ok(s), + Err(e) => anyhow::bail!("decoding error") + } +} \ No newline at end of file diff --git a/src/util/mod.rs b/src/util/mod.rs new file mode 100644 index 0000000..209a43c --- /dev/null +++ b/src/util/mod.rs @@ -0,0 +1,3 @@ +pub mod reader; +pub mod writer; +pub mod encoding; \ No newline at end of file diff --git a/src/util/reader.rs b/src/util/reader.rs new file mode 100644 index 0000000..7767d6e --- /dev/null +++ b/src/util/reader.rs @@ -0,0 +1,79 @@ +use super::encoding::decode_to_string; + +pub struct Reader<'a> { + data: &'a [u8], + off: usize, +} + +impl<'a> Reader<'a> { + pub fn from_ref(data: &'a [u8]) -> Self { + Self { + data, + off: 0, + } + } +} + +impl Reader<'_> { + fn head(&self) -> &[u8] { + &self.data + } + + fn assert_space(&self, n: usize) -> anyhow::Result<()> { + let total_len = self.data.len(); + let off = self.off; + let remaining = total_len - off; + if remaining >= n { Ok (()) } + else { anyhow::bail!("not enough space") } + } + + pub fn advance(&mut self, n: usize) { + self.off += n; + } + + pub fn read_u8(&mut self) -> anyhow::Result { + let head = self.head(); + self.assert_space(1)?; + let off = self.off; + let ret = head[off]; + self.advance(1); + Ok(ret) + } + + pub fn read_u16(&mut self) -> anyhow::Result { + let head = self.head(); + self.assert_space(2)?; + let off = self.off; + let ret = + ((head[off+0] as u16) << 0) | ((head[off+1] as u16) << 8); + self.advance(2); + Ok(ret) + } + + pub fn read_u32(&mut self) -> anyhow::Result { + let head = self.head(); + self.assert_space(4)?; + let off = self.off; + let ret = + ((head[off+0] as u32) << 0) | ((head[off+1] as u32) << 8) | + ((head[off+2] as u32) << 16) | ((head[off+3] as u32) << 24); + self.advance(4); + Ok(ret) + } + + pub fn read_bytes(&mut self, n: usize) -> anyhow::Result> { + let head = self.head(); + self.assert_space(n)?; + let off = self.off; + let ret = head[off..off+n].to_vec(); + self.advance(n); + Ok(ret) + } + + pub fn read_fixed_string(&mut self, len: usize) -> anyhow::Result { + let data = self.read_bytes(len)?; + let data = data.iter().take_while(|&&x| x != 0).cloned().collect::>(); + let ret = decode_to_string(data)?; + Ok(ret) + } +} diff --git a/src/util/writer.rs b/src/util/writer.rs new file mode 100644 index 0000000..f2a1ac8 --- /dev/null +++ b/src/util/writer.rs @@ -0,0 +1,61 @@ +use std::{io::Write, num::Wrapping}; + +use byteorder::{LittleEndian, WriteBytesExt}; + +use crate::util::encoding::encode_to_vec; + +const BUFFER_SIZE_THRESHOLD: usize = 1024; + +pub struct Writer<'a> { + buffer: &'a mut Vec, +} + +impl<'a> Writer<'a> { + pub fn from_vec_mut(value: &'a mut Vec) -> Self { + Self { + buffer: value + } + } +} + +impl<'a> Writer<'a> { + fn assert_space(&self, n: usize) -> anyhow::Result<()> { + let required_len = self.buffer.len() + n; + if required_len <= BUFFER_SIZE_THRESHOLD { Ok (()) } + else { anyhow::bail!("too large data") } + } + + pub fn write_u8(&mut self, v: u8) -> anyhow::Result<()> { + self.assert_space(1)?; + self.buffer.write_u8(v)?; + Ok(()) + } + + pub fn write_u16(&mut self, v: u16) -> anyhow::Result<()> { + self.assert_space(2)?; + self.buffer.write_u16::(v)?; + Ok(()) + } + + pub fn write_u32(&mut self, v: u32) -> anyhow::Result<()> { + self.assert_space(4)?; + self.buffer.write_u32::(v)?; + Ok(()) + } + + pub fn write_bytes>(&mut self, b: T) -> anyhow::Result<()> { + let b = b.as_ref(); + let n = b.len(); + self.assert_space(n)?; + self.buffer.write(b)?; + Ok(()) + } + + pub fn get(&self) -> &[u8] { + &self.buffer + } + + pub fn get_mut(&mut self) -> &mut [u8] { + &mut self.buffer + } +} diff --git a/src/world.rs b/src/world.rs new file mode 100644 index 0000000..d3cc9d0 --- /dev/null +++ b/src/world.rs @@ -0,0 +1,113 @@ +use std::time::Instant; + +use crate::prelude::*; + +pub struct WorldHelper { + world: World, + task_handler: PeriodicTaskHandler, +} + +impl WorldHelper { + pub fn new() -> Self { + Self { + world: World::new(), + task_handler: PeriodicTaskHandler::new(), + } + } + + pub fn spawn_single(&mut self, component: T) -> EntityId { + let e = self.world.spawn(); + self.world.insert(e, component); + e + } + + pub fn execute(&mut self) { + self.task_handler.execute(&mut self.world); + } + + pub fn add_task(&mut self, task: T) -> &mut Self { + self.task_handler.register(&mut self.world, task); + self + } + + pub fn add_system(&mut self, system: S) -> &mut Self where S: IntoHandler { + self.world.add_handler(system); + self + } + + pub fn add_event(&mut self) -> &mut Self { + self.world.add_event::(); + self + } + + pub fn add_component(&mut self) -> &mut Self { + self.world.add_component::(); + self + } +} + +pub trait TaskExecutable { + const DURATION: Duration; + fn init(&mut self, world: &mut World); + fn execute(&mut self, world: &mut World); +} + +trait TaskExecutableSafe { + fn duration(&self) -> Duration; + fn init(&mut self, world: &mut World); + fn execute(&mut self, world: &mut World); +} + +impl TaskExecutableSafe for T { + fn init(&mut self, world: &mut World) { + ::init(self, world); + } + fn duration(&self) -> Duration { + ::DURATION + } + fn execute(&mut self, world: &mut World) { + ::execute(self, world); + } +} + +struct PeriodicTask { + pub last_time: Instant, + pub task: Box, +} + +impl PeriodicTask { + fn new(task: T) -> Self { + Self { + last_time: Instant::now(), + task: Box::new(task), + } + } +} + +struct PeriodicTaskHandler { + tasks: Vec, +} + +impl PeriodicTaskHandler { + pub fn new() -> Self { + Self { + tasks: vec![], /* TODO: priority */ + } + } + + pub fn execute(&mut self, world: &mut World) { + for task in self.tasks.iter_mut() { + let inner_task = task.task.as_mut(); + if task.last_time.elapsed() >= inner_task.duration() { + inner_task.execute(world); + task.last_time = Instant::now(); /* TODO: update the last time only once */ + // info!("Task executed with duration {:?}", inner_task.duration()); + } + } + } + + pub fn register(&mut self, world: &mut World, task: T) { + self.tasks.push(PeriodicTask::new(task)); + self.tasks.last_mut().unwrap().task.init(world); + } +} \ No newline at end of file