Skip to content

Commit d1c88bb

Browse files
authored
Merge pull request #51 from scrapinghub/kumo1719-newcount
Frontier newcount counter per slot
2 parents f97e81f + 8de779b commit d1c88bb

File tree

4 files changed

+114
-18
lines changed

4 files changed

+114
-18
lines changed

README.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,15 @@ Add a request to the slot::
454454

455455
>>> slot.queue.add([{'fp': '/some/path.html'}])
456456
>>> slot.flush()
457+
>>> slot.newcount
458+
1
459+
460+
``newcount`` is defined per slot, but also available per frontier and globally::
461+
462+
>>> frontier.newcount
463+
1
464+
>>> frontiers.newcount
465+
3
457466

458467
Add a fingerprint only to the slot::
459468

scrapinghub/client.py

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import json
22
import collections
3+
from functools import partial
4+
from collections import defaultdict
35

46
from six import string_types
57
from requests.compat import urljoin
@@ -9,6 +11,7 @@
911

1012
from .hubstorage.resourcetype import DownloadableResource
1113
from .hubstorage.resourcetype import ItemsResourceType
14+
from .hubstorage.utils import urlpathjoin
1215

1316
# scrapinghub.hubstorage classes to use as-is
1417
from .hubstorage.job import JobMeta
@@ -227,7 +230,7 @@ def __init__(self, client, projectid):
227230
# proxied sub-resources
228231
self.activity = Activity(_Activity, client, projectid)
229232
self.collections = Collections(_Collections, client, projectid)
230-
self.frontiers = Frontiers(_Frontier, client, projectid)
233+
self.frontiers = Frontiers(_HSFrontier, client, projectid)
231234
self.settings = Settings(client._hsclient, projectid)
232235

233236

@@ -1051,6 +1054,34 @@ def post(self, _value, **kwargs):
10511054
self._origin.post(_value, **kwargs)
10521055

10531056

1057+
class _HSFrontier(_Frontier):
    """Hubstorage Frontier subclass that keeps a ``newcount`` tally per slot.

    ``newcount`` maps ``(frontier, slot)`` tuples to the accumulated
    ``newcount`` values found in the JSON bodies of batch-writer responses.
    """

    def __init__(self, *args, **kwargs):
        super(_HSFrontier, self).__init__(*args, **kwargs)
        # defaultdict(int): unseen slots start at 0 so the callback can use +=
        self.newcount = defaultdict(int)

    def _get_writer(self, frontier, slot):
        """Return the batch writer for ``(frontier, slot)``.

        A cached writer is reused when present; otherwise a new one is
        created with ``_writer_callback`` bound to the slot key and stored
        in ``self._writers``.
        """
        slot_key = (frontier, slot)
        existing = self._writers.get(slot_key)
        if existing:
            return existing
        created = self.client.batchuploader.create_writer(
            url=urlpathjoin(self.url, frontier, 's', slot),
            auth=self.auth,
            size=self.batch_size,
            start=self.batch_start,
            interval=self.batch_interval,
            qsize=self.batch_qsize,
            content_encoding=self.batch_content_encoding,
            # bind the slot key so each response is attributed to its slot
            callback=partial(self._writer_callback, slot_key),
        )
        self._writers[slot_key] = created
        return created

    def _writer_callback(self, key, response):
        """Add the ``newcount`` field of a writer response to the slot total."""
        self.newcount[key] += response.json()["newcount"]
1084+
10541085
class Frontiers(_Proxy):
10551086
"""Frontiers collection for a project.
10561087
@@ -1074,6 +1105,10 @@ class Frontiers(_Proxy):
10741105
- flush data of all frontiers of a project
10751106
>>> project.frontiers.flush()
10761107
1108+
- show amount of new requests added for all frontiers
1109+
>>> project.frontiers.newcount
1110+
3
1111+
10771112
- close batch writers of all frontiers of a project
10781113
>>> project.frontiers.close()
10791114
"""
@@ -1095,7 +1130,7 @@ def list(self):
10951130

10961131
# Aggregate ``newcount`` for the whole Frontiers collection.
# NOTE(review): this span is a diff fragment — the line after the "-" marker
# is the pre-change body (returned the origin's ``newcount`` attribute as-is),
# the line after the "+" marker is the post-change body (sums the values of
# that mapping).
# Presumably ``_origin`` is the _HSFrontier instance, whose ``newcount`` is a
# dict keyed by (frontier, slot) — confirm against _Proxy.
@property
10971132
def newcount(self):
1098-
return self._origin.newcount
1133+
return sum(self._origin.newcount.values())
10991134

