WICの中から

機構設計者が株式投資や育児に奮闘するblog

【Raspberry pi】空気品質センサCCS811を使って我が家の空気を観測する

f:id:temcee:20190801234146j:plain

空気が悪い。

梅雨が長かったからか、どうも部屋内の空気が淀んでいるように感じます。空気の悪さを見れないものかと調べてみると、空気品質センサがヒットしました。

というわけで、今回は空気品質センサCCS811をRaspberry piにつなげて、空気を観測してみました。

www.switch-science.com

CCS811で測定できるもの

  • 二酸化炭素相当物(eCO2)
  • 総揮発性勇気化合物(TVOC)

TVOCの方は聞き慣れないですね。
溶剤、燃料として用いられるトルエン、ベンゼンなどを指しており、この濃度を低減させることはシックハウス症候群のリスクを低減させます。

ワイヤの接続

温度センサは付けず、シンプルな構成にしています。Fritzingできちんとした図を作りたかったですが、操作が調べながらで時間かかったので、今回は割愛します。そのうちシレッと図を追加するかもしれません。

f:id:temcee:20190801234127j:plain

CSS811とピンヘッダのハンダは相変わらずのクオリティ…今のハンダごては会社用にして、もうちょっときちんとしたハンダごてを買おうかな。

Raspberry piのI2C設定

ラズパイを起動したら、コンフィグでInterfaces内のI2CをEnabledにします。その後、再起動。

f:id:temcee:20190801235415p:plain

f:id:temcee:20190801235431p:plain

設定ファイル/bot/config.txtに下記を書き込みます。自分はコレを忘れていて、実行時にエラーを吐いて詰まりました。

dtparam=i2c_baudrate=10000

センサーが正しく接続されているかどうかを確かめます。i2cdetectコマンドで下記の様に表示されれば、センサが正しく認識されています。

sudo i2cdetect -y 1

f:id:temcee:20190801235735p:plain

コード

CCS811をPythonで実装している記事は、調べればいくつか見つかります。

中を見た感じ、大本はAdafruitから提供されているサンプルのようです。

今回は下記の記事を参考に、slackに関するところを省いてprintでセンサがきちんと動くかどうかを確認します。

空気品質センサでオフィスの二酸化炭素濃度をモニターする - ユニファ開発者ブログ

atom_log.py実行用のファイルです。

#!/usr/bin/env python

import os
import sys
from logging import basicConfig, getLogger, DEBUG, FileHandler, Formatter
from time import sleep

from ccs811 import CCS811

class AirConditionMonitor:
    CO2_PPM_THRESHOLD_1 = 1000
    CO2_PPM_THRESHOLD_2 = 2000

    CO2_LOWER_LIMIT  =  400
    CO2_HIGHER_LIMIT = 8192

    CO2_STATUS_CONDITIONING = 'CONDITIONING'
    CO2_STATUS_LOW          = 'LOW'
    CO2_STATUS_HIGH         = 'HIGH'
    CO2_STATUS_TOO_HIGH     = 'TOO HIGH'
    CO2_STATUS_ERROR        = 'ERROR'


    LOG_FILE = '{script_dir}/logs/air_condition_monitor.log'.format(
        script_dir = os.path.dirname(os.path.abspath(__file__))
    )

    def __init__(self):
        self._ccs811 = CCS811()
        self.co2_status = self.CO2_STATUS_LOW
        self.init_logger()

    def init_logger(self):
        self._logger = getLogger(__class__.__name__)
        file_handler = FileHandler(self.LOG_FILE)
        formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        self._logger.addHandler(file_handler)
        self._logger.setLevel(DEBUG)

    def status(self, co2):
        if co2 < self.CO2_LOWER_LIMIT or co2 > self.CO2_HIGHER_LIMIT:
            return self.CO2_STATUS_CONDITIONING
        elif co2 < self.CO2_PPM_THRESHOLD_1:
            return self.CO2_STATUS_LOW
        elif co2 < self.CO2_PPM_THRESHOLD_2:
            return self.CO2_STATUS_HIGH
        else:
            return self.CO2_STATUS_TOO_HIGH

    def execute(self):
        while not self._ccs811.available():
            pass

        while True:
            if not self._ccs811.available():
                sleep(1)
                continue

            try:
                if not self._ccs811.readData():
                    co2 = self._ccs811.geteCO2()
                    co2_status = self.status(co2)
                    if co2_status == self.CO2_STATUS_CONDITIONING:
                        print("Under Conditioning...")
                        self._logger.debug("Under Conditioning...")
                        sleep(2)
                        continue

                    print("CO2: {0}ppm, TVOC: {1}".format(co2, self._ccs811.getTVOC()))

                    if co2_status != self.co2_status:
                        self.co2_status = co2_status
                        self._logger.info("CO2: {0}ppm, TVOC: {1}".format(co2, self._ccs811.getTVOC()))
                else:
                    self._logger.error('ERROR!')
                    while True:
                        pass
            except:
                self._logger.error(sys.exc_info())

            sleep(2)

