"""
Web Scraper Module for Restaurant Data
Scrapes restaurant information, menus, and ratings from various sources
"""
import requests
from bs4 import BeautifulSoup
import time
import json
import re
from typing import List, Dict, Optional
from dataclasses import dataclass, asdict
from abc import ABC, abstractmethod
import logging
from config import (
USER_AGENT,
REQUEST_DELAY,
MAX_RESTAURANTS_TO_SCRAPE,
SERPAPI_KEY,
YELP_API_KEY
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class Restaurant:
    """Data class representing a restaurant"""
    name: str
    address: str
    city: str
    rating: float
    review_count: int
    cuisine_type: str
    menu_items: List[str]
    price_range: str
    phone: str = ""
    website: str = ""
    source: str = ""

    def to_dict(self) -> Dict:
        return asdict(self)

    def to_text(self) -> str:
        """Convert restaurant data to text for embedding"""
        menu_text = ", ".join(self.menu_items) if self.menu_items else "Menu not available"
        return f"""
Restaurant: {self.name}
Location: {self.address}, {self.city}
Rating: {self.rating}/5 ({self.review_count} reviews)
Cuisine: {self.cuisine_type}
Price Range: {self.price_range}
Menu Items: {menu_text}
Phone: {self.phone}
Website: {self.website}
"""


class BaseScraper(ABC):
    """Abstract base class for restaurant scrapers"""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": USER_AGENT,
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
        })

    @abstractmethod
    def search_restaurants(self, city: str, dish: Optional[str] = None) -> List[Restaurant]:
        """Search for restaurants in a city"""
        pass

    def _delay(self):
        """Add delay between requests"""
        time.sleep(REQUEST_DELAY)

    def _safe_request(self, url: str, params: Optional[Dict] = None) -> Optional[requests.Response]:
        """Make a safe HTTP request with error handling"""
        try:
            self._delay()
            response = self.session.get(url, params=params, timeout=10)
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            logger.error(f"Request failed for {url}: {e}")
            return None
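
# Sketch of how another source could plug into this base class (the class
# name, endpoint URL, and _parse helper below are all hypothetical, not part
# of this module):
#
#     class TripAdvisorScraper(BaseScraper):
#         def search_restaurants(self, city, dish=None):
#             response = self._safe_request(
#                 "https://example.com/api/search",  # placeholder endpoint
#                 params={"q": dish or "restaurants", "city": city},
#             )
#             return [] if response is None else self._parse(response, city)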


class YelpScraper(BaseScraper):
    """Scraper for Yelp restaurant data using Yelp Fusion API"""

    BASE_URL = "https://api.yelp.com/v3"

    def __init__(self):
        super().__init__()
        self.session.headers.update({
            "Authorization": f"Bearer {YELP_API_KEY}"
        })

    def search_restaurants(self, city: str, dish: Optional[str] = None) -> List[Restaurant]:
        """Search restaurants on Yelp"""
        restaurants = []
        search_term = f"restaurants {dish}" if dish else "restaurants"
        url = f"{self.BASE_URL}/businesses/search"
        params = {
            "location": city,
            "term": search_term,
            "categories": "restaurants",
            "limit": MAX_RESTAURANTS_TO_SCRAPE,
            "sort_by": "rating"
        }
        response = self._safe_request(url, params)
        if not response:
            return restaurants
        try:
            data = response.json()
            businesses = data.get("businesses", [])
            for biz in businesses:
                # Get detailed business info; note that the Fusion API does not
                # expose menu data, so menu_items is typically empty here
                details = self._get_business_details(biz.get("id"))
                menu_items = details.get("menu_items", []) if details else []
                restaurant = Restaurant(
                    name=biz.get("name", "Unknown"),
                    address=" ".join(biz.get("location", {}).get("display_address", [])),
                    city=city,
                    rating=biz.get("rating", 0.0),
                    review_count=biz.get("review_count", 0),
                    cuisine_type=", ".join([c.get("title", "") for c in biz.get("categories", [])]),
                    menu_items=menu_items,
                    price_range=biz.get("price", "$$"),
                    phone=biz.get("phone", ""),
                    website=biz.get("url", ""),
                    source="Yelp"
                )
                restaurants.append(restaurant)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse Yelp response: {e}")
        return restaurants
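
    # Abbreviated shape of a Fusion /businesses/search response, for
    # reference (field values are illustrative):
    #
    #     {"businesses": [{"id": "...", "name": "Joe's Pizza",
    #       "rating": 4.5, "review_count": 1200, "price": "$",
    #       "phone": "+12125551234", "url": "https://www.yelp.com/biz/...",
    #       "categories": [{"alias": "pizza", "title": "Pizza"}],
    #       "location": {"display_address": ["7 Carmine St", "New York, NY"]}}]}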

    def _get_business_details(self, business_id: str) -> Optional[Dict]:
        """Get detailed business information"""
        if not business_id:
            return None
        url = f"{self.BASE_URL}/businesses/{business_id}"
        response = self._safe_request(url)
        if response:
            try:
                return response.json()
            except json.JSONDecodeError:
                return None
        return None


class GooglePlacesScraper(BaseScraper):
    """Scraper using SerpAPI for Google search results"""

    BASE_URL = "https://serpapi.com/search"

    def search_restaurants(self, city: str, dish: Optional[str] = None) -> List[Restaurant]:
        """Search restaurants using Google via SerpAPI"""
        restaurants = []
        query = f"best restaurants serving {dish} in {city}" if dish else f"best restaurants in {city}"
        params = {
            "engine": "google_local",
            "q": query,
            "location": city,
            "api_key": SERPAPI_KEY
        }
        response = self._safe_request(self.BASE_URL, params)
        if not response:
            return restaurants
        try:
            data = response.json()
            local_results = data.get("local_results", [])
            for result in local_results[:MAX_RESTAURANTS_TO_SCRAPE]:
                restaurant = Restaurant(
                    name=result.get("title", "Unknown"),
                    address=result.get("address", ""),
                    city=city,
                    rating=float(result.get("rating", 0)),
                    review_count=int(result.get("reviews", 0)),
                    cuisine_type=result.get("type", "Restaurant"),
                    menu_items=self._extract_menu_items(result),
                    price_range=result.get("price", "$$"),
                    phone=result.get("phone", ""),
                    website=result.get("website", ""),
                    source="Google"
                )
                restaurants.append(restaurant)
        except (json.JSONDecodeError, ValueError) as e:
            logger.error(f"Failed to parse Google response: {e}")
        return restaurants
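
    # Abbreviated shape of a SerpAPI "google_local" entry, for reference
    # (field values are illustrative and the exact set of keys varies
    # per result):
    #
    #     {"local_results": [{"title": "Joe's Pizza", "rating": 4.5,
    #       "reviews": 1200, "type": "Pizza restaurant",
    #       "address": "7 Carmine St, New York, NY",
    #       "extensions": ["Dine-in", "Takeout"]}]}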

    def _extract_menu_items(self, result: Dict) -> List[str]:
        """Extract menu items from search result snippets"""
        menu_items = []
        # Try to extract from extensions or snippets
        extensions = result.get("extensions", [])
        if isinstance(extensions, list):
            menu_items.extend([ext for ext in extensions if isinstance(ext, str)])
        return menu_items


class GenericWebScraper(BaseScraper):
    """Generic web scraper for restaurant websites"""

    def search_restaurants(self, city: str, dish: Optional[str] = None) -> List[Restaurant]:
        """
        Generic search - scrapes from multiple sources
        This is a fallback when API-based scrapers are not available
        """
        restaurants = []
        # Build the search query and let requests handle URL encoding
        query = f"restaurants {dish} {city}" if dish else f"restaurants {city}"
        response = self._safe_request("https://www.google.com/search", params={"q": query})
        if not response:
            return restaurants
        soup = BeautifulSoup(response.text, 'html.parser')
        # Parse search results (simplified)
        # Note: Google's structure changes frequently
        for result in soup.select('.g')[:MAX_RESTAURANTS_TO_SCRAPE]:
            try:
                title_elem = result.select_one('h3')
                link_elem = result.select_one('a')
                snippet_elem = result.select_one('.VwiC3b')
                if title_elem and link_elem:
                    name = title_elem.get_text()
                    website = link_elem.get('href', '')
                    snippet = snippet_elem.get_text() if snippet_elem else ""
                    # Extract rating if present in snippet
                    rating_match = re.search(r'(\d+\.?\d*)\s*(?:stars?|/5)', snippet, re.I)
                    rating = float(rating_match.group(1)) if rating_match else 0.0
                    restaurant = Restaurant(
                        name=name,
                        address="",
                        city=city,
                        rating=rating,
                        review_count=0,
                        cuisine_type="Restaurant",
                        menu_items=self._extract_menu_from_snippet(snippet),
                        price_range="$$",
                        website=website,
                        source="Web Search"
                    )
                    restaurants.append(restaurant)
            except Exception as e:
                logger.debug(f"Failed to parse result: {e}")
                continue
        return restaurants

    def _extract_menu_from_snippet(self, snippet: str) -> List[str]:
        """Extract potential menu items from text snippet"""
        # Simple extraction - look for food-related words
        food_patterns = [
            r'\b(pizza|burger|pasta|salad|steak|sushi|tacos?|curry|soup|sandwich|wings?)\b',
            r'\b(chicken|beef|pork|fish|seafood|vegetarian|vegan)\b'
        ]
        items = []
        for pattern in food_patterns:
            matches = re.findall(pattern, snippet, re.I)
            # Lowercase so "Pizza" and "pizza" dedupe to a single entry
            items.extend(match.lower() for match in matches)
        return list(set(items))
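
    # Example of what _extract_menu_from_snippet recovers (the input string
    # is made up):
    #
    #     _extract_menu_from_snippet(
    #         "Famous for wood-fired pizza, vegan curry and chicken wings")
    #     # -> ['pizza', 'vegan', 'curry', 'chicken', 'wings'] (set order varies)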

    def scrape_menu_from_url(self, url: str) -> List[str]:
        """Attempt to scrape menu items from a restaurant's website"""
        menu_items = []
        response = self._safe_request(url)
        if not response:
            return menu_items
        soup = BeautifulSoup(response.text, 'html.parser')
        # Look for menu-related sections
        menu_selectors = [
            '.menu', '#menu', '[class*="menu"]',
            '.food-item', '.dish', '.menu-item',
            '[class*="dish"]', '[class*="food"]'
        ]
        for selector in menu_selectors:
            items = soup.select(selector)
            for item in items:
                text = item.get_text(strip=True)
                if text and 2 < len(text) < 100:
                    menu_items.append(text)
        # Fallback: look for common food words in the page
        if not menu_items:
            page_text = soup.get_text()
            menu_items = self._extract_menu_from_snippet(page_text)
        return list(set(menu_items))[:50]  # Limit to 50 items


class RestaurantDataCollector:
    """Main class to collect restaurant data from multiple sources"""

    def __init__(self):
        self.scrapers: List[BaseScraper] = []
        # Initialize available scrapers based on API keys
        if YELP_API_KEY and YELP_API_KEY != "your-yelp-api-key":
            self.scrapers.append(YelpScraper())
            logger.info("Yelp scraper initialized")
        if SERPAPI_KEY and SERPAPI_KEY != "your-serpapi-key":
            self.scrapers.append(GooglePlacesScraper())
            logger.info("Google Places scraper initialized")
        # Always include generic scraper as fallback
        self.scrapers.append(GenericWebScraper())
        logger.info("Generic web scraper initialized")

    def collect_restaurants(self, city: str, dish: Optional[str] = None) -> List[Restaurant]:
        """Collect restaurants from all available sources"""
        all_restaurants = []
        seen_names = set()
        for scraper in self.scrapers:
            try:
                restaurants = scraper.search_restaurants(city, dish)
                # Deduplicate by restaurant name
                for restaurant in restaurants:
                    name_key = restaurant.name.lower().strip()
                    if name_key not in seen_names:
                        seen_names.add(name_key)
                        all_restaurants.append(restaurant)
            except Exception as e:
                logger.error(f"Scraper {scraper.__class__.__name__} failed: {e}")
                continue
        # Sort by rating, then review count (both descending)
        all_restaurants.sort(key=lambda x: (x.rating, x.review_count), reverse=True)
        return all_restaurants

    def collect_and_save(self, city: str, dish: Optional[str] = None, output_file: Optional[str] = None) -> str:
        """Collect restaurants and save to JSON file"""
        restaurants = self.collect_restaurants(city, dish)
        if not output_file:
            safe_city = city.lower().replace(" ", "_")
            safe_dish = dish.lower().replace(" ", "_") if dish else "all"
            output_file = f"restaurants_{safe_city}_{safe_dish}.json"
        data = {
            "city": city,
            "dish_query": dish,
            "restaurant_count": len(restaurants),
            "restaurants": [r.to_dict() for r in restaurants]
        }
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        logger.info(f"Saved {len(restaurants)} restaurants to {output_file}")
        return output_file
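
# Illustrative call (the output filename is derived by the method itself):
#
#     collector = RestaurantDataCollector()
#     path = collector.collect_and_save("New York", "pizza")
#     # -> writes restaurants_new_york_pizza.json and returns that path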


# Demo/Test function
def demo_scraping():
    """Demo function to test scraping"""
    collector = RestaurantDataCollector()
    # Example: Find pizza restaurants in New York
    restaurants = collector.collect_restaurants("New York", "pizza")
    print(f"\nFound {len(restaurants)} restaurants:\n")
    for i, r in enumerate(restaurants[:5], 1):
        print(f"{i}. {r.name}")
        print(f"   Rating: {r.rating}/5 ({r.review_count} reviews)")
        print(f"   Address: {r.address}")
        print(f"   Cuisine: {r.cuisine_type}")
        print()


if __name__ == "__main__":
    demo_scraping()