====== std.mqueue ======
POSIX Message Queues (WP-11): Kernel-verwaltete IPC-Queues mit priorisiertem Zugriff. Nachrichten werden nach Priorität (0 = niedrigste, MQ_PRIO_MAX = 32767) sortiert und gepuffert. Queue-Namen beginnen mit ''/'' und erscheinen unter ''/dev/mqueue/''. Verbindungen werden als Dateideskriptoren (''int64'') verwaltet.
→ [[lyx_-_programmiersprache:units|Standard Library]] · [[lyx_-_programmiersprache:units:pipe|std.pipe]] · [[lyx_-_programmiersprache:units:thread|std.thread]]
----
===== Konstanten =====
==== Open-Flags ====
^ Konstante ^ Wert ^ Beschreibung ^
| ''MQ_OPEN_RDONLY'' | 0 | Queue nur zum Empfangen öffnen |
| ''MQ_OPEN_WRONLY'' | 1 | Queue nur zum Senden öffnen |
| ''MQ_OPEN_RDWR'' | 2 | Queue zum Senden und Empfangen öffnen |
| ''MQ_OPEN_CREATE'' | 64 | Queue anlegen wenn nicht vorhanden (wie ''O_CREAT'') |
| ''MQ_OPEN_EXCL'' | 128 | Fehler wenn Queue bereits existiert (wie ''O_EXCL'') |
| ''MQ_OPEN_NONBLOCK'' | 2048 | Nicht-blockierend: ''MqSend''/''MqRecv'' geben sofort zurück |
==== Queue-Attribute ====
^ Konstante ^ Wert ^ Beschreibung ^
| ''MQ_ATTR_SIZE'' | 64 | Größe des ''mq_attr''-Puffers in Bytes |
| ''MQ_ATTR_FLAGS'' | 0 | Offset: ''mq_flags'' (''O_NONBLOCK'' oder 0) |
| ''MQ_ATTR_MAXMSG'' | 8 | Offset: ''mq_maxmsg'' (maximale Nachrichtenanzahl) |
| ''MQ_ATTR_MSGSIZE'' | 16 | Offset: ''mq_msgsize'' (maximale Nachrichtenlänge in Bytes) |
| ''MQ_ATTR_CURMSGS'' | 24 | Offset: ''mq_curmsgs'' (aktuell wartende Nachrichten, Read-Only) |
| ''MQ_DEFAULT_MAXMSG'' | 10 | System-Standard: max. 10 Nachrichten |
| ''MQ_DEFAULT_MSGSIZE'' | 8192 | System-Standard: max. 8192 Bytes pro Nachricht |
| ''MQ_PRIO_MAX'' | 32767 | Maximale Nachrichtenpriorität |
----
===== Funktionen =====
==== Queue öffnen, schließen, löschen ====
^ Signatur ^ Beschreibung ^
| ''MqOpen(name: pchar, flags: int64, mode: int64): int64'' | Öffnet oder erstellt eine Queue mit System-Defaults (max. 10 Nachrichten, 8192 Bytes). ''name'' muss mit ''/'' beginnen. ''mode'' setzt Zugriffsrechte (z. B. 420 = 0644) |
| ''MqOpenWith(name: pchar, flags: int64, mode: int64, maxmsg: int64, msgsize: int64): int64'' | Wie ''MqOpen'', aber mit expliziter Kapazität. Wird bei ''MQ_OPEN_CREATE'' ausgewertet |
| ''MqClose(fd: int64): int64'' | Schließt den Queue-Deskriptor (wie ''close''). Die Queue selbst bleibt im System |
| ''MqUnlink(name: pchar): int64'' | Löscht die Queue aus dem System. Sie wird beim letzten ''MqClose'' endgültig entfernt |
==== Senden ====
^ Signatur ^ Beschreibung ^
| ''MqSend(fd: int64, msg: int64, msgLen: int64, prio: int64): int64'' | Sendet eine Nachricht. Blockiert wenn die Queue voll ist. ''prio'': 0 (niedrigste) bis ''MQ_PRIO_MAX'' |
| ''MqTimedSend(fd: int64, msg: int64, msgLen: int64, prio: int64, timeoutNs: int64): int64'' | Wie ''MqSend'', aber mit absolutem Timeout in Nanosekunden seit CLOCK_REALTIME. Gibt −110 (''−ETIMEDOUT'') zurück bei Timeout |
==== Empfangen ====
^ Signatur ^ Beschreibung ^
| ''MqRecv(fd: int64, buf: int64, bufLen: int64, prioOut: int64): int64'' | Empfängt die Nachricht mit der höchsten Priorität. Blockiert wenn die Queue leer ist. ''prioOut'': Zeiger auf int64 für Priorität (0 = ignorieren). Gibt empfangene Bytes zurück |
| ''MqTimedRecv(fd: int64, buf: int64, bufLen: int64, prioOut: int64, timeoutNs: int64): int64'' | Wie ''MqRecv'', aber mit absolutem Timeout in Nanosekunden. Gibt −110 zurück bei Timeout |
==== Attribute abfragen ====
^ Signatur ^ Beschreibung ^
| ''MqGetAttr(fd: int64, attrBuf: int64): int64'' | Füllt ''attrBuf'' (''alloc(MQ_ATTR_SIZE)'') mit den aktuellen Queue-Attributen |
| ''MqGetMaxMsg(fd: int64): int64'' | Gibt die maximale Nachrichtenanzahl der Queue zurück |
| ''MqGetCurMsgs(fd: int64): int64'' | Gibt die aktuell wartende Nachrichtenanzahl zurück |
----
===== Verwendung =====
==== Einfache Producer-Consumer-Kommunikation ====
import std.mqueue;
import std.alloc;
import std.io;
fn Producer(): void {
var fd := MqOpen("/worker_queue",
MQ_OPEN_WRONLY | MQ_OPEN_CREATE, 420);
var msg: pchar := "job:render_frame";
MqSend(fd, msg, 16, 10); // Priorität 10
MqClose(fd);
}
fn Consumer(): void {
var fd := MqOpen("/worker_queue", MQ_OPEN_RDONLY, 0);
var buf: int64 := alloc(MQ_DEFAULT_MSGSIZE);
var prio: int64 := alloc(8);
var n := MqRecv(fd, buf, MQ_DEFAULT_MSGSIZE, prio);
PrintLn("Empfangen (prio=" + IntToStr(peek64(prio)) + "): " + buf);
free(buf, MQ_DEFAULT_MSGSIZE);
free(prio, 8);
MqClose(fd);
MqUnlink("/worker_queue");
}
==== Queue mit expliziter Kapazität ====
import std.mqueue;
import std.alloc;
fn SetupQueue(): int64 {
// Queue für bis zu 64 kleine Steuernachrichten (max. 64 Bytes)
return MqOpenWith("/ctrl",
MQ_OPEN_RDWR | MQ_OPEN_CREATE,
420, // 0644
64, // maxmsg
64); // msgsize
}
==== Prioritätsbasierte Verarbeitung ====
import std.mqueue;
import std.alloc;
import std.io;
fn PrioDispatch(): void {
var fd := MqOpenWith("/prio_q",
MQ_OPEN_RDWR | MQ_OPEN_CREATE,
420, 32, 128);
// Nachrichten mit verschiedenen Prioritäten senden
var low: pchar := "backup";
var high: pchar := "alert";
var mid: pchar := "status";
MqSend(fd, low, 7, 1); // Priorität 1 — niedrig
MqSend(fd, mid, 7, 5); // Priorität 5 — mittel
MqSend(fd, high, 6, 20); // Priorität 20 — hoch
// Empfang erfolgt in Prioritätsreihenfolge: high → mid → low
var buf: int64 := alloc(128);
var prio: int64 := alloc(8);
var i: int64 := 0;
while (i < 3) {
MqRecv(fd, buf, 128, prio);
PrintLn(buf + " (prio=" + IntToStr(peek64(prio)) + ")");
i := i + 1;
}
free(buf, 128);
free(prio, 8);
MqClose(fd);
MqUnlink("/prio_q");
}
==== Timeout-basiertes Empfangen ====
import std.mqueue;
import std.time;
import std.alloc;
import std.io;
fn TryReceive(name: pchar): bool {
var fd := MqOpen(name, MQ_OPEN_RDONLY, 0);
// Absoluter Timeout: jetzt + 500 ms
var now := TimeNowNs();
var timeout := now + 500000000;
var buf: int64 := alloc(MQ_DEFAULT_MSGSIZE);
var prio: int64 := alloc(8);
var n := MqTimedRecv(fd, buf, MQ_DEFAULT_MSGSIZE, prio, timeout);
MqClose(fd);
free(buf, MQ_DEFAULT_MSGSIZE);
free(prio, 8);
if (n < 0) {
PrintLn("Timeout oder Fehler");
return false;
}
return true;
}
==== Queue-Status abfragen ====
import std.mqueue;
import std.io;
fn PrintQueueStatus(name: pchar): void {
var fd := MqOpen(name, MQ_OPEN_RDONLY, 0);
PrintLn("Max. Nachrichten: " + IntToStr(MqGetMaxMsg(fd)));
PrintLn("Aktuelle Nachrichten: " + IntToStr(MqGetCurMsgs(fd)));
MqClose(fd);
}
----
===== Hinweise =====
* Queue-Namen müssen mit ''/'' beginnen und dürfen keinen weiteren ''/'' enthalten.
* ''bufLen'' bei ''MqRecv''/''MqTimedRecv'' muss ≥ ''mq_msgsize'' der Queue sein — sonst Fehler ''−EMSGSIZE''.
* Queues überleben den Prozess: ''MqUnlink'' muss explizit aufgerufen werden.
* Unter Linux liegt das Limit für ''mq_maxmsg'' standardmäßig bei 10 (sichtbar in ''/proc/sys/fs/mqueue/msg_max''). Größere Kapazitäten erfordern Root oder angepasste Systemgrenzen.
* Für Timeout-Werte: ''TimeNowNs()'' (aus ''std.time'') liefert Nanosekunden seit CLOCK_REALTIME, was direkt als ''timeoutNs'' verwendet werden kann.
----
===== Verwandte Units =====
* ''[[lyx_-_programmiersprache:units:pipe|std.pipe]]'' — Pipes (einfacheres IPC für Eltern-/Kindprozesse)
* ''[[lyx_-_programmiersprache:units:thread|std.thread]]'' — Threads mit Mutex/Bedingungsvariablen
* ''[[lyx_-_programmiersprache:units:signals|std.signals]]'' — UNIX-Signale als leichtgewichtige IPC-Alternative
* ''[[lyx_-_programmiersprache:units:net:epoll|std.net.epoll]]'' — epoll kann mqueue-fds überwachen
Letzte Aktualisierung: 2026-06-05