4 changes: 3 additions & 1 deletion .gitignore
@@ -13,4 +13,6 @@ log

# kafka data
kafka-data
aws_credentials
aws_credentials
brickstudy_ingestion/dags/viral/tmp
brickstudy_ingestion/src/scrapper/results
4 changes: 3 additions & 1 deletion brickstudy_ingestion/dags/utils/config.py
@@ -24,7 +24,9 @@ def set_env_variables():
"TWITTER_CLIENT_ID",
"TWITTER_CLIENT_PASSWORD",
"TWITTER_TOKEN"
"TWITTER_CRAWLER_AUTH_TOKEN_PASSWORD"
# Instagram
"INSTAGRAM_CLIENT_ID",
"INSTAGRAM_CLIENT_PASSWORD"
]
for ENV_VARIABLE in ALL_ENV_VARIABLES:
os.environ[ENV_VARIABLE] = Variable.get(ENV_VARIABLE, "")
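Note that in Python, adjacent string literals with no comma between them are silently concatenated into a single string, so every entry in a list like ALL_ENV_VARIABLES needs its separating comma. A minimal illustration of the pitfall:

# Without a comma the two adjacent literals fuse into one (wrong) name:
broken = ["TWITTER_TOKEN" "TWITTER_CRAWLER_AUTH_TOKEN_PASSWORD"]
print(broken)  # ['TWITTER_TOKENTWITTER_CRAWLER_AUTH_TOKEN_PASSWORD']

# With the comma, both names stay separate:
fixed = ["TWITTER_TOKEN", "TWITTER_CRAWLER_AUTH_TOKEN_PASSWORD"]
print(fixed)   # ['TWITTER_TOKEN', 'TWITTER_CRAWLER_AUTH_TOKEN_PASSWORD']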
89 changes: 89 additions & 0 deletions brickstudy_ingestion/dags/viral/instagram_crawler.py
@@ -0,0 +1,89 @@
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonVirtualenvOperator, PythonOperator
from airflow.models import Variable

from src.scrapper.brand_name_getter import get_brand_list_fr_s3

# =========================================
# Change parameter
DAG_ID = "bronze_viral_instagram"
TARGET_PLATFORM = 'instagram'

# Set Airflow settings
default_args = {
    'owner': 'brickstudy',
    'start_date': days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    # 'on_failure_callback': on_failure_callback,
}
# =========================================


def get_brand_list():
    import os
    for ENV_VARIABLE in ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY']:
        os.environ[ENV_VARIABLE] = Variable.get(ENV_VARIABLE, "")
    return get_brand_list_fr_s3()


def instagram_crawling(brand_lst, id, pwd):
    import os
    import logging
    from src.common.kafka.utils import Kafka
    from src.scrapper.inscrawler import InsCrawler
    from src.scrapper.ins_url import InsURLCrawler
    from src.scrapper.ins_data import InsDataCrawler

    os.environ['INSTAGRAM_CLIENT_ID'] = id
    os.environ['INSTAGRAM_CLIENT_PASSWORD'] = pwd

    def crawl_instagram(keywords: tuple):
        crawler = InsURLCrawler(InsCrawler(keywords=keywords)).get_urls()
        post_crawler = InsDataCrawler(crawler.data)
        post_crawler.get_post_data()
        producer.send_data_to_kafka(
            kafka_topic='instagram',
            data=post_crawler.data
        )

    try:
        producer = Kafka()
        crawl_instagram(brand_lst)
    except Exception as e:
        logging.error("***entrypoint error*** %s", e)
        raise


with DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
):
    t1 = PythonOperator(
        task_id='get_brand_list_from_s3',
        python_callable=get_brand_list
    )

    t2 = PythonVirtualenvOperator(
        task_id='crawl_instagram_based_on_keyword',
        system_site_packages=False,
        op_kwargs={
            'brand_lst': "{{ ti.xcom_pull(task_ids='get_brand_list_from_s3') }}",
            'id': Variable.get('INSTAGRAM_CLIENT_ID'),
            'pwd': Variable.get('INSTAGRAM_CLIENT_PASSWORD')
        },
        python_version='3.10',
        requirements=['selenium==4.24.0', 'webdriver-manager==4.0.2',
                      'bs4==0.0.2', 'beautifulsoup4==4.12.3',
                      'lxml==5.3.0', 'pytz==2024.1',
                      "python-dotenv==0.19.0", "multiprocess", "kafka-python"],
        python_callable=instagram_crawling
    )

    t1 >> t2
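Note that op_kwargs values templated with "{{ ti.xcom_pull(...) }}" are rendered to plain strings unless the DAG sets render_template_as_native_obj, so brand_lst reaches instagram_crawling as the text representation of the list rather than a Python list. A minimal sketch (an assumption for illustration, not part of this DAG) of recovering the list inside the callable:

import ast

def parse_brand_list(raw):
    # Rendered XCom values look like "['brand_a', 'brand_b']"; literal_eval
    # safely turns that text back into a Python list.
    if isinstance(raw, str):
        return ast.literal_eval(raw)
    return raw

print(parse_brand_list("['brand_a', 'brand_b']"))  # ['brand_a', 'brand_b']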
9 changes: 6 additions & 3 deletions brickstudy_ingestion/dags/viral/twitter_crawler.py
@@ -1,4 +1,5 @@
from datetime import datetime
import os

from airflow import DAG
from airflow.models import Variable
@@ -20,11 +21,11 @@
}
# =========================================


OUTPUT_FILENAME = "test.csv"
SEARCH_KEYWORD = "enhypen"
LIMIT = 10
TOKEN = Variable.get("TWITTER_CRAWLER_AUTH_TOKEN_PASSWORD")
HOST_BASE_PATH = '/Users/seoyeongkim/Documents/ETL'

with DAG(
    dag_id=DAG_ID,
@@ -36,15 +37,17 @@
        task_id='t_docker',
        image='brickstudy/twitter_crawler:latest',
        container_name='twitter_crawler',
        api_version='1.37',
        auto_remove=True,
        mount_tmp_dir=False,
        mounts=[
            Mount(source="/opt/airflow/logs/tweets-data", target="/app/tweets-data", type="bind"),
            Mount(source=f"{HOST_BASE_PATH}/logs", target="/app/tweets-data", type="bind"),
        ],
        command=[
            "bash", "-c",
            f"npx --yes tweet-harvest@latest -o {OUTPUT_FILENAME} -s {SEARCH_KEYWORD} -l {LIMIT} --token {TOKEN}"
        ],
        docker_url='unix://var/run/docker.sock',
        docker_url='tcp://docker-socket-proxy:2375',
        network_mode='bridge',
    )

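Because TOKEN is interpolated directly into command, the secret can end up visible in the rendered operator arguments and task logs. One possible variant (a sketch under that assumption, not what this change does) is to hand the token to the container through DockerOperator's environment mapping and let the shell expand it:

t_docker = DockerOperator(
    task_id='t_docker',
    image='brickstudy/twitter_crawler:latest',
    environment={'TWITTER_TOKEN': TOKEN},  # injected into the container as an env var
    command=[
        "bash", "-c",
        f"npx --yes tweet-harvest@latest -o {OUTPUT_FILENAME} "
        f"-s {SEARCH_KEYWORD} -l {LIMIT} --token \"$TWITTER_TOKEN\"",
    ],
    docker_url='tcp://docker-socket-proxy:2375',
    network_mode='bridge',
)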
34 changes: 0 additions & 34 deletions brickstudy_ingestion/src/scrapper/__init__.py
@@ -1,38 +1,4 @@
import urllib
from urllib.request import urlopen
from urllib.error import HTTPError, URLError
from bs4 import BeautifulSoup
import random
import time

from src.common.exception import ExtractError


def get_soup(url: str = None):
    user_agent_lst = ['Googlebot', 'Yeti', 'Daumoa', 'Twitterbot']
    user_agent = user_agent_lst[random.randint(0, len(user_agent_lst) - 1)]
    headers = {'User-Agent': user_agent}

    try:
        req = urllib.request.Request(url, headers=headers)
        page = urlopen(req)
        html = page.read().decode("utf-8")
        soup = BeautifulSoup(html, "html.parser")
    except (HTTPError, URLError) as e:
        err = ExtractError(
            code=000,
            message=f"**{url}** HTTPError/URLError. Sleep 5 and continue.",
            log=e
        )
        time.sleep(5)  # TODO: this url needs to be retried in this case
    except (ValueError) as e:
        err = ExtractError(
            code=000,
            message=f"**{url}** ValueError. Ignore this url parameter.",
            log=e
        )
        print(err)
        soup = None  # TODO: ignore this url
    else:
        time.sleep(random.random())
    return soup
61 changes: 61 additions & 0 deletions brickstudy_ingestion/src/scrapper/brand_name_getter.py
@@ -0,0 +1,61 @@
import json

from src.common.aws.s3_uploader import S3Uploader
from src.scrapper.models import OliveyoungBrand


def get_latest_dt():
    return '2024-08-20'


def category_checker(category: list) -> bool:
    """
    Return True if at least one of the given categories belongs to the
    standard top-level set, otherwise return False.
    """
    compare = set([c.split('_')[0] for c in category])
    standard = {'메이크업', '스킨케어', '향수', '헤어케어', '바디케어', '마스크팩',
                '클렌징', '선케어', '더모코스메틱', '맨즈케어'}
    if len(compare & standard) > 0:
        return True
    return False


def filter_brand(file_content: str) -> list:
    filtered = []
    for line in file_content.split('\n'):
        if line == '':
            break
        for brandname, brandinfo in json.loads(line).items():
            brandinfo_dic = OliveyoungBrand(**brandinfo)
            if category_checker(brandinfo_dic.category):
                filtered.append(brandname)
    return filtered


def get_brand_list_fr_s3():
    s3_client = S3Uploader().s3_client
    bucket = 'brickstudy'

    def file_keys_getter():
        paginator = s3_client.get_paginator('list_objects_v2')
        prefix = f"bronze/viral/oliveyoung/{get_latest_dt()}"
        file_key_lst = []
        for page in paginator.paginate(
            Bucket=bucket,
            Prefix=prefix
        ):
            if 'Contents' in page:
                for obj in page['Contents']:
                    file_key_lst.append(obj['Key'])
        return file_key_lst

    file_key_lst = file_keys_getter()
    filtered_brand_lst = []
    for filekey in file_key_lst:
        response = s3_client.get_object(
            Bucket=bucket,
            Key=filekey
        )
        file_content = response['Body'].read().decode('utf-8')
        filtered_brand_lst += filter_brand(file_content)
    return filtered_brand_lst
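For reference, a quick usage sketch of category_checker; the import path and sample category strings below are assumptions for illustration:

from src.scrapper.brand_name_getter import category_checker

# '스킨케어_토너' has the top-level category '스킨케어', which is in the standard set.
print(category_checker(['스킨케어_토너', '기타_소품']))  # True
# No top-level category here belongs to the standard set.
print(category_checker(['기타_소품']))  # False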
103 changes: 0 additions & 103 deletions brickstudy_ingestion/src/scrapper/browser.py

This file was deleted.
