====== std.mqueue ====== POSIX Message Queues (WP-11): Kernel-verwaltete IPC-Queues mit priorisiertem Zugriff. Nachrichten werden nach Priorität (0 = niedrigste, MQ_PRIO_MAX = 32767) sortiert und gepuffert. Queue-Namen beginnen mit ''/'' und erscheinen unter ''/dev/mqueue/''. Verbindungen werden als Dateideskriptoren (''int64'') verwaltet. → [[lyx_-_programmiersprache:units|Standard Library]] · [[lyx_-_programmiersprache:units:pipe|std.pipe]] · [[lyx_-_programmiersprache:units:thread|std.thread]] ---- ===== Konstanten ===== ==== Open-Flags ==== ^ Konstante ^ Wert ^ Beschreibung ^ | ''MQ_OPEN_RDONLY'' | 0 | Queue nur zum Empfangen öffnen | | ''MQ_OPEN_WRONLY'' | 1 | Queue nur zum Senden öffnen | | ''MQ_OPEN_RDWR'' | 2 | Queue zum Senden und Empfangen öffnen | | ''MQ_OPEN_CREATE'' | 64 | Queue anlegen wenn nicht vorhanden (wie ''O_CREAT'') | | ''MQ_OPEN_EXCL'' | 128 | Fehler wenn Queue bereits existiert (wie ''O_EXCL'') | | ''MQ_OPEN_NONBLOCK'' | 2048 | Nicht-blockierend: ''MqSend''/''MqRecv'' geben sofort zurück | ==== Queue-Attribute ==== ^ Konstante ^ Wert ^ Beschreibung ^ | ''MQ_ATTR_SIZE'' | 64 | Größe des ''mq_attr''-Puffers in Bytes | | ''MQ_ATTR_FLAGS'' | 0 | Offset: ''mq_flags'' (''O_NONBLOCK'' oder 0) | | ''MQ_ATTR_MAXMSG'' | 8 | Offset: ''mq_maxmsg'' (maximale Nachrichtenanzahl) | | ''MQ_ATTR_MSGSIZE'' | 16 | Offset: ''mq_msgsize'' (maximale Nachrichtenlänge in Bytes) | | ''MQ_ATTR_CURMSGS'' | 24 | Offset: ''mq_curmsgs'' (aktuell wartende Nachrichten, Read-Only) | | ''MQ_DEFAULT_MAXMSG'' | 10 | System-Standard: max. 10 Nachrichten | | ''MQ_DEFAULT_MSGSIZE'' | 8192 | System-Standard: max. 8192 Bytes pro Nachricht | | ''MQ_PRIO_MAX'' | 32767 | Maximale Nachrichtenpriorität | ---- ===== Funktionen ===== ==== Queue öffnen, schließen, löschen ==== ^ Signatur ^ Beschreibung ^ | ''MqOpen(name: pchar, flags: int64, mode: int64): int64'' | Öffnet oder erstellt eine Queue mit System-Defaults (max. 10 Nachrichten, 8192 Bytes). ''name'' muss mit ''/'' beginnen. ''mode'' setzt Zugriffsrechte (z. B. 420 = 0644) | | ''MqOpenWith(name: pchar, flags: int64, mode: int64, maxmsg: int64, msgsize: int64): int64'' | Wie ''MqOpen'', aber mit expliziter Kapazität. Wird bei ''MQ_OPEN_CREATE'' ausgewertet | | ''MqClose(fd: int64): int64'' | Schließt den Queue-Deskriptor (wie ''close''). Die Queue selbst bleibt im System | | ''MqUnlink(name: pchar): int64'' | Löscht die Queue aus dem System. Sie wird beim letzten ''MqClose'' endgültig entfernt | ==== Senden ==== ^ Signatur ^ Beschreibung ^ | ''MqSend(fd: int64, msg: int64, msgLen: int64, prio: int64): int64'' | Sendet eine Nachricht. Blockiert wenn die Queue voll ist. ''prio'': 0 (niedrigste) bis ''MQ_PRIO_MAX'' | | ''MqTimedSend(fd: int64, msg: int64, msgLen: int64, prio: int64, timeoutNs: int64): int64'' | Wie ''MqSend'', aber mit absolutem Timeout in Nanosekunden seit CLOCK_REALTIME. Gibt −110 (''−ETIMEDOUT'') zurück bei Timeout | ==== Empfangen ==== ^ Signatur ^ Beschreibung ^ | ''MqRecv(fd: int64, buf: int64, bufLen: int64, prioOut: int64): int64'' | Empfängt die Nachricht mit der höchsten Priorität. Blockiert wenn die Queue leer ist. ''prioOut'': Zeiger auf int64 für Priorität (0 = ignorieren). Gibt empfangene Bytes zurück | | ''MqTimedRecv(fd: int64, buf: int64, bufLen: int64, prioOut: int64, timeoutNs: int64): int64'' | Wie ''MqRecv'', aber mit absolutem Timeout in Nanosekunden. Gibt −110 zurück bei Timeout | ==== Attribute abfragen ==== ^ Signatur ^ Beschreibung ^ | ''MqGetAttr(fd: int64, attrBuf: int64): int64'' | Füllt ''attrBuf'' (''alloc(MQ_ATTR_SIZE)'') mit den aktuellen Queue-Attributen | | ''MqGetMaxMsg(fd: int64): int64'' | Gibt die maximale Nachrichtenanzahl der Queue zurück | | ''MqGetCurMsgs(fd: int64): int64'' | Gibt die aktuell wartende Nachrichtenanzahl zurück | ---- ===== Verwendung ===== ==== Einfache Producer-Consumer-Kommunikation ==== import std.mqueue; import std.alloc; import std.io; fn Producer(): void { var fd := MqOpen("/worker_queue", MQ_OPEN_WRONLY | MQ_OPEN_CREATE, 420); var msg: pchar := "job:render_frame"; MqSend(fd, msg, 16, 10); // Priorität 10 MqClose(fd); } fn Consumer(): void { var fd := MqOpen("/worker_queue", MQ_OPEN_RDONLY, 0); var buf: int64 := alloc(MQ_DEFAULT_MSGSIZE); var prio: int64 := alloc(8); var n := MqRecv(fd, buf, MQ_DEFAULT_MSGSIZE, prio); PrintLn("Empfangen (prio=" + IntToStr(peek64(prio)) + "): " + buf); free(buf, MQ_DEFAULT_MSGSIZE); free(prio, 8); MqClose(fd); MqUnlink("/worker_queue"); } ==== Queue mit expliziter Kapazität ==== import std.mqueue; import std.alloc; fn SetupQueue(): int64 { // Queue für bis zu 64 kleine Steuernachrichten (max. 64 Bytes) return MqOpenWith("/ctrl", MQ_OPEN_RDWR | MQ_OPEN_CREATE, 420, // 0644 64, // maxmsg 64); // msgsize } ==== Prioritätsbasierte Verarbeitung ==== import std.mqueue; import std.alloc; import std.io; fn PrioDispatch(): void { var fd := MqOpenWith("/prio_q", MQ_OPEN_RDWR | MQ_OPEN_CREATE, 420, 32, 128); // Nachrichten mit verschiedenen Prioritäten senden var low: pchar := "backup"; var high: pchar := "alert"; var mid: pchar := "status"; MqSend(fd, low, 7, 1); // Priorität 1 — niedrig MqSend(fd, mid, 7, 5); // Priorität 5 — mittel MqSend(fd, high, 6, 20); // Priorität 20 — hoch // Empfang erfolgt in Prioritätsreihenfolge: high → mid → low var buf: int64 := alloc(128); var prio: int64 := alloc(8); var i: int64 := 0; while (i < 3) { MqRecv(fd, buf, 128, prio); PrintLn(buf + " (prio=" + IntToStr(peek64(prio)) + ")"); i := i + 1; } free(buf, 128); free(prio, 8); MqClose(fd); MqUnlink("/prio_q"); } ==== Timeout-basiertes Empfangen ==== import std.mqueue; import std.time; import std.alloc; import std.io; fn TryReceive(name: pchar): bool { var fd := MqOpen(name, MQ_OPEN_RDONLY, 0); // Absoluter Timeout: jetzt + 500 ms var now := TimeNowNs(); var timeout := now + 500000000; var buf: int64 := alloc(MQ_DEFAULT_MSGSIZE); var prio: int64 := alloc(8); var n := MqTimedRecv(fd, buf, MQ_DEFAULT_MSGSIZE, prio, timeout); MqClose(fd); free(buf, MQ_DEFAULT_MSGSIZE); free(prio, 8); if (n < 0) { PrintLn("Timeout oder Fehler"); return false; } return true; } ==== Queue-Status abfragen ==== import std.mqueue; import std.io; fn PrintQueueStatus(name: pchar): void { var fd := MqOpen(name, MQ_OPEN_RDONLY, 0); PrintLn("Max. Nachrichten: " + IntToStr(MqGetMaxMsg(fd))); PrintLn("Aktuelle Nachrichten: " + IntToStr(MqGetCurMsgs(fd))); MqClose(fd); } ---- ===== Hinweise ===== * Queue-Namen müssen mit ''/'' beginnen und dürfen keinen weiteren ''/'' enthalten. * ''bufLen'' bei ''MqRecv''/''MqTimedRecv'' muss ≥ ''mq_msgsize'' der Queue sein — sonst Fehler ''−EMSGSIZE''. * Queues überleben den Prozess: ''MqUnlink'' muss explizit aufgerufen werden. * Unter Linux liegt das Limit für ''mq_maxmsg'' standardmäßig bei 10 (sichtbar in ''/proc/sys/fs/mqueue/msg_max''). Größere Kapazitäten erfordern Root oder angepasste Systemgrenzen. * Für Timeout-Werte: ''TimeNowNs()'' (aus ''std.time'') liefert Nanosekunden seit CLOCK_REALTIME, was direkt als ''timeoutNs'' verwendet werden kann. ---- ===== Verwandte Units ===== * ''[[lyx_-_programmiersprache:units:pipe|std.pipe]]'' — Pipes (einfacheres IPC für Eltern-/Kindprozesse) * ''[[lyx_-_programmiersprache:units:thread|std.thread]]'' — Threads mit Mutex/Bedingungsvariablen * ''[[lyx_-_programmiersprache:units:signals|std.signals]]'' — UNIX-Signale als leichtgewichtige IPC-Alternative * ''[[lyx_-_programmiersprache:units:net:epoll|std.net.epoll]]'' — epoll kann mqueue-fds überwachen Letzte Aktualisierung: 2026-06-05