ndn-embeds 0.1.0
Lightweight NDN protocol stack for embedded systems
Loading...
Searching...
No Matches
forwarder.cpp
1#include "ndn/forwarder.hpp"
2#include "ndn/tlv.hpp"
3
4#include "esp_log.h"
5
6namespace {
7const char* TAG = "FWD";
8} // namespace
9
10namespace ndn {
11
13 for (auto& face : faces_) {
14 face = nullptr;
15 }
16}
17
18Error Forwarder::init(size_t csMaxEntries) {
19 if (initialized_) {
20 return Error::Success;
21 }
22
23 // Initialize Content Store
24 const Error err = cs_.init(csMaxEntries);
25 if (err != Error::Success) {
26 ESP_LOGE(TAG, "Failed to initialize Content Store (size=%zu)", csMaxEntries);
27 return err;
28 }
29
30 // Could add FIB routes for LocalFace (FaceId = 1) here
31 // No special initialization needed at this point
32
33 initialized_ = true;
34 return Error::Success;
35}
36
38 if (face == nullptr) {
39 return Error::InvalidParam;
40 }
41
42 const FaceId newFaceId = face->id();
43 if (newFaceId == FACE_ID_INVALID) {
44 return Error::InvalidParam;
45 }
46
47 // Check for duplicates
48 for (auto& f : faces_) {
49 if (f != nullptr && f->id() == newFaceId) {
50 return Error::InvalidParam; // Duplicate
51 }
52 }
53
54 // Find a free slot
55 for (auto& f : faces_) {
56 if (f == nullptr) {
57 f = face;
58 face->setPacketCallback([this](FaceId faceId, const uint8_t* data, size_t len) {
59 onPacketReceived(faceId, data, len);
60 });
61 numFaces_++;
62 return Error::Success;
63 }
64 }
65
66 return Error::Full;
67}
68
70 for (auto& f : faces_) {
71 if (f != nullptr && f->id() == faceId) {
72 f->stop();
73 f = nullptr;
74 numFaces_--;
75 fib_.removeFace(faceId);
76 break;
77 }
78 }
79}
80
82 TimeoutCallback onTimeout) {
83 // Register pending Interest
84 for (auto& pending : pendingInterests_) {
85 if (!pending.inUse) {
86 pending.interest = interest;
87 pending.dataCallback = onData;
88 pending.timeoutCallback = onTimeout;
89 pending.inUse = true;
90
91 // Insert into PIT
92 PitEntry* entry = nullptr;
93 auto result = pit_.insert(interest, FACE_ID_LOCAL, &entry);
94
95 if (result == PitInsertResult::Full) {
96 pending.inUse = false;
97 return Error::Full;
98 }
99
100 // Forward
101 forwardInterest(interest, FACE_ID_LOCAL);
102 stats_.interestsSent++;
103 return Error::Success;
104 }
105 }
106
107 return Error::Full;
108}
109
111 // Forward only without PIT registration (e.g., Sync Interest)
112 forwardInterest(interest, FACE_ID_LOCAL);
113 stats_.interestsSent++;
114 return Error::Success;
115}
116
118 for (auto& reg : prefixRegs_) {
119 if (!reg.inUse) {
120 reg.prefix = prefix;
121 reg.callback = callback;
122 reg.inUse = true;
123
124 // Add route to LocalFace
125 fib_.addRoute(prefix, FACE_ID_LOCAL, 0);
126 return Error::Success;
127 }
128 }
129 return Error::Full;
130}
131
132Error Forwarder::registerPrefix(std::string_view prefixUri, InterestCallback callback) {
133 auto nameResult = Name::fromUri(prefixUri);
134 if (!nameResult.ok()) {
135 return nameResult.error;
136 }
137 return registerPrefix(nameResult.value, callback);
138}
139
141 for (auto& reg : prefixRegs_) {
142 if (reg.inUse && reg.prefix.equals(prefix)) {
143 reg.inUse = false;
144 fib_.removeRoute(prefix, FACE_ID_LOCAL);
145 break;
146 }
147 }
148}
149
151 ESP_LOGI(TAG, "putData called");
152
153 // Store in CS
154 cs_.insert(data, currentTimeMs());
155
156 // Match PIT and forward
157 PitEntry* pitEntry = pit_.find(data.name());
158 if (pitEntry != nullptr) {
159 ESP_LOGI(TAG, "putData: PIT match found, forwarding to %zu faces", pitEntry->faceCount());
160 forwardData(data, pitEntry);
161 pit_.remove(pitEntry);
162 } else {
163 ESP_LOGW(TAG, "putData: no PIT match");
164 }
165
166 return Error::Success;
167}
168
169Error Forwarder::addRoute(const Name& prefix, FaceId faceId, uint8_t cost) {
170 return fib_.addRoute(prefix, faceId, cost);
171}
172
173Error Forwarder::addRoute(std::string_view prefixUri, FaceId faceId, uint8_t cost) {
174 auto nameResult = Name::fromUri(prefixUri);
175 if (!nameResult.ok()) {
176 return nameResult.error;
177 }
178 return fib_.addRoute(nameResult.value, faceId, cost);
179}
180
182 const TimeMs now = currentTimeMs();
183
184 // PIT timeout processing
185 pit_.processTimeouts(now, [this](const PitEntry& entry) {
186 // Notify local application of timeout
187 for (auto& pending : pendingInterests_) {
188 if (pending.inUse && pending.interest.name().equals(entry.name())) {
189 if (pending.timeoutCallback) {
190 pending.timeoutCallback(pending.interest);
191 }
192 pending.inUse = false;
193 break;
194 }
195 }
196 });
197
198 // Evict stale CS entries (periodically)
199 static TimeMs lastEviction = 0;
200 if (now - lastEviction > 10000) { // Every 10 seconds
201 cs_.evictStale(now);
202 lastEviction = now;
203 }
204}
205
206void Forwarder::onPacketReceived(FaceId faceId, const uint8_t* data, size_t len) {
207 if (len < 2) {
208 return;
209 }
210
211 // Check TLV type
212 uint32_t type = data[0];
213 if (type == 253 && len >= 3) {
214 type = (static_cast<uint32_t>(data[1]) << 8) | data[2];
215 }
216
217 if (type == tlv::Interest) {
218 auto result = Interest::fromWire(data, len);
219 if (result.ok()) {
220 ESP_LOGI(TAG, "Interest received from face=%u", faceId);
221 stats_.interestsReceived++;
222 onInterestReceived(faceId, result.value);
223 } else {
224 ESP_LOGW(TAG, "Interest decode failed from face=%u, len=%zu", faceId, len);
225 }
226 } else if (type == tlv::Data) {
227 auto result = Data::fromWire(data, len);
228 if (result.ok()) {
229 ESP_LOGI(TAG, "Data received from face=%u", faceId);
230 stats_.dataReceived++;
231 onDataReceived(faceId, result.value);
232 }
233 }
234}
235
236void Forwarder::onInterestReceived(FaceId faceId, const Interest& interest) {
237 // 1. CS lookup (considering MustBeFresh flag)
238 const CsEntry* csEntry = cs_.find(interest.name(), interest.mustBeFresh(), currentTimeMs());
239 if (csEntry != nullptr) {
240 // Cache hit: send Data back
241 ESP_LOGI(TAG, "Cache hit for Interest, sending Data to face=%u", faceId);
242 stats_.cacheHits++;
243 uint8_t buf[PACKET_MAX_SIZE];
244 size_t len = 0;
245 if (csEntry->data().encode(buf, sizeof(buf), len) == Error::Success) {
246 // Unicast reply to the originating peer
247 for (auto& f : faces_) {
248 if (f != nullptr) {
249 const Error err = f->sendTo(faceId, buf, len);
250 if (err == Error::Success) {
251 stats_.dataSent++;
252 ESP_LOGI(TAG, "Cache hit: Data sent to face=%u", faceId);
253 break;
254 }
255 // If NotFound, try the next Face
256 }
257 }
258 }
259 return;
260 }
261 stats_.cacheMisses++;
262
263 // 2. PIT registration
264 PitEntry* pitEntry = nullptr;
265 auto pitResult = pit_.insert(interest, faceId, &pitEntry);
266
267 if (pitResult == PitInsertResult::Duplicate) {
268 // Loop detection - drop packet
269 ESP_LOGI(TAG, "PIT: Duplicate Interest, dropping");
270 return;
271 }
272
273 if (pitResult == PitInsertResult::Aggregated) {
274 // Face already added to existing PIT entry - no need to re-forward
275 ESP_LOGI(TAG, "PIT: Aggregated Interest");
276 return;
277 }
278
279 // 3. Check local prefix registrations
280 bool localHandled = false;
281 char interestUri[128];
282 interest.name().toUri(interestUri, sizeof(interestUri));
283
284 for (auto& reg : prefixRegs_) {
285 if (reg.inUse) {
286 char prefixUri[128];
287 reg.prefix.toUri(prefixUri, sizeof(prefixUri));
288 const bool match = reg.prefix.isPrefixOf(interest.name());
289 ESP_LOGI(TAG, "prefix check: interest=%s prefix=%s match=%d", interestUri, prefixUri,
290 match);
291 if (match) {
292 // Deliver to local application
293 reg.callback(interest, faceId);
294 localHandled = true;
295 break;
296 }
297 }
298 }
299
300 // 4. FIB lookup and forward (only if not handled locally)
301 // If handled locally, Data will be returned via putData(), no need to forward
302 if (!localHandled) {
303 forwardInterest(interest, faceId);
304 }
305}
306
307void Forwarder::onDataReceived(FaceId faceId, const Data& data) {
308 // 1. PIT match
309 PitEntry* pitEntry = pit_.find(data.name());
310 if (pitEntry == nullptr) {
311 // Unsolicited Data - drop
312 return;
313 }
314
315 // 2. Store in CS
316 cs_.insert(data, currentTimeMs());
317
318 // 3. Callback to pending Interest
319 for (auto& pending : pendingInterests_) {
320 if (pending.inUse && pending.interest.name().equals(data.name())) {
321 if (pending.dataCallback) {
322 pending.dataCallback(data);
323 }
324 pending.inUse = false;
325 break;
326 }
327 }
328
329 // 4. Forward to other Faces
330 forwardData(data, pitEntry);
331
332 // 5. Remove PIT entry
333 pit_.remove(pitEntry);
334}
335
336void Forwarder::forwardInterest(const Interest& interest, FaceId incomingFace) {
337 const FibEntry* fibEntry = fib_.findLongestMatch(interest.name());
338 if (fibEntry == nullptr || fibEntry->nexthopCount() == 0) {
339 ESP_LOGW(TAG, "forwardInterest: no route");
340 return; // No route
341 }
342
343 char nameUri[64];
344 interest.name().toUri(nameUri, sizeof(nameUri));
345 ESP_LOGI(TAG, "forwardInterest: name=%s nexthops=%zu incoming=%u", nameUri,
346 fibEntry->nexthopCount(), incomingFace);
347
348 // Encode
349 uint8_t buf[PACKET_MAX_SIZE];
350 size_t len = 0;
351 if (interest.encode(buf, sizeof(buf), len) != Error::Success) {
352 ESP_LOGE(TAG, "forwardInterest: encode failed");
353 return;
354 }
355
356 // Forward to each nexthop (except incoming)
357 for (size_t i = 0; i < fibEntry->nexthopCount(); ++i) {
358 const FaceId nextFace = fibEntry->nexthop(i).faceId;
359
360 ESP_LOGI(TAG, "forwardInterest: nexthop[%zu]=%u", i, nextFace);
361
362 if (nextFace == incomingFace || nextFace == FACE_ID_LOCAL) {
363 ESP_LOGI(TAG, "forwardInterest: skipping (incoming or local)");
364 continue; // Do not send to incoming face or local
365 }
366
367 for (auto& f : faces_) {
368 if (f != nullptr && f->id() == nextFace) {
369 f->send(buf, len);
370 ESP_LOGI(TAG, "forwardInterest: sent to face=%u len=%zu", nextFace, len);
371 stats_.interestsSent++;
372 break;
373 }
374 }
375 }
376}
377
378void Forwarder::forwardData(const Data& data, PitEntry* pitEntry) {
379 if (pitEntry == nullptr) {
380 return;
381 }
382
383 // Encode
384 uint8_t buf[PACKET_MAX_SIZE];
385 size_t len = 0;
386 if (data.encode(buf, sizeof(buf), len) != Error::Success) {
387 ESP_LOGE(TAG, "forwardData: encode failed");
388 return;
389 }
390
391 ESP_LOGI(TAG, "forwardData: sending to %zu faces", pitEntry->faceCount());
392
393 // Send to each incoming face
394 for (size_t i = 0; i < pitEntry->faceCount(); ++i) {
395 const FaceId destFaceId = pitEntry->face(i);
396
397 ESP_LOGI(TAG, "forwardData: face[%zu]=%u", i, destFaceId);
398
399 if (destFaceId == FACE_ID_LOCAL) {
400 ESP_LOGI(TAG, "forwardData: skipping local face");
401 continue; // Local face already handled separately
402 }
403
404 // FaceId recorded in PIT is the originating peer ID
405 // Use sendTo() to unicast to that peer
406 for (auto& f : faces_) {
407 if (f != nullptr) {
408 const Error err = f->sendTo(destFaceId, buf, len);
409 if (err == Error::Success) {
410 ESP_LOGI(TAG, "forwardData: sent to face=%u", destFaceId);
411 stats_.dataSent++;
412 break;
413 }
414 // If NotFound, try the next Face
415 }
416 }
417 }
418}
419
420} // namespace ndn
Error init(size_t maxEntries=CS_DEFAULT_ENTRIES)
Initialize the Content Store.
Definition cs.cpp:33
Error insert(const Data &data, TimeMs now)
Insert Data into the cache.
Definition cs.cpp:61
const CsEntry * find(const Name &name, bool mustBeFresh=false, TimeMs now=0) const
Search the cache by Name.
Definition cs.cpp:115
void evictStale(TimeMs now)
Remove stale entries.
Definition cs.cpp:144
NDN Data packet.
Definition data.hpp:49
const Name & name() const
Get the Name (const reference)
Definition data.hpp:104
static Result< Data > fromWire(const uint8_t *buf, size_t len)
Decode a Data packet from TLV wire format.
Definition data.cpp:362
NDN Face abstract base class.
Definition face.hpp:48
void setPacketCallback(PacketCallback callback)
Set the packet receive callback.
Definition face.hpp:132
virtual FaceId id() const =0
Get the Face ID.
const FibEntry * findLongestMatch(const Name &name) const
Look up using Longest Prefix Match.
Definition fib.cpp:117
void removeRoute(const Name &prefix, FaceId faceId)
Remove a specific next-hop.
Definition fib.cpp:81
void removeFace(FaceId faceId)
Remove a specified Face from all entries.
Definition fib.cpp:105
Error addRoute(const Name &prefix, FaceId faceId, uint8_t cost=0)
Add a route.
Definition fib.cpp:56
Error addRoute(const Name &prefix, FaceId faceId, uint8_t cost=0)
Add a route.
Error addFace(Face *face)
Add a Face.
Definition forwarder.cpp:37
Error registerPrefix(const Name &prefix, InterestCallback callback)
Register a prefix.
Forwarder()
Constructor.
Definition forwarder.cpp:12
void unregisterPrefix(const Name &prefix)
Unregister a prefix.
void removeFace(FaceId faceId)
Remove a Face.
Definition forwarder.cpp:69
Error expressInterest(const Interest &interest, DataCallback onData, TimeoutCallback onTimeout=nullptr)
Send an Interest and wait for Data.
Definition forwarder.cpp:81
Error putData(const Data &data)
Send Data.
Error sendInterest(const Interest &interest)
Send an Interest (without PIT registration)
Error init(size_t csMaxEntries=CS_DEFAULT_ENTRIES)
Initialize the Forwarder.
Definition forwarder.cpp:18
void processEvents()
Process events.
NDN Interest packet.
Definition interest.hpp:50
static Result< Interest > fromWire(const uint8_t *buf, size_t len)
Decode an Interest from TLV wire format.
Definition interest.cpp:580
NDN Name class.
Definition name.hpp:64
static Result< Name > fromUri(std::string_view uri)
Create a Name from a URI string.
Definition name.cpp:25
PIT entry.
Definition pit.hpp:46
size_t faceCount() const
Get the number of registered Faces.
Definition pit.hpp:74
const Name & name() const
Get the Interest Name.
Definition pit.hpp:52
PitInsertResult insert(const Interest &interest, FaceId incomingFace, PitEntry **outEntry=nullptr)
Insert an Interest.
Definition pit.cpp:40
void processTimeouts(TimeMs now, TimeoutCallback callback=nullptr)
Process timed-out entries.
Definition pit.cpp:121
PitEntry * find(const Name &name)
Find an entry by Name.
Definition pit.cpp:88
void remove(PitEntry *entry)
Remove an entry.
Definition pit.cpp:106
TimeMs currentTimeMs()
Get current time (milliseconds)
Definition common.cpp:7
uint64_t TimeMs
Timestamp type (milliseconds)
Definition common.hpp:107
constexpr FaceId FACE_ID_INVALID
Invalid Face ID.
Definition common.hpp:99
constexpr FaceId FACE_ID_LOCAL
Face ID for local application.
Definition common.hpp:102
constexpr size_t PACKET_MAX_SIZE
Maximum packet size (ESP-NOW v2.0 compatible)
Definition common.hpp:122
uint16_t FaceId
Face identifier.
Definition common.hpp:96
Error
Error codes.
Definition common.hpp:24
NDN Forwarder.
std::function< void(const Interest &, FaceId)> InterestCallback
Interest receive callback.
Definition forwarder.hpp:32
std::function< void(const Interest &)> TimeoutCallback
Timeout callback.
Definition forwarder.hpp:50
std::function< void(const Data &)> DataCallback
Data receive callback.
Definition forwarder.hpp:41
constexpr uint32_t Interest
Interest packet.
Definition tlv.hpp:27
constexpr uint32_t Data
Data packet.
Definition tlv.hpp:28
uint32_t interestsReceived
Number of Interests received.
uint32_t cacheHits
Number of cache hits.
uint32_t cacheMisses
Number of cache misses.
uint32_t dataReceived
Number of Data received.
uint32_t interestsSent
Number of Interests sent.
uint32_t dataSent
Number of Data sent.
NDN TLV (Type-Length-Value) encoding.