SCD41: On Demand CO2 Sensor

ESP32
IoT
Author

Jesse Anderson

Published

September 2, 2024

Original Post: 06/01/2024

Update: I updated this post as of 09/02/24 to reflect the changes made to the CO2 sensor, namely using 5V for stability, an OLED screen for display, switches for wifi and calibration, and finally an obnoxious buzzer to let me know I should open my windows/door(>1500ppm).

Disclaimer: Please note that the information presented in this article is for informational purposes only. It is not intended to serve as health advice, engineering advice, or any form of professional guidance. Readers should consult with qualified professionals for specific health or engineering concerns and should not rely solely on the content of this article for making decisions. The authors and publishers of this article are not responsible for any actions taken based on the information provided herein.

In this post I intend to cover the basics of SCD41 sensors, their technical performance, some MicroPython code to use with an ESP32 microcontroller(yes I know there are other ones out there), and some of the limitations/precautions one should take with this particular sensor.

What is an SCD41 Sensor?

An SCD41 sensor is Sensirion’s miniature CO2 sensor. It uses a photoacoustic NDIR sensing principle along with Sensirion’s patented technology to offer high accuracy at a competitive price. I personally picked up this sensor for $28.99, and it could likely be found at a lower price directly from the manufacturer or via platforms like AliExpress. The sensor also includes on-chip signal compensation with a built-in SHT4x humidity and temperature sensor to account for fluctuations in environmental conditions, ensuring accurate CO2 readings.

Technical Performance

Generalized CO2 Accuracy Specification:

  • 400-1,000 ppm: ±(50 ppm + 2.5% of reading)

  • 1,001-2,000 ppm: ±(50 ppm + 3% of reading)

  • 2,001-5,000 ppm: ±(40 ppm + 5% of reading)

Temperature and Humidity Specifications:

  • Temperature Accuracy: -10°C to 60°C range with an accuracy of ±0.8°C

  • Humidity Accuracy: 0-95% RH range with an accuracy of ±6% RH

More Detailed Specifications…
SCD41 Sensor Series Specifications
Specification
Value
Unit
CO₂ Measurement Range
0 - 40,000
ppm
CO₂ Measurement Accuracy (SCD41)
400 - 1,000
±(50 ppm + 2.5% of reading) ppm
1,001 - 2,000
±(50 ppm + 3% of reading) ppm
2,001 - 5,000
±(40 ppm + 5% of reading) ppm
CO₂ Repeatability
Typical
±10 ppm
CO₂ Response Time (τ63%)
Typical
60 s
Additional Accuracy Drift (5 years)
400 - 2,000, with ASC enabled
±(5 ppm + 0.5% of reading) ppm
Humidity Measurement Range
0 - 100
%RH
Humidity Accuracy (typical)
15 °C – 35 °C, 20 %RH – 65 %RH
±6 %RH
-10 °C – 60 °C, 0 %RH – 100 %RH
±9 %RH
Humidity Repeatability
Typical
±0.4 %RH
Humidity Response Time (τ63%)
Typical
90 s
Humidity Accuracy Drift
Yearly
<0.25 %RH
Temperature Measurement Range
-10 - 60
°C
Temperature Accuracy (typical)
15 °C – 35 °C
±0.8 °C
-10 °C – 60 °C
±1.5 °C
Temperature Repeatability
-
±0.1 °C
Temperature Response Time (τ63%)
Typical
120 s
Temperature Accuracy Drift
Yearly
<0.03 °C
Supply Voltage
2.4 - 5.5
V
Average Supply Current
Typical
15 mA
Max. Supply Current
-
205 mA

The response time is roughly every 2 minutes for temperature and every 90 seconds for humidity. Using the longest response time as a basis for our sampling, we get a sampling rate of once every 2 minutes. The sensor’s extensive command set, clearly detailed in the datasheet, simplifies programming but requires familiarity with numerous commands.

Before diving into the fundamentals let’s visit why we even care about CO2 to begin with.

CO2 Levels in an Enclosed Space

CO2 levels increase in an enclosed room predictably with time as it is an enclosed system with n number of producers(people). We can actually do a mass balance around the system if we really wanted to to evaluate the amount of CO2 if we had values per person. A mass balance for CO2 in an enclosed room involves calculating how the amount of CO2 changes over time. Since the room is sealed, we start with an initial amount of CO2 and add the CO2 produced by people inside the room. By knowing how much CO2 each person produces per minute, we can estimate the total CO2 in the room after a certain period. This helps us predict how CO2 levels will rise as people continue to breathe, ensuring the air quality is monitored and maintained.

As CO2 levels increase a number of health complaints can arise and these health complaints are CO2 concentration dependent. The Wisconsin Department of Health Services outline a ppm to health complaint table:

  • 400 ppm: average outdoor air level.

  • 400–1,000 ppm: typical level found in occupied spaces with good air exchange.

  • 1,000–2,000 ppm: level associated with complaints of drowsiness and poor air.

  • 2,000–5,000 ppm: level associated with headaches, sleepiness, and stagnant, stale, stuffy air. Poor concentration, loss of attention, increased heart rate and slight nausea may also be present.

  • 5,000 ppm: this indicates unusual air conditions where high levels of other gases could also be present. Toxicity or oxygen deprivation could occur. This is the permissible exposure limit for daily workplace exposures.

  • 40,000 ppm: this level is immediately harmful due to oxygen deprivation.

We also see plenty of examples of CO2 levels reaching dangerous levels in enclosed spaces and I am including images along with associated posts:

From https://vair-monitor.com/:

From https://cambridgecarbonfootprint.org/:

Now if we move to a more engineering based analysis we go to https://www.engineeringtoolbox.com/pollution-concentration-rooms-d_692.html and find that the carbon dioxide concentration in a room is a function of:

CO2 Concentration Calculation

The carbon dioxide concentration in a room filled with persons after a time \( t \) can be calculated as:

c = (q / (n V)) [1 - (1 / en t)] + (c0 - ci) (1 / en t) + ci                              
    

where:

  • c = carbon dioxide concentration in the room (m3/m3)
  • q = carbon dioxide supplied to the room (m3/h)
  • V = volume of the room (m3)
  • e = the constant 2.718...
  • n = number of air shifts per hour (1/h)
  • t = time (hour, h)
  • ci = carbon dioxide concentration in the inlet ventilation air (m3/m3)
  • c0 = carbon dioxide concentration in the room at start, \( t=0 \) (m3/m3)

Calculator(the same formula as Engineering Toolbox): Defaults: 2 people, 10ft by 10ft room with 10ft feet ceilings, 1 air change per hour, and the makeup air is the standard 400ppm CO2. This is of course assuming absolutely no gaps and you’re truly enclosed. Worst case scenario and as this is not cited anywhere take with a grain of salt!

CO2 Concentration Calculator

CO2 Concentration Calculator















CO2 concentration: 0.00657 m3/m3 (6574 ppm)

We also have a similar calculator here: https://www.soletairpower.fi/co2-calculator/

For a more scientifically based study one can navigate to the following: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7411428/

