diff --git a/README.md b/README.md index 35ab22d..2694572 100644 --- a/README.md +++ b/README.md @@ -9,4 +9,12 @@ For this experiment, we will use the https://api.starwars.run REST API. A simple 1. Currying 2. Tail Recursion 3. Function Compostion -4. and more!!!! \ No newline at end of file +4. and more!!!! + + +without threads: +[2022-01-14T18:22:15Z INFO sw_scraper] START +[2022-01-14T18:22:58Z INFO sw_scraper] FINISH +43 seconds + +with threads diff --git a/app_config.toml b/app_config.toml new file mode 100644 index 0000000..a10105f --- /dev/null +++ b/app_config.toml @@ -0,0 +1,3 @@ +BASE_URL = "https://api.starwars.run/api/" +OUTPUT_DIR = "./output/" +NUM_THREADS = "3" \ No newline at end of file diff --git a/sw-scraper/Cargo.toml b/sw-scraper/Cargo.toml index 737f8d5..342fb86 100644 --- a/sw-scraper/Cargo.toml +++ b/sw-scraper/Cargo.toml @@ -1,13 +1,38 @@ [package] +build = "build.rs" edition = "2018" name = "sw-scraper" version = "0.1.0" +[[bin]] +name = "sw-scraper" +path = "src/main.rs" + +[profile.release] +lto = true +opt-level = 'z' + +[build-dependencies] +anyhow = "1" +vergen = "6" + [dependencies] -#log = "0.4" -reqwest = {version = "0.11", features = [ "blocking", "json"]} +config = "0.11" +env_logger = "0.9" +log = "0.4" + +actix = "0.12" +reqwest = {version = "0.11", features = ["blocking", "json"]} serde = {version = "1", features = ["derive"]} -serde_json = "1.0" +serde_json = "1" tokio = {version = "1", features = ["full"]} url = {version = "2", features = ["serde"]} -#uuid = {version = "0.8", features = ["v4", "serde"]} + +### ACTIX TEST STUFF +slog = "2" +slog-async = "2" +slog-json = "2" +slog-term = "2" + +lazy_static = "1" +thread-id = "4" diff --git a/sw-scraper/build.rs b/sw-scraper/build.rs new file mode 100644 index 0000000..9b3bf8c --- /dev/null +++ b/sw-scraper/build.rs @@ -0,0 +1,7 @@ +use anyhow::Result; +use vergen::{Config, vergen}; + +fn main() -> Result<()> { + // Generate the default 'cargo:' instruction output + vergen(Config::default()) +} \ No newline at end of file diff --git a/sw-scraper/src/error.rs b/sw-scraper/src/error.rs index b2469b1..54173c7 100644 --- a/sw-scraper/src/error.rs +++ b/sw-scraper/src/error.rs @@ -3,7 +3,8 @@ pub enum AppErrorType { _FetchError, _NotFound, _InvalidData, - WriteError, + ConfigError, + _WriteError, } #[derive(Debug)] diff --git a/sw-scraper/src/logger.rs b/sw-scraper/src/logger.rs new file mode 100644 index 0000000..3ad7d0c --- /dev/null +++ b/sw-scraper/src/logger.rs @@ -0,0 +1,69 @@ +use slog; +use thread_id; + +use slog::*; +use std::result; + +thread_local!(static TL_THREAD_ID: usize = thread_id::get() % 65536); + +#[derive(Clone)] +pub struct ThreadLocalDrain +where + D: Drain, +{ + pub drain: D, +} + +impl Drain for ThreadLocalDrain +where + D: Drain, +{ + type Ok = (); + type Err = Never; + + fn log( + &self, + record: &Record<'_>, + values: &OwnedKVList, + ) -> result::Result { + let chained = OwnedKVList::from(OwnedKV(( + SingleKV("thread", TL_THREAD_ID.with(|id| *id)), + values.clone(), + ))); + let _ = self.drain.log(record, &chained); + Ok(()) + } +} + +pub struct FnGuard { + function_name: &'static str, + logger: Logger, +} + +impl FnGuard { + // pub fn new(logger: Logger, values: OwnedKV, function_name: &'static str) -> FnGuard + // where + // T: SendSyncRefUnwindSafeKV + 'static, + // { + // let new_logger = logger.new(values); + // info!(new_logger, "[Enter]"; o!("function_name"=>function_name)); + // FnGuard { + // function_name, + // logger: new_logger, + // } + // } + + // pub fn 
sub_guard(&self, function_name: &'static str) -> FnGuard { + // FnGuard::new(self.logger.clone(), o!(), function_name) + // } + + // pub fn log(&self, record: &Record<'_>) { + // self.logger.log(record) + // } +} + +impl Drop for FnGuard { + fn drop(&mut self) { + info!(self.logger, "[Exit]"; o!("function_name"=>self.function_name)) + } +} \ No newline at end of file diff --git a/sw-scraper/src/main.rs b/sw-scraper/src/main.rs index 9c65b6f..fa0c5a6 100644 --- a/sw-scraper/src/main.rs +++ b/sw-scraper/src/main.rs @@ -1,219 +1,152 @@ -mod error; -mod model; - +extern crate actix; extern crate reqwest; -use model::Collection; -use serde::Serialize; - -use serde_json::Value; -use std::{ - fs::{self, File}, - io::Write, - path::Path, -}; -use tokio::join; - -use crate::{ - error::AppError, - model::{EntityType, Film, People, Planet, Species, Starship, Url, Vehicle}, -}; - -#[derive(Debug)] -struct NextUrlToFetch { - url: Option, - results: Vec, -} - -trait Factor { - fn factorial_tail_rec(url: NextUrlToFetch) -> Self; - fn factorial(url: NextUrlToFetch) -> Self; -} - -impl Factor for NextUrlToFetch { - fn factorial_tail_rec(url: NextUrlToFetch) -> Self { - url - } - - fn factorial(mut input: NextUrlToFetch) -> Self { - //fetch the next results - let current_url_to_fetch = &input.url; - - //check pagination "next", match Some/None - if let Some(next_url_to_fetch) = current_url_to_fetch { - println!("next url to fetch!! {:?}", next_url_to_fetch); - let sr = reqwest::blocking::get(next_url_to_fetch) - .unwrap() - .json::() - .unwrap(); - - let next_page = &sr["next"]; - - println!("found results next url: {:?} {:?}", &next_page, sr["count"]); - sr["results"] - .as_array() - .unwrap_or(&Vec::new()) - .iter() - .for_each(|f| input.results.push(f.to_owned())); - - match next_page { - Value::String(next_page) => { - let u = Self::factorial(NextUrlToFetch { - url: Some(next_page.to_string()), - ..input - }); - u - } - _ => NextUrlToFetch { url: None, ..input }, - } - } else { - NextUrlToFetch { url: None, ..input } - } - } -} -static BASE_URL: &'static str = "https://api.starwars.run/api"; - -fn write_to_file(file_name: &'static str, f: impl Fn() -> Collection) -> Result<(), AppError> -where - T: Serialize, -{ - let mut file = get_file(file_name)?; - let content = apply(to_bytes, f())?; - - file.write_all(content.as_bytes()).map_err(|e| AppError { - message: Some(String::from("failed to write all to file")), - cause: Some(e.to_string()), - error_type: error::AppErrorType::WriteError, - }) -} - -fn get_file(file_name: &'static str) -> Result { - apply(to_path, file_name).map_err(|e| AppError { - message: Some(String::from("failed to create file")), - cause: Some(e.to_string()), - error_type: error::AppErrorType::WriteError, - }) -} - -fn fetch_all_pages(entity: EntityType) -> Vec { - let results = vec![]; - let active_url: NextUrlToFetch = Factor::factorial(NextUrlToFetch { - url: Some(to_url(entity)), - results, - }); - - active_url.results -} +mod error; +mod logger; +mod model; +mod scraper; +mod utils; -fn to_url(entity_type: EntityType) -> Url { - match entity_type { - _ => format!("{}/{}/", BASE_URL, entity_type), - } -} +use crate::error::AppError; +use crate::logger::ThreadLocalDrain; +use crate::scraper::{FetchPageCommand, UrlFetcher}; -fn to_path(file_name: &'static str) -> Result { - File::create(Path::new(file_name)) -} +use actix::prelude::*; -fn to_bytes(all: Collection) -> Result -where - T: Serialize, -{ - serde_json::to_string(&all).map_err(|e| AppError { - message: 
Some(String::from("failed to serialize data to json")), - cause: Some(e.to_string()), - error_type: error::AppErrorType::_InvalidData, - }) -} +use serde_json::from_str; +use slog::{debug, info, o}; +use slog::{Drain, Logger}; +use slog_async; +use slog_term; +use std::collections::HashMap; -#[tokio::main] +#[actix::main] async fn main() { - //create base output dir - fs::create_dir::<_>("output").unwrap(); - let mut handles = vec![]; - handles.push(tokio::spawn(async move { - //Film - let find_all = || { - fetch_all_pages(EntityType::Film) - .into_iter() - .collect::>() - }; - write_to_file("output/Film.json", find_all).unwrap() - })); - handles.push(tokio::spawn(async move { - //Planet - let find_all = || { - fetch_all_pages(EntityType::Planet) - .into_iter() - .collect::>() - }; - write_to_file("output/Planet.json", find_all).unwrap() - })); - handles.push(tokio::spawn(async move { - //People - let find_all = || { - fetch_all_pages(EntityType::People) - .into_iter() - .collect::>() - }; - write_to_file("output/People.json", find_all).unwrap() - })); - handles.push(tokio::spawn(async move { - //Species - let find_all = || { - fetch_all_pages(EntityType::Species) - .into_iter() - .collect::>() - }; - write_to_file("output/Species.json", find_all).unwrap() - })); - handles.push(tokio::spawn(async move { - //Starship - let find_all = || { - fetch_all_pages(EntityType::Starship) - .into_iter() - .collect::>() - }; - write_to_file("output/Starship.json", find_all).unwrap() - })); - - handles.push(tokio::spawn(async move { - //Vehicle - let find_all = || { - fetch_all_pages(EntityType::Vehicle) - .into_iter() - .collect::>() - }; - write_to_file("output/Vehicle.json", find_all).unwrap() - })); - - let joins = join!( - handles.pop().unwrap(), - handles.pop().unwrap(), - handles.pop().unwrap(), - handles.pop().unwrap(), - handles.pop().unwrap(), - handles.pop().unwrap() - ); - joins.0.unwrap(); - joins.1.unwrap(); - joins.2.unwrap(); - joins.3.unwrap(); - joins.4.unwrap(); - joins.5.unwrap(); + let log = setup_logging(); + // let _scraper_logger = log.new(o!("thread_name"=>"scraper")); + // let _writer_logger = log.new(o!("thread_name"=>"writer")); + + //create app config + let mut app_config = config::Config::default(); + + //load the app_config.toml file + app_config + .merge(config::File::with_name("app_config")) + .unwrap(); + let base_url: String = app_config + .get("BASE_URL") + .map_err(|err| AppError { + message: Some("failed to load config files".to_string()), + cause: Some(err.to_string()), + error_type: error::AppErrorType::ConfigError, + }) + .unwrap(); + debug!(log, " BASE_URL {}", &base_url); + + let output_dir: String = app_config.get("OUTPUT_DIR").unwrap(); + debug!(log, " OUTPUT_DIR {}", &output_dir); + + let num_threads: usize = app_config.get("NUM_THREADS").unwrap(); + debug!(log, " NUM_THREADS {}", &num_threads); + + //fs::create_dir(output_dir).unwrap(); + info!(log, "fetching base entities"); + + let l = log.new(o!("thread_name"=>"url_fetcher")); + let fetch_url_addr = SyncArbiter::start(num_threads, move || UrlFetcher { logger: l.clone() }); + let resp = fetch_url_addr + .send(FetchPageCommand { + entity_type: String::from("root"), + base_url, + }) + .await + .unwrap() + .unwrap(); + let root_entities: HashMap = from_str(&resp).unwrap(); + + for n in root_entities { + let _ = fetch_url_addr + .send(FetchPageCommand { + entity_type: n.0.to_string(), + base_url: n.1.to_string(), + }) + .await + .map_err(|err| AppError { + message: Some("failed to load url".to_string()), + 
cause: Some(err.to_string()), + error_type: error::AppErrorType::_FetchError, + }) + .unwrap() + .unwrap(); + // info!(log, "Response came in {:?}", y); + } } -fn apply(fun: F, args: A) -> B -where - F: Fn(A) -> B, -{ - fun(args) +fn setup_logging() -> Logger { + //--- set up slog + + // set up terminal logging + let decorator = slog_term::TermDecorator::new().build(); + let term_drain = slog_term::CompactFormat::new(decorator).build().fuse(); + + // json log file + let logfile = std::fs::File::create("/var/tmp/actix-test.log").unwrap(); + let json_drain = slog_json::Json::new(logfile) + .add_default_keys() + // include source code location + .add_key_value(o!("place" => + slog::FnValue(move |info| { + format!("{}::({}:{})", + info.module(), + info.file(), + info.line(), + )}), + "sha"=> env!("VERGEN_GIT_SHA"))) + .build() + .fuse(); + + // duplicate log to both terminal and json file + let dup_drain = slog::Duplicate::new(json_drain, term_drain); + // make it async + let async_drain = slog_async::Async::new(dup_drain.fuse()).build(); + // and add thread local logging + let log = slog::Logger::root(ThreadLocalDrain { drain: async_drain }.fuse(), o!()); + log } -fn _compose(f: F, g: G) -> impl Fn(X) -> Z -where - F: Fn(X) -> Y, - G: Fn(Y) -> Z, -{ - move |x| g(f(x)) -} +// fn main2() { +// //init logging +// env_logger::init(); + +// info!("main - init app config"); +// //create app config +// let mut app_config = config::Config::default(); + +// //load the app_config.toml file +// info!("main - load app config toml file"); +// app_config +// .merge(config::File::with_name("app_config")) +// .unwrap(); + +// let base_url: String = app_config +// .get("BASE_URL") +// .map_err(|err| AppError { +// message: Some("failed to load config files".to_string()), +// cause: Some(err.to_string()), +// error_type: error::AppErrorType::ConfigError, +// }) +// .unwrap(); + +// let output_dir: String = app_config.get("OUTPUT_DIR").unwrap(); + +// info!("main - creating base output dir"); +// fs::create_dir(output_dir).unwrap(); + +// debug!("main - base_url {:?}", &base_url); + +// let base_entities = reqwest::blocking::get(&base_url) +// .unwrap() +// .json::() +// .unwrap(); +// dbg!(base_entities); +// } diff --git a/sw-scraper/src/model.rs b/sw-scraper/src/model.rs index c2a6000..3ee80dd 100644 --- a/sw-scraper/src/model.rs +++ b/sw-scraper/src/model.rs @@ -155,7 +155,7 @@ impl From for People { } } -#[derive(Serialize, Deserialize, Debug, Clone, Default)] +#[derive(Serialize, Deserialize, Debug, Clone, Default,PartialEq)] pub struct Film { #[serde(skip_deserializing)] id: String, diff --git a/sw-scraper/src/scraper.rs b/sw-scraper/src/scraper.rs new file mode 100644 index 0000000..d324b3e --- /dev/null +++ b/sw-scraper/src/scraper.rs @@ -0,0 +1,82 @@ +use actix::prelude::*; +use slog::info; + +pub(crate) struct _AppState { + logger: slog::Logger, +} +/// Define message +#[derive(Message, Debug, Default, PartialEq)] +#[rtype(result = "Result")] +pub(crate) struct FetchPageCommand { + pub base_url: String, + pub entity_type: String, +} + +#[derive(Debug)] +pub(crate) struct UrlFetcher { + pub(crate) logger: slog::Logger, +} + +// Provide Actor implementation for our actor +impl Actor for UrlFetcher { + type Context = SyncContext; + + fn started(&mut self, _ctx: &mut SyncContext) { + info!(self.logger, "UrlFetcher started"); + } + + fn stopped(&mut self, _ctx: &mut SyncContext) { + info!(self.logger, "UrlFetcher stopped"); + } +} + +/// Define handler for `Ping` message +impl Handler for UrlFetcher { + 
type Result = Result; + + fn handle(&mut self, msg: FetchPageCommand, _ctx: &mut SyncContext) -> Self::Result { + let text = reqwest::blocking::get(msg.base_url).unwrap().text(); + if let Ok(_) = text { + info!(self.logger, "got the text back"); + text + } else { + info!(self.logger, "no text???"); + todo!(); + } + } +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct SearchResultResponse { + pub next: Option, + pub result: Vec, +} + +// Actor definition +struct SwScraper { + //name: String, + recipient: Recipient, +} + +impl Actor for SwScraper { + type Context = Context; +} + +// simple message handler for Ping message +impl Handler for SwScraper { + type Result = (); + + fn handle(&mut self, msg: SearchResultResponse, _ctx: &mut Context) { + let url = match msg.next { + Some(it) => it, + _ => return, + }; + self.recipient + .do_send(FetchPageCommand { + base_url: url, + entity_type: "todo!()".to_string(), + }) + .unwrap(); + } +} diff --git a/sw-scraper/src/utils.rs b/sw-scraper/src/utils.rs new file mode 100644 index 0000000..7144ded --- /dev/null +++ b/sw-scraper/src/utils.rs @@ -0,0 +1,20 @@ +use std::{fs::File, path::Path}; + +pub(crate) fn _apply(fun: F, args: A) -> B +where + F: Fn(A) -> B, +{ + fun(args) +} + +pub(crate) fn _compose(f: F, g: G) -> impl Fn(X) -> Z +where + F: Fn(X) -> Y, + G: Fn(Y) -> Z, +{ + move |x| g(f(x)) +} + +pub(crate) fn _to_path(file_name: String) -> Result { + File::create(Path::new(&file_name)) +}
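
A few notes on the patterns this change introduces. The core of the rewrite is the actix `SyncArbiter`: `NUM_THREADS` worker threads each own a `UrlFetcher` actor, and pages are requested by sending `FetchPageCommand` messages to the pool. The sketch below is a minimal, self-contained version of that pattern (actix 0.12); the slog fields, `AppError` plumbing, and pagination from `scraper.rs` are trimmed so it compiles on its own.

```rust
use actix::prelude::*;

// Mirrors the FetchPageCommand added in scraper.rs, minus logging concerns.
#[derive(Message, Debug)]
#[rtype(result = "Result<String, ()>")]
struct FetchPageCommand {
    base_url: String,
    entity_type: String,
}

struct UrlFetcher;

impl Actor for UrlFetcher {
    // SyncContext runs each actor on a dedicated OS thread, so the handler
    // may block (reqwest::blocking) without starving the async runtime.
    type Context = SyncContext<Self>;
}

impl Handler<FetchPageCommand> for UrlFetcher {
    type Result = Result<String, ()>;

    fn handle(&mut self, msg: FetchPageCommand, _ctx: &mut SyncContext<Self>) -> Self::Result {
        // Blocking HTTP call; errors are collapsed to () here for brevity.
        reqwest::blocking::get(msg.base_url)
            .and_then(|resp| resp.text())
            .map_err(|_| ())
    }
}

#[actix::main]
async fn main() {
    // Three worker threads, matching the NUM_THREADS default in app_config.toml.
    let addr = SyncArbiter::start(3, || UrlFetcher);

    let body = addr
        .send(FetchPageCommand {
            base_url: "https://api.starwars.run/api/".to_string(),
            entity_type: "root".to_string(),
        })
        .await      // MailboxError if the actor is gone
        .unwrap()
        .unwrap();  // HTTP error from the handler

    println!("fetched {} bytes", body.len());
}
```

The design point is that `SyncArbiter` gives a fixed-size pool: `send(...)` queues the message and whichever worker thread is free picks it up, which is what lets the per-entity fetches in `main.rs` overlap.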
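Configuration now comes from `app_config.toml` via the `config` crate (0.11). Note that `NUM_THREADS` is written as a quoted string in the TOML; `get::<usize>("NUM_THREADS")` appears to rely on the crate's string-to-integer coercion, so an unquoted `NUM_THREADS = 3` would be the more conventional form. A minimal sketch of the loading step, using the same keys as the new file:

```rust
use config::{Config, File};

fn load_settings() -> Result<(String, String, usize), config::ConfigError> {
    let mut settings = Config::default();
    // Looks for app_config.toml (or .json/.yaml) in the working directory.
    settings.merge(File::with_name("app_config"))?;

    let base_url: String = settings.get("BASE_URL")?;
    let output_dir: String = settings.get("OUTPUT_DIR")?;
    let num_threads: usize = settings.get("NUM_THREADS")?;

    Ok((base_url, output_dir, num_threads))
}
```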
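One piece that is still open in `scraper.rs` is pagination: `SwScraper` re-sends `FetchPageCommand` when a `SearchResultResponse` carries a `next` URL, but nothing parses a fetched page into that message yet. For reference, the old `main.rs` walked the `"next"` links recursively; a hypothetical iterative helper (the name and shape are illustrative, not part of the diff) could look like this:

```rust
use serde_json::Value;

// Follow the API's "next" links, collecting every page's "results" array.
// This is the loop form of the recursive pagination removed from main.rs.
fn fetch_all_pages(start_url: &str) -> Result<Vec<Value>, reqwest::Error> {
    let mut results = Vec::new();
    let mut next = Some(start_url.to_string());

    while let Some(url) = next {
        let page: Value = reqwest::blocking::get(&url)?.json()?;

        if let Some(items) = page["results"].as_array() {
            results.extend(items.iter().cloned());
        }

        // "next" is a URL string on intermediate pages and null on the last one.
        next = page["next"].as_str().map(|s| s.to_string());
    }

    Ok(results)
}
```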