Lyx – Threads & Nebenläufigkeit

Lyx unterstützt Nebenläufigkeit über POSIX-Threads (pthreads), gekapselt in der Unit std.thread. Sie bietet sechs Primitive: Thread (Ausführungsfaden), Mutex (gegenseitiger Ausschluss), Cond (Bedingungsvariablen), Atomic (lock-freie Ganzzahl), TLSKey (Thread-Local Storage) und SharedMem (gemeinsamer Speicher via mmap).

Threads teilen sich den Adressraum des Prozesses. Alle globalen Variablen und Heap-Objekte sind von jedem Thread aus erreichbar. Das macht schnelle Kommunikation möglich, erfordert aber sorgfältige Synchronisation: Gleichzeitige Schreibzugriffe auf dieselbe Variable ohne Schutz führen zu Race Conditions — undefiniertem, plattformabhängigem Verhalten.

Diese Seite behandelt POSIX-Threads auf Linux und macOS. Für Bare-Metal-Targets (ARM Cortex-M, RISC-V ohne OS) und RTOS-Umgebungen (ESP32/FreeRTOS) siehe: Nebenläufigkeit auf Embedded-Targets & RTOS

import std.thread;
import std.io;

1. Threads erstellen & beenden

ThreadCreate startet einen neuen Ausführungsfaden. Die Thread-Funktion erhält ein int64-Argument (Pointer auf Daten oder Wert) und wird parallel zum aufrufenden Thread ausgeführt.

unit hello_thread;
import std.thread;
import std.io;

fn Worker(arg: int64): int64 {
    PrintStr("Hallo vom Thread!\n");
    PrintInt(arg);
    return 0;
}

fn main(): int64 {
    var t := ThreadCreate(Worker as int64, 42);   // startet Worker(42)
    ThreadJoin(t);                                 // wartet bis Worker endet
    PrintStr("Thread beendet.\n");
    return 0;
}

Thread-API

Funktion Beschreibung
ThreadCreate(func: int64, arg: int64): Thread Erstellt und startet neuen Thread; func ist Funktionszeiger (als int64)
ThreadJoin(t: Thread): int64 Wartet auf Beendigung des Threads; gibt dessen Rückgabewert zurück
ThreadSelf(): int64 Handle des aktuellen Threads (für Logging, TLS-Lookup)
ThreadIsRunning(t: Thread): bool true wenn der Thread noch läuft
ThreadExit(code: int64) Beendet den aktuellen Thread mit Rückgabewert

Mehrere Threads starten

unit multi_thread;
import std.thread;
import std.io;

fn ComputeChunk(id: int64): int64 {
    var sum: int64 := 0;
    var start := id * 1000;
    for i := start to start + 999 do {
        sum := sum + i;
    }
    PrintStr("Thread ");
    PrintInt(id);
    PrintStr(" fertig\n");
    return sum;
}

fn main(): int64 {
    var t0 := ThreadCreate(ComputeChunk as int64, 0);
    var t1 := ThreadCreate(ComputeChunk as int64, 1);
    var t2 := ThreadCreate(ComputeChunk as int64, 2);
    var t3 := ThreadCreate(ComputeChunk as int64, 3);

    ThreadJoin(t0);
    ThreadJoin(t1);
    ThreadJoin(t2);
    ThreadJoin(t3);

    PrintStr("Alle Threads abgeschlossen.\n");
    return 0;
}

2. Race Conditions – das grundlegende Problem

Wenn zwei Threads dieselbe Variable lesen-modifizieren-schreiben, ohne sich abzustimmen, können Änderungen verloren gehen:

// ✗ FALSCH: Race Condition
var counter: int64 := 0;

fn IncrementBad(arg: int64): int64 {
    var i: int64 := 0;
    while (i < 100000) {
        counter++;   // Lesen + Addieren + Schreiben — nicht atomar!
        i++;
    }
    return 0;
}

// Ergebnis: nicht deterministisch (weniger als 200000)

Lösung: Mutex für exklusiven Zugriff oder Atomic für lock-freie Inkremente.

3. Mutex – Gegenseitiger Ausschluss

Ein Mutex (Mutual Exclusion) stellt sicher, dass immer nur ein Thread einen kritischen Abschnitt betreten kann. Der Lyx-Mutex ist rekursiv-sicher: Derselbe Thread kann MutexLock mehrfach aufrufen, ohne sich selbst zu blockieren.