The study gives a CO2 based occupancy estimation by correlating the levels of CO2 with the number of occupants in a room with the ultimate goal of reducing energy use by dynamically adjusting HVAC systems in real time. This means you don’t ventilate rooms that are unoccupied or ventilate less if they are under-occupied. Its probably possible to tease out a solid mathematical formula that could be put into a calculator, but that is time away from the goal of this article SCD41 and CO2 measurement.

Characteristics of NDIR Sensors

NDIR sensors measure CO2 by exploiting its property to absorb IR light at around 4.2 µm. They use a non-dispersive band-pass filter to allow only the relevant IR wavelengths to pass, hence the name Non-Dispersive Infrared. An image from Wikipedia below outlines various gases and their Mid-infrared absorption spectra:

How does Photoacoustic NDIR Work?

Photoacoustic Non-Dispersive Infrared (NDIR) technology is an advanced method used in sensors like the SCD41 to measure gas concentrations, such as CO2, using light and sound. Here’s a brief explanation of how it works:

Key Steps in Photoacoustic NDIR

  1. Infrared Light Source: An infrared (IR) light source emits light at wavelengths absorbed by CO2.

  2. Gas Absorption: CO2 molecules absorb this light, causing them to heat up and vibrate.

  3. Pressure Waves: The vibrations create tiny sound waves.

  4. Microphone Detection: A microphone detects these pressure waves.

  5. Signal Processing: The sensor converts these waves into electrical signals to determine CO2 concentration.

  6. Output: The final CO2 concentration is displayed for monitoring or further processing.

This method offers high sensitivity and accuracy, making it suitable for real-time CO2 monitoring in various applications, including indoor air quality and industrial processes. The SCD41 sensor is compact, low-power, and cost-effective, ideal for integration into diverse systems.

Transmissive NDIR

These sensors feature an IR emitter and an optical detector at opposite ends of an optical cavity. Here’s a quick rundown of how they work:

  1. IR Emitter: Emits light through the gas sample.

  2. Absorption by CO2: CO2 absorbs specific wavelengths of the IR light.

  3. Detection: The detector measures the transmitted IR light.

  4. Calculation: CO2 concentration is calculated based on the difference in emitted and transmitted light.

Transmissive NDIR sensors require precise positioning and minimal optical path length to ensure accurate readings.

TVOC Sensors and eCO2 Readings

Using Total Volatile Organic Compounds (TVOC) sensors, such as the Sensirion SGP30, to estimate CO2 levels is generally unreliable. TVOC sensors measure the concentration of various organic compounds in the air, which can include emissions from household products, cooking, cleaning agents, and even human breath. While these sensors are adept at detecting a wide range of VOCs, they are not designed to specifically measure CO2.

Why TVOC Sensors Fall Short for CO2 Estimation

  • Broad Detection Spectrum: TVOC sensors detect a broad range of organic compounds, not just CO2. This means they can be influenced by numerous sources of VOCs that are unrelated to CO2 levels, such as air fresheners, deodorizers, and various chemical products used indoors.

  • Lack of Specificity: TVOC sensors lack the specificity required to accurately distinguish between CO2 and other VOCs. The presence of other VOCs can cause the sensor to give false indications of high CO2 levels.

  • Environmental Factors: Various environmental factors such as temperature, humidity, and the presence of other gases can affect the readings of TVOC sensors, further complicating their accuracy when estimating CO2 levels.

Despite these limitations, some vendors and manufacturers still promote TVOC sensors for CO2 estimation. This can be misleading for consumers who may believe they are getting accurate CO2 measurements. It is important for users to understand these limitations and consider more reliable methods, such as NDIR or photoacoustic sensors, for precise CO2 monitoring.

Comparing Low-Cost CO2 Sensors

NDIR Sensors

NDIR sensors measure CO2 based on gas absorption of IR light. They typically feature an IR emitter, dual channels for reference and measurement, and calculate CO2 concentration by comparing light absorption in these channels.

Photo-Acoustic Sensors

Photo-acoustic sensors also measure absorption but use a microphone to detect the resulting pressure waves. They are smaller and do not rely on line-of-sight, making them suitable for compact applications.

In the image below we can see a comparison of the photoacoustic and transmissive NDIR sensors:

Working Principle References:

https://www.airgradient.com/blog/co2-sensors-photo-acoustic-vs-ndir/

https://www.sensirion.com/resource/application_note/ndir-sensors-types

https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7248969/

Setting up the Circuit

The circuit used here is fairly simple as the SCD41 has 4 inputs.

  • GROUND = GND

  • Voltage_in = VDD

  • SCL = Serial Clock Line

  • SDA = Serial Data Line

We also use an LCD to monitor the output and one switch to control calibration on startup(two switches for toggling wifi). For the sake of being able to recreate this fast with the same board here are the associated pin numbers and side.

SCD41

  • SDA: 14-Left

  • SCL: 17-Left

  • VDD: 1-Right

  • GND: 19-Left

Switch Calibration(Left Switch)

  • Top(ON): 12-Right

  • GND: Ground Bar-Right

  • Bottom: N/A

Switch Wifi(Right Switch)

  • Top(ON): 13-Right

  • GND:Ground Bar - Right

  • Bottom: N/A

LCD

  • SDA:10-Left

  • SCL:7-Left

  • VCC: 12-Right

  • GND: Ground Bar-Right

Buzzer

  • 15-Right

  • Ground Bar - Right

Note that with some changes to the code one can omit the LCD and the switches, but being able to calibrate on the fly with fresh air is nice as is having an offline version of the sensor. Also note that at 3.3V the peak supply current is typical 175mA with a max of 205mA and at 5.0V typical is 115mA with a max of 137mA. I personally experienced many issues using the 3.3V and switched to 5V and experienced absolutely no issues.

The MicroPython library used is below. Simply open up the file in Thonny and save it to the device as SCD41.py so it can be imported in the main.py script. Note that the comments can be removed to cut down on space on your device. Implementing the Google Drive database and the ThingSpeak database are separate posts. See Raspberry Pi Sensor Server Project for more details on setting that up.

Library Code
# SCD41.py
#This code is pretty heavily based off of the Sensirion type code at: https://github.com/octaprog7/SCD4x
# And this library written by Sensirion: https://github.com/Sensirion/python-i2c-scd
# This file should be saved as SCD41.py and called as import SCD41. See main.py in this folder for an example usage.
import time
import math
from machine import SoftI2C, Pin, SPI
import micropython
import ustruct

@micropython.native
def _calc_crc(sequence) -> int:
    """
    Calculate CRC-8 checksum for the given sequence.
    """
    return crc8(sequence, polynomial=0x31, init_value=0xFF)

@micropython.native
def check_value(value: int, valid_range, error_msg: str) -> int:
    """
    Check if the value is within the valid range. Raise ValueError if not.
    """
    if value not in valid_range:
        raise ValueError(error_msg)
    return value

