1 ---------------------------------
5 ---------------------------------
9 local
ffi = require
"ffi"
10 local dpdkc = require
"dpdkc"
11 local dpdk = require
"dpdk"
12 local memory = require
"memory"
13 local serpent = require
"Serpent"
14 local errors = require
"error"
17 -- FIXME: fix
this ugly duplicated code
enum
18 mod.RSS_FUNCTION_IPV4 = 1
19 mod.RSS_FUNCTION_IPV4_TCP = 2
20 mod.RSS_FUNCTION_IPV4_UDP = 3
21 mod.RSS_FUNCTION_IPV6 = 4
22 mod.RSS_FUNCTION_IPV6_TCP = 5
23 mod.RSS_FUNCTION_IPV6_UDP = 6
26 void rte_eth_macaddr_get ( uint8_t port_id,
27 struct ether_addr * mac_addr
31 mod.PCI_ID_X540 = 0x80861528
32 mod.PCI_ID_X520 = 0x8086154D
33 mod.PCI_ID_82599 = 0x808610FB
34 mod.PCI_ID_82580 = 0x8086150E
35 mod.PCI_ID_82576 = 0x80861526
36 mod.PCI_ID_XL710 = 0x80861583
39 dpdkc.rte_pmd_init_all_export();
40 dpdkc.rte_eal_pci_probe();
--- Get the number of ethernet devices reported by DPDK.
-- @return the result of dpdkc.rte_eth_dev_count()
function mod.numDevices()
	local count = dpdkc.rte_eth_dev_count()
	return count
end
--- String representation of a device, e.g. "[Device: id=0]".
function dev:__tostring()
	return string.format("[Device: id=%d]", self.id)
end
--- Serialization hook: produce Lua source that looks the device up again by its id.
-- @return the source string, followed by true
--         (NOTE(review): the meaning of the second value is defined by the serializer - confirm there)
function dev:__serialize()
	local code = string.format('local dev = require "device" return dev.get(%d)', self.id)
	return code, true
end
60 txQueue.__index = txQueue
61 txQueue.__type =
"txQueue"
--- String representation of a tx queue, e.g. "[TxQueue: id=0, qid=1]".
function txQueue:__tostring()
	return string.format("[TxQueue: id=%d, qid=%d]", self.id, self.qid)
end
--- Serialization hook: produce Lua source that fetches this tx queue again via its device.
-- @return the source string, followed by true
--         (NOTE(review): the meaning of the second value is defined by the serializer - confirm there)
function txQueue:__serialize()
	local code = string.format('local dev = require "device" return dev.get(%d):getTxQueue(%d)', self.id, self.qid)
	return code, true
end
72 rxQueue.__index = rxQueue
73 rxQueue.__type =
"rxQueue"
--- String representation of an rx queue, e.g. "[RxQueue: id=0, qid=1]".
function rxQueue:__tostring()
	return string.format("[RxQueue: id=%d, qid=%d]", self.id, self.qid)
end
--- Serialization hook: produce Lua source that fetches this rx queue again via its device.
-- @return the source string, followed by true
--         (NOTE(review): the meaning of the second value is defined by the serializer - confirm there)
function rxQueue:__serialize()
	local code = string.format('local dev = require "device" return dev.get(%d):getRxQueue(%d)', self.id, self.qid)
	return code, true
end
85 --- Configure a device
86 --- @param args A table containing the following named arguments
87 --- port Port to configure
88 --- mempool
optional (
default = create a
new mempool) Mempool to associate to the device
89 --- rxQueues
optional (
default = 1) Number of RX queues to configure
90 --- txQueues
optional (default = 1) Number of TX queues to configure
94 --- dropEnable
optional (default = true)
95 --- rssNQueues
optional (default = 0) If this is >0 RSS will be activated for
96 --- this device. Incoming packets will be distributed to the
97 --- rxQueues number 0 to (rssNQueues - 1). For a fair distribution use one of
98 --- the following values (1, 2, 4, 8, 16). Values greater than 16 are not
100 --- rssFunctions
optional (default = all supported functions) A Table,
101 --- containing hashing methods, which can be used for RSS.
102 --- Possible methods are:
103 --- dev.RSS_FUNCTION_IPV4
104 --- dev.RSS_FUNCTION_IPV4_TCP
105 --- dev.RSS_FUNCTION_IPV4_UDP
106 --- dev.RSS_FUNCTION_IPV6
107 --- dev.RSS_FUNCTION_IPV6_TCP
108 --- dev.RSS_FUNCTION_IPV6_UDP
109 --- @todo FIXME:
add description for speed and dropEnable parameters.
112 if #args > 1 or type((...)) ==
"number" then
113 --
this is
for legacy compatibility when calling the
function without named arguments
114 print(
"[WARNING] You are using a deprecated method for invoking device.config. config(...) should be used with named arguments. For details: see documentation")
115 if not args[2] or type(args[2]) == "number" then
117 args.rxQueues = args[2]
118 args.txQueues = args[3]
119 args.rxDescs = args[4]
120 args.txDescs = args[5]
122 args.dropEnable = args[7]
125 args.mempool = args[2]
126 args.rxQueues = args[3]
127 args.txQueues = args[4]
128 args.rxDescs = args[5]
129 args.txDescs = args[6]
131 args.dropEnable = args[8]
133 elseif
#args == 1 then
134 -- here we receive named arguments
137 errorf(
"Device config needs at least one argument.")
140 args.rxQueues = args.rxQueues or 1
141 args.txQueues = args.txQueues or 1
142 args.rxDescs = args.rxDescs or 512
143 args.txDescs = args.txDescs or 256
144 args.rssNQueues = args.rssNQueues or 0
145 args.rssFunctions = args.rssFunctions or {
mod.RSS_FUNCTION_IPV4,
mod.RSS_FUNCTION_IPV4_UDP,
mod.RSS_FUNCTION_IPV4_TCP,
mod.RSS_FUNCTION_IPV6,
mod.RSS_FUNCTION_IPV6_UDP,
mod.RSS_FUNCTION_IPV6_TCP}
146 -- create a mempool with enough memory to hold tx, as well as rx descriptors
147 -- FIXME: should
n = 2^k-1 here too?
148 args.mempool = args.mempool or memory.createMemPool{n = args.rxQueues * args.rxDescs + args. txQueues * args.txDescs, socket = dpdkc.get_socket(args.port)}
149 if devices[args.port] and devices[args.port].initialized then
150 printf(
"[WARNING] Device %d already configured, skipping initilization", args.port)
151 return
mod.get(args.port)
153 args.speed = args.speed or 0
154 args.dropEnable = args.dropEnable == nil and true
155 if args.rxQueues == 0 or args.txQueues == 0 then
156 -- dpdk does not like devices without rx/tx queues :(
157 errorf("cannot initialize device without %s queues", args.rxQueues == 0 and args.txQueues == 0 and "rx and tx" or args.rxQueues == 0 and "rx" or "tx")
159 -- configure rss stuff
160 local rss_enabled = 0
161 local rss_hash_mask =
ffi.
new("struct mg_rss_hash_mask")
162 if(args.rssNQueues > 0) then
163 for i, v in ipairs(args.rssFunctions) do
164 if (v ==
mod.RSS_FUNCTION_IPV4) then
165 rss_hash_mask.ipv4 = 1
167 if (v ==
mod.RSS_FUNCTION_IPV4_TCP) then
168 rss_hash_mask.tcp_ipv4 = 1
170 if (v ==
mod.RSS_FUNCTION_IPV4_UDP) then
171 rss_hash_mask.udp_ipv4 = 1
173 if (v ==
mod.RSS_FUNCTION_IPV6) then
174 rss_hash_mask.ipv6 = 1
176 if (v ==
mod.RSS_FUNCTION_IPV6_TCP) then
177 rss_hash_mask.tcp_ipv6 = 1
179 if (v ==
mod.RSS_FUNCTION_IPV6_UDP) then
180 rss_hash_mask.udp_ipv6 = 1
185 -- TODO: support options
186 local rc = dpdkc.configure_device(args.port, args.rxQueues, args.txQueues, args.rxDescs, args.txDescs, args.speed, args.mempool, args.dropEnable, rss_enabled, rss_hash_mask)
188 errorf("could not configure device %d: error %d", args.port, rc)
190 local dev =
mod.get(args.port)
191 dev.initialized = true
192 if rss_enabled == 1 then
193 dev:setRssNQueues(args.rssNQueues)
203 struct rte_eth_rss_reta {
211 int mg_rte_eth_dev_rss_reta_update ( uint8_t port,
212 struct rte_eth_rss_reta * reta_conf
214 int rte_eth_dev_rss_reta_update ( uint8_t port,
215 struct rte_eth_rss_reta * reta_conf
219 function dev:setRssNQueues(n)
221 errorf("Maximum possible numbers of RSS queues is 16")
224 if(({[1]=1, [2]=1, [4]=1, [8]=1, [16]=1})[n] == nil) then
225 printf("[WARNING] RSS distribution to queues will not be fair. Fair distribution is only achieved with a number of Queues equal to 1, 2, 4, 8 or 16. However you are currently using %d queues", n)
227 local reta =
ffi.
new("struct rte_eth_rss_reta")
233 if (queue < n - 1) then
240 -- the mg_ version of rte_eth_dev_rss_reta_update() will also write the mask
241 -- to the reta_config struct, as lua can not do 64bit
unsigned int operations.
242 local ret =
ffi.C.mg_rte_eth_dev_rss_reta_update(self.
id, reta)
244 errorf("ERROR setting up RETA table: " .. errors.getstr(-ret))
254 devices[
id] = setmetatable({
id = id, rxQueues = {}, txQueues = {} }, dev)
255 if MOONGEN_TASK_NAME ~=
"master" and not MOONGEN_IGNORE_BAD_NUMA_MAPPING then
256 -- check the NUMA association
if we are
running in a worker thread
257 -- (it
's okay to do the initial config from the wrong socket, but sending packets from it is a bad idea)
258 local devSocket = devices[id]:getSocket()
259 local core, threadSocket = dpdk.getCore()
260 if devSocket ~= threadSocket then
261 printf("[WARNING] You are trying to use %s (attached to the CPU socket %d) from a thread on core %d on socket %d!",
262 devices[id], devSocket, core, threadSocket)
263 printf("[WARNING] This can significantly impact the performance or even not work at all")
264 printf("[WARNING] You can change the used CPU cores in dpdk-conf.lua or by using dpdk.launchLuaOnCore(core, ...)")
270 function dev:getTxQueue(id)
271 local tbl = self.txQueues
275 tbl[id] = setmetatable({ id = self.id, qid = id, dev = self }, txQueue)
280 function dev:getRxQueue(id)
281 local tbl = self.rxQueues
285 tbl[id] = setmetatable({ id = self.id, qid = id, dev = self }, rxQueue)
290 --- Waits until all given devices are initialized by calling wait() on them.
291 function mod.waitForLinks(...)
293 if select("#", ...) == 0 then
295 for port, dev in pairs(devices) do
296 if dev.initialized then
297 ports[#ports + 1] = port
303 print("Waiting for devices to come up...")
305 local portsSeen = {} -- do not wait twice if a port occurs more than once (e.g. if rx == tx)
306 for i, port in ipairs(ports) do
307 local port = mod.get(port)
308 if not portsSeen[port] then
309 portsSeen[port] = true
310 portsUp = portsUp + (port:wait() and 1 or 0)
313 printf("%d devices are up.", portsUp)
317 --- Wait until the device is fully initialized and up to maxWait seconds to establish a link.
318 -- @param maxWait maximum number of seconds to wait for the link, default = 9
319 -- This function then reports the current link state on stdout
320 function dev:wait(maxWait)
321 maxWait = maxWait or 9
324 link = self:getLinkStatus()
326 dpdk.sleepMillisIdle(1000)
327 maxWait = maxWait - 1
332 self.speed = link.speed
333 printf("Device %d (%s) is %s: %s%s MBit/s", self.id, self:getMacString(), link.status and "up" or "DOWN", link.duplexAutoneg and "" or link.duplex and "full-duplex " or "half-duplex ", link.speed)
--- Query the current link state of the device without waiting for it to settle.
-- @return a table with the fields
--   status        true if link_status == 1
--   duplexAutoneg true if link_duplex == 0
--   duplex        true if link_duplex == 2
--   speed         link_speed as reported by DPDK
function dev:getLinkStatus()
	local raw = ffi.new("struct rte_eth_link")
	dpdkc.rte_eth_link_get_nowait(self.id, raw)
	local duplexMode = raw.link_duplex
	return {
		status = raw.link_status == 1,
		duplexAutoneg = duplexMode == 0,
		duplex = duplexMode == 2,
		speed = raw.link_speed,
	}
end
--- Get the MAC address of the device as a Lua string.
-- The C helper get_mac_addr fills a 20 byte buffer which is then converted
-- up to its terminating NUL byte.
function dev:getMacString()
	local buffer = ffi.new("char[20]")
	dpdkc.get_mac_addr(self.id, buffer)
	local mac = ffi.string(buffer)
	return mac
end
350 function dev:getMac()
352 return parseMacAddress(self:getMacString())
--- Get the PCI id of the device as returned by dpdkc.get_pci_id.
-- The result is compared against the mod.PCI_ID_* constants elsewhere in this module.
function dev:getPciId()
	local pciId = dpdkc.get_pci_id(self.id)
	return pciId
end
--- Get the CPU socket this device is attached to, as returned by dpdkc.get_socket.
function dev:getSocket()
	local socket = dpdkc.get_socket(self.id)
	return socket
end
--- Human-readable adapter names, keyed by the mod.PCI_ID_* constants.
local deviceNames = {
	[mod.PCI_ID_82576] = "82576 Gigabit Network Connection",
	[mod.PCI_ID_82580] = "82580 Gigabit Network Connection",
	[mod.PCI_ID_82599] = "82599EB 10-Gigabit SFI/SFP+ Network Connection",
	[mod.PCI_ID_X520]  = "Ethernet 10G 2P X520 Adapter", -- Dell-branded NIC with an 82599
	[mod.PCI_ID_X540]  = "Ethernet Controller 10-Gigabit X540-AT2",
	[mod.PCI_ID_XL710] = "Ethernet Controller XL710 for 40GbE QSFP+", -- was misspelled "LX710"
}
--- Get a human-readable name for the device.
-- Known PCI ids are looked up in deviceNames; for anything else the id is
-- printed as two hex numbers (upper 16 bits : lower 16 bits).
function dev:getName()
	local pciId = self:getPciId()
	local known = deviceNames[pciId]
	if known then
		return known
	end
	local upper = bit.rshift(pciId, 16)
	local lower = bit.band(pciId, 0xFFFF)
	return ("unknown NIC (PCI ID %x:%x)"):format(upper, lower)
end
--- Convenience wrapper: get the human-readable name of the device on the given port.
-- @param port port id, passed to mod.get
function mod.getDeviceName(port)
	local device = mod.get(port)
	return device:getName()
end
--- List all devices known to DPDK.
-- @return an array with one entry { id = port id, mac = MAC string, name = NIC name }
--         per port, for ports 0 to rte_eth_dev_count() - 1
function mod.getDevices()
	local result = {}
	for i = 0, dpdkc.rte_eth_dev_count() - 1 do
		local dev = mod.get(i)
		-- getMacString and getName are methods without parameters; the device
		-- is already bound via ':' so no port id needs to be passed again
		result[#result + 1] = { id = i, mac = dev:getMacString(), name = dev:getName() }
	end
	return result
end
390 local function readCtr32(id, addr, last)
391 local val = dpdkc.read_reg32(id, addr)
392 local diff = val - last
--- Read a 48 bit statistics counter that is split across two 32 bit registers.
-- @param id device id, passed to dpdkc.read_reg32
-- @param addr address of the low register; the high register is read from addr + 4
-- @param last absolute counter value returned by the previous call
-- @return the difference to last, followed by the current absolute counter value
local function readCtr48(id, addr, last)
	local addrl = addr
	local addrh = addr + 4
	-- TODO: we probably need a memory fence here
	-- however, the intel driver doesn't use a fence here so I guess that should work
	local h = dpdkc.read_reg32(id, addrh)
	local l = dpdkc.read_reg32(id, addrl)
	local h2 = dpdkc.read_reg32(id, addrh) -- check for overflow during read
	if h ~= h2 then
		-- overflow during the read
		-- we can just read the lower value again (1 overflow every 850ms max)
		-- (this used to read self.id and a hard-coded register address; there is
		-- no 'self' in this function and the counter address is given by the caller)
		l = dpdkc.read_reg32(id, addrl)
		h = h2 -- use the new high value
	end
	local val = l + h * 2^32 -- 48 bits, double is fine
	local diff = val - last
	if diff < 0 then
		-- NOTE(review): assumes the hardware counter wraps at 2^48 - confirm against the datasheet
		diff = diff + 2^48
	end
	return diff, val
end
421 -- FIXME: only tested on X540, 82599 and 82580 chips
422 -- these functions must be wrapped in a device-specific way
424 local GPRC = 0x00004074
425 local GORCL = 0x00004088
426 local GORCH = 0x0000408C
429 local GPTC = 0x00004080
430 local GOTCL = 0x00004090
431 local GOTCH = 0x00004094
438 ---
get the number of packets received since the last call to
this function
440 local devId = self:getPciId()
441 if devId ==
mod.PCI_ID_XL710 then
442 local uprc, mprc, bprc, gorc
443 uprc, lastUprc = readCtr32(self.
id, 0x003005A0, lastUprc)
444 mprc, lastMprc = readCtr32(self.
id, 0x003005C0, lastMprc)
445 bprc, lastBprc = readCtr32(self.
id, 0x003005E0, lastBprc)
446 gorc, lastGorc = readCtr48(self.
id, 0x00300000, lastGorc)
447 return uprc + mprc + bprc, gorc
449 return dpdkc.read_reg32(self.
id, GPRC), dpdkc.read_reg32(self.
id, GORCL) + dpdkc.read_reg32(self.
id, GORCH) * 2^32
--- Get transmit statistics of the device.
-- The counts of bad packets/bytes reported by dpdkc.get_bad_pkts_sent and
-- dpdkc.get_bad_bytes_sent are subtracted from the hardware counters.
-- On the XL710 the unicast, multicast and broadcast packet counters are summed up
-- and the values are differences to the previous readCtr32/readCtr48 results;
-- on all other NICs the GPTC and GOTCL/GOTCH registers are read directly.
-- @return number of packets, number of bytes
function dev:getTxStats()
	local id = self.id
	local badPkts = tonumber(dpdkc.get_bad_pkts_sent(id))
	local badBytes = tonumber(dpdkc.get_bad_bytes_sent(id))
	-- FIXME: this should really be split up into separate functions/files
	if self:getPciId() ~= mod.PCI_ID_XL710 then
		-- TODO: check for ixgbe
		local pkts = dpdkc.read_reg32(id, GPTC)
		local bytes = dpdkc.read_reg32(id, GOTCL) + dpdkc.read_reg32(id, GOTCH) * 2^32
		return pkts - badPkts, bytes - badBytes
	end
	local uptc, mptc, bptc, gotc
	uptc, lastUptc = readCtr32(id, 0x003009C0, lastUptc)
	mptc, lastMptc = readCtr32(id, 0x003009E0, lastMptc)
	bptc, lastBptc = readCtr32(id, 0x00300A00, lastBptc)
	gotc, lastGotc = readCtr48(id, 0x00300680, lastGotc)
	return uptc + mptc + bptc - badPkts, gotc - badBytes
end
479 --- TODO: figure out how to actually acquire statistics in a meaningful way for dropped packets :/
481 local stats =
ffi.
new("struct rte_eth_stats")
482 dpdkc.rte_eth_stats_get(self.
id, stats)
486 local RTTDQSEL = 0x00004904
488 --- Set the tx rate of a queue in MBit/s.
489 --- This sets the payload rate, not the actual wire rate, i.e. preamble, SFD, and IFG are ignored.
490 --- The X540 and 82599 chips seem to have a hardware bug (?): they seem to use the wire rate at some point of the throttling process.
491 --- This causes erratic behavior for rates >= 64/84 * WireRate when using small packets.
492 --- The function is non-linear (not even monotonic) for such rates.
493 --- The function prints a warning if such a rate is configured.
494 --- A simple work-around for this is using two queues with 50% of the desired rate.
495 --- Note that this changes the inter-arrival times as the rate control of both queues is independent.
497 if self.dev:getPciId() ~=
mod.PCI_ID_82599 and self.dev:getPciId() ~=
mod.PCI_ID_X540 and self.dev:getPciId() ~=
mod.PCI_ID_X520 then
498 error("tx rate control not yet implemented for this NIC")
500 local speed = self.dev:getLinkStatus().speed
502 print("WARNING: link down, assuming 10 GbE connection")
508 self.rate = math.min(rate, speed)
510 local link = self.dev:getLinkStatus()
511 self.speed = link.speed
513 -- the X540 and 82599 chips have a hardware bug: they assume that the wire size of an
514 -- ethernet frame is 64 byte when it is actually 84 byte (8 byte preamble/SFD, 12 byte IFG)
515 -- TODO: software fallback for bugged rates and unsupported NICs
516 if rate >= (64 * 64) / (84 * 84) and rate < 1 then
517 print("WARNING: rates with a payload rate >= 64/84% do not work properly with small packets due to a hardware bug, see documentation for details")
520 error("rate must be > 0")
523 self:setTxRateRaw(0, true)
525 self:setTxRateRaw(1 / rate)
--- Set the tx rate of a queue in Mpps.
-- The packet rate is converted to MBit/s and passed on to setRate().
-- @param rate the desired rate in million packets per second
-- @param pktSize optional (default = 60) packet size in bytes; 4 bytes are added
--        per packet for the conversion (presumably the CRC - TODO confirm)
function txQueue:setRateMpps(rate, pktSize)
	if not pktSize then
		pktSize = 60
	end
	local frameBytes = pktSize + 4
	self:setRate(rate * frameBytes * 8)
end
534 local RF_X540_82599 = 0x00004984
535 local RF_ENABLE_BIT = bit.lshift(1, 31)
537 function txQueue:setTxRateRaw(rate,
disable)
538 dpdkc.write_reg32(self.
id, RTTDQSEL, self.qid)
540 dpdkc.write_reg32(self.
id, RF_X540_82599, 0)
544 local rateInt = math.floor(rate)
545 local rateDec = math.floor((rate - rateInt) * 2^14)
546 dpdkc.write_reg32(self.
id, RF_X540_82599, bit.
bor(bit.lshift(rateInt, 14), rateDec, RF_ENABLE_BIT))
549 function txQueue:getTxRate()
550 local link = self.dev:getLinkStatus()
551 self.speed = link.speed > 0 and link.speed or 10000
552 dpdkc.write_reg32(self.
id, RTTDQSEL, self.qid)
553 local reg = dpdkc.read_reg32(self.
id, RF_X540_82599)
559 local rateInt = bit.
band(bit.rshift(reg, 14), 0x3FFF)
560 local rateDec = bit.
band(reg, 0x3FF)
561 self.rate = (1 / (rateInt + rateDec / 2^14)) * self.speed
565 function txQueue:send(bufs)
567 dpdkc.send_all_packets(self.
id, self.qid, bufs.array, bufs.size);
--- Start this tx queue.
-- Raises an error if rte_eth_dev_tx_queue_start does not return 0.
function txQueue:start()
	local rc = dpdkc.rte_eth_dev_tx_queue_start(self.id, self.qid)
	assert(rc == 0)
end
--- Stop this tx queue.
-- Raises an error if rte_eth_dev_tx_queue_stop does not return 0.
function txQueue:stop()
	local rc = dpdkc.rte_eth_dev_tx_queue_stop(self.id, self.qid)
	assert(rc == 0)
end
581 function txQueue:sendWithDelay(bufs, method)
583 mempool = mempool or memory.
createMemPool(2047, nil, nil, 4095)
584 method = method or "crc"
585 if method == "crc" then
586 dpdkc.send_all_packets_with_delay_bad_crc(self.
id, self.qid, bufs.array, bufs.size, mempool)
587 elseif method == "size" then
588 dpdkc.send_all_packets_with_delay_invalid_size(self.
id, self.qid, bufs.array, bufs.size, mempool)
590 errorf("unknown delay method %s", method)
596 --- Restarts all tx queues that were actively used by this task.
597 --- 'Actively used' means that either :send() or :sendWithDelay() was called from the current task.
599 for _, dev in pairs(devices) do
600 for _, queue in pairs(dev.txQueues) do
609 --- Receive packets from a rx queue.
610 --- Returns as soon as at least one packet is available.
613 local rx = dpdkc.rte_eth_rx_burst_export(self.
id, self.qid,
bufArray.array,
bufArray.size)
--- Get the MAC address of the device this rx queue belongs to.
-- rte_eth_macaddr_get is declared as void (uint8_t port_id, struct ether_addr* mac_addr):
-- it fills a caller-provided buffer and returns nothing, so the address has to be
-- read into a freshly allocated struct instead of casting the (void) return value.
-- @return a struct mac_address cdata object
function rxQueue:getMacAddr()
	-- NOTE(review): assumes struct mac_address has the same layout as struct ether_addr - confirm
	local addr = ffi.new("struct mac_address")
	ffi.C.rte_eth_macaddr_get(self.id, ffi.cast("struct ether_addr*", addr))
	return addr
end
--- Get the MAC address of the device this tx queue belongs to.
-- rte_eth_macaddr_get is declared as void (uint8_t port_id, struct ether_addr* mac_addr):
-- it fills a caller-provided buffer and returns nothing, so the address has to be
-- read into a freshly allocated struct instead of casting the (void) return value.
-- @return a struct mac_address cdata object
function txQueue:getMacAddr()
	-- NOTE(review): assumes struct mac_address has the same layout as struct ether_addr - confirm
	local addr = ffi.new("struct mac_address")
	ffi.C.rte_eth_macaddr_get(self.id, ffi.cast("struct ether_addr*", addr))
	return addr
end
633 --- Receive packets from a rx queue with a timeout.
635 maxWait = maxWait or math.huge
636 while maxWait >= 0 do
637 local rx = dpdkc.rte_eth_rx_burst_export(self.
id, self.qid, bufArray.array, bufArray.size)
641 maxWait = maxWait - 1
642 -- don't sleep pointlessly
651 --- Receive packets from a rx queue with a timeout.
652 --- Does not perform a busy
wait, this is not suitable for high-throughput applications.
654 maxWait = maxWait or math.huge
655 while maxWait >= 0 do
656 local rx = dpdkc.rte_eth_rx_burst_export(self.
id, self.qid, bufArray.array, bufArray.size)
660 maxWait = maxWait - 1
661 -- don't sleep pointlessly
670 -- export prototypes to extend them in other modules (TODO: use a proper 'class' system with mix-ins or something)
671 mod.__devicePrototype = dev
672 mod.__txQueuePrototype = txQueue
673 mod.__rxQueuePrototype = rxQueue
param n optional(default=2047)
Create a new memory pool.
function rxQueue recv(bufArray)
Receive packets from a rx queue.
function mod disable(port)
Disable the Hardware Crypto Engine.
local ffi
low-level dpdk wrapper
function mod sleepMicrosIdle(t)
Sleep by t microseconds by calling usleep().
function rxQueue tryRecv(bufArray, maxWait)
Receive packets from a rx queue with a timeout.
function mod band(mask1, mask2, result)
Bitwise and.
function mod sleepMicros(t)
Delay by t microseconds.
function dev wait(maxWait)
Wait until the device is fully initialized and up to maxWait seconds to establish a link...
function mod bor(mask1, mask2, result)
Bitwise or.
function mempool bufArray(n)
Create a new array of memory buffers (initialized to nil).
local mod
high-level dpdk wrapper
function mod config(...)
Configure a device.
function printf(str,...)
Print a formatted string.
function ip4Addr add(val)
Add a number to an IPv4 address in-place.
function txQueue setRate(rate)
Set the tx rate of a queue in MBit/s.
function mod stop()
request all tasks to exit
function mod createMemPool(...)
function rxQueue tryRecvIdle(bufArray, maxWait)
Receive packets from a rx queue with a timeout.
function mod reclaimTxBuffers()
Restarts all tx queues that were actively used by this task.
function dev getRxStatsAll()
TODO: figure out how to actually acquire statistics in a meaningful way for dropped packets :/...
n
Create a new array of memory buffers (initialized to nil).
function errorf(str,...)
Print a formatted error string.
function mod running(extraTime)
Returns false once the app receives SIGTERM or SIGINT, the time set via setRuntime expires...
function dev getRxStats()
get the number of packets received since the last call to this function