11001135

11011136
class Frontier(object):
@@ -1120,6 +1155,10 @@ class Frontier(object):
11201155
11211156
- flush frontier data
11221157
>>> frontier.flush()
1158+
1159+
- show amount of new requests added to frontier
1160+
>>> frontier.newcount
1161+
3
11231162
"""
11241163
def __init__(self, client, frontiers, name):
11251164
self.key = name
@@ -1145,6 +1184,12 @@ def flush(self):
11451184
if fname == self.key:
11461185
writer.flush()
11471186

1187+
@property
def newcount(self):
    """Total ``newcount`` over the slots recorded for this frontier.

    Only entries of the origin's ``newcount`` mapping whose key starts with
    this frontier's ``key`` are counted.
    """
    per_slot = self._frontiers._origin.newcount
    total = 0
    for (frontier_name, _slot), added in per_slot.items():
        if frontier_name == self.key:
            total += added
    return total
1192+
11481193

11491194
class FrontierSlot(object):
11501195
"""Representation of a frontier slot object.
@@ -1164,6 +1209,10 @@ class FrontierSlot(object):
11641209
- flush data for a slot
11651210
>>> slot.flush()
11661211
1212+
- show amount of new requests added to a slot
1213+
>>> slot.newcount
1214+
2
1215+
11671216
- read requests from a slot
11681217
>>> slot.q.iter()
11691218
<generator object jldecode at 0x1049aa9e8>
@@ -1202,7 +1251,8 @@ def q(self):
12021251
# NOTE(review): this span is a diff fragment — the line after the "-" marker
# is the pre-change statement, lines after "+" markers are the post-change
# statements.
def delete(self):
12031252
"""Delete the slot."""
12041253
origin = self._frontier._frontiers._origin
1205-
return origin.delete_slot(self._frontier.key, self.key)
1254+
# post-change: the result of delete_slot() is no longer returned
origin.delete_slot(self._frontier.key, self.key)
1255+
# drop the counter kept for this (frontier, slot) key; the None default
# makes pop() a no-op when no counter was recorded for the slot
origin.newcount.pop((self._frontier.key, self.key), None)
12061256

12071257
def flush(self):
12081258
"""Flush data for the slot."""
@@ -1211,6 +1261,11 @@ def flush(self):
12111261
if writer:
12121262
writer.flush()
12131263

1264+
@property
def newcount(self):
    """``newcount`` recorded for this slot, or 0 when nothing is recorded.

    Uses ``.get`` so that looking up an unknown slot does not add an entry
    to the origin's mapping.
    """
    frontier = self._frontier
    slot_key = (frontier.key, self.key)
    return frontier._frontiers._origin.newcount.get(slot_key, 0)
1268+
12141269

12151270
class FrontierSlotFingerprints(object):
12161271

tests/client/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,4 +177,4 @@ def clean_collection(collection):
177177
# Frontier helpers section
178178

179179
# Test helper: delete the TEST_FRONTIER_SLOT slot of the given frontier.
# NOTE(review): diff fragment — the "-" line calls frontier.delete(slot_name),
# the "+" line fetches the slot object first and calls its own delete().
def clean_frontier_slot(frontier):
180-
frontier.delete(TEST_FRONTIER_SLOT)
180+
frontier.get(TEST_FRONTIER_SLOT).delete()

tests/client/test_frontier.py

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
1-
2-
from six import string_types
1+
import time
32
from types import GeneratorType
43
from collections import Iterable
54

6-
from scrapinghub.client import Frontiers, Frontier, FrontierSlot
5+
from six import string_types
76

7+
from scrapinghub.client import Frontiers, Frontier, FrontierSlot
88
from .conftest import TEST_FRONTIER_NAME, TEST_FRONTIER_SLOT
99

1010

1111
# Test helper: add two fingerprint-only requests to the TEST_FRONTIER_SLOT
# slot of the given frontier, then flush the slot.
# NOTE(review): diff fragment — the "-" line adds through slot.add(), the
# "+" line adds through the slot queue (slot.q.add()).
def _add_test_requests_to_frontier(frontier):
1212
slot = frontier.get(TEST_FRONTIER_SLOT)
13-
slot.add([{'fp': '/some/path.html'}, {'fp': '/other/path.html'}])
13+
slot.q.add([{'fp': '/some/path.html'}, {'fp': '/other/path.html'}])
1414
slot.flush()
1515