if __name__ == '__main__':
    air_condition_monitor = AirConditionMonitor()
    air_condition_monitor.execute()

ccs811.py センサとのやり取りはこちらで記述されています。

#!/usr/bin/env python

import os
import smbus2 as smbus
from collections import OrderedDict
from logging import basicConfig, getLogger, DEBUG, FileHandler, Formatter
from time import sleep

CCS811_ADDRESS  =  0x5B

CCS811_STATUS = 0x00
CCS811_MEAS_MODE = 0x01
CCS811_ALG_RESULT_DATA = 0x02
CCS811_HW_ID = 0x20

CCS811_DRIVE_MODE_IDLE = 0x00
CCS811_DRIVE_MODE_1SEC = 0x01
CCS811_DRIVE_MODE_10SEC = 0x02
CCS811_DRIVE_MODE_60SEC = 0x03
CCS811_DRIVE_MODE_250MS = 0x04

CCS811_BOOTLOADER_APP_START = 0xF4

CCS811_HW_ID_CODE = 0x81

class CCS811:
    LOG_FILE = '{script_dir}/logs/ccs811.log'.format(script_dir = os.path.dirname(os.path.abspath(__file__)))

    def __init__(self, mode = CCS811_DRIVE_MODE_1SEC, address = CCS811_ADDRESS):
        self.init_logger()

        if mode not in [CCS811_DRIVE_MODE_IDLE, CCS811_DRIVE_MODE_1SEC, CCS811_DRIVE_MODE_10SEC, CCS811_DRIVE_MODE_60SEC, CCS811_DRIVE_MODE_250MS]:
            raise ValueError('Unexpected mode value {0}.  Set mode to one of CCS811_DRIVE_MODE_IDLE, CCS811_DRIVE_MODE_1SEC, CCS811_DRIVE_MODE_10SEC, CCS811_DRIVE_MODE_60SEC or CCS811_DRIVE_MODE_250MS'.format(mode))

        self._address = address
        self._bus = smbus.SMBus(1)

        self._status = Bitfield([('ERROR' , 1), ('unused', 2), ('DATA_READY' , 1), ('APP_VALID', 1), ('unused2' , 2), ('FW_MODE' , 1)])

        self._meas_mode = Bitfield([('unused', 2), ('INT_THRESH', 1), ('INT_DATARDY', 1), ('DRIVE_MODE', 3)])

        self._error_id = Bitfield([('WRITE_REG_INVALID', 1), ('READ_REG_INVALID', 1), ('MEASMODE_INVALID', 1), ('MAX_RESISTANCE', 1), ('HEATER_FAULT', 1), ('HEATER_SUPPLY', 1)])

        self._TVOC = 0
        self._eCO2 = 0
        #check that the HW id is correct
        if self.readU8(CCS811_HW_ID) != CCS811_HW_ID_CODE:
            raise Exception("Device ID returned is not correct! Please check your wiring.")

        self.writeList(CCS811_BOOTLOADER_APP_START, [])
        sleep(0.1)

        #make sure there are no errors and we have entered application mode
        if self.checkError():
            raise Exception("Device returned an Error! Try removing and reapplying power to the device and running the code again.")
        if not self._status.FW_MODE:
            raise Exception("Device did not enter application mode! If you got here, there may be a problem with the firmware on your sensor.")

        self.disableInterrupt()

        self.setDriveMode(mode)

    def init_logger(self):
        self._logger = getLogger(__class__.__name__)
        file_handler = FileHandler(self.LOG_FILE)
        formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        self._logger.addHandler(file_handler)
        self._logger.setLevel(DEBUG)

    def disableInterrupt(self):
        self._meas_mode.INT_DATARDY = 1
        self.write8(CCS811_MEAS_MODE, self._meas_mode.get())

    def setDriveMode(self, mode):
        self._meas_mode.DRIVE_MODE = mode
        self.write8(CCS811_MEAS_MODE, self._meas_mode.get())

    def available(self):
        self._status.set(self.readU8(CCS811_STATUS))
        if not self._status.DATA_READY:
            return False
        else:
            return True

    def readData(self):
        if not self.available():
            return False
        else:
            buf = self.readList(CCS811_ALG_RESULT_DATA, 8)
            self._eCO2 = (buf[0] << 8) | (buf[1])
            self._TVOC = (buf[2] << 8) | (buf[3])
            if self._status.ERROR:
                return buf[5]
            else:
                return 0

    def getTVOC(self):
        return self._TVOC

    def geteCO2(self):
        return self._eCO2

    def checkError(self):
        self._status.set(self.readU8(CCS811_STATUS))
        return self._status.ERROR

    def readU8(self, register):
        result = self._bus.read_byte_data(self._address, register) & 0xFF
        self._logger.debug("Read 0x%02X from register 0x%02X", result, register)
        return result

    def write8(self, register, value):
        value = value & 0xFF
        self._bus.write_byte_data(self._address, register, value)
        self._logger.debug("Wrote 0x%02X to register 0x%02X", value, register)

    def readList(self, register, length):
        results = self._bus.read_i2c_block_data(self._address, register, length)
        self._logger.debug("Read the following from register 0x%02X: %s", register, results)
        return results

    def writeList(self, register, data):
        self._bus.write_i2c_block_data(self._address, register, data)
        self._logger.debug("Wrote to register 0x%02X: %s", register, data)

