Source code for Hotplates.MSHPro

from __future__ import annotations
from typing import Dict, Callable, Union, Optional, Any

from . import SerialThreadedDuplex
from .MSHProCommunication import COMMUNICATION

import logging

logger = logging.getLogger("Hotplates.MSHPro")
logger.addHandler(logging.NullHandler())


[docs]class MSHPro: """ Serial communication with a MSHPro hotplate. """ SERIAL_SETTINGS = { "baudrate": 9600, "bytesize": SerialThreadedDuplex.serial.EIGHTBITS, # type: ignore "parity": SerialThreadedDuplex.serial.PARITY_NONE, # type: ignore "stopbits": SerialThreadedDuplex.serial.STOPBITS_ONE, # type: ignore "xonxoff": False, "rtscts": False, "dsrdtr": False, } """ Communications settings for the MSHPro hotplate. """ HEATLIMIT_MAX: float = 340.0 """Maximum settable temperature (°C).""" HEATLIMIT_MIN: float = 25.0 """Minimum settable temperature (°C).""" STIRLIMIT_MAX: int = 1500 """Maximum settable stir speed (rpm).""" STIRLIMIT_MIN: int = 100 """Minimum settable stir speed (rpm).""" __Serial: SerialThreadedDuplex.Serial
[docs] def __init__( self, port: Optional[Union[int, str]] = None, timeout: float | int = 0.5, ): """ Create a MSHPro object. The serial settings of the hotplate are stored in :const:`MSHPro.SERIAL_SETTINGS`. Args: port (Optional[Union[int, str]], optional): Serial port name. If an integer is passed it will be converted to a serial port name depending on the current system using :func:`.SerialThreadedDuplex.port_parser` The serial port will not be opened upon creation. Defaults to None. timeout (float | int, optional): Set the timeout for serial read. Passed into :class:`.SerialThreadedDuplex.Serial` creation. Defaults to 0.5. """ # the other timeouts, not included in the SERIAL_SETTINGS, are: # write_timeout # inter_byte_timeout self.__Serial = SerialThreadedDuplex.Serial( timeout=timeout, **self.__class__.SERIAL_SETTINGS, ) if port is not None: self.__Serial.port = SerialThreadedDuplex.port_parser(port)
def __del__(self) -> None: self.__Serial.close() @property def port(self) -> Any: # Returns a `property` object if not set """Serial port name.""" return self.__Serial.port
[docs] def serial_open(self) -> None: """Open Serial.""" # newer versions of PySerial if hasattr(self.__Serial, "is_open"): if self.__Serial.is_open: return # older versions of PySerial elif hasattr(self.__Serial, "isOpen"): if self.__Serial.isOpen(): return # Try to open port self.__Serial.open()
[docs] def serial_close(self) -> None: """Close Serial.""" self.__Serial.close()
def __command( self, command: COMMUNICATION, var: Optional[Any] = None, d: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Send a command to the hotplate using the :class:`.SerialThreadedDuplex.Serial` interface. Args: command (COMMUNICATION): :mod:`COMMUNICATION` to send. var (Optional[Any], optional): Value to send where appropriate. Defaults to None. d (Optional[Dict[str, Any]], optional): ``dict`` to update with parsed values. Defaults to None that gives an empty ``dict``. Returns: Dict[str, Any]: ``dict`` with parsed responses. """ d = dict() if d is None else d self.serial_open() w_bytes = command.to_bytes(var) logger.debug("Sending bytes: {}.".format(w_bytes.hex())) r_bytes = self.__Serial.write_with_read(w_bytes, size=command.len_rx) logger.debug("Received bytes: {}.".format(r_bytes.hex())) d.update(command.parse_response(r_bytes)) return d
[docs] def ping(self) -> bool: """ Send a ping command. Returns: bool: ``True`` if hotplate is online and responding correctly. """ try: return self.__command(COMMUNICATION.PING)["success"] is True except Exception: return False
[docs] def _status(self) -> Dict[str, Any]: """ Get status of the hotplate: `stir_set`, `stir_actual`, `heat_set`, `heat_actual`. See :meth:`.status`. """ return self.__command(COMMUNICATION.STATUS)
[docs] def _info(self) -> Dict[str, Any]: """ Get info from the hotplate: `mode`, `stir_on`, `heat_on`, `heat_limit` and `heat_alarm`. See :meth:`.status`. """ return self.__command(COMMUNICATION.INFO)
[docs] def status(self, raw_values: bool = False) -> Dict[str, Any]: """ Get hotplate status. See also :meth:`._status` and :meth:`._info` for a subset of this data. Dictionary keys: - `"success"` (`bool`): ``True`` if command received correctly. - `"stir_set"` (`int` | "Off"): target speed (rpm) or "Off". Set :attr:`raw _values` to be ``True`` to view value when off instead of "Off". - `"stir_actual"` (`int`): measured speed (rpm). - `"heat_set"` (`float` | "Off"): target temperature (°C) or "Off". Set :attr:`raw _values` to be ``True`` to view value when off instead of "Off". - `"heat_actual"` (`float`): measured temperature (°C). N.B. The `heat_actual` reading is from an internal temperature sensor that doesn't necesserily match the display value. If a temperature probe is attached then both the display and `heat_actual` reading is from the probe. - `"stir_on"` (`bool`): ``True`` if stirring. - `"heat_on"` (`bool`): ``True`` if heating. - `"heat_limit"` (`float`): maximum set temperature (°C). - `"mode"` ({"A", "B", "C"}): default is "A". **Undocumented**. Possibly related to heat rate profile. - `"heat_alarm"`. **Undocumented**. Unknown. Only returned when :attr:`raw _values` is ``True``. Args: raw_values: bool Default behaviour [``False``] is to convert values to more readable format, specifically related to set values when off and hiding unknown values. If ``True`` then the raw values from :meth:`.COMMUNICATION.parse_response` are returned. Returns: dict: Dictionary of hotplate information. """ r = self.__command(COMMUNICATION.STATUS) r = self.__command(COMMUNICATION.INFO, None, r) # parse on/off information to set values if not raw_values: if not r["heat_on"]: r["heat_set"] = "Off" if not r["stir_on"]: r["stir_set"] = "Off" # this info appears to have no value r["heat_alarm"] return r
def __off(self, *args: COMMUNICATION) -> None: """ Helper function to turn off. """ current_status: Dict[str, Any] current_status = self.status() command: COMMUNICATION for command in args: name: str name = command.name.lower() # already off if not current_status["{}_on".format(name)]: logger.info("{} {} {}".format(command, "OFF", "Success [already off]")) continue # send current setting to turn off if self.__command(command, current_status["{}_set".format(name)])[ "success" ]: logger.info("{} {} {}".format(command, "OFF", "Success")) continue logger.error("{} {} {}".format(command, "OFF", "ERROR!")) def __setval(self, command: COMMUNICATION, val: Any = None) -> bool: """ Helper function to set values. """ current_status = self.status(raw_values=True) name = command.name.lower() set_value = current_status["{}_set".format(name)] on_status = current_status["{}_on".format(name)] # switch on to current value if val is None: val = set_value # already at correct value if set_value == val: # already on if on_status: logger.info( "{} {} {}".format(command, val, "Success [already at target value]") ) return True # send same value to turn on if self.__command(command, set_value)["success"]: logger.info("{} {} {}".format(command, val, "Success [switched on]")) return True logger.error("{} {} {}".format(command, val, "ERROR!")) return False # not at correct value # send value once to change value # this will also toggle the on / off status if not self.__command(command, val)["success"]: logger.error("{} {} {}".format(command, val, "ERROR!")) return False # was not previously on - but now on with correct value if not on_status: logger.info( "{} {} {}".format(command, val, "Success [set value, switched on]") ) return True # send value a second time to turn back on if self.__command(command, val)["success"]: logger.info( "{} {} {}".format( command, val, "Success [switched off, set value, switched on]" ) ) return True logger.error("{} {} {}".format(command, val, "ERROR!")) return False
[docs] def off(self) -> None: """ Turn off stirring and heating. """ self.__off(COMMUNICATION.HEAT, COMMUNICATION.STIR)
[docs] def heat(self, val: float) -> None: """ Control heating. If :attr:`val` is not truthy, heating will be turned off. If an exception is encountered, heating will be turned off. Args: val (int): Target temperature (1 decimal place float, rounded down, °C). Raises: ValueError: If :attr:`val` can not be converted to ``float`` or is outside permissable range. """ if not val: self.heat_off() return try: val = float(val) if not self.HEATLIMIT_MIN <= val <= self.HEATLIMIT_MAX: self.heat_off() raise ValueError( "Heat setting {} outside allowable range {}-{}.".format( val, self.HEATLIMIT_MIN, self.HEATLIMIT_MAX ) ) self.__setval(COMMUNICATION.HEAT, val) except Exception as e: self.heat_off() raise e
[docs] def heat_off(self) -> None: """ Turn off heating. """ self.__off(COMMUNICATION.HEAT)
[docs] def stir(self, val: int) -> None: """ Control stirring. If :attr:`val` is not truthy, stirring will be turned off. Args: val (int): Target stir speed (integer, rpm). Raises: ValueError: If :attr:`val` can not be converted to ``int`` or is outside permissable range. """ if not val: self.stir_off() return val = int(val) if not self.STIRLIMIT_MIN <= val <= self.STIRLIMIT_MAX: raise ValueError( "Stir speed setting {} outside allowable range {}-{}.".format( val, self.STIRLIMIT_MIN, self.STIRLIMIT_MAX ) ) self.__setval(COMMUNICATION.STIR, val)
[docs] def stir_off(self) -> None: """ Turn off stirring. """ self.__off(COMMUNICATION.STIR)
[docs] def mode(self, mode: str) -> None: """ Set hotplate mode. **Warning: undocumented feature.** Args: mode ({"A", "B", "C"}): Choose mode A, B or C. """ target_mode = "ABC".index(mode) set_mode = "ABC".index(self._info()["mode"]) for _ in range((3 + target_mode - set_mode) % 3): self.__command(COMMUNICATION._MODE)
[docs] def text_command(self, cmd: str) -> Any: """ Interpret a text command. The command is one of {"PING", "STATUS", "OFF", "STIR", "HEAT", "MODE"}. Text is not case-sensitive. Each of these will be sent to the corresponding method. i.e. for {"HEAT", "STIR" and "MODE"} a whitespace separated value will be used for setting. Examples: .. code-block:: python >>> hp = Hotplates.MSHPro(0) >>> hp.text_command("STATUS") >>> hp.text_command("HEAT OFF") >>> hp.text_command("MODE B") >>> hp.text_command("stir 560") >>> hp.text_command("PING") >>> hp.text_command("OFF") """ cmd_dict: Dict[str, Callable] # type: ignore cmd_dict = { "PING": self.ping, "STATUS": self.status, "OFF": self.off, "STIR": self.stir, "HEAT": self.heat, "MODE": self.mode, } cmds = str(cmd).upper().split() if cmds[0] not in cmd_dict: raise ValueError("Could not find command: {}".format(cmd)) if cmds[0] in {"STIR", "HEAT", "MODE"}: if cmds[1] == "OFF": return cmd_dict[cmds[0]](False) return cmd_dict[cmds[0]](cmds[1]) return cmd_dict[cmds[0]]()