class Device:
    """Base class for devices."""
    def __init__(self, adapter, address: [int, SPI], big_byte_order: bool):
        """
        Initialize the device with adapter, address, and byte order.
        """
        self.adapter = adapter
        self.address = address
        self.big_byte_order = big_byte_order
        self.msb_first = True

    def _get_byteorder_as_str(self) -> tuple:
        """
        Return the byte order as a string ('big' or 'little') and format character ('>' or '<').
        """
        if self.is_big_byteorder():
            return 'big', '>'
        else:
            return 'little', '<'

    def unpack(self, fmt_char: str, source: bytes, redefine_byte_order: str = None) -> tuple:
        """
        Unpack the given source bytes according to the format character and byte order.
        """
        if not fmt_char:
            raise ValueError(f"Invalid length fmt_char parameter: {len(fmt_char)}")
        bo = self._get_byteorder_as_str()[1]
        if redefine_byte_order is not None:
            bo = redefine_byte_order[0]
        return ustruct.unpack(bo + fmt_char, source)

    @micropython.native
    def is_big_byteorder(self) -> bool:
        """
        Check if the device uses big byte order.
        """
        return self.big_byte_order

class BaseSensor(Device):
    """Base class for sensors."""
    def get_id(self):
        raise NotImplementedError

    def soft_reset(self):
        raise NotImplementedError

class Iterator:
    """Iterator class for sensors."""
    def __iter__(self):
        return self

    def __next__(self):
        raise NotImplementedError

class BitField:
    """Class for working with bit fields."""
    def __init__(self, start: int, stop: int, alias: [str, None]):
        """
        Initialize the bit field with start, stop, and alias.
        """
        check(start, stop)
        self.alias = alias
        self.start = start
        self.stop = stop
        self.bitmask = _bitmask(start, stop)

    def put(self, source: int, value: int) -> int:
        """
        Write the value to the specified bit range in the source.
        """
        src = source & ~self.bitmask
        src |= (value << self.start) & self.bitmask
        return src

    def get(self, source: int) -> int:
        """
        Get the value from the specified bit range in the source.
        """
        return (source & self.bitmask) >> self.start

@micropython.native
def put(start: int, stop: int, source: int, value: int) -> int:
    """
    Write the value to the specified bit range in the source.
    """
    check(start, stop)
    bitmask = _bitmask(start, stop)
    src = source & bitmask
    src |= (value << start) & bitmask
    return src

@micropython.native
def _bitmask(start: int, stop: int) -> int:
    """
    Generate a bitmask from start to stop bits.
    """
    res = 0
    for i in range(start, 1 + stop):
        res |= 1 << i
    return res

def check(start: int, stop: int):
    """
    Check if start is less than or equal to stop. Raise ValueError if not.
    """
    if start > stop:
        raise ValueError(f"Invalid start: {start}, stop value: {stop}")

def _mpy_bl(value: int) -> int:
    """
    Calculate the bit length of the value.
    """
    if 0 == value:
        return 0
    return 1 + int(math.log2(abs(value)))

class BusAdapter:
    """Adapter class for bus communication."""
    def __init__(self, bus: [I2C, SPI]):
        """
        Initialize the adapter with the bus.
        """
        self.bus = bus

    def get_bus_type(self) -> type:
        """
        Return the type of the bus.
        """
        return type(self.bus)

    def read_register(self, device_addr: [int, Pin], reg_addr: int, bytes_count: int) -> bytes:
        raise NotImplementedError

    def write_register(self, device_addr: [int, Pin], reg_addr: int, value: [int, bytes, bytearray],
                       bytes_count: int, byte_order: str):
        raise NotImplementedError

    def read(self, device_addr: [int, Pin], n_bytes: int) -> bytes:
        raise NotImplementedError

    def write(self, device_addr: [int, Pin], buf: bytes):
        raise NotImplementedError

    def write_const(self, device_addr: [int, Pin], val: int, count: int):
        """
        Write a constant value to the device multiple times.
        """
        if 0 == count:
            return
        bl = _mpy_bl(val)
        if bl > 8:
            raise ValueError(f"The value must take no more than 8 bits! Current: {bl}")
        _max = 16
        if count < _max:
            _max = count
        repeats = count // _max
        b = bytearray([val for _ in range(_max)])
        for _ in range(repeats):
            self.write(device_addr, b)
        remainder = count - _max * repeats
        if remainder:
            b = bytearray([val for _ in range(remainder)])
            self.write(device_addr, b)

class I2cAdapter(BusAdapter):
    """Adapter class for I2C bus communication."""
    def __init__(self, bus: I2C):
        super().__init__(bus)

    def write_register(self, device_addr: int, reg_addr: int, value: [int, bytes, bytearray],
                       bytes_count: int, byte_order: str):
        """
        Write to the register of the device.
        """
        buf = None
        if isinstance(value, int):
            buf = value.to_bytes(bytes_count, byte_order)
        if isinstance(value, (bytes, bytearray)):
            buf = value
        return self.bus.writeto_mem(device_addr, reg_addr, buf)

    def read_register(self, device_addr: int, reg_addr: int, bytes_count: int) -> bytes:
        """
        Read from the register of the device.
        """
        return self.bus.readfrom_mem(device_addr, reg_addr, bytes_count)

    def read(self, device_addr: int, n_bytes: int) -> bytes:
        """
        Read bytes from the device.
        """
        return self.bus.readfrom(device_addr, n_bytes)

    def readfrom_into(self, device_addr: int, buf):
        """
        Read bytes from the device into the buffer.
        """
        return self.bus.readfrom_into(device_addr, buf)

    def read_buf_from_mem(self, device_addr: int, mem_addr, buf):
        """
        Read bytes from the device memory into the buffer.
        """
        return self.bus.readfrom_mem_into(device_addr, mem_addr, buf)

    def write(self, device_addr: int, buf: bytes):
        """
        Write bytes to the device.
        """
        return self.bus.writeto(device_addr, buf)

    def write_buf_to_mem(self, device_addr: int, mem_addr, buf):
        """
        Write bytes to the device memory.
        """
        return self.bus.writeto_mem(device_addr, mem_addr, buf)

