Yes, exactly. All we need is a pull-up resistor (around 4.7 kΩ) connected between the tach wire and the 3.3 V line, if I am understanding correctly.
I was planning on forking your repo, as what I have in mind is quite different (I am reading two NVMe drives, I have three fans rather than just the two internal ones, I want to read the RPMs, etc.). Also, I am dumping the results to a file in shared memory: my intention is to have a systemd service run this script and to read that file from Home Assistant, so that I can monitor the Raspberry Pi (which runs Proxmox and Glances, so I have other things monitored as well).
In any case, here is the code. Note that it is not 100% tested yet… at least not comprehensively. I ordered a resistor book from Amazon and once it gets here, I will do more wiring to correctly read RPMs and will continue testing, but feel free to comment on the code below!
"""
Pironman Fan Controller (Production Grade)
------------------------------------------
Controls three PWM fans on Raspberry Pi 5.
Features
--------
• Independent PWM control
• Accurate RPM measurement (requires 3.3V pull-up on tach)
• Moving-average RPM smoothing
• Thermal hysteresis
• Startup boost
• Fan stall detection
• Watchdog failsafe
• NVMe multi-drive detection
• Shared-memory stats export
• Systemd friendly
Stats file:
/dev/shm/pironman_stats
Format:
CPU_TEMP,NVME_TEMP,RPM_NVME,RPM_TOP,RPM_BOTTOM
"""
import time
import signal
import logging
import os
import lgpio
import sys
import glob
# --------------------------------------------------
# GPIO CONFIGURATION
# --------------------------------------------------
# GPIO numbers (as handed to lgpio) carrying each fan's PWM signal.
PWM_PINS = dict(nvme=12, top=13, bottom=19)

# GPIO numbers (as handed to lgpio) wired to each fan's tachometer output.
TACH_PINS = dict(nvme=25, top=17, bottom=23)

# --------------------------------------------------
# FAN CHARACTERISTICS
# --------------------------------------------------
PWM_FREQ = 10000            # PWM frequency passed to lgpio.tx_pwm
PULSES_PER_REV = 2          # tach pulses counted per revolution in Fan.get_rpm
MIN_DUTY = 35               # lowest duty cycle (%) Fan.set_speed will apply
SAFE_DUTY = 60              # duty cycle (%) applied by Controller.stop
STALL_RPM_THRESHOLD = 200   # below this RPM a fan at high duty is reported as stalled
RPM_SMOOTHING = 3           # number of RPM samples kept for the moving average
TEMP_HYSTERESIS = 2         # temperature change smaller than this is ignored
STATS_FILE = "/dev/shm/pironman_stats"

# --------------------------------------------------
# FAN CURVES
# --------------------------------------------------
# Each curve is a list of (temperature, duty %) points, ordered by rising
# temperature; interpolate() works linearly between neighbouring points.
CPU_CURVE = [
    (40, 25),
    (50, 45),
    (60, 75),
    (70, 100),
]

NVME_CURVE = [
    (35, 30),
    (45, 55),
    (55, 80),
    (65, 100),
]

