diff --git a/src/civetweb.c b/src/civetweb.c index 4d133baa4..af5d21710 100644 --- a/src/civetweb.c +++ b/src/civetweb.c @@ -1085,6 +1085,7 @@ enum { DOCUMENT_ROOT, SSL_CERTIFICATE, NUM_THREADS, + READ_TO_TOTAL_THREADS_PERCENTAGE, RUN_AS_USER, REWRITE, HIDE_FILES, @@ -1161,6 +1162,7 @@ static struct mg_option config_options[] = { {"document_root", CONFIG_TYPE_DIRECTORY, NULL}, {"ssl_certificate", CONFIG_TYPE_FILE, NULL}, {"num_threads", CONFIG_TYPE_NUMBER, "50"}, + {"read_to_total_threads_percentage", CONFIG_TYPE_NUMBER, "50"}, {"run_as_user", CONFIG_TYPE_STRING, NULL}, {"url_rewrite_patterns", CONFIG_TYPE_STRING, NULL}, {"hide_files_patterns", CONFIG_TYPE_EXT_PATTERN, NULL}, @@ -1214,6 +1216,8 @@ mg_static_assert((sizeof(config_options) / sizeof(config_options[0])) enum { REQUEST_HANDLER, WEBSOCKET_HANDLER, AUTH_HANDLER }; +typedef enum { WRITE_CONNECTION, READ_CONNECTION } conn_type_t; + struct mg_handler_info { /* Name/Pattern of the URI. */ char *uri; @@ -1241,6 +1245,48 @@ struct mg_handler_info { struct mg_handler_info *next; }; + +struct mg_connection { + struct mg_request_info request_info; + struct mg_context *ctx; + SSL *ssl; /* SSL descriptor */ + SSL_CTX *client_ssl_ctx; /* SSL context for client connections */ + struct socket client; /* Connected client */ + time_t conn_birth_time; /* Time (wall clock) when connection was + * established */ + struct timespec req_time; /* Time (since system start) when the request + * was received */ + int64_t num_bytes_sent; /* Total bytes sent to client */ + int64_t content_len; /* Content-Length header value */ + int64_t consumed_content; /* How many bytes of content have been read */ + int is_chunked; /* Transfer-Encoding is chunked: 0=no, 1=yes: + * data available, 2: all data read */ + size_t chunk_remainder; /* Unread data from the last chunk */ + char *buf; /* Buffer for received data */ + char *path_info; /* PATH_INFO part of the URL */ + + int must_close; /* 1 if connection must be closed */ + int in_error_handler; /* 1 if in handler for user defined error + * pages */ + int internal_error; /* 1 if an error occured while processing the + * request */ + + int buf_size; /* Buffer size */ + int request_len; /* Size of the request + headers in a buffer */ + int data_len; /* Total size of data in a buffer */ + int status_code; /* HTTP reply status code, e.g. 200 */ + int throttle; /* Throttling, bytes/sec. <= 0 means no + * throttle */ + time_t last_throttle_time; /* Last time throttled data was sent */ + int64_t last_throttle_bytes; /* Bytes sent this second */ + pthread_mutex_t mutex; /* Used by mg_(un)lock_connection to ensure + * atomic transmissions for websockets */ +#if defined(USE_LUA) && defined(USE_WEBSOCKET) + void *lua_websocket_state; /* Lua_State for a websocket connection */ +#endif +}; + + struct mg_context { volatile int stop_flag; /* Should we stop event loop */ SSL_CTX *ssl_ctx; /* SSL context */ @@ -1267,12 +1313,27 @@ struct mg_context { unsigned int cfg_worker_threads; /* The number of configured worker threads. */ pthread_t *workerthreadids; /* The worker thread IDs */ + pthread_t dispatcherthreadid; /* The dispatcher thread ID */ time_t start_time; /* Server start time, used for authentication */ uint64_t auth_nonce_mask; /* Mask for all nonce values */ pthread_mutex_t nonce_mutex; /* Protects nonce_count */ unsigned long nonce_count; /* Used nonces, used for authentication */ + struct mg_connection + w_conn_queue[MGSQLEN]; /* Accepted write connections */ + volatile int w_cq_head; /* Head of the write connection queue */ + volatile int w_cq_tail; /* Tail of the write connection queue */ + pthread_cond_t w_cq_full; /* Signaled when write connection is produced */ + pthread_cond_t w_cq_empty; /* Signaled when write connection is consumed */ + + struct mg_connection + r_conn_queue[MGSQLEN]; /* Accepted read connections */ + volatile int r_cq_head; /* Head of the read connection queue */ + volatile int r_cq_tail; /* Tail of the read connection queue */ + pthread_cond_t r_cq_full; /* Signaled when read connection is produced */ + pthread_cond_t r_cq_empty; /* Signaled when read connection is consumed */ + char *systemName; /* What operating system is running */ /* linked list of uri handlers */ @@ -1289,47 +1350,6 @@ struct mg_context { }; -struct mg_connection { - struct mg_request_info request_info; - struct mg_context *ctx; - SSL *ssl; /* SSL descriptor */ - SSL_CTX *client_ssl_ctx; /* SSL context for client connections */ - struct socket client; /* Connected client */ - time_t conn_birth_time; /* Time (wall clock) when connection was - * established */ - struct timespec req_time; /* Time (since system start) when the request - * was received */ - int64_t num_bytes_sent; /* Total bytes sent to client */ - int64_t content_len; /* Content-Length header value */ - int64_t consumed_content; /* How many bytes of content have been read */ - int is_chunked; /* Transfer-Encoding is chunked: 0=no, 1=yes: - * data available, 2: all data read */ - size_t chunk_remainder; /* Unread data from the last chunk */ - char *buf; /* Buffer for received data */ - char *path_info; /* PATH_INFO part of the URL */ - - int must_close; /* 1 if connection must be closed */ - int in_error_handler; /* 1 if in handler for user defined error - * pages */ - int internal_error; /* 1 if an error occured while processing the - * request */ - - int buf_size; /* Buffer size */ - int request_len; /* Size of the request + headers in a buffer */ - int data_len; /* Total size of data in a buffer */ - int status_code; /* HTTP reply status code, e.g. 200 */ - int throttle; /* Throttling, bytes/sec. <= 0 means no - * throttle */ - time_t last_throttle_time; /* Last time throttled data was sent */ - int64_t last_throttle_bytes; /* Bytes sent this second */ - pthread_mutex_t mutex; /* Used by mg_(un)lock_connection to ensure - * atomic transmissions for websockets */ -#if defined(USE_LUA) && defined(USE_WEBSOCKET) - void *lua_websocket_state; /* Lua_State for a websocket connection */ -#endif -}; - - static pthread_key_t sTlsKey; /* Thread local storage index */ static int sTlsInit = 0; static int thread_idx_max = 0; @@ -12422,9 +12442,109 @@ consume_socket(struct mg_context *ctx, struct socket *sp) #undef QUEUE_SIZE } +static int +consume_connection(struct mg_context *ctx, struct mg_connection *conn, conn_type_t conn_type) +{ + if (!ctx) { + return 0; + } + struct mg_connection* conn_queue; + volatile int *cq_head, *cq_tail; + pthread_cond_t *cq_full, *cq_empty; + + switch (conn_type) { + case WRITE_CONNECTION: + conn_queue = ctx->w_conn_queue; + cq_head = &ctx->w_cq_head; + cq_tail = &ctx->w_cq_tail; + cq_full = &ctx->w_cq_full; + cq_empty = &ctx->w_cq_empty; + break; + case READ_CONNECTION: + conn_queue = ctx->r_conn_queue; + cq_head = &ctx->r_cq_head; + cq_tail = &ctx->r_cq_tail; + cq_full = &ctx->r_cq_full; + cq_empty = &ctx->r_cq_empty; + break; + default: + return !ctx->stop_flag; + } + + (void)pthread_mutex_lock(&ctx->thread_mutex); + + while (*cq_head == *cq_tail && ctx->stop_flag == 0) { + pthread_cond_wait(cq_full, &ctx->thread_mutex); + } + + if (*cq_head > *cq_tail) { + *conn = conn_queue[*cq_tail % MGSQLEN]; + (*cq_tail)++; + + while (*cq_tail > MGSQLEN) { + *cq_tail -= MGSQLEN; + *cq_head -= MGSQLEN; + } + } + (void)pthread_cond_signal(cq_empty); + (void)pthread_mutex_unlock(&ctx->thread_mutex); + + return !ctx->stop_flag; +} + +static void +produce_connection(struct mg_context *ctx, struct mg_connection *conn, conn_type_t conn_type) +{ + if (!ctx) { + return; + } + struct mg_connection* conn_queue; + volatile int *cq_head, *cq_tail; + pthread_cond_t* cq_full; + + switch (conn_type) { + case WRITE_CONNECTION: + conn_queue = ctx->w_conn_queue; + cq_head = &ctx->w_cq_head; + cq_tail = &ctx->w_cq_tail; + cq_full = &ctx->w_cq_full; + break; + case READ_CONNECTION: + conn_queue = ctx->r_conn_queue; + cq_head = &ctx->r_cq_head; + cq_tail = &ctx->r_cq_tail; + cq_full = &ctx->r_cq_full; + break; + default: + return; + } + + char ebuf[100]; + + (void)pthread_mutex_lock(&ctx->thread_mutex); + + if (*cq_head - *cq_tail >= MGSQLEN) { + mg_snprintf(conn, + NULL, + ebuf, + sizeof(ebuf), + "Internal Server Error"); + (void)pthread_mutex_unlock(&ctx->thread_mutex); + send_http_error(conn, 503, "%s", ebuf); + return; + } + + if (*cq_head - *cq_tail < MGSQLEN) { + conn_queue[*cq_head % MGSQLEN] = *conn; + (*cq_head)++; + } + + (void)pthread_cond_signal(cq_full); + (void)pthread_mutex_unlock(&ctx->thread_mutex); +} static void * -worker_thread_run(void *thread_func_param) +dispatcher_thread_run(void *thread_func_param) { struct mg_context *ctx = (struct mg_context *)thread_func_param; struct mg_connection *conn; @@ -12433,7 +12553,7 @@ worker_thread_run(void *thread_func_param) uint32_t addr; #endif - mg_set_thread_name("worker"); + mg_set_thread_name("dispatcher"); tls.is_master = 0; tls.thread_idx = (unsigned)mg_atomic_inc(&thread_idx_max); @@ -12500,10 +12620,46 @@ worker_thread_run(void *thread_func_param) #endif ) { + if(is_put_or_delete_method(conn)){ + produce_connection(ctx,conn, WRITE_CONNECTION); + } + else { + produce_connection(ctx,conn, READ_CONNECTION); + } - process_new_connection(conn); } + } + } + + pthread_cond_broadcast(&ctx->w_cq_full); + pthread_cond_broadcast(&ctx->r_cq_full); + DEBUG_TRACE("%s", "exiting"); + return NULL; +} + +static void * +worker_thread_run(void *thread_func_param, conn_type_t conn_type) +{ + struct mg_context *ctx = (struct mg_context *)thread_func_param; + struct mg_connection *conn; + + mg_set_thread_name("worker"); + + if (ctx->callbacks.init_thread) { + /* call init_thread for a worker thread (type 1) */ + ctx->callbacks.init_thread(ctx, 1); + } + + conn = + (struct mg_connection *)mg_calloc(1, sizeof(*conn) + MAX_REQUEST_SIZE); + if (conn == NULL) { + mg_cry(fc(ctx), "%s", "Cannot create new connection struct, OOM"); + } else { + (void)pthread_mutex_init(&conn->mutex, &pthread_mutex_attr); + mg_cry(fc(ctx), "%s", "Just before consume write"); + while (consume_connection(ctx, conn, conn_type)) { + process_new_connection(conn); close_connection(conn); } } @@ -12529,16 +12685,31 @@ worker_thread_run(void *thread_func_param) /* Threads have different return types on Windows and Unix. */ #ifdef _WIN32 -static unsigned __stdcall worker_thread(void *thread_func_param) +static unsigned __stdcall r_worker_thread(void *thread_func_param) { - worker_thread_run(thread_func_param); + worker_thread_run(thread_func_param, READ_CONNECTION); return 0; } #else static void * -worker_thread(void *thread_func_param) +r_worker_thread(void *thread_func_param) { - worker_thread_run(thread_func_param); + worker_thread_run(thread_func_param, READ_CONNECTION); + return NULL; +} +#endif /* _WIN32 */ +/* Threads have different return types on Windows and Unix. */ +#ifdef _WIN32 +static unsigned __stdcall w_worker_thread(void *thread_func_param) +{ + worker_thread_run(thread_func_param, WRITE_CONNECTION); + return 0; +} +#else +static void * +w_worker_thread(void *thread_func_param) +{ + worker_thread_run(thread_func_param, WRITE_CONNECTION); return NULL; } #endif /* _WIN32 */ @@ -12754,6 +12925,7 @@ master_thread_run(void *thread_func_param) mg_join_thread(ctx->workerthreadids[i]); } } + mg_join_thread(ctx->dispatcherthreadid); #if !defined(NO_SSL) if (ctx->ssl_ctx != NULL) { @@ -12774,6 +12946,21 @@ master_thread_run(void *thread_func_param) } +/* Threads have different return types on Windows and Unix. */ +#ifdef _WIN32 +static unsigned __stdcall dispatcher_thread(void *thread_func_param) +{ + dispatcher_thread_run(thread_func_param); + return 0; +} +#else +static void * +dispatcher_thread(void *thread_func_param) +{ + dispatcher_thread_run(thread_func_param); + return NULL; +} +#endif /* _WIN32 */ /* Threads have different return types on Windows and Unix. */ #ifdef _WIN32 static unsigned __stdcall master_thread(void *thread_func_param) @@ -12812,6 +12999,10 @@ free_context(struct mg_context *ctx) (void)pthread_cond_destroy(&ctx->thread_cond); (void)pthread_cond_destroy(&ctx->sq_empty); (void)pthread_cond_destroy(&ctx->sq_full); + (void)pthread_cond_destroy(&ctx->w_cq_empty); + (void)pthread_cond_destroy(&ctx->w_cq_full); + (void)pthread_cond_destroy(&ctx->r_cq_empty); + (void)pthread_cond_destroy(&ctx->r_cq_full); /* Destroy other context global data structures mutex */ (void)pthread_mutex_destroy(&ctx->nonce_mutex); @@ -13012,6 +13203,10 @@ mg_start(const struct mg_callbacks *callbacks, ok &= 0 == pthread_cond_init(&ctx->thread_cond, NULL); ok &= 0 == pthread_cond_init(&ctx->sq_empty, NULL); ok &= 0 == pthread_cond_init(&ctx->sq_full, NULL); + ok &= 0 == pthread_cond_init(&ctx->w_cq_empty, NULL); + ok &= 0 == pthread_cond_init(&ctx->w_cq_full, NULL); + ok &= 0 == pthread_cond_init(&ctx->r_cq_empty, NULL); + ok &= 0 == pthread_cond_init(&ctx->r_cq_full, NULL); ok &= 0 == pthread_mutex_init(&ctx->nonce_mutex, &pthread_mutex_attr); if (!ok) { /* Fatal error - abort start. However, this situation should never @@ -13134,15 +13329,26 @@ mg_start(const struct mg_callbacks *callbacks, /* Start master (listening) thread */ mg_start_thread_with_id(master_thread, ctx, &ctx->masterthreadid); + mg_start_thread_with_id(dispatcher_thread, ctx, &ctx->dispatcherthreadid); /* Start worker threads */ + int ret; for (i = 0; i < ctx->cfg_worker_threads; i++) { (void)pthread_mutex_lock(&ctx->thread_mutex); ctx->running_worker_threads++; (void)pthread_mutex_unlock(&ctx->thread_mutex); - if (mg_start_thread_with_id(worker_thread, - ctx, - &ctx->workerthreadids[i]) != 0) { + int read_write_perc = atoi(ctx->config[READ_TO_TOTAL_THREADS_PERCENTAGE]); + if (i<(read_write_perc * ctx->cfg_worker_threads)/100) { + ret = mg_start_thread_with_id(r_worker_thread, + ctx, + &ctx->workerthreadids[i]); + } + else { + ret = mg_start_thread_with_id(w_worker_thread, + ctx, + &ctx->workerthreadids[i]); + } + if (ret != 0) { (void)pthread_mutex_lock(&ctx->thread_mutex); ctx->running_worker_threads--; (void)pthread_mutex_unlock(&ctx->thread_mutex);