简易甲醛监测仪-ESP32读取BME680
MicroPython在ESP32上实现读取BME680,并自动连接WiFi,建立本地浏览器供访问,实时数据和历史趋势图。在0.96的oled显示:IP,状态,实时数据。
项目概述
一个基于 ESP32 的环境监测站,具备以下功能:
传感器读取:通过 I2C 总线读取 BME680 传感器的温度、湿度、气压和气体电阻(VOC 指数)。WiFi 连接:设备启动后自动连接到指定的 WiFi 网络。Web 服务器:在 ESP32 上建立一个简单的 Web 服务器。
实时数据:通过浏览器访问,可以看到当前的温、湿、压、气数据。历史趋势图:使用 Chart.js 库在浏览器端绘制过去半小时的数据趋势图。 OLED 显示:在 0.96 寸 I2C OLED 屏幕上循环显示:
ESP32 的 IP 地址和 WiFi 连接状态。实时环境数据。过去半小时内的温度(或其他你选择的参数)变化曲线。设备运行时长。当前时间(需要网络时间同步 NTP)。
硬件准备
ESP32 开发板 (如 NodeMCU-32S), 16RMB
BME680 传感器模块 (I2C 接口) 35RMB
0.96 寸 I2C OLED 显示屏 (128×64 分辨率) 6RMB
合适的小盒子和电线,烙铁。USB 数据线
硬件接线
所有传感器和显示屏都将通过 I2C 总线与 ESP32 连接。I2C 总线需要两根线:SDA (数据线) 和 SCL (时钟线)。
| ESP32 引脚 | BME680 引脚 | OLED 引脚 | 说明 |
|---|---|---|---|
| 3V3 | VCC | VCC | 供电 (3.3V,切勿接 5V) |
| GND | GND | GND | 接地 |
| GPIO21 (SDA) | SDA | SDA | I2C 数据线 |
| GPIO22 (SCL) | SCL | SCL | I2C 时钟线 |
注意:
请确认你的 BME680 和 OLED 模块的 VCC 电压。大多数模块是 3.3V,有些 OLED 模块可能支持 5V,但 ESP32 的 GPIO 是 3.3V 电平,为安全起见,统一使用 3.3V 供电。I2C 设备地址:BME680 默认地址通常是 或
0x76。OLED 屏幕默认地址通常是
0x77 或
0x3C。如果你的代码无法找到设备,可能需要扫描 I2C 地址或修改代码中的地址。
0x3D
软件准备
MicroPython 固件:确保ESP32 刷入了最新版的 MicroPython 固件。Thonny IDE:用于编写和上传代码。必要的 MicroPython 库:
: 用于驱动 0.96 寸 OLED 屏幕。
ssd1306.py: 用于驱动 BME680 传感器。
bme680.py
如何获取库文件并上传到 ESP32:
下载库文件:
: https://github.com/micropython/micropython/blob/master/drivers/display/ssd1306.py
ssd1306.py: https://github.com/adafruit/Adafruit_CircuitPython_BME680
bme680.py
上传库文件:
在 Thonny 中,打开下载或安装的 和
ssd1306.py 文件。对于每个文件,点击
bme680.py ->
文件,在弹出的对话框中选择
另存为... (或
ESP32),文件名保持不变,点击保存。这样库文件就被上传到了 ESP32 的文件系统中。 ssd1306.py
MicroPython 设备
# MicroPython SSD1306 OLED driver, I2C and SPI interfaces
from micropython import const
import framebuf
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xA4)
SET_NORM_INV = const(0xA6)
SET_DISP = const(0xAE)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xA0)
SET_MUX_RATIO = const(0xA8)
SET_COM_OUT_DIR = const(0xC0)
SET_DISP_OFFSET = const(0xD3)
SET_COM_PIN_CFG = const(0xDA)
SET_DISP_CLK_DIV = const(0xD5)
SET_PRECHARGE = const(0xD9)
SET_VCOM_DESEL = const(0xDB)
SET_CHARGE_PUMP = const(0x8D)
# Subclassing FrameBuffer provides support for graphics primitives
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
class SSD1306(framebuf.FrameBuffer):
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
self.buffer = bytearray(self.pages * self.width)
super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
self.init_display()
def init_display(self):
for cmd in (
SET_DISP | 0x00, # off
# address setting
SET_MEM_ADDR,
0x00, # horizontal
# resolution and layout
SET_DISP_START_LINE | 0x00,
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
SET_MUX_RATIO,
self.height - 1,
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
SET_DISP_OFFSET,
0x00,
SET_COM_PIN_CFG,
0x02 if self.width > 2 * self.height else 0x12,
# timing and driving scheme
SET_DISP_CLK_DIV,
0x80,
SET_PRECHARGE,
0x22 if self.external_vcc else 0xF1,
SET_VCOM_DESEL,
0x30, # 0.83*Vcc
# display
SET_CONTRAST,
0xFF, # maximum
SET_ENTIRE_ON, # output follows RAM contents
SET_NORM_INV, # not inverted
# charge pump
SET_CHARGE_PUMP,
0x10 if self.external_vcc else 0x14,
SET_DISP | 0x01,
): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
def poweroff(self):
self.write_cmd(SET_DISP | 0x00)
def poweron(self):
self.write_cmd(SET_DISP | 0x01)
def contrast(self, contrast):
self.write_cmd(SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(SET_NORM_INV | (invert & 1))
def show(self):
x0 = 0
x1 = self.width - 1
if self.width == 64:
# displays with width of 64 pixels are shifted by 32
x0 += 32
x1 += 32
self.write_cmd(SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_data(self.buffer)
class SSD1306_I2C(SSD1306):
def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False):
self.i2c = i2c
self.addr = addr
self.temp = bytearray(2)
self.write_list = [b"x40", None] # Co=0, D/C#=1
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.addr, self.temp)
def write_data(self, buf):
self.write_list[1] = buf
self.i2c.writevto(self.addr, self.write_list)
class SSD1306_SPI(SSD1306):
def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
self.rate = 10 * 1024 * 1024
dc.init(dc.OUT, value=0)
res.init(res.OUT, value=0)
cs.init(cs.OUT, value=1)
self.spi = spi
self.dc = dc
self.res = res
self.cs = cs
import time
self.res(1)
time.sleep_ms(1)
self.res(0)
time.sleep_ms(10)
self.res(1)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs(1)
self.dc(0)
self.cs(0)
self.spi.write(bytearray([cmd]))
self.cs(1)
def write_data(self, buf):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs(1)
self.dc(1)
self.cs(0)
self.spi.write(buf)
self.cs(1)
ssd屏幕检测ssd1306check.py
import time # <--- 添加这一行
import machine
import ssd1306
import network # 如果用到了 network 也需要导入
# I2C 配置(与你的接线一致)
I2C_SDA_PIN = 21
I2C_SCL_PIN = 22
OLED_I2C_ADDR = 0x3C # 你的 OLED 地址
# 初始化 I2C
i2c = machine.I2C(0, sda=machine.Pin(I2C_SDA_PIN), scl=machine.Pin(I2C_SCL_PIN), freq=400000)
# 扫描 I2C 设备,确认 OLED 已连接
print("I2C 设备地址:", [hex(i) for i in i2c.scan()])
if OLED_I2C_ADDR not in i2c.scan():
print("未找到 OLED 设备,请检查接线!")
else:
# 读取 SSD1306 分辨率配置(核心指令)
def read_oled_resolution(i2c, addr):
# 发送读取命令(SSD1306 控制寄存器地址 0x00)
i2c.writeto_mem(addr, 0x00, b'x0C') # 读取显示模式
time.sleep(0.1)
# 读取分辨率相关配置(不同芯片可能略有差异,这里兼容主流型号)
try:
# 尝试 128x64 配置验证
i2c.writeto_mem(addr, 0x00, b'x20x00') # 设置内存地址模式
time.sleep(0.1)
# 若能正常写入,大概率是 128x64;若报错,尝试 128x32
return (128, 64)
except:
try:
i2c.writeto_mem(addr, 0x00, b'x20x01')
time.sleep(0.1)
return (128, 32)
except:
return (None, None)
# 执行检测
width, height = read_oled_resolution(i2c, OLED_I2C_ADDR)
if width and height:
print(f"检测到 OLED 分辨率:{width}x{height}")
else:
print("自动检测失败,尝试手动验证!")
ssd1306test.py
import machine
import ssd1306
import network
import time
# --- 配置 ---
# WiFi 信息
WIFI_SSID = "XJ-000120"
WIFI_PASSWORD = "wansbdx666"
# I2C 配置
# 对于大多数 ESP32 开发板,推荐使用 GPIO21 (SDA) 和 GPIO22 (SCL)
I2C_SDA_PIN = 21
I2C_SCL_PIN = 22
OLED_I2C_ADDR = 0x3C # OLED 的 I2C 地址
# OLED 屏幕尺寸 (0.96寸通常是 128x64)
OLED_WIDTH = 128
OLED_HEIGHT = 64
# --- 初始化 ---
# 1. 初始化 I2C 总线
print("初始化 I2C 总线...")
i2c = machine.I2C(0, sda=machine.Pin(I2C_SDA_PIN), scl=machine.Pin(I2C_SCL_PIN), freq=400000)
# 扫描 I2C 设备,确认 OLED 是否被识别
print("扫描 I2C 设备地址:", [hex(i) for i in i2c.scan()])
if OLED_I2C_ADDR not in i2c.scan():
print(f"错误:未找到 I2C 设备地址 {hex(OLED_I2C_ADDR)}。请检查接线或地址是否正确。")
else:
# 2. 初始化 OLED 显示屏
print(f"初始化 OLED 显示屏 (地址: {hex(OLED_I2C_ADDR)})...")
oled = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c, addr=OLED_I2C_ADDR)
# 3. 连接 WiFi
print("连接 WiFi...")
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
# 等待 WiFi 连接
while not wlan.isconnected():
oled.fill(0)
oled.text("Connecting to", 0, 0)
oled.text(WIFI_SSID, 0, 16)
oled.show()
time.sleep(0.5)
print(".", end="")
print("WiFi 连接成功!")
ip_address = wlan.ifconfig()[0]
print(f"IP 地址: {ip_address}")
# --- 在 OLED 上显示信息 ---
# 清屏
oled.fill(0)
# 显示标题
oled.text("ESP32 OLED Demo", 4, 0)
# 画一条水平线
oled.hline(0, 12, 128, 1)
# 显示 WiFi 信息
oled.text("WiFi:"+WIFI_SSID, 0, 20)
#oled.text(WIFI_SSID, 0, 32)
# 显示 IP 地址
oled.text("IP:", 0, 30)
oled.text(ip_address, 0, 45)
# 将缓存的内容显示到屏幕上
oled.show()
print("信息已显示在 OLED 屏幕上。")
bme680.py文件
# The MIT License (MIT)
#
# Copyright (c) 2017 ladyada for Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# We have a lot of attributes for this complex sensor.
# pylint: disable=too-many-instance-attributes
"""
`bme680` - BME680 - Temperature, Humidity, Pressure & Gas Sensor
================================================================
MicroPython driver from BME680 air quality sensor, based on Adafruit_bme680
* Author(s): Limor 'Ladyada' Fried of Adafruit
Jeff Raber (SPI support)
and many more contributors
"""
import time
import math
from micropython import const
from ubinascii import hexlify as hex
try:
import struct
except ImportError:
import ustruct as struct
# I2C ADDRESS/BITS/SETTINGS
# -----------------------------------------------------------------------
_BME680_CHIPID = const(0x61)
_BME680_REG_CHIPID = const(0xD0)
_BME680_BME680_COEFF_ADDR1 = const(0x89)
_BME680_BME680_COEFF_ADDR2 = const(0xE1)
_BME680_BME680_RES_HEAT_0 = const(0x5A)
_BME680_BME680_GAS_WAIT_0 = const(0x64)
_BME680_REG_SOFTRESET = const(0xE0)
_BME680_REG_CTRL_GAS = const(0x71)
_BME680_REG_CTRL_HUM = const(0x72)
_BME280_REG_STATUS = const(0xF3)
_BME680_REG_CTRL_MEAS = const(0x74)
_BME680_REG_CONFIG = const(0x75)
_BME680_REG_PAGE_SELECT = const(0x73)
_BME680_REG_MEAS_STATUS = const(0x1D)
_BME680_REG_PDATA = const(0x1F)
_BME680_REG_TDATA = const(0x22)
_BME680_REG_HDATA = const(0x25)
_BME680_SAMPLERATES = (0, 1, 2, 4, 8, 16)
_BME680_FILTERSIZES = (0, 1, 3, 7, 15, 31, 63, 127)
_BME680_RUNGAS = const(0x10)
_LOOKUP_TABLE_1 = (2147483647.0, 2147483647.0, 2147483647.0, 2147483647.0, 2147483647.0,
2126008810.0, 2147483647.0, 2130303777.0, 2147483647.0, 2147483647.0,
2143188679.0, 2136746228.0, 2147483647.0, 2126008810.0, 2147483647.0,
2147483647.0)
_LOOKUP_TABLE_2 = (4096000000.0, 2048000000.0, 1024000000.0, 512000000.0, 255744255.0, 127110228.0,
64000000.0, 32258064.0, 16016016.0, 8000000.0, 4000000.0, 2000000.0, 1000000.0,
500000.0, 250000.0, 125000.0)
def _read24(arr):
"""Parse an unsigned 24-bit value as a floating point and return it."""
ret = 0.0
#print([hex(i) for i in arr])
for b in arr:
ret *= 256.0
ret += float(b & 0xFF)
return ret
class Adafruit_BME680:
"""Driver from BME680 air quality sensor
:param int refresh_rate: Maximum number of readings per second. Faster property reads
will be from the previous reading."""
def __init__(self, *, refresh_rate=10):
"""Check the BME680 was found, read the coefficients and enable the sensor for continuous
reads."""
self._write(_BME680_REG_SOFTRESET, [0xB6])
time.sleep(0.005)
# Check device ID.
chip_id = self._read_byte(_BME680_REG_CHIPID)
if chip_id != _BME680_CHIPID:
raise RuntimeError('Failed to find BME680! Chip ID 0x%x' % chip_id)
self._read_calibration()
# set up heater
self._write(_BME680_BME680_RES_HEAT_0, [0x73])
self._write(_BME680_BME680_GAS_WAIT_0, [0x65])
self.sea_level_pressure = 1013.25
"""Pressure in hectoPascals at sea level. Used to calibrate ``altitude``."""
# Default oversampling and filter register values.
self._pressure_oversample = 0b011
self._temp_oversample = 0b100
self._humidity_oversample = 0b010
self._filter = 0b010
self._adc_pres = None
self._adc_temp = None
self._adc_hum = None
self._adc_gas = None
self._gas_range = None
self._t_fine = None
self._last_reading = time.ticks_ms()
self._min_refresh_time = 1000 // refresh_rate
@property
def pressure_oversample(self):
"""The oversampling for pressure sensor"""
return _BME680_SAMPLERATES[self._pressure_oversample]
@pressure_oversample.setter
def pressure_oversample(self, sample_rate):
if sample_rate in _BME680_SAMPLERATES:
self._pressure_oversample = _BME680_SAMPLERATES.index(sample_rate)
else:
raise RuntimeError("Invalid oversample")
@property
def humidity_oversample(self):
"""The oversampling for humidity sensor"""
return _BME680_SAMPLERATES[self._humidity_oversample]
@humidity_oversample.setter
def humidity_oversample(self, sample_rate):
if sample_rate in _BME680_SAMPLERATES:
self._humidity_oversample = _BME680_SAMPLERATES.index(sample_rate)
else:
raise RuntimeError("Invalid oversample")
@property
def temperature_oversample(self):
"""The oversampling for temperature sensor"""
return _BME680_SAMPLERATES[self._temp_oversample]
@temperature_oversample.setter
def temperature_oversample(self, sample_rate):
if sample_rate in _BME680_SAMPLERATES:
self._temp_oversample = _BME680_SAMPLERATES.index(sample_rate)
else:
raise RuntimeError("Invalid oversample")
@property
def filter_size(self):
"""The filter size for the built in IIR filter"""
return _BME680_FILTERSIZES[self._filter]
@filter_size.setter
def filter_size(self, size):
if size in _BME680_FILTERSIZES:
self._filter = _BME680_FILTERSIZES[size]
else:
raise RuntimeError("Invalid size")
@property
def temperature(self):
"""The compensated temperature in degrees celsius."""
self._perform_reading()
calc_temp = (((self._t_fine * 5) + 128) / 256)
return calc_temp / 100
@property
def pressure(self):
"""The barometric pressure in hectoPascals"""
self._perform_reading()
var1 = (self._t_fine / 2) - 64000
var2 = ((var1 / 4) * (var1 / 4)) / 2048
var2 = (var2 * self._pressure_calibration[5]) / 4
var2 = var2 + (var1 * self._pressure_calibration[4] * 2)
var2 = (var2 / 4) + (self._pressure_calibration[3] * 65536)
var1 = (((((var1 / 4) * (var1 / 4)) / 8192) *
(self._pressure_calibration[2] * 32) / 8) +
((self._pressure_calibration[1] * var1) / 2))
var1 = var1 / 262144
var1 = ((32768 + var1) * self._pressure_calibration[0]) / 32768
calc_pres = 1048576 - self._adc_pres
calc_pres = (calc_pres - (var2 / 4096)) * 3125
calc_pres = (calc_pres / var1) * 2
var1 = (self._pressure_calibration[8] * (((calc_pres / 8) * (calc_pres / 8)) / 8192)) / 4096
var2 = ((calc_pres / 4) * self._pressure_calibration[7]) / 8192
var3 = (((calc_pres / 256) ** 3) * self._pressure_calibration[9]) / 131072
calc_pres += ((var1 + var2 + var3 + (self._pressure_calibration[6] * 128)) / 16)
return calc_pres/100
@property
def humidity(self):
"""The relative humidity in RH %"""
self._perform_reading()
temp_scaled = ((self._t_fine * 5) + 128) / 256
var1 = ((self._adc_hum - (self._humidity_calibration[0] * 16)) -
((temp_scaled * self._humidity_calibration[2]) / 200))
var2 = (self._humidity_calibration[1] *
(((temp_scaled * self._humidity_calibration[3]) / 100) +
(((temp_scaled * ((temp_scaled * self._humidity_calibration[4]) / 100)) /
64) / 100) + 16384)) / 1024
var3 = var1 * var2
var4 = self._humidity_calibration[5] * 128
var4 = (var4 + ((temp_scaled * self._humidity_calibration[6]) / 100)) / 16
var5 = ((var3 / 16384) * (var3 / 16384)) / 1024
var6 = (var4 * var5) / 2
calc_hum = (((var3 + var6) / 1024) * 1000) / 4096
calc_hum /= 1000 # get back to RH
if calc_hum > 100:
calc_hum = 100
if calc_hum < 0:
calc_hum = 0
return calc_hum
@property
def altitude(self):
"""The altitude based on current ``pressure`` vs the sea level pressure
(``sea_level_pressure``) - which you must enter ahead of time)"""
pressure = self.pressure # in Si units for hPascal
return 44330.77 * (1.0 - math.pow(pressure / self.sea_level_pressure, 0.1902632))
@property
def gas(self):
"""The gas resistance in ohms"""
self._perform_reading()
var1 = ((1340 + (5 * self._sw_err)) * (_LOOKUP_TABLE_1[self._gas_range])) / 65536
var2 = ((self._adc_gas * 32768) - 16777216) + var1
var3 = (_LOOKUP_TABLE_2[self._gas_range] * var1) / 512
calc_gas_res = (var3 + (var2 / 2)) / var2
return int(calc_gas_res)
def _perform_reading(self):
"""Perform a single-shot reading from the sensor and fill internal data structure for
calculations"""
expired = time.ticks_diff(self._last_reading, time.ticks_ms()) * time.ticks_diff(0, 1)
if 0 <= expired < self._min_refresh_time:
time.sleep_ms(self._min_refresh_time - expired)
# set filter
self._write(_BME680_REG_CONFIG, [self._filter << 2])
# turn on temp oversample & pressure oversample
self._write(_BME680_REG_CTRL_MEAS,
[(self._temp_oversample << 5)|(self._pressure_oversample << 2)])
# turn on humidity oversample
self._write(_BME680_REG_CTRL_HUM, [self._humidity_oversample])
# gas measurements enabled
self._write(_BME680_REG_CTRL_GAS, [_BME680_RUNGAS])
ctrl = self._read_byte(_BME680_REG_CTRL_MEAS)
ctrl = (ctrl & 0xFC) | 0x01 # enable single shot!
self._write(_BME680_REG_CTRL_MEAS, [ctrl])
new_data = False
while not new_data:
data = self._read(_BME680_REG_MEAS_STATUS, 15)
new_data = data[0] & 0x80 != 0
time.sleep(0.005)
self._last_reading = time.ticks_ms()
self._adc_pres = _read24(data[2:5]) / 16
self._adc_temp = _read24(data[5:8]) / 16
self._adc_hum = struct.unpack('>H', bytes(data[8:10]))[0]
self._adc_gas = int(struct.unpack('>H', bytes(data[13:15]))[0] / 64)
self._gas_range = data[14] & 0x0F
var1 = (self._adc_temp / 8) - (self._temp_calibration[0] * 2)
var2 = (var1 * self._temp_calibration[1]) / 2048
var3 = ((var1 / 2) * (var1 / 2)) / 4096
var3 = (var3 * self._temp_calibration[2] * 16) / 16384
self._t_fine = int(var2 + var3)
def _read_calibration(self):
"""Read & save the calibration coefficients"""
coeff = self._read(_BME680_BME680_COEFF_ADDR1, 25)
coeff += self._read(_BME680_BME680_COEFF_ADDR2, 16)
coeff = list(struct.unpack('<hbBHhbBhhbbHhhBBBHbbbBbHhbb', bytes(coeff[1:39])))
# print("
",coeff)
coeff = [float(i) for i in coeff]
self._temp_calibration = [coeff[x] for x in [23, 0, 1]]
self._pressure_calibration = [coeff[x] for x in [3, 4, 5, 7, 8, 10, 9, 12, 13, 14]]
self._humidity_calibration = [coeff[x] for x in [17, 16, 18, 19, 20, 21, 22]]
self._gas_calibration = [coeff[x] for x in [25, 24, 26]]
# flip around H1 & H2
self._humidity_calibration[1] *= 16
self._humidity_calibration[1] += self._humidity_calibration[0] % 16
self._humidity_calibration[0] /= 16
self._heat_range = (self._read_byte(0x02) & 0x30) / 16
self._heat_val = self._read_byte(0x00)
self._sw_err = (self._read_byte(0x04) & 0xF0) / 16
def _read_byte(self, register):
"""Read a byte register value and return it"""
return self._read(register, 1)[0]
def _read(self, register, length):
raise NotImplementedError()
def _write(self, register, values):
raise NotImplementedError()
class BME680_I2C(Adafruit_BME680):
"""Driver for I2C connected BME680.
:param i2c: I2C device object
:param int address: I2C device address
:param bool debug: Print debug statements when True.
:param int refresh_rate: Maximum number of readings per second. Faster property reads
will be from the previous reading."""
def __init__(self, i2c, address=0x77, debug=False, *, refresh_rate=10):
"""Initialize the I2C device at the 'address' given"""
self._i2c = i2c
self._address = address
self._debug = debug
super().__init__(refresh_rate=refresh_rate)
def _read(self, register, length):
"""Returns an array of 'length' bytes from the 'register'"""
result = bytearray(length)
self._i2c.readfrom_mem_into(self._address, register & 0xff, result)
if self._debug:
print(" ${:x} read ".format(register), " ".join(["{:02x}".format(i) for i in result]))
return result
def _write(self, register, values):
"""Writes an array of 'length' bytes to the 'register'"""
if self._debug:
print(" ${:x} write".format(register), " ".join(["{:02x}".format(i) for i in values]))
for value in values:
self._i2c.writeto_mem(self._address, register, bytearray([value & 0xFF]))
register += 1
class BME680_SPI(Adafruit_BME680):
"""Driver for SPI connected BME680.
:param spi: SPI device object, configured
:param cs: Chip Select Pin object, configured to OUT mode
:param bool debug: Print debug statements when True.
:param int refresh_rate: Maximum number of readings per second. Faster property reads
will be from the previous reading.
"""
def __init__(self, spi, cs, debug=False, *, refresh_rate=10):
self._spi = spi
self._cs = cs
self._debug = debug
self._cs(1)
super().__init__(refresh_rate=refresh_rate)
def _read(self, register, length):
if register != _BME680_REG_PAGE_SELECT:
# _BME680_REG_PAGE_SELECT exists in both SPI memory pages
# For all other registers, we must set the correct memory page
self._set_spi_mem_page(register)
register = (register | 0x80) & 0xFF # Read single, bit 7 high.
try:
self._cs(0)
self._spi.write(bytearray([register])) # pylint: disable=no-member
result = bytearray(length)
self._spi.readinto(result) # pylint: disable=no-member
if self._debug:
print(" ${:x} read ".format(register), " ".join(["{:02x}".format(i) for i in result]))
except Exception as e:
print (e)
result = None
finally:
self._cs(1)
return result
def _write(self, register, values):
if register != _BME680_REG_PAGE_SELECT:
# _BME680_REG_PAGE_SELECT exists in both SPI memory pages
# For all other registers, we must set the correct memory page
self._set_spi_mem_page(register)
register &= 0x7F # Write, bit 7 low.
try:
self._cs(0)
buffer = bytearray(2 * len(values))
for i, value in enumerate(values):
buffer[2 * i] = register + i
buffer[2 * i + 1] = value & 0xFF
self._spi.write(buffer) # pylint: disable=no-member
if self._debug:
print(" ${:x} write".format(register), " ".join(["{:02x}".format(i) for i in values]))
except Exception as e:
print (e)
finally:
self._cs(1)
def _set_spi_mem_page(self, register):
spi_mem_page = 0x00
if register < 0x80:
spi_mem_page = 0x10
self._write(_BME680_REG_PAGE_SELECT, [spi_mem_page])
bme680test.py文件
import machine
import time
import bme680
# 初始化 I2C
i2c = machine.I2C(0, sda=machine.Pin(21), scl=machine.Pin(22), freq=400000)
# 扫描 I2C 设备地址,确认传感器是否被识别
print("I2C 设备地址:", [hex(i) for i in i2c.scan()])
# 初始化 BME680
try:
# 如果你的传感器地址不是默认的 0x76,请在这里修改
bme = bme680.BME680_I2C(i2c, address=0x77)
# 可以根据需要配置 oversampling
bme.overscan_temperature = 8
bme.overscan_humidity = 2
bme.overscan_pressure = 4
bme.filter_size = 3
print("BME680 初始化成功!")
except OSError as e:
print("BME680 初始化失败,请检查接线和地址:", e)
raise
# 读取数据
while True:
print("
温度: {:.2f} °C".format(bme.temperature))
print("湿度: {:.2f} %".format(bme.humidity))
print("气压: {:.2f} hPa".format(bme.pressure))
print("气体电阻: {:.0f} Ohms".format(bme.gas))
time.sleep(2)
BME680 的气体电阻值(通常以 Ohms 为单位)无法直接精准区分甲醛含量,也没有统一的空气质量等级标准,但可通过气体电阻的相对变化和校准,间接判断空气质量优劣(核心逻辑:气体电阻越低,说明空气中挥发性有机化合物 / VOC 浓度越高,空气质量越差)。
一、核心原理
BME680 的气体传感器通过检测 VOC(甲醛属于 VOC 之一)等还原性气体,气体浓度越高,传感器表面化学反应越剧烈,导电能力越强,表现为气体电阻值越低。
关键前提:气体电阻是 “相对参考值”,受温度、湿度、传感器老化影响大,必须先校准基准值,再通过相对变化判断。局限性:无法单独识别甲醛(会响应酒精、油烟、香水等所有 VOC),也不能直接换算甲醛具体浓度(需专业甲醛传感器如 SGP30/SGP40 配合)。
二、空气质量等级划分(基于气体电阻的相对标准)
需先在 “清洁空气” 中校准基准电阻(R0),再根据实际电阻(R)与 R0 的比值划分等级,以下是行业通用参考方案(需根据实际环境微调):
| 空气质量等级 | 气体电阻与基准值关系 | 直观表现 | 适用场景 |
|---|---|---|---|
| 优秀 | R ≥ 0.8×R0 | 气体电阻接近清洁空气基准 | 室内通风良好、无异味 |
| 良好 | 0.5×R0 < R < 0.8×R0 | 气体电阻略有下降 | 少量 VOC 释放(如刚打开包装) |
| 一般(注意) | 0.3×R0 < R ≤ 0.5×R0 | 气体电阻明显下降 | 室内有轻微异味(如家具释放、吸烟后) |
| 较差(警示) | R ≤ 0.3×R0 | 气体电阻大幅降低 | 高浓度 VOC(如甲醛超标、油烟、酒精挥发) |
校准基准值(R0)的步骤:
将 BME680 放在通风良好、无异味的清洁空气中(如户外新鲜空气或开窗通风 1 小时后的室内)。连续读取 10-20 次气体电阻值,取平均值作为基准值 R0,存入 ESP32 的 NVS 非易失性存储(避免每次重启重新校准)。
三、甲醛含量的间接判断(仅参考,非精准测量)
由于 BME680 不专门检测甲醛,只能通过气体电阻变化 “推测甲醛是否可能超标”,需结合甲醛的特性(常温下甲醛是无色有刺激性气味的 VOC,浓度超标时会伴随气体电阻大幅下降):
甲醛安全标准:我国室内甲醛限值为 ≤ 0.1 mg/m³(约 0.08 ppm),超标时人体可能感到眼鼻刺激。间接判断逻辑:
若气体电阻 R ≤ 0.3×R0,且环境中无明显其他 VOC 来源(如无油烟、酒精、香水),则可能存在甲醛等有害气体超标风险。若要精准测量甲醛含量,需额外搭配甲醛专用传感器(如 EC202、SGP41),与 BME680 数据互补。
补偿思路(简化版)我们将采用一个简化的补偿模型,主要基于温度进行线性补偿,湿度的影响则通过一个经验系数来调整。
温度补偿:气体电阻值(Rg)与绝对温度(T,单位为 Kelvin)成反比。
湿度补偿:湿度增加会导致 Rg 略有下降。我们引入一个经验系数
Rg_compensated ∝ Rg_measured * (T_reference / T_measured) 来修正这个影响。
H_factor
这个公式意味着,每偏离参考湿度(50% RH)1%,电阻值就修正 1%。你可以根据实际情况微调
H_factor = 1.0 + (H_measured - H_reference) * 0.01 这个系数。
0.01
最终补偿公式
Rg_compensated = Rg_measured * (298.15 / (273.15 + T_measured)) * (1.0 + (H_measured - 50.0) * 0.01)
完整代码
1. 1个chart.min.js文件(用于在网页显示曲线趋势)

2. wifi_config.json,用于保存可连接WiFi清单,在开机后会按照顺序尝试连接
{"ssid": ["XJ-000120","CMCC-FfeQ"], "password": ["wansbdx666","fukf3tg4"]}
3. air_quality_config.json, 当前只使用了R0这个值,保存空气质量基准电阻。
{"calibration": 1.2, "last_reading": 95.5, "threshold": 100, "r0": 455217.6}
4.下面是完整的 代码。将此代码保存到 ESP32 中,它将在设备启动时自动运行。
main.py
import _thread
import machine
import network
import time
import socket
import ntptime
import ujson
from ssd1306 import SSD1306_I2C
import bme680
import uos
import uasyncio as asyncio
import gc
import os
from machine import Pin, I2C
import urequests
# --- 配置区 ---
WIFI_CONFIG_FILE = "wifi_config.json"
WIFI_SSID = "CMCC-FfeQ00"
WIFI_PASSWORD = "fukf3t4"
AP_SSID = "ESP32_Air_Quality"
AP_PASS = "12345678"
WEB_PORT = 8080
WIFI_OK = True
I2C_SDA_PIN = 21
I2C_SCL_PIN = 22
OLED_WIDTH = 128
OLED_HEIGHT = 64
OLED_I2C_ADDR = 0x3C
BME680_ADDR = 0x77
RECORD_INTERVAL_SEC = 10
MAX_RECORDS = 12
CONFIG_FILE = "air_quality_config.json"
# --- 全局变量 ---
sensor_data_history = []
start_time = time.time()
current_time_str = "00:00:00"
ip_addr = "0.0.0.0"
is_calibrating = False
latest_data = (0, 0, 0, 0) # (temp, humidity, pressure, gas)
latest_compensated_gas = 0
bme_OK = False
i2c = None
oled = None
bme = None
oled_paused = False
# 校准相关
R0 = None
data_to_save = {
"r0": 10.0,
"threshold": 100,
"calibration": 1.2,
"last_reading": 95.5
}
# --- 工具函数 ---
def save_to_file(filename, data):
"""保存数据到JSON文件"""
try:
with open(filename, 'w') as f:
f.write(ujson.dumps(data))
print(f"数据已保存到 {filename}")
except Exception as e:
print(f"保存数据失败: {e}")
def load_from_file(filename):
"""从JSON文件加载数据"""
try:
with open(filename, 'r') as f:
data = ujson.loads(f.read())
print(f"从 {filename} 加载数据成功")
return data
except OSError:
print(f"{filename} 不存在,返回默认值")
return None
except Exception as e:
print(f"加载数据失败: {e}")
return None
# --- 硬件初始化函数 ---
def init_hardware():
"""初始化 I2C, OLED 和 BME680 传感器"""
global i2c, oled, bme, bme_OK, R0, data_to_save
i2c = machine.I2C(0, sda=machine.Pin(I2C_SDA_PIN), scl=machine.Pin(I2C_SCL_PIN), freq=400000)
print("I2C 设备扫描:", [hex(addr) for addr in i2c.scan()])
# 初始化OLED
oled = SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c, addr=OLED_I2C_ADDR)
oled.fill(0)
oled.text("Initializing...", 0, 0)
oled.show()
# 初始化BME680
try:
bme = bme680.BME680_I2C(i2c, address=BME680_ADDR)
bme.overscan_temperature = 8
bme.overscan_humidity = 2
bme.overscan_pressure = 4
bme.filter_size = 3
print("BME680 初始化成功")
oled.text("BME680 OK", 0, 16)
bme_OK = True
except OSError as e:
print("BME680 初始化失败:", e)
oled.text("BME680 Error!", 0, 16)
bme_OK = False
# 加载配置文件
loaded_data = load_from_file(CONFIG_FILE)
if loaded_data:
data_to_save.update(loaded_data)
R0 = data_to_save.get("r0")
if R0:
print(f"加载已保存的基准电阻 R0 = {R0:.0f} Ohms")
else:
print("配置文件中未找到 R0,需要校准。")
else:
print("未找到配置文件,首次运行请校准。")
oled.show()
time.sleep(1)
# --- WiFi和网络相关函数 ---
def connect_wifi():
"""连接WiFi网络"""
global ip_addr, WIFI_OK,WIFI_SSID, WIFI_PASSWORD,WIFI_SSID_list,WIFI_PASSWORD_list
wlan = network.WLAN(network.STA_IF)
timeout = 15 # 15秒超时
for i in range(len(WIFI_SSID_list)):
WIFI_SSID=WIFI_SSID_list[i]
WIFI_PASSWORD=WIFI_PASSWORD_list[i]
# 等待一小段时间确保断开成功
wlan.active(False)
time.sleep(2)
wlan.active(True)
time.sleep(1)
timeout = 15 # 15秒超时
#WIFI_PASSWORD=WIFI_PASSWORD_list[i]
if not wlan.isconnected():
print('正在连接 WiFi:', WIFI_SSID)
oled.fill(0)
oled.text("Connecting WiFi...", 0, 0)
oled.show()
print(WIFI_SSID, WIFI_PASSWORD)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
while timeout > 0 and not wlan.isconnected():
timeout -= 0.5
time.sleep(0.5)
if wlan.isconnected():
break
if wlan.isconnected():
WIFI_OK = True
ip_addr = wlan.ifconfig()[0]
print(f'WiFi 连接成功, IP: {ip_addr}')
else:
WIFI_OK = False
ip_addr = "192.168.4.1"
print(f'WiFi 连接失败, IP: {ip_addr}')
_thread.start_new_thread(start_ap_provisioning, ())
# 更新OLED显示IP
oled.fill(0)
oled.text(f"IP: {ip_addr}", 0, 0)
oled.show()
time.sleep(1)
return ip_addr
# ------------------- 加载 WiFi 配置 -------------------
def load_wifi_config():
"""供外部(read_display2.py)调用"""
return load_from_file(WIFI_CONFIG_FILE)
# ------------------- 新增:保存 WiFi 配置 -------------------
def save_wifi_config(ssid, password):
save_to_file(WIFI_CONFIG_FILE, {"ssid": ssid, "password": password})
# ------------------- AP 配网 Web Server -------------------
def start_ap_provisioning():
ap0 = network.WLAN(network.AP_IF)
ap0.active(True)
ap0.config(essid=AP_SSID, password=AP_PASS)
while not ap0.active():
time.sleep(0.5)
print(f"[smart_config] AP 模式 {AP_SSID} 已开启")
addr0 = socket.getaddrinfo('0.0.0.0', WEB_PORT)[0][-1]
s0 = socket.socket()
s0.bind(addr0)
s0.listen(1)
html = """<!DOCTYPE html>
<html><head><meta charset="utf-8"/><title>ESP32 配网</title></head>
<body><h2>ESP32 WiFi 配置</h2>
<form action="/configure" method="POST">
WiFi SSID:<br><input name="ssid" required><br>
Password:<br><input type="password" name="password" required><br><br>
<button type="submit">连接并保存</button></form></body></html>"""
while True:
cl, _ = s0.accept()
req = cl.recv(1024).decode()
if 'POST /configure' in req:
ssid = req.split('&')[0].split('=')[-1]
print(ssid)
pwd = req.split('&')[1].split('=')[1]
save_wifi_config(ssid, pwd)
cl.send("HTTP/1.1 200 OK
<h1>配置成功!3s 后重启...</h1>")
cl.close()
time.sleep(3)
machine.reset()
else:
cl.send("HTTP/1.1 200 OK
Content-Type: text/html
" + html)
cl.close()
def sync_ntp_time():
"""同步NTP时间"""
global current_time_str,oled_paused
try:
print("正在同步 NTP 时间...")
ntptime.host = 'cn.pool.ntp.org'
ntptime.settime()
print("NTP 时间同步成功")
update_current_time()
oled_paused=False
return True
except Exception as e:
print("NTP 时间同步失败:", e)
current_time_str = "NTP Fail"
return False
def update_current_time():
"""更新当前时间字符串(北京时间)"""
global current_time_str
tm = time.localtime()
beijing_hour = (tm[3] + 8) % 24
current_time_str = "{:02d}:{:02d}:{:02d}".format(beijing_hour, tm[4], tm[5])
# --- 传感器数据处理函数 ---
def read_sensor():
"""读取BME680传感器数据并进行温湿度补偿"""
global latest_data, latest_compensated_gas
print(f"当前空闲内存: {gc.mem_free()} 字节")
print(f"已分配内存: {gc.mem_alloc()} 字节")
gc.collect()
print(f"GC后空闲内存: {gc.mem_free()} 字节") # 对比是否释放了内存
if not bme_OK:
return None
try:
print("读取传感器数据")
temp = bme.temperature
humidity = bme.humidity
pressure = bme.pressure
gas = bme.gas
# 数据有效性检查
if temp is None or humidity is None or pressure is None or gas is None:
return None
# 温湿度补偿计算
T_ref = 298.15 # 参考温度(25°C)
H_ref = 50.0 # 参考湿度(50%)
T_meas = 273.15 + temp # 转换为开尔文
H_factor = 1.0 + (humidity - H_ref) * 0.005 # 湿度补偿因子
# 计算补偿后气体电阻
if T_meas > 0:
latest_compensated_gas = gas * (T_ref / T_meas) * H_factor
else:
latest_compensated_gas = gas
latest_data = (temp, humidity, pressure, gas)
return latest_data
except Exception as e:
print(f"读取传感器数据失败: {e}")
return None
def add_data_to_history(data):
"""添加传感器数据到历史记录"""
global sensor_data_history
if data and len(data) >= 5:
compensated_gas = latest_compensated_gas if latest_compensated_gas else 0
# 历史记录格式:(时间戳, 温度, 湿度, 气压, 原始气体电阻, 补偿后气体电阻)
sensor_data_history.append((data[0], data[1], data[2], data[3], data[4], compensated_gas))
# 限制历史记录长度
while len(sensor_data_history) > MAX_RECORDS:
sensor_data_history.pop(0)
def get_running_time():
"""获取设备运行时间"""
seconds = time.time() - start_time
hours, rem = divmod(seconds, 3600)
minutes, seconds = divmod(rem, 60)
return "{:02d}:{:02d}:{:02d}".format(int(hours), int(minutes), int(seconds))
# --- 空气质量评估函数 ---
def get_air_quality(gas_resistance):
"""根据气体电阻值评估空气质量"""
global R0, air_Q
if R0 is None:
air_Q="NA"
return "未校准"
if gas_resistance <= 0:
air_Q="Invaild"
return "数据无效"
ratio = gas_resistance / R0
if ratio >= 0.8:
air_Q="Great!"
return "优秀"
elif 0.5 < ratio < 0.8:
air_Q="Good!"
return "良好"
elif 0.3 < ratio <= 0.5:
air_Q="Normal."
return "一般(注意)"
else:
air_Q="Bad.."
return "较差(警示)"
def get_formaldehyde_hint(air_quality, gas_resistance):
"""根据空气质量给出甲醛提示"""
global R0
if R0 is None:
return "请先校准传感器"
if air_quality == "较差(警示)" and gas_resistance < 0.3 * R0:
return "可能存在甲醛等VOC超标,建议通风"
else:
return "甲醛浓度未超标(参考)"
# --- 校准函数 ---
def calibrate_r0():
"""同步版本的BME680校准函数(保留未使用)"""
global R0, is_calibrating, latest_compensated_gas, data_to_save, oled_paused
is_calibrating = True
oled_paused = True
oled.fill(0)
oled.text(f"IP: {ip_addr}", 0, 0)
oled.text("Calibrating...", 0, 20)
oled.text("Keep sensor in", 0, 35)
oled.text("clean air!", 0, 45)
oled.show()
print("开始校准 BME680... 请确保传感器处于清洁空气中")
raw_values = []
# 校准前稳定
for _ in range(5):
if read_sensor():
time.sleep(1)
# 采样20次
for _ in range(20):
read_sensor()
if latest_compensated_gas > 0:
raw_values.append(latest_compensated_gas)
time.sleep(1)
# 计算结果
if len(raw_values) > 0:
R0 = sum(raw_values) / len(raw_values)
data_to_save["r0"] = R0
save_to_file(CONFIG_FILE, data_to_save)
print(f"校准完成, 基准电阻 R0 (补偿后) = {R0:.0f} Ohms")
oled.fill(0)
oled.text(f"IP: {ip_addr}", 0, 0)
oled.text("Calibrate OK!", 0, 20)
oled.text(f"R0: {R0:.0f} Ohms", 0, 35)
else:
print("校准失败:未读取到有效数据")
oled.fill(0)
oled.text(f"IP: {ip_addr}", 0, 0)
oled.text("Calibrate Fail!", 0, 20)
oled.show()
time.sleep(2)
is_calibrating = False
oled_paused = False
def build_html_response():
print('len data:',len(sensor_data_history))
uptime = get_running_time()
temp = humidity = pressure = gas = compensated_gas = "N/A"
air_quality = "未知"
formaldehyde_hint = ""
if latest_data and latest_compensated_gas > 0:
temp, humidity, pressure, gas = latest_data
compensated_gas = latest_compensated_gas
air_quality = get_air_quality(compensated_gas)
formaldehyde_hint = get_formaldehyde_hint(air_quality, compensated_gas)
labels_js = "["
temp_data_js = "["
humidity_data_js = "["
gas_data_js = "["
for i, record in enumerate(sensor_data_history):
tm = time.localtime(record[0])
beijing_hour = (tm[3] + 8) % 24
labels_js += f'"{beijing_hour:02d}:{tm[4]:02d}"'
temp_data_js += f'{record[1]:.2f}'
humidity_data_js += f'{record[2]:.2f}'
gas_data_js += f'{record[5]:.0f}'
if i < len(sensor_data_history) - 1:
labels_js += ", "
temp_data_js += ", "
humidity_data_js += ", "
gas_data_js += ", "
labels_js += "]"
temp_data_js += "]"
humidity_data_js += "]"
gas_data_js += "]"
html = f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>ESP32 BME680 监测站</title>
<script src="/chart.min.js">
<style>
body {{ font-family: sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; background-color: #f5f5f5; }}
.container {{ display: flex; flex-wrap: wrap; gap: 20px; margin-top: 20px; }}
.card {{ background: white; border-radius: 12px; box-shadow: 0 2px 8px rgba(0,0,0,0.1); padding: 20px; flex: 1; min-width: 250px; text-align: center; }}
h3 {{ color: #333; text-align: center; margin-bottom: 10px; }}
.header-info {{ text-align: center; color: #666; font-size: 0.9em; margin-bottom: 20px; }}
.value {{ font-size: 2.5em; font-weight: bold; color: #2c3e50; margin: 15px 0; }}
.small-text {{ font-size: 0.9em; color: #555; margin-top: 5px; }}
.chart-container {{ background: white; border-radius: 12px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); padding: 20px; margin-top: 20px; }}
canvas {{ width: 100%; height: 300px; }}
.calibrate-btn {{ text-align: center; margin: 20px 0; }}
.calibrate-btn button {{ padding: 10px 20px; font-size: 1.1em; background-color: #4CAF50; color: white; border: none; border-radius: 5px; cursor: pointer; }}
.calibrate-btn button:hover {{ background-color: #45a049; }}
.calibrate-btn button:disabled {{ background-color: #cccccc; cursor: not-allowed; }}
</style>
</head>
<body>
<h2>环境监测</h2>
<p class="header-info">IP: {ip_addr} | 运行时间: {uptime} | 更新时间: {current_time_str}</p>
<div class="calibrate-btn">
<buttonhljs-string">"{' disabled' if is_calibrating else ''}>
{'校准中...' if is_calibrating else '校准传感器'}
</button>
<phljs-string">" class="small-text"></p>
</div>
<div class="container">
<div class="card">
<h3>温度</h3>
<div class="value">{temp:.2f} °C</div>
</div>
<div class="card">
<h3>湿度</h3>
<div class="value">{humidity:.2f} %</div>
</div>
<div class="card">
<h3>气压</h3>
<div class="value">{pressure:.2f} hPa</div>
</div>
<div class="card">
<h3>气体电阻</h3>
<div class="value">{gas:.0f} Ohms</div>
<div class="small-text">补偿后: {compensated_gas:.0f} Ohms</div>
<div class="small-text">R0: {R0:.0f} Ohms</div>
</div>
</div>
<div class="card">
<h3>空气质量</h3>
<div class="value">{air_quality}</div>
<div class="small-text">{formaldehyde_hint}</div>
</div>
<div class="chart-container">
<h3>过去{len(sensor_data_history)}点数据趋势</h3>
<canvashljs-string">"></canvas>
</div>
<script>
document.addEventListener('DOMContentLoaded', function() {{
document.getElementById('calibrateBtn').addEventListener('click', calibrateSensor);
let isCalibrating = {str(is_calibrating).lower()};
function calibrateSensor() {{
fetch('/calibrate').then(r=>r.text()).then(alert);
}}
if (typeof Chart !== 'undefined') {{
const ctx = document.getElementById('historyChart').getContext('2d');
new Chart(ctx, {{
type: 'line',
data: {{
labels: {labels_js},
datasets: [
{{
label: '温度 (°C)',
data: {temp_data_js},
borderColor: 'rgb(255, 99, 132)',
backgroundColor: 'rgba(255, 99, 132, 0.2)',
yAxisID: 'y',
tension: 0.3
}},
{{
label: '湿度 (%)',
data: {humidity_data_js},
borderColor: 'rgb(54, 162, 235)',
backgroundColor: 'rgba(54, 162, 235, 0.2)',
yAxisID: 'y1',
tension: 0.3
}},
{{
label: '气体电阻 (Ohms)',
data: {gas_data_js},
borderColor: 'rgb(75, 192, 192)',
backgroundColor: 'rgba(75, 192, 192, 0.2)',
yAxisID: 'y2',
tension: 0.3
}}
]
}}
}});
}} else {{
document.getElementById('historyChart').parentElement.innerHTML = '<p>图表加载失败。</p>';
}}
}});
</script>
</body>
</html>
"""
return html
# --- 主函数 ---
def handle_client(conn, addr):
print('客户端连接:', addr)
global is_calibrating, last_record_time, sync_ntp_success, oled_paused
gc.collect()
try:
request = b""
while True:
chunk = conn.recv(1024)
if not chunk:
break
request += chunk
if b"
" in request: # 检查请求头结束符
break
request_str = request.decode('utf-8')
# 初始化默认响应
response = 'HTTP/1.1 404 Not Found
Content-Type: text/plain
Connection: close
404 - Not Found'
if request_str.startswith('GET /calibrate'):
print("收到校准请求")
try:
_thread.start_new_thread(calibrate_r0, ()) # ← 用线程
response = 'HTTP/1.1 200 OK
Content-Type: text/plain
Connection: close
校准已启动,请等待30秒。'
except Exception as cal_error:
print(f"校准执行失败: {cal_error}")
response = 'HTTP/1.1 500 Internal Server Error
Content-Type: text/plain
Connection: close
校准执行失败,请重试。'
elif request_str.startswith('GET /chart.min.js '):
print("收到 chart.min.js 请求")
try:
with open('chart.min.js', 'rb') as f:
# 确保发送正确的 Content-Type
header = b'HTTP/1.1 200 OK
Content-Type: application/javascript
Content-Length: '
header += str(os.stat('chart.min.js')[6]).encode()
header += b'
Connection: close
'
conn.send(header)
gc.collect()
# 分块发送文件内容
while True:
chunk = f.read(512)
if not chunk:
break
conn.send(chunk)
gc.collect()
except OSError:
print("chart.min.js 文件未找到")
conn.send(
b'HTTP/1.1 404 Not Found
Content-Type: text/plain
Connection: close
File Not Found')
finally:
conn.close()
gc.collect()
oled_paused = False
return
elif 'GET / ' in request_str or 'GET /index.html' in request_str:
print("收到get请求")
oled_paused = True
response_html = build_html_response()
try:
send_large_response(conn, response_html)
print("发送get请求完成")
except Exception as e:
print(f"发送请求失败: {e}")
finally:
# 确保关闭连接(如果使用长连接需调整)
conn.close()
gc.collect()
oled_paused = False
return
conn.sendall(response.encode('utf-8'))
except Exception as e:
print(f"处理客户端请求失败: {e}")
finally:
conn.close()
gc.collect()
oled_paused = False
print('客户端连接关闭')
def send_large_response(conn, response_html):
"""分块发送大型HTML响应,处理部分发送情况"""
# 构建响应头(不包含响应体)
headers = [
'HTTP/1.1 200 OK',
'Content-Type: text/html; charset=utf-8',
f'Content-Length: {len(response_html.encode("utf-8"))}',
'Connection: close',
'
' # 空行分隔头部和体
]
header_bytes = '
'.join(headers).encode('utf-8')
# 发送响应头
conn.sendall(header_bytes)
gc.collect()
# 分块发送响应体(处理大型数据)
body_bytes = response_html.encode('utf-8')
chunk_size = 256 # 每次发送8KB(可根据网络情况调整)
sent = 0
total = len(body_bytes)
print(f"准备发送 {total} 字节数据")
while sent < total:
remaining = total - sent
send_size = min(chunk_size, remaining)
try:
# 发送部分数据
sent_bytes = conn.send(body_bytes[sent:sent+send_size])
if sent_bytes == 0:
raise RuntimeError("连接中断")
sent += sent_bytes
except socket.error as e:
print(f"发送数据失败: {e}")
raise
finally:
gc.collect()
print(f"成功发送 {total} 字节数据")
# --- OLED更新函数 ---
def oled_update():
"""OLED屏幕更新线程(持续显示传感器数据)"""
global is_calibrating, last_record_time, sync_ntp_success, oled_paused,air_Q,WIFI_SSID
while True:
# 暂停更新逻辑
while oled_paused:
time.sleep(1) # 暂停时降低CPU占用
current_sec = time.time()
# 定期读取传感器数据
if current_sec - last_record_time >= RECORD_INTERVAL_SEC and not is_calibrating:
sensor_reading = read_sensor()
if sensor_reading:
# 新数据点格式:(时间戳, 温度, 湿度, 气压, 原始气体电阻)
new_data_point = (
current_sec, sensor_reading[0], sensor_reading[1], sensor_reading[2], sensor_reading[3])
add_data_to_history(new_data_point)
air_quality = get_air_quality(latest_compensated_gas)
# 打印日志
print(f"[{current_time_str}] T: {sensor_reading[0]:.2f}C, H: {sensor_reading[1]:.2f}%, P: {sensor_reading[2]:.2f}hPa, Gas: {sensor_reading[3]:.0f} Ohms, Compensated: {latest_compensated_gas:.0f} Ohms, AQ: {air_quality}")
last_record_time = current_sec
update_current_time()
# 更新OLED显示
if not is_calibrating:
oled.fill(0)
# 显示IP地址(简化显示,只显示最后一段)
oled.text(f"IP:{ip_addr}", 0, 0)
# 显示网络状态
oled.text(f"Net:" + (f"{WIFI_SSID}" if sync_ntp_success else "NTP Err"), 0, 10)
# 显示传感器数据
if latest_data and latest_compensated_gas > 0:
oled.text(f"T: {latest_data[0]:.1f}C", 0, 25)
oled.text(f"H: {latest_data[1]:.1f}%", 64, 25)
oled.text(f"G: {latest_compensated_gas:.0f} O", 0, 40)
oled.text(f"AQ: {air_Q}", 0, 55)
else:
oled.text("Sensor Err", 0, 25)
oled.show()
time.sleep(10) # 控制更新频率
# --- 主函数 (多线程版本) ---
def main():
"""程序主入口(多线程版本)"""
global is_calibrating, last_record_time, sync_ntp_success, s,oled_paused
global current_time_str, start_time, WIFI_SSID, WIFI_PASSWORD,WIFI_SSID_list,WIFI_PASSWORD_list
init_hardware()
# ... (加载WiFi配置、连接WiFi、同步NTP时间的代码保持不变) ...
try:
wifi_config = load_wifi_config()
if wifi_config and 'ssid' in wifi_config and 'password' in wifi_config:
WIFI_SSID_list = wifi_config['ssid']
WIFI_PASSWORD_list = wifi_config['password']
else:
print("WiFi配置无效,使用默认值")
except Exception as e:
print(f"加载WiFi配置失败: {e}")
WIFI_SSID=WIFI_SSID_list[0]
WIFI_PASSWORD=WIFI_PASSWORD_list[0]
connect_wifi()
sync_ntp_success = sync_ntp_time()
# --- 创建标准的TCP服务器 socket ---
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen(5)
print(f'Web 服务器启动,访问 http://{ip_addr}')
# 初始化变量
last_record_time = 0
start_time = time.time()
# 启动OLED更新线程
_thread.start_new_thread(oled_update, ())
while True:
if not sync_ntp_success and WIFI_OK:
time.sleep(2)
sync_ntp_success = sync_ntp_time()
if sync_ntp_success:
continue
s.setblocking(False)
try:
conn, addr = s.accept()
# 为每个连接启动一个新线程来处理
_thread.start_new_thread(handle_client, (conn, addr))
except OSError:
pass
time.sleep(0.5)
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print("程序被用户中断")
finally:
# 程序退出时清理资源
if 's' in globals() and s:
s.close()
print("程序退出")
代码解析与注意事项
配置区:
和
I2C_SDA_PIN 默认为 21 和 22,如果你的接线不同,请修改。
I2C_SCL_PIN 如果传感器无法被识别,可以尝试
BME680_ADDR。可以参考扫描出来的地址。
0x76 和
RECORD_INTERVAL_SEC 控制数据记录的频率和历史数据的长度。注意:因为esp32的内存容量有限,在
MAX_RECORDS
MAX_RECORDS>15后,显示网页会刷新chart很慢或者失败卡死。即使我加了很多的gc.collect。。。。。
库文件:
确保 和
ssd1306.py 已经正确上传到 ESP32。代码开头的
bme680.py 语句会导入它们。
import
NTP 时间同步:
代码会尝试连接到 NTP 服务器 () 来获取当前时间。这需要设备能够访问互联网。如果同步失败,OLED 会显示 “NTP Err”,但程序会继续运行,只是时间显示不正确。
cn.pool.ntp.org
Web 服务器:
联网失败时会启动AP模式,连接ESP32_Air_Quality,在192.168.4.1:8080端口,可以增加WiFi配置。目前没有删除功能,如果保存的太多了,按照顺序连接很慢,需要的话自行到json文件删减。
显示服务器在端口 80 上监听。HTML 页面中嵌入了 Chart.js 库,它会从 ESP32 获取历史数据并在浏览器中动态生成图表。

函数是简化的 HTTP 请求处理器,它只处理根路径 (
handle_client) 的 GET 请求。
/ 和
s.setblocking(False) 是实现非阻塞服务器的关键,这样 Web 服务器就不会影响传感器数据的读取和 OLED 的刷新。
try-except OSError
OLED 显示:
两种显示模式
如何运行
将代码保存到 ESP32,命名为 。ESP32 会自动重启并开始运行程序。在 OLED 屏幕上看到 IP 地址后,在同一网络下的电脑或手机浏览器中输入该 IP 地址,即可看到实时数据和趋势图。OLED 显示 IP:在 OLED 顶部固定显示 ESP32 的 IP 地址(
main.py),方便用户查看并连接 Web 界面。Web 校准接口:
oled.text("IP: " + ip_addr, 0, 0) 接口,通过浏览器点击按钮触发校准。校准期间 Web 页面会显示提示,OLED 也会显示校准状态。校准状态管理:
/calibrate 标志,校准期间暂停传感器数据读取和 OLED 显示切换,避免干扰校准过程。Web 页面优化:添加 “校准 BME680” 按钮,点击后发送请求到
is_calibrating,校准完成后自动刷新页面。数据显示:在 Web 页面和 OLED 上同时显示空气质量等级和甲醛提示
/calibrate

