DeterministicESPAsyncWebServer 1.2.0
Zero-allocation, bounded-execution async HTTP server for ESP32
Loading...
Searching...
No Matches
transport.cpp
Go to the documentation of this file.
1// Copyright (C) 2026 Douglas Quigg (dstroy0) <dquigg123@gmail.com>
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4/**
5 * @file transport.cpp
6 * @brief Layer 4 (Transport) — TCP connection management implementation.
7 *
8 * All lwIP raw-API callbacks run in the `tcpip_thread` FreeRTOS task context.
9 * They are NOT hardware ISRs, which is why we use `xQueueSend()` (non-ISR
10 * variant) with timeout=0 rather than `xQueueSendFromISR()`.
11 *
12 * **Ring buffer write ordering**
13 * The producer (recv callback) writes payload bytes into `rx_buffer[]` and
14 * then advances `rx_head`. The consumer (main loop) reads from `rx_buffer[]`
15 * at `rx_tail` and advances `rx_tail`. Because Xtensa LX7 stores are
16 * in-order and `rx_head`/`rx_tail` are `volatile`, no memory barriers are
17 * needed beyond the `volatile` annotation.
18 */
19
20#include "transport.h"
21#include "freertos/FreeRTOS.h"
22#include "freertos/queue.h"
23#include "lwip/tcp.h"
24
26static struct tcp_pcb *listen_pcb = nullptr;
27
28static StaticQueue_t _queue_struct;
29static uint8_t _queue_storage[EVT_QUEUE_DEPTH * sizeof(TcpEvt)];
30
31QueueHandle_t DeterministicAsyncTCP::queue = nullptr;
33
34static err_t lowlevel_accept_cb(void *arg, struct tcp_pcb *newpcb, err_t err);
35static err_t lowlevel_recv_cb(void *arg, struct tcp_pcb *tpcb, struct pbuf *p, err_t err);
36static err_t lowlevel_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len);
37static void lowlevel_err_cb(void *arg, err_t err);
38
39/**
40 * @brief Non-blocking event enqueue helper.
41 *
42 * xQueueSend with timeout=0 means the call returns immediately if the queue
43 * is full. In practice the queue (depth 16) is large enough for any burst
44 * of MAX_CONNS events; a full queue indicates the application is not calling
45 * server_tick() fast enough. Dropped events are recoverable via timeout.
46 */
47static inline void enqueue(const TcpEvt &evt)
48{
49 xQueueSend(DeterministicAsyncTCP::queue, &evt, 0);
50}
51
53{
54 return 0; // event queue is statically allocated in BSS
55}
56
58{
59 return true; // no heap allocation; always safe to call begin()
60}
61
62int32_t DeterministicAsyncTCP::init(uint16_t port, const WebServerConfig *cfg)
63{
64 // Load runtime config (or fall back to compile-time default)
66
67 queue = xQueueCreateStatic(EVT_QUEUE_DEPTH, sizeof(TcpEvt),
68 _queue_storage, &_queue_struct);
69 if (queue == nullptr)
70 return -1;
71
72 for (int i = 0; i < MAX_CONNS; i++)
73 {
74 conn_pool[i] = {}; // zero all fields, including last_activity_ms
75 conn_pool[i].id = i;
77 }
78
79 struct tcp_pcb *pcb = tcp_new_ip_type(IPADDR_TYPE_ANY);
80 if (!pcb)
81 return -1;
82
83 err_t err = tcp_bind(pcb, IP_ANY_TYPE, port);
84 if (err != ERR_OK)
85 {
86 tcp_abort(pcb);
87 return -1;
88 }
89
90 listen_pcb = tcp_listen_with_backlog(pcb, MAX_CONNS);
91 if (!listen_pcb)
92 {
93 tcp_abort(pcb);
94 return -1;
95 }
96
97 tcp_arg(listen_pcb, nullptr);
98 tcp_accept(listen_pcb, lowlevel_accept_cb);
99
100 return 1;
101}
102
104{
105 // Close the listener first — no new connections accepted
106 if (listen_pcb)
107 {
108 tcp_close(listen_pcb);
109 listen_pcb = nullptr;
110 }
111
112 // Abort all active connections
113 for (int i = 0; i < MAX_CONNS; i++)
114 {
115 if (conn_pool[i].state == CONN_ACTIVE && conn_pool[i].pcb)
116 {
117 struct tcp_pcb *pcb = conn_pool[i].pcb;
119 conn_pool[i].pcb = nullptr;
120 tcp_arg(pcb, nullptr);
121 tcp_abort(pcb);
122 }
124 conn_pool[i].pcb = nullptr;
125 }
126
127 // Free the event queue
128 if (queue)
129 {
130 vQueueDelete(queue);
131 queue = nullptr;
132 }
133}
134
136{
137 uint32_t now = millis();
138 for (int i = 0; i < MAX_CONNS; i++)
139 {
140 TcpConn *slot = &conn_pool[i];
141 if (slot->state != CONN_ACTIVE)
142 continue;
143 if ((now - slot->last_activity_ms) < conn_timeout_ms)
144 continue;
145
146 struct tcp_pcb *pcb = slot->pcb;
147 /*
148 * Clear state BEFORE calling tcp_abort so that any lwIP callback
149 * firing on the same PCB during or after abort sees state==CONN_FREE
150 * and exits immediately without accessing freed memory.
151 */
152 slot->state = CONN_FREE;
153 slot->pcb = nullptr;
154 if (pcb)
155 {
156 tcp_arg(pcb, nullptr); // detach slot pointer from PCB
157 tcp_abort(pcb);
158 }
159 TcpEvt evt = {EVT_ERROR, (uint8_t)i, 0};
160 enqueue(evt);
161 }
162}
163
164// ---------------------------------------------------------------------------
165// lwIP callbacks — execute in tcpip_thread task context
166// ---------------------------------------------------------------------------
167
168/**
169 * @brief lwIP accept callback — fires when a new client connects.
170 *
171 * Finds a free slot, wires up the per-connection callbacks, and posts
172 * EVT_CONNECT. If the pool is full, rejects the connection with ERR_ABRT
173 * (which tells lwIP the PCB is already gone from our side).
174 */
175static err_t lowlevel_accept_cb(void *arg, struct tcp_pcb *newpcb, err_t err)
176{
177 if (err != ERR_OK || newpcb == nullptr)
178 return ERR_VAL;
179
180 int free_slot = -1;
181 for (int i = 0; i < MAX_CONNS; i++)
182 {
183 if (conn_pool[i].state == CONN_FREE)
184 {
185 free_slot = i;
186 break;
187 }
188 }
189
190 if (free_slot == -1)
191 {
192 /*
193 * Pool is full — reject connection immediately. ERR_ABRT signals
194 * lwIP that the PCB has already been aborted by this callback.
195 */
196 tcp_abort(newpcb);
197 return ERR_ABRT;
198 }
199
200 TcpConn *slot = &conn_pool[free_slot];
201 slot->state = CONN_ACTIVE;
202 slot->pcb = newpcb;
203 slot->last_activity_ms = millis();
204 slot->rx_head = 0;
205 slot->rx_tail = 0;
206
207 tcp_arg(newpcb, slot);
208 tcp_recv(newpcb, lowlevel_recv_cb);
209 tcp_sent(newpcb, lowlevel_sent_cb);
210 tcp_err(newpcb, lowlevel_err_cb);
211
212 TcpEvt evt = {EVT_CONNECT, (uint8_t)free_slot, 0};
213 enqueue(evt);
214
215 return ERR_OK;
216}
217
218/**
219 * @brief lwIP receive callback — fires when data arrives on a connection.
220 *
221 * Copies pbuf chain bytes into the ring buffer and calls tcp_recved() with
222 * only the bytes actually stored, applying TCP-level backpressure when the
223 * buffer is full. A null pbuf signals graceful remote close (FIN received).
224 */
225static err_t lowlevel_recv_cb(void *arg, struct tcp_pcb *tpcb, struct pbuf *p, err_t err)
226{
227 TcpConn *slot = (TcpConn *)arg;
228 if (!slot || slot->state != CONN_ACTIVE)
229 return ERR_VAL;
230
231 if (p == nullptr)
232 {
233 /*
234 * Null pbuf signals graceful remote close (FIN received).
235 * Clear state and pcb before tcp_close so any stale callbacks
236 * are harmless.
237 */
238 slot->state = CONN_FREE;
239 slot->pcb = nullptr;
240 tcp_arg(tpcb, nullptr);
241 if (tcp_close(tpcb) != ERR_OK)
242 tcp_abort(tpcb);
243 TcpEvt evt = {EVT_DISCONNECT, slot->id, 0};
244 enqueue(evt);
245 return ERR_OK;
246 }
247
248 slot->last_activity_ms = millis();
249
250 size_t bytes_copied = 0;
251 bool full = false;
252 struct pbuf *q = p;
253 while (q != nullptr && !full)
254 {
255 uint8_t *payload = (uint8_t *)q->payload;
256 for (u16_t i = 0; i < q->len; i++)
257 {
258 size_t next_head = (slot->rx_head + 1) % RX_BUF_SIZE;
259 if (next_head == slot->rx_tail)
260 {
261 full = true;
262 break;
263 }
264 slot->rx_buffer[slot->rx_head] = payload[i];
265 slot->rx_head = next_head;
266 bytes_copied++;
267 }
268 q = q->next;
269 }
270
271 /*
272 * Acknowledge only bytes actually placed in the ring buffer.
273 * This shrinks the TCP receive window and applies backpressure if
274 * the application falls behind — data is never silently dropped.
275 */
276 tcp_recved(tpcb, (u16_t)bytes_copied);
277 pbuf_free(p);
278
279 if (bytes_copied > 0)
280 {
281 TcpEvt evt = {EVT_DATA, slot->id, bytes_copied};
282 enqueue(evt);
283 }
284
285 return ERR_OK;
286}
287
288/**
289 * @brief lwIP sent callback — fires after the stack acknowledges sent bytes.
290 *
291 * Used only to refresh the idle-timeout timestamp so an active sender doesn't
292 * get timed out while its responses are in flight.
293 */
294static err_t lowlevel_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len)
295{
296 TcpConn *slot = (TcpConn *)arg;
297 if (slot)
298 slot->last_activity_ms = millis();
299 return ERR_OK;
300}
301
302/**
303 * @brief lwIP error callback — fires when the stack detects a fatal error.
304 *
305 * By the time this fires the PCB is already gone internally, so we must NOT
306 * call tcp_close() or tcp_abort(). Null out the slot's pcb pointer and post
307 * EVT_ERROR so the session layer resets the HTTP parser.
308 */
309static void lowlevel_err_cb(void *arg, err_t err)
310{
311 TcpConn *slot = (TcpConn *)arg;
312 if (!slot)
313 return;
314
315 /*
316 * When lwIP fires the error callback the PCB has already been freed
317 * internally. We must NOT call tcp_close/tcp_abort here — just null
318 * out our pointer to prevent any future access.
319 */
320 slot->state = CONN_FREE;
321 slot->pcb = nullptr;
322
323 TcpEvt evt = {EVT_ERROR, slot->id, 0};
324 enqueue(evt);
325}
#define CONN_TIMEOUT_MS
Compile-time default for connection idle timeout in milliseconds.
#define RX_BUF_SIZE
Ring-buffer capacity in bytes per connection slot.
#define EVT_QUEUE_DEPTH
Depth of the FreeRTOS event queue shared between lwIP callbacks and the main-loop task.
#define MAX_CONNS
Maximum simultaneous TCP connections.
static bool heap_available()
Always returns true — no heap allocation means no pre-flight needed.
Definition transport.cpp:57
static void check_timeouts()
Scan the pool and force-close connections idle for > conn_timeout_ms.
static int32_t init(uint16_t port, const WebServerConfig *cfg=nullptr)
Initialise the TCP stack, create the event queue, and begin listening.
Definition transport.cpp:62
static size_t heap_needed()
Always returns 0 — the library makes no heap allocations.
Definition transport.cpp:52
static uint32_t conn_timeout_ms
Runtime connection-idle timeout in milliseconds.
Definition transport.h:178
static void stop()
Stop the server: abort all connections, close the listener, free the queue.
static QueueHandle_t queue
FreeRTOS queue handle shared between lwIP callbacks and server_tick().
Definition transport.h:170
A single TCP connection context.
Definition transport.h:69
volatile ConnState state
Lifecycle state; volatile for inter-task visibility.
Definition transport.h:71
struct tcp_pcb * pcb
lwIP PCB; null when slot is free.
Definition transport.h:72
volatile size_t rx_tail
Consumer read index (main-loop context).
Definition transport.h:77
uint32_t last_activity_ms
millis() timestamp of last TX/RX event.
Definition transport.h:73
uint8_t id
Fixed slot index (0 … MAX_CONNS-1).
Definition transport.h:70
uint8_t rx_buffer[RX_BUF_SIZE]
Ring buffer storage.
Definition transport.h:75
volatile size_t rx_head
Producer write index (lwIP context).
Definition transport.h:76
Event record posted from lwIP callbacks to the main-loop task.
Definition transport.h:105
Runtime-tunable server parameters.
TcpConn conn_pool[MAX_CONNS]
Static pool of connection contexts. Defined in transport.cpp.
Definition transport.cpp:25
Layer 4 (Transport) — TCP connection pool, ring buffers, and lwIP integration.
@ CONN_FREE
Slot is available; no PCB is attached.
Definition transport.h:56
@ CONN_ACTIVE
Live connection; PCB is valid.
Definition transport.h:57
@ EVT_CONNECT
New connection accepted.
Definition transport.h:92
@ EVT_DISCONNECT
Remote peer closed the connection gracefully.
Definition transport.h:94
@ EVT_ERROR
lwIP reported an error (PCB may already be freed).
Definition transport.h:95
@ EVT_DATA
Data received; bytes are already in the ring buffer.
Definition transport.h:93
TcpConn conn_pool[MAX_CONNS]
Static pool of connection contexts. Defined in transport.cpp.
Definition transport.cpp:25