From dd76107ec483689c33dcc7f6f5013052394721e1 Mon Sep 17 00:00:00 2001 From: Mike Zupper Date: Fri, 14 Jan 2022 13:13:59 -0500 Subject: [PATCH 1/3] added logging and configs --- app_config.toml | 2 + sw-scraper/Cargo.toml | 8 ++- sw-scraper/src/main.rs | 145 +++++++++++++++++++++++++++-------------- 3 files changed, 104 insertions(+), 51 deletions(-) create mode 100644 app_config.toml diff --git a/app_config.toml b/app_config.toml new file mode 100644 index 0000000..8afe853 --- /dev/null +++ b/app_config.toml @@ -0,0 +1,2 @@ +BASE_URL = "https://api.starwars.run/api" +OUTPUT_DIR = "output" diff --git a/sw-scraper/Cargo.toml b/sw-scraper/Cargo.toml index 737f8d5..83f81bb 100644 --- a/sw-scraper/Cargo.toml +++ b/sw-scraper/Cargo.toml @@ -4,10 +4,12 @@ name = "sw-scraper" version = "0.1.0" [dependencies] -#log = "0.4" +log = "0.4" +env_logger = "0.9" +config = "0.11" + reqwest = {version = "0.11", features = [ "blocking", "json"]} serde = {version = "1", features = ["derive"]} serde_json = "1.0" tokio = {version = "1", features = ["full"]} -url = {version = "2", features = ["serde"]} -#uuid = {version = "0.8", features = ["v4", "serde"]} +url = {version = "2", features = ["serde"]} \ No newline at end of file diff --git a/sw-scraper/src/main.rs b/sw-scraper/src/main.rs index 9c65b6f..0ba5a31 100644 --- a/sw-scraper/src/main.rs +++ b/sw-scraper/src/main.rs @@ -1,8 +1,9 @@ -mod error; -mod model; - +extern crate log; extern crate reqwest; +mod error; +mod model; +use log::info; use model::Collection; use serde::Serialize; @@ -41,7 +42,8 @@ impl Factor for NextUrlToFetch { //check pagination "next", match Some/None if let Some(next_url_to_fetch) = current_url_to_fetch { - println!("next url to fetch!! {:?}", next_url_to_fetch); + info!("factorial - {:?}", next_url_to_fetch); + let sr = reqwest::blocking::get(next_url_to_fetch) .unwrap() .json::() @@ -49,7 +51,6 @@ impl Factor for NextUrlToFetch { let next_page = &sr["next"]; - println!("found results next url: {:?} {:?}", &next_page, sr["count"]); sr["results"] .as_array() .unwrap_or(&Vec::new()) @@ -71,48 +72,42 @@ impl Factor for NextUrlToFetch { } } } -static BASE_URL: &'static str = "https://api.starwars.run/api"; - -fn write_to_file(file_name: &'static str, f: impl Fn() -> Collection) -> Result<(), AppError> +fn write_to_file(file_name: String, f: impl Fn() -> Collection) -> Result<(), AppError> where T: Serialize, { - let mut file = get_file(file_name)?; - let content = apply(to_bytes, f())?; - - file.write_all(content.as_bytes()).map_err(|e| AppError { - message: Some(String::from("failed to write all to file")), + let mut file = apply(to_path, file_name).map_err(|e| AppError { + message: Some(String::from("failed to create file")), cause: Some(e.to_string()), error_type: error::AppErrorType::WriteError, - }) -} + })?; + let content = apply(to_bytes, f()).map_err(|e| AppError { + message: Some(String::from("failed to create content")), + cause: None, + error_type: error::AppErrorType::WriteError, + })?; -fn get_file(file_name: &'static str) -> Result { - apply(to_path, file_name).map_err(|e| AppError { - message: Some(String::from("failed to create file")), + file.write_all(content.as_bytes()).map_err(|e| AppError { + message: Some(String::from("failed to write all to file")), cause: Some(e.to_string()), error_type: error::AppErrorType::WriteError, }) } - -fn fetch_all_pages(entity: EntityType) -> Vec { +fn fetch_all_pages(url: Url) -> Vec { let results = vec![]; let active_url: NextUrlToFetch = Factor::factorial(NextUrlToFetch { - url: 
Some(to_url(entity)), + url: Some(url), results, }); active_url.results } - -fn to_url(entity_type: EntityType) -> Url { - match entity_type { - _ => format!("{}/{}/", BASE_URL, entity_type), - } +fn format_url(base: String) -> impl Fn(EntityType) -> Url { + move |entity_type| -> Url { format!("{}/{}/", &base, entity_type) } } -fn to_path(file_name: &'static str) -> Result { - File::create(Path::new(file_name)) +fn to_path(file_name: String) -> Result { + File::create(Path::new(&file_name)) } fn to_bytes(all: Collection) -> Result @@ -125,66 +120,120 @@ where error_type: error::AppErrorType::_InvalidData, }) } - #[tokio::main] async fn main() { + //init logging + env_logger::init(); + + info!("main - init app config"); + + //create app config + let mut app_config = config::Config::default(); + + //load the app_config.toml file + info!("main - load app config toml file"); + app_config + .merge(config::File::with_name("app_config")) + .unwrap(); + + let base_url: String = app_config.get("BASE_URL").unwrap(); + let output_dir: String = app_config.get("OUTPUT_DIR").unwrap(); + + let build_entity_url = |entity_type: EntityType| -> Url { + let u = apply(format_url, (&base_url).to_string()); + let url: Url = u(entity_type); + url + }; + //create base output dir - fs::create_dir::<_>("output").unwrap(); + info!("main - creating base output dir"); + + fs::create_dir::<_>(&output_dir).unwrap(); + let mut handles = vec![]; + handles.push(tokio::spawn(async move { + info!("main - load films"); //Film let find_all = || { - fetch_all_pages(EntityType::Film) + fetch_all_pages(build_entity_url(EntityType::Film)) .into_iter() .collect::>() }; - write_to_file("output/Film.json", find_all).unwrap() + info!("main - write films"); + + write_to_file(format!("{}/Film.json", &output_dir), find_all); + info!("main - done films"); })); + handles.push(tokio::spawn(async move { + info!("main - load Planet"); //Planet let find_all = || { - fetch_all_pages(EntityType::Planet) + fetch_all_pages(build_entity_url(EntityType::Planet)) .into_iter() .collect::>() }; - write_to_file("output/Planet.json", find_all).unwrap() + info!("main - write Planet"); + + write_to_file(format!("{}/Planet.json", &output_dir), find_all); + info!("main - done Planet"); })); + handles.push(tokio::spawn(async move { - //People + info!("main - load Species"); + //Species let find_all = || { - fetch_all_pages(EntityType::People) + fetch_all_pages(build_entity_url(EntityType::Species)) .into_iter() - .collect::>() + .collect::>() }; - write_to_file("output/People.json", find_all).unwrap() + info!("main - write Planet"); + + write_to_file(format!("{}/Species.json", &output_dir), find_all); + info!("main - done Species"); })); + handles.push(tokio::spawn(async move { - //Species + info!("main - load Vehicle"); + //Vehicle let find_all = || { - fetch_all_pages(EntityType::Species) + fetch_all_pages(build_entity_url(EntityType::Vehicle)) .into_iter() - .collect::>() + .collect::>() }; - write_to_file("output/Species.json", find_all).unwrap() + info!("main - write Vehicle"); + + write_to_file(format!("{}/Vehicle.json", &output_dir), find_all); + info!("main - done Vehicle"); })); + handles.push(tokio::spawn(async move { + info!("main - load Starship"); //Starship let find_all = || { - fetch_all_pages(EntityType::Starship) + fetch_all_pages(build_entity_url(EntityType::Starship)) .into_iter() .collect::>() }; - write_to_file("output/Starship.json", find_all).unwrap() + info!("main - write Starship"); + + write_to_file(format!("{}/Starship.json", 
&output_dir), find_all); + info!("main - done Starship"); })); handles.push(tokio::spawn(async move { - //Vehicle + info!("main - load People"); + //People let find_all = || { - fetch_all_pages(EntityType::Vehicle) + fetch_all_pages(build_entity_url(EntityType::People)) .into_iter() - .collect::>() + .collect::>() }; - write_to_file("output/Vehicle.json", find_all).unwrap() + info!("main - write People"); + + write_to_file(format!("{}/People.json", &output_dir), find_all); + info!("main - done People"); })); let joins = join!( From a4bf23ca59218c3127c5a4e626abe74a515b6836 Mon Sep 17 00:00:00 2001 From: Mike Zupper Date: Tue, 18 Jan 2022 23:45:30 -0500 Subject: [PATCH 2/3] added actors and threaded logging --- README.md | 10 +- app_config.toml | 4 +- sw-scraper/Cargo.toml | 33 +++- sw-scraper/build.rs | 7 + sw-scraper/src/error.rs | 1 + sw-scraper/src/logger.rs | 69 +++++++ sw-scraper/src/main.rs | 390 ++++++++++++++------------------------ sw-scraper/src/model.rs | 2 +- sw-scraper/src/scraper.rs | 83 ++++++++ sw-scraper/src/utils.rs | 20 ++ 10 files changed, 358 insertions(+), 261 deletions(-) create mode 100644 sw-scraper/build.rs create mode 100644 sw-scraper/src/logger.rs create mode 100644 sw-scraper/src/scraper.rs create mode 100644 sw-scraper/src/utils.rs diff --git a/README.md b/README.md index 35ab22d..2694572 100644 --- a/README.md +++ b/README.md @@ -9,4 +9,12 @@ For this experiment, we will use the https://api.starwars.run REST API. A simple 1. Currying 2. Tail Recursion 3. Function Compostion -4. and more!!!! \ No newline at end of file +4. and more!!!! + + +without threads: +[2022-01-14T18:22:15Z INFO sw_scraper] START +[2022-01-14T18:22:58Z INFO sw_scraper] FINISH +43 seconds + +with threads diff --git a/app_config.toml b/app_config.toml index 8afe853..9bbc839 100644 --- a/app_config.toml +++ b/app_config.toml @@ -1,2 +1,2 @@ -BASE_URL = "https://api.starwars.run/api" -OUTPUT_DIR = "output" +BASE_URL = "https://api.starwars.run/api/" +OUTPUT_DIR = "./output/" diff --git a/sw-scraper/Cargo.toml b/sw-scraper/Cargo.toml index 83f81bb..342fb86 100644 --- a/sw-scraper/Cargo.toml +++ b/sw-scraper/Cargo.toml @@ -1,15 +1,38 @@ [package] +build = "build.rs" edition = "2018" name = "sw-scraper" version = "0.1.0" +[[bin]] +name = "sw-scraper" +path = "src/main.rs" + +[profile.release] +lto = true +opt-level = 'z' + +[build-dependencies] +anyhow = "1" +vergen = "6" + [dependencies] -log = "0.4" -env_logger = "0.9" config = "0.11" +env_logger = "0.9" +log = "0.4" -reqwest = {version = "0.11", features = [ "blocking", "json"]} +actix = "0.12" +reqwest = {version = "0.11", features = ["blocking", "json"]} serde = {version = "1", features = ["derive"]} -serde_json = "1.0" +serde_json = "1" tokio = {version = "1", features = ["full"]} -url = {version = "2", features = ["serde"]} \ No newline at end of file +url = {version = "2", features = ["serde"]} + +### ACTIX TEST STUFF +slog = "2" +slog-async = "2" +slog-json = "2" +slog-term = "2" + +lazy_static = "1" +thread-id = "4" diff --git a/sw-scraper/build.rs b/sw-scraper/build.rs new file mode 100644 index 0000000..9b3bf8c --- /dev/null +++ b/sw-scraper/build.rs @@ -0,0 +1,7 @@ +use anyhow::Result; +use vergen::{Config, vergen}; + +fn main() -> Result<()> { + // Generate the default 'cargo:' instruction output + vergen(Config::default()) +} \ No newline at end of file diff --git a/sw-scraper/src/error.rs b/sw-scraper/src/error.rs index b2469b1..574f619 100644 --- a/sw-scraper/src/error.rs +++ b/sw-scraper/src/error.rs @@ -3,6 +3,7 @@ pub enum 
AppErrorType { _FetchError, _NotFound, _InvalidData, + ConfigError, WriteError, } diff --git a/sw-scraper/src/logger.rs b/sw-scraper/src/logger.rs new file mode 100644 index 0000000..f4ec5d5 --- /dev/null +++ b/sw-scraper/src/logger.rs @@ -0,0 +1,69 @@ +use slog; +use thread_id; + +use slog::*; +use std::result; + +thread_local!(static TL_THREAD_ID: usize = thread_id::get() % 65536); + +#[derive(Clone)] +pub struct ThreadLocalDrain +where + D: Drain, +{ + pub drain: D, +} + +impl Drain for ThreadLocalDrain +where + D: Drain, +{ + type Ok = (); + type Err = Never; + + fn log( + &self, + record: &Record<'_>, + values: &OwnedKVList, + ) -> result::Result { + let chained = OwnedKVList::from(OwnedKV(( + SingleKV("thread", TL_THREAD_ID.with(|id| *id)), + values.clone(), + ))); + let _ = self.drain.log(record, &chained); + Ok(()) + } +} + +pub struct FnGuard { + function_name: &'static str, + logger: Logger, +} + +impl FnGuard { + pub fn new(logger: Logger, values: OwnedKV, function_name: &'static str) -> FnGuard + where + T: SendSyncRefUnwindSafeKV + 'static, + { + let new_logger = logger.new(values); + info!(new_logger, "[Enter]"; o!("function_name"=>function_name)); + FnGuard { + function_name, + logger: new_logger, + } + } + + pub fn sub_guard(&self, function_name: &'static str) -> FnGuard { + FnGuard::new(self.logger.clone(), o!(), function_name) + } + + pub fn log(&self, record: &Record<'_>) { + self.logger.log(record) + } +} + +impl Drop for FnGuard { + fn drop(&mut self) { + info!(self.logger, "[Exit]"; o!("function_name"=>self.function_name)) + } +} \ No newline at end of file diff --git a/sw-scraper/src/main.rs b/sw-scraper/src/main.rs index 0ba5a31..aef0105 100644 --- a/sw-scraper/src/main.rs +++ b/sw-scraper/src/main.rs @@ -1,268 +1,154 @@ -extern crate log; +extern crate actix; extern crate reqwest; mod error; +mod logger; mod model; -use log::info; -use model::Collection; -use serde::Serialize; - -use serde_json::Value; -use std::{ - fs::{self, File}, - io::Write, - path::Path, -}; -use tokio::join; - -use crate::{ - error::AppError, - model::{EntityType, Film, People, Planet, Species, Starship, Url, Vehicle}, -}; - -#[derive(Debug)] -struct NextUrlToFetch { - url: Option, - results: Vec, -} - -trait Factor { - fn factorial_tail_rec(url: NextUrlToFetch) -> Self; - fn factorial(url: NextUrlToFetch) -> Self; -} - -impl Factor for NextUrlToFetch { - fn factorial_tail_rec(url: NextUrlToFetch) -> Self { - url - } - - fn factorial(mut input: NextUrlToFetch) -> Self { - //fetch the next results - let current_url_to_fetch = &input.url; - - //check pagination "next", match Some/None - if let Some(next_url_to_fetch) = current_url_to_fetch { - info!("factorial - {:?}", next_url_to_fetch); - - let sr = reqwest::blocking::get(next_url_to_fetch) - .unwrap() - .json::() - .unwrap(); - - let next_page = &sr["next"]; - - sr["results"] - .as_array() - .unwrap_or(&Vec::new()) - .iter() - .for_each(|f| input.results.push(f.to_owned())); - - match next_page { - Value::String(next_page) => { - let u = Self::factorial(NextUrlToFetch { - url: Some(next_page.to_string()), - ..input - }); - u - } - _ => NextUrlToFetch { url: None, ..input }, - } - } else { - NextUrlToFetch { url: None, ..input } - } - } -} -fn write_to_file(file_name: String, f: impl Fn() -> Collection) -> Result<(), AppError> -where - T: Serialize, -{ - let mut file = apply(to_path, file_name).map_err(|e| AppError { - message: Some(String::from("failed to create file")), - cause: Some(e.to_string()), - error_type: 
error::AppErrorType::WriteError, - })?; - let content = apply(to_bytes, f()).map_err(|e| AppError { - message: Some(String::from("failed to create content")), - cause: None, - error_type: error::AppErrorType::WriteError, - })?; - - file.write_all(content.as_bytes()).map_err(|e| AppError { - message: Some(String::from("failed to write all to file")), - cause: Some(e.to_string()), - error_type: error::AppErrorType::WriteError, - }) -} -fn fetch_all_pages(url: Url) -> Vec { - let results = vec![]; - let active_url: NextUrlToFetch = Factor::factorial(NextUrlToFetch { - url: Some(url), - results, - }); - - active_url.results -} -fn format_url(base: String) -> impl Fn(EntityType) -> Url { - move |entity_type| -> Url { format!("{}/{}/", &base, entity_type) } -} - -fn to_path(file_name: String) -> Result { - File::create(Path::new(&file_name)) -} - -fn to_bytes(all: Collection) -> Result -where - T: Serialize, -{ - serde_json::to_string(&all).map_err(|e| AppError { - message: Some(String::from("failed to serialize data to json")), - cause: Some(e.to_string()), - error_type: error::AppErrorType::_InvalidData, - }) -} -#[tokio::main] +mod scraper; +mod utils; + +use crate::error::AppError; +use crate::logger::ThreadLocalDrain; +use crate::model::SearchResult; +use crate::scraper::{FetchPageCommand, UrlFetcher}; + +use actix::prelude::*; + +use serde_json::from_str; +use slog::Drain; +use slog::{debug, info, o}; +use slog_async; +use slog_term; +use std::collections::HashMap; +use std::fs::File; +use std::path::Path; +use std::sync::Arc; + +#[actix::main] async fn main() { - //init logging - env_logger::init(); - - info!("main - init app config"); - + //--- set up slog + + // set up terminal logging + let decorator = slog_term::TermDecorator::new().build(); + let term_drain = slog_term::CompactFormat::new(decorator).build().fuse(); + + // json log file + let logfile = std::fs::File::create("/var/tmp/actix-test.log").unwrap(); + let json_drain = slog_json::Json::new(logfile) + .add_default_keys() + // include source code location + .add_key_value(o!("place" => + slog::FnValue(move |info| { + format!("{}::({}:{})", + info.module(), + info.file(), + info.line(), + )}), + "sha"=> env!("VERGEN_GIT_SHA"))) + .build() + .fuse(); + + // duplicate log to both terminal and json file + let dup_drain = slog::Duplicate::new(json_drain, term_drain); + // make it async + let async_drain = slog_async::Async::new(dup_drain.fuse()).build(); + // and add thread local logging + let log = slog::Logger::root(ThreadLocalDrain { drain: async_drain }.fuse(), o!()); + let _scraper_logger = log.new(o!("thread_name"=>"scraper")); + let _writer_logger = log.new(o!("thread_name"=>"writer")); + + //--- end of slog setup + info!(log, "Started main app"); //create app config let mut app_config = config::Config::default(); //load the app_config.toml file - info!("main - load app config toml file"); + info!(log, "loading app_config.toml"); + app_config .merge(config::File::with_name("app_config")) .unwrap(); - let base_url: String = app_config.get("BASE_URL").unwrap(); - let output_dir: String = app_config.get("OUTPUT_DIR").unwrap(); - - let build_entity_url = |entity_type: EntityType| -> Url { - let u = apply(format_url, (&base_url).to_string()); - let url: Url = u(entity_type); - url - }; - - //create base output dir - info!("main - creating base output dir"); - - fs::create_dir::<_>(&output_dir).unwrap(); - - let mut handles = vec![]; - - handles.push(tokio::spawn(async move { - info!("main - load films"); - //Film - let find_all = 
|| { - fetch_all_pages(build_entity_url(EntityType::Film)) - .into_iter() - .collect::>() - }; - info!("main - write films"); - - write_to_file(format!("{}/Film.json", &output_dir), find_all); - info!("main - done films"); - })); - - handles.push(tokio::spawn(async move { - info!("main - load Planet"); - //Planet - let find_all = || { - fetch_all_pages(build_entity_url(EntityType::Planet)) - .into_iter() - .collect::>() - }; - info!("main - write Planet"); - - write_to_file(format!("{}/Planet.json", &output_dir), find_all); - info!("main - done Planet"); - })); - - handles.push(tokio::spawn(async move { - info!("main - load Species"); - //Species - let find_all = || { - fetch_all_pages(build_entity_url(EntityType::Species)) - .into_iter() - .collect::>() - }; - info!("main - write Planet"); - - write_to_file(format!("{}/Species.json", &output_dir), find_all); - info!("main - done Species"); - })); + debug!(log, " reading BASE_URL"); - handles.push(tokio::spawn(async move { - info!("main - load Vehicle"); - //Vehicle - let find_all = || { - fetch_all_pages(build_entity_url(EntityType::Vehicle)) - .into_iter() - .collect::>() - }; - info!("main - write Vehicle"); - - write_to_file(format!("{}/Vehicle.json", &output_dir), find_all); - info!("main - done Vehicle"); - })); - - handles.push(tokio::spawn(async move { - info!("main - load Starship"); - //Starship - let find_all = || { - fetch_all_pages(build_entity_url(EntityType::Starship)) - .into_iter() - .collect::>() - }; - info!("main - write Starship"); - - write_to_file(format!("{}/Starship.json", &output_dir), find_all); - info!("main - done Starship"); - })); - - handles.push(tokio::spawn(async move { - info!("main - load People"); - //People - let find_all = || { - fetch_all_pages(build_entity_url(EntityType::People)) - .into_iter() - .collect::>() - }; - info!("main - write People"); - - write_to_file(format!("{}/People.json", &output_dir), find_all); - info!("main - done People"); - })); - - let joins = join!( - handles.pop().unwrap(), - handles.pop().unwrap(), - handles.pop().unwrap(), - handles.pop().unwrap(), - handles.pop().unwrap(), - handles.pop().unwrap() - ); - joins.0.unwrap(); - joins.1.unwrap(); - joins.2.unwrap(); - joins.3.unwrap(); - joins.4.unwrap(); - joins.5.unwrap(); -} - -fn apply(fun: F, args: A) -> B -where - F: Fn(A) -> B, -{ - fun(args) + let base_url: String = app_config + .get("BASE_URL") + .map_err(|err| AppError { + message: Some("failed to load config files".to_string()), + cause: Some(err.to_string()), + error_type: error::AppErrorType::ConfigError, + }) + .unwrap(); + debug!(log, " reading OUTPUT_DIR"); + + let _output_dir: String = app_config.get("OUTPUT_DIR").unwrap(); + + //fs::create_dir(output_dir).unwrap(); + info!(log, "fetching base entities"); + + let l = log.new(o!("thread_name"=>"url_fetcher")); + let fetch_url_addr = SyncArbiter::start(3, move || UrlFetcher { logger:l.clone() }); + let resp = fetch_url_addr + .send(FetchPageCommand { + entity_type: String::from("root"), + base_url, + }) + .await + .unwrap() + .unwrap(); + let root_entities: HashMap = from_str(&resp).unwrap(); + + for n in root_entities { + let y = fetch_url_addr + .send(FetchPageCommand { + entity_type: n.0.to_string(), + base_url: n.1.to_string(), + }) + .await + .map_err(|err| AppError { + message: Some("failed to load url".to_string()), + cause: Some(err.to_string()), + error_type: error::AppErrorType::_FetchError, + }) + .unwrap() + .unwrap(); + // info!(log, "Response came in {:?}", y); + } } -fn _compose(f: F, g: G) -> 
impl Fn(X) -> Z -where - F: Fn(X) -> Y, - G: Fn(Y) -> Z, -{ - move |x| g(f(x)) -} +// fn main2() { +// //init logging +// env_logger::init(); + +// info!("main - init app config"); +// //create app config +// let mut app_config = config::Config::default(); + +// //load the app_config.toml file +// info!("main - load app config toml file"); +// app_config +// .merge(config::File::with_name("app_config")) +// .unwrap(); + +// let base_url: String = app_config +// .get("BASE_URL") +// .map_err(|err| AppError { +// message: Some("failed to load config files".to_string()), +// cause: Some(err.to_string()), +// error_type: error::AppErrorType::ConfigError, +// }) +// .unwrap(); + +// let output_dir: String = app_config.get("OUTPUT_DIR").unwrap(); + +// info!("main - creating base output dir"); +// fs::create_dir(output_dir).unwrap(); + +// debug!("main - base_url {:?}", &base_url); + +// let base_entities = reqwest::blocking::get(&base_url) +// .unwrap() +// .json::() +// .unwrap(); +// dbg!(base_entities); +// } diff --git a/sw-scraper/src/model.rs b/sw-scraper/src/model.rs index c2a6000..3ee80dd 100644 --- a/sw-scraper/src/model.rs +++ b/sw-scraper/src/model.rs @@ -155,7 +155,7 @@ impl From for People { } } -#[derive(Serialize, Deserialize, Debug, Clone, Default)] +#[derive(Serialize, Deserialize, Debug, Clone, Default,PartialEq)] pub struct Film { #[serde(skip_deserializing)] id: String, diff --git a/sw-scraper/src/scraper.rs b/sw-scraper/src/scraper.rs new file mode 100644 index 0000000..9626564 --- /dev/null +++ b/sw-scraper/src/scraper.rs @@ -0,0 +1,83 @@ +use actix::prelude::*; +use slog::info; + +use crate::error::AppError; + +pub(crate) struct _AppState { + logger: slog::Logger, +} +/// Define message +#[derive(Message, Debug, Default, PartialEq)] +#[rtype(result = "Result")] +pub(crate) struct FetchPageCommand { + pub base_url: String, + pub entity_type: String, +} + +#[derive(Debug)] +pub(crate) struct UrlFetcher { + pub(crate) logger: slog::Logger, +} + +// Provide Actor implementation for our actor +impl Actor for UrlFetcher { + type Context = SyncContext; + + fn started(&mut self, ctx: &mut SyncContext) { + info!(self.logger, "UrlFetcher started"); + } + + fn stopped(&mut self, ctx: &mut SyncContext) { + info!(self.logger,"UrlFetcher stopped"); + } +} + +/// Define handler for `Ping` message +impl Handler for UrlFetcher { + type Result = Result; + + fn handle(&mut self, msg: FetchPageCommand, ctx: &mut SyncContext) -> Self::Result { + let text =reqwest::blocking::get(msg.base_url).unwrap().text(); + if let Ok(_) = text { + info!(self.logger,"got the text back"); + text + } else { + info!(self.logger,"no text???"); + todo!(); + } + + } +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct SearchResultResponse { + pub next: Option, + pub result: Vec, +} + +// Actor definition +struct SwScraper { + name: String, + recipient: Recipient, +} + +impl Actor for SwScraper { + type Context = Context; +} + +// simple message handler for Ping message +impl Handler for SwScraper { + type Result = (); + + fn handle(&mut self, msg: SearchResultResponse, ctx: &mut Context) { + let url = match msg.next { + Some(it) => it, + _ => return, + }; + self.recipient.do_send(FetchPageCommand { + base_url: url, + entity_type: "todo!()".to_string(), + }); + } +} diff --git a/sw-scraper/src/utils.rs b/sw-scraper/src/utils.rs new file mode 100644 index 0000000..7144ded --- /dev/null +++ b/sw-scraper/src/utils.rs @@ -0,0 +1,20 @@ +use std::{fs::File, path::Path}; + +pub(crate) fn _apply(fun: F, args: A) 
-> B +where + F: Fn(A) -> B, +{ + fun(args) +} + +pub(crate) fn _compose(f: F, g: G) -> impl Fn(X) -> Z +where + F: Fn(X) -> Y, + G: Fn(Y) -> Z, +{ + move |x| g(f(x)) +} + +pub(crate) fn _to_path(file_name: String) -> Result { + File::create(Path::new(&file_name)) +} From fcb454687bf79fa4e71dfbbcc021e502e40462a4 Mon Sep 17 00:00:00 2001 From: Mike Zupper Date: Wed, 19 Jan 2022 18:39:52 -0500 Subject: [PATCH 3/3] removed rust warnings during compile --- app_config.toml | 1 + sw-scraper/src/error.rs | 2 +- sw-scraper/src/logger.rs | 34 +++++++-------- sw-scraper/src/main.rs | 92 +++++++++++++++++++-------------------- sw-scraper/src/scraper.rs | 31 +++++++------ 5 files changed, 79 insertions(+), 81 deletions(-) diff --git a/app_config.toml b/app_config.toml index 9bbc839..a10105f 100644 --- a/app_config.toml +++ b/app_config.toml @@ -1,2 +1,3 @@ BASE_URL = "https://api.starwars.run/api/" OUTPUT_DIR = "./output/" +NUM_THREADS = "3" \ No newline at end of file diff --git a/sw-scraper/src/error.rs b/sw-scraper/src/error.rs index 574f619..54173c7 100644 --- a/sw-scraper/src/error.rs +++ b/sw-scraper/src/error.rs @@ -4,7 +4,7 @@ pub enum AppErrorType { _NotFound, _InvalidData, ConfigError, - WriteError, + _WriteError, } #[derive(Debug)] diff --git a/sw-scraper/src/logger.rs b/sw-scraper/src/logger.rs index f4ec5d5..3ad7d0c 100644 --- a/sw-scraper/src/logger.rs +++ b/sw-scraper/src/logger.rs @@ -41,25 +41,25 @@ pub struct FnGuard { } impl FnGuard { - pub fn new(logger: Logger, values: OwnedKV, function_name: &'static str) -> FnGuard - where - T: SendSyncRefUnwindSafeKV + 'static, - { - let new_logger = logger.new(values); - info!(new_logger, "[Enter]"; o!("function_name"=>function_name)); - FnGuard { - function_name, - logger: new_logger, - } - } + // pub fn new(logger: Logger, values: OwnedKV, function_name: &'static str) -> FnGuard + // where + // T: SendSyncRefUnwindSafeKV + 'static, + // { + // let new_logger = logger.new(values); + // info!(new_logger, "[Enter]"; o!("function_name"=>function_name)); + // FnGuard { + // function_name, + // logger: new_logger, + // } + // } - pub fn sub_guard(&self, function_name: &'static str) -> FnGuard { - FnGuard::new(self.logger.clone(), o!(), function_name) - } + // pub fn sub_guard(&self, function_name: &'static str) -> FnGuard { + // FnGuard::new(self.logger.clone(), o!(), function_name) + // } - pub fn log(&self, record: &Record<'_>) { - self.logger.log(record) - } + // pub fn log(&self, record: &Record<'_>) { + // self.logger.log(record) + // } } impl Drop for FnGuard { diff --git a/sw-scraper/src/main.rs b/sw-scraper/src/main.rs index aef0105..fa0c5a6 100644 --- a/sw-scraper/src/main.rs +++ b/sw-scraper/src/main.rs @@ -9,68 +9,30 @@ mod utils; use crate::error::AppError; use crate::logger::ThreadLocalDrain; -use crate::model::SearchResult; use crate::scraper::{FetchPageCommand, UrlFetcher}; use actix::prelude::*; use serde_json::from_str; -use slog::Drain; use slog::{debug, info, o}; +use slog::{Drain, Logger}; use slog_async; use slog_term; use std::collections::HashMap; -use std::fs::File; -use std::path::Path; -use std::sync::Arc; #[actix::main] async fn main() { - //--- set up slog - - // set up terminal logging - let decorator = slog_term::TermDecorator::new().build(); - let term_drain = slog_term::CompactFormat::new(decorator).build().fuse(); + let log = setup_logging(); + // let _scraper_logger = log.new(o!("thread_name"=>"scraper")); + // let _writer_logger = log.new(o!("thread_name"=>"writer")); - // json log file - let logfile = 
std::fs::File::create("/var/tmp/actix-test.log").unwrap(); - let json_drain = slog_json::Json::new(logfile) - .add_default_keys() - // include source code location - .add_key_value(o!("place" => - slog::FnValue(move |info| { - format!("{}::({}:{})", - info.module(), - info.file(), - info.line(), - )}), - "sha"=> env!("VERGEN_GIT_SHA"))) - .build() - .fuse(); - - // duplicate log to both terminal and json file - let dup_drain = slog::Duplicate::new(json_drain, term_drain); - // make it async - let async_drain = slog_async::Async::new(dup_drain.fuse()).build(); - // and add thread local logging - let log = slog::Logger::root(ThreadLocalDrain { drain: async_drain }.fuse(), o!()); - let _scraper_logger = log.new(o!("thread_name"=>"scraper")); - let _writer_logger = log.new(o!("thread_name"=>"writer")); - - //--- end of slog setup - info!(log, "Started main app"); //create app config let mut app_config = config::Config::default(); //load the app_config.toml file - info!(log, "loading app_config.toml"); - app_config .merge(config::File::with_name("app_config")) .unwrap(); - - debug!(log, " reading BASE_URL"); - let base_url: String = app_config .get("BASE_URL") .map_err(|err| AppError { @@ -79,15 +41,19 @@ async fn main() { error_type: error::AppErrorType::ConfigError, }) .unwrap(); - debug!(log, " reading OUTPUT_DIR"); + debug!(log, " BASE_URL {}", &base_url); - let _output_dir: String = app_config.get("OUTPUT_DIR").unwrap(); + let output_dir: String = app_config.get("OUTPUT_DIR").unwrap(); + debug!(log, " OUTPUT_DIR {}", &output_dir); + + let num_threads: usize = app_config.get("NUM_THREADS").unwrap(); + debug!(log, " NUM_THREADS {}", &num_threads); //fs::create_dir(output_dir).unwrap(); info!(log, "fetching base entities"); let l = log.new(o!("thread_name"=>"url_fetcher")); - let fetch_url_addr = SyncArbiter::start(3, move || UrlFetcher { logger:l.clone() }); + let fetch_url_addr = SyncArbiter::start(num_threads, move || UrlFetcher { logger: l.clone() }); let resp = fetch_url_addr .send(FetchPageCommand { entity_type: String::from("root"), @@ -99,7 +65,7 @@ async fn main() { let root_entities: HashMap = from_str(&resp).unwrap(); for n in root_entities { - let y = fetch_url_addr + let _ = fetch_url_addr .send(FetchPageCommand { entity_type: n.0.to_string(), base_url: n.1.to_string(), @@ -112,10 +78,42 @@ async fn main() { }) .unwrap() .unwrap(); - // info!(log, "Response came in {:?}", y); + // info!(log, "Response came in {:?}", y); } } +fn setup_logging() -> Logger { + //--- set up slog + + // set up terminal logging + let decorator = slog_term::TermDecorator::new().build(); + let term_drain = slog_term::CompactFormat::new(decorator).build().fuse(); + + // json log file + let logfile = std::fs::File::create("/var/tmp/actix-test.log").unwrap(); + let json_drain = slog_json::Json::new(logfile) + .add_default_keys() + // include source code location + .add_key_value(o!("place" => + slog::FnValue(move |info| { + format!("{}::({}:{})", + info.module(), + info.file(), + info.line(), + )}), + "sha"=> env!("VERGEN_GIT_SHA"))) + .build() + .fuse(); + + // duplicate log to both terminal and json file + let dup_drain = slog::Duplicate::new(json_drain, term_drain); + // make it async + let async_drain = slog_async::Async::new(dup_drain.fuse()).build(); + // and add thread local logging + let log = slog::Logger::root(ThreadLocalDrain { drain: async_drain }.fuse(), o!()); + log +} + // fn main2() { // //init logging // env_logger::init(); diff --git a/sw-scraper/src/scraper.rs b/sw-scraper/src/scraper.rs 
index 9626564..d324b3e 100644
--- a/sw-scraper/src/scraper.rs
+++ b/sw-scraper/src/scraper.rs
@@ -1,8 +1,6 @@
 use actix::prelude::*;
 use slog::info;
-use crate::error::AppError;
-
 pub(crate) struct _AppState {
     logger: slog::Logger,
 }
 /// Define message
@@ -23,12 +21,12 @@ pub(crate) struct UrlFetcher {
 impl Actor for UrlFetcher {
     type Context = SyncContext;
 
-    fn started(&mut self, ctx: &mut SyncContext) {
+    fn started(&mut self, _ctx: &mut SyncContext) {
         info!(self.logger, "UrlFetcher started");
     }
 
-    fn stopped(&mut self, ctx: &mut SyncContext) {
-        info!(self.logger,"UrlFetcher stopped");
+    fn stopped(&mut self, _ctx: &mut SyncContext) {
+        info!(self.logger, "UrlFetcher stopped");
     }
 }
 
@@ -36,16 +34,15 @@ impl Actor for UrlFetcher {
 impl Handler for UrlFetcher {
     type Result = Result;
 
-    fn handle(&mut self, msg: FetchPageCommand, ctx: &mut SyncContext) -> Self::Result {
-        let text =reqwest::blocking::get(msg.base_url).unwrap().text();
+    fn handle(&mut self, msg: FetchPageCommand, _ctx: &mut SyncContext) -> Self::Result {
+        let text = reqwest::blocking::get(msg.base_url).unwrap().text();
         if let Ok(_) = text {
-            info!(self.logger,"got the text back");
+            info!(self.logger, "got the text back");
             text
         } else {
-            info!(self.logger,"no text???");
+            info!(self.logger, "no text???");
             todo!();
         }
-
     }
 }
 
@@ -58,7 +55,7 @@ pub struct SearchResultResponse {
 
 // Actor definition
 struct SwScraper {
-    name: String,
+    //name: String,
     recipient: Recipient,
 }
 
@@ -70,14 +67,16 @@ impl Actor for SwScraper {
 impl Handler for SwScraper {
     type Result = ();
 
-    fn handle(&mut self, msg: SearchResultResponse, ctx: &mut Context) {
+    fn handle(&mut self, msg: SearchResultResponse, _ctx: &mut Context) {
         let url = match msg.next {
             Some(it) => it,
             _ => return,
         };
-        self.recipient.do_send(FetchPageCommand {
-            base_url: url,
-            entity_type: "todo!()".to_string(),
-        });
+        self.recipient
+            .do_send(FetchPageCommand {
+                base_url: url,
+                entity_type: "todo!()".to_string(),
+            })
+            .unwrap();
     }
 }
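
For readers following the actor changes in patches 2 and 3, the SyncArbiter round-trip that UrlFetcher implements boils down to the minimal sketch below. It is a simplified stand-in rather than the series' exact code: the message and actor names are shortened to FetchPage and Fetcher, the handler's error type is reduced to String in place of the patch's AppError mapping, and expect() stands in for the map_err/unwrap chain in main.rs.

use actix::prelude::*;

// Message asking a worker to fetch one page; mirrors FetchPageCommand from the patch.
#[derive(Message, Debug)]
#[rtype(result = "Result<String, String>")]
struct FetchPage {
    url: String,
}

// Worker actor; SyncContext means it runs on a dedicated SyncArbiter thread,
// so the blocking reqwest client used by the patch is safe to call here.
struct Fetcher;

impl Actor for Fetcher {
    type Context = SyncContext<Self>;
}

impl Handler<FetchPage> for Fetcher {
    type Result = Result<String, String>;

    fn handle(&mut self, msg: FetchPage, _ctx: &mut SyncContext<Self>) -> Self::Result {
        // blocking GET, then hand the body back to the caller as the message result
        reqwest::blocking::get(msg.url)
            .and_then(|resp| resp.text())
            .map_err(|e| e.to_string())
    }
}

#[actix::main]
async fn main() {
    // three worker threads, matching the NUM_THREADS value in app_config.toml
    let addr = SyncArbiter::start(3, || Fetcher);

    // send() yields a future; the outer Result is mailbox delivery,
    // the inner Result is what the handler returned
    let body = addr
        .send(FetchPage {
            url: "https://api.starwars.run/api/".to_string(),
        })
        .await
        .expect("mailbox error")
        .expect("request failed");

    println!("fetched {} bytes", body.len());
}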
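
The README's list of functional techniques (currying, composition) maps onto the format_url, _apply, and _compose helpers that patch 1 introduces and patch 2 keeps in utils.rs. The following is a self-contained sketch of how those pieces fit together, not the series' code verbatim: EntityType is reduced to a plain &str and the underscore-prefixed names lose their prefix so the snippet compiles on its own.

// Currying: format_url takes the base URL once and returns a closure that
// only needs the entity segment (the patch passes an EntityType instead of &str).
fn format_url(base: String) -> impl Fn(&str) -> String {
    move |entity_type| format!("{}/{}/", base, entity_type)
}

// The helpers kept in utils.rs, minus their leading underscores.
fn apply<F, A, B>(fun: F, args: A) -> B
where
    F: Fn(A) -> B,
{
    fun(args)
}

fn compose<F, G, X, Y, Z>(f: F, g: G) -> impl Fn(X) -> Z
where
    F: Fn(X) -> Y,
    G: Fn(Y) -> Z,
{
    move |x| g(f(x))
}

fn main() {
    // partially apply the base URL once...
    let build = apply(format_url, "https://api.starwars.run/api".to_string());

    // ...then reuse the resulting closure for each entity type
    assert_eq!(build("films"), "https://api.starwars.run/api/films/");

    // Composition: chain the URL builder with a second step, e.g. deriving
    // an output file name the way main.rs builds "<OUTPUT_DIR>/<Entity>.json".
    let to_file_name = compose(build, |url: String| format!("{}.json", url.trim_end_matches('/')));
    println!("{}", to_file_name("planets"));
}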