API

Funktion Beschreibung
MutexInit(): Mutex Erstellt und initialisiert neuen Mutex
MutexLock(m: Mutex): int64 Sperrt den Mutex (blockiert, bis er frei ist)
MutexUnlock(m: Mutex): int64 Gibt den Mutex frei

Geschützter Zähler

unit mutex_counter;
import std.thread;
import std.io;

var counter: int64 := 0;
var mtx: Mutex;

fn Increment(arg: int64): int64 {
    var i: int64 := 0;
    while (i < 100000) {
        MutexLock(mtx);
        counter++;           // kritischer Abschnitt: nur ein Thread gleichzeitig
        MutexUnlock(mtx);
        i++;
    }
    return 0;
}

fn main(): int64 {
    mtx := MutexInit();

    var t0 := ThreadCreate(Increment as int64, 0);
    var t1 := ThreadCreate(Increment as int64, 0);

    ThreadJoin(t0);
    ThreadJoin(t1);

    PrintInt(counter);   // immer 200000
    return 0;
}

Mutex-Scope-Muster

Mutex immer in einem klar abgegrenzten Block sperren/freigeben. Niemals mit return oder break aus einem gesperrten Abschnitt springen, ohne vorher MutexUnlock aufzurufen:

fn SafeUpdate(value: int64): bool {
    MutexLock(mtx);

    if (value < 0) {
        MutexUnlock(mtx);   // ← Pflicht vor return!
        return false;
    }

    counter := value;
    MutexUnlock(mtx);
    return true;
}

4. Bedingungsvariablen (Cond) – Producer/Consumer

Cond (Condition Variable) ermöglicht es einem Thread zu warten, bis ein anderer Thread ein bestimmtes Ereignis signalisiert. Das ist das Kernmuster für Producer/Consumer-Queues und Task-Dispatcher.

API

Funktion Beschreibung
CondInit(): Cond Erstellt neue Bedingungsvariable
CondWait(c: Cond, m: Mutex): int64 Gibt Mutex atomar frei und wartet auf Signal; nach Rückkehr ist Mutex wieder gesperrt
CondSignal(c: Cond): int64 Weckt einen wartenden Thread auf
pthread_cond_broadcast(c.signal, 0): int64 Weckt alle wartenden Threads auf

Producer/Consumer mit Ringpuffer

unit producer_consumer;
import std.thread;
import std.io;

con QUEUE_SIZE: int64 := 64;

var queue:     [64]int64;
var q_head:    int64 := 0;
var q_tail:    int64 := 0;
var q_count:   int64 := 0;
var mtx:       Mutex;
var not_empty: Cond;
var not_full:  Cond;

fn Enqueue(value: int64) {
    MutexLock(mtx);
    while (q_count = QUEUE_SIZE) limit(100000) {
        CondWait(not_full, mtx);     // warte bis Platz frei
    }
    queue[q_tail] := value;
    q_tail := (q_tail + 1) mod QUEUE_SIZE;
    q_count++;
    CondSignal(not_empty);           // Consumer benachrichtigen
    MutexUnlock(mtx);
}

fn Dequeue(): int64 {
    MutexLock(mtx);
    while (q_count = 0) limit(100000) {
        CondWait(not_empty, mtx);    // warte bis Item vorhanden
    }
    var value := queue[q_head];
    q_head := (q_head + 1) mod QUEUE_SIZE;
    q_count--;
    CondSignal(not_full);            // Producer benachrichtigen
    MutexUnlock(mtx);
    return value;
}

fn Producer(arg: int64): int64 {
    for i := 0 to 199 do {
        Enqueue(i);
    }
    Enqueue(-1);   // Sentinel: Consumer soll enden
    return 0;
}

fn Consumer(arg: int64): int64 {
    while (true) {
        var v := Dequeue();
        if (v = -1) { break; }
        PrintInt(v);
    }
    return 0;
}

fn main(): int64 {
    mtx       := MutexInit();
    not_empty := CondInit();
    not_full  := CondInit();

    var prod := ThreadCreate(Producer as int64, 0);
    var cons := ThreadCreate(Consumer as int64, 0);

    ThreadJoin(prod);
    ThreadJoin(cons);
    return 0;
}

 
CondWait-Invariante:
CondWait muss immer in einer while-Schleife (nicht if) aufgerufen werden. Spurious Wakeups — Aufwachen ohne echtes Signal — sind im POSIX-Standard erlaubt. Die Schleife prüft die Bedingung erneut und schläft weiter, falls sie noch nicht erfüllt ist.