class Bitfield:
    def __init__(self, _structure):
        self._structure = OrderedDict(_structure)
        for key, value in self._structure.items():
            setattr(self, key, 0)

    def get(self):
        fullreg = 0
        pos = 0
        for key, value in self._structure.items():
            fullreg = fullreg | ( (getattr(self, key) & (2**value - 1)) << pos )
            pos = pos + value

        return fullreg

    def set(self, data):
        pos = 0
        for key, value in self._structure.items():
            setattr(self, key, (data >> pos) & (2**value - 1))
            pos = pos + value

注意点として、自分はログファイル周りでエラーがおきました。どうもログファイルが見つからない…ということで、「生成されないんだっけ」と思いながら、マニュアルでログファイルを作成し、事なきを得ました。

空気の観測

atom_log.pyを実行すると、下記の通りeCO2とTVOCの値が流れてきます。

f:id:temcee:20190801234146j:plain

センサに息を吹きかけてみると数値が変動します。きちんと動作した!

データシートによると、実使用前に48時間のエージングと、起動毎に20分のウォームアップが必要なようです。今回は起動だけ見て、時間を見つけてcsvに出力するようコードをいじろうと思います。部屋ごとの空気の状況と、非常に空気が悪い会社の事務所のデータを取って比較したいですね。

参考リンク

空気品質センサでオフィスの二酸化炭素濃度をモニターする - ユニファ開発者ブログ

https://learn.adafruit.com/adafruit-ccs811-air-quality-sensor/raspberry-pi-wiring-test

CCS811搭載 空気品質センサモジュール - スイッチサイエンス

Hardware Hump Day: Air Quality Measurements with the CCS811 - News - SparkFun Electronics

こんな記事も書いています。

temcee.hatenablog.com

temcee.hatenablog.com