emme coverage


Directory: src/
File: src/server.c
Date: 2025-08-24 07:42:18
Exec Total Coverage
Lines: 0 306 0.0%
Functions: 0 12 0.0%
Branches: 0 148 0.0%

Line Branch Exec Source
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <poll.h>
5 #include <unistd.h>
6 #include <arpa/inet.h>
7 #include <sys/epoll.h>
8 #include <pthread.h>
9 #include <fcntl.h>
10 #include <liburing.h>
11 #include <sys/sendfile.h>
12 #include <sys/time.h> // for instrumentation
13 #include "config.h"
14 #include "server.h"
15 #include "http_parser.h"
16 #include "router.h"
17 #include "tls.h"
18 #include <openssl/ssl.h>
19 #include <openssl/err.h>
20 #include "thread_pool.h"
21 #include "log.h"
22 #include <ctype.h>
23 #include "http2_response.h"
24
25 /* Callback invoked for each header received in an HTTP/2 frame */
26 static int on_header_callback(nghttp2_session *session,
27 const nghttp2_frame *frame,
28 const uint8_t *name, size_t namelen,
29 const uint8_t *value, size_t valuelen,
30 uint8_t flags, void *user_data)
31 {
32 (void)session;
33 (void)flags;
34 (void)user_data;
35 if (frame->hd.type == NGHTTP2_HEADERS &&
36 frame->headers.cat == NGHTTP2_HCAT_REQUEST)
37 {
38 StreamData *data = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
39 if (!data)
40 {
41 data = calloc(1, sizeof(StreamData));
42 nghttp2_session_set_stream_user_data(session, frame->hd.stream_id, data);
43 }
44 if (namelen >= 1 && name[0] == ':')
45 {
46 if (strncmp((const char *)name, ":method", namelen) == 0)
47 data->req.method = strndup((const char *)value, valuelen);
48 else if (strncmp((const char *)name, ":path", namelen) == 0)
49 data->req.path = strndup((const char *)value, valuelen);
50 else if (strncmp((const char *)name, ":scheme", namelen) == 0)
51 ; /* ignore scheme */
52 else if (strncmp((const char *)name, ":authority", namelen) == 0)
53 data->req.version = strndup((const char *)value, valuelen);
54 }
55 }
56 return 0;
57 }
58
59 static ssize_t http2_body_read_callback(
60 nghttp2_session *session, int32_t stream_id, uint8_t *buf, size_t length,
61 uint32_t *data_flags, nghttp2_data_source *source, void *user_data)
62 {
63 (void)session;
64 (void)stream_id;
65 (void)user_data;
66 struct {
67 const char *data;
68 size_t len;
69 size_t sent;
70 } *ctx = (void *)source->ptr;
71 size_t remaining = ctx->len - ctx->sent;
72 size_t to_copy = remaining < length ? remaining : length;
73 if (to_copy > 0)
74 {
75 memcpy(buf, ctx->data + ctx->sent, to_copy);
76 ctx->sent += to_copy;
77 }
78 if (ctx->sent >= ctx->len)
79 {
80 *data_flags = NGHTTP2_DATA_FLAG_EOF;
81 free(ctx);
82 }
83 return to_copy;
84 }
85
86 /* Callback invoked when a complete frame is received */
87 static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
88 {
89 (void)user_data;
90 log_message(LOG_LEVEL_DEBUG, "on_frame_recv_callback: start");
91 if (frame->hd.type == NGHTTP2_HEADERS &&
92 frame->headers.cat == NGHTTP2_HCAT_REQUEST &&
93 (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS))
94 {
95 StreamData *data = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
96 if (data)
97 {
98 char raw_request[BUFFER_SIZE];
99 snprintf(raw_request, sizeof(raw_request), "%s %s %s\r\n",
100 data->req.method ? data->req.method : "GET",
101 data->req.path ? data->req.path : "/",
102 data->req.version ? data->req.version : "HTTP/2");
103 log_message(LOG_LEVEL_INFO, "Routing HTTP/2 request for path (synthesized): %s", data->req.path);
104 Http2Response h2resp = {0};
105 route_request_tls(&data->req, raw_request, strlen(raw_request), NULL, NULL, &h2resp);
106 if (h2resp.num_headers == 0)
107 {
108 char status_str[4];
109 snprintf(status_str, sizeof(status_str), "%d", h2resp.status_code ? h2resp.status_code : 200);
110 h2resp.headers[0] = MAKE_NV(":status", status_str);
111 h2resp.headers[1] = MAKE_NV("content-type", h2resp.content_type[0] ? h2resp.content_type : "text/plain");
112 char clen[32];
113 snprintf(clen, sizeof(clen), "%zu", h2resp.body_len);
114 h2resp.headers[2] = MAKE_NV("content-length", clen);
115 h2resp.num_headers = 3;
116 }
117 if (h2resp.body_len == 0)
118 {
119 static const char dummy_body[] = "\n";
120 strncpy(h2resp.body, dummy_body, sizeof(h2resp.body)-1);
121 h2resp.body[sizeof(h2resp.body)-1] = '\0';
122 h2resp.body_len = strlen(h2resp.body);
123 }
124 if (h2resp.status_code == 0)
125 h2resp.status_code = 200;
126 nghttp2_data_provider data_prd;
127 struct {
128 const char *data;
129 size_t len;
130 size_t sent;
131 } *body_ctx = malloc(sizeof(*body_ctx));
132 body_ctx->data = h2resp.body;
133 body_ctx->len = h2resp.body_len;
134 body_ctx->sent = 0;
135 data_prd.source.ptr = body_ctx;
136 data_prd.read_callback = http2_body_read_callback;
137 int rv = nghttp2_submit_response(session, frame->hd.stream_id,
138 h2resp.headers, h2resp.num_headers,
139 &data_prd);
140 if (rv != 0)
141 log_message(LOG_LEVEL_ERROR, "nghttp2_submit_response failed: %s", nghttp2_strerror(rv));
142 else
143 log_message(LOG_LEVEL_INFO, "nghttp2_submit_response succeeded for stream %d", frame->hd.stream_id);
144 }
145 }
146 return 0;
147 }
148
149 /* Callback invoked when a stream is closed */
150 static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
151 uint32_t error_code, void *user_data)
152 {
153 (void)error_code;
154 (void)user_data;
155 StreamData *data = nghttp2_session_get_stream_user_data(session, stream_id);
156 if (data)
157 {
158 free((void *)data->req.method);
159 free((void *)data->req.path);
160 free((void *)data->req.version);
161 free(data);
162 nghttp2_session_set_stream_user_data(session, stream_id, NULL);
163 }
164 return 0;
165 }
166
167 SSL_CTX *ssl_ctx = NULL;
168 static struct io_uring global_ring; // Global io_uring instance for the accept loop.
169
170 static void handle_http2_connection(SSL *ssl, int client_fd, ServerConfig *config, struct io_uring *ring);
171 static void handle_http1_connection(SSL *ssl, int client_fd, ServerConfig *config);
172
173 /* Callback for nghttp2 to send data via SSL */
174 static ssize_t send_callback(nghttp2_session *session, const uint8_t *data,
175 size_t length, int flags, void *user_data)
176 {
177 (void)session;
178 (void)flags;
179 SSL *ssl = (SSL *)user_data;
180 ssize_t ret = SSL_write(ssl, data, length);
181 if (ret <= 0)
182 {
183 int ssl_error = SSL_get_error(ssl, ret);
184 log_message(LOG_LEVEL_ERROR, "SSL_write failed in send_callback. Error: %d", ssl_error);
185 }
186 return ret;
187 }
188
189 /* Callback for nghttp2 to receive data via SSL */
190 static ssize_t recv_callback(nghttp2_session *session, uint8_t *buf, size_t length,
191 int flags, void *user_data)
192 {
193 log_message(LOG_LEVEL_DEBUG, "recv_callback: start");
194 (void)session;
195 (void)flags;
196 SSL *ssl = (SSL *)user_data;
197 ssize_t ret = SSL_read(ssl, buf, length);
198 if (ret <= 0)
199 {
200 int ssl_error = SSL_get_error(ssl, ret);
201 if (ssl_error == SSL_ERROR_ZERO_RETURN)
202 {
203 log_message(LOG_LEVEL_INFO, "SSL connection closed by peer");
204 return NGHTTP2_ERR_EOF;
205 }
206 if (ssl_error == SSL_ERROR_WANT_READ || ssl_error == SSL_ERROR_WANT_WRITE)
207 return NGHTTP2_ERR_WOULDBLOCK;
208 log_message(LOG_LEVEL_ERROR, "SSL_read failed in recv_callback. Error: %d", ssl_error);
209 return NGHTTP2_ERR_CALLBACK_FAILURE;
210 }
211 log_message(LOG_LEVEL_DEBUG, "recv_callback: end, bytes read: %zd", ret);
212 return ret;
213 }
214
215 /* Nonblocking TLS handshake using thread-local io_uring with instrumentation */
216 static int perform_nonblocking_ssl_accept(SSL *ssl, int client_fd, struct io_uring *ring)
217 {
218 int ret;
219 struct timeval start, end;
220 gettimeofday(&start, NULL);
221 while ((ret = SSL_accept(ssl)) <= 0)
222 {
223 int err = SSL_get_error(ssl, ret);
224 if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE)
225 {
226 int poll_flags = (err == SSL_ERROR_WANT_READ) ? POLLIN : POLLOUT;
227 struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
228 if (!sqe)
229 {
230 log_message(LOG_LEVEL_ERROR, "Failed to get SQE during handshake");
231 return -1;
232 }
233 io_uring_prep_poll_add(sqe, client_fd, poll_flags);
234 int submit_ret = io_uring_submit(ring);
235 if (submit_ret < 0)
236 {
237 log_message(LOG_LEVEL_ERROR, "io_uring_submit failed during handshake: %s", strerror(-submit_ret));
238 return -1;
239 }
240 struct io_uring_cqe *cqe;
241 int wait_ret = io_uring_wait_cqe(ring, &cqe);
242 if (wait_ret < 0)
243 {
244 log_message(LOG_LEVEL_ERROR, "io_uring_wait_cqe failed during handshake: %s", strerror(-wait_ret));
245 return -1;
246 }
247 io_uring_cqe_seen(ring, cqe);
248 }
249 else
250 {
251 log_message(LOG_LEVEL_ERROR, "SSL_accept failed nonblockingly, error: %d", err);
252 return -1;
253 }
254 }
255 gettimeofday(&end, NULL);
256 long handshake_ms = (end.tv_sec - start.tv_sec) * 1000 + (end.tv_usec - start.tv_usec) / 1000;
257 log_message(LOG_LEVEL_INFO, "Nonblocking SSL handshake completed in %ld ms", handshake_ms);
258 return ret;
259 }
260
261 /* HTTP/2 connection handler using thread-local io_uring */
262 static void handle_http2_connection(SSL *ssl, int client_fd, ServerConfig *config, struct io_uring *ring)
263 {
264 (void)config;
265 int flags = fcntl(client_fd, F_GETFL, 0);
266 fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
267 nghttp2_session_callbacks *callbacks;
268 nghttp2_session *session;
269 if (nghttp2_session_callbacks_new(&callbacks) != 0)
270 {
271 log_message(LOG_LEVEL_ERROR, "Failed to initialize HTTP/2 callbacks");
272 return;
273 }
274 nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
275 nghttp2_session_callbacks_set_recv_callback(callbacks, recv_callback);
276 nghttp2_session_callbacks_set_on_header_callback(callbacks, on_header_callback);
277 nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback);
278 nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, on_stream_close_callback);
279 if (nghttp2_session_server_new(&session, callbacks, ssl) != 0)
280 {
281 log_message(LOG_LEVEL_ERROR, "Failed to create nghttp2 session");
282 nghttp2_session_callbacks_del(callbacks);
283 return;
284 }
285 log_message(LOG_LEVEL_INFO, "HTTP/2 session started");
286 int rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, NULL, 0);
287 if (rv < 0)
288 {
289 log_message(LOG_LEVEL_ERROR, "Failed to send initial SETTINGS frame: %s", nghttp2_strerror(rv));
290 nghttp2_session_del(session);
291 nghttp2_session_callbacks_del(callbacks);
292 return;
293 }
294 rv = 0;
295 while (nghttp2_session_want_read(session) || nghttp2_session_want_write(session))
296 {
297 struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
298 if (!sqe)
299 {
300 log_message(LOG_LEVEL_ERROR, "Failed to get io_uring SQE");
301 break;
302 }
303 io_uring_prep_poll_add(sqe, client_fd, POLLIN | POLLOUT);
304 int ret = io_uring_submit(ring);
305 if (ret < 0)
306 {
307 log_message(LOG_LEVEL_ERROR, "io_uring_submit failed: %s", strerror(-ret));
308 break;
309 }
310 struct io_uring_cqe *cqe;
311 ret = io_uring_wait_cqe(ring, &cqe);
312 if (ret < 0)
313 {
314 log_message(LOG_LEVEL_ERROR, "io_uring_wait_cqe failed: %s", strerror(-ret));
315 break;
316 }
317 int revents = cqe->res;
318 io_uring_cqe_seen(ring, cqe);
319 if (revents < 0)
320 {
321 log_message(LOG_LEVEL_ERROR, "Poll failed: %s", strerror(-revents));
322 break;
323 }
324 if (revents & POLLIN)
325 {
326 rv = nghttp2_session_recv(session);
327 if (rv < 0)
328 {
329 if (rv == NGHTTP2_ERR_EOF)
330 {
331 log_message(LOG_LEVEL_INFO, "HTTP/2 session closed by client");
332 break;
333 }
334 else if (rv == NGHTTP2_ERR_WOULDBLOCK)
335 {
336 rv = 0;
337 }
338 else
339 {
340 log_message(LOG_LEVEL_ERROR, "nghttp2_session_recv error: %s", nghttp2_strerror(rv));
341 break;
342 }
343 }
344 }
345 if (revents & POLLOUT)
346 {
347 rv = nghttp2_session_send(session);
348 if (rv < 0)
349 {
350 if (rv == NGHTTP2_ERR_WOULDBLOCK)
351 rv = 0;
352 else
353 {
354 log_message(LOG_LEVEL_ERROR, "nghttp2_session_send error: %s", nghttp2_strerror(rv));
355 break;
356 }
357 }
358 }
359 }
360 nghttp2_session_del(session);
361 nghttp2_session_callbacks_del(callbacks);
362 }
363
364 /* HTTP/1.1 connection handler using legacy methods */
365 static void handle_http1_connection(SSL *ssl, int client_fd, ServerConfig *config)
366 {
367 (void)client_fd;
368
369 // Serve multiple HTTP/1.x requests on the same TLS socket
370 for (;;) {
371 char buffer[BUFFER_SIZE];
372 int total_read = 0;
373
374 // 1) Read up to the end of headers (\r\n\r\n)
375 while (total_read < BUFFER_SIZE - 1) {
376 int n = SSL_read(ssl, buffer + total_read,
377 BUFFER_SIZE - 1 - total_read);
378 if (n <= 0) {
379 // client closed connection or SSL error
380 goto shutdown_and_close;
381 }
382 total_read += n;
383 if (strstr(buffer, "\r\n\r\n"))
384 break;
385 }
386 buffer[total_read] = '\0';
387
388 // 2) Parse the request
389 HttpRequest req;
390 if (parse_http_request(buffer, total_read, &req) != 0) {
391 const char *bad_response =
392 "HTTP/1.1 400 Bad Request\r\n"
393 "Content-Length: 0\r\n"
394 "\r\n";
395 SSL_write(ssl, bad_response, strlen(bad_response));
396 // malformed request → close connection
397 goto shutdown_and_close;
398 }
399
400 log_message(LOG_LEVEL_INFO, "Valid HTTP request received. Routing...");
401
402 // 3) Route & send response (static, proxy, etc.)
403 route_request_tls(&req, buffer, total_read, config, ssl, NULL);
404
405 // 4) Loop back to read the next request
406 // (do NOT shutdown/close here)
407 }
408
409 shutdown_and_close:
410 SSL_shutdown(ssl);
411 close(client_fd);
412 }
413
414 /* Worker task: uses thread-local io_uring for per-connection I/O */
415 void client_task(void *arg)
416 {
417 ClientTaskData *data = (ClientTaskData *)arg;
418 static __thread struct io_uring *local_ring = NULL;
419 if (!local_ring)
420 {
421 local_ring = malloc(sizeof(struct io_uring));
422 if (io_uring_queue_init(QUEUE_DEPTH, local_ring, 0) < 0)
423 {
424 log_message(LOG_LEVEL_ERROR, "Failed to initialize thread-local io_uring");
425 free(local_ring);
426 local_ring = NULL;
427 }
428 }
429 handle_client(data->client_fd, data->config, local_ring);
430 free(data);
431 }
432
433 /* Main per-connection handler; performs nonblocking TLS handshake before dispatching via ALPN */
434 void handle_client(int client_fd, ServerConfig *config, struct io_uring *ring)
435 {
436 struct timeval timeout;
437 timeout.tv_sec = 5;
438 timeout.tv_usec = 0;
439 setsockopt(client_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
440 setsockopt(client_fd, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));
441 SSL *ssl = SSL_new(ssl_ctx);
442 if (!ssl)
443 {
444 close(client_fd);
445 return;
446 }
447 SSL_set_fd(ssl, client_fd);
448 SSL_set_app_data(ssl, config);
449 if (perform_nonblocking_ssl_accept(ssl, client_fd, ring) <= 0)
450 {
451 log_message(LOG_LEVEL_ERROR, "Nonblocking SSL handshake failed");
452 SSL_free(ssl);
453 close(client_fd);
454 return;
455 }
456 const unsigned char *alpn_proto = NULL;
457 unsigned int alpn_len = 0;
458 SSL_get0_alpn_selected(ssl, &alpn_proto, &alpn_len);
459 if (alpn_len == 2 && memcmp(alpn_proto, "h2", 2) == 0)
460 {
461 log_message(LOG_LEVEL_INFO, "Negotiated HTTP/2");
462 handle_http2_connection(ssl, client_fd, config, ring);
463 }
464 else
465 {
466 log_message(LOG_LEVEL_INFO, "Negotiated HTTP/1.1");
467 handle_http1_connection(ssl, client_fd, config);
468 }
469 SSL_shutdown(ssl);
470 SSL_free(ssl);
471 close(client_fd);
472 }
473
474 /* Main server accept loop using a global io_uring instance */
475 int start_server(ServerConfig *config)
476 {
477 int server_fd, client_fd;
478 struct sockaddr_in server_addr, client_addr;
479 socklen_t client_len = sizeof(client_addr);
480 if (io_uring_queue_init(QUEUE_DEPTH * 2, &global_ring, 0) != 0)
481 {
482 perror("global io_uring_queue_init failed");
483 return 1;
484 }
485 server_fd = socket(AF_INET, SOCK_STREAM, 0);
486 if (server_fd == -1)
487 {
488 perror("Socket creation error");
489 io_uring_queue_exit(&global_ring);
490 return 1;
491 }
492 server_addr.sin_family = AF_INET;
493 server_addr.sin_addr.s_addr = INADDR_ANY;
494 server_addr.sin_port = htons(config->port);
495 if (bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1)
496 {
497 perror("Bind error");
498 close(server_fd);
499 io_uring_queue_exit(&global_ring);
500 return 1;
501 }
502 if (listen(server_fd, 2048) == -1)
503 {
504 perror("Listen error");
505 close(server_fd);
506 io_uring_queue_exit(&global_ring);
507 return 1;
508 }
509 ssl_ctx = create_ssl_context(config->ssl.certificate, config->ssl.private_key);
510 ThreadPool *pool = thread_pool_create(32, config->max_connections);
511 if (!pool)
512 {
513 fprintf(stderr, "Failed to create thread pool\n");
514 close(server_fd);
515 io_uring_queue_exit(&global_ring);
516 return 1;
517 }
518 printf("Emme listening on port %d...\n", config->port);
519 while (1)
520 {
521 struct io_uring_sqe *sqe = io_uring_get_sqe(&global_ring);
522 if (!sqe)
523 {
524 fprintf(stderr, "Failed to get SQE for accept\n");
525 break;
526 }
527 io_uring_prep_accept(sqe, server_fd, (struct sockaddr *)&client_addr, &client_len, 0);
528 if (io_uring_submit(&global_ring) < 0)
529 {
530 perror("io_uring_submit (accept) failed");
531 break;
532 }
533 struct io_uring_cqe *cqe;
534 if (io_uring_wait_cqe(&global_ring, &cqe) < 0)
535 {
536 perror("io_uring_wait_cqe (accept) failed");
537 break;
538 }
539 client_fd = cqe->res;
540 io_uring_cqe_seen(&global_ring, cqe);
541 int flags = fcntl(client_fd, F_GETFL, 0);
542 fcntl(client_fd, F_SETFL, flags & ~O_NONBLOCK);
543 ClientTaskData *task_data = malloc(sizeof(ClientTaskData));
544 if (!task_data)
545 {
546 perror("Failed to allocate memory for client task");
547 close(client_fd);
548 continue;
549 }
550 task_data->client_fd = client_fd;
551 task_data->config = config;
552 if (!thread_pool_add_task(pool, client_task, task_data))
553 {
554 fprintf(stderr, "Failed to add task to thread pool\n");
555 free(task_data);
556 close(client_fd);
557 }
558 }
559 thread_pool_destroy(pool);
560 close(server_fd);
561 io_uring_queue_exit(&global_ring);
562 cleanup_ssl_context(ssl_ctx);
563 return 0;
564 }
565