OK, final update. I did some improvements to the script, so that is leaner and faster… it now uses very little CPU.
I did try with one fan to pull up the tach line to the 3.3V line with a 4.7KOhm resistor, but interestingly, I get the same correct readings without it! The trick is the new code, which uses the much faster raspberry pi 5 gpiod library for RPM (and lgpio for PWM as it is the best combo). By setting an event_buffer_size to 4096 I get to read all the pulses and the RPMs read are correct.
The Noctua 4cm fan I am using (https://www.noctua.at/en/products/nf-a4x10-5v/specifications) is rated at 4800 rpm at 100%, so the readings I am getting are fairly accurate.
This is the code:
"""
Pironman Fan Controller (Production Grade)
------------------------------------------
Controls three PWM fans on Raspberry Pi 5.
Features
--------
• Independent PWM control
• Accurate RPM measurement (requires 3.3V pull-up on tach)
• Moving-average RPM smoothing
• Thermal hysteresis
• Startup boost
• Fan stall detection
• NVMe multi-drive detection
• Shared-memory stats export
• Systemd friendly
Stats file:
/dev/shm/pironman_stats
Format:
CPU_TEMP,NVME_TEMP,RPM_NVME,RPM_TOP,RPM_BOTTOM
"""
import time
import signal
import logging
import os
import sys
import glob
from collections import deque
import lgpio
import gpiod
from gpiod.line import Direction, Edge
# --------------------------------------------------
# GPIO CONFIGURATION
# --------------------------------------------------
PWM_PINS = {
"nvme": 12,
"top": 18,
"bottom": 19
}
TACH_PINS = {
"nvme": 25,
"top": 17,
"bottom": 23
}
# adjust this path to the correct gpiochip for your tach lines
GPIOCHIP_DEVICE = "/dev/gpiochip0"
# --------------------------------------------------
# FAN CHARACTERISTICS
# --------------------------------------------------
PWM_FREQ = 10000
PULSES_PER_REV = 2
MIN_DUTY = 30
SAFE_DUTY = 60
STALL_RPM_THRESHOLD = 200
RPM_SMOOTHING = 3
TEMP_HYSTERESIS = 2
STATS_FILE = "/dev/shm/pironman_stats"
STATS_TMP = "/dev/shm/pironman_stats.tmp"
# --------------------------------------------------
# FAN CURVES
# --------------------------------------------------
CPU_CURVE = [
(35, 30),
(45, 50),
(55, 75),
(65, 100)
]
NVME_CURVE = [
(35, 30),
(45, 50),
(55, 75),
(65, 100)
]
# --------------------------------------------------
# LOGGING
# --------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="[PIRONMAN] %(asctime)s %(levelname)s: %(message)s"
)
# --------------------------------------------------
# FAN CLASS
# --------------------------------------------------
class Fan:
def __init__(self, chip_handle, pwm_gpio, tach_gpio, name):
self.name = name
self.h = chip_handle
self.pwm = pwm_gpio
self.tach = tach_gpio
self.last_time = time.perf_counter()
self.rpm_history = deque(maxlen=RPM_SMOOTHING)
self.last_duty = -1 # force first update
lgpio.gpio_claim_output(self.h, self.pwm)
ls = gpiod.LineSettings(
direction=Direction.INPUT,
edge_detection=Edge.FALLING
)
self.request = gpiod.request_lines(
GPIOCHIP_DEVICE,
consumer=f"pironman_{name}",
config={self.tach: ls},
event_buffer_size=4906
)
def set_speed(self, duty):
duty = int(max(MIN_DUTY, min(100, duty)))
# only write to hardware if duty actually changed
if duty != self.last_duty:
lgpio.tx_pwm(self.h, self.pwm, PWM_FREQ, duty)
self.last_duty = duty
def get_rpm(self):
now = time.perf_counter()
duration = max(0.01, now - self.last_time)
# Drain all events accumulated since last call.
# Reading a kernel buffer is cheap — no callbacks, no threads.
# max_events guarantees a full drain in one call.
count = 0
if self.request.wait_edge_events(timeout=0):
events = self.request.read_edge_events(max_events=16384)
count += len(events)
rpm = (count * 60) / (duration * PULSES_PER_REV)
self.last_time = now
# deque handles max length automatically, no pop(0) needed
self.rpm_history.append(rpm)
return int(sum(self.rpm_history) / len(self.rpm_history))
def cleanup(self):
try:
self.request.release()
except Exception:
pass
# --------------------------------------------------
# SENSOR DETECTION
# --------------------------------------------------
def find_cpu_sensor():
for i in range(5):
path = f"/sys/class/thermal/thermal_zone{i}"
if os.path.exists(path):
try:
with open(f"{path}/type") as f:
if "cpu" in f.read().lower():
return f"{path}/temp"
except:
pass
return "/sys/class/thermal/thermal_zone0/temp"
def find_nvme_sensors():
sensors = []
for d in glob.glob("/sys/class/hwmon/hwmon*"):
try:
with open(f"{d}/name") as f:
if "nvme" in f.read().lower():
sensors.append(f"{d}/temp1_input")
except:
pass
return sensors
def read_temp(path):
try:
with open(path) as f:
return float(f.read()) / 1000.0
except:
return None
# --------------------------------------------------
# CURVE INTERPOLATION
# --------------------------------------------------
def interpolate(temp, curve):
if temp <= curve[0][0]:
return curve[0][1]
if temp >= curve[-1][0]:
return curve[-1][1]
for i in range(len(curve) - 1):
t1, d1 = curve[i]
t2, d2 = curve[i + 1]
if t1 <= temp <= t2:
return d1 + (d2 - d1) * (temp - t1) / (t2 - t1)
# --------------------------------------------------
# CONTROLLER
# --------------------------------------------------
class Controller:
def __init__(self):
try:
self.h = lgpio.gpiochip_open(0)
except Exception as e:
logging.error(f"Cannot open PWM GPIO chip: {e}")
sys.exit(1)
self.fans = (
Fan(self.h, PWM_PINS["nvme"], TACH_PINS["nvme"], "NVME"),
Fan(self.h, PWM_PINS["top"], TACH_PINS["top"], "CASE_TOP"),
Fan(self.h, PWM_PINS["bottom"], TACH_PINS["bottom"], "CASE_BOTTOM")
)
self.nvme, self.top, self.bottom = self.fans
self.cpu_sensor = find_cpu_sensor()
self.nvme_sensors = find_nvme_sensors()
self.last_cpu = None
self.last_nvme = None
logging.info(f"CPU sensor: {self.cpu_sensor}")
logging.info(f"NVMe sensors: {self.nvme_sensors}")
def get_temps(self):
cpu = read_temp(self.cpu_sensor) or 35
# read each sensor exactly once
nvme_list = []
for s in self.nvme_sensors:
t = read_temp(s)
if t is not None:
nvme_list.append(t)
nvme = max(nvme_list) if nvme_list else cpu
if self.last_cpu is not None and abs(cpu - self.last_cpu) < TEMP_HYSTERESIS:
cpu = self.last_cpu
if self.last_nvme is not None and abs(nvme - self.last_nvme) < TEMP_HYSTERESIS:
nvme = self.last_nvme
self.last_cpu = cpu
self.last_nvme = nvme
return cpu, nvme
def startup_boost(self):
logging.info("Startup boost")
for f in self.fans:
f.set_speed(100)
time.sleep(2)
def check_stall(self, fan, rpm):
if fan.last_duty > 50 and rpm < STALL_RPM_THRESHOLD:
logging.warning(f"Fan stall suspected: {fan.name} duty={fan.last_duty:.2f} rpm={rpm}")
def write_stats(self, cpu, nvme, r1, r2, r3):
try:
# atomic write: readers never see a partial file
with open(STATS_TMP, "w") as f:
f.write(f"{cpu:.1f},{nvme:.1f},{r1},{r2},{r3}")
os.replace(STATS_TMP, STATS_FILE)
except Exception:
pass
def stop(self):
logging.warning("Controller stopping → entering SAFE_DUTY mode")
for f in self.fans:
try:
f.set_speed(SAFE_DUTY)
logging.info(f"{f.name} set to SAFE_DUTY={SAFE_DUTY}%")
except Exception as e:
logging.error(f"Failed to set {f.name} to SAFE_DUTY: {e}")
f.cleanup()
time.sleep(0.5)
try:
lgpio.gpiochip_close(self.h)
logging.info("GPIO chip closed successfully")
except Exception as e:
logging.error(f"Failed to close GPIO chip: {e}")
sys.exit(0)
def loop(self):
self.startup_boost()
while True:
cpu, nvme = self.get_temps()
cpu_duty = interpolate(cpu, CPU_CURVE)
nvme_duty = interpolate(nvme, NVME_CURVE)
self.top.set_speed(cpu_duty)
self.bottom.set_speed(cpu_duty)
self.nvme.set_speed(nvme_duty)
rpm_nvme = self.nvme.get_rpm()
rpm_top = self.top.get_rpm()
rpm_bottom = self.bottom.get_rpm()
self.check_stall(self.nvme, rpm_nvme)
self.check_stall(self.top, rpm_top)
self.check_stall(self.bottom, rpm_bottom)
logging.info(
f"CPU:{cpu:.1f}C NVMe:{nvme:.1f}C | "
f"RPM NVME:{rpm_nvme}, duty={nvme_duty:.0f}% TOP:{rpm_top} BOT:{rpm_bottom}, duties={cpu_duty:.0f}%"
)
self.write_stats(cpu, nvme, rpm_nvme, rpm_top, rpm_bottom)
time.sleep(5)
if __name__ == "__main__":
ctrl = Controller()
signal.signal(signal.SIGTERM, lambda s, f: ctrl.stop())
signal.signal(signal.SIGINT, lambda s, f: ctrl.stop())
try:
ctrl.loop()
except Exception as e:
logging.error(f"Unexpected exception: {e}")
ctrl.stop()
Example output:
[PIRONMAN] 2026-04-09 21:20:32,225 INFO: CPU sensor: /sys/class/thermal/thermal_zone0/temp
[PIRONMAN] 2026-04-09 21:20:32,225 INFO: NVMe sensors: ['/sys/class/hwmon/hwmon2/temp1_input', '/sys/class/hwmon/hwmon1/temp1_input']
[PIRONMAN] 2026-04-09 21:20:32,225 INFO: Startup boost
[PIRONMAN] 2026-04-09 21:20:34,229 INFO: CPU:41.4C NVMe:28.9C | RPM NVME:3428, duty=30% TOP:3233 BOT:3248, duties=43%
[PIRONMAN] 2026-04-09 21:20:39,234 INFO: CPU:41.4C NVMe:28.9C | RPM NVME:2865, duty=30% TOP:2902 BOT:2873, duties=43%
[PIRONMAN] 2026-04-09 21:20:44,238 INFO: CPU:41.4C NVMe:28.9C | RPM NVME:2321, duty=30% TOP:2576 BOT:2539, duties=43%
[PIRONMAN] 2026-04-09 21:20:49,243 INFO: CPU:41.4C NVMe:28.9C | RPM NVME:1590, duty=30% TOP:2136 BOT:2080, duties=43%
[PIRONMAN] 2026-04-09 21:20:54,248 INFO: CPU:41.4C NVMe:28.9C | RPM NVME:1232, duty=30% TOP:1920 BOT:1872, duties=43%
[PIRONMAN] 2026-04-09 21:20:59,252 INFO: CPU:41.4C NVMe:28.9C | RPM NVME:1232, duty=30% TOP:1918 BOT:1874, duties=43%
Note how the fan duty is interpolated from the curve.
I am going to call my Raspberry pi 5 case “The Borg”… It is a cube full of cables 