|
1 | 1 | import json |
2 | 2 | import collections |
| 3 | +from functools import partial |
| 4 | +from collections import defaultdict |
3 | 5 |
|
4 | 6 | from six import string_types |
5 | 7 | from requests.compat import urljoin |
|
9 | 11 |
|
10 | 12 | from .hubstorage.resourcetype import DownloadableResource |
11 | 13 | from .hubstorage.resourcetype import ItemsResourceType |
| 14 | +from .hubstorage.utils import urlpathjoin |
12 | 15 |
|
13 | 16 | # scrapinghub.hubstorage classes to use as-is |
14 | 17 | from .hubstorage.job import JobMeta |
@@ -227,7 +230,7 @@ def __init__(self, client, projectid): |
227 | 230 | # proxied sub-resources |
228 | 231 | self.activity = Activity(_Activity, client, projectid) |
229 | 232 | self.collections = Collections(_Collections, client, projectid) |
230 | | - self.frontiers = Frontiers(_Frontier, client, projectid) |
| 233 | + self.frontiers = Frontiers(_HSFrontier, client, projectid) |
231 | 234 | self.settings = Settings(client._hsclient, projectid) |
232 | 235 |
|
233 | 236 |
|
@@ -1051,6 +1054,34 @@ def post(self, _value, **kwargs): |
1051 | 1054 | self._origin.post(_value, **kwargs) |
1052 | 1055 |
|
1053 | 1056 |
|
| 1057 | +class _HSFrontier(_Frontier): |
| 1058 | + """Modified hubstorage Frontier with newcount per slot.""" |
| 1059 | + |
| 1060 | + def __init__(self, *args, **kwargs): |
| 1061 | + super(_HSFrontier, self).__init__(*args, **kwargs) |
| 1062 | + self.newcount = defaultdict(int) |
| 1063 | + |
| 1064 | + def _get_writer(self, frontier, slot): |
| 1065 | + key = (frontier, slot) |
| 1066 | + writer = self._writers.get(key) |
| 1067 | + if not writer: |
| 1068 | + writer = self.client.batchuploader.create_writer( |
| 1069 | + url=urlpathjoin(self.url, frontier, 's', slot), |
| 1070 | + auth=self.auth, |
| 1071 | + size=self.batch_size, |
| 1072 | + start=self.batch_start, |
| 1073 | + interval=self.batch_interval, |
| 1074 | + qsize=self.batch_qsize, |
| 1075 | + content_encoding=self.batch_content_encoding, |
| 1076 | + callback=partial(self._writer_callback, key), |
| 1077 | + ) |
| 1078 | + self._writers[key] = writer |
| 1079 | + return writer |
| 1080 | + |
| 1081 | + def _writer_callback(self, key, response): |
| 1082 | + self.newcount[key] += response.json()["newcount"] |
| 1083 | + |
| 1084 | + |
1054 | 1085 | class Frontiers(_Proxy): |
1055 | 1086 | """Frontiers collection for a project. |
1056 | 1087 |
|
@@ -1095,7 +1126,7 @@ def list(self): |
1095 | 1126 |
|
1096 | 1127 | @property |
1097 | 1128 | def newcount(self): |
1098 | | - return self._origin.newcount |
| 1129 | + return sum(self._origin.newcount.values()) |
1099 | 1130 |
|
1100 | 1131 |
|
1101 | 1132 | class Frontier(object): |
@@ -1145,6 +1176,12 @@ def flush(self): |
1145 | 1176 | if fname == self.key: |
1146 | 1177 | writer.flush() |
1147 | 1178 |
|
| 1179 | + @property |
| 1180 | + def newcount(self): |
| 1181 | + newcount_values = self._frontiers._origin.newcount |
| 1182 | + return sum(v for (frontier, _), v in newcount_values.items() |
| 1183 | + if frontier == self.key) |
| 1184 | + |
1148 | 1185 |
|
1149 | 1186 | class FrontierSlot(object): |
1150 | 1187 | """Representation of a frontier slot object. |
@@ -1211,6 +1248,11 @@ def flush(self): |
1211 | 1248 | if writer: |
1212 | 1249 | writer.flush() |
1213 | 1250 |
|
| 1251 | + @property |
| 1252 | + def newcount(self): |
| 1253 | + newcount_values = self._frontier._frontiers._origin.newcount |
| 1254 | + return newcount_values.get((self._frontier.key, self.key), 0) |
| 1255 | + |
1214 | 1256 |
|
1215 | 1257 | class FrontierSlotFingerprints(object): |
1216 | 1258 |
|
|
0 commit comments