# --------------------------------------------------
# LOGGING
# --------------------------------------------------
logging.basicConfig(
    format="[PIRONMAN] %(asctime)s %(levelname)s: %(message)s",
    level=logging.INFO,
)
# --------------------------------------------------
# FAN CLASS
# --------------------------------------------------
class Fan:
    """One PWM-driven fan together with its tachometer input.

    The constructor claims the PWM GPIO as an output, claims the tach GPIO
    as an alert source with the internal pull-up enabled, and registers a
    falling-edge callback that counts tach pulses.

    Attributes:
        name: Label used in log messages.
        h: lgpio chip handle shared with the controller.
        pwm: GPIO number driving the fan's PWM input.
        tach: GPIO number reading the fan's tach output.
        pulses: Tach pulses counted since the last ``get_rpm`` call.
        last_duty: Duty cycle (%) most recently applied by ``set_speed``.
        rpm_history: The last ``RPM_SMOOTHING`` raw RPM samples.
    """

    def __init__(self, chip, pwm_gpio, tach_gpio, name):
        """Claim the GPIOs for this fan and start counting tach pulses.

        Args:
            chip: Handle returned by ``lgpio.gpiochip_open``.
            pwm_gpio: GPIO number for the PWM output.
            tach_gpio: GPIO number for the tach input.
            name: Label used in log messages.
        """
        self.name = name
        self.h = chip
        self.pwm = pwm_gpio
        self.tach = tach_gpio
        self.pulses = 0
        # Monotonic clock: interval measurement must not be affected by
        # wall-clock adjustments (NTP steps, manual changes).
        self.last_time = time.monotonic()
        self.last_duty = MIN_DUTY
        self.rpm_history = []

        # Claim PWM pin
        lgpio.gpio_claim_output(self.h, self.pwm)

        # Claim the tach GPIO as an *alert* source with pull-up. lgpio only
        # delivers edge callbacks for lines claimed with gpio_claim_alert;
        # a plain gpio_claim_input never triggers the callback.
        lgpio.gpio_claim_alert(
            self.h,
            self.tach,
            lgpio.FALLING_EDGE,
            lgpio.SET_PULL_UP,
        )

        # Callback to count pulses
        self.cb = lgpio.callback(
            self.h,
            self.tach,
            lgpio.FALLING_EDGE,
            self._pulse,
        )

    def _pulse(self, chip, gpio, level, tick):
        """Edge callback invoked by lgpio: count one tach pulse."""
        self.pulses += 1

    def set_speed(self, duty):
        """Apply a PWM duty cycle, clamped to the range MIN_DUTY..100.

        Args:
            duty: Requested duty cycle in percent.
        """
        duty = max(MIN_DUTY, min(100, duty))
        lgpio.tx_pwm(self.h, self.pwm, PWM_FREQ, duty)
        self.last_duty = duty

    def get_rpm(self):
        """Return the smoothed RPM since the previous call.

        The raw RPM for the elapsed interval is derived from the pulse
        count, appended to the history, and the mean of the last
        ``RPM_SMOOTHING`` samples is returned. The pulse counter and the
        interval start are reset on every call.

        Returns:
            int: Moving-average RPM, truncated to an integer.
        """
        now = time.monotonic()
        # Floor the interval at 0.1 s so back-to-back calls cannot divide
        # by (almost) zero.
        duration = max(0.1, now - self.last_time)
        # NOTE: the callback may increment self.pulses between this read and
        # the reset below, so an occasional pulse can be lost.
        rpm = (self.pulses * 60) / (duration * PULSES_PER_REV)
        self.pulses = 0
        self.last_time = now

        self.rpm_history.append(rpm)
        # Keep only the newest RPM_SMOOTHING samples.
        del self.rpm_history[:-RPM_SMOOTHING]
        return int(sum(self.rpm_history) / len(self.rpm_history))
# --------------------------------------------------
# SENSOR DETECTION
# --------------------------------------------------
def find_cpu_sensor(base="/sys/class/thermal"):
    """Locate the sysfs temperature file of the CPU thermal zone.

    Every ``thermal_zone*`` directory under *base* is inspected in numeric
    order; the first one whose ``type`` file contains "cpu"
    (case-insensitive) wins.

    Args:
        base: Directory holding the ``thermal_zone*`` entries.

    Returns:
        str: Path of the matching zone's ``temp`` file, or
        ``<base>/thermal_zone0/temp`` when no zone matches.
    """
    prefix = "thermal_zone"

    def zone_number(path):
        # Sort numerically so zone10 does not come before zone2.
        suffix = os.path.basename(path)[len(prefix):]
        return int(suffix) if suffix.isdigit() else float("inf")

    pattern = os.path.join(glob.escape(base), prefix + "*")
    for zone in sorted(glob.glob(pattern), key=zone_number):
        try:
            with open(os.path.join(zone, "type")) as f:
                if "cpu" in f.read().lower():
                    return f"{zone}/temp"
        except (OSError, ValueError):
            # Unreadable or undecodable zone: try the next one.
            continue
    return f"{base}/thermal_zone0/temp"
def find_nvme_sensors(base="/sys/class/hwmon"):
    """Collect the temperature files of all NVMe hwmon devices.

    A ``hwmon*`` directory under *base* counts as NVMe when its ``name``
    file contains "nvme" (case-insensitive).

    Args:
        base: Directory holding the ``hwmon*`` entries.

    Returns:
        list[str]: ``<hwmon dir>/temp1_input`` for every NVMe device, in
        numeric hwmon order; empty when none is found.
    """
    prefix = "hwmon"

    def hwmon_number(path):
        # Numeric ordering keeps the result stable between runs.
        suffix = os.path.basename(path)[len(prefix):]
        return int(suffix) if suffix.isdigit() else float("inf")

    sensors = []
    pattern = os.path.join(glob.escape(base), prefix + "*")
    for d in sorted(glob.glob(pattern), key=hwmon_number):
        try:
            with open(f"{d}/name") as f:
                if "nvme" in f.read().lower():
                    sensors.append(f"{d}/temp1_input")
        except (OSError, ValueError):
            # Device without a readable name file: skip it.
            continue
    return sensors