5. Atomare Operationen

Atomic ist ein int64-Wert, der ohne Mutex gelesen und verändert werden kann. Das Lesen-Modifizieren-Schreiben geschieht als eine unteilbare Hardwareoperation (LOCK XADD auf x86_64, LDADD auf ARM64).

Atomic-Operationen sind deutlich schneller als Mutex-basierter Code — aber nur für einfache Zähler und Flags geeignet.

API

Funktion Beschreibung
AtomicInit(initial: int64): Atomic Erstellt atomaren Wert mit Startwert
AtomicAdd(a: Atomic, delta: int64): int64 Addiert delta atomar; gibt neuen Wert zurück
CAS(a: Atomic, oldVal: int64, newVal: int64): int64 Compare-And-Swap: schreibt newVal nur wenn aktueller Wert oldVal ist; gibt 1 (Erfolg) oder 0 (Mismatch) zurück

Lock-freier Zähler

unit atomic_counter;
import std.thread;
import std.io;

var hits: Atomic;

fn Worker(arg: int64): int64 {
    var i: int64 := 0;
    while (i < 100000) {
        AtomicAdd(hits, 1);   // kein Mutex nötig
        i++;
    }
    return 0;
}

fn main(): int64 {
    hits := AtomicInit(0);

    var t0 := ThreadCreate(Worker as int64, 0);
    var t1 := ThreadCreate(Worker as int64, 0);
    var t2 := ThreadCreate(Worker as int64, 0);
    var t3 := ThreadCreate(Worker as int64, 0);

    ThreadJoin(t0);  ThreadJoin(t1);
    ThreadJoin(t2);  ThreadJoin(t3);

    PrintInt(hits.value);   // immer 400000
    return 0;
}

Compare-And-Swap (CAS) – Lock-freier Stack

CAS ist der Baustein für lock-freie Datenstrukturen. Hier ein lock-freies Push auf eine Ganzzahl:

fn TrySetFlag(flag: Atomic, expected: int64, new_val: int64): bool {
    return CAS(flag, expected, new_val) = 1;
}

fn main(): int64 {
    var flag := AtomicInit(0);

    // Exakt ein Thread kann von 0 → 1 wechseln:
    if (TrySetFlag(flag, 0, 1)) {
        PrintStr("Dieser Thread hat das Flag gesetzt.\n");
    } else {
        PrintStr("Anderer Thread war schneller.\n");
    }
    return 0;
}

6. Thread-Local Storage (TLS)

TLS ermöglicht pro-Thread-Variablen: Jeder Thread hat seinen eigenen Wert für denselben Schlüssel. Typische Einsatzfälle: Thread-ID-Tracking, Error-Context, per-Thread-Buffer.

API

Funktion Beschreibung
TLSKeyCreate(): TLSKey Erstellt neuen TLS-Schlüssel (global registriert)
TLSSetValue(key: TLSKey, value: int64): int64 Setzt Thread-lokalen Wert für diesen Schlüssel
TLSGetValue(key: TLSKey): int64 Liest Thread-lokalen Wert; gibt 0 zurück wenn nicht gesetzt

unit tls_example;
import std.thread;
import std.io;

var thread_id_key: TLSKey;

fn Worker(id: int64): int64 {
    TLSSetValue(thread_id_key, id);   // speichere ID lokal für diesen Thread

    // … andere Funktionen können die ID ohne Parameter-Übergabe lesen:
    var my_id := TLSGetValue(thread_id_key);
    PrintStr("Thread-ID: ");
    PrintInt(my_id);
    PrintStr("\n");
    return 0;
}

fn main(): int64 {
    thread_id_key := TLSKeyCreate();

    var t0 := ThreadCreate(Worker as int64, 100);
    var t1 := ThreadCreate(Worker as int64, 200);
    var t2 := ThreadCreate(Worker as int64, 300);

    ThreadJoin(t0);  ThreadJoin(t1);  ThreadJoin(t2);
    return 0;
}

7. Shared Memory (SharedMem)

