====== Lyx – Threads & Nebenläufigkeit ======
Lyx unterstützt Nebenläufigkeit über **POSIX-Threads** (pthreads), gekapselt in der Unit ''std.thread''. Sie bietet sechs Primitive: ''Thread'' (Ausführungsfaden), ''Mutex'' (gegenseitiger Ausschluss), ''Cond'' (Bedingungsvariablen), ''Atomic'' (lock-freie Ganzzahl), ''TLSKey'' (Thread-Local Storage) und ''SharedMem'' (gemeinsamer Speicher via mmap).
Threads teilen sich den Adressraum des Prozesses. Alle globalen Variablen und Heap-Objekte sind von jedem Thread aus erreichbar. Das macht schnelle Kommunikation möglich, erfordert aber sorgfältige Synchronisation: Gleichzeitige Schreibzugriffe auf dieselbe Variable ohne Schutz führen zu **Race Conditions** — undefiniertem, plattformabhängigem Verhalten.
> Diese Seite behandelt **POSIX-Threads auf Linux und macOS**. Für Bare-Metal-Targets (ARM Cortex-M, RISC-V ohne OS) und RTOS-Umgebungen (ESP32/FreeRTOS) siehe: [[lyx_-_programmiersprache:rtos-embedded-concurrency|Nebenläufigkeit auf Embedded-Targets & RTOS]]
import std.thread;
import std.io;
===== 1. Threads erstellen & beenden =====
''ThreadCreate'' startet einen neuen Ausführungsfaden. Die Thread-Funktion erhält ein ''int64''-Argument (Pointer auf Daten oder Wert) und wird parallel zum aufrufenden Thread ausgeführt.
unit hello_thread;
import std.thread;
import std.io;
fn Worker(arg: int64): int64 {
PrintStr("Hallo vom Thread!\n");
PrintInt(arg);
return 0;
}
fn main(): int64 {
var t := ThreadCreate(Worker as int64, 42); // startet Worker(42)
ThreadJoin(t); // wartet bis Worker endet
PrintStr("Thread beendet.\n");
return 0;
}
==== Thread-API ====
^ Funktion ^ Beschreibung ^
| ''ThreadCreate(func: int64, arg: int64): Thread'' | Erstellt und startet neuen Thread; ''func'' ist Funktionszeiger (als int64) |
| ''ThreadJoin(t: Thread): int64'' | Wartet auf Beendigung des Threads; gibt dessen Rückgabewert zurück |
| ''ThreadSelf(): int64'' | Handle des aktuellen Threads (für Logging, TLS-Lookup) |
| ''ThreadIsRunning(t: Thread): bool'' | ''true'' wenn der Thread noch läuft |
| ''ThreadExit(code: int64)'' | Beendet den **aktuellen** Thread mit Rückgabewert |
==== Mehrere Threads starten ====
unit multi_thread;
import std.thread;
import std.io;
fn ComputeChunk(id: int64): int64 {
var sum: int64 := 0;
var start := id * 1000;
for i := start to start + 999 do {
sum := sum + i;
}
PrintStr("Thread ");
PrintInt(id);
PrintStr(" fertig\n");
return sum;
}
fn main(): int64 {
var t0 := ThreadCreate(ComputeChunk as int64, 0);
var t1 := ThreadCreate(ComputeChunk as int64, 1);
var t2 := ThreadCreate(ComputeChunk as int64, 2);
var t3 := ThreadCreate(ComputeChunk as int64, 3);
ThreadJoin(t0);
ThreadJoin(t1);
ThreadJoin(t2);
ThreadJoin(t3);
PrintStr("Alle Threads abgeschlossen.\n");
return 0;
}
===== 2. Race Conditions – das grundlegende Problem =====
Wenn zwei Threads dieselbe Variable lesen-modifizieren-schreiben, ohne sich abzustimmen, können Änderungen verloren gehen:
// ✗ FALSCH: Race Condition
var counter: int64 := 0;
fn IncrementBad(arg: int64): int64 {
var i: int64 := 0;
while (i < 100000) {
counter++; // Lesen + Addieren + Schreiben — nicht atomar!
i++;
}
return 0;
}
// Ergebnis: nicht deterministisch (weniger als 200000)
Lösung: **Mutex** für exklusiven Zugriff oder **Atomic** für lock-freie Inkremente.
===== 3. Mutex – Gegenseitiger Ausschluss =====
Ein ''Mutex'' (Mutual Exclusion) stellt sicher, dass immer nur **ein** Thread einen kritischen Abschnitt betreten kann. Der Lyx-Mutex ist **rekursiv-sicher**: Derselbe Thread kann ''MutexLock'' mehrfach aufrufen, ohne sich selbst zu blockieren.
==== API ====
^ Funktion ^ Beschreibung ^
| ''MutexInit(): Mutex'' | Erstellt und initialisiert neuen Mutex |
| ''MutexLock(m: Mutex): int64'' | Sperrt den Mutex (blockiert, bis er frei ist) |
| ''MutexUnlock(m: Mutex): int64'' | Gibt den Mutex frei |
==== Geschützter Zähler ====
unit mutex_counter;
import std.thread;
import std.io;
var counter: int64 := 0;
var mtx: Mutex;
fn Increment(arg: int64): int64 {
var i: int64 := 0;
while (i < 100000) {
MutexLock(mtx);
counter++; // kritischer Abschnitt: nur ein Thread gleichzeitig
MutexUnlock(mtx);
i++;
}
return 0;
}
fn main(): int64 {
mtx := MutexInit();
var t0 := ThreadCreate(Increment as int64, 0);
var t1 := ThreadCreate(Increment as int64, 0);
ThreadJoin(t0);
ThreadJoin(t1);
PrintInt(counter); // immer 200000
return 0;
}
==== Mutex-Scope-Muster ====
Mutex immer in einem klar abgegrenzten Block sperren/freigeben. Niemals mit ''return'' oder ''break'' aus einem gesperrten Abschnitt springen, ohne vorher ''MutexUnlock'' aufzurufen:
fn SafeUpdate(value: int64): bool {
MutexLock(mtx);
if (value < 0) {
MutexUnlock(mtx); // ← Pflicht vor return!
return false;
}
counter := value;
MutexUnlock(mtx);
return true;
}
===== 4. Bedingungsvariablen (Cond) – Producer/Consumer =====
''Cond'' (Condition Variable) ermöglicht es einem Thread zu **warten**, bis ein anderer Thread ein bestimmtes Ereignis signalisiert. Das ist das Kernmuster für Producer/Consumer-Queues und Task-Dispatcher.
==== API ====
^ Funktion ^ Beschreibung ^
| ''CondInit(): Cond'' | Erstellt neue Bedingungsvariable |
| ''CondWait(c: Cond, m: Mutex): int64'' | Gibt Mutex atomar frei und wartet auf Signal; nach Rückkehr ist Mutex wieder gesperrt |
| ''CondSignal(c: Cond): int64'' | Weckt **einen** wartenden Thread auf |
| ''pthread_cond_broadcast(c.signal, 0): int64'' | Weckt **alle** wartenden Threads auf |
==== Producer/Consumer mit Ringpuffer ====
unit producer_consumer;
import std.thread;
import std.io;
con QUEUE_SIZE: int64 := 64;
var queue: [64]int64;
var q_head: int64 := 0;
var q_tail: int64 := 0;
var q_count: int64 := 0;
var mtx: Mutex;
var not_empty: Cond;
var not_full: Cond;
fn Enqueue(value: int64) {
MutexLock(mtx);
while (q_count = QUEUE_SIZE) limit(100000) {
CondWait(not_full, mtx); // warte bis Platz frei
}
queue[q_tail] := value;
q_tail := (q_tail + 1) mod QUEUE_SIZE;
q_count++;
CondSignal(not_empty); // Consumer benachrichtigen
MutexUnlock(mtx);
}
fn Dequeue(): int64 {
MutexLock(mtx);
while (q_count = 0) limit(100000) {
CondWait(not_empty, mtx); // warte bis Item vorhanden
}
var value := queue[q_head];
q_head := (q_head + 1) mod QUEUE_SIZE;
q_count--;
CondSignal(not_full); // Producer benachrichtigen
MutexUnlock(mtx);
return value;
}
fn Producer(arg: int64): int64 {
for i := 0 to 199 do {
Enqueue(i);
}
Enqueue(-1); // Sentinel: Consumer soll enden
return 0;
}
fn Consumer(arg: int64): int64 {
while (true) {
var v := Dequeue();
if (v = -1) { break; }
PrintInt(v);
}
return 0;
}
fn main(): int64 {
mtx := MutexInit();
not_empty := CondInit();
not_full := CondInit();
var prod := ThreadCreate(Producer as int64, 0);
var cons := ThreadCreate(Consumer as int64, 0);
ThreadJoin(prod);
ThreadJoin(cons);
return 0;
}
> **CondWait-Invariante:**
> ''CondWait'' muss immer in einer ''while''-Schleife (nicht ''if'') aufgerufen werden. Spurious Wakeups — Aufwachen ohne echtes Signal — sind im POSIX-Standard erlaubt. Die Schleife prüft die Bedingung erneut und schläft weiter, falls sie noch nicht erfüllt ist.
===== 5. Atomare Operationen =====
''Atomic'' ist ein ''int64''-Wert, der ohne Mutex gelesen und verändert werden kann. Das Lesen-Modifizieren-Schreiben geschieht als **eine unteilbare Hardwareoperation** (LOCK XADD auf x86_64, LDADD auf ARM64).
Atomic-Operationen sind deutlich schneller als Mutex-basierter Code — aber nur für einfache Zähler und Flags geeignet.
==== API ====
^ Funktion ^ Beschreibung ^
| ''AtomicInit(initial: int64): Atomic'' | Erstellt atomaren Wert mit Startwert |
| ''AtomicAdd(a: Atomic, delta: int64): int64'' | Addiert ''delta'' atomar; gibt **neuen** Wert zurück |
| ''CAS(a: Atomic, oldVal: int64, newVal: int64): int64'' | Compare-And-Swap: schreibt ''newVal'' nur wenn aktueller Wert ''oldVal'' ist; gibt 1 (Erfolg) oder 0 (Mismatch) zurück |
==== Lock-freier Zähler ====
unit atomic_counter;
import std.thread;
import std.io;
var hits: Atomic;
fn Worker(arg: int64): int64 {
var i: int64 := 0;
while (i < 100000) {
AtomicAdd(hits, 1); // kein Mutex nötig
i++;
}
return 0;
}
fn main(): int64 {
hits := AtomicInit(0);
var t0 := ThreadCreate(Worker as int64, 0);
var t1 := ThreadCreate(Worker as int64, 0);
var t2 := ThreadCreate(Worker as int64, 0);
var t3 := ThreadCreate(Worker as int64, 0);
ThreadJoin(t0); ThreadJoin(t1);
ThreadJoin(t2); ThreadJoin(t3);
PrintInt(hits.value); // immer 400000
return 0;
}
==== Compare-And-Swap (CAS) – Lock-freier Stack ====
CAS ist der Baustein für lock-freie Datenstrukturen. Hier ein lock-freies Push auf eine Ganzzahl:
fn TrySetFlag(flag: Atomic, expected: int64, new_val: int64): bool {
return CAS(flag, expected, new_val) = 1;
}
fn main(): int64 {
var flag := AtomicInit(0);
// Exakt ein Thread kann von 0 → 1 wechseln:
if (TrySetFlag(flag, 0, 1)) {
PrintStr("Dieser Thread hat das Flag gesetzt.\n");
} else {
PrintStr("Anderer Thread war schneller.\n");
}
return 0;
}
===== 6. Thread-Local Storage (TLS) =====
TLS ermöglicht pro-Thread-Variablen: Jeder Thread hat seinen **eigenen** Wert für denselben Schlüssel. Typische Einsatzfälle: Thread-ID-Tracking, Error-Context, per-Thread-Buffer.
==== API ====
^ Funktion ^ Beschreibung ^
| ''TLSKeyCreate(): TLSKey'' | Erstellt neuen TLS-Schlüssel (global registriert) |
| ''TLSSetValue(key: TLSKey, value: int64): int64'' | Setzt Thread-lokalen Wert für diesen Schlüssel |
| ''TLSGetValue(key: TLSKey): int64'' | Liest Thread-lokalen Wert; gibt 0 zurück wenn nicht gesetzt |
unit tls_example;
import std.thread;
import std.io;
var thread_id_key: TLSKey;
fn Worker(id: int64): int64 {
TLSSetValue(thread_id_key, id); // speichere ID lokal für diesen Thread
// … andere Funktionen können die ID ohne Parameter-Übergabe lesen:
var my_id := TLSGetValue(thread_id_key);
PrintStr("Thread-ID: ");
PrintInt(my_id);
PrintStr("\n");
return 0;
}
fn main(): int64 {
thread_id_key := TLSKeyCreate();
var t0 := ThreadCreate(Worker as int64, 100);
var t1 := ThreadCreate(Worker as int64, 200);
var t2 := ThreadCreate(Worker as int64, 300);
ThreadJoin(t0); ThreadJoin(t1); ThreadJoin(t2);
return 0;
}
===== 7. Shared Memory (SharedMem) =====
''SharedMem'' erstellt einen anonymen Speicherbereich via ''mmap'', der von mehreren Threads (und optional Prozessen) geteilt werden kann. Anders als Heap-Speicher (''malloc'') ist mmap-Speicher direkt adressiert und kann für IPC (Inter-Process Communication) genutzt werden.
==== API ====
^ Funktion ^ Beschreibung ^
| ''SharedMemCreate(size: int64): SharedMem'' | Erstellt anonymes Shared-Memory-Segment (''mmap'') |
| ''SharedMemFree(mem: SharedMem): int64'' | Gibt Shared Memory frei (''munmap'') |
unit shared_mem;
import std.thread;
import std.io;
fn Writer(arg: int64): int64 {
var mem := arg as ^SharedMem;
var ptr := mem^.data as ^int64;
ptr^ := 12345; // schreibe in Shared Memory
return 0;
}
fn main(): int64 {
var mem := SharedMemCreate(64); // 64 Byte anonymes Shared Memory
var t := ThreadCreate(Writer as int64, ^mem as int64);
ThreadJoin(t);
var result := (mem.data as ^int64)^;
PrintInt(result); // 12345
SharedMemFree(mem);
return 0;
}
===== 8. Vollständiges Beispiel: Thread-Pool =====
Ein Thread-Pool hält eine feste Anzahl Worker-Threads bereit und verteilt Aufgaben über eine Mutex-geschützte Queue. Neue Tasks werden eingereiht, Worker holen sie und verarbeiten sie.
unit thread_pool;
import std.thread;
import std.io;
import std.math;
con POOL_SIZE: int64 := 4;
con QUEUE_CAP: int64 := 256;
// ── Aufgaben-Queue ────────────────────────────────────────────────────────────
var tasks: [256]int64;
var t_head: int64 := 0;
var t_tail: int64 := 0;
var t_count: int64 := 0;
var done: int64 := 0; // 1 = keine weiteren Tasks
var q_mtx: Mutex;
var q_ready: Cond;
fn PushTask(value: int64) {
MutexLock(q_mtx);
tasks[t_tail] := value;
t_tail := (t_tail + 1) mod QUEUE_CAP;
t_count++;
CondSignal(q_ready);
MutexUnlock(q_mtx);
}
fn PopTask(): (int64, bool) {
MutexLock(q_mtx);
while (t_count = 0 & done = 0) limit(1000000) {
CondWait(q_ready, q_mtx);
}
if (t_count = 0) {
MutexUnlock(q_mtx);
return (0, false); // keine Tasks mehr
}
var val := tasks[t_head];
t_head := (t_head + 1) mod QUEUE_CAP;
t_count--;
MutexUnlock(q_mtx);
return (val, true);
}
// ── Worker ────────────────────────────────────────────────────────────────────
var result_mtx: Mutex;
var total: int64 := 0;
fn Worker(id: int64): int64 {
while (true) {
var (task, ok) := PopTask();
if (!ok) { break; }
// Aufgabe: Quadratzahl berechnen
var sq := task * task;
MutexLock(result_mtx);
total := total + sq;
MutexUnlock(result_mtx);
}
return 0;
}
// ── Hauptprogramm ─────────────────────────────────────────────────────────────
fn main(): int64 {
q_mtx := MutexInit();
result_mtx := MutexInit();
q_ready := CondInit();
// Worker-Threads starten
var workers: [4]Thread;
for i := 0 to POOL_SIZE - 1 do {
workers[i] := ThreadCreate(Worker as int64, i);
}
// Aufgaben einreihen: 1² + 2² + ... + 100²
for i := 1 to 100 do {
PushTask(i);
}
// Fertig signalisieren
MutexLock(q_mtx);
done := 1;
pthread_cond_broadcast(q_ready.signal, 0); // alle Worker aufwecken
MutexUnlock(q_mtx);
// Warten
for i := 0 to POOL_SIZE - 1 do {
ThreadJoin(workers[i]);
}
PrintInt(total); // Summe der Quadrate 1..100 = 338350
return 0;
}
===== 9. Synchronisations-Primitive im Vergleich =====
^ Primitiv ^ Einsatz ^ Overhead ^ Blockiert ^
| ''Mutex'' | Exklusiver Zugriff auf beliebige Daten | Mittel (Syscall bei Konflikt) | Ja |
| ''Cond'' | Warten auf Zustandsänderung (Producer/Consumer) | Niedrig (mit Mutex kombiniert) | Ja |
| ''Atomic'' | Einfache Zähler, Flags, Sequenznummern | Sehr niedrig (Hardwareinstruktion) | Nein |
| ''TLSKey'' | Per-Thread-Kontext (Error-State, Logger) | Sehr niedrig | Nein |
| ''SharedMem'' | Großer gemeinsamer Puffer, IPC | Niedrig (mmap) | Nein |
**Faustregel:**
* Einfacher Zähler / Flag → **Atomic**
* Komplexe Datenstruktur, mehrere Felder → **Mutex**
* Warten auf Ereignis → **Cond + Mutex**
* Per-Thread-Zustand → **TLS**
* Großer Puffer ohne Kopie → **SharedMem**
===== 10. Nebenläufigkeit in Safety-Code =====
Threads und @flight_crit schließen sich nicht vollständig aus, aber die Kombination erfordert sorgfältige Architektur.
^ Aspekt ^ @flight_crit ^ @dal(A) ^ @dal(B/C) ^
| ''ThreadCreate'' in Init-Phase | ✅ Erlaubt | ✅ Erlaubt | ✅ Erlaubt |
| ''ThreadCreate'' im Regelzyklus | ❌ Verboten | ❌ Verboten | ⚠️ Warnung |
| ''MutexLock'' / ''MutexUnlock'' | ✅ Erlaubt | ✅ Mit Nachweis | ✅ Erlaubt |
| ''CondWait'' ohne Timeout | ❌ Verboten | ❌ Verboten | ⚠️ Warnung |
| ''CondWait'' mit ''limit'' | ✅ Erlaubt | ✅ Mit Nachweis | ✅ Erlaubt |
| ''AtomicAdd'', ''CAS'' | ✅ Erlaubt | ✅ Erlaubt | ✅ Erlaubt |
| ''SharedMemCreate'' in Init | ✅ Erlaubt | ✅ Erlaubt | ✅ Erlaubt |
==== Empfohlenes Muster für @dal(B) ====
Threads werden **einmalig in der Init-Phase** gestartet. Der Regelzyklus kommuniziert ausschließlich über Atomics oder Mutex-geschützte feste Puffer — keine dynamische Thread-Erzeugung, kein unbeschränktes Warten.
@dal(B)
unit flight_ipc;
import std.thread;
// Shared State zwischen Sensor-Thread und Regler-Thread
var sensor_val: Atomic;
var sensor_valid: Atomic;
var ipc_mtx: Mutex;
@flight_crit
@stack_limit(1024)
fn SensorThread(arg: int64): int64 {
while (true) {
var raw := ReadHardwareSensor();
AtomicAdd(sensor_valid, 0); // Memory-Barrier
var v := AtomicInit(raw);
sensor_val := v;
AtomicAdd(sensor_valid, 1); // Signalisiere neuen Wert
Sleep(10);
}
return 0;
}
@flight_crit
@stack_limit(512)
fn ControlThread(arg: int64): int64 {
var last_seq: int64 := 0;
while (true) {
var seq := sensor_valid.value;
if (seq != last_seq) {
var val := sensor_val.value; // atomar gelesen
last_seq := seq;
ComputeControl(val);
}
Sleep(5);
}
return 0;
}
fn main(): int64 {
sensor_val := AtomicInit(0);
sensor_valid := AtomicInit(0);
ipc_mtx := MutexInit();
// Init-Phase: Threads starten
var t_sensor := ThreadCreate(SensorThread as int64, 0);
var t_control := ThreadCreate(ControlThread as int64, 0);
// Warten (Endlosschleife im @dal(C)-Hauptmodul)
ThreadJoin(t_sensor);
ThreadJoin(t_control);
return 0;
}
===== 11. Häufige Fehler =====
^ Fehler ^ Ursache ^ Lösung ^
| Deadlock | Thread A hält Mutex 1, wartet auf Mutex 2; Thread B umgekehrt | Mutexe immer in derselben Reihenfolge sperren |
| Race Condition | Shared Variable ohne Mutex/Atomic | Atomic für primitive Werte; Mutex für Structs/Arrays |
| Vergessenes MutexUnlock | Früher ''return'' aus gesperrtem Block | Immer vor ''return'' entsperren |
| Spurious Wakeup ignoriert | ''if'' statt ''while'' bei CondWait | Immer ''while (Bedingung) limit(N)'' um CondWait |
| Thread-Leak | ThreadJoin vergessen | Jeden gestarteten Thread joinen oder mit Sentinel beenden |
| Stack-Overflow im Thread | Standardstack-Größe zu klein (POSIX: 8 MB) | @stack_limit + --stack-check für Thread-Funktionen |
**Weiterführende Seiten:**
* [[lyx_-_programmiersprache:memory-management|Memory Management – Heap, Stack, SharedMem]]
* [[lyx_-_programmiersprache:units:thread|std.thread – vollständige API-Referenz]]
* [[lyx_-_programmiersprache:pointer-inlining|Pointer & Inlining – Atomic auf Assembler-Ebene]]
* [[lyx_-_programmiersprache:abi-calling-conventions|ABI – Thread-Funktionen und Register-Konventionen]]
* [[lyx_-_programmiersprache:do-178c|DO-178C – Nebenläufigkeit in zertifizierter Software]]