def read_temp(path):
    """Read a sysfs temperature file and scale its value by 1/1000.

    Args:
        path: File containing a single number (sysfs reports millidegrees).

    Returns:
        float | None: The file's value divided by 1000, or ``None`` when the
        file cannot be read or does not contain a number.
    """
    try:
        with open(path) as f:
            return float(f.read()) / 1000.0
    except (OSError, ValueError):
        # Missing/unreadable file or non-numeric content: the caller picks
        # the fallback.
        return None
# --------------------------------------------------
# CURVE INTERPOLATION
# --------------------------------------------------
def interpolate(temp, curve):
    """Map a temperature to a duty cycle by linear interpolation.

    Args:
        temp: Temperature to look up.
        curve: Sequence of ``(temperature, duty)`` points, ordered by rising
            temperature.

    Returns:
        The first point's duty at or below the first temperature, the last
        point's duty at or above the last temperature, and otherwise the
        value interpolated linearly inside the first segment containing
        *temp*. ``None`` if no segment contains *temp*.
    """
    lowest_temp, lowest_duty = curve[0]
    if temp <= lowest_temp:
        return lowest_duty

    highest_temp, highest_duty = curve[-1]
    if temp >= highest_temp:
        return highest_duty

    # Walk neighbouring points pairwise and use the first bracketing segment.
    for (t1, d1), (t2, d2) in zip(curve, curve[1:]):
        if not (t1 <= temp <= t2):
            continue
        return d1 + (d2 - d1) * (temp - t1) / (t2 - t1)

    return None