SharedMem erstellt einen anonymen Speicherbereich via mmap, der von mehreren Threads (und optional Prozessen) geteilt werden kann. Anders als Heap-Speicher (malloc) ist mmap-Speicher direkt adressiert und kann für IPC (Inter-Process Communication) genutzt werden.

API

Funktion Beschreibung
SharedMemCreate(size: int64): SharedMem Erstellt anonymes Shared-Memory-Segment (mmap)
SharedMemFree(mem: SharedMem): int64 Gibt Shared Memory frei (munmap)

unit shared_mem;
import std.thread;
import std.io;

fn Writer(arg: int64): int64 {
    var mem := arg as ^SharedMem;
    var ptr := mem^.data as ^int64;
    ptr^ := 12345;   // schreibe in Shared Memory
    return 0;
}

fn main(): int64 {
    var mem := SharedMemCreate(64);   // 64 Byte anonymes Shared Memory
    var t := ThreadCreate(Writer as int64, ^mem as int64);
    ThreadJoin(t);

    var result := (mem.data as ^int64)^;
    PrintInt(result);   // 12345

    SharedMemFree(mem);
    return 0;
}

8. Vollständiges Beispiel: Thread-Pool

Ein Thread-Pool hält eine feste Anzahl Worker-Threads bereit und verteilt Aufgaben über eine Mutex-geschützte Queue. Neue Tasks werden eingereiht, Worker holen sie und verarbeiten sie.

unit thread_pool;
import std.thread;
import std.io;
import std.math;

con POOL_SIZE:  int64 := 4;
con QUEUE_CAP:  int64 := 256;

// ── Aufgaben-Queue ────────────────────────────────────────────────────────────
var tasks:      [256]int64;
var t_head:     int64 := 0;
var t_tail:     int64 := 0;
var t_count:    int64 := 0;
var done:       int64 := 0;   // 1 = keine weiteren Tasks

var q_mtx:      Mutex;
var q_ready:    Cond;

fn PushTask(value: int64) {
    MutexLock(q_mtx);
    tasks[t_tail] := value;
    t_tail := (t_tail + 1) mod QUEUE_CAP;
    t_count++;
    CondSignal(q_ready);
    MutexUnlock(q_mtx);
}

fn PopTask(): (int64, bool) {
    MutexLock(q_mtx);
    while (t_count = 0 & done = 0) limit(1000000) {
        CondWait(q_ready, q_mtx);
    }
    if (t_count = 0) {
        MutexUnlock(q_mtx);
        return (0, false);   // keine Tasks mehr
    }
    var val := tasks[t_head];
    t_head := (t_head + 1) mod QUEUE_CAP;
    t_count--;
    MutexUnlock(q_mtx);
    return (val, true);
}

// ── Worker ────────────────────────────────────────────────────────────────────
var result_mtx: Mutex;
var total:      int64 := 0;

fn Worker(id: int64): int64 {
    while (true) {
        var (task, ok) := PopTask();
        if (!ok) { break; }

        // Aufgabe: Quadratzahl berechnen
        var sq := task * task;

        MutexLock(result_mtx);
        total := total + sq;
        MutexUnlock(result_mtx);
    }
    return 0;
}

// ── Hauptprogramm ─────────────────────────────────────────────────────────────
fn main(): int64 {
    q_mtx      := MutexInit();
    result_mtx := MutexInit();
    q_ready    := CondInit();

    // Worker-Threads starten
    var workers: [4]Thread;
    for i := 0 to POOL_SIZE - 1 do {
        workers[i] := ThreadCreate(Worker as int64, i);
    }

    // Aufgaben einreihen: 1² + 2² + ... + 100²
    for i := 1 to 100 do {
        PushTask(i);
    }

    // Fertig signalisieren
    MutexLock(q_mtx);
    done := 1;
    pthread_cond_broadcast(q_ready.signal, 0);   // alle Worker aufwecken
    MutexUnlock(q_mtx);

    // Warten
    for i := 0 to POOL_SIZE - 1 do {
        ThreadJoin(workers[i]);
    }

    PrintInt(total);   // Summe der Quadrate 1..100 = 338350
    return 0;
}

9. Synchronisations-Primitive im Vergleich

