Lyx unterstützt Nebenläufigkeit über POSIX-Threads (pthreads), gekapselt in der Unit std.thread. Sie bietet sechs Primitive: Thread (Ausführungsfaden), Mutex (gegenseitiger Ausschluss), Cond (Bedingungsvariablen), Atomic (lock-freie Ganzzahl), TLSKey (Thread-Local Storage) und SharedMem (gemeinsamer Speicher via mmap).
Threads teilen sich den Adressraum des Prozesses. Alle globalen Variablen und Heap-Objekte sind von jedem Thread aus erreichbar. Das macht schnelle Kommunikation möglich, erfordert aber sorgfältige Synchronisation: Gleichzeitige Schreibzugriffe auf dieselbe Variable ohne Schutz führen zu Race Conditions — undefiniertem, plattformabhängigem Verhalten.
Diese Seite behandelt POSIX-Threads auf Linux und macOS. Für Bare-Metal-Targets (ARM Cortex-M, RISC-V ohne OS) und RTOS-Umgebungen (ESP32/FreeRTOS) siehe: Nebenläufigkeit auf Embedded-Targets & RTOS
import std.thread;
import std.io;
ThreadCreate startet einen neuen Ausführungsfaden. Die Thread-Funktion erhält ein int64-Argument (Pointer auf Daten oder Wert) und wird parallel zum aufrufenden Thread ausgeführt.
unit hello_thread;
import std.thread;
import std.io;
fn Worker(arg: int64): int64 {
PrintStr("Hallo vom Thread!\n");
PrintInt(arg);
return 0;
}
fn main(): int64 {
var t := ThreadCreate(Worker as int64, 42); // startet Worker(42)
ThreadJoin(t); // wartet bis Worker endet
PrintStr("Thread beendet.\n");
return 0;
}
| Funktion | Beschreibung |
|---|---|
ThreadCreate(func: int64, arg: int64): Thread | Erstellt und startet neuen Thread; func ist Funktionszeiger (als int64) |
ThreadJoin(t: Thread): int64 | Wartet auf Beendigung des Threads; gibt dessen Rückgabewert zurück |
ThreadSelf(): int64 | Handle des aktuellen Threads (für Logging, TLS-Lookup) |
ThreadIsRunning(t: Thread): bool | true wenn der Thread noch läuft |
ThreadExit(code: int64) | Beendet den aktuellen Thread mit Rückgabewert |
unit multi_thread;
import std.thread;
import std.io;
fn ComputeChunk(id: int64): int64 {
var sum: int64 := 0;
var start := id * 1000;
for i := start to start + 999 do {
sum := sum + i;
}
PrintStr("Thread ");
PrintInt(id);
PrintStr(" fertig\n");
return sum;
}
fn main(): int64 {
var t0 := ThreadCreate(ComputeChunk as int64, 0);
var t1 := ThreadCreate(ComputeChunk as int64, 1);
var t2 := ThreadCreate(ComputeChunk as int64, 2);
var t3 := ThreadCreate(ComputeChunk as int64, 3);
ThreadJoin(t0);
ThreadJoin(t1);
ThreadJoin(t2);
ThreadJoin(t3);
PrintStr("Alle Threads abgeschlossen.\n");
return 0;
}
Wenn zwei Threads dieselbe Variable lesen-modifizieren-schreiben, ohne sich abzustimmen, können Änderungen verloren gehen:
// ✗ FALSCH: Race Condition
var counter: int64 := 0;
fn IncrementBad(arg: int64): int64 {
var i: int64 := 0;
while (i < 100000) {
counter++; // Lesen + Addieren + Schreiben — nicht atomar!
i++;
}
return 0;
}
// Ergebnis: nicht deterministisch (weniger als 200000)
Lösung: Mutex für exklusiven Zugriff oder Atomic für lock-freie Inkremente.
Ein Mutex (Mutual Exclusion) stellt sicher, dass immer nur ein Thread einen kritischen Abschnitt betreten kann. Der Lyx-Mutex ist rekursiv-sicher: Derselbe Thread kann MutexLock mehrfach aufrufen, ohne sich selbst zu blockieren.
| Funktion | Beschreibung |
|---|---|
MutexInit(): Mutex | Erstellt und initialisiert neuen Mutex |
MutexLock(m: Mutex): int64 | Sperrt den Mutex (blockiert, bis er frei ist) |
MutexUnlock(m: Mutex): int64 | Gibt den Mutex frei |
unit mutex_counter;
import std.thread;
import std.io;
var counter: int64 := 0;
var mtx: Mutex;
fn Increment(arg: int64): int64 {
var i: int64 := 0;
while (i < 100000) {
MutexLock(mtx);
counter++; // kritischer Abschnitt: nur ein Thread gleichzeitig
MutexUnlock(mtx);
i++;
}
return 0;
}
fn main(): int64 {
mtx := MutexInit();
var t0 := ThreadCreate(Increment as int64, 0);
var t1 := ThreadCreate(Increment as int64, 0);
ThreadJoin(t0);
ThreadJoin(t1);
PrintInt(counter); // immer 200000
return 0;
}
Mutex immer in einem klar abgegrenzten Block sperren/freigeben. Niemals mit return oder break aus einem gesperrten Abschnitt springen, ohne vorher MutexUnlock aufzurufen:
fn SafeUpdate(value: int64): bool {
MutexLock(mtx);
if (value < 0) {
MutexUnlock(mtx); // ← Pflicht vor return!
return false;
}
counter := value;
MutexUnlock(mtx);
return true;
}
Cond (Condition Variable) ermöglicht es einem Thread zu warten, bis ein anderer Thread ein bestimmtes Ereignis signalisiert. Das ist das Kernmuster für Producer/Consumer-Queues und Task-Dispatcher.
| Funktion | Beschreibung |
|---|---|
CondInit(): Cond | Erstellt neue Bedingungsvariable |
CondWait(c: Cond, m: Mutex): int64 | Gibt Mutex atomar frei und wartet auf Signal; nach Rückkehr ist Mutex wieder gesperrt |
CondSignal(c: Cond): int64 | Weckt einen wartenden Thread auf |
pthread_cond_broadcast(c.signal, 0): int64 | Weckt alle wartenden Threads auf |
unit producer_consumer;
import std.thread;
import std.io;
con QUEUE_SIZE: int64 := 64;
var queue: [64]int64;
var q_head: int64 := 0;
var q_tail: int64 := 0;
var q_count: int64 := 0;
var mtx: Mutex;
var not_empty: Cond;
var not_full: Cond;
fn Enqueue(value: int64) {
MutexLock(mtx);
while (q_count = QUEUE_SIZE) limit(100000) {
CondWait(not_full, mtx); // warte bis Platz frei
}
queue[q_tail] := value;
q_tail := (q_tail + 1) mod QUEUE_SIZE;
q_count++;
CondSignal(not_empty); // Consumer benachrichtigen
MutexUnlock(mtx);
}
fn Dequeue(): int64 {
MutexLock(mtx);
while (q_count = 0) limit(100000) {
CondWait(not_empty, mtx); // warte bis Item vorhanden
}
var value := queue[q_head];
q_head := (q_head + 1) mod QUEUE_SIZE;
q_count--;
CondSignal(not_full); // Producer benachrichtigen
MutexUnlock(mtx);
return value;
}
fn Producer(arg: int64): int64 {
for i := 0 to 199 do {
Enqueue(i);
}
Enqueue(-1); // Sentinel: Consumer soll enden
return 0;
}
fn Consumer(arg: int64): int64 {
while (true) {
var v := Dequeue();
if (v = -1) { break; }
PrintInt(v);
}
return 0;
}
fn main(): int64 {
mtx := MutexInit();
not_empty := CondInit();
not_full := CondInit();
var prod := ThreadCreate(Producer as int64, 0);
var cons := ThreadCreate(Consumer as int64, 0);
ThreadJoin(prod);
ThreadJoin(cons);
return 0;
}
CondWait-Invariante:
CondWaitmuss immer in einerwhile-Schleife (nichtif) aufgerufen werden. Spurious Wakeups — Aufwachen ohne echtes Signal — sind im POSIX-Standard erlaubt. Die Schleife prüft die Bedingung erneut und schläft weiter, falls sie noch nicht erfüllt ist.
Atomic ist ein int64-Wert, der ohne Mutex gelesen und verändert werden kann. Das Lesen-Modifizieren-Schreiben geschieht als eine unteilbare Hardwareoperation (LOCK XADD auf x86_64, LDADD auf ARM64).
Atomic-Operationen sind deutlich schneller als Mutex-basierter Code — aber nur für einfache Zähler und Flags geeignet.
| Funktion | Beschreibung |
|---|---|
AtomicInit(initial: int64): Atomic | Erstellt atomaren Wert mit Startwert |
AtomicAdd(a: Atomic, delta: int64): int64 | Addiert delta atomar; gibt neuen Wert zurück |
CAS(a: Atomic, oldVal: int64, newVal: int64): int64 | Compare-And-Swap: schreibt newVal nur wenn aktueller Wert oldVal ist; gibt 1 (Erfolg) oder 0 (Mismatch) zurück |
unit atomic_counter;
import std.thread;
import std.io;
var hits: Atomic;
fn Worker(arg: int64): int64 {
var i: int64 := 0;
while (i < 100000) {
AtomicAdd(hits, 1); // kein Mutex nötig
i++;
}
return 0;
}
fn main(): int64 {
hits := AtomicInit(0);
var t0 := ThreadCreate(Worker as int64, 0);
var t1 := ThreadCreate(Worker as int64, 0);
var t2 := ThreadCreate(Worker as int64, 0);
var t3 := ThreadCreate(Worker as int64, 0);
ThreadJoin(t0); ThreadJoin(t1);
ThreadJoin(t2); ThreadJoin(t3);
PrintInt(hits.value); // immer 400000
return 0;
}
CAS ist der Baustein für lock-freie Datenstrukturen. Hier ein lock-freies Push auf eine Ganzzahl:
fn TrySetFlag(flag: Atomic, expected: int64, new_val: int64): bool {
return CAS(flag, expected, new_val) = 1;
}
fn main(): int64 {
var flag := AtomicInit(0);
// Exakt ein Thread kann von 0 → 1 wechseln:
if (TrySetFlag(flag, 0, 1)) {
PrintStr("Dieser Thread hat das Flag gesetzt.\n");
} else {
PrintStr("Anderer Thread war schneller.\n");
}
return 0;
}
TLS ermöglicht pro-Thread-Variablen: Jeder Thread hat seinen eigenen Wert für denselben Schlüssel. Typische Einsatzfälle: Thread-ID-Tracking, Error-Context, per-Thread-Buffer.
| Funktion | Beschreibung |
|---|---|
TLSKeyCreate(): TLSKey | Erstellt neuen TLS-Schlüssel (global registriert) |
TLSSetValue(key: TLSKey, value: int64): int64 | Setzt Thread-lokalen Wert für diesen Schlüssel |
TLSGetValue(key: TLSKey): int64 | Liest Thread-lokalen Wert; gibt 0 zurück wenn nicht gesetzt |
unit tls_example;
import std.thread;
import std.io;
var thread_id_key: TLSKey;
fn Worker(id: int64): int64 {
TLSSetValue(thread_id_key, id); // speichere ID lokal für diesen Thread
// … andere Funktionen können die ID ohne Parameter-Übergabe lesen:
var my_id := TLSGetValue(thread_id_key);
PrintStr("Thread-ID: ");
PrintInt(my_id);
PrintStr("\n");
return 0;
}
fn main(): int64 {
thread_id_key := TLSKeyCreate();
var t0 := ThreadCreate(Worker as int64, 100);
var t1 := ThreadCreate(Worker as int64, 200);
var t2 := ThreadCreate(Worker as int64, 300);
ThreadJoin(t0); ThreadJoin(t1); ThreadJoin(t2);
return 0;
}
SharedMem erstellt einen anonymen Speicherbereich via mmap, der von mehreren Threads (und optional Prozessen) geteilt werden kann. Anders als Heap-Speicher (malloc) ist mmap-Speicher direkt adressiert und kann für IPC (Inter-Process Communication) genutzt werden.
| Funktion | Beschreibung |
|---|---|
SharedMemCreate(size: int64): SharedMem | Erstellt anonymes Shared-Memory-Segment (mmap) |
SharedMemFree(mem: SharedMem): int64 | Gibt Shared Memory frei (munmap) |
unit shared_mem;
import std.thread;
import std.io;
fn Writer(arg: int64): int64 {
var mem := arg as ^SharedMem;
var ptr := mem^.data as ^int64;
ptr^ := 12345; // schreibe in Shared Memory
return 0;
}
fn main(): int64 {
var mem := SharedMemCreate(64); // 64 Byte anonymes Shared Memory
var t := ThreadCreate(Writer as int64, ^mem as int64);
ThreadJoin(t);
var result := (mem.data as ^int64)^;
PrintInt(result); // 12345
SharedMemFree(mem);
return 0;
}
Ein Thread-Pool hält eine feste Anzahl Worker-Threads bereit und verteilt Aufgaben über eine Mutex-geschützte Queue. Neue Tasks werden eingereiht, Worker holen sie und verarbeiten sie.
unit thread_pool;
import std.thread;
import std.io;
import std.math;
con POOL_SIZE: int64 := 4;
con QUEUE_CAP: int64 := 256;
// ── Aufgaben-Queue ────────────────────────────────────────────────────────────
var tasks: [256]int64;
var t_head: int64 := 0;
var t_tail: int64 := 0;
var t_count: int64 := 0;
var done: int64 := 0; // 1 = keine weiteren Tasks
var q_mtx: Mutex;
var q_ready: Cond;
fn PushTask(value: int64) {
MutexLock(q_mtx);
tasks[t_tail] := value;
t_tail := (t_tail + 1) mod QUEUE_CAP;
t_count++;
CondSignal(q_ready);
MutexUnlock(q_mtx);
}
fn PopTask(): (int64, bool) {
MutexLock(q_mtx);
while (t_count = 0 & done = 0) limit(1000000) {
CondWait(q_ready, q_mtx);
}
if (t_count = 0) {
MutexUnlock(q_mtx);
return (0, false); // keine Tasks mehr
}
var val := tasks[t_head];
t_head := (t_head + 1) mod QUEUE_CAP;
t_count--;
MutexUnlock(q_mtx);
return (val, true);
}
// ── Worker ────────────────────────────────────────────────────────────────────
var result_mtx: Mutex;
var total: int64 := 0;
fn Worker(id: int64): int64 {
while (true) {
var (task, ok) := PopTask();
if (!ok) { break; }
// Aufgabe: Quadratzahl berechnen
var sq := task * task;
MutexLock(result_mtx);
total := total + sq;
MutexUnlock(result_mtx);
}
return 0;
}
// ── Hauptprogramm ─────────────────────────────────────────────────────────────
fn main(): int64 {
q_mtx := MutexInit();
result_mtx := MutexInit();
q_ready := CondInit();
// Worker-Threads starten
var workers: [4]Thread;
for i := 0 to POOL_SIZE - 1 do {
workers[i] := ThreadCreate(Worker as int64, i);
}
// Aufgaben einreihen: 1² + 2² + ... + 100²
for i := 1 to 100 do {
PushTask(i);
}
// Fertig signalisieren
MutexLock(q_mtx);
done := 1;
pthread_cond_broadcast(q_ready.signal, 0); // alle Worker aufwecken
MutexUnlock(q_mtx);
// Warten
for i := 0 to POOL_SIZE - 1 do {
ThreadJoin(workers[i]);
}
PrintInt(total); // Summe der Quadrate 1..100 = 338350
return 0;
}
| Primitiv | Einsatz | Overhead | Blockiert |
|---|---|---|---|
Mutex | Exklusiver Zugriff auf beliebige Daten | Mittel (Syscall bei Konflikt) | Ja |
Cond | Warten auf Zustandsänderung (Producer/Consumer) | Niedrig (mit Mutex kombiniert) | Ja |
Atomic | Einfache Zähler, Flags, Sequenznummern | Sehr niedrig (Hardwareinstruktion) | Nein |
TLSKey | Per-Thread-Kontext (Error-State, Logger) | Sehr niedrig | Nein |
SharedMem | Großer gemeinsamer Puffer, IPC | Niedrig (mmap) | Nein |
Faustregel:
Threads und @flight_crit schließen sich nicht vollständig aus, aber die Kombination erfordert sorgfältige Architektur.
| Aspekt | @flight_crit | @dal(A) | @dal(B/C) |
|---|---|---|---|
ThreadCreate in Init-Phase | ✅ Erlaubt | ✅ Erlaubt | ✅ Erlaubt |
ThreadCreate im Regelzyklus | ❌ Verboten | ❌ Verboten | ⚠️ Warnung |
MutexLock / MutexUnlock | ✅ Erlaubt | ✅ Mit Nachweis | ✅ Erlaubt |
CondWait ohne Timeout | ❌ Verboten | ❌ Verboten | ⚠️ Warnung |
CondWait mit limit | ✅ Erlaubt | ✅ Mit Nachweis | ✅ Erlaubt |
AtomicAdd, CAS | ✅ Erlaubt | ✅ Erlaubt | ✅ Erlaubt |
SharedMemCreate in Init | ✅ Erlaubt | ✅ Erlaubt | ✅ Erlaubt |
Threads werden einmalig in der Init-Phase gestartet. Der Regelzyklus kommuniziert ausschließlich über Atomics oder Mutex-geschützte feste Puffer — keine dynamische Thread-Erzeugung, kein unbeschränktes Warten.
@dal(B)
unit flight_ipc;
import std.thread;
// Shared State zwischen Sensor-Thread und Regler-Thread
var sensor_val: Atomic;
var sensor_valid: Atomic;
var ipc_mtx: Mutex;
@flight_crit
@stack_limit(1024)
fn SensorThread(arg: int64): int64 {
while (true) {
var raw := ReadHardwareSensor();
AtomicAdd(sensor_valid, 0); // Memory-Barrier
var v := AtomicInit(raw);
sensor_val := v;
AtomicAdd(sensor_valid, 1); // Signalisiere neuen Wert
Sleep(10);
}
return 0;
}
@flight_crit
@stack_limit(512)
fn ControlThread(arg: int64): int64 {
var last_seq: int64 := 0;
while (true) {
var seq := sensor_valid.value;
if (seq != last_seq) {
var val := sensor_val.value; // atomar gelesen
last_seq := seq;
ComputeControl(val);
}
Sleep(5);
}
return 0;
}
fn main(): int64 {
sensor_val := AtomicInit(0);
sensor_valid := AtomicInit(0);
ipc_mtx := MutexInit();
// Init-Phase: Threads starten
var t_sensor := ThreadCreate(SensorThread as int64, 0);
var t_control := ThreadCreate(ControlThread as int64, 0);
// Warten (Endlosschleife im @dal(C)-Hauptmodul)
ThreadJoin(t_sensor);
ThreadJoin(t_control);
return 0;
}
| Fehler | Ursache | Lösung |
|---|---|---|
| Deadlock | Thread A hält Mutex 1, wartet auf Mutex 2; Thread B umgekehrt | Mutexe immer in derselben Reihenfolge sperren |
| Race Condition | Shared Variable ohne Mutex/Atomic | Atomic für primitive Werte; Mutex für Structs/Arrays |
| Vergessenes MutexUnlock | Früher return aus gesperrtem Block | Immer vor return entsperren |
| Spurious Wakeup ignoriert | if statt while bei CondWait | Immer while (Bedingung) limit(N) um CondWait |
| Thread-Leak | ThreadJoin vergessen | Jeden gestarteten Thread joinen oder mit Sentinel beenden |
| Stack-Overflow im Thread | Standardstack-Größe zu klein (POSIX: 8 MB) | @stack_limit + –stack-check für Thread-Funktionen |
Weiterführende Seiten: