简易甲醛监测仪-ESP32读取BME680

 MicroPython在ESP32上实现读取BME680,并自动连接WiFi,建立本地浏览器供访问,实时数据和历史趋势图。在0.96的oled显示:IP,状态,实时数据。

项目概述

一个基于 ESP32 的环境监测站,具备以下功能:

传感器读取:通过 I2C 总线读取 BME680 传感器的温度、湿度、气压和气体电阻(VOC 指数)。WiFi 连接:设备启动后自动连接到指定的 WiFi 网络。Web 服务器:在 ESP32 上建立一个简单的 Web 服务器。
实时数据:通过浏览器访问,可以看到当前的温、湿、压、气数据。历史趋势图:使用 Chart.js 库在浏览器端绘制过去半小时的数据趋势图。 OLED 显示:在 0.96 寸 I2C OLED 屏幕上循环显示:
ESP32 的 IP 地址和 WiFi 连接状态。实时环境数据。过去半小时内的温度(或其他你选择的参数)变化曲线。设备运行时长。当前时间(需要网络时间同步 NTP)。


硬件准备

ESP32 开发板 (如 NodeMCU-32S),                                                        16RMB
简易甲醛监测仪-ESP32读取BME680BME680 传感器模块 (I2C 接口)                                                                35RMB
简易甲醛监测仪-ESP32读取BME6800.96 寸 I2C OLED 显示屏 (128×64 分辨率)                                                6RMB
简易甲醛监测仪-ESP32读取BME680合适的小盒子和电线,烙铁。USB 数据线

硬件接线

所有传感器和显示屏都将通过 I2C 总线与 ESP32 连接。I2C 总线需要两根线:SDA (数据线) 和 SCL (时钟线)。

ESP32 引脚 BME680 引脚 OLED 引脚 说明
3V3 VCC VCC 供电 (3.3V,切勿接 5V)
GND GND GND 接地
GPIO21 (SDA) SDA SDA I2C 数据线
GPIO22 (SCL) SCL SCL I2C 时钟线

注意:

请确认你的 BME680 和 OLED 模块的 VCC 电压。大多数模块是 3.3V,有些 OLED 模块可能支持 5V,但 ESP32 的 GPIO 是 3.3V 电平,为安全起见,统一使用 3.3V 供电。I2C 设备地址:BME680 默认地址通常是 
0x76
 或 
0x77
。OLED 屏幕默认地址通常是 
0x3C
 或 
0x3D
。如果你的代码无法找到设备,可能需要扫描 I2C 地址或修改代码中的地址。


软件准备

MicroPython 固件:确保ESP32 刷入了最新版的 MicroPython 固件。Thonny IDE:用于编写和上传代码。必要的 MicroPython 库

ssd1306.py
: 用于驱动 0.96 寸 OLED 屏幕。
bme680.py
: 用于驱动 BME680 传感器。

如何获取库文件并上传到 ESP32:

下载库文件:


ssd1306.py
: https://github.com/micropython/micropython/blob/master/drivers/display/ssd1306.py
bme680.py
: https://github.com/adafruit/Adafruit_CircuitPython_BME680 

上传库文件:

在 Thonny 中,打开下载或安装的 
ssd1306.py
 和 
bme680.py
 文件。对于每个文件,点击 
文件
 -> 
另存为...
,在弹出的对话框中选择 
ESP32
 (或 
MicroPython 设备
),文件名保持不变,点击保存。这样库文件就被上传到了 ESP32 的文件系统中。 ssd1306.py



# MicroPython SSD1306 OLED driver, I2C and SPI interfaces
 
from micropython import const
import framebuf
 
 
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xA4)
SET_NORM_INV = const(0xA6)
SET_DISP = const(0xAE)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xA0)
SET_MUX_RATIO = const(0xA8)
SET_COM_OUT_DIR = const(0xC0)
SET_DISP_OFFSET = const(0xD3)
SET_COM_PIN_CFG = const(0xDA)
SET_DISP_CLK_DIV = const(0xD5)
SET_PRECHARGE = const(0xD9)
SET_VCOM_DESEL = const(0xDB)
SET_CHARGE_PUMP = const(0x8D)
 
# Subclassing FrameBuffer provides support for graphics primitives
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
class SSD1306(framebuf.FrameBuffer):
    def __init__(self, width, height, external_vcc):
        self.width = width
        self.height = height
        self.external_vcc = external_vcc
        self.pages = self.height // 8
        self.buffer = bytearray(self.pages * self.width)
        super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
        self.init_display()
 
    def init_display(self):
        for cmd in (
            SET_DISP | 0x00,  # off
            # address setting
            SET_MEM_ADDR,
            0x00,  # horizontal
            # resolution and layout
            SET_DISP_START_LINE | 0x00,
            SET_SEG_REMAP | 0x01,  # column addr 127 mapped to SEG0
            SET_MUX_RATIO,
            self.height - 1,
            SET_COM_OUT_DIR | 0x08,  # scan from COM[N] to COM0
            SET_DISP_OFFSET,
            0x00,
            SET_COM_PIN_CFG,
            0x02 if self.width > 2 * self.height else 0x12,
            # timing and driving scheme
            SET_DISP_CLK_DIV,
            0x80,
            SET_PRECHARGE,
            0x22 if self.external_vcc else 0xF1,
            SET_VCOM_DESEL,
            0x30,  # 0.83*Vcc
            # display
            SET_CONTRAST,
            0xFF,  # maximum
            SET_ENTIRE_ON,  # output follows RAM contents
            SET_NORM_INV,  # not inverted
            # charge pump
            SET_CHARGE_PUMP,
            0x10 if self.external_vcc else 0x14,
            SET_DISP | 0x01,
        ):  # on
            self.write_cmd(cmd)
        self.fill(0)
        self.show()
 
    def poweroff(self):
        self.write_cmd(SET_DISP | 0x00)
 
    def poweron(self):
        self.write_cmd(SET_DISP | 0x01)
 
    def contrast(self, contrast):
        self.write_cmd(SET_CONTRAST)
        self.write_cmd(contrast)
 
    def invert(self, invert):
        self.write_cmd(SET_NORM_INV | (invert & 1))
 
    def show(self):
        x0 = 0
        x1 = self.width - 1
        if self.width == 64:
            # displays with width of 64 pixels are shifted by 32
            x0 += 32
            x1 += 32
        self.write_cmd(SET_COL_ADDR)
        self.write_cmd(x0)
        self.write_cmd(x1)
        self.write_cmd(SET_PAGE_ADDR)
        self.write_cmd(0)
        self.write_cmd(self.pages - 1)
        self.write_data(self.buffer)
 
 
class SSD1306_I2C(SSD1306):
    def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False):
        self.i2c = i2c
        self.addr = addr
        self.temp = bytearray(2)
        self.write_list = [b"x40", None]  # Co=0, D/C#=1
        super().__init__(width, height, external_vcc)
 
    def write_cmd(self, cmd):
        self.temp[0] = 0x80  # Co=1, D/C#=0
        self.temp[1] = cmd
        self.i2c.writeto(self.addr, self.temp)
 
    def write_data(self, buf):
        self.write_list[1] = buf
        self.i2c.writevto(self.addr, self.write_list)
 
 
class SSD1306_SPI(SSD1306):
    def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
        self.rate = 10 * 1024 * 1024
        dc.init(dc.OUT, value=0)
        res.init(res.OUT, value=0)
        cs.init(cs.OUT, value=1)
        self.spi = spi
        self.dc = dc
        self.res = res
        self.cs = cs
        import time
 
        self.res(1)
        time.sleep_ms(1)
        self.res(0)
        time.sleep_ms(10)
        self.res(1)
        super().__init__(width, height, external_vcc)
 
    def write_cmd(self, cmd):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs(1)
        self.dc(0)
        self.cs(0)
        self.spi.write(bytearray([cmd]))
        self.cs(1)
 
    def write_data(self, buf):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs(1)
        self.dc(1)
        self.cs(0)
        self.spi.write(buf)
        self.cs(1)
 

ssd屏幕检测ssd1306check.py



import time  # <--- 添加这一行
import machine
import ssd1306
import network  # 如果用到了 network 也需要导入
 
# I2C 配置(与你的接线一致)
I2C_SDA_PIN = 21
I2C_SCL_PIN = 22
OLED_I2C_ADDR = 0x3C  # 你的 OLED 地址
 
# 初始化 I2C
i2c = machine.I2C(0, sda=machine.Pin(I2C_SDA_PIN), scl=machine.Pin(I2C_SCL_PIN), freq=400000)
 
# 扫描 I2C 设备,确认 OLED 已连接
print("I2C 设备地址:", [hex(i) for i in i2c.scan()])
if OLED_I2C_ADDR not in i2c.scan():
    print("未找到 OLED 设备,请检查接线!")
else:
    # 读取 SSD1306 分辨率配置(核心指令)
    def read_oled_resolution(i2c, addr):
        # 发送读取命令(SSD1306 控制寄存器地址 0x00)
        i2c.writeto_mem(addr, 0x00, b'x0C')  # 读取显示模式
        time.sleep(0.1)
        # 读取分辨率相关配置(不同芯片可能略有差异,这里兼容主流型号)
        try:
            # 尝试 128x64 配置验证
            i2c.writeto_mem(addr, 0x00, b'x20x00')  # 设置内存地址模式
            time.sleep(0.1)
            # 若能正常写入,大概率是 128x64;若报错,尝试 128x32
            return (128, 64)
        except:
            try:
                i2c.writeto_mem(addr, 0x00, b'x20x01')
                time.sleep(0.1)
                return (128, 32)
            except:
                return (None, None)
    
    # 执行检测
    width, height = read_oled_resolution(i2c, OLED_I2C_ADDR)
    if width and height:
        print(f"检测到 OLED 分辨率:{width}x{height}")
    else:
        print("自动检测失败,尝试手动验证!")

ssd1306test.py



import machine
import ssd1306
import network
import time
 
# --- 配置 ---
# WiFi 信息
WIFI_SSID = "XJ-000120"
WIFI_PASSWORD = "wansbdx666"
 
# I2C 配置
# 对于大多数 ESP32 开发板,推荐使用 GPIO21 (SDA) 和 GPIO22 (SCL)
I2C_SDA_PIN = 21
I2C_SCL_PIN = 22
OLED_I2C_ADDR = 0x3C # OLED 的 I2C 地址
 
# OLED 屏幕尺寸 (0.96寸通常是 128x64)
OLED_WIDTH = 128
OLED_HEIGHT = 64
 
# --- 初始化 ---
# 1. 初始化 I2C 总线
print("初始化 I2C 总线...")
i2c = machine.I2C(0, sda=machine.Pin(I2C_SDA_PIN), scl=machine.Pin(I2C_SCL_PIN), freq=400000)
 
# 扫描 I2C 设备,确认 OLED 是否被识别
print("扫描 I2C 设备地址:", [hex(i) for i in i2c.scan()])
 
if OLED_I2C_ADDR not in i2c.scan():
    print(f"错误:未找到 I2C 设备地址 {hex(OLED_I2C_ADDR)}。请检查接线或地址是否正确。")
else:
    # 2. 初始化 OLED 显示屏
    print(f"初始化 OLED 显示屏 (地址: {hex(OLED_I2C_ADDR)})...")
    oled = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c, addr=OLED_I2C_ADDR)
 
    # 3. 连接 WiFi
    print("连接 WiFi...")
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(WIFI_SSID, WIFI_PASSWORD)
 
    # 等待 WiFi 连接
    while not wlan.isconnected():
        oled.fill(0)
        oled.text("Connecting to", 0, 0)
        oled.text(WIFI_SSID, 0, 16)
        oled.show()
        time.sleep(0.5)
        print(".", end="")
 
    print("WiFi 连接成功!")
    ip_address = wlan.ifconfig()[0]
    print(f"IP 地址: {ip_address}")
 
    # --- 在 OLED 上显示信息 ---
    # 清屏
    oled.fill(0)
 
    # 显示标题
    oled.text("ESP32 OLED Demo", 4, 0)
    # 画一条水平线
    oled.hline(0, 12, 128, 1)
 
    # 显示 WiFi 信息
    oled.text("WiFi:"+WIFI_SSID, 0, 20)
    #oled.text(WIFI_SSID, 0, 32)
 
    # 显示 IP 地址
    oled.text("IP:", 0, 30)
    oled.text(ip_address, 0, 45)
 
    # 将缓存的内容显示到屏幕上
    oled.show()
 
    print("信息已显示在 OLED 屏幕上。")

bme680.py文件



# The MIT License (MIT)
#
# Copyright (c) 2017 ladyada for Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
 
# We have a lot of attributes for this complex sensor.
# pylint: disable=too-many-instance-attributes
 
"""
`bme680` - BME680 - Temperature, Humidity, Pressure & Gas Sensor
================================================================

MicroPython driver from BME680 air quality sensor, based on Adafruit_bme680

* Author(s): Limor 'Ladyada' Fried of Adafruit
             Jeff Raber (SPI support)
             and many more contributors
"""
 
import time
import math
from micropython import const
from ubinascii import hexlify as hex
try:
    import struct
except ImportError:
    import ustruct as struct
 
#    I2C ADDRESS/BITS/SETTINGS
#    -----------------------------------------------------------------------
_BME680_CHIPID = const(0x61)
 
_BME680_REG_CHIPID = const(0xD0)
_BME680_BME680_COEFF_ADDR1 = const(0x89)
_BME680_BME680_COEFF_ADDR2 = const(0xE1)
_BME680_BME680_RES_HEAT_0 = const(0x5A)
_BME680_BME680_GAS_WAIT_0 = const(0x64)
 
_BME680_REG_SOFTRESET = const(0xE0)
_BME680_REG_CTRL_GAS = const(0x71)
_BME680_REG_CTRL_HUM = const(0x72)
_BME280_REG_STATUS = const(0xF3)
_BME680_REG_CTRL_MEAS = const(0x74)
_BME680_REG_CONFIG = const(0x75)
 
_BME680_REG_PAGE_SELECT = const(0x73)
_BME680_REG_MEAS_STATUS = const(0x1D)
_BME680_REG_PDATA = const(0x1F)
_BME680_REG_TDATA = const(0x22)
_BME680_REG_HDATA = const(0x25)
 
_BME680_SAMPLERATES = (0, 1, 2, 4, 8, 16)
_BME680_FILTERSIZES = (0, 1, 3, 7, 15, 31, 63, 127)
 
_BME680_RUNGAS = const(0x10)
 
_LOOKUP_TABLE_1 = (2147483647.0, 2147483647.0, 2147483647.0, 2147483647.0, 2147483647.0,
                   2126008810.0, 2147483647.0, 2130303777.0, 2147483647.0, 2147483647.0,
                   2143188679.0, 2136746228.0, 2147483647.0, 2126008810.0, 2147483647.0,
                   2147483647.0)
 
_LOOKUP_TABLE_2 = (4096000000.0, 2048000000.0, 1024000000.0, 512000000.0, 255744255.0, 127110228.0,
                   64000000.0, 32258064.0, 16016016.0, 8000000.0, 4000000.0, 2000000.0, 1000000.0,
                   500000.0, 250000.0, 125000.0)
 
 
def _read24(arr):
    """Parse an unsigned 24-bit value as a floating point and return it."""
    ret = 0.0
    #print([hex(i) for i in arr])
    for b in arr:
        ret *= 256.0
        ret += float(b & 0xFF)
    return ret
 
 
class Adafruit_BME680:
    """Driver from BME680 air quality sensor

       :param int refresh_rate: Maximum number of readings per second. Faster property reads
         will be from the previous reading."""
    def __init__(self, *, refresh_rate=10):
        """Check the BME680 was found, read the coefficients and enable the sensor for continuous
           reads."""
        self._write(_BME680_REG_SOFTRESET, [0xB6])
        time.sleep(0.005)
 
        # Check device ID.
        chip_id = self._read_byte(_BME680_REG_CHIPID)
        if chip_id != _BME680_CHIPID:
            raise RuntimeError('Failed to find BME680! Chip ID 0x%x' % chip_id)
 
        self._read_calibration()
 
        # set up heater
        self._write(_BME680_BME680_RES_HEAT_0, [0x73])
        self._write(_BME680_BME680_GAS_WAIT_0, [0x65])
 
        self.sea_level_pressure = 1013.25
        """Pressure in hectoPascals at sea level. Used to calibrate ``altitude``."""
 
        # Default oversampling and filter register values.
        self._pressure_oversample = 0b011
        self._temp_oversample = 0b100
        self._humidity_oversample = 0b010
        self._filter = 0b010
 
        self._adc_pres = None
        self._adc_temp = None
        self._adc_hum = None
        self._adc_gas = None
        self._gas_range = None
        self._t_fine = None
 
        self._last_reading = time.ticks_ms()
        self._min_refresh_time = 1000 // refresh_rate
 
    @property
    def pressure_oversample(self):
        """The oversampling for pressure sensor"""
        return _BME680_SAMPLERATES[self._pressure_oversample]
 
    @pressure_oversample.setter
    def pressure_oversample(self, sample_rate):
        if sample_rate in _BME680_SAMPLERATES:
            self._pressure_oversample = _BME680_SAMPLERATES.index(sample_rate)
        else:
            raise RuntimeError("Invalid oversample")
 
    @property
    def humidity_oversample(self):
        """The oversampling for humidity sensor"""
        return _BME680_SAMPLERATES[self._humidity_oversample]
 
    @humidity_oversample.setter
    def humidity_oversample(self, sample_rate):
        if sample_rate in _BME680_SAMPLERATES:
            self._humidity_oversample = _BME680_SAMPLERATES.index(sample_rate)
        else:
            raise RuntimeError("Invalid oversample")
 
    @property
    def temperature_oversample(self):
        """The oversampling for temperature sensor"""
        return _BME680_SAMPLERATES[self._temp_oversample]
 
    @temperature_oversample.setter
    def temperature_oversample(self, sample_rate):
        if sample_rate in _BME680_SAMPLERATES:
            self._temp_oversample = _BME680_SAMPLERATES.index(sample_rate)
        else:
            raise RuntimeError("Invalid oversample")
 
    @property
    def filter_size(self):
        """The filter size for the built in IIR filter"""
        return _BME680_FILTERSIZES[self._filter]
 
    @filter_size.setter
    def filter_size(self, size):
        if size in _BME680_FILTERSIZES:
            self._filter = _BME680_FILTERSIZES[size]
        else:
            raise RuntimeError("Invalid size")
 
    @property
    def temperature(self):
        """The compensated temperature in degrees celsius."""
        self._perform_reading()
        calc_temp = (((self._t_fine * 5) + 128) / 256)
        return calc_temp / 100
 
    @property
    def pressure(self):
        """The barometric pressure in hectoPascals"""
        self._perform_reading()
        var1 = (self._t_fine / 2) - 64000
        var2 = ((var1 / 4) * (var1 / 4)) / 2048
        var2 = (var2 * self._pressure_calibration[5]) / 4
        var2 = var2 + (var1 * self._pressure_calibration[4] * 2)
        var2 = (var2 / 4) + (self._pressure_calibration[3] * 65536)
        var1 = (((((var1 / 4) * (var1 / 4)) / 8192) *
                (self._pressure_calibration[2] * 32) / 8) +
                ((self._pressure_calibration[1] * var1) / 2))
        var1 = var1 / 262144
        var1 = ((32768 + var1) * self._pressure_calibration[0]) / 32768
        calc_pres = 1048576 - self._adc_pres
        calc_pres = (calc_pres - (var2 / 4096)) * 3125
        calc_pres = (calc_pres / var1) * 2
        var1 = (self._pressure_calibration[8] * (((calc_pres / 8) * (calc_pres / 8)) / 8192)) / 4096
        var2 = ((calc_pres / 4) * self._pressure_calibration[7]) / 8192
        var3 = (((calc_pres / 256) ** 3) * self._pressure_calibration[9]) / 131072
        calc_pres += ((var1 + var2 + var3 + (self._pressure_calibration[6] * 128)) / 16)
        return calc_pres/100
 
    @property
    def humidity(self):
        """The relative humidity in RH %"""
        self._perform_reading()
        temp_scaled = ((self._t_fine * 5) + 128) / 256
        var1 = ((self._adc_hum - (self._humidity_calibration[0] * 16)) -
                ((temp_scaled * self._humidity_calibration[2]) / 200))
        var2 = (self._humidity_calibration[1] *
                (((temp_scaled * self._humidity_calibration[3]) / 100) +
                 (((temp_scaled * ((temp_scaled * self._humidity_calibration[4]) / 100)) /
                   64) / 100) + 16384)) / 1024
        var3 = var1 * var2
        var4 = self._humidity_calibration[5] * 128
        var4 = (var4 + ((temp_scaled * self._humidity_calibration[6]) / 100)) / 16
        var5 = ((var3 / 16384) * (var3 / 16384)) / 1024
        var6 = (var4 * var5) / 2
        calc_hum = (((var3 + var6) / 1024) * 1000) / 4096
        calc_hum /= 1000  # get back to RH
 
        if calc_hum > 100:
            calc_hum = 100
        if calc_hum < 0:
            calc_hum = 0
        return calc_hum
 
    @property
    def altitude(self):
        """The altitude based on current ``pressure`` vs the sea level pressure
           (``sea_level_pressure``) - which you must enter ahead of time)"""
        pressure = self.pressure # in Si units for hPascal
        return 44330.77 * (1.0 - math.pow(pressure / self.sea_level_pressure, 0.1902632))
 
    @property
    def gas(self):
        """The gas resistance in ohms"""
        self._perform_reading()
        var1 = ((1340 + (5 * self._sw_err)) * (_LOOKUP_TABLE_1[self._gas_range])) / 65536
        var2 = ((self._adc_gas * 32768) - 16777216) + var1
        var3 = (_LOOKUP_TABLE_2[self._gas_range] * var1) / 512
        calc_gas_res = (var3 + (var2 / 2)) / var2
        return int(calc_gas_res)
 
    def _perform_reading(self):
        """Perform a single-shot reading from the sensor and fill internal data structure for
           calculations"""
        expired = time.ticks_diff(self._last_reading, time.ticks_ms()) * time.ticks_diff(0, 1)
        if 0 <= expired < self._min_refresh_time:
            time.sleep_ms(self._min_refresh_time - expired)
 
        # set filter
        self._write(_BME680_REG_CONFIG, [self._filter << 2])
        # turn on temp oversample & pressure oversample
        self._write(_BME680_REG_CTRL_MEAS,
                    [(self._temp_oversample << 5)|(self._pressure_oversample << 2)])
        # turn on humidity oversample
        self._write(_BME680_REG_CTRL_HUM, [self._humidity_oversample])
        # gas measurements enabled
        self._write(_BME680_REG_CTRL_GAS, [_BME680_RUNGAS])
 
        ctrl = self._read_byte(_BME680_REG_CTRL_MEAS)
        ctrl = (ctrl & 0xFC) | 0x01  # enable single shot!
        self._write(_BME680_REG_CTRL_MEAS, [ctrl])
        new_data = False
        while not new_data:
            data = self._read(_BME680_REG_MEAS_STATUS, 15)
            new_data = data[0] & 0x80 != 0
            time.sleep(0.005)
        self._last_reading = time.ticks_ms()
 
        self._adc_pres = _read24(data[2:5]) / 16
        self._adc_temp = _read24(data[5:8]) / 16
        self._adc_hum = struct.unpack('>H', bytes(data[8:10]))[0]
        self._adc_gas = int(struct.unpack('>H', bytes(data[13:15]))[0] / 64)
        self._gas_range = data[14] & 0x0F
 
        var1 = (self._adc_temp / 8) - (self._temp_calibration[0] * 2)
        var2 = (var1 * self._temp_calibration[1]) / 2048
        var3 = ((var1 / 2) * (var1 / 2)) / 4096
        var3 = (var3 * self._temp_calibration[2] * 16) / 16384
 
        self._t_fine = int(var2 + var3)
 
    def _read_calibration(self):
        """Read & save the calibration coefficients"""
        coeff = self._read(_BME680_BME680_COEFF_ADDR1, 25)
        coeff += self._read(_BME680_BME680_COEFF_ADDR2, 16)
 
        coeff = list(struct.unpack('<hbBHhbBhhbbHhhBBBHbbbBbHhbb', bytes(coeff[1:39])))
        # print("

",coeff)
        coeff = [float(i) for i in coeff]
        self._temp_calibration = [coeff[x] for x in [23, 0, 1]]
        self._pressure_calibration = [coeff[x] for x in [3, 4, 5, 7, 8, 10, 9, 12, 13, 14]]
        self._humidity_calibration = [coeff[x] for x in [17, 16, 18, 19, 20, 21, 22]]
        self._gas_calibration = [coeff[x] for x in [25, 24, 26]]
 
        # flip around H1 & H2
        self._humidity_calibration[1] *= 16
        self._humidity_calibration[1] += self._humidity_calibration[0] % 16
        self._humidity_calibration[0] /= 16
 
        self._heat_range = (self._read_byte(0x02) & 0x30) / 16
        self._heat_val = self._read_byte(0x00)
        self._sw_err = (self._read_byte(0x04) & 0xF0) / 16
 
    def _read_byte(self, register):
        """Read a byte register value and return it"""
        return self._read(register, 1)[0]
 
    def _read(self, register, length):
        raise NotImplementedError()
 
    def _write(self, register, values):
        raise NotImplementedError()
 
class BME680_I2C(Adafruit_BME680):
    """Driver for I2C connected BME680.

        :param i2c: I2C device object
        :param int address: I2C device address
        :param bool debug: Print debug statements when True.
        :param int refresh_rate: Maximum number of readings per second. Faster property reads
          will be from the previous reading."""
    def __init__(self, i2c, address=0x77, debug=False, *, refresh_rate=10):
        """Initialize the I2C device at the 'address' given"""
        self._i2c = i2c
        self._address = address
        self._debug = debug
        super().__init__(refresh_rate=refresh_rate)
 
    def _read(self, register, length):
        """Returns an array of 'length' bytes from the 'register'"""
        result = bytearray(length)
        self._i2c.readfrom_mem_into(self._address, register & 0xff, result)
        if self._debug:
            print("	${:x} read ".format(register), " ".join(["{:02x}".format(i) for i in result]))
        return result
 
    def _write(self, register, values):
        """Writes an array of 'length' bytes to the 'register'"""
        if self._debug:
            print("	${:x} write".format(register), " ".join(["{:02x}".format(i) for i in values]))
        for value in values:
            self._i2c.writeto_mem(self._address, register, bytearray([value & 0xFF]))
            register += 1
 
 
class BME680_SPI(Adafruit_BME680):
    """Driver for SPI connected BME680.

        :param spi: SPI device object, configured
        :param cs: Chip Select Pin object, configured to OUT mode
        :param bool debug: Print debug statements when True.
        :param int refresh_rate: Maximum number of readings per second. Faster property reads
          will be from the previous reading.
      """
 
    def __init__(self, spi, cs, debug=False, *, refresh_rate=10):
        self._spi = spi
        self._cs = cs
        self._debug = debug
        self._cs(1)
        super().__init__(refresh_rate=refresh_rate)
 
    def _read(self, register, length):
        if register != _BME680_REG_PAGE_SELECT:
            # _BME680_REG_PAGE_SELECT exists in both SPI memory pages
            # For all other registers, we must set the correct memory page
            self._set_spi_mem_page(register)
        register = (register | 0x80) & 0xFF  # Read single, bit 7 high.
 
        try:
            self._cs(0)
            self._spi.write(bytearray([register]))  # pylint: disable=no-member
            result = bytearray(length)
            self._spi.readinto(result)  # pylint: disable=no-member
            if self._debug:
                print("	${:x} read ".format(register), " ".join(["{:02x}".format(i) for i in result]))
        except Exception as e:
            print (e)
            result = None
        finally:
            self._cs(1)
        return result
 
    def _write(self, register, values):
        if register != _BME680_REG_PAGE_SELECT:
            # _BME680_REG_PAGE_SELECT exists in both SPI memory pages
            # For all other registers, we must set the correct memory page
            self._set_spi_mem_page(register)
        register &= 0x7F  # Write, bit 7 low.
        try:
            self._cs(0)
            buffer = bytearray(2 * len(values))
            for i, value in enumerate(values):
                buffer[2 * i] = register + i
                buffer[2 * i + 1] = value & 0xFF
            self._spi.write(buffer)  # pylint: disable=no-member
            if self._debug:
                print("	${:x} write".format(register), " ".join(["{:02x}".format(i) for i in values]))
        except Exception as e:
            print (e)
        finally:
            self._cs(1)
 
    def _set_spi_mem_page(self, register):
        spi_mem_page = 0x00
        if register < 0x80:
            spi_mem_page = 0x10
        self._write(_BME680_REG_PAGE_SELECT, [spi_mem_page])
 

bme680test.py文件



import machine
import time
import bme680
 
# 初始化 I2C
i2c = machine.I2C(0, sda=machine.Pin(21), scl=machine.Pin(22), freq=400000)
 
# 扫描 I2C 设备地址,确认传感器是否被识别
print("I2C 设备地址:", [hex(i) for i in i2c.scan()])
 
# 初始化 BME680
try:
    # 如果你的传感器地址不是默认的 0x76,请在这里修改
    bme = bme680.BME680_I2C(i2c, address=0x77)
    
    # 可以根据需要配置 oversampling
    bme.overscan_temperature = 8
    bme.overscan_humidity = 2
    bme.overscan_pressure = 4
    bme.filter_size = 3
    
    print("BME680 初始化成功!")
    
except OSError as e:
    print("BME680 初始化失败,请检查接线和地址:", e)
    raise
 
# 读取数据
while True:
    print("
温度: {:.2f} °C".format(bme.temperature))
    print("湿度: {:.2f} %".format(bme.humidity))
    print("气压: {:.2f} hPa".format(bme.pressure))
    print("气体电阻: {:.0f} Ohms".format(bme.gas))
    
    time.sleep(2)

BME680 的气体电阻值(通常以 Ohms 为单位)无法直接精准区分甲醛含量,也没有统一的空气质量等级标准,但可通过气体电阻的相对变化和校准,间接判断空气质量优劣(核心逻辑:气体电阻越低,说明空气中挥发性有机化合物 / VOC 浓度越高,空气质量越差)。

一、核心原理

BME680 的气体传感器通过检测 VOC(甲醛属于 VOC 之一)等还原性气体,气体浓度越高,传感器表面化学反应越剧烈,导电能力越强,表现为气体电阻值越低

关键前提:气体电阻是 “相对参考值”,受温度、湿度、传感器老化影响大,必须先校准基准值,再通过相对变化判断。局限性:无法单独识别甲醛(会响应酒精、油烟、香水等所有 VOC),也不能直接换算甲醛具体浓度(需专业甲醛传感器如 SGP30/SGP40 配合)。

二、空气质量等级划分(基于气体电阻的相对标准)

需先在 “清洁空气” 中校准基准电阻(R0),再根据实际电阻(R)与 R0 的比值划分等级,以下是行业通用参考方案(需根据实际环境微调):

空气质量等级 气体电阻与基准值关系 直观表现 适用场景
优秀 R ≥ 0.8×R0 气体电阻接近清洁空气基准 室内通风良好、无异味
良好 0.5×R0 < R < 0.8×R0 气体电阻略有下降 少量 VOC 释放(如刚打开包装)
一般(注意) 0.3×R0 < R ≤ 0.5×R0 气体电阻明显下降 室内有轻微异味(如家具释放、吸烟后)
较差(警示) R ≤ 0.3×R0 气体电阻大幅降低 高浓度 VOC(如甲醛超标、油烟、酒精挥发)
校准基准值(R0)的步骤:

将 BME680 放在通风良好、无异味的清洁空气中(如户外新鲜空气或开窗通风 1 小时后的室内)。连续读取 10-20 次气体电阻值,取平均值作为基准值 R0,存入 ESP32 的 NVS 非易失性存储(避免每次重启重新校准)。

三、甲醛含量的间接判断(仅参考,非精准测量)

由于 BME680 不专门检测甲醛,只能通过气体电阻变化 “推测甲醛是否可能超标”,需结合甲醛的特性(常温下甲醛是无色有刺激性气味的 VOC,浓度超标时会伴随气体电阻大幅下降):

甲醛安全标准:我国室内甲醛限值为 ≤ 0.1 mg/m³(约 0.08 ppm),超标时人体可能感到眼鼻刺激。间接判断逻辑:
若气体电阻 R ≤ 0.3×R0,且环境中无明显其他 VOC 来源(如无油烟、酒精、香水),则可能存在甲醛等有害气体超标风险。若要精准测量甲醛含量,需额外搭配甲醛专用传感器(如 EC202、SGP41),与 BME680 数据互补。

补偿思路(简化版)我们将采用一个简化的补偿模型,主要基于温度进行线性补偿,湿度的影响则通过一个经验系数来调整。

温度补偿:气体电阻值(Rg)与绝对温度(T,单位为 Kelvin)成反比。

Rg_compensated ∝ Rg_measured * (T_reference / T_measured)
湿度补偿:湿度增加会导致 Rg 略有下降。我们引入一个经验系数 
H_factor
 来修正这个影响。

H_factor = 1.0 + (H_measured - H_reference) * 0.01
这个公式意味着,每偏离参考湿度(50% RH)1%,电阻值就修正 1%。你可以根据实际情况微调 
0.01
 这个系数。

最终补偿公式
Rg_compensated = Rg_measured * (298.15 / (273.15 + T_measured)) * (1.0 + (H_measured - 50.0) * 0.01)

完整代码

1. 1个chart.min.js文件(用于在网页显示曲线趋势)

简易甲醛监测仪-ESP32读取BME680

2. wifi_config.json,用于保存可连接WiFi清单,在开机后会按照顺序尝试连接


{"ssid": ["XJ-000120","CMCC-FfeQ"], "password": ["wansbdx666","fukf3tg4"]}

3. air_quality_config.json, 当前只使用了R0这个值,保存空气质量基准电阻。


{"calibration": 1.2, "last_reading": 95.5, "threshold": 100, "r0": 455217.6}

4.下面是完整的 
main.py
 代码。将此代码保存到 ESP32 中,它将在设备启动时自动运行。



import _thread
import machine
import network
import time
import socket
import ntptime
import ujson
from ssd1306 import SSD1306_I2C
import bme680
import uos
import uasyncio as asyncio
import gc
import os
from machine import Pin, I2C
import urequests
 
# --- 配置区 ---
WIFI_CONFIG_FILE = "wifi_config.json"
WIFI_SSID = "CMCC-FfeQ00"
WIFI_PASSWORD = "fukf3t4"
AP_SSID   = "ESP32_Air_Quality"
AP_PASS   = "12345678"
WEB_PORT  = 8080
WIFI_OK = True
I2C_SDA_PIN = 21
I2C_SCL_PIN = 22
OLED_WIDTH = 128
OLED_HEIGHT = 64
OLED_I2C_ADDR = 0x3C
BME680_ADDR = 0x77
RECORD_INTERVAL_SEC = 10
MAX_RECORDS = 12
CONFIG_FILE = "air_quality_config.json"
 
# --- 全局变量 ---
sensor_data_history = []
start_time = time.time()
current_time_str = "00:00:00"
ip_addr = "0.0.0.0"
is_calibrating = False
latest_data = (0, 0, 0, 0)  # (temp, humidity, pressure, gas)
latest_compensated_gas = 0
bme_OK = False
i2c = None
oled = None
bme = None
oled_paused = False
 
# 校准相关
R0 = None
data_to_save = {
    "r0": 10.0,
    "threshold": 100,
    "calibration": 1.2,
    "last_reading": 95.5
}
 
 
# --- 工具函数 ---
def save_to_file(filename, data):
    """保存数据到JSON文件"""
    try:
        with open(filename, 'w') as f:
            f.write(ujson.dumps(data))
        print(f"数据已保存到 {filename}")
    except Exception as e:
        print(f"保存数据失败: {e}")
 
 
def load_from_file(filename):
    """从JSON文件加载数据"""
    try:
        with open(filename, 'r') as f:
            data = ujson.loads(f.read())
        print(f"从 {filename} 加载数据成功")
        return data
    except OSError:
        print(f"{filename} 不存在,返回默认值")
        return None
    except Exception as e:
        print(f"加载数据失败: {e}")
        return None
 
 
# --- 硬件初始化函数 ---
def init_hardware():
    """初始化 I2C, OLED 和 BME680 传感器"""
    global i2c, oled, bme, bme_OK, R0, data_to_save
    i2c = machine.I2C(0, sda=machine.Pin(I2C_SDA_PIN), scl=machine.Pin(I2C_SCL_PIN), freq=400000)
    print("I2C 设备扫描:", [hex(addr) for addr in i2c.scan()])
 
    # 初始化OLED
    oled = SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c, addr=OLED_I2C_ADDR)
    oled.fill(0)
    oled.text("Initializing...", 0, 0)
    oled.show()
 
    # 初始化BME680
    try:
        bme = bme680.BME680_I2C(i2c, address=BME680_ADDR)
        bme.overscan_temperature = 8
        bme.overscan_humidity = 2
        bme.overscan_pressure = 4
        bme.filter_size = 3
        print("BME680 初始化成功")
        oled.text("BME680 OK", 0, 16)
        bme_OK = True
    except OSError as e:
        print("BME680 初始化失败:", e)
        oled.text("BME680 Error!", 0, 16)
        bme_OK = False
 
    # 加载配置文件
    loaded_data = load_from_file(CONFIG_FILE)
    if loaded_data:
        data_to_save.update(loaded_data)
        R0 = data_to_save.get("r0")
        if R0:
            print(f"加载已保存的基准电阻 R0 = {R0:.0f} Ohms")
        else:
            print("配置文件中未找到 R0,需要校准。")
    else:
        print("未找到配置文件,首次运行请校准。")
 
    oled.show()
    time.sleep(1)
 
 
# --- WiFi和网络相关函数 ---
def connect_wifi():
    """连接WiFi网络"""
    global ip_addr, WIFI_OK,WIFI_SSID, WIFI_PASSWORD,WIFI_SSID_list,WIFI_PASSWORD_list
    wlan = network.WLAN(network.STA_IF)
    timeout = 15  # 15秒超时
    for i in range(len(WIFI_SSID_list)):
        WIFI_SSID=WIFI_SSID_list[i]
        WIFI_PASSWORD=WIFI_PASSWORD_list[i]
        # 等待一小段时间确保断开成功
        wlan.active(False)
        time.sleep(2)             
        wlan.active(True)
        time.sleep(1)
        timeout = 15  # 15秒超时
        #WIFI_PASSWORD=WIFI_PASSWORD_list[i]
        if not wlan.isconnected():
            print('正在连接 WiFi:', WIFI_SSID)
            oled.fill(0)
            oled.text("Connecting WiFi...", 0, 0)
            oled.show()
            print(WIFI_SSID, WIFI_PASSWORD)
            wlan.connect(WIFI_SSID, WIFI_PASSWORD)
            while timeout > 0 and not wlan.isconnected():
                timeout -= 0.5
                time.sleep(0.5)
        if wlan.isconnected():
            break 
    if wlan.isconnected():
        WIFI_OK = True
        ip_addr = wlan.ifconfig()[0]
        print(f'WiFi 连接成功, IP: {ip_addr}')
    else:
        WIFI_OK = False
        ip_addr = "192.168.4.1"
        print(f'WiFi 连接失败, IP: {ip_addr}')
        _thread.start_new_thread(start_ap_provisioning, ())
 
    # 更新OLED显示IP
    oled.fill(0)
    oled.text(f"IP: {ip_addr}", 0, 0)
    oled.show()
    time.sleep(1)
    return ip_addr
 
 
# ------------------- 加载 WiFi 配置 -------------------
def load_wifi_config():
    """供外部(read_display2.py)调用"""
    return load_from_file(WIFI_CONFIG_FILE)
 
 
# ------------------- 新增:保存 WiFi 配置 -------------------
def save_wifi_config(ssid, password):
    
    save_to_file(WIFI_CONFIG_FILE, {"ssid": ssid, "password": password})
    
    
# ------------------- AP 配网 Web Server -------------------
def start_ap_provisioning():
    ap0 = network.WLAN(network.AP_IF)
    ap0.active(True)
    ap0.config(essid=AP_SSID, password=AP_PASS)
    while not ap0.active():
        time.sleep(0.5)
    print(f"[smart_config] AP 模式 {AP_SSID} 已开启")
    addr0 = socket.getaddrinfo('0.0.0.0', WEB_PORT)[0][-1]
    s0 = socket.socket()
    s0.bind(addr0)
    s0.listen(1)
    html = """<!DOCTYPE html>
<html><head><meta charset="utf-8"/><title>ESP32 配网</title></head>
<body><h2>ESP32 WiFi 配置</h2>
<form action="/configure" method="POST">
WiFi SSID:<br><input name="ssid" required><br>
Password:<br><input type="password" name="password" required><br><br>
<button type="submit">连接并保存</button></form></body></html>"""
    while True:
        cl, _ = s0.accept()
        req = cl.recv(1024).decode()
        if 'POST /configure' in req:
            ssid = req.split('&')[0].split('=')[-1]
            print(ssid)
            pwd  = req.split('&')[1].split('=')[1]
            save_wifi_config(ssid, pwd)
            cl.send("HTTP/1.1 200 OK

<h1>配置成功!3s 后重启...</h1>")
            cl.close()
            time.sleep(3)
            machine.reset()
        else:
            cl.send("HTTP/1.1 200 OK
Content-Type: text/html

" + html)
            cl.close()
            
 
def sync_ntp_time():
    """同步NTP时间"""
    global current_time_str,oled_paused
    try:
        print("正在同步 NTP 时间...")
        ntptime.host = 'cn.pool.ntp.org'
        ntptime.settime()
        print("NTP 时间同步成功")
        update_current_time()
        oled_paused=False
        return True
    except Exception as e:
        print("NTP 时间同步失败:", e)
        current_time_str = "NTP Fail"
        return False
 
 
def update_current_time():
    """更新当前时间字符串(北京时间)"""
    global current_time_str
    tm = time.localtime()
    beijing_hour = (tm[3] + 8) % 24
    current_time_str = "{:02d}:{:02d}:{:02d}".format(beijing_hour, tm[4], tm[5])
 
 
# --- 传感器数据处理函数 ---
def read_sensor():
    """读取BME680传感器数据并进行温湿度补偿"""
    global latest_data, latest_compensated_gas
    print(f"当前空闲内存: {gc.mem_free()} 字节")
    print(f"已分配内存: {gc.mem_alloc()} 字节")
    gc.collect()
    print(f"GC后空闲内存: {gc.mem_free()} 字节")  # 对比是否释放了内存
    if not bme_OK:
        return None
 
    try:
        print("读取传感器数据")
        temp = bme.temperature
        humidity = bme.humidity
        pressure = bme.pressure
        gas = bme.gas
 
        # 数据有效性检查
        if temp is None or humidity is None or pressure is None or gas is None:
            return None
 
        # 温湿度补偿计算
        T_ref = 298.15  # 参考温度(25°C)
        H_ref = 50.0  # 参考湿度(50%)
        T_meas = 273.15 + temp  # 转换为开尔文
        H_factor = 1.0 + (humidity - H_ref) * 0.005  # 湿度补偿因子
 
        # 计算补偿后气体电阻
        if T_meas > 0:
            latest_compensated_gas = gas * (T_ref / T_meas) * H_factor
        else:
            latest_compensated_gas = gas
 
        latest_data = (temp, humidity, pressure, gas)
        return latest_data
 
    except Exception as e:
        print(f"读取传感器数据失败: {e}")
        return None
 
 
def add_data_to_history(data):
    """添加传感器数据到历史记录"""
    global sensor_data_history
    if data and len(data) >= 5:
        compensated_gas = latest_compensated_gas if latest_compensated_gas else 0
        # 历史记录格式:(时间戳, 温度, 湿度, 气压, 原始气体电阻, 补偿后气体电阻)
        sensor_data_history.append((data[0], data[1], data[2], data[3], data[4], compensated_gas))
 
        # 限制历史记录长度
        while len(sensor_data_history) > MAX_RECORDS:
            sensor_data_history.pop(0)
 
 
def get_running_time():
    """获取设备运行时间"""
    seconds = time.time() - start_time
    hours, rem = divmod(seconds, 3600)
    minutes, seconds = divmod(rem, 60)
    return "{:02d}:{:02d}:{:02d}".format(int(hours), int(minutes), int(seconds))
 
 
# --- 空气质量评估函数 ---
def get_air_quality(gas_resistance):
    """根据气体电阻值评估空气质量"""
    global R0, air_Q
    if R0 is None:
        air_Q="NA"
        return "未校准"
    if gas_resistance <= 0:
        air_Q="Invaild"
        return "数据无效"
 
    ratio = gas_resistance / R0
    if ratio >= 0.8:
        air_Q="Great!"
        return "优秀"
    elif 0.5 < ratio < 0.8:
        air_Q="Good!"
        return "良好"
    elif 0.3 < ratio <= 0.5:
        air_Q="Normal."
        return "一般(注意)"
    else:
        air_Q="Bad.."
        return "较差(警示)"
 
 
def get_formaldehyde_hint(air_quality, gas_resistance):
    """根据空气质量给出甲醛提示"""
    global R0
    if R0 is None:
        return "请先校准传感器"
    if air_quality == "较差(警示)" and gas_resistance < 0.3 * R0:
        return "可能存在甲醛等VOC超标,建议通风"
    else:
        return "甲醛浓度未超标(参考)"
 
 
# --- 校准函数 ---
def calibrate_r0():
    """同步版本的BME680校准函数(保留未使用)"""
    global R0, is_calibrating, latest_compensated_gas, data_to_save, oled_paused
    is_calibrating = True
    oled_paused = True
    oled.fill(0)
    oled.text(f"IP: {ip_addr}", 0, 0)
    oled.text("Calibrating...", 0, 20)
    oled.text("Keep sensor in", 0, 35)
    oled.text("clean air!", 0, 45)
    oled.show()
    print("开始校准 BME680... 请确保传感器处于清洁空气中")
    raw_values = []
 
    # 校准前稳定
    for _ in range(5):
        if read_sensor():
            time.sleep(1)
 
    # 采样20次
    for _ in range(20):
        read_sensor()
        if latest_compensated_gas > 0:
            raw_values.append(latest_compensated_gas)
        time.sleep(1)
 
    # 计算结果
    if len(raw_values) > 0:
        R0 = sum(raw_values) / len(raw_values)
        data_to_save["r0"] = R0
        save_to_file(CONFIG_FILE, data_to_save)
        print(f"校准完成, 基准电阻 R0 (补偿后) = {R0:.0f} Ohms")
        oled.fill(0)
        oled.text(f"IP: {ip_addr}", 0, 0)
        oled.text("Calibrate OK!", 0, 20)
        oled.text(f"R0: {R0:.0f} Ohms", 0, 35)
    else:
        print("校准失败:未读取到有效数据")
        oled.fill(0)
        oled.text(f"IP: {ip_addr}", 0, 0)
        oled.text("Calibrate Fail!", 0, 20)
 
    oled.show()
    time.sleep(2)
    is_calibrating = False
    oled_paused = False
 
 
 
 
 
def build_html_response():
    print('len data:',len(sensor_data_history))
    uptime = get_running_time()
    temp = humidity = pressure = gas = compensated_gas = "N/A"
    air_quality = "未知"
    formaldehyde_hint = ""
    if latest_data and latest_compensated_gas > 0:
        temp, humidity, pressure, gas = latest_data
        compensated_gas = latest_compensated_gas
        air_quality = get_air_quality(compensated_gas)
        formaldehyde_hint = get_formaldehyde_hint(air_quality, compensated_gas)
    labels_js = "["
    temp_data_js = "["
    humidity_data_js = "["
    gas_data_js = "["
    for i, record in enumerate(sensor_data_history):
        tm = time.localtime(record[0])
        beijing_hour = (tm[3] + 8) % 24
        labels_js += f'"{beijing_hour:02d}:{tm[4]:02d}"'
        temp_data_js += f'{record[1]:.2f}'
        humidity_data_js += f'{record[2]:.2f}'
        gas_data_js += f'{record[5]:.0f}'
        if i < len(sensor_data_history) - 1:
            labels_js += ", "
            temp_data_js += ", "
            humidity_data_js += ", "
            gas_data_js += ", "
    labels_js += "]"
    temp_data_js += "]"
    humidity_data_js += "]"
    gas_data_js += "]"
    html = f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>ESP32 BME680 监测站</title>
    <script src="/chart.min.js">
    <style>
        body {{ font-family: sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; background-color: #f5f5f5; }}
        .container {{ display: flex; flex-wrap: wrap; gap: 20px; margin-top: 20px; }}
        .card {{ background: white; border-radius: 12px; box-shadow: 0 2px 8px rgba(0,0,0,0.1); padding: 20px; flex: 1; min-width: 250px; text-align: center; }}
        h3 {{ color: #333; text-align: center; margin-bottom: 10px; }}
        .header-info {{ text-align: center; color: #666; font-size: 0.9em; margin-bottom: 20px; }}
        .value {{ font-size: 2.5em; font-weight: bold; color: #2c3e50; margin: 15px 0; }}
        .small-text {{ font-size: 0.9em; color: #555; margin-top: 5px; }}
        .chart-container {{ background: white; border-radius: 12px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); padding: 20px; margin-top: 20px; }}
        canvas {{ width: 100%; height: 300px; }}
        .calibrate-btn {{ text-align: center; margin: 20px 0; }}
        .calibrate-btn button {{ padding: 10px 20px; font-size: 1.1em; background-color: #4CAF50; color: white; border: none; border-radius: 5px; cursor: pointer; }}
        .calibrate-btn button:hover {{ background-color: #45a049; }}
        .calibrate-btn button:disabled {{ background-color: #cccccc; cursor: not-allowed; }}
    </style>
</head>
<body>
    <h2>环境监测</h2>
    <p class="header-info">IP: {ip_addr} | 运行时间: {uptime} | 更新时间: {current_time_str}</p>
    <div class="calibrate-btn">
        <buttonhljs-string">"{' disabled' if is_calibrating else ''}>
            {'校准中...' if is_calibrating else '校准传感器'}
        </button>
        <phljs-string">" class="small-text"></p>
    </div>
    <div class="container">
        <div class="card">
            <h3>温度</h3>
            <div class="value">{temp:.2f} °C</div>
        </div>
        <div class="card">
            <h3>湿度</h3>
            <div class="value">{humidity:.2f} %</div>
        </div>
        <div class="card">
            <h3>气压</h3>
            <div class="value">{pressure:.2f} hPa</div>
        </div>
        <div class="card">
            <h3>气体电阻</h3>
            <div class="value">{gas:.0f} Ohms</div>
            <div class="small-text">补偿后: {compensated_gas:.0f} Ohms</div>
            <div class="small-text">R0: {R0:.0f} Ohms</div>
        </div>
    </div>
    <div class="card">
        <h3>空气质量</h3>
        <div class="value">{air_quality}</div>
        <div class="small-text">{formaldehyde_hint}</div>
    </div>
    <div class="chart-container">
        <h3>过去{len(sensor_data_history)}点数据趋势</h3>
        <canvashljs-string">"></canvas>
    </div>
    <script>
        document.addEventListener('DOMContentLoaded', function() {{
        document.getElementById('calibrateBtn').addEventListener('click', calibrateSensor);
        let isCalibrating = {str(is_calibrating).lower()};

        function calibrateSensor() {{
            fetch('/calibrate').then(r=>r.text()).then(alert);
        }}
        if (typeof Chart !== 'undefined') {{
            const ctx = document.getElementById('historyChart').getContext('2d');
            new Chart(ctx, {{
            type: 'line',
            data: {{
                labels: {labels_js},
                datasets: [
                    {{
                        label: '温度 (°C)',
                        data: {temp_data_js},
                        borderColor: 'rgb(255, 99, 132)',
                        backgroundColor: 'rgba(255, 99, 132, 0.2)',
                        yAxisID: 'y',
                        tension: 0.3
                    }},
                    {{
                        label: '湿度 (%)',
                        data: {humidity_data_js},
                        borderColor: 'rgb(54, 162, 235)',
                        backgroundColor: 'rgba(54, 162, 235, 0.2)',
                        yAxisID: 'y1',
                        tension: 0.3
                    }},
                    {{
                        label: '气体电阻 (Ohms)',
                        data: {gas_data_js},
                        borderColor: 'rgb(75, 192, 192)',
                        backgroundColor: 'rgba(75, 192, 192, 0.2)',
                        yAxisID: 'y2',
                        tension: 0.3
                    }}
                ]
            }}

        }});
        }} else {{
            
            document.getElementById('historyChart').parentElement.innerHTML = '<p>图表加载失败。</p>';
        }}
    }});
        </script>
</body>
</html>
"""
    return html
 
 
 
# --- 主函数 ---
def handle_client(conn, addr):
    print('客户端连接:', addr)
    global is_calibrating, last_record_time, sync_ntp_success, oled_paused
    gc.collect()
    try:
        request = b""
        while True:
            chunk = conn.recv(1024)
            if not chunk:
                break
            request += chunk
            if b"

" in request:  # 检查请求头结束符
                break
        request_str = request.decode('utf-8')
        # 初始化默认响应
        response = 'HTTP/1.1 404 Not Found
Content-Type: text/plain
Connection: close

404 - Not Found'
 
        if request_str.startswith('GET /calibrate'):
            print("收到校准请求")
            try:
                _thread.start_new_thread(calibrate_r0, ())  # ← 用线程
                response = 'HTTP/1.1 200 OK
Content-Type: text/plain
Connection: close

校准已启动,请等待30秒。'
            except Exception as cal_error:
                print(f"校准执行失败: {cal_error}")
                response = 'HTTP/1.1 500 Internal Server Error
Content-Type: text/plain
Connection: close

校准执行失败,请重试。'
        elif request_str.startswith('GET /chart.min.js '):
            print("收到 chart.min.js 请求")
            try:
                with open('chart.min.js', 'rb') as f:
                    # 确保发送正确的 Content-Type
                    header = b'HTTP/1.1 200 OK
Content-Type: application/javascript
Content-Length: '
                    header += str(os.stat('chart.min.js')[6]).encode()
                    header += b'
Connection: close

'
                    conn.send(header)
                    gc.collect()
                    # 分块发送文件内容
                    while True:
                        chunk = f.read(512)
                        if not chunk:
                            break
                        conn.send(chunk)
                        gc.collect()
            except OSError:
                print("chart.min.js 文件未找到")
                conn.send(
                    b'HTTP/1.1 404 Not Found
Content-Type: text/plain
Connection: close

File Not Found')
            finally:
                conn.close()
                gc.collect()
                oled_paused = False
            return
        elif 'GET / ' in request_str or 'GET /index.html' in request_str:
            print("收到get请求")
            oled_paused = True
            response_html = build_html_response()
            try:
                
                send_large_response(conn, response_html)
                print("发送get请求完成")
            except Exception as e:
                print(f"发送请求失败: {e}")
            finally:
                # 确保关闭连接(如果使用长连接需调整)
                conn.close()
                gc.collect()
                oled_paused = False
            return
 
 
        conn.sendall(response.encode('utf-8'))
    except Exception as e:
        print(f"处理客户端请求失败: {e}")
    finally:
        conn.close()
        gc.collect()
        oled_paused = False
        print('客户端连接关闭')
 
def send_large_response(conn, response_html):
    """分块发送大型HTML响应,处理部分发送情况"""
    # 构建响应头(不包含响应体)
    headers = [
        'HTTP/1.1 200 OK',
        'Content-Type: text/html; charset=utf-8',
        f'Content-Length: {len(response_html.encode("utf-8"))}',
        'Connection: close',
        '
'  # 空行分隔头部和体
    ]
    header_bytes = '
'.join(headers).encode('utf-8')
    
    # 发送响应头
    conn.sendall(header_bytes)
    gc.collect()
    
    # 分块发送响应体(处理大型数据)
    body_bytes = response_html.encode('utf-8')
    chunk_size = 256  # 每次发送8KB(可根据网络情况调整)
    sent = 0
    total = len(body_bytes)
    print(f"准备发送 {total} 字节数据")
    while sent < total:
        remaining = total - sent
        send_size = min(chunk_size, remaining)
        try:
            # 发送部分数据
            sent_bytes = conn.send(body_bytes[sent:sent+send_size])
            if sent_bytes == 0:
                raise RuntimeError("连接中断")
            sent += sent_bytes
        except socket.error as e:
            print(f"发送数据失败: {e}")
            raise
        finally:
            gc.collect()
    
    print(f"成功发送 {total} 字节数据")
 
# --- OLED更新函数 ---
def oled_update():
    """OLED屏幕更新线程(持续显示传感器数据)"""
    global is_calibrating, last_record_time, sync_ntp_success, oled_paused,air_Q,WIFI_SSID
    while True:
        # 暂停更新逻辑
        while oled_paused:
            time.sleep(1)  # 暂停时降低CPU占用
 
        current_sec = time.time()
 
        # 定期读取传感器数据
        if current_sec - last_record_time >= RECORD_INTERVAL_SEC and not is_calibrating:
            sensor_reading = read_sensor()
            if sensor_reading:
                # 新数据点格式:(时间戳, 温度, 湿度, 气压, 原始气体电阻)
                new_data_point = (
                current_sec, sensor_reading[0], sensor_reading[1], sensor_reading[2], sensor_reading[3])
                add_data_to_history(new_data_point)
                air_quality = get_air_quality(latest_compensated_gas)
 
                # 打印日志
                print(f"[{current_time_str}] T: {sensor_reading[0]:.2f}C, H: {sensor_reading[1]:.2f}%, P: {sensor_reading[2]:.2f}hPa, Gas: {sensor_reading[3]:.0f} Ohms, Compensated: {latest_compensated_gas:.0f} Ohms, AQ: {air_quality}")
 
            last_record_time = current_sec
            update_current_time()
 
        # 更新OLED显示
        if not is_calibrating:
            oled.fill(0)
            # 显示IP地址(简化显示,只显示最后一段)
            oled.text(f"IP:{ip_addr}", 0, 0)
            # 显示网络状态
            oled.text(f"Net:" + (f"{WIFI_SSID}" if sync_ntp_success else "NTP Err"), 0, 10)
 
            # 显示传感器数据
            if latest_data and latest_compensated_gas > 0:
                oled.text(f"T: {latest_data[0]:.1f}C", 0, 25)
                oled.text(f"H: {latest_data[1]:.1f}%", 64, 25)
                oled.text(f"G: {latest_compensated_gas:.0f} O", 0, 40)
                oled.text(f"AQ: {air_Q}", 0, 55)
            else:
                oled.text("Sensor Err", 0, 25)
 
            oled.show()
 
        time.sleep(10)  # 控制更新频率
 
 
 
# --- 主函数 (多线程版本) ---
def main():
    """程序主入口(多线程版本)"""
    global is_calibrating, last_record_time, sync_ntp_success, s,oled_paused
    global current_time_str, start_time, WIFI_SSID, WIFI_PASSWORD,WIFI_SSID_list,WIFI_PASSWORD_list
 
    init_hardware()
 
    # ... (加载WiFi配置、连接WiFi、同步NTP时间的代码保持不变) ...
    try:
        wifi_config = load_wifi_config()
        if wifi_config and 'ssid' in wifi_config and 'password' in wifi_config:
            WIFI_SSID_list = wifi_config['ssid']
            WIFI_PASSWORD_list = wifi_config['password']
        else:
            print("WiFi配置无效,使用默认值")
    except Exception as e:
        print(f"加载WiFi配置失败: {e}")
    
    WIFI_SSID=WIFI_SSID_list[0]
    WIFI_PASSWORD=WIFI_PASSWORD_list[0]
    connect_wifi()
    sync_ntp_success = sync_ntp_time()
 
    # --- 创建标准的TCP服务器 socket ---
    addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
    s = socket.socket()
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(addr)
    s.listen(5)
    print(f'Web 服务器启动,访问 http://{ip_addr}')
 
    # 初始化变量
    last_record_time = 0
    start_time = time.time()
 
    # 启动OLED更新线程
    _thread.start_new_thread(oled_update, ())
    while True:
        if not sync_ntp_success and WIFI_OK:
            time.sleep(2)
            sync_ntp_success = sync_ntp_time()
            if sync_ntp_success:
                continue
            
 
        s.setblocking(False)
        try:
            conn, addr = s.accept()
            # 为每个连接启动一个新线程来处理
            _thread.start_new_thread(handle_client, (conn, addr))
        except OSError:
            pass
 
        time.sleep(0.5)
 
 
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print("程序被用户中断")
    finally:
        # 程序退出时清理资源
        if 's' in globals() and s:
            s.close()
        print("程序退出")
 

代码解析与注意事项

配置区


I2C_SDA_PIN
 和 
I2C_SCL_PIN
 默认为 21 和 22,如果你的接线不同,请修改。
BME680_ADDR
 如果传感器无法被识别,可以尝试 
0x76
。可以参考扫描出来的地址。
RECORD_INTERVAL_SEC
 和 
MAX_RECORDS
 控制数据记录的频率和历史数据的长度。注意:因为esp32的内存容量有限,在
MAX_RECORDS>15后,显示网页会刷新chart很慢或者失败卡死。即使我加了很多的gc.collect。。。。。

库文件

确保 
ssd1306.py
 和 
bme680.py
 已经正确上传到 ESP32。代码开头的 
import
 语句会导入它们。

NTP 时间同步

代码会尝试连接到 NTP 服务器 (
cn.pool.ntp.org
) 来获取当前时间。这需要设备能够访问互联网。如果同步失败,OLED 会显示 “NTP Err”,但程序会继续运行,只是时间显示不正确。

Web 服务器

联网失败时会启动AP模式,连接ESP32_Air_Quality,在192.168.4.1:8080端口,可以增加WiFi配置。目前没有删除功能,如果保存的太多了,按照顺序连接很慢,需要的话自行到json文件删减。
简易甲醛监测仪-ESP32读取BME680显示服务器在端口 80 上监听。HTML 页面中嵌入了 Chart.js 库,它会从 ESP32 获取历史数据并在浏览器中动态生成图表。
简易甲醛监测仪-ESP32读取BME680
handle_client
 函数是简化的 HTTP 请求处理器,它只处理根路径 (
/
) 的 GET 请求。
s.setblocking(False)
 和 
try-except OSError
 是实现非阻塞服务器的关键,这样 Web 服务器就不会影响传感器数据的读取和 OLED 的刷新。

OLED 显示

两种显示模式

如何运行

将代码保存到 ESP32,命名为 
main.py
。ESP32 会自动重启并开始运行程序。在 OLED 屏幕上看到 IP 地址后,在同一网络下的电脑或手机浏览器中输入该 IP 地址,即可看到实时数据和趋势图。OLED 显示 IP:在 OLED 顶部固定显示 ESP32 的 IP 地址(
oled.text("IP: " + ip_addr, 0, 0)
),方便用户查看并连接 Web 界面。Web 校准接口
/calibrate
 接口,通过浏览器点击按钮触发校准。校准期间 Web 页面会显示提示,OLED 也会显示校准状态。校准状态管理: 
is_calibrating
 标志,校准期间暂停传感器数据读取和 OLED 显示切换,避免干扰校准过程。Web 页面优化:添加 “校准 BME680” 按钮,点击后发送请求到 
/calibrate
,校准完成后自动刷新页面。数据显示:在 Web 页面和 OLED 上同时显示空气质量等级和甲醛提示

简易甲醛监测仪-ESP32读取BME680简易甲醛监测仪-ESP32读取BME680

© 版权声明

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
none
暂无评论...