Inhaltsverzeichnis

std.mqueue

POSIX Message Queues (WP-11): Kernel-verwaltete IPC-Queues mit priorisiertem Zugriff. Nachrichten werden nach Priorität (0 = niedrigste, MQ_PRIO_MAX = 32767) sortiert und gepuffert. Queue-Namen beginnen mit / und erscheinen unter /dev/mqueue/. Verbindungen werden als Dateideskriptoren (int64) verwaltet.

Standard Library · std.pipe · std.thread


Konstanten

Open-Flags

Konstante Wert Beschreibung
MQ_OPEN_RDONLY 0 Queue nur zum Empfangen öffnen
MQ_OPEN_WRONLY 1 Queue nur zum Senden öffnen
MQ_OPEN_RDWR 2 Queue zum Senden und Empfangen öffnen
MQ_OPEN_CREATE 64 Queue anlegen wenn nicht vorhanden (wie O_CREAT)
MQ_OPEN_EXCL 128 Fehler wenn Queue bereits existiert (wie O_EXCL)
MQ_OPEN_NONBLOCK 2048 Nicht-blockierend: MqSend/MqRecv geben sofort zurück

Queue-Attribute

Konstante Wert Beschreibung
MQ_ATTR_SIZE 64 Größe des mq_attr-Puffers in Bytes
MQ_ATTR_FLAGS 0 Offset: mq_flags (O_NONBLOCK oder 0)
MQ_ATTR_MAXMSG 8 Offset: mq_maxmsg (maximale Nachrichtenanzahl)
MQ_ATTR_MSGSIZE 16 Offset: mq_msgsize (maximale Nachrichtenlänge in Bytes)
MQ_ATTR_CURMSGS 24 Offset: mq_curmsgs (aktuell wartende Nachrichten, Read-Only)
MQ_DEFAULT_MAXMSG 10 System-Standard: max. 10 Nachrichten
MQ_DEFAULT_MSGSIZE 8192 System-Standard: max. 8192 Bytes pro Nachricht
MQ_PRIO_MAX 32767 Maximale Nachrichtenpriorität

Funktionen

Queue öffnen, schließen, löschen

Signatur Beschreibung
MqOpen(name: pchar, flags: int64, mode: int64): int64 Öffnet oder erstellt eine Queue mit System-Defaults (max. 10 Nachrichten, 8192 Bytes). name muss mit / beginnen. mode setzt Zugriffsrechte (z. B. 420 = 0644)
MqOpenWith(name: pchar, flags: int64, mode: int64, maxmsg: int64, msgsize: int64): int64 Wie MqOpen, aber mit expliziter Kapazität. Wird bei MQ_OPEN_CREATE ausgewertet
MqClose(fd: int64): int64 Schließt den Queue-Deskriptor (wie close). Die Queue selbst bleibt im System
MqUnlink(name: pchar): int64 Löscht die Queue aus dem System. Sie wird beim letzten MqClose endgültig entfernt

Senden

Signatur Beschreibung
MqSend(fd: int64, msg: int64, msgLen: int64, prio: int64): int64 Sendet eine Nachricht. Blockiert wenn die Queue voll ist. prio: 0 (niedrigste) bis MQ_PRIO_MAX
MqTimedSend(fd: int64, msg: int64, msgLen: int64, prio: int64, timeoutNs: int64): int64 Wie MqSend, aber mit absolutem Timeout in Nanosekunden seit CLOCK_REALTIME. Gibt −110 (−ETIMEDOUT) zurück bei Timeout

Empfangen

Signatur Beschreibung
MqRecv(fd: int64, buf: int64, bufLen: int64, prioOut: int64): int64 Empfängt die Nachricht mit der höchsten Priorität. Blockiert wenn die Queue leer ist. prioOut: Zeiger auf int64 für Priorität (0 = ignorieren). Gibt empfangene Bytes zurück
MqTimedRecv(fd: int64, buf: int64, bufLen: int64, prioOut: int64, timeoutNs: int64): int64 Wie MqRecv, aber mit absolutem Timeout in Nanosekunden. Gibt −110 zurück bei Timeout

Attribute abfragen