Primitiv Einsatz Overhead Blockiert
Mutex Exklusiver Zugriff auf beliebige Daten Mittel (Syscall bei Konflikt) Ja
Cond Warten auf Zustandsänderung (Producer/Consumer) Niedrig (mit Mutex kombiniert) Ja
Atomic Einfache Zähler, Flags, Sequenznummern Sehr niedrig (Hardwareinstruktion) Nein
TLSKey Per-Thread-Kontext (Error-State, Logger) Sehr niedrig Nein
SharedMem Großer gemeinsamer Puffer, IPC Niedrig (mmap) Nein

Faustregel:

  • Einfacher Zähler / Flag → Atomic
  • Komplexe Datenstruktur, mehrere Felder → Mutex
  • Warten auf Ereignis → Cond + Mutex
  • Per-Thread-Zustand → TLS
  • Großer Puffer ohne Kopie → SharedMem

10. Nebenläufigkeit in Safety-Code

Threads und @flight_crit schließen sich nicht vollständig aus, aber die Kombination erfordert sorgfältige Architektur.

Aspekt @flight_crit @dal(A) @dal(B/C)
ThreadCreate in Init-Phase ✅ Erlaubt ✅ Erlaubt ✅ Erlaubt
ThreadCreate im Regelzyklus ❌ Verboten ❌ Verboten ⚠️ Warnung
MutexLock / MutexUnlock ✅ Erlaubt ✅ Mit Nachweis ✅ Erlaubt
CondWait ohne Timeout ❌ Verboten ❌ Verboten ⚠️ Warnung
CondWait mit limit ✅ Erlaubt ✅ Mit Nachweis ✅ Erlaubt
AtomicAdd, CAS ✅ Erlaubt ✅ Erlaubt ✅ Erlaubt
SharedMemCreate in Init ✅ Erlaubt ✅ Erlaubt ✅ Erlaubt

Empfohlenes Muster für @dal(B)

Threads werden einmalig in der Init-Phase gestartet. Der Regelzyklus kommuniziert ausschließlich über Atomics oder Mutex-geschützte feste Puffer — keine dynamische Thread-Erzeugung, kein unbeschränktes Warten.

@dal(B)
unit flight_ipc;
import std.thread;

// Shared State zwischen Sensor-Thread und Regler-Thread
var sensor_val:   Atomic;
var sensor_valid: Atomic;
var ipc_mtx:      Mutex;

@flight_crit
@stack_limit(1024)
fn SensorThread(arg: int64): int64 {
    while (true) {
        var raw := ReadHardwareSensor();
        AtomicAdd(sensor_valid, 0);      // Memory-Barrier
        var v := AtomicInit(raw);
        sensor_val := v;
        AtomicAdd(sensor_valid, 1);      // Signalisiere neuen Wert
        Sleep(10);
    }
    return 0;
}

@flight_crit
@stack_limit(512)
fn ControlThread(arg: int64): int64 {
    var last_seq: int64 := 0;
    while (true) {
        var seq := sensor_valid.value;
        if (seq != last_seq) {
            var val := sensor_val.value;   // atomar gelesen
            last_seq := seq;
            ComputeControl(val);
        }
        Sleep(5);
    }
    return 0;
}

fn main(): int64 {
    sensor_val   := AtomicInit(0);
    sensor_valid := AtomicInit(0);
    ipc_mtx      := MutexInit();

    // Init-Phase: Threads starten
    var t_sensor  := ThreadCreate(SensorThread as int64, 0);
    var t_control := ThreadCreate(ControlThread as int64, 0);

    // Warten (Endlosschleife im @dal(C)-Hauptmodul)
    ThreadJoin(t_sensor);
    ThreadJoin(t_control);
    return 0;
}

11. Häufige Fehler

Fehler Ursache Lösung
Deadlock Thread A hält Mutex 1, wartet auf Mutex 2; Thread B umgekehrt Mutexe immer in derselben Reihenfolge sperren
Race Condition Shared Variable ohne Mutex/Atomic Atomic für primitive Werte; Mutex für Structs/Arrays
Vergessenes MutexUnlock Früher return aus gesperrtem Block Immer vor return entsperren
Spurious Wakeup ignoriert if statt while bei CondWait Immer while (Bedingung) limit(N) um CondWait
Thread-Leak ThreadJoin vergessen Jeden gestarteten Thread joinen oder mit Sentinel beenden
Stack-Overflow im Thread Standardstack-Größe zu klein (POSIX: 8 MB) @stack_limit + –stack-check für Thread-Funktionen

Weiterführende Seiten: