-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
136 lines (106 loc) · 3.97 KB
/
utils.py
File metadata and controls
136 lines (106 loc) · 3.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import pathlib
import resources
import bs4_impl as dom_parser
def _get_user(doc):
    """Return the ``user`` object nested inside a profile-page document.

    The value is read from
    ``doc['entry_data']['ProfilePage'][0]['graphql']['user']``.

    Args:
        doc: Nested mapping/list structure to look the user up in.

    Returns:
        Whatever object is stored under the ``user`` key (returned as-is,
        not copied).

    Raises:
        KeyError, IndexError, TypeError: Propagated unchanged from the
            subscript chain when a level of the path is missing or is not
            subscriptable.
    """
    return doc['entry_data']['ProfilePage'][0]['graphql']['user']
def parse_main_page(html_source):
    """Extract a summary of a profile from the profile page's HTML source.

    The page is first handed to ``dom_parser.get_profile_json`` and the user
    object is then located with ``_get_user``.

    Args:
        html_source: Page source, passed straight to
            ``dom_parser.get_profile_json``.

    Returns:
        dict: Keys ``is_private``, ``media``, ``profile_pic``, ``id`` and
        ``fb_page``, copied from the user object's ``is_private``,
        ``edge_owner_to_timeline_media``, ``profile_pic_url_hd``, ``id`` and
        ``connected_fb_page`` entries. An empty dict is returned when the
        user object cannot be located in the parsed document.

    Raises:
        KeyError: If the user object is found but lacks one of the fields
            listed above.
    """
    profile_info = dom_parser.get_profile_json(html_source)
    try:
        user = _get_user(profile_info)
    except (LookupError, TypeError) as ex:
        # Best-effort: a document without the expected path (missing key,
        # empty list, or a non-subscriptable level such as None) is reported
        # and mapped to "no data" rather than aborting the caller.
        print(ex)
        return {}
    return {
        'is_private': user['is_private'],
        'media': user['edge_owner_to_timeline_media'],
        'profile_pic': user['profile_pic_url_hd'],
        'id': user['id'],
        'fb_page': user['connected_fb_page'],
    }
def parse_pic_page(html_source):
    """Extract the picture details from a single post page's HTML source.

    The document returned by ``dom_parser.get_profile_json`` is expected to
    hold the post under
    ``['entry_data']['PostPage'][0]['graphql']['shortcode_media']``.

    Args:
        html_source: Page source, passed straight to
            ``dom_parser.get_profile_json``.

    Returns:
        dict: ``media`` (the post's ``display_url``), ``id`` and ``user``
        (the owner's ``username``). When the post has a truthy
        ``edge_sidecar_to_children`` entry, a ``children`` list is added with
        one ``{'media': ..., 'id': ...}`` dict per edge.

    Raises:
        KeyError, IndexError, TypeError: If the parsed document does not have
            the structure described above.
    """
    pics_info = dom_parser.get_profile_json(html_source)
    # Resolve the deep path once instead of repeating it for every field.
    media = pics_info['entry_data']['PostPage'][0]['graphql']['shortcode_media']
    data = {
        'media': media['display_url'],
        'id': media['id'],
        'user': media['owner']['username'],
    }
    sidecar = media.get('edge_sidecar_to_children')
    if sidecar:
        data['children'] = [
            {
                'media': edge['node']['display_url'],
                'id': edge['node']['id'],
            }
            for edge in sidecar['edges']
        ]
    return data
def parse_pic_single(html_source):
    """Return the picture URL that ``dom_parser.get_pic_url`` finds in a page.

    Args:
        html_source: Page source, passed straight to
            ``dom_parser.get_pic_url``.

    Returns:
        The value returned by ``dom_parser.get_pic_url``, unchanged.
    """
    return dom_parser.get_pic_url(html_source)
async def get_single_page(session, url):
    """Fetch ``url`` through ``session`` and return the response body text.

    Args:
        session: Object whose awaitable ``get(url=...)`` yields a response
            exposing an awaitable ``text()`` (presumably an aiohttp client
            session; confirm against the caller).
        url: Address to request.

    Returns:
        The result of awaiting ``response.text()``. The response status is
        not inspected here.
    """
    response = await session.get(url=url)
    return await response.text()
def parse_single_page(html_source):
    """Return the timeline-media object of the profile found in a page.

    Args:
        html_source: Page source, passed straight to
            ``dom_parser.get_profile_json``.

    Returns:
        The user object's ``edge_owner_to_timeline_media`` entry.

    Raises:
        KeyError, IndexError, TypeError: If the user object or that entry
            cannot be found; unlike ``parse_main_page`` nothing is caught
            here.
    """
    profile_info = dom_parser.get_profile_json(html_source)
    return _get_user(profile_info)['edge_owner_to_timeline_media']
