Skip to content

Commit 2e3f32c

Browse files
authored
http back pressure - progress reporting (#30165)
1 parent d4f0c7d commit 2e3f32c

File tree

3 files changed

+166
-22
lines changed

3 files changed

+166
-22
lines changed

ydb/library/actors/http/http_proxy.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ struct TEvHttpProxy {
6161
EvHttpOutgoingDataChunk,
6262
EvSubscribeForCancel,
6363
EvRequestCancelled,
64+
EvHttpOutgoingResponseProgress,
6465
EvEnd
6566
};
6667

@@ -181,9 +182,14 @@ struct TEvHttpProxy {
181182
, Response(std::move(response))
182183
{}
183184
};
184-
185+
185186
struct TEvHttpOutgoingResponse : NActors::TEventLocal<TEvHttpOutgoingResponse, EvHttpOutgoingResponse> {
186187
THttpOutgoingResponsePtr Response;
188+
ui64 ProgressNotificationBytes = 0;
189+
// If set to a non-zero value, enables progress notifications.
190+
// Progress notifications will be sent approximately every N bytes (where N is this value).
191+
// Notifications are sent to the sender of the TEvHttpOutgoingResponse event with the original cookie.
192+
// The field value of 0 (default) disables progress notifications.
187193

188194
TEvHttpOutgoingResponse(THttpOutgoingResponsePtr response)
189195
: Response(std::move(response))
@@ -203,6 +209,16 @@ struct TEvHttpProxy {
203209
{}
204210
};
205211

212+
struct TEvHttpOutgoingResponseProgress : NActors::TEventLocal<TEvHttpOutgoingResponseProgress, EvHttpOutgoingResponseProgress> {
213+
ui64 Bytes = 0;
214+
ui64 DataChunks = 0;
215+
216+
TEvHttpOutgoingResponseProgress(ui64 bytes, ui64 dataChunks)
217+
: Bytes(bytes)
218+
, DataChunks(dataChunks)
219+
{}
220+
};
221+
206222
struct TEvHttpIncomingDataChunk : NActors::TEventLocal<TEvHttpIncomingDataChunk, EvHttpIncomingDataChunk> {
207223
THttpIncomingResponsePtr Response;
208224
TString Data;

ydb/library/actors/http/http_proxy_incoming.cpp

Lines changed: 70 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,23 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
1919
std::shared_ptr<TPrivateEndpointInfo> Endpoint;
2020
SocketAddressType Address;
2121
TList<THttpIncomingRequestPtr> Requests;
22-
THashMap<THttpIncomingRequestPtr, THttpOutgoingResponsePtr> Responses;
22+
23+
struct TResponseState {
24+
THttpOutgoingResponsePtr Response;
25+
NActors::TActorId ProgressNotificationActor; // The actor to send progress notifications to
26+
ui64 ProgressNotificationCookie = 0; // Cookie for progress notifications
27+
ui64 ProgressNotificationBytes = 0; // The byte interval for progress notifications
28+
ui64 ProgressBytes = 0; // Total bytes sent so far
29+
ui64 ProgressChunks = 0; // Total data chunks sent so far
30+
31+
operator bool() const {
32+
return Response != nullptr;
33+
}
34+
};
35+
36+
THashMap<THttpIncomingRequestPtr, TResponseState> Responses;
2337
THttpIncomingRequestPtr CurrentRequest;
24-
THttpOutgoingResponsePtr CurrentResponse;
38+
TResponseState CurrentResponse;
2539
TDeque<THttpIncomingRequestPtr> RecycledRequests;
2640

2741
THPTimer InactivityTimer;
@@ -158,15 +172,19 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
158172
Send(Endpoint->Proxy, new TEvHttpProxy::TEvHttpIncomingRequest(CurrentRequest));
159173
CurrentRequest = nullptr;
160174
} else {
161-
bool success = Respond(CurrentRequest->CreateResponseTooManyRequests());
175+
bool success = Respond({
176+
.Response = CurrentRequest->CreateResponseTooManyRequests()
177+
});
162178
if (!success) {
163179
return;
164180
}
165181
CleanupRequest(CurrentRequest);
166182
}
167183
} else if (CurrentRequest->IsError()) {
168184
ALOG_DEBUG(HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -! (" << GetRequestDebugText() << ")");
169-
bool success = Respond(CurrentRequest->CreateResponseBadRequest());
185+
bool success = Respond({
186+
.Response = CurrentRequest->CreateResponseBadRequest()
187+
});
170188
if (!success) {
171189
return;
172190
}
@@ -218,7 +236,12 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
218236
if (event->Get()->Response->IsDone()) {
219237
CancelSubscriber = nullptr;
220238
}
221-
Respond(event->Get()->Response);
239+
Respond({
240+
.Response = event->Get()->Response,
241+
.ProgressNotificationActor = event->Get()->ProgressNotificationBytes > 0 ? event->Sender : TActorId(),
242+
.ProgressNotificationCookie = event->Cookie,
243+
.ProgressNotificationBytes = event->Get()->ProgressNotificationBytes,
244+
});
222245
}
223246

224247
static TString GetChunkDebugText(THttpOutgoingDataChunkPtr chunk) {
@@ -251,12 +274,12 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
251274
ALOG_ERROR(HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - DataChunk error: " << event->Get()->Error);
252275
return PassAway();
253276
}
254-
if (CurrentResponse != nullptr && CurrentResponse == event->Get()->DataChunk->GetResponse()) {
255-
CurrentResponse->AddDataChunk(event->Get()->DataChunk);
277+
if (CurrentResponse && CurrentResponse.Response == event->Get()->DataChunk->GetResponse()) {
278+
CurrentResponse.Response->AddDataChunk(event->Get()->DataChunk);
256279
} else {
257280
auto itResponse = Responses.find(event->Get()->DataChunk->GetRequest());
258-
if (itResponse != Responses.end() && itResponse->second == event->Get()->DataChunk->GetResponse()) {
259-
itResponse->second->AddDataChunk(event->Get()->DataChunk);
281+
if (itResponse != Responses.end() && itResponse->second.Response == event->Get()->DataChunk->GetResponse()) {
282+
itResponse->second.Response->AddDataChunk(event->Get()->DataChunk);
260283
} else {
261284
ALOG_ERROR(HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - DataChunk request not found");
262285
return PassAway();
@@ -274,7 +297,8 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
274297
CancelSubscriber = std::move(event);
275298
}
276299

277-
bool Respond(THttpOutgoingResponsePtr response) {
300+
bool Respond(TResponseState state) {
301+
THttpOutgoingResponsePtr response = state.Response;
278302
THttpIncomingRequestPtr request = response->GetRequest();
279303
ALOG_DEBUG(HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- ("
280304
<< GetResponseDebugText(response) << (response->IsDone() ? ")" : ") (incomplete)"));
@@ -304,30 +328,31 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
304328
PassAway();
305329
return false; // no request to respond to
306330
}
307-
if (request == Requests.front() && CurrentResponse == nullptr) {
308-
CurrentResponse = response;
331+
if (request == Requests.front() && !CurrentResponse) {
332+
CurrentResponse = std::move(state);
309333
return FlushOutput();
310334
} else {
311335
// we are ahead of our pipeline
312-
Responses.emplace(request, response);
336+
Responses.emplace(request, std::move(state));
313337
return true;
314338
}
315339
}
316340

317341
bool FlushOutput() {
318-
while (CurrentResponse != nullptr) {
319-
auto* buffer = CurrentResponse->GetActiveBuffer();
342+
while (CurrentResponse) {
343+
auto& response = CurrentResponse.Response;
344+
auto* buffer = response->GetActiveBuffer();
320345
size_t size = buffer->Size();
321346
if (size == 0) {
322-
if (CurrentResponse->IsDone()) {
323-
Y_ABORT_UNLESS(Requests.front() == CurrentResponse->GetRequest());
324-
bool close = CurrentResponse->IsConnectionClose();
347+
if (response->IsDone()) {
348+
Y_ABORT_UNLESS(Requests.front() == response->GetRequest());
349+
bool close = response->IsConnectionClose();
325350
Requests.pop_front();
326-
CleanupResponse(CurrentResponse);
351+
CleanupResponse(response);
327352
if (!Requests.empty()) {
328353
auto it = Responses.find(Requests.front());
329354
if (it != Responses.end()) {
330-
CurrentResponse = it->second;
355+
CurrentResponse = std::move(it->second);
331356
Responses.erase(it);
332357
continue;
333358
} else {
@@ -353,6 +378,30 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
353378
if (res > 0) {
354379
InactivityTimer.Reset();
355380
buffer->ChopHead(res);
381+
if (CurrentResponse.ProgressNotificationBytes > 0) {
382+
bool needToNotify = false;
383+
ui64 progressBefore = CurrentResponse.ProgressBytes / CurrentResponse.ProgressNotificationBytes;
384+
CurrentResponse.ProgressBytes += res;
385+
if (buffer->Size() == 0) { // end of headers / chunk
386+
needToNotify = true;
387+
if (buffer != response) {
388+
// If buffer != response, this means we are at the end of a data chunk (not headers).
389+
// This comparison distinguishes between the headers buffer (buffer == response)
390+
// and data chunk buffers (buffer != response).
391+
CurrentResponse.ProgressChunks++;
392+
}
393+
} else {
394+
ui64 progressAfter = CurrentResponse.ProgressBytes / CurrentResponse.ProgressNotificationBytes;
395+
if (progressAfter != progressBefore) {
396+
needToNotify = true;
397+
}
398+
}
399+
if (needToNotify) {
400+
Send(CurrentResponse.ProgressNotificationActor,
401+
new TEvHttpProxy::TEvHttpOutgoingResponseProgress(CurrentResponse.ProgressBytes, CurrentResponse.ProgressChunks),
402+
0, CurrentResponse.ProgressNotificationCookie);
403+
}
404+
}
356405
} else if (-res == EINTR) {
357406
continue;
358407
} else if (-res == EAGAIN || -res == EWOULDBLOCK) {
@@ -366,7 +415,7 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
366415
}
367416
break;
368417
} else {
369-
CleanupResponse(CurrentResponse);
418+
CleanupResponse(response);
370419
ALOG_ERROR(HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in FlushOutput: " << strerror(-res));
371420
PassAway();
372421
return false;

ydb/library/actors/http/http_ut.cpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1221,6 +1221,85 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
12211221
UNIT_ASSERT(dataChunk->IsEndOfData());
12221222
}
12231223

1224+
Y_UNIT_TEST(StreamingResponseWithProgress1) {
1225+
constexpr size_t ChunkSize = 400; // 400 bytes
1226+
constexpr int ChunkCount = 100; // 100 chunks
1227+
NActors::TTestActorRuntimeBase actorSystem(1, true);
1228+
TPortManager portManager;
1229+
TIpPort port = portManager.GetTcpPort();
1230+
TAutoPtr<NActors::IEventHandle> handle;
1231+
actorSystem.Initialize();
1232+
#ifndef NDEBUG
1233+
actorSystem.SetLogPriority(NActorsServices::HTTP, NActors::NLog::PRI_DEBUG);
1234+
#endif
1235+
1236+
NActors::IActor* proxy = NHttp::CreateHttpProxy();
1237+
NActors::TActorId proxyId = actorSystem.Register(proxy);
1238+
actorSystem.Send(new NActors::IEventHandle(proxyId, actorSystem.AllocateEdgeActor(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
1239+
actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvConfirmListen>(handle);
1240+
1241+
NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
1242+
actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
1243+
1244+
NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
1245+
NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://127.0.0.1:" + ToString(port) + "/test");
1246+
NHttp::TEvHttpProxy::TEvHttpOutgoingRequest* event = new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest);
1247+
event->StreamContentTypes = {"text/plain"};
1248+
actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, event), 0, true);
1249+
1250+
NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
1251+
1252+
UNIT_ASSERT_EQUAL(request->Request->URL, "/test");
1253+
1254+
TString responseString = "HTTP/1.1 200 Found\r\nConnection: Close\r\nContent-Type: text/plain\r\nTransfer-Encoding: chunked\r\n\r\n";
1255+
NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString(responseString);
1256+
auto response = new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse);
1257+
response->ProgressNotificationBytes = ChunkSize; // notify for every byte
1258+
actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, response), 0, true);
1259+
1260+
NHttp::TEvHttpProxy::TEvHttpOutgoingResponseProgress* headersProgress = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpOutgoingResponseProgress>(handle);
1261+
UNIT_ASSERT_VALUES_EQUAL(headersProgress->Bytes, responseString.size());
1262+
UNIT_ASSERT_VALUES_EQUAL(headersProgress->DataChunks, 0);
1263+
1264+
ui64 totalBytes = responseString.size();
1265+
1266+
for (int i = 0; i < ChunkCount; ++i) {
1267+
TString longChunk(ChunkSize, 'X');
1268+
NHttp::THttpOutgoingDataChunkPtr httpDataChunk = httpResponse->CreateDataChunk(longChunk);
1269+
actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingDataChunk(httpDataChunk)), 0, true);
1270+
1271+
totalBytes += longChunk.size() + 7; // 7 bytes for chunk header and footer
1272+
1273+
NHttp::TEvHttpProxy::TEvHttpOutgoingResponseProgress* chunkProgress = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpOutgoingResponseProgress>(handle);
1274+
UNIT_ASSERT_VALUES_EQUAL(chunkProgress->Bytes, totalBytes);
1275+
UNIT_ASSERT_VALUES_EQUAL(chunkProgress->DataChunks, static_cast<ui64>(i + 1));
1276+
}
1277+
1278+
NHttp::THttpOutgoingDataChunkPtr httpDataChunk = httpResponse->CreateDataChunk(); // end of data
1279+
actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingDataChunk(httpDataChunk)), 0, true);
1280+
1281+
totalBytes += 5; // "0\r\n\r\n"
1282+
1283+
NHttp::TEvHttpProxy::TEvHttpOutgoingResponseProgress* finalProgress = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpOutgoingResponseProgress>(handle);
1284+
UNIT_ASSERT_VALUES_EQUAL(finalProgress->Bytes, totalBytes);
1285+
UNIT_ASSERT_VALUES_EQUAL(finalProgress->DataChunks, ChunkCount + 1);
1286+
1287+
NHttp::TEvHttpProxy::TEvHttpIncompleteIncomingResponse* incompleteResponse = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncompleteIncomingResponse>(handle);
1288+
1289+
UNIT_ASSERT_VALUES_EQUAL(incompleteResponse->Response->Status, "200");
1290+
1291+
for (int i = 0; i < ChunkCount; ++i) {
1292+
NHttp::TEvHttpProxy::TEvHttpIncomingDataChunk* dataChunk = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingDataChunk>(handle);
1293+
UNIT_ASSERT(!dataChunk->Error);
1294+
UNIT_ASSERT_VALUES_EQUAL(dataChunk->IsEndOfData(), false);
1295+
}
1296+
1297+
NHttp::TEvHttpProxy::TEvHttpIncomingDataChunk* dataChunk = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingDataChunk>(handle);
1298+
UNIT_ASSERT(!dataChunk->Error);
1299+
UNIT_ASSERT(!dataChunk->Data);
1300+
UNIT_ASSERT(dataChunk->IsEndOfData());
1301+
}
1302+
12241303
Y_UNIT_TEST(RequestAfter307) {
12251304
NActors::TTestActorRuntimeBase actorSystem(1, true);
12261305
TPortManager portManager;

0 commit comments

Comments
 (0)