MoonGen
 All Files Functions Variables Pages
pipe.lua
Go to the documentation of this file.
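---------------------------------
--- @file pipe.lua
--- @brief Pipes for communication between exactly two tasks (one reader, one writer).
--- Currently only serializing "slow pipes" are implemented; fast pipes are not yet implemented.
--- @todo TODO docu
---------------------------------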
6 
7 local mod = {}
8 
9 local memory = require "memory"
10 local ffi = require "ffi"
11 local serpent = require "Serpent"
12 local dpdk = require "dpdk"
13 
-- Declare the native pointer-queue API used by the pipes in this module.
-- The struct is declared empty ("dummy"), so Lua code only ever handles it
-- through pointers returned by make_pipe().
-- NOTE(review): the "spsc" prefix presumably stands for single-producer/
-- single-consumer, matching the one-reader/one-writer rule documented on
-- newSlowPipe -- confirm against the native implementation.
ffi.cdef [[
 // dummy
 struct spsc_ptr_queue { };

 struct spsc_ptr_queue* make_pipe();
 void enqueue(struct spsc_ptr_queue* queue, void* data);
 void* try_dequeue(struct spsc_ptr_queue* queue);
 void* peek(struct spsc_ptr_queue* queue);
 uint8_t pop(struct spsc_ptr_queue* queue);
 size_t count(struct spsc_ptr_queue* queue);
]]

-- Default C namespace through which the functions declared above are called.
local C = ffi.C
27 
28 
-- Method table (and metatable) shared by every slow pipe object.
-- It is exported as mod.slowPipe because the string produced by
-- slowPipe:__serialize() refers to it as require('pipe').slowPipe.
local slowPipe = {}
slowPipe.__index = slowPipe
mod.slowPipe = slowPipe
32 
--- Create a new slow pipe.
--- A pipe connects exactly two tasks: one reader and one writer.
--- Slow pipes earn their name: every object passed through one is *serialized* to a string first.
--- In exchange they accept arbitrary Lua objects that follow MoonGen's usual serialization rules.
--- Use a 'fast pipe' if you need fast inter-task communication. Fast pipes are restricted to LuaJIT FFI objects.
--- @return a new slow pipe object wrapping a freshly created native queue
function mod:newSlowPipe()
	local obj = {}
	-- the native queue that carries the serialized messages
	obj.pipe = C.make_pipe()
	return setmetatable(obj, slowPipe)
end
44 
--- Send values through the pipe.
--- All arguments are packed into one table, serialized to a string with
--- serpent.dump, copied into a buffer obtained from memory.alloc and
--- enqueued as a single message.
--- The buffer is not released here; slowPipe:tryRecv frees it after dequeuing.
--- @param ... the values to send
function slowPipe:send(...)
	local serialized = serpent.dump({ ... })
	-- one extra byte: ffi.copy with a Lua string source also writes a zero terminator
	local size = #serialized + 1
	local msg = memory.alloc("char*", size)
	ffi.copy(msg, serialized)
	C.enqueue(self.pipe, msg)
end
51 
--- Try to receive one message, polling the queue for a limited time.
--- The queue is polled in steps of 10 microseconds until a message is
--- available or the time budget is used up.
--- @param wait time budget in microseconds; optional, defaults to 0 (poll
---   exactly once). A negative value returns immediately without polling.
--- @return the values of the received message, or nothing if no message
---   arrived in time
function slowPipe:tryRecv(wait)
	-- a missing argument used to raise on the comparison below; treat it as "poll once"
	wait = wait or 0
	while wait >= 0 do
		local buf = C.try_dequeue(self.pipe)
		if buf ~= nil then
			-- Copy the payload into a Lua string and release the buffer right
			-- away, so it is not leaked if deserialization raises an error.
			local payload = ffi.string(buf)
			memory.free(buf)
			-- assert surfaces the loadstring error message for a malformed payload
			local result = assert(loadstring(payload))()
			return unpackAll(result)
		end
		wait = wait - 10
		if wait < 0 then
			break
		end
		dpdk.sleepMicros(10)
	end
end
67 
--- Receive one message, blocking until one is available.
--- Calls slowPipe:tryRecv with a 10 microsecond budget until it yields at
--- least one value.
--- A message counts as received when tryRecv returns one or more values, even
--- if the first value is false or nil. A message that deserializes to zero
--- values cannot be told apart from a timeout and is skipped.
--- @return the values of the received message
function slowPipe:recv()
	local function loop(...)
		-- Count the values instead of testing the first one for truthiness:
		-- a message starting with false/nil must not be mistaken for "nothing received".
		if select("#", ...) == 0 then
			-- tail call, so waiting does not grow the stack
			return loop(self:tryRecv(10))
		end
		return ...
	end
	return loop()
end
78 
--- Query the native queue's count() for this pipe.
--- @return the value reported by the native count() function, converted to a Lua number
function slowPipe:count()
	local queued = C.count(self.pipe)
	return tonumber(queued)
end
82 
--- Serialization hook for slow pipe objects.
--- Builds a Lua source string that first requires the 'pipe' module and then
--- returns the raw dump of this object wrapped by serpent.addMt with the
--- expression require('pipe').slowPipe.
--- NOTE(review): presumably addMt re-attaches the slowPipe metatable on
--- deserialization and the second return value marks the string as complete
--- Lua code -- confirm against the Serpent module used by this project.
--- @return the serialized source string, followed by true
function slowPipe:__serialize()
	local raw = serpent.dumpRaw(self)
	local withMt = serpent.addMt(raw, "require('pipe').slowPipe")
	return "require'pipe'; return " .. withMt, true
end
86 
87 
--- Create a new fast pipe.
--- Not yet implemented: calling this always raises the error "NYI".
function mod:newFastPipe()
	local reason = "NYI"
	error(reason)
end
91 
-- Export the module table (newSlowPipe, newFastPipe and the slowPipe method table).
return mod
93 
function rxQueue recv(bufArray)
Receive packets from a rx queue.
local ffi
low-level dpdk wrapper
Definition: dpdkc.lua:6
function rxQueue tryRecv(bufArray, maxWait)
Receive packets from a rx queue with a timeout.
function mod free(buf)
Free off-heap allocated object.
function pkt dump(bytes)
Dumps the packet data cast to the best fitting packet struct.
function mod sleepMicros(t)
Delay by t microseconds.
function mod newSlowPipe()
Create a new slow pipe.
local mod
high-level dpdk wrapper
Definition: dpdk.lua:6