def _get_single_entries_list(nodes):
    """Build download entries for every non-video item in ``nodes``.

    Args:
        nodes: Iterable of mappings, each holding the actual item under its
            ``'node'`` key.

    Returns:
        list[dict]: One dict per item whose ``is_video`` is falsy, in input
        order, with keys ``thumbnail`` (from ``thumbnail_src``), ``pic``
        (from ``display_url``), ``id`` and ``caption`` (empty string when the
        item has no ``caption`` key).

    Raises:
        KeyError: If an item lacks ``node``, ``is_video`` or, for non-video
            items, one of the mandatory fields.
    """
    items = (wrapper['node'] for wrapper in nodes)
    return [
        {
            'thumbnail': item['thumbnail_src'],
            'pic': item['display_url'],
            'id': item['id'],
            'caption': item.get('caption', ''),
        }
        for item in items
        if not item['is_video']
    ]
async def get_images(session, nodes, username, idx):
    """Download every non-video picture in ``nodes``, one after another.

    Args:
        session: Forwarded to ``dl_image`` for the HTTP requests.
        nodes: Forwarded to ``_get_single_entries_list`` to select the
            pictures to fetch.
        username: Forwarded to ``dl_image``.
        idx: Forwarded to ``dl_image``.

    Returns:
        int | None: The number of pictures downloaded, or ``None`` as soon as
        one download reports ``success`` as falsy. Files written before that
        failure are not removed.
    """
    entries = _get_single_entries_list(nodes)
    downloaded = 0
    for entry in entries:
        outcome = await dl_image(
            session, entry['pic'], entry['id'], username, idx)
        if not outcome['success']:
            # Abort the whole batch on the first failed download.
            return None
        downloaded += 1
    return downloaded
async def dl_image(session, pic_url, pic_id, username, idx):
    """Download one picture and store it in the download directory.

    The target directory is ``resources.default_download_dir`` %-formatted
    with ``(username, idx)``; it is created, parents included, if missing.
    The file name is ``resources.filename_template`` ``str.format``-ed with
    ``username`` and ``id=pic_id``.

    Args:
        session: Object whose awaitable ``get(url=...)`` yields a response
            with a ``status`` attribute and an awaitable ``read()``.
        pic_url: Address of the picture.
        pic_id: Identifier substituted into the file name.
        username: Substituted into both the directory and the file name.
        idx: Second value substituted into the directory template.

    Returns:
        dict: ``{'success': False}`` when the response status is not 200,
        otherwise ``{'success': True, 'file': <path written>}``.
    """
    response = await session.get(url=pic_url)
    if response.status != 200:
        return {'success': False}
    content = await response.read()

    dirpath = resources.default_download_dir % (username, idx)
    pathlib.Path(dirpath).mkdir(parents=True, exist_ok=True)
    filename = resources.filename_template.format(
        username=username, id=pic_id)
    # Plain concatenation, exactly as before: no separator is inserted, so
    # the directory template has to supply its own trailing separator.
    filepath = dirpath + filename
    with open(filepath, 'wb') as image_file:
        image_file.write(content)
    return {'success': True, 'file': filepath}
# Write a capture date and a caption into the EXIF data of ``jpg_file``.
#
# Parameters:
#   jpg_file - passed to piexif.load / piexif.insert / PIL.Image.open
#   date     - object with strftime(); formatted as "%Y:%m:%d %H:%M:%S"
#   caption  - text stored UTF-8 encoded as the EXIF UserComment
# Returns True when the dump/insert/save sequence finishes, False when it
# raises ValueError.
#
# NOTE(review): the indentation of this block was lost in this copy of the
# file, so it is not possible to tell from here whether the try/except/else
# below is nested inside the `if` or runs unconditionally at function level.
# Confirm against the repository before restructuring.
def _insert_exif_comment(jpg_file, date, caption):
# piexif and PIL are imported lazily, only when this helper is called.
import piexif
exif_dict = piexif.load(jpg_file)
# Drop the embedded thumbnail entry before the dict is dumped again.
exif_dict.pop("thumbnail")
# Only populate the Exif IFD when it is currently empty and a caption exists.
if not exif_dict['Exif'] and caption:
exif_dict['Exif'] = {
piexif.ExifIFD.DateTimeOriginal: date.strftime("%Y:%m:%d %H:%M:%S"),
piexif.ExifIFD.UserComment: caption.encode('utf-8'),
# NOTE(review): the three values below look like placeholders rather than
# real lens data; confirm whether they are required.
piexif.ExifIFD.LensMake: u"LensMake",
piexif.ExifIFD.Sharpness: 65535,
piexif.ExifIFD.LensSpecification: ((1, 1), (1, 1), (1, 1), (1, 1)),
}
try:
exif_bytes = piexif.dump(exif_dict)
# The EXIF bytes are written twice: once in place by piexif.insert, then
# again when PIL re-saves the image with the same bytes.
piexif.insert(exif_bytes, jpg_file)
from PIL import Image
# NOTE(review): the opened image is never closed explicitly.
i = Image.open(jpg_file)
i.save(jpg_file, exif=exif_bytes)
except ValueError:
return False
else:
return True