# --------------------------------------------------
# CONTROLLER
# --------------------------------------------------
class Controller:
    """Drive the three fans from CPU and NVMe temperatures.

    The controller opens GPIO chip 0, creates one ``Fan`` per entry in
    ``PWM_PINS``/``TACH_PINS``, discovers the temperature sensors, and then
    runs a 5-second control loop that sets duty cycles from the fan curves,
    measures RPM, logs, and writes the stats file.
    """

    def __init__(self):
        """Open the GPIO chip, create the fans and locate the sensors.

        Exits the process with status 1 if the GPIO chip cannot be opened.
        """
        try:
            self.h = lgpio.gpiochip_open(0)
        except Exception as e:
            logging.error(f"Cannot open GPIO chip: {e}")
            sys.exit(1)

        self.nvme = Fan(self.h, PWM_PINS["nvme"], TACH_PINS["nvme"], "NVME")
        self.top = Fan(self.h, PWM_PINS["top"], TACH_PINS["top"], "CASE_TOP")
        self.bottom = Fan(self.h, PWM_PINS["bottom"], TACH_PINS["bottom"], "CASE_BOTTOM")

        self.cpu_sensor = find_cpu_sensor()
        self.nvme_sensors = find_nvme_sensors()

        # Last temperatures actually reported, used for hysteresis.
        self.last_cpu = None
        self.last_nvme = None

        logging.info(f"CPU sensor: {self.cpu_sensor}")
        logging.info(f"NVMe sensors: {self.nvme_sensors}")

    # --------------------------------------------------
    def _fans(self):
        """Return the three fans in the order NVMe, top, bottom."""
        return [self.nvme, self.top, self.bottom]

    # --------------------------------------------------
    def get_temps(self):
        """Read CPU and NVMe temperatures and apply hysteresis.

        The CPU temperature falls back to 35 when its sensor cannot be read.
        The NVMe temperature is the hottest readable NVMe sensor, or the CPU
        temperature when none is readable. A value that moved by less than
        ``TEMP_HYSTERESIS`` since the last reported value is replaced by
        that previous value.

        Returns:
            tuple: ``(cpu, nvme)`` temperatures.
        """
        cpu = read_temp(self.cpu_sensor)
        if cpu is None:
            # Explicit None test: a genuine 0.0 reading is a valid value.
            cpu = 35

        # Read every sensor exactly once. Reading twice (filter + value)
        # could let a None slip into the list and crash max().
        readings = [read_temp(s) for s in self.nvme_sensors]
        nvme_list = [t for t in readings if t is not None]
        nvme = max(nvme_list) if nvme_list else cpu

        if self.last_cpu is not None and abs(cpu - self.last_cpu) < TEMP_HYSTERESIS:
            cpu = self.last_cpu
        if self.last_nvme is not None and abs(nvme - self.last_nvme) < TEMP_HYSTERESIS:
            nvme = self.last_nvme

        self.last_cpu = cpu
        self.last_nvme = nvme
        return cpu, nvme

    # --------------------------------------------------
    def startup_boost(self):
        """Run all fans at 100% for two seconds."""
        logging.info("Startup boost")
        for f in self._fans():
            f.set_speed(100)
        time.sleep(2)

    # --------------------------------------------------
    def check_stall(self, fan, rpm):
        """Log a warning when a fan above 60% duty reports a too-low RPM.

        Args:
            fan: The ``Fan`` being checked.
            rpm: Its most recent smoothed RPM.
        """
        if fan.last_duty > 60 and rpm < STALL_RPM_THRESHOLD:
            logging.warning(f"Fan stall suspected: {fan.name} duty={fan.last_duty} rpm={rpm}")

    # --------------------------------------------------
    def write_stats(self, cpu, nvme, r1, r2, r3):
        """Write ``cpu,nvme,r1,r2,r3`` to ``STATS_FILE`` (best effort).

        The line is written to a temporary sibling file and then moved into
        place with ``os.replace`` so a concurrent reader never sees a
        truncated or half-written file. Failures are logged, never raised.

        Args:
            cpu: CPU temperature.
            nvme: NVMe temperature.
            r1: RPM of the NVMe fan.
            r2: RPM of the top fan.
            r3: RPM of the bottom fan.
        """
        tmp_path = f"{STATS_FILE}.tmp"
        try:
            with open(tmp_path, "w") as f:
                f.write(f"{cpu:.1f},{nvme:.1f},{r1},{r2},{r3}")
            os.replace(tmp_path, STATS_FILE)
        except OSError as e:
            logging.warning(f"Could not write stats file: {e}")

    # --------------------------------------------------
    def watchdog_refresh(self):
        """Re-apply each fan's last duty cycle."""
        for fan in self._fans():
            fan.set_speed(fan.last_duty)

    # --------------------------------------------------
    def stop(self):
        """
        Robust stop: sets all fans to SAFE_DUTY, waits, cancels the tach
        callbacks, closes the GPIO chip, then exits with status 0.
        """
        logging.warning("Controller stopping → entering SAFE_DUTY mode")
        for f in self._fans():
            try:
                f.set_speed(SAFE_DUTY)
                logging.info(f"{f.name} set to SAFE_DUTY={SAFE_DUTY}%")
            except Exception as e:
                logging.error(f"Failed to set {f.name} to SAFE_DUTY: {e}")

        time.sleep(0.5)  # Give PWM time to settle

        # Stop edge callbacks before the handle they depend on is closed.
        for f in self._fans():
            try:
                f.cb.cancel()
            except Exception as e:
                logging.error(f"Failed to cancel tach callback for {f.name}: {e}")

        try:
            lgpio.gpiochip_close(self.h)
            logging.info("GPIO chip closed successfully")
        except Exception as e:
            logging.error(f"Failed to close GPIO chip: {e}")
        sys.exit(0)

    # --------------------------------------------------
    def loop(self):
        """Run the startup boost, then the control loop forever.

        Every 5 seconds: read temperatures, set the case fans from
        ``CPU_CURVE`` and the NVMe fan from ``NVME_CURVE``, sample RPMs,
        check for stalls, re-apply duties, log, and write the stats file.
        """
        self.startup_boost()
        while True:
            cpu, nvme = self.get_temps()
            cpu_duty = interpolate(cpu, CPU_CURVE)
            nvme_duty = interpolate(nvme, NVME_CURVE)

            self.top.set_speed(cpu_duty)
            self.bottom.set_speed(cpu_duty)
            self.nvme.set_speed(nvme_duty)

            rpm_nvme = self.nvme.get_rpm()
            rpm_top = self.top.get_rpm()
            rpm_bottom = self.bottom.get_rpm()

            self.check_stall(self.nvme, rpm_nvme)
            self.check_stall(self.top, rpm_top)
            self.check_stall(self.bottom, rpm_bottom)

            self.watchdog_refresh()

            logging.info(
                f"CPU:{cpu:.1f}C NVMe:{nvme:.1f}C | "
                f"RPM NVME:{rpm_nvme} TOP:{rpm_top} BOT:{rpm_bottom}"
            )
            self.write_stats(cpu, nvme, rpm_nvme, rpm_top, rpm_bottom)
            time.sleep(5)
# --------------------------------------------------
# MAIN
# --------------------------------------------------
if __name__ == "__main__":
    ctrl = Controller()

    def _handle_signal(signum, frame):
        """Shut the controller down cleanly on SIGTERM / SIGINT."""
        ctrl.stop()

    signal.signal(signal.SIGTERM, _handle_signal)
    signal.signal(signal.SIGINT, _handle_signal)  # CTRL+C

    try:
        ctrl.loop()
    except Exception as e:
        # Keep the traceback in the log; the bare message alone hides where
        # the failure happened.
        logging.exception(f"Unexpected exception: {e}")
        try:
            ctrl.stop()
        except SystemExit:
            # stop() always exits with status 0; swallow that so the crash
            # is reported with a failing status below.
            pass
        # Non-zero status lets a service manager see the failure.
        sys.exit(1)