====== Lyx – Threads & Nebenläufigkeit ====== Lyx unterstützt Nebenläufigkeit über **POSIX-Threads** (pthreads), gekapselt in der Unit ''std.thread''. Sie bietet sechs Primitive: ''Thread'' (Ausführungsfaden), ''Mutex'' (gegenseitiger Ausschluss), ''Cond'' (Bedingungsvariablen), ''Atomic'' (lock-freie Ganzzahl), ''TLSKey'' (Thread-Local Storage) und ''SharedMem'' (gemeinsamer Speicher via mmap). Threads teilen sich den Adressraum des Prozesses. Alle globalen Variablen und Heap-Objekte sind von jedem Thread aus erreichbar. Das macht schnelle Kommunikation möglich, erfordert aber sorgfältige Synchronisation: Gleichzeitige Schreibzugriffe auf dieselbe Variable ohne Schutz führen zu **Race Conditions** — undefiniertem, plattformabhängigem Verhalten. > Diese Seite behandelt **POSIX-Threads auf Linux und macOS**. Für Bare-Metal-Targets (ARM Cortex-M, RISC-V ohne OS) und RTOS-Umgebungen (ESP32/FreeRTOS) siehe: [[lyx_-_programmiersprache:rtos-embedded-concurrency|Nebenläufigkeit auf Embedded-Targets & RTOS]] import std.thread; import std.io; ===== 1. Threads erstellen & beenden ===== ''ThreadCreate'' startet einen neuen Ausführungsfaden. Die Thread-Funktion erhält ein ''int64''-Argument (Pointer auf Daten oder Wert) und wird parallel zum aufrufenden Thread ausgeführt. unit hello_thread; import std.thread; import std.io; fn Worker(arg: int64): int64 { PrintStr("Hallo vom Thread!\n"); PrintInt(arg); return 0; } fn main(): int64 { var t := ThreadCreate(Worker as int64, 42); // startet Worker(42) ThreadJoin(t); // wartet bis Worker endet PrintStr("Thread beendet.\n"); return 0; } ==== Thread-API ==== ^ Funktion ^ Beschreibung ^ | ''ThreadCreate(func: int64, arg: int64): Thread'' | Erstellt und startet neuen Thread; ''func'' ist Funktionszeiger (als int64) | | ''ThreadJoin(t: Thread): int64'' | Wartet auf Beendigung des Threads; gibt dessen Rückgabewert zurück | | ''ThreadSelf(): int64'' | Handle des aktuellen Threads (für Logging, TLS-Lookup) | | ''ThreadIsRunning(t: Thread): bool'' | ''true'' wenn der Thread noch läuft | | ''ThreadExit(code: int64)'' | Beendet den **aktuellen** Thread mit Rückgabewert | ==== Mehrere Threads starten ==== unit multi_thread; import std.thread; import std.io; fn ComputeChunk(id: int64): int64 { var sum: int64 := 0; var start := id * 1000; for i := start to start + 999 do { sum := sum + i; } PrintStr("Thread "); PrintInt(id); PrintStr(" fertig\n"); return sum; } fn main(): int64 { var t0 := ThreadCreate(ComputeChunk as int64, 0); var t1 := ThreadCreate(ComputeChunk as int64, 1); var t2 := ThreadCreate(ComputeChunk as int64, 2); var t3 := ThreadCreate(ComputeChunk as int64, 3); ThreadJoin(t0); ThreadJoin(t1); ThreadJoin(t2); ThreadJoin(t3); PrintStr("Alle Threads abgeschlossen.\n"); return 0; } ===== 2. Race Conditions – das grundlegende Problem ===== Wenn zwei Threads dieselbe Variable lesen-modifizieren-schreiben, ohne sich abzustimmen, können Änderungen verloren gehen: // ✗ FALSCH: Race Condition var counter: int64 := 0; fn IncrementBad(arg: int64): int64 { var i: int64 := 0; while (i < 100000) { counter++; // Lesen + Addieren + Schreiben — nicht atomar! i++; } return 0; } // Ergebnis: nicht deterministisch (weniger als 200000) Lösung: **Mutex** für exklusiven Zugriff oder **Atomic** für lock-freie Inkremente. ===== 3. Mutex – Gegenseitiger Ausschluss ===== Ein ''Mutex'' (Mutual Exclusion) stellt sicher, dass immer nur **ein** Thread einen kritischen Abschnitt betreten kann. Der Lyx-Mutex ist **rekursiv-sicher**: Derselbe Thread kann ''MutexLock'' mehrfach aufrufen, ohne sich selbst zu blockieren. ==== API ==== ^ Funktion ^ Beschreibung ^ | ''MutexInit(): Mutex'' | Erstellt und initialisiert neuen Mutex | | ''MutexLock(m: Mutex): int64'' | Sperrt den Mutex (blockiert, bis er frei ist) | | ''MutexUnlock(m: Mutex): int64'' | Gibt den Mutex frei | ==== Geschützter Zähler ==== unit mutex_counter; import std.thread; import std.io; var counter: int64 := 0; var mtx: Mutex; fn Increment(arg: int64): int64 { var i: int64 := 0; while (i < 100000) { MutexLock(mtx); counter++; // kritischer Abschnitt: nur ein Thread gleichzeitig MutexUnlock(mtx); i++; } return 0; } fn main(): int64 { mtx := MutexInit(); var t0 := ThreadCreate(Increment as int64, 0); var t1 := ThreadCreate(Increment as int64, 0); ThreadJoin(t0); ThreadJoin(t1); PrintInt(counter); // immer 200000 return 0; } ==== Mutex-Scope-Muster ==== Mutex immer in einem klar abgegrenzten Block sperren/freigeben. Niemals mit ''return'' oder ''break'' aus einem gesperrten Abschnitt springen, ohne vorher ''MutexUnlock'' aufzurufen: fn SafeUpdate(value: int64): bool { MutexLock(mtx); if (value < 0) { MutexUnlock(mtx); // ← Pflicht vor return! return false; } counter := value; MutexUnlock(mtx); return true; } ===== 4. Bedingungsvariablen (Cond) – Producer/Consumer ===== ''Cond'' (Condition Variable) ermöglicht es einem Thread zu **warten**, bis ein anderer Thread ein bestimmtes Ereignis signalisiert. Das ist das Kernmuster für Producer/Consumer-Queues und Task-Dispatcher. ==== API ==== ^ Funktion ^ Beschreibung ^ | ''CondInit(): Cond'' | Erstellt neue Bedingungsvariable | | ''CondWait(c: Cond, m: Mutex): int64'' | Gibt Mutex atomar frei und wartet auf Signal; nach Rückkehr ist Mutex wieder gesperrt | | ''CondSignal(c: Cond): int64'' | Weckt **einen** wartenden Thread auf | | ''pthread_cond_broadcast(c.signal, 0): int64'' | Weckt **alle** wartenden Threads auf | ==== Producer/Consumer mit Ringpuffer ==== unit producer_consumer; import std.thread; import std.io; con QUEUE_SIZE: int64 := 64; var queue: [64]int64; var q_head: int64 := 0; var q_tail: int64 := 0; var q_count: int64 := 0; var mtx: Mutex; var not_empty: Cond; var not_full: Cond; fn Enqueue(value: int64) { MutexLock(mtx); while (q_count = QUEUE_SIZE) limit(100000) { CondWait(not_full, mtx); // warte bis Platz frei } queue[q_tail] := value; q_tail := (q_tail + 1) mod QUEUE_SIZE; q_count++; CondSignal(not_empty); // Consumer benachrichtigen MutexUnlock(mtx); } fn Dequeue(): int64 { MutexLock(mtx); while (q_count = 0) limit(100000) { CondWait(not_empty, mtx); // warte bis Item vorhanden } var value := queue[q_head]; q_head := (q_head + 1) mod QUEUE_SIZE; q_count--; CondSignal(not_full); // Producer benachrichtigen MutexUnlock(mtx); return value; } fn Producer(arg: int64): int64 { for i := 0 to 199 do { Enqueue(i); } Enqueue(-1); // Sentinel: Consumer soll enden return 0; } fn Consumer(arg: int64): int64 { while (true) { var v := Dequeue(); if (v = -1) { break; } PrintInt(v); } return 0; } fn main(): int64 { mtx := MutexInit(); not_empty := CondInit(); not_full := CondInit(); var prod := ThreadCreate(Producer as int64, 0); var cons := ThreadCreate(Consumer as int64, 0); ThreadJoin(prod); ThreadJoin(cons); return 0; } > **CondWait-Invariante:** > ''CondWait'' muss immer in einer ''while''-Schleife (nicht ''if'') aufgerufen werden. Spurious Wakeups — Aufwachen ohne echtes Signal — sind im POSIX-Standard erlaubt. Die Schleife prüft die Bedingung erneut und schläft weiter, falls sie noch nicht erfüllt ist. ===== 5. Atomare Operationen ===== ''Atomic'' ist ein ''int64''-Wert, der ohne Mutex gelesen und verändert werden kann. Das Lesen-Modifizieren-Schreiben geschieht als **eine unteilbare Hardwareoperation** (LOCK XADD auf x86_64, LDADD auf ARM64). Atomic-Operationen sind deutlich schneller als Mutex-basierter Code — aber nur für einfache Zähler und Flags geeignet. ==== API ==== ^ Funktion ^ Beschreibung ^ | ''AtomicInit(initial: int64): Atomic'' | Erstellt atomaren Wert mit Startwert | | ''AtomicAdd(a: Atomic, delta: int64): int64'' | Addiert ''delta'' atomar; gibt **neuen** Wert zurück | | ''CAS(a: Atomic, oldVal: int64, newVal: int64): int64'' | Compare-And-Swap: schreibt ''newVal'' nur wenn aktueller Wert ''oldVal'' ist; gibt 1 (Erfolg) oder 0 (Mismatch) zurück | ==== Lock-freier Zähler ==== unit atomic_counter; import std.thread; import std.io; var hits: Atomic; fn Worker(arg: int64): int64 { var i: int64 := 0; while (i < 100000) { AtomicAdd(hits, 1); // kein Mutex nötig i++; } return 0; } fn main(): int64 { hits := AtomicInit(0); var t0 := ThreadCreate(Worker as int64, 0); var t1 := ThreadCreate(Worker as int64, 0); var t2 := ThreadCreate(Worker as int64, 0); var t3 := ThreadCreate(Worker as int64, 0); ThreadJoin(t0); ThreadJoin(t1); ThreadJoin(t2); ThreadJoin(t3); PrintInt(hits.value); // immer 400000 return 0; } ==== Compare-And-Swap (CAS) – Lock-freier Stack ==== CAS ist der Baustein für lock-freie Datenstrukturen. Hier ein lock-freies Push auf eine Ganzzahl: fn TrySetFlag(flag: Atomic, expected: int64, new_val: int64): bool { return CAS(flag, expected, new_val) = 1; } fn main(): int64 { var flag := AtomicInit(0); // Exakt ein Thread kann von 0 → 1 wechseln: if (TrySetFlag(flag, 0, 1)) { PrintStr("Dieser Thread hat das Flag gesetzt.\n"); } else { PrintStr("Anderer Thread war schneller.\n"); } return 0; } ===== 6. Thread-Local Storage (TLS) ===== TLS ermöglicht pro-Thread-Variablen: Jeder Thread hat seinen **eigenen** Wert für denselben Schlüssel. Typische Einsatzfälle: Thread-ID-Tracking, Error-Context, per-Thread-Buffer. ==== API ==== ^ Funktion ^ Beschreibung ^ | ''TLSKeyCreate(): TLSKey'' | Erstellt neuen TLS-Schlüssel (global registriert) | | ''TLSSetValue(key: TLSKey, value: int64): int64'' | Setzt Thread-lokalen Wert für diesen Schlüssel | | ''TLSGetValue(key: TLSKey): int64'' | Liest Thread-lokalen Wert; gibt 0 zurück wenn nicht gesetzt | unit tls_example; import std.thread; import std.io; var thread_id_key: TLSKey; fn Worker(id: int64): int64 { TLSSetValue(thread_id_key, id); // speichere ID lokal für diesen Thread // … andere Funktionen können die ID ohne Parameter-Übergabe lesen: var my_id := TLSGetValue(thread_id_key); PrintStr("Thread-ID: "); PrintInt(my_id); PrintStr("\n"); return 0; } fn main(): int64 { thread_id_key := TLSKeyCreate(); var t0 := ThreadCreate(Worker as int64, 100); var t1 := ThreadCreate(Worker as int64, 200); var t2 := ThreadCreate(Worker as int64, 300); ThreadJoin(t0); ThreadJoin(t1); ThreadJoin(t2); return 0; } ===== 7. Shared Memory (SharedMem) ===== ''SharedMem'' erstellt einen anonymen Speicherbereich via ''mmap'', der von mehreren Threads (und optional Prozessen) geteilt werden kann. Anders als Heap-Speicher (''malloc'') ist mmap-Speicher direkt adressiert und kann für IPC (Inter-Process Communication) genutzt werden. ==== API ==== ^ Funktion ^ Beschreibung ^ | ''SharedMemCreate(size: int64): SharedMem'' | Erstellt anonymes Shared-Memory-Segment (''mmap'') | | ''SharedMemFree(mem: SharedMem): int64'' | Gibt Shared Memory frei (''munmap'') | unit shared_mem; import std.thread; import std.io; fn Writer(arg: int64): int64 { var mem := arg as ^SharedMem; var ptr := mem^.data as ^int64; ptr^ := 12345; // schreibe in Shared Memory return 0; } fn main(): int64 { var mem := SharedMemCreate(64); // 64 Byte anonymes Shared Memory var t := ThreadCreate(Writer as int64, ^mem as int64); ThreadJoin(t); var result := (mem.data as ^int64)^; PrintInt(result); // 12345 SharedMemFree(mem); return 0; } ===== 8. Vollständiges Beispiel: Thread-Pool ===== Ein Thread-Pool hält eine feste Anzahl Worker-Threads bereit und verteilt Aufgaben über eine Mutex-geschützte Queue. Neue Tasks werden eingereiht, Worker holen sie und verarbeiten sie. unit thread_pool; import std.thread; import std.io; import std.math; con POOL_SIZE: int64 := 4; con QUEUE_CAP: int64 := 256; // ── Aufgaben-Queue ──────────────────────────────────────────────────────────── var tasks: [256]int64; var t_head: int64 := 0; var t_tail: int64 := 0; var t_count: int64 := 0; var done: int64 := 0; // 1 = keine weiteren Tasks var q_mtx: Mutex; var q_ready: Cond; fn PushTask(value: int64) { MutexLock(q_mtx); tasks[t_tail] := value; t_tail := (t_tail + 1) mod QUEUE_CAP; t_count++; CondSignal(q_ready); MutexUnlock(q_mtx); } fn PopTask(): (int64, bool) { MutexLock(q_mtx); while (t_count = 0 & done = 0) limit(1000000) { CondWait(q_ready, q_mtx); } if (t_count = 0) { MutexUnlock(q_mtx); return (0, false); // keine Tasks mehr } var val := tasks[t_head]; t_head := (t_head + 1) mod QUEUE_CAP; t_count--; MutexUnlock(q_mtx); return (val, true); } // ── Worker ──────────────────────────────────────────────────────────────────── var result_mtx: Mutex; var total: int64 := 0; fn Worker(id: int64): int64 { while (true) { var (task, ok) := PopTask(); if (!ok) { break; } // Aufgabe: Quadratzahl berechnen var sq := task * task; MutexLock(result_mtx); total := total + sq; MutexUnlock(result_mtx); } return 0; } // ── Hauptprogramm ───────────────────────────────────────────────────────────── fn main(): int64 { q_mtx := MutexInit(); result_mtx := MutexInit(); q_ready := CondInit(); // Worker-Threads starten var workers: [4]Thread; for i := 0 to POOL_SIZE - 1 do { workers[i] := ThreadCreate(Worker as int64, i); } // Aufgaben einreihen: 1² + 2² + ... + 100² for i := 1 to 100 do { PushTask(i); } // Fertig signalisieren MutexLock(q_mtx); done := 1; pthread_cond_broadcast(q_ready.signal, 0); // alle Worker aufwecken MutexUnlock(q_mtx); // Warten for i := 0 to POOL_SIZE - 1 do { ThreadJoin(workers[i]); } PrintInt(total); // Summe der Quadrate 1..100 = 338350 return 0; } ===== 9. Synchronisations-Primitive im Vergleich ===== ^ Primitiv ^ Einsatz ^ Overhead ^ Blockiert ^ | ''Mutex'' | Exklusiver Zugriff auf beliebige Daten | Mittel (Syscall bei Konflikt) | Ja | | ''Cond'' | Warten auf Zustandsänderung (Producer/Consumer) | Niedrig (mit Mutex kombiniert) | Ja | | ''Atomic'' | Einfache Zähler, Flags, Sequenznummern | Sehr niedrig (Hardwareinstruktion) | Nein | | ''TLSKey'' | Per-Thread-Kontext (Error-State, Logger) | Sehr niedrig | Nein | | ''SharedMem'' | Großer gemeinsamer Puffer, IPC | Niedrig (mmap) | Nein | **Faustregel:** * Einfacher Zähler / Flag → **Atomic** * Komplexe Datenstruktur, mehrere Felder → **Mutex** * Warten auf Ereignis → **Cond + Mutex** * Per-Thread-Zustand → **TLS** * Großer Puffer ohne Kopie → **SharedMem** ===== 10. Nebenläufigkeit in Safety-Code ===== Threads und @flight_crit schließen sich nicht vollständig aus, aber die Kombination erfordert sorgfältige Architektur. ^ Aspekt ^ @flight_crit ^ @dal(A) ^ @dal(B/C) ^ | ''ThreadCreate'' in Init-Phase | ✅ Erlaubt | ✅ Erlaubt | ✅ Erlaubt | | ''ThreadCreate'' im Regelzyklus | ❌ Verboten | ❌ Verboten | ⚠️ Warnung | | ''MutexLock'' / ''MutexUnlock'' | ✅ Erlaubt | ✅ Mit Nachweis | ✅ Erlaubt | | ''CondWait'' ohne Timeout | ❌ Verboten | ❌ Verboten | ⚠️ Warnung | | ''CondWait'' mit ''limit'' | ✅ Erlaubt | ✅ Mit Nachweis | ✅ Erlaubt | | ''AtomicAdd'', ''CAS'' | ✅ Erlaubt | ✅ Erlaubt | ✅ Erlaubt | | ''SharedMemCreate'' in Init | ✅ Erlaubt | ✅ Erlaubt | ✅ Erlaubt | ==== Empfohlenes Muster für @dal(B) ==== Threads werden **einmalig in der Init-Phase** gestartet. Der Regelzyklus kommuniziert ausschließlich über Atomics oder Mutex-geschützte feste Puffer — keine dynamische Thread-Erzeugung, kein unbeschränktes Warten. @dal(B) unit flight_ipc; import std.thread; // Shared State zwischen Sensor-Thread und Regler-Thread var sensor_val: Atomic; var sensor_valid: Atomic; var ipc_mtx: Mutex; @flight_crit @stack_limit(1024) fn SensorThread(arg: int64): int64 { while (true) { var raw := ReadHardwareSensor(); AtomicAdd(sensor_valid, 0); // Memory-Barrier var v := AtomicInit(raw); sensor_val := v; AtomicAdd(sensor_valid, 1); // Signalisiere neuen Wert Sleep(10); } return 0; } @flight_crit @stack_limit(512) fn ControlThread(arg: int64): int64 { var last_seq: int64 := 0; while (true) { var seq := sensor_valid.value; if (seq != last_seq) { var val := sensor_val.value; // atomar gelesen last_seq := seq; ComputeControl(val); } Sleep(5); } return 0; } fn main(): int64 { sensor_val := AtomicInit(0); sensor_valid := AtomicInit(0); ipc_mtx := MutexInit(); // Init-Phase: Threads starten var t_sensor := ThreadCreate(SensorThread as int64, 0); var t_control := ThreadCreate(ControlThread as int64, 0); // Warten (Endlosschleife im @dal(C)-Hauptmodul) ThreadJoin(t_sensor); ThreadJoin(t_control); return 0; } ===== 11. Häufige Fehler ===== ^ Fehler ^ Ursache ^ Lösung ^ | Deadlock | Thread A hält Mutex 1, wartet auf Mutex 2; Thread B umgekehrt | Mutexe immer in derselben Reihenfolge sperren | | Race Condition | Shared Variable ohne Mutex/Atomic | Atomic für primitive Werte; Mutex für Structs/Arrays | | Vergessenes MutexUnlock | Früher ''return'' aus gesperrtem Block | Immer vor ''return'' entsperren | | Spurious Wakeup ignoriert | ''if'' statt ''while'' bei CondWait | Immer ''while (Bedingung) limit(N)'' um CondWait | | Thread-Leak | ThreadJoin vergessen | Jeden gestarteten Thread joinen oder mit Sentinel beenden | | Stack-Overflow im Thread | Standardstack-Größe zu klein (POSIX: 8 MB) | @stack_limit + --stack-check für Thread-Funktionen | **Weiterführende Seiten:** * [[lyx_-_programmiersprache:memory-management|Memory Management – Heap, Stack, SharedMem]] * [[lyx_-_programmiersprache:units:thread|std.thread – vollständige API-Referenz]] * [[lyx_-_programmiersprache:pointer-inlining|Pointer & Inlining – Atomic auf Assembler-Ebene]] * [[lyx_-_programmiersprache:abi-calling-conventions|ABI – Thread-Funktionen und Register-Konventionen]] * [[lyx_-_programmiersprache:do-178c|DO-178C – Nebenläufigkeit in zertifizierter Software]]