# -*- coding: utf-8 -*-
import asyncio
import random
import re

from aiohttp import web
from cbpi.api import *


@parameters([
    Property.Number(label="Param1", configurable=True),
    Property.Text(label="Param2", configurable=True, default_value="HALLO"),
    Property.Select(label="Param3", options=[1, 2, 4]),
    Property.Sensor(label="Param4"),
    Property.Actor(label="Param5"),
])
class CustomSensor(CBPiSensor):
    """Example sensor plugin that publishes a random integer every second.

    Make sure to extend CBPiSensor. The properties declared in the
    ``@parameters`` decorator above are the settings attached to this sensor.
    """

    def __init__(self, cbpi, id, props):
        """Initialise the base sensor and start with a reading of 0.

        :param cbpi: the cbpi core
        :param id: identifier of this sensor instance (the name shadows the
            builtin, but it is part of the constructor signature and is
            therefore kept)
        :param props: property values for this instance
        """
        super().__init__(cbpi, id, props)
        self.value = 0

    @action(key="Test", parameters=[])
    async def action1(self, **kwargs):
        """A custom action which can be called from the user interface.

        :param kwargs: arguments supplied with the action call; they are only
            printed here
        """
        print("ACTION!", kwargs)

    async def run(self):
        """Produce readings for as long as the sensor is running.

        This method is executed asynchronously. In this example a new random
        value between 0 and 50 (inclusive) is stored and pushed every second.
        """
        while self.running:
            self.value = random.randint(0, 50)
            self.push_update(self.value)
            # Yield to the event loop between readings.
            await asyncio.sleep(1)

    def get_state(self):
        """Return the current state of the sensor.

        :return: dict with the latest reading under the key ``value``
        """
        return dict(value=self.value)


def setup(cbpi):
    """Register the plugin with the server.

    This method is called by the server during startup. Here you need to
    register your plugins at the server.

    :param cbpi: the cbpi core
    :return: None
    """
    cbpi.plugin.register("CustomSensor", CustomSensor)
# Actor
import logging
from unittest.mock import MagicMock, patch

from cbpi.api import *

logger = logging.getLogger(__name__)

try:
    import RPi.GPIO as GPIO
except Exception:
    # NOTE(review): the handler is presumably broad because the import can
    # fail with more than ImportError on non-Raspberry-Pi hosts -- confirm
    # before narrowing it.
    logger.error("Failed to load RPi.GPIO. Using Mock")
    # Install a mock in sys.modules so the import below (and any later
    # import of RPi.GPIO) succeeds without the real hardware library.
    MockRPi = MagicMock()
    modules = {"RPi": MockRPi, "RPi.GPIO": MockRPi.GPIO}
    patcher = patch.dict("sys.modules", modules)
    patcher.start()
    import RPi.GPIO as GPIO


@parameters([
    Property.Number(label="Param1", configurable=True),
    Property.Text(label="Param2", configurable=True, default_value="HALLO"),
    Property.Select(label="Param3", options=[1, 2, 4]),
    Property.Sensor(label="Param4"),
    Property.Actor(label="Param5"),
])
class CustomActor(CBPiActor):
    """Example actor plugin that only tracks an on/off state and logs changes.

    No GPIO pin is driven by this example; ``state`` is a plain boolean.
    """

    # Custom property which can be configured by the user
    my_name = ""

    # NOTE(review): the sensor example in this file passes a list for
    # ``parameters``; confirm whether a dict is intended here.
    @action("test", parameters={})
    async def action1(self, **kwargs):
        """Custom action: print the arguments and remember the given name.

        :param kwargs: arguments supplied with the action call; ``name`` is
            stored in ``my_name`` (``None`` when it is absent)
        """
        print("ACTION !", kwargs)
        self.my_name = kwargs.get("name")

    def init(self):
        """Set the initial state of the actor to off."""
        print("INIT")
        self.state = False

    async def on(self, power=0):
        """Switch the actor on.

        :param power: accepted for interface compatibility; unused here
        """
        logger.info("ACTOR 1111 %s ON", self.id)
        self.state = True

    async def off(self):
        """Switch the actor off."""
        logger.info("ACTOR %s OFF ", self.id)
        self.state = False

    def get_state(self):
        """Return the current state of the actor.

        :return: ``True`` when switched on, ``False`` when switched off
        """
        return self.state

    async def run(self):
        """Background task of the actor; this example has nothing to do."""
        pass


def setup(cbpi):
    """Register the plugin with the server.

    This method is called by the server during startup. Here you need to
    register your plugins at the server.

    :param cbpi: the cbpi core
    :return: None
    """
    cbpi.plugin.register("CustomActor", CustomActor)