空気が悪い。
梅雨が長かったからか、どうも部屋内の空気が淀んでいるように感じます。空気の悪さを見れないものかと調べてみると、空気品質センサがヒットしました。
というわけで、今回は空気品質センサCCS811をRaspberry piにつなげて、空気を観測してみました。
CCS811で測定できるもの
- 二酸化炭素相当物(eCO2)
- 総揮発性勇気化合物(TVOC)
TVOCの方は聞き慣れないですね。
溶剤、燃料として用いられるトルエン、ベンゼンなどを指しており、この濃度を低減させることはシックハウス症候群のリスクを低減させます。
ワイヤの接続
温度センサは付けず、シンプルな構成にしています。Fritzingできちんとした図を作りたかったですが、操作が調べながらで時間かかったので、今回は割愛します。そのうちシレッと図を追加するかもしれません。
CSS811とピンヘッダのハンダは相変わらずのクオリティ…今のハンダごては会社用にして、もうちょっときちんとしたハンダごてを買おうかな。
Raspberry piのI2C設定
ラズパイを起動したら、コンフィグでInterfaces内のI2CをEnabledにします。その後、再起動。
設定ファイル/bot/config.txt
に下記を書き込みます。自分はコレを忘れていて、実行時にエラーを吐いて詰まりました。
dtparam=i2c_baudrate=10000
センサーが正しく接続されているかどうかを確かめます。i2cdetect
コマンドで下記の様に表示されれば、センサが正しく認識されています。
sudo i2cdetect -y 1
コード
CCS811をPythonで実装している記事は、調べればいくつか見つかります。
中を見た感じ、大本はAdafruitから提供されているサンプルのようです。
今回は下記の記事を参考に、slackに関するところを省いてprint
でセンサがきちんと動くかどうかを確認します。
空気品質センサでオフィスの二酸化炭素濃度をモニターする - ユニファ開発者ブログ
atom_log.py実行用のファイルです。
#!/usr/bin/env python import os import sys from logging import basicConfig, getLogger, DEBUG, FileHandler, Formatter from time import sleep from ccs811 import CCS811 class AirConditionMonitor: CO2_PPM_THRESHOLD_1 = 1000 CO2_PPM_THRESHOLD_2 = 2000 CO2_LOWER_LIMIT = 400 CO2_HIGHER_LIMIT = 8192 CO2_STATUS_CONDITIONING = 'CONDITIONING' CO2_STATUS_LOW = 'LOW' CO2_STATUS_HIGH = 'HIGH' CO2_STATUS_TOO_HIGH = 'TOO HIGH' CO2_STATUS_ERROR = 'ERROR' LOG_FILE = '{script_dir}/logs/air_condition_monitor.log'.format( script_dir = os.path.dirname(os.path.abspath(__file__)) ) def __init__(self): self._ccs811 = CCS811() self.co2_status = self.CO2_STATUS_LOW self.init_logger() def init_logger(self): self._logger = getLogger(__class__.__name__) file_handler = FileHandler(self.LOG_FILE) formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) self._logger.addHandler(file_handler) self._logger.setLevel(DEBUG) def status(self, co2): if co2 < self.CO2_LOWER_LIMIT or co2 > self.CO2_HIGHER_LIMIT: return self.CO2_STATUS_CONDITIONING elif co2 < self.CO2_PPM_THRESHOLD_1: return self.CO2_STATUS_LOW elif co2 < self.CO2_PPM_THRESHOLD_2: return self.CO2_STATUS_HIGH else: return self.CO2_STATUS_TOO_HIGH def execute(self): while not self._ccs811.available(): pass while True: if not self._ccs811.available(): sleep(1) continue try: if not self._ccs811.readData(): co2 = self._ccs811.geteCO2() co2_status = self.status(co2) if co2_status == self.CO2_STATUS_CONDITIONING: print("Under Conditioning...") self._logger.debug("Under Conditioning...") sleep(2) continue print("CO2: {0}ppm, TVOC: {1}".format(co2, self._ccs811.getTVOC())) if co2_status != self.co2_status: self.co2_status = co2_status self._logger.info("CO2: {0}ppm, TVOC: {1}".format(co2, self._ccs811.getTVOC())) else: self._logger.error('ERROR!') while True: pass except: self._logger.error(sys.exc_info()) sleep(2) if __name__ == '__main__': air_condition_monitor = AirConditionMonitor() air_condition_monitor.execute()
ccs811.py センサとのやり取りはこちらで記述されています。
#!/usr/bin/env python import os import smbus2 as smbus from collections import OrderedDict from logging import basicConfig, getLogger, DEBUG, FileHandler, Formatter from time import sleep CCS811_ADDRESS = 0x5B CCS811_STATUS = 0x00 CCS811_MEAS_MODE = 0x01 CCS811_ALG_RESULT_DATA = 0x02 CCS811_HW_ID = 0x20 CCS811_DRIVE_MODE_IDLE = 0x00 CCS811_DRIVE_MODE_1SEC = 0x01 CCS811_DRIVE_MODE_10SEC = 0x02 CCS811_DRIVE_MODE_60SEC = 0x03 CCS811_DRIVE_MODE_250MS = 0x04 CCS811_BOOTLOADER_APP_START = 0xF4 CCS811_HW_ID_CODE = 0x81 class CCS811: LOG_FILE = '{script_dir}/logs/ccs811.log'.format(script_dir = os.path.dirname(os.path.abspath(__file__))) def __init__(self, mode = CCS811_DRIVE_MODE_1SEC, address = CCS811_ADDRESS): self.init_logger() if mode not in [CCS811_DRIVE_MODE_IDLE, CCS811_DRIVE_MODE_1SEC, CCS811_DRIVE_MODE_10SEC, CCS811_DRIVE_MODE_60SEC, CCS811_DRIVE_MODE_250MS]: raise ValueError('Unexpected mode value {0}. Set mode to one of CCS811_DRIVE_MODE_IDLE, CCS811_DRIVE_MODE_1SEC, CCS811_DRIVE_MODE_10SEC, CCS811_DRIVE_MODE_60SEC or CCS811_DRIVE_MODE_250MS'.format(mode)) self._address = address self._bus = smbus.SMBus(1) self._status = Bitfield([('ERROR' , 1), ('unused', 2), ('DATA_READY' , 1), ('APP_VALID', 1), ('unused2' , 2), ('FW_MODE' , 1)]) self._meas_mode = Bitfield([('unused', 2), ('INT_THRESH', 1), ('INT_DATARDY', 1), ('DRIVE_MODE', 3)]) self._error_id = Bitfield([('WRITE_REG_INVALID', 1), ('READ_REG_INVALID', 1), ('MEASMODE_INVALID', 1), ('MAX_RESISTANCE', 1), ('HEATER_FAULT', 1), ('HEATER_SUPPLY', 1)]) self._TVOC = 0 self._eCO2 = 0 #check that the HW id is correct if self.readU8(CCS811_HW_ID) != CCS811_HW_ID_CODE: raise Exception("Device ID returned is not correct! Please check your wiring.") self.writeList(CCS811_BOOTLOADER_APP_START, []) sleep(0.1) #make sure there are no errors and we have entered application mode if self.checkError(): raise Exception("Device returned an Error! Try removing and reapplying power to the device and running the code again.") if not self._status.FW_MODE: raise Exception("Device did not enter application mode! If you got here, there may be a problem with the firmware on your sensor.") self.disableInterrupt() self.setDriveMode(mode) def init_logger(self): self._logger = getLogger(__class__.__name__) file_handler = FileHandler(self.LOG_FILE) formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) self._logger.addHandler(file_handler) self._logger.setLevel(DEBUG) def disableInterrupt(self): self._meas_mode.INT_DATARDY = 1 self.write8(CCS811_MEAS_MODE, self._meas_mode.get()) def setDriveMode(self, mode): self._meas_mode.DRIVE_MODE = mode self.write8(CCS811_MEAS_MODE, self._meas_mode.get()) def available(self): self._status.set(self.readU8(CCS811_STATUS)) if not self._status.DATA_READY: return False else: return True def readData(self): if not self.available(): return False else: buf = self.readList(CCS811_ALG_RESULT_DATA, 8) self._eCO2 = (buf[0] << 8) | (buf[1]) self._TVOC = (buf[2] << 8) | (buf[3]) if self._status.ERROR: return buf[5] else: return 0 def getTVOC(self): return self._TVOC def geteCO2(self): return self._eCO2 def checkError(self): self._status.set(self.readU8(CCS811_STATUS)) return self._status.ERROR def readU8(self, register): result = self._bus.read_byte_data(self._address, register) & 0xFF self._logger.debug("Read 0x%02X from register 0x%02X", result, register) return result def write8(self, register, value): value = value & 0xFF self._bus.write_byte_data(self._address, register, value) self._logger.debug("Wrote 0x%02X to register 0x%02X", value, register) def readList(self, register, length): results = self._bus.read_i2c_block_data(self._address, register, length) self._logger.debug("Read the following from register 0x%02X: %s", register, results) return results def writeList(self, register, data): self._bus.write_i2c_block_data(self._address, register, data) self._logger.debug("Wrote to register 0x%02X: %s", register, data) class Bitfield: def __init__(self, _structure): self._structure = OrderedDict(_structure) for key, value in self._structure.items(): setattr(self, key, 0) def get(self): fullreg = 0 pos = 0 for key, value in self._structure.items(): fullreg = fullreg | ( (getattr(self, key) & (2**value - 1)) << pos ) pos = pos + value return fullreg def set(self, data): pos = 0 for key, value in self._structure.items(): setattr(self, key, (data >> pos) & (2**value - 1)) pos = pos + value
注意点として、自分はログファイル周りでエラーがおきました。どうもログファイルが見つからない…ということで、「生成されないんだっけ」と思いながら、マニュアルでログファイルを作成し、事なきを得ました。
空気の観測
atom_log.py
を実行すると、下記の通りeCO2とTVOCの値が流れてきます。
センサに息を吹きかけてみると数値が変動します。きちんと動作した!
データシートによると、実使用前に48時間のエージングと、起動毎に20分のウォームアップが必要なようです。今回は起動だけ見て、時間を見つけてcsvに出力するようコードをいじろうと思います。部屋ごとの空気の状況と、非常に空気が悪い会社の事務所のデータを取って比較したいですね。
参考リンク
空気品質センサでオフィスの二酸化炭素濃度をモニターする - ユニファ開発者ブログ
https://learn.adafruit.com/adafruit-ccs811-air-quality-sensor/raspberry-pi-wiring-test
CCS811搭載 空気品質センサモジュール - スイッチサイエンス
Hardware Hump Day: Air Quality Measurements with the CCS811 - News - SparkFun Electronics
こんな記事も書いています。