Signatur Beschreibung
MqGetAttr(fd: int64, attrBuf: int64): int64 Füllt attrBuf (alloc(MQ_ATTR_SIZE)) mit den aktuellen Queue-Attributen
MqGetMaxMsg(fd: int64): int64 Gibt die maximale Nachrichtenanzahl der Queue zurück
MqGetCurMsgs(fd: int64): int64 Gibt die aktuell wartende Nachrichtenanzahl zurück

Verwendung

Einfache Producer-Consumer-Kommunikation

import std.mqueue;
import std.alloc;
import std.io;

fn Producer(): void {
    var fd := MqOpen("/worker_queue",
                     MQ_OPEN_WRONLY | MQ_OPEN_CREATE, 420);

    var msg: pchar := "job:render_frame";
    MqSend(fd, msg, 16, 10);  // Priorität 10

    MqClose(fd);
}

fn Consumer(): void {
    var fd := MqOpen("/worker_queue", MQ_OPEN_RDONLY, 0);

    var buf: int64 := alloc(MQ_DEFAULT_MSGSIZE);
    var prio: int64 := alloc(8);
    var n := MqRecv(fd, buf, MQ_DEFAULT_MSGSIZE, prio);

    PrintLn("Empfangen (prio=" + IntToStr(peek64(prio)) + "): " + buf);

    free(buf, MQ_DEFAULT_MSGSIZE);
    free(prio, 8);
    MqClose(fd);
    MqUnlink("/worker_queue");
}

Queue mit expliziter Kapazität

import std.mqueue;
import std.alloc;

fn SetupQueue(): int64 {
    // Queue für bis zu 64 kleine Steuernachrichten (max. 64 Bytes)
    return MqOpenWith("/ctrl",
                      MQ_OPEN_RDWR | MQ_OPEN_CREATE,
                      420,   // 0644
                      64,    // maxmsg
                      64);   // msgsize
}

Prioritätsbasierte Verarbeitung

import std.mqueue;
import std.alloc;
import std.io;

fn PrioDispatch(): void {
    var fd := MqOpenWith("/prio_q",
                         MQ_OPEN_RDWR | MQ_OPEN_CREATE,
                         420, 32, 128);

    // Nachrichten mit verschiedenen Prioritäten senden
    var low:  pchar := "backup";
    var high: pchar := "alert";
    var mid:  pchar := "status";
    MqSend(fd, low,  7,  1);   // Priorität 1  — niedrig
    MqSend(fd, mid,  7,  5);   // Priorität 5  — mittel
    MqSend(fd, high, 6, 20);   // Priorität 20 — hoch

    // Empfang erfolgt in Prioritätsreihenfolge: high → mid → low
    var buf: int64 := alloc(128);
    var prio: int64 := alloc(8);
    var i: int64 := 0;
    while (i < 3) {
        MqRecv(fd, buf, 128, prio);
        PrintLn(buf + " (prio=" + IntToStr(peek64(prio)) + ")");
        i := i + 1;
    }

    free(buf, 128);
    free(prio, 8);
    MqClose(fd);
    MqUnlink("/prio_q");
}

Timeout-basiertes Empfangen

import std.mqueue;
import std.time;
import std.alloc;
import std.io;

fn TryReceive(name: pchar): bool {
    var fd := MqOpen(name, MQ_OPEN_RDONLY, 0);

    // Absoluter Timeout: jetzt + 500 ms
    var now    := TimeNowNs();
    var timeout := now + 500000000;

    var buf: int64 := alloc(MQ_DEFAULT_MSGSIZE);
    var prio: int64 := alloc(8);
    var n := MqTimedRecv(fd, buf, MQ_DEFAULT_MSGSIZE, prio, timeout);

    MqClose(fd);
    free(buf, MQ_DEFAULT_MSGSIZE);
    free(prio, 8);

    if (n < 0) {
        PrintLn("Timeout oder Fehler");
        return false;
    }
    return true;
}

Queue-Status abfragen

import std.mqueue;
import std.io;

fn PrintQueueStatus(name: pchar): void {
    var fd := MqOpen(name, MQ_OPEN_RDONLY, 0);

    PrintLn("Max. Nachrichten:     " + IntToStr(MqGetMaxMsg(fd)));
    PrintLn("Aktuelle Nachrichten: " + IntToStr(MqGetCurMsgs(fd)));

    MqClose(fd);
}


Hinweise


Verwandte Units

Letzte Aktualisierung: 2026-06-05