std.mqueue
POSIX Message Queues (WP-11): Kernel-verwaltete IPC-Queues mit priorisiertem Zugriff. Nachrichten werden nach Priorität (0 = niedrigste, MQ_PRIO_MAX = 32767) sortiert und gepuffert. Queue-Namen beginnen mit / und erscheinen unter /dev/mqueue/. Verbindungen werden als Dateideskriptoren (int64) verwaltet.
→ Standard Library · std.pipe · std.thread
Konstanten
Open-Flags
| Konstante | Wert | Beschreibung |
|---|---|---|
MQ_OPEN_RDONLY | 0 | Queue nur zum Empfangen öffnen |
MQ_OPEN_WRONLY | 1 | Queue nur zum Senden öffnen |
MQ_OPEN_RDWR | 2 | Queue zum Senden und Empfangen öffnen |
MQ_OPEN_CREATE | 64 | Queue anlegen wenn nicht vorhanden (wie O_CREAT) |
MQ_OPEN_EXCL | 128 | Fehler wenn Queue bereits existiert (wie O_EXCL) |
MQ_OPEN_NONBLOCK | 2048 | Nicht-blockierend: MqSend/MqRecv geben sofort zurück |
Queue-Attribute
| Konstante | Wert | Beschreibung |
|---|---|---|
MQ_ATTR_SIZE | 64 | Größe des mq_attr-Puffers in Bytes |
MQ_ATTR_FLAGS | 0 | Offset: mq_flags (O_NONBLOCK oder 0) |
MQ_ATTR_MAXMSG | 8 | Offset: mq_maxmsg (maximale Nachrichtenanzahl) |
MQ_ATTR_MSGSIZE | 16 | Offset: mq_msgsize (maximale Nachrichtenlänge in Bytes) |
MQ_ATTR_CURMSGS | 24 | Offset: mq_curmsgs (aktuell wartende Nachrichten, Read-Only) |
MQ_DEFAULT_MAXMSG | 10 | System-Standard: max. 10 Nachrichten |
MQ_DEFAULT_MSGSIZE | 8192 | System-Standard: max. 8192 Bytes pro Nachricht |
MQ_PRIO_MAX | 32767 | Maximale Nachrichtenpriorität |
Funktionen
Queue öffnen, schließen, löschen
| Signatur | Beschreibung |
|---|---|
MqOpen(name: pchar, flags: int64, mode: int64): int64 | Öffnet oder erstellt eine Queue mit System-Defaults (max. 10 Nachrichten, 8192 Bytes). name muss mit / beginnen. mode setzt Zugriffsrechte (z. B. 420 = 0644) |
MqOpenWith(name: pchar, flags: int64, mode: int64, maxmsg: int64, msgsize: int64): int64 | Wie MqOpen, aber mit expliziter Kapazität. Wird bei MQ_OPEN_CREATE ausgewertet |
MqClose(fd: int64): int64 | Schließt den Queue-Deskriptor (wie close). Die Queue selbst bleibt im System |
MqUnlink(name: pchar): int64 | Löscht die Queue aus dem System. Sie wird beim letzten MqClose endgültig entfernt |
Senden
| Signatur | Beschreibung |
|---|---|
MqSend(fd: int64, msg: int64, msgLen: int64, prio: int64): int64 | Sendet eine Nachricht. Blockiert wenn die Queue voll ist. prio: 0 (niedrigste) bis MQ_PRIO_MAX |
MqTimedSend(fd: int64, msg: int64, msgLen: int64, prio: int64, timeoutNs: int64): int64 | Wie MqSend, aber mit absolutem Timeout in Nanosekunden seit CLOCK_REALTIME. Gibt −110 (−ETIMEDOUT) zurück bei Timeout |
Empfangen
| Signatur | Beschreibung |
|---|---|
MqRecv(fd: int64, buf: int64, bufLen: int64, prioOut: int64): int64 | Empfängt die Nachricht mit der höchsten Priorität. Blockiert wenn die Queue leer ist. prioOut: Zeiger auf int64 für Priorität (0 = ignorieren). Gibt empfangene Bytes zurück |
MqTimedRecv(fd: int64, buf: int64, bufLen: int64, prioOut: int64, timeoutNs: int64): int64 | Wie MqRecv, aber mit absolutem Timeout in Nanosekunden. Gibt −110 zurück bei Timeout |
Attribute abfragen
| Signatur | Beschreibung |
|---|---|
MqGetAttr(fd: int64, attrBuf: int64): int64 | Füllt attrBuf (alloc(MQ_ATTR_SIZE)) mit den aktuellen Queue-Attributen |
MqGetMaxMsg(fd: int64): int64 | Gibt die maximale Nachrichtenanzahl der Queue zurück |
MqGetCurMsgs(fd: int64): int64 | Gibt die aktuell wartende Nachrichtenanzahl zurück |
Verwendung
Einfache Producer-Consumer-Kommunikation
import std.mqueue;
import std.alloc;
import std.io;
fn Producer(): void {
var fd := MqOpen("/worker_queue",
MQ_OPEN_WRONLY | MQ_OPEN_CREATE, 420);
var msg: pchar := "job:render_frame";
MqSend(fd, msg, 16, 10); // Priorität 10
MqClose(fd);
}
fn Consumer(): void {
var fd := MqOpen("/worker_queue", MQ_OPEN_RDONLY, 0);
var buf: int64 := alloc(MQ_DEFAULT_MSGSIZE);
var prio: int64 := alloc(8);
var n := MqRecv(fd, buf, MQ_DEFAULT_MSGSIZE, prio);
PrintLn("Empfangen (prio=" + IntToStr(peek64(prio)) + "): " + buf);
free(buf, MQ_DEFAULT_MSGSIZE);
free(prio, 8);
MqClose(fd);
MqUnlink("/worker_queue");
}
Queue mit expliziter Kapazität
import std.mqueue;
import std.alloc;
fn SetupQueue(): int64 {
// Queue für bis zu 64 kleine Steuernachrichten (max. 64 Bytes)
return MqOpenWith("/ctrl",
MQ_OPEN_RDWR | MQ_OPEN_CREATE,
420, // 0644
64, // maxmsg
64); // msgsize
}
Prioritätsbasierte Verarbeitung
import std.mqueue;
import std.alloc;
import std.io;
fn PrioDispatch(): void {
var fd := MqOpenWith("/prio_q",
MQ_OPEN_RDWR | MQ_OPEN_CREATE,
420, 32, 128);
// Nachrichten mit verschiedenen Prioritäten senden
var low: pchar := "backup";
var high: pchar := "alert";
var mid: pchar := "status";
MqSend(fd, low, 7, 1); // Priorität 1 — niedrig
MqSend(fd, mid, 7, 5); // Priorität 5 — mittel
MqSend(fd, high, 6, 20); // Priorität 20 — hoch
// Empfang erfolgt in Prioritätsreihenfolge: high → mid → low
var buf: int64 := alloc(128);
var prio: int64 := alloc(8);
var i: int64 := 0;
while (i < 3) {
MqRecv(fd, buf, 128, prio);
PrintLn(buf + " (prio=" + IntToStr(peek64(prio)) + ")");
i := i + 1;
}
free(buf, 128);
free(prio, 8);
MqClose(fd);
MqUnlink("/prio_q");
}
Timeout-basiertes Empfangen
import std.mqueue;
import std.time;
import std.alloc;
import std.io;
fn TryReceive(name: pchar): bool {
var fd := MqOpen(name, MQ_OPEN_RDONLY, 0);
// Absoluter Timeout: jetzt + 500 ms
var now := TimeNowNs();
var timeout := now + 500000000;
var buf: int64 := alloc(MQ_DEFAULT_MSGSIZE);
var prio: int64 := alloc(8);
var n := MqTimedRecv(fd, buf, MQ_DEFAULT_MSGSIZE, prio, timeout);
MqClose(fd);
free(buf, MQ_DEFAULT_MSGSIZE);
free(prio, 8);
if (n < 0) {
PrintLn("Timeout oder Fehler");
return false;
}
return true;
}
Queue-Status abfragen
import std.mqueue;
import std.io;
fn PrintQueueStatus(name: pchar): void {
var fd := MqOpen(name, MQ_OPEN_RDONLY, 0);
PrintLn("Max. Nachrichten: " + IntToStr(MqGetMaxMsg(fd)));
PrintLn("Aktuelle Nachrichten: " + IntToStr(MqGetCurMsgs(fd)));
MqClose(fd);
}
Hinweise
- Queue-Namen müssen mit
/beginnen und dürfen keinen weiteren/enthalten. bufLenbeiMqRecv/MqTimedRecvmuss ≥mq_msgsizeder Queue sein — sonst Fehler−EMSGSIZE.- Queues überleben den Prozess:
MqUnlinkmuss explizit aufgerufen werden. - Unter Linux liegt das Limit für
mq_maxmsgstandardmäßig bei 10 (sichtbar in/proc/sys/fs/mqueue/msg_max). Größere Kapazitäten erfordern Root oder angepasste Systemgrenzen. - Für Timeout-Werte:
TimeNowNs()(ausstd.time) liefert Nanosekunden seit CLOCK_REALTIME, was direkt alstimeoutNsverwendet werden kann.
Verwandte Units
std.pipe— Pipes (einfacheres IPC für Eltern-/Kindprozesse)std.thread— Threads mit Mutex/Bedingungsvariablenstd.signals— UNIX-Signale als leichtgewichtige IPC-Alternativestd.net.epoll— epoll kann mqueue-fds überwachen
Letzte Aktualisierung: 2026-06-05