1616

1717
def test_frontiers(project, frontier):
1818
# reset a test slot and add some requests to init it
19-
frontier.delete(TEST_FRONTIER_SLOT)
19+
frontier.get(TEST_FRONTIER_SLOT).delete()
2020
_add_test_requests_to_frontier(frontier)
2121

2222
assert isinstance(project.frontiers, Frontiers)
@@ -42,7 +42,7 @@ def test_frontiers(project, frontier):
4242

4343
def test_frontier(project, frontier):
4444
# add some requests to test frontier to init a test slot
45-
frontier.delete(TEST_FRONTIER_SLOT)
45+
frontier.get(TEST_FRONTIER_SLOT).delete()
4646
_add_test_requests_to_frontier(frontier)
4747

4848
slots = frontier.iter()
@@ -60,13 +60,13 @@ def test_frontier(project, frontier):
6060

6161
def test_frontier_slot(project, frontier):
6262
# add some requests to test frontier to init a test slot
63-
frontier.delete(TEST_FRONTIER_SLOT)
63+
frontier.get(TEST_FRONTIER_SLOT).delete()
6464
_add_test_requests_to_frontier(frontier)
6565

6666
slot = frontier.get(TEST_FRONTIER_SLOT)
6767

6868
# get all batches from slot and validate its content
69-
batches_iter = slot.iter()
69+
batches_iter = slot.q.iter()
7070
assert isinstance(batches_iter, GeneratorType)
7171
batches = list(batches_iter)
7272
assert len(batches) == 1
@@ -78,21 +78,53 @@ def test_frontier_slot(project, frontier):
7878
assert requests == [['/some/path.html', None],
7979
['/other/path.html', None]]
8080

81-
# validate that slot.list() returns same data as slot.iter()
82-
batches_list = slot.list()
81+
# validate that slot.q.list() returns same data as slot.q.iter()
82+
batches_list = slot.q.list()
8383
assert isinstance(batches, list)
8484
assert batches_list == batches
8585

8686
# add a request with additional parameters
87-
slot.add([{'fp': 'page1.html', 'p': 1, 'qdata': {'depth': 1}}])
87+
slot.q.add([{'fp': 'page1.html', 'p': 1, 'qdata': {'depth': 1}}])
8888
slot.flush()
89-
batches = slot.list()
89+
batches = slot.q.list()
9090
assert len(batches) == 2
9191
assert batches[1]['requests'] == [['page1.html', {'depth': 1}]]
9292

9393
# drop all batches and validate that slot is empty
94-
slot.delete([batch['id'] for batch in batches])
95-
assert slot.list() == []
94+
slot.q.delete([batch['id'] for batch in batches])
95+
assert slot.q.list() == []
9696

9797
slot.delete()
9898
assert TEST_FRONTIER_SLOT not in frontier.list()
99+
100+
101+
def test_frontier_newcount(project):
    """Check ``newcount`` at slot, frontier and project level after uploads."""
    # start from a deleted test slot so every counter reads zero
    frontier = project.frontiers.get(TEST_FRONTIER_NAME)
    frontiers = frontier._frontiers
    slot_one = frontier.get(TEST_FRONTIER_SLOT)
    slot_one.delete()

    assert frontiers.newcount == 0
    assert frontier.newcount == 0
    assert slot_one.newcount == 0

    # shorter batch interval for faster tests
    frontiers._origin.batch_interval = 0.1
    _add_test_requests_to_frontier(frontier)
    time.sleep(0.5)

    # the helper added two requests to the first slot
    assert frontiers.newcount == 2
    assert frontier.newcount == 2
    assert slot_one.newcount == 2

    # a second slot contributes to the frontier/project totals only
    slot_two = frontier.get('test2.com')
    slot_two.delete()
    slot_two.q.add([{'fp': '/different_path.html'}])
    slot_two.flush()

    assert frontiers.newcount == 3
    assert frontier.newcount == 3
    assert slot_two.newcount == 1
    assert slot_one.newcount == 2

    frontiers.close()

0 commit comments

Comments
 (0)