Original Post: 06/01/2024
Update: I updated this post on 09/02/24 to reflect the changes made to the CO2 sensor build: a 5V supply for stability, an OLED screen for display, switches for WiFi and calibration, and finally an obnoxious buzzer to let me know when I should open my windows or door (>1500 ppm).
Disclaimer: Please note that the information presented in this article is for informational purposes only. It is not intended to serve as health advice, engineering advice, or any form of professional guidance. Readers should consult with qualified professionals for specific health or engineering concerns and should not rely solely on the content of this article for making decisions. The authors and publishers of this article are not responsible for any actions taken based on the information provided herein.
In this post I intend to cover the basics of SCD41 sensors, their technical performance, some MicroPython code to use with an ESP32 microcontroller (yes, I know there are other ones out there), and some of the limitations and precautions one should keep in mind with this particular sensor.
What is an SCD41 Sensor?
An SCD41 sensor is Sensirion’s miniature CO2 sensor. It uses a photoacoustic NDIR sensing principle along with Sensirion’s patented technology to offer high accuracy at a competitive price. I personally picked up this sensor for $28.99, and it could likely be found at a lower price directly from the manufacturer or via platforms like AliExpress. The sensor also includes on-chip signal compensation with a built-in SHT4x humidity and temperature sensor to account for fluctuations in environmental conditions, ensuring accurate CO2 readings.
Technical Performance
Generalized CO2 Accuracy Specification:
400-1,000 ppm: ±(50 ppm + 2.5% of reading)
1,001-2,000 ppm: ±(50 ppm + 3% of reading)
2,001-5,000 ppm: ±(40 ppm + 5% of reading)
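To make the accuracy bands above concrete, here is a minimal sketch that turns a reading into its ± tolerance. The function name is my own for illustration, and how the band edges are handled is an assumption.

```python
def scd41_co2_accuracy_ppm(reading_ppm):
    """Return the +/- accuracy band (in ppm) for a CO2 reading, using the table above."""
    if 400 <= reading_ppm <= 1000:
        return 50 + 0.025 * reading_ppm
    if 1000 < reading_ppm <= 2000:
        return 50 + 0.03 * reading_ppm
    if 2000 < reading_ppm <= 5000:
        return 40 + 0.05 * reading_ppm
    # Outside 400-5000 ppm the table gives no figure
    raise ValueError("accuracy is only specified for 400-5000 ppm")


for ppm in (800, 1500, 3000):
    print(ppm, "ppm reading, tolerance +/-", round(scd41_co2_accuracy_ppm(ppm)), "ppm")
```

So a reading near the 1500 ppm buzzer threshold carries a tolerance of roughly ±95 ppm, which is worth remembering before reading too much into small changes.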
Temperature and Humidity Specifications:
Temperature Accuracy: -10°C to 60°C range with an accuracy of ±0.8°C
Humidity Accuracy: 0-95% RH range with an accuracy of ±6% RH
The response time is roughly 2 minutes for temperature and 90 seconds for humidity. Using the longer of the two as the basis for our sampling, we sample once every 2 minutes. The sensor's command set is extensive and clearly detailed in the datasheet, which simplifies programming but does require getting familiar with a fair number of commands.
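To give a feel for that command set, here is a small sketch of how a command is framed and how raw words become readings. It mirrors what the library later in this post does: 16-bit big-endian command codes, a CRC-8 (polynomial 0x31, initial value 0xFF) after every 16-bit data word, and the same temperature and humidity scaling. The names `build_command` and `convert_words` are my own for illustration, and the raw words are example values rather than captured sensor output.

```python
def crc8(data, polynomial=0x31, init_value=0xFF):
    """CRC-8 with the same polynomial and initial value the library uses."""
    crc = init_value
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x80:
                crc = ((crc << 1) ^ polynomial) & 0xFF
            else:
                crc = (crc << 1) & 0xFF
    return crc


def build_command(cmd, word=None):
    """Frame a 16-bit command, optionally followed by one 16-bit argument and its CRC."""
    frame = cmd.to_bytes(2, "big")
    if word is not None:
        arg = word.to_bytes(2, "big")
        frame += arg + bytes([crc8(arg)])
    return frame


def convert_words(co2_raw, temp_raw, rh_raw):
    """Convert three raw 16-bit words to ppm, degrees C and % relative humidity."""
    return co2_raw, -45 + 175 * temp_raw / 65535, 100 * rh_raw / 65535


print(build_command(0x21B1))       # start periodic measurement: two bytes, no argument
print(build_command(0x2427, 250))  # set altitude to 250 m: command + word + CRC
print(convert_words(0x01F4, 0x6667, 0x5EB9))  # about 500 ppm, 25 C, 37 % RH
```

A handy sanity check for any port of the CRC routine is that the two bytes 0xBE, 0xEF should give 0x92.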
Before diving into the fundamentals let’s visit why we even care about CO2 to begin with.
CO2 Levels in an Enclosed Space
CO2 levels rise predictably over time in an enclosed room, since it is a closed system with n producers (people). If we know how much CO2 each person produces, we can do a mass balance around the room: start with the initial amount of CO2, add what the occupants exhale per minute, and estimate the total after a given period. This lets us predict how CO2 levels will climb as people keep breathing, which is exactly why the air quality is worth monitoring.
As CO2 levels increase, a number of health complaints can arise, and these complaints depend on the CO2 concentration. The Wisconsin Department of Health Services outlines a ppm-to-health-complaint table:
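As a rough sketch of that mass balance for a fully sealed room: the per-person generation rate of 0.02 m3/h is an assumed ballpark figure for a resting adult, not a number from this post, and the function name is mine. With no ventilation the concentration simply climbs linearly.

```python
def sealed_room_co2_ppm(people, hours, volume_m3, c0_ppm=400.0, gen_m3_per_h=0.02):
    """Estimate CO2 (ppm) in a sealed room after `hours`.

    gen_m3_per_h is an ASSUMED per-person CO2 output; adjust it for activity level.
    """
    added_fraction = people * gen_m3_per_h * hours / volume_m3  # m3 CO2 per m3 air
    return c0_ppm + added_fraction * 1e6


# Two people in roughly a 10 ft x 10 ft x 10 ft room (about 28.3 m3)
for hours in (0.5, 1, 2, 4):
    print(hours, "h:", round(sealed_room_co2_ppm(2, hours, 28.3)), "ppm")
```

Real rooms leak, so treat this as an upper bound on how quickly levels can rise.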
400 ppm: average outdoor air level.
400–1,000 ppm: typical level found in occupied spaces with good air exchange.
1,000–2,000 ppm: level associated with complaints of drowsiness and poor air.
2,000–5,000 ppm: level associated with headaches, sleepiness, and stagnant, stale, stuffy air. Poor concentration, loss of attention, increased heart rate and slight nausea may also be present.
5,000 ppm: this indicates unusual air conditions where high levels of other gases could also be present. Toxicity or oxygen deprivation could occur. This is the permissible exposure limit for daily workplace exposures.
40,000 ppm: this level is immediately harmful due to oxygen deprivation.
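If you want to show these bands on a display or in a log, a minimal lookup might look like the following. The function name and the exact handling of the band edges are my own choices, and the wording is condensed from the table above.

```python
def describe_co2(ppm):
    """Map a CO2 reading (ppm) to a short description based on the table above."""
    if ppm >= 40000:
        return "immediately harmful"
    if ppm >= 5000:
        return "at or above the workplace exposure limit"
    if ppm >= 2000:
        return "headaches, sleepiness, stale air"
    if ppm >= 1000:
        return "drowsiness and poor air"
    if ppm > 400:
        return "typical occupied space with good air exchange"
    return "around outdoor air level"


for reading in (420, 1500, 2600):
    print(reading, "ppm:", describe_co2(reading))
```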
We also see plenty of examples of CO2 levels reaching dangerous levels in enclosed spaces and I am including images along with associated posts:
From https://vair-monitor.com/:
From https://cambridgecarbonfootprint.org/:
Now if we move to a more engineering based analysis we go to https://www.engineeringtoolbox.com/pollution-concentration-rooms-d_692.html and find that the carbon dioxide concentration in a room is a function of:
CO2 Concentration Calculation
The carbon dioxide concentration in a room filled with persons after a time \( t \) can be calculated as:
\[ c = \frac{q}{nV}\left[1 - \frac{1}{e^{nt}}\right] + (c_0 - c_i)\,\frac{1}{e^{nt}} + c_i \]
where:
- c = carbon dioxide concentration in the room (m3/m3)
- q = carbon dioxide supplied to the room (m3/h)
- V = volume of the room (m3)
- e = the constant 2.718...
- n = number of air shifts per hour (1/h)
- t = time (hour, h)
- ci = carbon dioxide concentration in the inlet ventilation air (m3/m3)
- c0 = carbon dioxide concentration in the room at start, \( t=0 \) (m3/m3)
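The formula above can be evaluated directly in a few lines. This is a sketch using the same symbols, reading 1/e^(nt) as exp(-n t). The input values are illustrative assumptions (including a per-person CO2 output of 0.02 m3/h) and are not meant to reproduce any particular calculator's defaults.

```python
import math


def room_co2_fraction(q, n, V, t, c0, ci):
    """CO2 concentration (m3/m3) after t hours in a ventilated room."""
    decay = math.exp(-n * t)  # the 1 / e^(n t) term
    return (q / (n * V)) * (1.0 - decay) + (c0 - ci) * decay + ci


# Illustrative inputs (assumptions)
q = 2 * 0.02      # m3/h of CO2 from two people at an assumed 0.02 m3/h each
V = 28.3          # m3, roughly a 10 ft x 10 ft x 10 ft room
n = 1.0           # air changes per hour (must be > 0 for this formula)
ci = c0 = 400e-6  # inlet and starting concentration, 400 ppm

for hours in (0.5, 1, 2, 8):
    c = room_co2_fraction(q, n, V, hours, c0, ci)
    print(hours, "h:", round(c * 1e6), "ppm")
```

As t grows, the result levels off at q / (n V) + ci, which is the steady state where ventilation removes CO2 as fast as the occupants add it.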
Calculator (the same formula as Engineering Toolbox). Defaults: 2 people, a 10 ft by 10 ft room with 10 ft ceilings, 1 air change per hour, and makeup air at the standard 400 ppm CO2. This of course assumes absolutely no gaps, so you are truly enclosed. It is a worst-case scenario, and since it is not cited anywhere, take it with a grain of salt!
CO2 Concentration Calculator
CO2 concentration: 0.00657 m3/m3 (6574 ppm)
We also have a similar calculator here: https://www.soletairpower.fi/co2-calculator/
For a more scientifically based study one can navigate to the following: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7411428/
The study gives a CO2-based occupancy estimation by correlating the levels of CO2 with the number of occupants in a room, with the ultimate goal of reducing energy use by dynamically adjusting HVAC systems in real time. This means you don't ventilate rooms that are unoccupied, and you ventilate less if they are under-occupied. It's probably possible to tease out a solid mathematical formula that could be put into a calculator, but that would take time away from the goal of this article: the SCD41 and CO2 measurement.
Characteristics of NDIR Sensors
NDIR sensors measure CO2 by exploiting its property to absorb IR light at around 4.2 µm. They use a non-dispersive band-pass filter to allow only the relevant IR wavelengths to pass, hence the name Non-Dispersive Infrared. An image from Wikipedia below outlines various gases and their Mid-infrared absorption spectra:
How does Photoacoustic NDIR Work?
Photoacoustic Non-Dispersive Infrared (NDIR) technology is an advanced method used in sensors like the SCD41 to measure gas concentrations, such as CO2, using light and sound. Here’s a brief explanation of how it works:
Key Steps in Photoacoustic NDIR
Infrared Light Source: An infrared (IR) light source emits light at wavelengths absorbed by CO2.
Gas Absorption: CO2 molecules absorb this light, causing them to heat up and vibrate.
Pressure Waves: The vibrations create tiny sound waves.
Microphone Detection: A microphone detects these pressure waves.
Signal Processing: The sensor converts these waves into electrical signals to determine CO2 concentration.
Output: The final CO2 concentration is displayed for monitoring or further processing.
This method offers high sensitivity and accuracy, making it suitable for real-time CO2 monitoring in various applications, including indoor air quality and industrial processes. The SCD41 sensor is compact, low-power, and cost-effective, ideal for integration into diverse systems.
Transmissive NDIR
These sensors feature an IR emitter and an optical detector at opposite ends of an optical cavity. Here’s a quick rundown of how they work:
IR Emitter: Emits light through the gas sample.
Absorption by CO2: CO2 absorbs specific wavelengths of the IR light.
Detection: The detector measures the transmitted IR light.
Calculation: CO2 concentration is calculated based on the difference in emitted and transmitted light.
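Transmissive NDIR sensors require precise alignment of the emitter and detector, and their sensitivity depends on the optical path length, which limits how small they can be made while still giving accurate readings.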
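The absorption step is commonly modelled with the Beer-Lambert law, which I am bringing in here as a textbook illustration rather than as how any particular sensor computes its output. In the sketch below the absorption coefficient `k` is a made-up number chosen only to show the trend, and both function names are hypothetical; real sensors rely on factory calibration curves.

```python
import math


def transmitted_fraction(conc_ppm, path_cm, k=2e-5):
    """Fraction of IR light reaching the detector (Beer-Lambert).

    k is a HYPOTHETICAL absorption coefficient per ppm per cm, for illustration only.
    """
    return math.exp(-k * conc_ppm * path_cm)


def concentration_from_transmission(fraction, path_cm, k=2e-5):
    """Invert Beer-Lambert to recover concentration (ppm) from the transmitted fraction."""
    return -math.log(fraction) / (k * path_cm)


for path_cm in (1, 4):
    frac = transmitted_fraction(1000, path_cm)
    print("path", path_cm, "cm: transmitted fraction", round(frac, 4))
```

The longer path absorbs more light for the same concentration, so the signal change is larger and easier to measure. That is the trade-off behind the path length requirement.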
TVOC Sensors and eCO2 Readings
Using Total Volatile Organic Compounds (TVOC) sensors, such as the Sensirion SGP30, to estimate CO2 levels is generally unreliable. TVOC sensors measure the concentration of various organic compounds in the air, which can include emissions from household products, cooking, cleaning agents, and even human breath. While these sensors are adept at detecting a wide range of VOCs, they are not designed to specifically measure CO2.
Why TVOC Sensors Fall Short for CO2 Estimation
Broad Detection Spectrum: TVOC sensors respond to a broad range of organic compounds rather than to CO2 itself. Their readings are therefore driven by numerous VOC sources that are unrelated to CO2 levels, such as air fresheners, deodorizers, and various chemical products used indoors.
Lack of Specificity: CO2 is not a VOC, so a TVOC sensor cannot measure it directly; the eCO2 figure is an estimate inferred from the other gases it detects. The presence of those gases can cause the sensor to give false indications of high CO2 levels.
Environmental Factors: Various environmental factors such as temperature, humidity, and the presence of other gases can affect the readings of TVOC sensors, further complicating their accuracy when estimating CO2 levels.
Despite these limitations, some vendors and manufacturers still promote TVOC sensors for CO2 estimation. This can be misleading for consumers who may believe they are getting accurate CO2 measurements. It is important for users to understand these limitations and consider more reliable methods, such as NDIR or photoacoustic sensors, for precise CO2 monitoring.
Recommended Use of TVOC Sensors
While TVOC sensors are not suitable for accurate CO2 measurement, they are valuable tools for:
Indoor Air Quality Monitoring: Detecting the presence of harmful VOCs and identifying sources of indoor air pollution.
Health and Safety: Ensuring that indoor environments are free from harmful concentrations of volatile organic compounds.
Industrial Applications: Monitoring air quality in industrial settings to ensure compliance with safety regulations.
Comparing Low-Cost CO2 Sensors
NDIR Sensors
NDIR sensors measure CO2 based on gas absorption of IR light. They typically feature an IR emitter, dual channels for reference and measurement, and calculate CO2 concentration by comparing light absorption in these channels.
Photo-Acoustic Sensors
Photo-acoustic sensors also measure absorption but use a microphone to detect the resulting pressure waves. They are smaller and do not rely on line-of-sight, making them suitable for compact applications.
In the image below we can see a comparison of the photoacoustic and transmissive NDIR sensors:
Working Principle References:
https://www.airgradient.com/blog/co2-sensors-photo-acoustic-vs-ndir/
https://www.sensirion.com/resource/application_note/ndir-sensors-types
Setting up the Circuit
The circuit used here is fairly simple, as the SCD41 has four pins.
GROUND = GND
Voltage_in = VDD
SCL = Serial Clock Line
SDA = Serial Data Line
We also use an OLED to monitor the output and two switches: one to trigger calibration on startup and one to toggle WiFi. So that this can be recreated quickly with the same board, here are the associated pin numbers and sides.
SCD41
SDA: 14-Left
SCL: 17-Left
VDD: 1-Right
GND: 19-Left
Switch Calibration(Left Switch)
Top(ON): 12-Right
GND: Ground Bar-Right
Bottom: N/A
Switch Wifi(Right Switch)
Top(ON): 13-Right
GND:Ground Bar - Right
Bottom: N/A
OLED
SDA: 10-Left
SCL: 7-Left
VCC: 12-Right
GND: Ground Bar-Right
Buzzer
15-Right
Ground Bar - Right
Note that with some changes to the code one can omit the OLED and the switches, but being able to calibrate on the fly with fresh air is nice, as is having an offline version of the sensor. Also note that at 3.3V the peak supply current is typically 175mA with a max of 205mA, while at 5.0V it is typically 115mA with a max of 137mA. I personally ran into many issues at 3.3V; after switching to 5V I experienced absolutely none.
The MicroPython library used is below. Simply open up the file in Thonny and save it to the device as SCD41.py so it can be imported in the main.py script. Note that the comments can be removed to cut down on space on your device. Implementing the Google Drive database and the ThingSpeak database are separate posts. See Raspberry Pi Sensor Server Project for more details on setting that up.
Library Code
# SCD41.py
# This code is pretty heavily based on the SCD4x driver at: https://github.com/octaprog7/SCD4x
# And this library written by Sensirion: https://github.com/Sensirion/python-i2c-scd
# This file should be saved as SCD41.py and imported with `import SCD41`. See main.py in this folder for an example usage.
import time
import math
from machine import I2C, SoftI2C, Pin, SPI  # I2C is referenced in the type hints below
import micropython
import ustruct


@micropython.native
def _calc_crc(sequence) -> int:
    """
    Calculate CRC-8 checksum for the given sequence.
    """
    return crc8(sequence, polynomial=0x31, init_value=0xFF)


@micropython.native
def check_value(value: int, valid_range, error_msg: str) -> int:
    """
    Check if the value is within the valid range. Raise ValueError if not.
    """
    if value not in valid_range:
        raise ValueError(error_msg)
    return value
class Device:
    """Base class for devices."""
    def __init__(self, adapter, address: [int, SPI], big_byte_order: bool):
        """
        Initialize the device with adapter, address, and byte order.
        """
        self.adapter = adapter
        self.address = address
        self.big_byte_order = big_byte_order
        self.msb_first = True

    def _get_byteorder_as_str(self) -> tuple:
        """
        Return the byte order as a string ('big' or 'little') and format character ('>' or '<').
        """
        if self.is_big_byteorder():
            return 'big', '>'
        else:
            return 'little', '<'

    def unpack(self, fmt_char: str, source: bytes, redefine_byte_order: str = None) -> tuple:
        """
        Unpack the given source bytes according to the format character and byte order.
        """
        if not fmt_char:
            raise ValueError(f"Invalid length fmt_char parameter: {len(fmt_char)}")
        bo = self._get_byteorder_as_str()[1]
        if redefine_byte_order is not None:
            bo = redefine_byte_order[0]
        return ustruct.unpack(bo + fmt_char, source)

    @micropython.native
    def is_big_byteorder(self) -> bool:
        """
        Check if the device uses big byte order.
        """
        return self.big_byte_order


class BaseSensor(Device):
    """Base class for sensors."""
    def get_id(self):
        raise NotImplementedError

    def soft_reset(self):
        raise NotImplementedError


class Iterator:
    """Iterator class for sensors."""
    def __iter__(self):
        return self

    def __next__(self):
        raise NotImplementedError
class BitField:
    """Class for working with bit fields."""
    def __init__(self, start: int, stop: int, alias: [str, None]):
        """
        Initialize the bit field with start, stop, and alias.
        """
        check(start, stop)
        self.alias = alias
        self.start = start
        self.stop = stop
        self.bitmask = _bitmask(start, stop)

    def put(self, source: int, value: int) -> int:
        """
        Write the value to the specified bit range in the source.
        """
        src = source & ~self.bitmask
        src |= (value << self.start) & self.bitmask
        return src

    def get(self, source: int) -> int:
        """
        Get the value from the specified bit range in the source.
        """
        return (source & self.bitmask) >> self.start


@micropython.native
def put(start: int, stop: int, source: int, value: int) -> int:
    """
    Write the value to the specified bit range in the source.
    """
    check(start, stop)
    bitmask = _bitmask(start, stop)
    # Clear the target bits first (the mask must be inverted, as in BitField.put)
    src = source & ~bitmask
    src |= (value << start) & bitmask
    return src


@micropython.native
def _bitmask(start: int, stop: int) -> int:
    """
    Generate a bitmask from start to stop bits.
    """
    res = 0
    for i in range(start, 1 + stop):
        res |= 1 << i
    return res


def check(start: int, stop: int):
    """
    Check if start is less than or equal to stop. Raise ValueError if not.
    """
    if start > stop:
        raise ValueError(f"Invalid start: {start}, stop value: {stop}")


def _mpy_bl(value: int) -> int:
    """
    Calculate the bit length of the value.
    """
    if 0 == value:
        return 0
    return 1 + int(math.log2(abs(value)))
class BusAdapter:
    """Adapter class for bus communication."""
    def __init__(self, bus: [I2C, SPI]):
        """
        Initialize the adapter with the bus.
        """
        self.bus = bus

    def get_bus_type(self) -> type:
        """
        Return the type of the bus.
        """
        return type(self.bus)

    def read_register(self, device_addr: [int, Pin], reg_addr: int, bytes_count: int) -> bytes:
        raise NotImplementedError

    def write_register(self, device_addr: [int, Pin], reg_addr: int, value: [int, bytes, bytearray],
                       bytes_count: int, byte_order: str):
        raise NotImplementedError

    def read(self, device_addr: [int, Pin], n_bytes: int) -> bytes:
        raise NotImplementedError

    def write(self, device_addr: [int, Pin], buf: bytes):
        raise NotImplementedError

    def write_const(self, device_addr: [int, Pin], val: int, count: int):
        """
        Write a constant value to the device multiple times.
        """
        if 0 == count:
            return
        bl = _mpy_bl(val)
        if bl > 8:
            raise ValueError(f"The value must take no more than 8 bits! Current: {bl}")
        _max = 16
        if count < _max:
            _max = count
        repeats = count // _max
        b = bytearray([val for _ in range(_max)])
        for _ in range(repeats):
            self.write(device_addr, b)
        remainder = count - _max * repeats
        if remainder:
            b = bytearray([val for _ in range(remainder)])
            self.write(device_addr, b)
class I2cAdapter(BusAdapter):
    """Adapter class for I2C bus communication."""
    def __init__(self, bus: I2C):
        super().__init__(bus)

    def write_register(self, device_addr: int, reg_addr: int, value: [int, bytes, bytearray],
                       bytes_count: int, byte_order: str):
        """
        Write to the register of the device.
        """
        buf = None
        if isinstance(value, int):
            buf = value.to_bytes(bytes_count, byte_order)
        if isinstance(value, (bytes, bytearray)):
            buf = value
        return self.bus.writeto_mem(device_addr, reg_addr, buf)

    def read_register(self, device_addr: int, reg_addr: int, bytes_count: int) -> bytes:
        """
        Read from the register of the device.
        """
        return self.bus.readfrom_mem(device_addr, reg_addr, bytes_count)

    def read(self, device_addr: int, n_bytes: int) -> bytes:
        """
        Read bytes from the device.
        """
        return self.bus.readfrom(device_addr, n_bytes)

    def readfrom_into(self, device_addr: int, buf):
        """
        Read bytes from the device into the buffer.
        """
        return self.bus.readfrom_into(device_addr, buf)

    def read_buf_from_mem(self, device_addr: int, mem_addr, buf):
        """
        Read bytes from the device memory into the buffer.
        """
        return self.bus.readfrom_mem_into(device_addr, mem_addr, buf)

    def write(self, device_addr: int, buf: bytes):
        """
        Write bytes to the device.
        """
        return self.bus.writeto(device_addr, buf)

    def write_buf_to_mem(self, device_addr: int, mem_addr, buf):
        """
        Write bytes to the device memory.
        """
        return self.bus.writeto_mem(device_addr, mem_addr, buf)
class SCD4xSensirion(BaseSensor, Iterator):
    """Class for SCD4x Sensirion CO2 sensor."""
    def __init__(self, adapter: I2cAdapter, address=0x62, this_is_scd41: bool = True, check_crc: bool = True):
        """
        Initialize the sensor with adapter, address, and settings.
        """
        super().__init__(adapter, address, True)
        self._buf_3 = bytearray((0 for _ in range(3)))
        self._buf_9 = bytearray((0 for _ in range(9)))
        self.check_crc = check_crc
        self._low_power_mode = False
        self._single_shot_mode = False
        self._rht_only = False
        self._isSCD41 = this_is_scd41
        self.byte_order = self._get_byteorder_as_str()

    def _get_local_buf(self, bytes_for_read: int) -> [None, bytearray]:
        """
        Return the local buffer for reading.
        """
        if bytes_for_read not in (0, 3, 9):
            raise ValueError(f"Invalid value for bytes_for_read: {bytes_for_read}")
        if not bytes_for_read:
            return None
        if 3 == bytes_for_read:
            return self._buf_3
        return self._buf_9

    def _to_bytes(self, value, length: int):
        """
        Convert value to bytes with specified length.
        """
        byteorder = self.byte_order[0]
        return value.to_bytes(length, byteorder)

    def _write(self, buf: bytes) -> bytes:
        """
        Write buffer to the device.
        """
        return self.adapter.write(self.address, buf)

    def _readfrom_into(self, buf):
        """
        Read bytes from the device into the buffer.
        """
        return self.adapter.readfrom_into(self.address, buf)

    def _send_command(self, cmd: int, value: [bytes, None], wait_time: int = 0, bytes_for_read: int = 0,
                      crc_index: range = None, value_index: tuple = None) -> [bytes, None]:
        """
        Send a command to the sensor.
        """
        raw_cmd = self._to_bytes(cmd, 2)
        raw_out = raw_cmd
        if value:
            # Every 16-bit data word is followed by its CRC-8
            raw_out += value
            raw_out += self._to_bytes(_calc_crc(value), 1)
        self._write(raw_out)
        if wait_time:
            time.sleep_ms(wait_time)
        if not bytes_for_read:
            return None
        b = self._get_local_buf(bytes_for_read)
        self._readfrom_into(b)
        check_value(len(b), (bytes_for_read,), f"Invalid buffer length for cmd: {cmd}. Received {len(b)} out of {bytes_for_read}")
        if self.check_crc:
            crc_from_buf = [b[i] for i in crc_index]
            calculated_crc = [_calc_crc(b[rng.start:rng.stop]) for rng in value_index]
            if crc_from_buf != calculated_crc:
                raise ValueError(f"Invalid CRC! Calculated{calculated_crc}. From buffer {crc_from_buf}")
        return b
    def save_config(self):
        """
        Save the sensor configuration to EEPROM.
        """
        cmd = 0x3615
        self._send_command(cmd, None, 800)

    def get_id(self) -> tuple:
        """
        Get the unique serial number of the sensor.
        """
        cmd = 0x3682
        b = self._send_command(cmd, None, 0, bytes_for_read=9,
                               crc_index=range(2, 9, 3), value_index=(range(2), range(3, 5), range(6, 8)))
        return tuple([(b[i] << 8) | b[i+1] for i in range(0, 9, 3)])

    def soft_reset(self):
        """
        Perform a soft reset of the sensor.
        """
        return None

    def exec_self_test(self) -> bool:
        """
        Execute self-test on the sensor. Returns True if successful.
        """
        cmd = 0x3639
        length = 3
        b = self._send_command(cmd, None, wait_time=10_000,
                               bytes_for_read=length, crc_index=range(2, 3), value_index=(range(2),))
        res = self.unpack("H", b)[0]
        return 0 == res

    def reinit(self) -> None:
        """
        Reinitialize the sensor by reloading user settings from EEPROM.
        """
        cmd = 0x3646
        self._send_command(cmd, None, 20)

    def set_temperature_offset(self, offset: float):
        """
        Set the temperature offset for the sensor.
        """
        cmd = 0x241D
        offset_raw = self._to_bytes(int(374.49142857 * offset), 2)
        self._send_command(cmd, offset_raw, 1)

    def get_temperature_offset(self) -> float:
        """
        Get the temperature offset from the sensor.
        """
        cmd = 0x2318
        b = self._send_command(cmd, None, wait_time=1, bytes_for_read=3, crc_index=range(2, 3), value_index=(range(2),))
        temp_offs = self.unpack("H", b)[0]
        return 0.0026702880859375 * temp_offs

    def set_altitude(self, masl: int):
        """
        Set the altitude for the sensor in meters above sea level.
        """
        cmd = 0x2427
        masl_raw = self._to_bytes(masl, 2)
        self._send_command(cmd, masl_raw, 1)

    def get_altitude(self) -> int:
        """
        Get the altitude from the sensor in meters above sea level.
        """
        cmd = 0x2322
        b = self._send_command(cmd, None, wait_time=1, bytes_for_read=3, crc_index=range(2, 3), value_index=(range(2),))
        return self.unpack("H", b)[0]

    def set_ambient_pressure(self, pressure: float):
        """
        Set the ambient pressure for the sensor in Pascals.
        """
        cmd = 0xE000
        press_raw = self._to_bytes(int(pressure // 100), 2)
        self._send_command(cmd, press_raw, 1)

    def force_recalibration(self, target_co2_concentration: int) -> int:
        """
        Force recalibration of the sensor with the target CO2 concentration.
        """
        check_value(target_co2_concentration, range(2**16),
                    f"Invalid target CO2 concentration: {target_co2_concentration} ppm")
        cmd = 0x362F
        target_raw = self._to_bytes(target_co2_concentration, 2)
        b = self._send_command(cmd, target_raw, 400, 3, crc_index=range(2, 3), value_index=(range(2),))
        return self.unpack("h", b)[0]

    def is_auto_calibration(self) -> bool:
        """
        Check if automatic self-calibration is enabled on the sensor.
        """
        cmd = 0x2313
        b = self._send_command(cmd, None, 1, 3, crc_index=range(2, 3), value_index=(range(2),))
        return 0 != self.unpack("H", b)[0]

    def set_auto_calibration(self, value: bool):
        """
        Enable or disable automatic self-calibration on the sensor.
        """
        cmd = 0x2416
        value_raw = self._to_bytes(int(value), 2)
        # Write-only command: do not request a read here, since no CRC indexes
        # are supplied and the CRC check in _send_command would fail on None.
        self._send_command(cmd, value_raw, 1)
    def set_measurement(self, start: bool, single_shot: bool = False, rht_only: bool = False):
        """
        Start or stop periodic measurements, or perform a single shot measurement.
        """
        if single_shot:
            return self._single_shot_meas(rht_only)
        return self._periodic_measurement(start)

    def _periodic_measurement(self, start: bool):
        """
        Start or stop periodic measurements.
        """
        wt = 0
        if start:
            cmd = 0x21AC if self._low_power_mode else 0x21B1
        else:
            cmd = 0x3F86
            wt = 500
        self._send_command(cmd, None, wt)
        self._single_shot_mode = False
        self._rht_only = False

    def get_meas_data(self) -> tuple:
        """
        Get the measurement data from the sensor (CO2, temperature, and humidity).
        """
        cmd = 0xEC05
        val_index = (range(2), range(3, 5), range(6, 8))
        b = self._send_command(cmd, None, 1, bytes_for_read=9,
                               crc_index=range(2, 9, 3), value_index=val_index)
        words = [self.unpack("H", b[val_rng.start:val_rng.stop])[0] for val_rng in val_index]
        # CO2 in ppm, temperature in degrees C, relative humidity in %
        return words[0], -45 + 0.0026703288 * words[1], 0.0015259022 * words[2]

    def is_data_ready(self) -> bool:
        """
        Check if the measurement data is ready to be read from the sensor.
        """
        cmd = 0xE4B8
        b = self._send_command(cmd, None, 1, 3, crc_index=range(2, 3), value_index=(range(2),))
        return bool(self.unpack("H", b)[0] & 0b0000_0111_1111_1111)

    @micropython.native
    def get_conversion_cycle_time(self) -> int:
        """
        Get the conversion cycle time of the sensor in milliseconds.
        """
        if self.is_single_shot_mode and self.is_rht_only:
            return 50
        return 5000

    def set_power(self, value: bool):
        """
        Power up or power down the sensor.
        """
        if not self._isSCD41:
            return
        cmd = 0x36F6 if value else 0x36E0
        wt = 20 if value else 1
        self._send_command(cmd, None, wt)

    def _single_shot_meas(self, rht_only: bool = False):
        """
        Perform a single shot measurement.
        """
        if not self._isSCD41:
            return
        cmd = 0x2196 if rht_only else 0x219D
        self._send_command(cmd, None, 0)
        self._single_shot_mode = True
        self._rht_only = rht_only

    @property
    def is_single_shot_mode(self) -> bool:
        """
        Check if the sensor is in single shot mode.
        """
        return self._single_shot_mode

    @property
    def is_rht_only(self) -> bool:
        """
        Check if the sensor is in RHT-only mode.
        """
        return self._rht_only

    def __iter__(self):
        return self

    def __next__(self) -> [tuple, None]:
        """
        Get the next set of measurement data.
        """
        if self._single_shot_mode:
            return None
        if self.is_data_ready():
            return self.get_meas_data()
        return None
def pa_mmhg(value: float) -> float:
    """
    Convert air pressure from Pascals to millimeters of mercury.
    """
    return 7.50062E-3 * value


def crc8(sequence, polynomial: int, init_value: int = 0x00):
    """
    Calculate CRC-8 checksum for the given sequence.
    """
    mask = 0xFF
    crc = init_value & mask
    for item in sequence:
        crc ^= item & mask
        for _ in range(8):
            if crc & 0x80:
                crc = mask & ((crc << 1) ^ polynomial)
            else:
                crc = mask & (crc << 1)
    return crc


def check_device_presence(i2c, address):
    """
    Check if a device with the given address is present on the I2C bus.
    """
    devices = i2c.scan()
    return address in devices

Once you have that done, run the code below on your device or save it as main.py, whichever you'd like. Note that the device restarts itself every 6 hours to ensure that we don't run into memory issues. I encountered a memory leak that became a problem roughly 5 days in and felt that chasing it down was a waste compared to a simple restart using machine.reset().
Main Code
# main.py
from machine import SoftI2C, Pin, reset, freq
import time
import gc  # Make sure we import gc
from SCD41 import SCD4xSensirion, I2cAdapter, check_device_presence
from ssd1306 import SSD1306_I2C  # Ensure you have the SSD1306 library
import urequests as requests
import network
import json
import utime
import usocket as socket
import ssl
import ntptime

# ThingSpeak settings
THINGSPEAK_API_KEY = 'YOUR_KEY_HERE'
THINGSPEAK_URL = 'https://api.thingspeak.com/update'
THINGSPEAK_CHANNEL_ID = '00000000'
THINGSPEAK_BULK_UPDATE_URL = 'https://api.thingspeak.com/channels/'+str(THINGSPEAK_CHANNEL_ID)+'/bulk_update.json'
SEND_TO_THINGSPEAK = True
thingspeak_buffer = []  # Buffer for ThingSpeak data

# Google Sheets settings
SPREADSHEET_ID = 'VERY_LONG_SPREADSHEET_ID_HERE'
RANGE_NAME = 'Sheet1!A1:C1'
SHEET_NAME = 'Sheet1'
GOOGLE_URL = 'https://script.google.com/macros/s/VERY_LONG_URL/exec'

# WiFi settings
SSID = 'YOUR_SSID'
PASSWORD = 'WIFI_PSWD'

# NTP sync settings
last_ntp_sync = 0  # Last NTP sync timestamp
NTP_SYNC_INTERVAL = 3600  # Sync once per hour (in seconds)

# Switches use the internal pull-ups, so a value of 0 means the switch is on
switchOnWifi = Pin(32, Pin.IN, Pin.PULL_UP)
switchOnCalibrate = Pin(33, Pin.IN, Pin.PULL_UP)
buzzer = Pin(26, Pin.OUT)
switch_state_calibrate = switchOnCalibrate.value()  # If switch is in up position we consider it on.
print(switch_state_calibrate)
switch_state_wifi = switchOnWifi.value()  # If switch is in up position we consider it on.
print(switch_state_wifi)
if switch_state_calibrate == 0:
    print("Calibration switch is on! Calibrating Device...")
if switch_state_wifi == 0:
    print("Wifi switch is on! Will use Wifi...")

# Setup GPIO for OLED power
oled_power = Pin(19, Pin.OUT)


# Function to get system status
def get_system_status(firstRun):
    gc.collect()  # Ensure memory is collected before checking
    free_heap = gc.mem_free()
    total_heap = gc.mem_alloc() + free_heap
    free_heap_percent = (free_heap / total_heap) * 100
    if firstRun:
        print(f"Total heap memory: {total_heap} bytes")
        # Additional information about the system
        print(f"Frequency: {freq()} Hz")
    print(f"Free heap memory: {free_heap} bytes ({free_heap_percent:.2f}%)")
def connect_wifi(ssid, password):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    # Check if already connected
    if wlan.isconnected():
        print("Already connected to WiFi")
        print(wlan.ifconfig())
        return True
    print("Connecting to WiFi...")
    wlan.connect(ssid, password)
    # Wait with timeout instead of infinite loop
    max_wait = 20  # 20 seconds timeout
    while max_wait > 0 and not wlan.isconnected():
        max_wait -= 1
        time.sleep(1)
        print("Connecting to WiFi...")
    if wlan.isconnected():
        print("Connected to WiFi")
        print(wlan.ifconfig())
        return True
    else:
        print("Failed to connect to WiFi")
        return False
def get_time_chicago():
    """Get Chicago time with NTP sync once per hour"""
    global last_ntp_sync
    current_time = time.time()
    # Sync if we have never synced, or if an hour has passed since the last sync
    if last_ntp_sync == 0 or current_time - last_ntp_sync >= NTP_SYNC_INTERVAL:
        print(f"NTP sync needed (last sync was {(current_time - last_ntp_sync) / 60:.1f} minutes ago)")
        max_retries = 3  # Limit retries to save resources
        for attempt in range(max_retries):
            try:
                ntptime.settime()
                last_ntp_sync = time.time()  # Update the last sync time
                print(f"NTP sync successful at: {last_ntp_sync}")
                break
            except OSError as e:
                print(f"Failed to get NTP time, attempt {attempt + 1} of {max_retries}. Error: {e}")
                time.sleep(1)
        else:
            print("Could not sync with NTP server, using internal RTC time")
    else:
        minutes_since_sync = (current_time - last_ntp_sync) / 60
        print(f"Using cached time (next sync in {60 - minutes_since_sync:.1f} minutes)")

    # The RTC holds UTC after ntptime.settime(); convert it to US Central time
    now_utc = utime.time()
    year = utime.localtime(now_utc)[0]

    def nth_sunday(month, n):
        # Day of the month of the n-th Sunday (weekday 6; Monday is 0)
        first_wd = utime.localtime(utime.mktime((year, month, 1, 0, 0, 0, 0, 0)))[6]
        return 1 + (6 - first_wd) % 7 + 7 * (n - 1)

    # US daylight saving time (DST) runs from 2:00 CST on the second Sunday of
    # March (08:00 UTC) to 2:00 CDT on the first Sunday of November (07:00 UTC)
    dst_start = utime.mktime((year, 3, nth_sunday(3, 2), 8, 0, 0, 0, 0))
    dst_end = utime.mktime((year, 11, nth_sunday(11, 1), 7, 0, 0, 0, 0))
    is_dst = dst_start <= now_utc < dst_end
    offset = -5 * 3600 if is_dst else -6 * 3600
    return utime.localtime(now_utc + offset)
# Function to sound the buzzer
def sound_buzzer():
    for _ in range(5):
        buzzer.value(1)  # Turn on buzzer
        time.sleep(0.25)  # 250 ms delay
        buzzer.value(0)  # Turn off buzzer
        time.sleep(0.25)  # 250 ms delay
# Memory-optimized version of send_data_to_google_sheets
def send_data_to_google_sheets(data):
    for attempt in range(3):
        try:
            # Force garbage collection before making a network connection
            gc.collect()
            # Format the URL components separately to avoid long string concatenation
            url = GOOGLE_URL
            _, _, host, path = url.split('/', 3)
            # Format data carefully to minimize memory usage
            encoded_data = "Date=" + data['date']
            encoded_data += "&Time=" + data['time']
            encoded_data += "&CO2=" + str(data['co2'])
            encoded_data += "&Temperature=" + str(data['temp_f'])
            encoded_data += "&Humidity=" + str(data['humidity'])
            s = None
            try:
                # Create socket first without SSL
                addr = socket.getaddrinfo(host, 443)[0][-1]
                s = socket.socket()
                s.settimeout(15)  # Add timeout to prevent hanging
                s.connect(addr)
                # Build request in parts
                request_header = f"POST /{path} HTTP/1.1\r\nHost: {host}\r\n"
                request_header += "Content-Type: application/x-www-form-urlencoded\r\n"
                request_header += f"Content-Length: {len(encoded_data)}\r\n\r\n"
                # Wrap socket with SSL AFTER connecting
                s = ssl.wrap_socket(s)
                # Send in two parts to reduce memory pressure
                s.write(request_header)
                s.write(encoded_data)
                # Read minimal response to verify success
                response = s.read(64)
                # Close the socket before any other operations
                s.close()
                s = None
                print('Data sent to Google Sheets!')
                gc.collect()  # Force collection after network operation
                return True
            except Exception as e:
                print('Failed to send data to Google Sheets:', e)
                if s:
                    try:
                        s.close()
                    except Exception:
                        pass
                    s = None
                gc.collect()  # Force collection after error
        except Exception as e:
            print(f"Exception occurred... Retrying sending data to Google Sheets {attempt + 1} of 3 attempts.")
        time.sleep(2)  # Wait before retry
        gc.collect()  # Force collection
    return False  # Failed after all attempts
# Memory-optimized ThingSpeak function
def send_data_to_thingspeak():
    """Send data to ThingSpeak."""
    if not SEND_TO_THINGSPEAK or not thingspeak_buffer:
        return False
    for attempt in range(3):
        try:
            # Force garbage collection before sending
            gc.collect()
            if len(thingspeak_buffer) > 1:
                # Handle bulk update with memory considerations;
                # stop retrying as soon as it succeeds
                if bulk_send_thingspeak():
                    return True
            else:
                # Handle single update. Peek instead of pop so a failed
                # send keeps the reading in the buffer for the next attempt.
                data = thingspeak_buffer[0]
                payload = {
                    'api_key': THINGSPEAK_API_KEY,
                    'field1': data['co2'],
                    'field2': data['temp_f'],
                    'field3': data['humidity']
                }
                response = requests.post(THINGSPEAK_URL, json=payload)
                try:
                    if response.status_code == 200:
                        print('Data posted to ThingSpeak:', response.text)
                        thingspeak_buffer.clear()  # Reading delivered, empty the buffer
                        return True
                    print(f'Failed to send data to ThingSpeak: {response.status_code}')
                finally:
                    response.close()
        except Exception as e:
            print(f"Exception occurred ({e})... Retrying sending data to ThingSpeak {attempt + 1} of 3 attempts.")
        # Reached only after a failed attempt
        time.sleep(2)  # Wait before retry
        gc.collect()  # Force collection
    return False  # Failed after all attempts
# Helper function to handle bulk ThingSpeak updates with memory considerations
def bulk_send_thingspeak():
    # Force garbage collection
    gc.collect()
    # Prepare payload
    payload = {
        'write_api_key': THINGSPEAK_API_KEY,
        'updates': []
    }
    # Add updates to payload
    for data in thingspeak_buffer:
        update = {
            # NOTE: the hardcoded -0500 offset only matches Chicago during
            # daylight saving time (CDT); standard time (CST) is -0600.
            'created_at': f"{data['date']} {data['time']} -0500",
            'field1': data['co2'],
            'field2': data['temp_f'],
            'field3': data['humidity']
        }
        payload['updates'].append(update)
    # Convert to JSON
    json_data = json.dumps(payload)
    # Send request
    try:
        headers = {'Content-Type': 'application/json'}
        response = requests.post(THINGSPEAK_BULK_UPDATE_URL, headers=headers, data=json_data)
        if response.status_code == 202:
            print('Data posted to ThingSpeak (bulk update):', response.text)
            thingspeak_buffer.clear()  # Clear the buffer after successful update
            response.close()
            return True
        else:
            print(f'Failed to send data to ThingSpeak (bulk update): {response.status_code}')
            response.close()
            return False
    except Exception as e:
        print('Failed to send data to ThingSpeak (bulk update):', e)
        return False
    finally:
        gc.collect()  # Force collection
def get_sensor_reading(sensor, conversion_cycle_time, last_update_sensor):
    """Wait until 30 s have passed since the last reading, then try one read.

    Returns the measurement tuple, or None if data is not ready or the read fails.
    conversion_cycle_time is accepted but unused; the wait is fixed at 30 s.
    """
    # Block until at least 30 seconds have elapsed since the last update
    while True:
        time_since_update = time.time() - last_update_sensor
        print(time_since_update)
        if time_since_update >= 30:
            break
        time.sleep(1)
    try:
        # Check if data is ready
        if sensor.is_data_ready():
            return sensor.get_meas_data()
        print("Data not ready.")
        time.sleep(1)
    except OSError as e:
        print(f"Error during sensor reading: {e}")
    return None
led = Pin(2, Pin.OUT)
led.value(0) # turn off red LED, doesn't work :L hard soldered in.
def main():
    # Force garbage collection at start
    gc.collect()
    get_system_status(True)
    reading_count = 0
    if switch_state_wifi == 0:
        # Connect to WiFi
        connect_wifi(SSID, PASSWORD)
        # Initial time sync
        get_time_chicago()
    # Initialize I2C communication with the specified pins and frequency
    i2c = SoftI2C(scl=Pin(22), sda=Pin(21), freq=400_000)
    device_address = 0x62
    # Check if the device is present on the I2C bus
    if not check_device_presence(i2c, device_address):
        print(f"Device with address {device_address:#x} not found on I2C bus.")
        return
    print(f"Device with address {device_address:#x} found on I2C bus.")
    oled_power.value(1)  # Turn on OLED display
    # Setup SoftI2C for OLED
    i2c_oled = SoftI2C(scl=Pin(4), sda=Pin(5))
    # Scan for OLED device
    print('Scanning for I2C devices...')
    devices = i2c_oled.scan()
    if len(devices) == 0:
        print("No I2C OLED devices found.")
        OLEDaddress = None
    else:
        print('I2C OLED devices found:', len(devices))
        OLEDaddress = devices[0]  # Assuming the first device found is the OLED
        for device in devices:
            print("Device address: ", hex(device))
    # Initialize OLED if found
    oled = None
    if OLEDaddress is not None:
        print("Testing OLED...")
        oled = SSD1306_I2C(128, 64, i2c_oled)
        i = 0
        while i < 5:
            time.sleep(1)
            oled_power.value(1)
            oled.fill(0)
            oled.text(f"Init: Test {i}", 0, 0)
            oled.show()
            time.sleep(1)
            i = i + 1
            oled_power.value(0)
        # Show the switch states on the OLED
        oled_power.value(1)
        oled.fill(0)
        if switch_state_calibrate == 0:
            oled.text("CALIBRATION ON", 0, 0)
        if switch_state_calibrate == 1:
            oled.text("CALIBRATION OFF", 0, 0)
        if switch_state_wifi == 0:
            oled.text("WIFI ON", 0, 10)
        if switch_state_wifi == 1:
            oled.text("WIFI OFF", 0, 10)
        oled.show()
        time.sleep(5)  # Reduced from 10 to 5 seconds to save power
        oled_power.value(0)
    # Create an I2C adapter and sensor instance
    adapter = I2cAdapter(i2c)
    sensor = SCD4xSensirion(adapter)
    # If looping, the sensor must be put back into IDLE mode first.
    # It can't run the self test while it is measuring.
    sensor.set_measurement(start=False, single_shot=False)
    # Check if sensor is good to go. Note you may need to power cycle to pass this test. Make sure 5V in.
    selfTest = False
    retry_count = 0
    while not selfTest and retry_count < 5:  # Added retry limit
        try:
            if sensor.exec_self_test():
                print("Sensor self test shows good to go")
                selfTest = True
            else:
                print("Sensor self test failed!")
                retry_count += 1
                time.sleep(1)
        except Exception as e:
            print("Exception occurred:", e)
            retry_count += 1
            time.sleep(1)
    # Ensure the sensor is in IDLE mode
    sensor.set_measurement(start=False, single_shot=False)
    # Retrieve and display the sensor ID
    sensor_id = sensor.get_id()
    print(f"Sensor ID: {sensor_id[0]:x}:{sensor_id[1]:x}:{sensor_id[2]:x}")
    # Retrieve and display the temperature offset (the SCD4x reports it in degrees Celsius)
    temp_offset = sensor.get_temperature_offset()
    print(f"Temperature offset: {temp_offset:.6f} C")
    # Retrieve the altitude (meters above sea level) and set it if it differs
    altitude = 198
    retrieved_altitude = sensor.get_altitude()
    print(f"Desired Altitude: {altitude} m, Retrieved Altitude: {retrieved_altitude} m")
    if altitude != retrieved_altitude:
        sensor.set_altitude(altitude)
        print(f"Altitude set to: {altitude}")
    # Check if automatic self-calibration is enabled
    if sensor.is_auto_calibration():
        print("Automatic self-calibration is ON.")
    else:
        print("Automatic self-calibration is OFF.")
    # Start periodic measurement
    sensor._low_power_mode = True
    sensor.set_measurement(start=True, single_shot=False)
    conversion_cycle_time = sensor.get_conversion_cycle_time()
    last_update_sensor = time.time()
    print(f"Low Power Mode: {sensor._low_power_mode}")
    print(f"Low power periodic measurement started with conversion cycle time: {conversion_cycle_time} ms")
    print("Periodic measurement started.")
    # Handle calibration mode
    if switch_state_calibrate == 0:
        print("Performing sensor calibration in 30 seconds...")
        print("The sensor will be calibrated over 60 min in fresh air.")
        time.sleep(30)
        sensorCycle = 0
        calibrateMe = True
        # The SCD4x only accepts reset and calibration commands in IDLE mode,
        # so stop the periodic measurement started above first
        sensor.set_measurement(start=False, single_shot=False)
        time.sleep(1)
        # Perform factory reset and reinitialize
        sensor.perform_factory_reset()
        print("Factory reset performed.")
        time.sleep(2)  # Wait for the factory reset to complete
        # Perform soft reset and reinitialize
        print("Performing soft reset and reinitialization...")
        sensor.soft_reset()
        time.sleep(1)  # Wait for the reset to complete
        sensor.set_measurement(start=False, single_shot=False)
        time.sleep(1)
        sensor.set_measurement(start=True, single_shot=False)
        time.sleep(1)
        while sensorCycle < 60:
            # Force garbage collection in calibration loop
            gc.collect()
            conversion_cycle_time = sensor.get_conversion_cycle_time()
            reading = get_sensor_reading(sensor, conversion_cycle_time, last_update_sensor)
            if reading:
                co2, temp, humidity = reading
                last_update_sensor = time.time()
                temp_f = temp * 9 / 5 + 32  # Convert to Fahrenheit
                time.sleep(30)
                sensorCycle += 1
                print(f"Continuous Reading: CO2: {co2} ppm, Temperature: {temp_f:.2f} F, Humidity: {humidity:.2f} %")
                print(f"Sensor Cycled {sensorCycle} times of 60")
                # Display on OLED during calibration
                if oled:
                    oled_power.value(1)
                    oled.fill(0)
                    oled.text("CALIBRATING", 0, 0)
                    oled.text(f"Cycle: {sensorCycle}/60", 0, 10)
                    oled.text(f"CO2: {co2} ppm", 0, 20)
                    oled.text(f"Temp: {temp_f:.1f}F", 0, 30)
                    oled.text(f"Hum: {humidity:.1f}%", 0, 40)
                    oled.show()
                    time.sleep(5)  # Show for 5 seconds
                    oled_power.value(0)
                if sensorCycle == 59 and calibrateMe:
                    print("Initiating forced recalibration...")
                    # Forced recalibration and settings changes also require IDLE mode
                    sensor.set_measurement(start=False, single_shot=False)
                    time.sleep(1)
                    # Mauna Loa CO2 = 426.57ppm April 2024 + 15-20% differential for being
                    # near/in a major city (Chicago); cast to int for the low-level call
                    target_co2_concentration = int(426 * 1.15)
                    correction_value = sensor.force_recalibration(target_co2_concentration)
                    print(f"Forced recalibration completed with correction value: {correction_value} ppm")
                    sensor.set_auto_calibration(False)
                    print(f"Sensor Auto Calibration set to: {sensor.is_auto_calibration()}. Rerun the program after power cycling with the calibration commented out to double check persistence!")
                    sensor.save_config()
                    print("Settings saved.")
                    # Resume periodic measurement and wait a full interval before polling again
                    sensor.set_measurement(start=True, single_shot=False)
                    last_update_sensor = time.time()
            else:
                # Reset the sensor and reinitialize if an error occurs
                print("Attempting to reinitialize the sensor...")
                sensor.set_measurement(start=False, single_shot=False)
                sensor.set_measurement(start=True, single_shot=False)
                last_update_sensor = time.time()  # Wait a full interval after the restart
    else:
        print("Skipping calibration.")
    # Main monitoring loop
    while True:
        try:
            # Check memory status
            gc.collect()
            get_system_status(False)
            # Eventually...somewhere...memory will leak. Restart every 6 hours for good measure.
            if reading_count >= 360:
                print("Scheduled restart after 360 readings")
                reset()
            date_str = None
            reading = get_sensor_reading(sensor, conversion_cycle_time, last_update_sensor)
            if reading:
                co2, temp, humidity = reading
                last_update_sensor = time.time()
                temp_f = temp * 9 / 5 + 32
                print(f"Continuous Reading: CO2: {co2} ppm, Temperature: {temp_f:.2f} F, Humidity: {humidity:.2f} %")
                get_system_status(True)
                reading_count += 1
                # Check CO2 level and sound the buzzer if above threshold
                if co2 > 1500:
                    print("CO2 level exceeded 1500 ppm. Activating buzzer.")
                    sound_buzzer()
                # Handle data transmission if WiFi is enabled
                if switch_state_wifi == 0:
                    # Check WiFi connection and reconnect if needed
                    if not network.WLAN(network.STA_IF).isconnected():
                        print("WiFi connection lost, reconnecting...")
                        connect_wifi(SSID, PASSWORD)
                    # Get current date and time (with hourly NTP sync)
                    current_time = get_time_chicago()
                    date_str = "{:04}-{:02}-{:02}".format(current_time[0], current_time[1], current_time[2])
                    time_str = "{:02}:{:02}:{:02}".format(current_time[3], current_time[4], current_time[5])
                    # Prepare data for sending
                    data = {
                        'date': date_str,
                        'time': time_str,
                        'co2': co2,
                        'temp_f': temp_f,
                        'humidity': humidity
                    }
                    # Collect garbage before network operations
                    gc.collect()
                    # Send to Google Sheets
                    send_data_to_google_sheets(data)
                    # Add to ThingSpeak buffer and send
                    thingspeak_buffer.append(data)
                    send_data_to_thingspeak()
                elif switch_state_wifi == 1:
                    thingspeak_buffer.clear()
                # OLED Logic - display for a shorter time to save power
                if oled:
                    oled_power.value(1)
                    oled.fill(0)
                    oled.text(f'Reading: {reading_count}', 0, 0)
                    if date_str is not None:
                        oled.text(f'Date: {date_str}', 0, 10)
                        oled.text(f'Time: {time_str}', 0, 20)
                    oled.text(f'CO2:{int(co2)}ppm', 0, 30)
                    oled.text(f'Temp:{temp_f:.2f}F', 0, 40)
                    oled.text(f'Hum:{humidity:.2f}%', 0, 50)
                    oled.show()
                    time.sleep(10)  # Reduced from 20 to 10 seconds to save power
                    oled_power.value(0)
                # Force garbage collection after all operations
                gc.collect()
        except Exception as e:
            print("Error occurred:", e)
            time.sleep(5)  # Wait for 5 seconds before retrying
            gc.collect()  # Force collection after error
if __name__ == '__main__':
    # Initialize garbage collection
    gc.collect()
    try:
        main()
    except Exception as e:
        print("Error occurred:", e)
        time.sleep(5)  # Pause so the error stays visible before the script exits
I’m not going to go too deep into the code. The SCD41 chip supports a variety of modes, and unless you want to do a deep dive into the datasheet and the driver for some cursed reason, the code above should be all you need. I was able to log readings fairly close to the reported values for my town, and breathing on or near the device reliably increased the readings. The sensor took a while to climb to its peak while I was breathing on it, and roughly 2.5 minutes to settle back to baseline. That is roughly in line with the interval we would ideally sample at anyway, given the humidity sampling interval mentioned in the technical specs above. See image below:
Limitations of the SCD41
As mentioned above, there is a slight delay in logged values, so the results won’t be instantaneous. Additionally, as noted at https://github.com/octaprog7/SCD4x, the temperature reading rises if the measurement interval is less than 15 seconds, because the sensor self-heats. Both the SCD40 and SCD41 also have an auto-calibration mode, which takes the lowest CO2 value from the past 7 days and assumes it is 400ppm. This can cause sensor drift over time unless one regularly (about once a week) exposes the sensor to fresh air. It is possible to turn off auto-calibration, as the code above does with set_auto_calibration. An anecdote from user Anx2K on the r/ESP32 subreddit mentions roughly 40ppm (10%) of drift year to year with auto-calibration turned off. Calibration takes 5 minutes, but is an inherently manual process. It would be nice to set up this CO2 monitor near a weather monitoring station alongside an MH-Z19B/C and MH-Z1+, which use NDIR, then compare the values at the start and end of a 24-hour period to assess drift.
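To make the auto-calibration pitfall concrete, here is a minimal sketch of the behavior described above. It assumes a deliberately simplified model in which the sensor shifts every reading by one fixed offset so that the weekly minimum lands on 400ppm; the real algorithm is more involved, and the function names are mine, not part of the sensor driver.

```python
# Simplified model of automatic self-calibration (ASC) bias.
# Assumption: ASC treats the lowest reading in its window as 400 ppm and
# shifts all readings by the same offset. This is an illustration only.

ASC_BASELINE_PPM = 400  # value the sensor assumes for its weekly minimum


def asc_offset(true_weekly_min_ppm, baseline_ppm=ASC_BASELINE_PPM):
    """Offset applied if the lowest true concentration seen was true_weekly_min_ppm."""
    return baseline_ppm - true_weekly_min_ppm


def reported_ppm(true_ppm, true_weekly_min_ppm):
    """Reading the sensor would report after the ASC shift."""
    return true_ppm + asc_offset(true_weekly_min_ppm)


# A room that never drops below 600 ppm: every reading comes out 200 ppm low.
print(asc_offset(600))          # -200
print(reported_ppm(1500, 600))  # 1300
```

Under this model, a room that truly sits at 1500ppm would be reported as 1300ppm, which is below the 1500ppm buzzer threshold used in the code above. That is the practical reason to either air the sensor out weekly or disable auto-calibration.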
Applications of the SCD41
I am particularly interested in environmental monitoring to assess indoor air quality at a competitive price, allowing users to swap sensors in and out as needed. This flexibility is crucial for adapting to different monitoring requirements without significant additional costs.
Additionally, monitoring transient dump flows from businesses during off-hours (midnight to 3 AM) is vital. This involves tracking the release of CO2 and other gaseous components using the MQ-* sensor series. While this approach may not provide quantitative measurements due to various confounding factors, it serves as an initial assessment tool. This preliminary analysis, which costs under $100 upfront and requires only a few hours of labor, can justify the purchase or lease of more advanced testing equipment.
Another intriguing experiment involves mapping a town with sensors placed every few blocks to monitor localized CO2 concentrations. This data can be correlated with consumer, commercial, and industrial activities, providing valuable insights into the town’s environmental impact.
Other applications include monitoring the respiratory activity of plants by measuring CO2 exchange in greenhouse or agricultural research settings. This can offer insights into plant health, photosynthetic efficiency, and growth patterns. Pairing this with R/G/B, UV, and light sensors can help determine several growth parameters. Note that NPK (Nitrogen/Phosphorus/Potassium) sensors are often unreliable based on my research. Therefore, it’s better to use actual chemical testing methods or automate the chemical testing apparatus rather than relying on electronic sensors for these measurements.
In biotechnology and biochemical engineering contexts, monitoring CO2 levels in cellular cultures and bioreactors is essential. Maintaining precise CO2 concentrations ensures the optimal growth of microorganisms, which can significantly reduce production costs. This is particularly important for the pharmaceutical industry, biofuel production, and other bio-based manufacturing processes.
Personal Testing
In the recent heat wave I noted that it took roughly 30 minutes to air out the house and cut CO2 levels from ~1200ppm to ~650ppm.
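As a rough worked example, those two readings can be turned into an approximate air exchange rate using a well-mixed, single-zone decay model. The outdoor level of 489ppm is an assumption borrowed from the calibration target in the code above, and the function name is hypothetical. The model also assumes nobody is adding CO2 during the decay, so with people in the house the real ventilation rate would be somewhat higher than this estimate.

```python
import math


def air_changes_per_hour(c_start, c_end, c_outdoor, hours):
    """Estimate air changes per hour from a CO2 decay.

    Assumes a well-mixed room and no CO2 sources during the decay:
    C(t) - C_out = (C_0 - C_out) * exp(-ACH * t)
    """
    if not (c_start > c_end > c_outdoor) or hours <= 0:
        raise ValueError("need c_start > c_end > c_outdoor and hours > 0")
    return math.log((c_start - c_outdoor) / (c_end - c_outdoor)) / hours


# ~1200 ppm down to ~650 ppm in 30 minutes, with an assumed 489 ppm outdoors
ach = air_changes_per_hour(1200, 650, 489, 0.5)
print(f"{ach:.1f} air changes per hour")
```

With these numbers the estimate comes out at about 3 air changes per hour, which is sensitive to the assumed outdoor level: the closer the final reading is to the outdoor value, the larger the error.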
It’s interesting to note that the temperature jumped only a few degrees over the 30 minutes, per below:
The actual “enclosure” of the sensor is just an old Raspberry Pi 5 case box, a FLIRC model. The solid wire coming from the main board is hooked up to a smaller board used for data display, switching wifi/calibration on and off, and alerting users to high CO2 (above 1500ppm). Note that I didn’t feel like potentially burning the board or buying a soldering attachment to safely remove the LED, so I covered it with some cardboard to reduce the interference its light causes on the sensor. See below:
Raw Boards, no case
Wifi/Calibration Screen, with case
Normal Output, with case
Conclusion
The SCD41 sensor offers a versatile and accurate solution for CO2 monitoring in various applications. For our particular application, provided we calibrate it once at the beginning and then once more every year, we can enjoy accurate CO2 monitoring at a low price.