-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.py
More file actions
244 lines (202 loc) · 7.77 KB
/
client.py
File metadata and controls
244 lines (202 loc) · 7.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
"""
Guardian API Client implementation.
"""
import os
import json
import logging
from typing import Dict, List, Optional
from datetime import datetime
import requests
import boto3
# Configure logging
logger = logging.getLogger(__name__)
# Logger.setLevel() only accepts upper-case level names (or ints), so an env
# value such as "debug" or "10" would raise ValueError and break the import.
# Normalise the value, accept numeric levels, and fall back to INFO otherwise.
_level_name = os.environ.get("LOG_LEVEL", "INFO").strip().upper()
try:
    logger.setLevel(int(_level_name) if _level_name.isdigit() else _level_name)
except ValueError:
    logger.setLevel(logging.INFO)
    logger.warning("Unknown LOG_LEVEL %r; defaulting to INFO", _level_name)
class GuardianApiClient:
    """Client for interacting with the Guardian API and publishing to message brokers.

    Articles are fetched from the Guardian content search endpoint, reduced to a
    small summary per article, and published as one JSON-encoded message to
    either an AWS SNS topic or an AWS SQS queue.
    """

    # Guardian API endpoint
    API_URL = "https://content.guardianapis.com/search"
    # Seconds to wait for the Guardian API; without a timeout requests.get()
    # can block indefinitely on a stalled connection.
    DEFAULT_TIMEOUT_SECONDS = 10
    # Maximum number of body-text characters kept in each article's preview.
    CONTENT_PREVIEW_LENGTH = 1000
    # Value (3 days, in seconds) sent in the custom "TTL" message attribute.
    # NOTE(review): a custom attribute named "TTL" is plain metadata for
    # consumers; SNS/SQS do not expire messages because of it (SQS retention is
    # the queue's MessageRetentionPeriod). Confirm downstream consumers honour it.
    MESSAGE_TTL_SECONDS = 259200

    def __init__(
        self,
        api_key: Optional[str] = None,
        *,
        sns_client=None,
        sqs_client=None,
    ):
        """
        Initialize the Guardian API client.

        Args:
            api_key: The Guardian API key. If not provided (or empty), it is
                read from the GUARDIAN_API_KEY environment variable.
            sns_client: Optional pre-built boto3 SNS client (e.g. configured for
                a specific region/endpoint, or a test double). Defaults to
                ``boto3.client("sns")``.
            sqs_client: Optional pre-built boto3 SQS client. Defaults to
                ``boto3.client("sqs")``.

        Raises:
            ValueError: If no API key is supplied or found in the environment.
        """
        self.api_key = api_key or os.environ.get("GUARDIAN_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Guardian API key is required. Set GUARDIAN_API_KEY environment variable."
            )
        # Initialize AWS resources; injected clients take precedence.
        self.sns_client = sns_client if sns_client is not None else boto3.client("sns")
        self.sqs_client = sqs_client if sqs_client is not None else boto3.client("sqs")

    def search_articles(
        self,
        search_term: str,
        date_from: Optional[str] = None,
        page_size: int = 10,
        show_fields: str = "bodyText",
        timeout: Optional[float] = DEFAULT_TIMEOUT_SECONDS,
    ) -> Dict:
        """
        Search for articles in the Guardian API, newest first.

        Args:
            search_term: The term to search for
            date_from: Optional date to filter results from (YYYY-MM-DD format)
            page_size: Number of results to return (default: 10)
            show_fields: Additional fields to include in the response
            timeout: Seconds to wait for the HTTP response (passed straight to
                ``requests.get``); ``None`` waits indefinitely.

        Returns:
            Dict containing the decoded JSON API response

        Raises:
            requests.RequestException: If the API request fails or times out
        """
        params = {
            "q": search_term,
            "api-key": self.api_key,
            "page-size": page_size,
            "show-fields": show_fields,
            "order-by": "newest",
        }
        if date_from:
            params["from-date"] = date_from
        logger.info("Searching Guardian API for: %s", search_term)
        # NOTE(review): the API key travels in the query string, so it appears in
        # the URL embedded in any HTTPError message raised below — callers should
        # avoid logging those exceptions verbatim.
        response = requests.get(self.API_URL, params=params, timeout=timeout)
        response.raise_for_status()
        return response.json()

    def process_articles(
        self, api_response: Dict, preview_length: int = CONTENT_PREVIEW_LENGTH
    ) -> List[Dict]:
        """
        Extract the relevant fields from a Guardian search response.

        Args:
            api_response: The JSON response from the Guardian API
            preview_length: Maximum number of bodyText characters kept in
                ``contentPreview`` (default: 1000)

        Returns:
            One dict per result with ``webPublicationDate``, ``webTitle`` and
            ``webUrl`` (None when missing). ``contentPreview`` is added only when
            the result's ``fields`` contains ``bodyText``; it is None when that
            body text is empty. Missing or null ``response``/``results`` yield [].
        """
        try:
            # `or {}` / `or []` also cover keys that are present but null.
            results = (api_response.get("response") or {}).get("results") or []
            processed_articles = []
            for article in results:
                processed_article = {
                    "webPublicationDate": article.get("webPublicationDate"),
                    "webTitle": article.get("webTitle"),
                    "webUrl": article.get("webUrl"),
                }
                fields = article.get("fields") or {}
                if "bodyText" in fields:
                    body_text = fields["bodyText"]
                    processed_article["contentPreview"] = (
                        body_text[:preview_length] if body_text else None
                    )
                processed_articles.append(processed_article)
            return processed_articles
        except Exception as e:
            logger.exception("Error processing articles: %s", e)
            raise

    def _ttl_message_attributes(self) -> Dict:
        """Build the MessageAttributes payload shared by SNS and SQS publishes."""
        return {
            "TTL": {
                "DataType": "Number",
                "StringValue": str(self.MESSAGE_TTL_SECONDS),
            }
        }

    def publish_to_sns(self, topic_arn: str, articles: List[Dict]) -> Dict:
        """
        Publish articles to an SNS topic as a single JSON-encoded message.

        Args:
            topic_arn: The ARN of the SNS topic
            articles: List of article data to publish

        Returns:
            Dict containing the SNS publish response
        """
        message = json.dumps(articles)
        logger.info("Publishing %d articles to SNS topic: %s", len(articles), topic_arn)
        return self.sns_client.publish(
            TopicArn=topic_arn,
            Message=message,
            MessageAttributes=self._ttl_message_attributes(),
        )

    def publish_to_sqs(self, queue_url: str, articles: List[Dict]) -> Dict:
        """
        Publish articles to an SQS queue as a single JSON-encoded message.

        Args:
            queue_url: The URL of the SQS queue
            articles: List of article data to publish

        Returns:
            Dict containing the SQS send message response
        """
        message = json.dumps(articles)
        logger.info("Publishing %d articles to SQS queue: %s", len(articles), queue_url)
        return self.sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=message,
            MessageAttributes=self._ttl_message_attributes(),
        )

    def determine_broker_type(self, broker_reference: str) -> str:
        """
        Determine the type of message broker from the reference.

        SNS topics are recognised by their ARN, ``arn:<partition>:sns:...``, in
        any AWS partition (``aws``, ``aws-cn``, ``aws-us-gov``, ...). SQS queues
        are recognised by an http(s) queue URL whose host starts with ``sqs.``.

        Args:
            broker_reference: Reference to the message broker

        Returns:
            String indicating the broker type ('sns', 'sqs', or 'unknown')
        """
        arn_parts = broker_reference.split(":")
        # Requiring 4+ parts keeps the original "arn:aws:sns:" prefix rule for
        # the standard partition while also accepting the other AWS partitions.
        if (
            len(arn_parts) >= 4
            and arn_parts[0] == "arn"
            and arn_parts[1].startswith("aws")
            and arn_parts[2] == "sns"
        ):
            return "sns"
        if broker_reference.startswith(("https://sqs.", "http://sqs.")):
            return "sqs"
        return "unknown"

    def publish_articles(
        self, search_term: str, broker_reference: str, date_from: Optional[str] = None
    ) -> Dict:
        """
        Search for articles and publish them to the specified message broker.

        The broker reference is validated before the Guardian API is called, so
        an unusable reference fails without making a network request.

        Args:
            search_term: The term to search for
            broker_reference: Reference to the message broker (SNS ARN or SQS URL)
            date_from: Optional date to filter results from (YYYY-MM-DD format)

        Returns:
            Dict with ``status``, ``broker_type``, ``articles_count`` and the
            broker's ``message_id``

        Raises:
            ValueError: If an input is missing, date_from is malformed, or the
                broker type is unknown
        """
        # Validate inputs
        if not search_term:
            raise ValueError("Search term is required")
        if not broker_reference:
            raise ValueError("Broker reference is required")
        if date_from:
            try:
                datetime.strptime(date_from, "%Y-%m-%d")
            except ValueError:
                raise ValueError("Invalid date_from format. Use YYYY-MM-DD") from None

        # Resolve the broker first so a bad reference fails fast instead of
        # after a wasted Guardian API request.
        broker_type = self.determine_broker_type(broker_reference)
        publishers = {"sns": self.publish_to_sns, "sqs": self.publish_to_sqs}
        if broker_type not in publishers:
            raise ValueError(f"Unknown broker type for reference: {broker_reference}")

        api_response = self.search_articles(search_term, date_from)
        articles = self.process_articles(api_response)
        publish_response = publishers[broker_type](broker_reference, articles)
        return {
            "status": "success",
            "broker_type": broker_type,
            "articles_count": len(articles),
            "message_id": publish_response.get("MessageId"),
        }