class SCD4xSensirion(BaseSensor, Iterator):
    """Class for SCD4x Sensirion CO2 sensor."""
    def __init__(self, adapter: I2cAdapter, address=0x62, this_is_scd41: bool = True, check_crc: bool = True):
        """
        Initialize the sensor with adapter, address, and settings.
        """
        super().__init__(adapter, address, True)
        self._buf_3 = bytearray((0 for _ in range(3)))
        self._buf_9 = bytearray((0 for _ in range(9)))
        self.check_crc = check_crc
        self._low_power_mode = False
        self._single_shot_mode = False
        self._rht_only = False
        self._isSCD41 = this_is_scd41
        self.byte_order = self._get_byteorder_as_str()

    def _get_local_buf(self, bytes_for_read: int) -> [None, bytearray]:
        """
        Return the local buffer for reading.
        """
        if bytes_for_read not in (0, 3, 9):
            raise ValueError(f"Invalid value for bytes_for_read: {bytes_for_read}")
        if not bytes_for_read:
            return None
        if 3 == bytes_for_read:
            return self._buf_3
        return self._buf_9

    def _to_bytes(self, value, length: int):
        """
        Convert value to bytes with specified length.
        """
        byteorder = self.byte_order[0]
        return value.to_bytes(length, byteorder)

    def _write(self, buf: bytes) -> bytes:
        """
        Write buffer to the device.
        """
        return self.adapter.write(self.address, buf)

    def _readfrom_into(self, buf):
        """
        Read bytes from the device into the buffer.
        """
        return self.adapter.readfrom_into(self.address, buf)

    def _send_command(self, cmd: int, value: [bytes, None], wait_time: int = 0, bytes_for_read: int = 0,
                      crc_index: range = None, value_index: tuple = None) -> [bytes, None]:
        """
        Send a command to the sensor.
        """
        raw_cmd = self._to_bytes(cmd, 2)
        raw_out = raw_cmd
        if value:
            raw_out += value
            raw_out += self._to_bytes(_calc_crc(value), 1)
        self._write(raw_out)
        if wait_time:
            time.sleep_ms(wait_time)
        if not bytes_for_read:
            return None
        b = self._get_local_buf(bytes_for_read)
        self._readfrom_into(b)
        check_value(len(b), (bytes_for_read,), f"Invalid buffer length for cmd: {cmd}. Received {len(b)} out of {bytes_for_read}")
        if self.check_crc:
            crc_from_buf = [b[i] for i in crc_index]
            calculated_crc = [_calc_crc(b[rng.start:rng.stop]) for rng in value_index]
            if crc_from_buf != calculated_crc:
                raise ValueError(f"Invalid CRC! Calculated{calculated_crc}. From buffer {crc_from_buf}")
        return b

    def save_config(self):
        """
        Save the sensor configuration to EEPROM.
        """
        cmd = 0x3615
        self._send_command(cmd, None, 800)

    def get_id(self) -> tuple:
        """
        Get the unique serial number of the sensor.
        """
        cmd = 0x3682
        b = self._send_command(cmd, None, 0, bytes_for_read=9,
                               crc_index=range(2, 9, 3), value_index=(range(2), range(3, 5), range(6, 8)))
        return tuple([(b[i] << 8) | b[i+1] for i in range(0, 9, 3)])

    def soft_reset(self):
        """
        Perform a soft reset of the sensor.
        """
        return None

    def exec_self_test(self) -> bool:
        """
        Execute self-test on the sensor. Returns True if successful.
        """
        cmd = 0x3639
        length = 3
        b = self._send_command(cmd, None, wait_time=10_000,
                               bytes_for_read=length, crc_index=range(2, 3), value_index=(range(2),))
        res = self.unpack("H", b)[0]
        return 0 == res

    def reinit(self) -> None:
        """
        Reinitialize the sensor by reloading user settings from EEPROM.
        """
        cmd = 0x3646
        self._send_command(cmd, None, 20)

    def set_temperature_offset(self, offset: float):
        """
        Set the temperature offset for the sensor.
        """
        cmd = 0x241D
        offset_raw = self._to_bytes(int(374.49142857 * offset), 2)
        self._send_command(cmd, offset_raw, 1)

    def get_temperature_offset(self) -> float:
        """
        Get the temperature offset from the sensor.
        """
        cmd = 0x2318
        b = self._send_command(cmd, None, wait_time=1, bytes_for_read=3, crc_index=range(2, 3), value_index=(range(2),))
        temp_offs = self.unpack("H", b)[0]
        return 0.0026702880859375 * temp_offs

    def set_altitude(self, masl: int):
        """
        Set the altitude for the sensor in meters above sea level.
        """
        cmd = 0x2427
        masl_raw = self._to_bytes(masl, 2)
        self._send_command(cmd, masl_raw, 1)

    def get_altitude(self) -> int:
        """
        Get the altitude from the sensor in meters above sea level.
        """
        cmd = 0x2322
        b = self._send_command(cmd, None, wait_time=1, bytes_for_read=3, crc_index=range(2, 3), value_index=(range(2),))
        return self.unpack("H", b)[0]

    def set_ambient_pressure(self, pressure: float):
        """
        Set the ambient pressure for the sensor in Pascals.
        """
        cmd = 0xE000
        press_raw = self._to_bytes(int(pressure // 100), 2)
        self._send_command(cmd, press_raw, 1)

    def force_recalibration(self, target_co2_concentration: int) -> int:
        """
        Force recalibration of the sensor with the target CO2 concentration.
        """
        check_value(target_co2_concentration, range(2**16),
                    f"Invalid target CO2 concentration: {target_co2_concentration} ppm")
        cmd = 0x362F
        target_raw = self._to_bytes(target_co2_concentration, 2)
        b = self._send_command(cmd, target_raw, 400, 3, crc_index=range(2, 3), value_index=(range(2),))
        return self.unpack("h", b)[0]

    def is_auto_calibration(self) -> bool:
        """
        Check if automatic self-calibration is enabled on the sensor.
        """
        cmd = 0x2313
        b = self._send_command(cmd, None, 1, 3, crc_index=range(2, 3), value_index=(range(2),))
        return 0 != self.unpack("H", b)[0]

    def set_auto_calibration(self, value: bool):
        """
        Enable or disable automatic self-calibration on the sensor.
        """
        cmd = 0x2416
        value_raw = self._to_bytes(value, 2)
        self._send_command(cmd, value_raw, 1, 3)

    def set_measurement(self, start: bool, single_shot: bool = False, rht_only: bool = False):
        """
        Start or stop periodic measurements, or perform a single shot measurement.
        """
        if single_shot:
            return self._single_shot_meas(rht_only)
        return self._periodic_measurement(start)

    def _periodic_measurement(self, start: bool):
        """
        Start or stop periodic measurements.
        """
        wt = 0
        if start:
            cmd = 0x21AC if self._low_power_mode else 0x21B1
        else:
            cmd = 0x3F86
            wt = 500
        self._send_command(cmd, None, wt)
        self._single_shot_mode = False
        self._rht_only = False

    def get_meas_data(self) -> tuple:
        """
        Get the measurement data from the sensor (CO2, temperature, and humidity).
        """
        cmd = 0xEC05
        val_index = (range(2), range(3, 5), range(6, 8))
        b = self._send_command(cmd, None, 1, bytes_for_read=9,
                               crc_index=range(2, 9, 3), value_index=val_index)
        words = [self.unpack("H", b[val_rng.start:val_rng.stop])[0] for val_rng in val_index]
        return words[0], -45 + 0.0026703288 * words[1], 0.0015259022 * words[2]

    def is_data_ready(self) -> bool:
        """
        Check if the measurement data is ready to be read from the sensor.
        """
        cmd = 0xE4B8
        b = self._send_command(cmd, None, 1, 3, crc_index=range(2, 3), value_index=(range(2),))
        return bool(self.unpack("H", b)[0] & 0b0000_0111_1111_1111)

    @micropython.native
    def get_conversion_cycle_time(self) -> int:
        """
        Get the conversion cycle time of the sensor in milliseconds.
        """
        if self.is_single_shot_mode and self.is_rht_only:
            return 50
        return 5000

    def set_power(self, value: bool):
        """
        Power up or power down the sensor.
        """
        if not self._isSCD41:
            return
        cmd = 0x36F6 if value else 0x36E0
        wt = 20 if value else 1
        self._send_command(cmd, None, wt)

    def _single_shot_meas(self, rht_only: bool = False):
        """
        Perform a single shot measurement.
        """
        if not self._isSCD41:
            return
        cmd = 0x2196 if rht_only else 0x219D
        self._send_command(cmd, None, 0)
        self._single_shot_mode = True
        self._rht_only = rht_only

    @property
    def is_single_shot_mode(self) -> bool:
        """
        Check if the sensor is in single shot mode.
        """
        return self._single_shot_mode

    @property
    def is_rht_only(self) -> bool:
        """
        Check if the sensor is in RHT-only mode.
        """
        return self._rht_only

    def __iter__(self):
        return self

    def __next__(self) -> [tuple, None]:
        """
        Get the next set of measurement data.
        """
        if self._single_shot_mode:
            return None
        if self.is_data_ready():
            return self.get_meas_data()
        return None

def pa_mmhg(value: float) -> float:
    """
    Convert air pressure from Pascals to millimeters of mercury.
    """
    return 7.50062E-3 * value

def crc8(sequence, polynomial: int, init_value: int = 0x00):
    """
    Calculate CRC-8 checksum for the given sequence.
    """
    mask = 0xFF
    crc = init_value & mask
    for item in sequence:
        crc ^= item & mask
        for _ in range(8):
            if crc & 0x80:
                crc = mask & ((crc << 1) ^ polynomial)
            else:
                crc = mask & (crc << 1)
    return crc

def check_device_presence(i2c, address):
    """
    Check if a device with the given address is present on the I2C bus.
    """
    devices = i2c.scan()
    return address in devices

And once you have that done run the following code below on your device or save it to main.py, whatever you’d like. Note that we power cycle every 6 hours on the device to ensure that we don’t run into memory issues. I encountered a memory leak that became a problem roughly 5 days in and felt that chasing it down was a waste compared to a simple power cycling using machine.reset().

Main Code
# main.py

from machine import SoftI2C, Pin, reset, freq
import time
import gc  # Make sure we import gc
from SCD41 import SCD4xSensirion, I2cAdapter, check_device_presence
from ssd1306 import SSD1306_I2C  # Ensure you have the SSD1306 library
import urequests as requests
import network
import json
import utime
import usocket as socket
import ssl
import ntptime

# ThingSpeak settings
THINGSPEAK_API_KEY = 'YOUR_KEY_HERE'
THINGSPEAK_URL = 'https://api.thingspeak.com/update'
THINGSPEAK_CHANNEL_ID = '00000000'
THINGSPEAK_BULK_UPDATE_URL = 'https://api.thingspeak.com/channels/'+str(THINGSPEAK_CHANNEL_ID)+'/bulk_update.json'
SEND_TO_THINGSPEAK = True

thingspeak_buffer = []  # Buffer for ThingSpeak data

# Google Sheets settings
SPREADSHEET_ID = 'VERY_LONG_SPREADSHEET_ID_HERE'
RANGE_NAME = 'Sheet1!A1:C1'
SHEET_NAME = 'Sheet1'
GOOGLE_URL = 'https://script.google.com/macros/s/VERY_LONG_URL/exec'

# WiFi settings
SSID = 'YOUR_SSID'
PASSWORD = 'WIFI_PSWD'
# NTP sync settings
last_ntp_sync = 0  # Last NTP sync timestamp
NTP_SYNC_INTERVAL = 3600  # Sync once per hour (in seconds)

switchOnWifi = Pin(32, Pin.IN, Pin.PULL_UP)
switchOnCalibrate = Pin(33, Pin.IN, Pin.PULL_UP)
buzzer = Pin(26, Pin.OUT)

switch_state_calibrate = switchOnCalibrate.value() # If switch is in up position we consider it on.
print(switch_state_calibrate)

switch_state_wifi = switchOnWifi.value() # If switch is in up position we consider it on.
print(switch_state_wifi)

if switch_state_calibrate == 0:
    print("Calibration switch is on! Calibrating Device...")
if switch_state_wifi == 0:
    print("Wifi switch is on! Will use Wifi...")
    
# Setup GPIO for OLED power
oled_power = Pin(19, Pin.OUT)

# Function to get system status
def get_system_status(firstRun):
    gc.collect()  # Ensure memory is collected before checking
    free_heap = gc.mem_free()
    total_heap = gc.mem_alloc() + free_heap
    free_heap_percent = (free_heap / total_heap) * 100
    if firstRun == True:
        print(f"Total heap memory: {total_heap} bytes")
        # Additional information about the system
        print(f"Frequency: {freq()} Hz")
    print(f"Free heap memory: {free_heap} bytes ({free_heap_percent:.2f}%)")
    
def connect_wifi(ssid, password):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    
    # Check if already connected
    if wlan.isconnected():
        print("Already connected to WiFi")
        print(wlan.ifconfig())
        return True
        
    print("Connecting to WiFi...")
    wlan.connect(ssid, password)
    
    # Wait with timeout instead of infinite loop
    max_wait = 20  # 20 seconds timeout
    while max_wait > 0 and not wlan.isconnected():
        max_wait -= 1
        time.sleep(1)
        print("Connecting to WiFi...")
    
    if wlan.isconnected():
        print("Connected to WiFi")
        print(wlan.ifconfig())
        return True
    else:
        print("Failed to connect to WiFi")
        return False
    
def get_time_chicago():
    """Get Chicago time with NTP sync once per hour"""
    global last_ntp_sync
    current_time = time.time()
    
    # Check if we should sync (if an hour has passed since last sync)
    if current_time - last_ntp_sync >= NTP_SYNC_INTERVAL:
        print(f"NTP sync needed (last sync was {(current_time - last_ntp_sync) / 60:.1f} minutes ago)")
        max_retries = 3  # Limit retries to save resources
        for attempt in range(max_retries):
            try:
                ntptime.settime()
                last_ntp_sync = time.time()  # Update the last sync time
                print(f"NTP sync successful at: {last_ntp_sync}")
                break
            except OSError as e:
                print(f"Failed to get NTP time, attempt {attempt + 1} of {max_retries}. Error: {e}")
                time.sleep(1)
        else:
            print("Could not sync with NTP server, using internal RTC time")
    else:
        minutes_since_sync = (current_time - last_ntp_sync) / 60
        print(f"Using cached time (next sync in {60 - minutes_since_sync:.1f} minutes)")
    
    # Get current time and apply timezone offset
    current_time = utime.localtime()
    
    # Determine if it is daylight saving time (DST)
    month = current_time[1]
    day = current_time[2]
    hour = current_time[3]
    if (month > 3 and month < 11) or (month == 3 and day >= 8 and hour >= 2) or (month == 11 and day < 1 and hour < 2):
        is_dst = True
    else:
        is_dst = False
    
    offset = -6 * 3600 if not is_dst else -5 * 3600
    local_time = utime.mktime(current_time) + offset
    return utime.localtime(local_time)

# Function to sound the buzzer
def sound_buzzer():
    for _ in range(5):
        buzzer.value(1)  # Turn on buzzer
        time.sleep(0.25)  # 250 ms delay
        buzzer.value(0)  # Turn off buzzer
        time.sleep(0.25)  # 250 ms delay

# Memory-optimized version of send_data_to_google_sheets
def send_data_to_google_sheets(data):
    for attempt in range(3):
        try:
            # Force garbage collection before making a network connection
            gc.collect()
            
            # Format the URL components separately to avoid long string concatenation
            url = GOOGLE_URL
            _, _, host, path = url.split('/', 3)
            
            # Format data carefully to minimize memory usage
            encoded_data = "Date=" + data['date']
            encoded_data += "&Time=" + data['time']
            encoded_data += "&CO2=" + str(data['co2'])
            encoded_data += "&Temperature=" + str(data['temp_f'])
            encoded_data += "&Humidity=" + str(data['humidity'])
            
            s = None
            try:
                # Create socket first without SSL
                addr = socket.getaddrinfo(host, 443)[0][-1]
                s = socket.socket()
                s.settimeout(15)  # Add timeout to prevent hanging
                s.connect(addr)
                
                # Build request in parts
                request_header = f"POST /{path} HTTP/1.1\r\nHost: {host}\r\n"
                request_header += "Content-Type: application/x-www-form-urlencoded\r\n"
                request_header += f"Content-Length: {len(encoded_data)}\r\n\r\n"
                
                # Wrap socket with SSL AFTER connecting
                s = ssl.wrap_socket(s)
                
                # Send in two parts to reduce memory pressure
                s.write(request_header)
                s.write(encoded_data)
                
                # Read minimal response to verify success
                response = s.read(64)
                
                # Close the socket before any other operations
                s.close()
                s = None
                
                print('Data sent to Google Sheets!')
                gc.collect()  # Force collection after network operation
                return True
                
            except Exception as e:
                print('Failed to send data to Google Sheets:', e)
                if s:
                    try:
                        s.close()
                    except:
                        pass
                    s = None
                gc.collect()  # Force collection after error
                
        except Exception as e:
            print(f"Exception occurred... Retrying sending data to Google Sheets {attempt + 1} of 3 attempts.")
            time.sleep(2)  # Wait before retry
            gc.collect()  # Force collection
    
    return False  # Failed after all attempts

# Memory-optimized ThingSpeak function
def send_data_to_thingspeak():
    """Send data to ThingSpeak."""
    if not SEND_TO_THINGSPEAK or not thingspeak_buffer:
        return False
        
    for attempt in range(3):
        try:
            # Force garbage collection before sending
            gc.collect()
            
            if len(thingspeak_buffer) > 1:
                # Handle bulk update with memory considerations
                bulk_send_thingspeak()
            else:
                # Handle single update
                data = thingspeak_buffer.pop(0)
                payload = {
                    'api_key': THINGSPEAK_API_KEY,
                    'field1': data['co2'],
                    'field2': data['temp_f'],
                    'field3': data['humidity']
                }
                
                try:
                    response = requests.post(THINGSPEAK_URL, json=payload)
                    if response.status_code == 200:
                        print('Data posted to ThingSpeak:', response.text)
                        response.close()
                        thingspeak_buffer.clear()  # Clear any remaining data
                        return True
                    else:
                        print(f'Failed to send data to ThingSpeak: {response.status_code}')
                        response.close()
                except Exception as e:
                    print('Failed to send data to ThingSpeak:', e)
            
            time.sleep(2)  # Wait before retry
            gc.collect()  # Force collection
            
        except Exception as e:
            print(f"Exception occurred... Retrying sending data to ThingSpeak {attempt + 1} of 3 attempts.")
            time.sleep(2)  # Wait before retry
            gc.collect()  # Force collection
    
    return False  # Failed after all attempts

# Helper function to handle bulk ThingSpeak updates with memory considerations
def bulk_send_thingspeak():
    # Force garbage collection
    gc.collect()
    
    # Prepare payload
    payload = {
        'write_api_key': THINGSPEAK_API_KEY,
        'updates': []
    }
    
    # Add updates to payload
    for data in thingspeak_buffer:
        update = {
            'created_at': f"{data['date']} {data['time']} -0500",
            'field1': data['co2'],
            'field2': data['temp_f'],
            'field3': data['humidity']
        }
        payload['updates'].append(update)
    
    # Convert to JSON
    json_data = json.dumps(payload)
    
    # Send request
    try:
        headers = {'Content-Type': 'application/json'}
        response = requests.post(THINGSPEAK_BULK_UPDATE_URL, headers=headers, data=json_data)
        
        if response.status_code == 202:
            print('Data posted to ThingSpeak (bulk update):', response.text)
            thingspeak_buffer.clear()  # Clear the buffer after successful update
            response.close()
            return True
        else:
            print(f'Failed to send data to ThingSpeak (bulk update): {response.status_code}')
            response.close()
            return False
    except Exception as e:
        print('Failed to send data to ThingSpeak (bulk update):', e)
        return False
    finally:
        gc.collect()  # Force collection

def get_sensor_reading(sensor, conversion_cycle_time, last_update_sensor):
    """Get a single sensor reading with a delay equal to the conversion cycle time."""
    update = False
    while update == False:
        time_since_update = time.time() - last_update_sensor
        print(time_since_update)
        if time_since_update >= 30:       
            try:
                # Check if data is ready
                if sensor.is_data_ready():
                    update = True
                    return sensor.get_meas_data()
                else:
                    print("Data not ready.")
                    update = False
                    time.sleep(1)
                    return None
            except OSError as e:
                print(f"Error during sensor reading: {e}")
                update = True
                return None
        else:
            time.sleep(1)
            update = False

led = Pin(2, Pin.OUT)
led.value(0)  # turn off red LED, doesn't work :L hard soldered in.

def main():
    # Force garbage collection at start
    gc.collect()
    get_system_status(True)
    
    reading_count = 0
    if switch_state_wifi == 0:
        # Connect to WiFi
        connect_wifi(SSID, PASSWORD)
        # Initial time sync
        get_time_chicago()
    
    # Initialize I2C communication with the specified pins and frequency
    i2c = SoftI2C(scl=Pin(22), sda=Pin(21), freq=400_000)
    device_address = 0x62

    # Check if the device is present on the I2C bus
    if not check_device_presence(i2c, device_address):
        print(f"Device with address {device_address} not found on I2C bus.")
        return

    print(f"Device with address {device_address} found on I2C bus.")
    oled_power.value(1)  # Turn on OLED display
    
    # Setup SoftI2C for OLED
    i2c_oled = SoftI2C(scl=Pin(4), sda=Pin(5))
    
    # Scan for OLED device
    print('Scanning for I2C devices...')
    devices = i2c_oled.scan()
    if len(devices) == 0:
        print("No I2C OLED devices found.")
        OLEDaddress = None
    else:
        print('I2C OLED devices found:', len(devices))
        OLEDaddress = devices[0]  # Assuming the first device found is the OLED
        for device in devices:
            print("Device address: ", hex(device))

    # Initialize OLED if found
    oled = None
    if OLEDaddress:
        print("Testing OLED...")
        oled = SSD1306_I2C(128, 64, i2c_oled)
        i = 0
        while i < 5:
            time.sleep(1)
            oled_power.value(1)
            oled.fill(0)
            oled.text(f"Init: Test {i}", 0, 0)
            oled.show()
            time.sleep(1)
            i = i+1
            oled_power.value(0)
        oled_power.value(1)
        oled.fill(0)
        if switch_state_calibrate == 0:
            oled.text(f"CALIBRATION ON", 0, 0)
        if switch_state_calibrate == 1:
            oled.text(f"CALIBRATION OFF", 0, 0)
        if switch_state_wifi == 0:
            oled.text(f"WIFI ON", 0, 10)
        if switch_state_wifi == 1:
            oled.text(f"WIFI OFF", 0, 10) 
        oled.show()
        time.sleep(5)  # Reduced from 10 to 5 seconds to save power
        oled_power.value(0)
    
    # Create an I2C adapter and sensor instance
    adapter = I2cAdapter(i2c)
    sensor = SCD4xSensirion(adapter)
    sensor.set_measurement(start=False, single_shot=False) #if looping need to put sensor back into IDLE mode. Sensor can't exec self test if currently reading...
    
    # Check if sensor is good to go. Note you may need to power cycle to pass this test. Make sure 5V in.
    selfTest = False
    retry_count = 0
    while selfTest == False and retry_count < 5:  # Added retry limit
        try:
            if sensor.exec_self_test():
                print("Sensor self test shows good to go")
                selfTest = True
            else:
                print("Sensor self test failed!")
                selfTest = False
                retry_count += 1
                time.sleep(1)
        except Exception as e:
            print("Exception occurred:", e)
            retry_count += 1
            time.sleep(1)

    # Ensure the sensor is in IDLE mode
    sensor.set_measurement(start=False, single_shot=False)

    # Retrieve and display the sensor ID
    sensor_id = sensor.get_id()
    print(f"Sensor ID: {sensor_id[0]:x}:{sensor_id[1]:x}:{sensor_id[2]:x}")

    # Retrieve and display the temperature offset
    temp_offset = sensor.get_temperature_offset()
    print(f"Temperature offset: {temp_offset:.6f} F")

    # Set and retrieve the altitude (meters above sea level)
    altitude = 198
    retrieved_altitude = sensor.get_altitude()
    print(f"Desired Altitude: {altitude} m, Retrieved Altitude: {retrieved_altitude} m")
    if altitude != retrieved_altitude:
        sensor.set_altitude(altitude)
        print(f"Altitude set to: {altitude}")
    
    # Check if automatic self-calibration is enabled
    if sensor.is_auto_calibration():
        print("Automatic self-calibration is ON.")
    else:
        print("Automatic self-calibration is OFF.")
    
    # Start periodic measurement
    sensor._low_power_mode = True
    sensor.set_measurement(start=True, single_shot=False)
    conversion_cycle_time = sensor.get_conversion_cycle_time()
    last_update_sensor = time.time()
    print(f"Low Power Mode: {sensor._low_power_mode}")
    print(f"Low power periodic measurement started with conversion cycle time: {conversion_cycle_time} ms")
    print("Periodic measurement started.")

    # Handle calibration mode
    if switch_state_calibrate == 0:
        # We want to calibrate it with this chunk if anything...
        # Start periodic measurement
        print("Performing sensor calibration in 30 seconds...")
        print("The sensor will be calibrated over 60 min in fresh air.")
        time.sleep(30)
        sensorCycle = 0
        calibrateMe = True

        # Perform factory reset and reinitialize
        sensor.perform_factory_reset()
        print("Factory reset performed.")
        time.sleep(2)  # Wait for the factory reset to complete
        
        # Perform soft reset and reinitialize
        sensor.soft_reset()
        time.sleep(1)  # Wait for the reset to complete
        sensor.set_measurement(start=False, single_shot=False) 
        time.sleep(1)
        sensor.set_measurement(start=True, single_shot=False)
        time.sleep(1)
        print("Performing soft reset and reinitialization...")

        while sensorCycle < 60:
            # Force garbage collection in calibration loop
            gc.collect()
            
            conversion_cycle_time = sensor.get_conversion_cycle_time()
            reading = get_sensor_reading(sensor, conversion_cycle_time, last_update_sensor)
            if reading:
                co2, temp, humidity = reading
                last_update_sensor = time.time()
                temp_f = temp * 9 / 5 + 32  # Convert to Fahrenheit
                time.sleep(30)
                sensorCycle += 1
                print(f"Continuous Reading: CO2: {co2} ppm, Temperature: {temp_f:.2f} F, Humidity: {humidity:.2f} %")
                print(f"Sensor Cycled {sensorCycle} times of 60")
                
                # Display on OLED during calibration
                if oled:
                    oled_power.value(1)
                    oled.fill(0)
                    oled.text(f"CALIBRATING", 0, 0)
                    oled.text(f"Cycle: {sensorCycle}/60", 0, 10)
                    oled.text(f"CO2: {co2} ppm", 0, 20)
                    oled.text(f"Temp: {temp_f:.1f}F", 0, 30)
                    oled.text(f"Hum: {humidity:.1f}%", 0, 40)
                    oled.show()
                    time.sleep(5)  # Show for 5 seconds
                    oled_power.value(0)
                
                if sensorCycle == 59:
                    if calibrateMe == True:
                        print("Initiating forced recalibration...")
                        target_co2_concentration = int(426 * 1.15)  # Mauna Loa CO2 = 426.57ppm April 2024 + 15-20% differential for being near/in a major city(Chicago), to int bc need int for low level
                        correction_value = sensor.force_recalibration(target_co2_concentration)
                        print(f"Forced recalibration completed with correction value: {correction_value} ppm")
                        sensor.set_auto_calibration(False)
                        print(f"Sensor Auto Calibration set to: {sensor.is_auto_calibration()}. Rerun the program after power cycling with the calibration commented out to double check persistence!")
                        sensor.save_config()
                        print("Settings saved.")
            else:
                # Reset the sensor and reinitialize if an error occurs
                print("Attempting to reinitialize the sensor...")
                sensor.set_measurement(start=False, single_shot=False)
                sensor.set_measurement(start=True, single_shot=False)
    else:
        print("Skipping calibration.")
    
    # Main monitoring loop
    while True:
        try:
            # Check memory status
            gc.collect()
            get_system_status(False)
            
            # Eventually...somewhere...memory will leak. Power Cycle every 6 hours for good measure.
            if reading_count >= 360:
                print("Scheduled restart after 360 readings")
                reset()
                
            date_str = None
            reading = get_sensor_reading(sensor, conversion_cycle_time, last_update_sensor)
            if reading:
                co2, temp, humidity = reading
                last_update_sensor = time.time()
                temp_f = temp * 9 / 5 + 32
                print(f"Continuous Reading: CO2: {co2} ppm, Temperature: {temp_f:.2f} F, Humidity: {humidity:.2f} %")
                get_system_status(True)
                reading_count = reading_count + 1
                
                # Check CO2 level and sound the buzzer if above threshold
                if co2 > 1500:
                    print("CO2 level exceeded 1500 ppm. Activating buzzer.")
                    sound_buzzer()
                
                # Handle data transmission if WiFi is enabled
                if switch_state_wifi == 0:
                    # Check WiFi connection and reconnect if needed
                    if not network.WLAN(network.STA_IF).isconnected():
                        print("WiFi connection lost, reconnecting...")
                        connect_wifi(SSID, PASSWORD)
                    
                    # Get current date and time (with hourly NTP sync)
                    current_time = get_time_chicago()
                    date_str = "{:04}-{:02}-{:02}".format(current_time[0], current_time[1], current_time[2])
                    time_str = "{:02}:{:02}:{:02}".format(current_time[3], current_time[4], current_time[5])

                    # Prepare data for sending
                    data = {
                        'date': date_str, 
                        'time': time_str, 
                        'co2': co2,
                        'temp_f': temp_f,
                        'humidity': humidity
                    }
                    
                    # Collect garbage before network operations
                    gc.collect()
                    
                    # Send to Google Sheets
                    send_data_to_google_sheets(data)
                    
                    # Add to ThingSpeak buffer and send
                    thingspeak_buffer.append(data)
                    send_data_to_thingspeak()
                elif switch_state_wifi == 1:
                    thingspeak_buffer.clear()
                
                # OLED Logic - display for a shorter time to save power
                if oled:
                    oled_power.value(1)
                    oled.fill(0)
                    oled.text(f'Reading: {reading_count}', 0, 0)
                    if date_str is not None:
                        oled.text(f'Date: {date_str}', 0, 10)
                        oled.text(f'Time: {time_str}', 0, 20)
                    oled.text(f'CO2:{int(co2)}ppm', 0, 30)
                    oled.text(f'Temp:{temp_f:.2f}F',0,40)
                    oled.text(f'Hum:{humidity:.2f}%', 0, 50)
                    oled.show()
                    time.sleep(10)  # Reduced from 20 to 10 seconds to save power
                    oled_power.value(0) 
                
                # Force garbage collection after all operations
                gc.collect()
                
        except Exception as e:
            print("Error occurred:", e)
            time.sleep(5)  # Wait for 5 seconds before retrying
            gc.collect()  # Force collection after error

if __name__ == '__main__':
    # Initialize garbage collection
    gc.collect()
    
    try:
        main()
    except Exception as e:
        print("Error occurred:", e)
        time.sleep(5)  # Wait for 5 seconds before retrying

I’m not going to the go too much into the code as the SCD41 chip supports a variety of different modes and unless you do a deep dive into the datasheet and the code for some cursed reason you should be fine with the code above. I was able to log readings fairly close to the reported values for my town and breathing on the device and near the device increased the sensor readings reliably. I did note that the sensor took some time to get back down to baseline and took a bit to get up to the max with me breathing on it. It was roughly 2.5 minutes to get back down to baseline and that’s roughly in line with what we would ideally sample at due to the humidity sampling interval mentioned in the technical specs above. See image below:

Limitations of the SCD41

As mentioned above we will have a slight delay in logging values so the results won’t be instantaneous. Additionally as mentioned by: https://github.com/octaprog7/SCD4x one will notice an increase in temperature reading from the sensor if the interval is less than 15 seconds as the sensor will self heat. Additionally, both the SCD40/41 sensors have an auto-calibrate mode which will take the lowest CO2 value from the past 7 days and assumes it is 400ppm. This can cause sensor drift over time unless one regularly(once a week) exposes the sensor to fresh air. It is possible to turn off this auto-calibration mode and one will note that in the code above[set_auto_calibration]. An anecdote from User Anx2K on the r/ESP32 Reddit mentions that there is roughly a 40ppm(10%) drift year to year if one turns off auto calibration. Calibration takes 5 minutes, but is an inherently manual process. It would be nice to be somewhere near a weather monitoring station and set up this CO2 monitor alongside a MH-Z19B/C and MH-Z1+ which use NDIR and compare the initial and end of 24 hour values and assess drift.

Applications of the SCD41

I am particularly interested in environmental monitoring to assess indoor air quality at a competitive price, allowing users to swap sensors in and out as needed. This flexibility is crucial for adapting to different monitoring requirements without significant additional costs.

Additionally, monitoring transient dump flows from businesses during off-hours (midnight to 3 AM) is vital. This involves tracking the release of CO2 and other gaseous components using the MQ-* sensor series. While this approach may not provide quantitative measurements due to various confounding factors, it serves as an initial assessment tool. This preliminary analysis can justify the purchase or lease of more advanced testing equipment, costing under $100 in upfront expenses and requiring only a few hours of labor.

Another intriguing experiment involves mapping a town with sensors placed every few blocks to monitor localized CO2 concentrations. This data can be correlated with consumer, commercial, and industrial activities, providing valuable insights into the town’s environmental impact.

Other applications include monitoring the respiratory activity of plants by measuring CO2 exchange in greenhouse or agricultural research settings. This can offer insights into plant health, photosynthetic efficiency, and growth patterns. Pairing this with R/G/B, UV, and light sensors can help determine several growth parameters. Note that NPK (Nitrogen/Phosphorous/Potassium) sensors are often unreliable based on my research. Therefore, it’s better to use actual chemical testing methods or automate the chemical testing apparatus rather than relying on electronic sensors for these measurements.

In biotechnology and biochemical engineering contexts, monitoring CO2 levels in cellular cultures and bioreactors is essential. Maintaining precise CO2 concentrations ensures the optimal growth of microorganisms, which can significantly reduce production costs. This is particularly important for the pharmaceutical industry, biofuel production, and other bio-based manufacturing processes.

Personal Testing

In the recent heat wave I noted that it took roughly 30 minutes to air out the house and cut CO2 levels from ~1200ppm to ~650ppm.

Its interesting to note that the temperature jumped only a few degrees over the 30 minutes per below:

The actual “enclosure” of the sensor is just an old Raspberry Pi 5 Case box, a FLIRC model. The solid wire coming from the main board is hooked up to a smaller board used for data display, switching wifi/calibration on/off, and alerting users to high CO2(1500ppm). Note that I didn’t feel like potentially burning the board or buying a soldering attachment to safely remove the LED so I covered it with some cardboard to remove some of the interference caused by the light on the sensor. See below:

Raw Boards, no case

Wifi/Calibration Screen, with case

Normal Output, with case

Conclusion

The SCD41 sensor offers a versatile and accurate solution for CO2 monitoring in various applications. For our particular application, provided we either calibrate it once at the beginning then once more every year we can enjoy accurate CO2 monitoring at a low price.

Support Page

Support my work with a Coffee/Monster

Share