Доска кнопка питона ИРЦ


Мой проект состоит из Поленики Pi 2В В1.1 подключен к ряду кнопок через GPIO порты. Цель состоит в том, чтобы создать панель кнопок, что позволяет пользователю сделать выбор одного или нескольких элементов из списка, а затем либо отменить или совершает их выбор. Я новичок в Python, и работали только немного с Java и JS раньше.

Код состоит из трех модулей:

  • matrix.py: получает ввод данных от оборудования
    Кнопки подключены в 2D матрицу, чтобы спасти имеющиеся булавки. В настоящее время; есть 6 столбцов и 2 строк; первая строка содержит 6 вариантов, второй ряд 4 кнопки действий.
  • controller.py: содержит кнопку логику, ручки выбора
    Модуль считывает config.ini для получения информации о том, как обрабатывать ввод от matrix.py. Это включает в себя выбор/отмена выбора элементов, включение светодиодов ВКЛ/ВЫКЛ и экспедирование выбор database_commit.py.
  • database_commit.py: кладет учитывая ввод в базу данных
    Полученная информация будет фиксироваться в базе данных по данным config.ini.

а также config.ini файл. В следующем, выбираемых пунктов носят название темы. "отменить", "совершать", а уведомлений кнопки действия. Как я разработал это пока Изучаем Python, проект не очень хорошо структурирована. Какие концептуальные изменения мне нужно сделать для того, чтобы обеспечить хорошую читаемость, ремонтопригодность и расширяемость? Что еще следует улучшить?
Готовый проект будет в течение длительного времени, и я не смогу поддерживать его; кому-то еще придется. Я приведу сведения по обслуживанию оборудования, и добавили информацию о код там тоже есть. На мой взгляд, код должен быть легко понятен без инструкции.

Примечание: комментарии были переведены на английский. Если что-то неясно, я с радостью уточню.


matrix.py

# -*- coding: UTF-8 -*-

"""Reads input from GPIO with a reduced number of pins."""

import time
import RPi.GPIO as GPIO
from configparser import ConfigParser

class Matrix():

    def __init__(self):
        """Sets up constants and pin numbers."""
        # BCM (Broadcom) numbers are used
        GPIO.setmode(GPIO.BCM)
        parser = ConfigParser()
        filename = 'config.ini'
        parser.read(filename)
        gpio = parser['gpio']
        selection = parser['selection']
        self.rows = list(map(int, gpio['button_rows'].split(',')))
        self.columns = list(map(int, gpio['button_columns'].split(',')))
        topics = selection['topics'].replace(" ","").split(',')
        actions = selection['actions'].replace(" ","").split(',')
        while len(actions) <= len(topics):
            actions.append('unassigned')
        self.options_array = [
            topics,
            actions
        ]

    def get_button(self):
        """Findet gedrueckte Knoepfe und returnt Koordinaten falls gefunden."""

        # To search for the row of a pressed button:
        # - set all rows to input
        # - set all columns to output low
        # - save index in row_position

        row_position = -1

        for i in range(len(self.columns)):
            GPIO.setup(self.columns[i], GPIO.OUT)
            GPIO.output(self.columns[i], GPIO.LOW)

        for i in range(len(self.rows)):
            GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)

        for i in range(len(self.rows)):
            temp_read = GPIO.input(self.rows[i])
            if temp_read == 0:
                row_position = i

        # cancel if no input was found
        if row_position < 0 or row_position >= len(self.rows):
            self.refresh()
            return

        # To search for the column of a pressed button in the row:
        # - set all columns to input
        # - rows[row_position] to output

        column_position = -1

        for j in range(len(self.columns)):
            GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

        GPIO.setup(self.rows[row_position], GPIO.OUT)
        GPIO.output(self.rows[row_position], GPIO.HIGH)

        for j in range(len(self.columns)):
            temp_read = GPIO.input(self.columns[j])
            if temp_read == 1:
                column_position = j

        # cancel if no input was found
        if column_position < 0 or column_position >= len(self.columns):
            self.refresh()
            return

        # returns coordinates of pressed button
        self.refresh()
        return self.options_array[row_position][column_position]

    def refresh(self):
        """Resets all pin states."""
        for i in range(len(self.rows)):
            GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
        for j in range(len(self.columns)):
            GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_UP)

if __name__ == '__main__':
    # For testing matrix functionality
    while True:
        user_input = None
        while user_input is None:
            user_input = Matrix().get_button()
        print(user_input)
        time.sleep(.2)

controller.py:
Тодос-там подсказка концепция и будет реализована позже

#!/usr/bin/python3

"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""

import time
import signal
import sys
import threading
import RPi.GPIO as GPIO
from threading import Timer
from configparser import ConfigParser
from matrix import Matrix
from database_commit import DatabaseCommit as database

class Controller():
    """Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
    def __init__(self):
        """Sets up constants."""
        # Use Broadcom-Numbering
        GPIO.setmode(GPIO.BCM)

        # Get constants from config.ini
        parser = ConfigParser()
        filename = 'config.ini'
        parser.read(filename)

        # Pin adresses
        gpio = parser['gpio']

        self.TOPICS_LED = list(map(int, gpio['topics_led'].split(',')))
        self.ACTIONS_LED = list(map(int, gpio['actions_led'].split(',')))
        self.CANCEL_LED = self.ACTIONS_LED[0]
        self.CONFIRM_LED = self.ACTIONS_LED[1]
        self.POST_LED = self.ACTIONS_LED[2]
        self.NAGIOS_LED = self.ACTIONS_LED[3]

        # names, defaults, timers
        selection = parser['selection']

        self.TOPICS = selection['topics'].replace(" ", "").split(',')
        self.ACTIONS = selection['actions'].replace(" ", "").split(',')
        self.CANCEL = self.ACTIONS[0]
        self.CONFIRM = self.ACTIONS[1]
        self.POST = self.ACTIONS[2]
        self.NAGIOS = self.ACTIONS[3]
        self.DEFAULT_SELECTION = selection['default_selection'].replace(" ", "").split(',')
        self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
        self.SLEEP = float(selection['sleeptime'])
        self.BLINK = float(selection['blinktime'])
        self.SEND_DEFAULT_ON_EMPTY = bool(selection['send_default_on_empty'])

        # reminder schedule
        schedule = parser['schedule']

        self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
        self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')

        self.current_selection = self.DEFAULT_SELECTION[:]
        self.led_state = self.DEFAULT_LED_STATE[:]

        # Setup the LED pins
        for i in range(len(self.TOPICS_LED)):
            GPIO.setup(self.TOPICS_LED[i], GPIO.OUT)
            GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
        for i in range(len(self.ACTIONS_LED)):
            GPIO.setup(self.ACTIONS_LED[i], GPIO.OUT)
            GPIO.output(self.ACTIONS_LED[i], GPIO.LOW)

        # Switch on default selection LEDs
        for i in range(len(self.DEFAULT_SELECTION)):
            for j in range(len(self.TOPICS)):
                if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
                    GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
                    self.led_state[j] = True

        # Notification status 
        self.post_notification = False
        self.nagios_notification = False



    def led_reset_topics(self):
        """Disable all LEDs and enable default selection LEDs"""
        # All off
        for i in range(len(self.TOPICS_LED)):
            GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
        # Default an
        for i in range(len(self.DEFAULT_SELECTION)):
            for j in range(len(self.TOPICS)):
                if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
                    GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
                    self.led_state[j] = True
        return

    def continous_led_blink(blink_event, blink_thread, led):
        """Blink action LED continuously"""
        while not blink_event.isSet():
            GPIO.output(led, GPIO.HIGH)
            time.sleep(self.BLINK)
            event_is_set = blink_event.wait(blink_thread)
            if event_is_set:
                GPIO.output(led, GPIO.LOW)
            else:
                GPIO.output(led, GPIO.LOW)
                time.sleep(self.BLINK)

    def led_blink(self, led_pin):
        """Blink action LED
        """
        t = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
        GPIO.output(led_pin, GPIO.HIGH)
        #time.sleep(.2)
        #GPIO.output(led_pin, GPIO.LOW)
        t.start()
        return

    def dismiss_notification(self, led_pin):
        """Disables notification LED
        :param led_pin: GPIO pin number
        """
        GPIO.output(led_pin, GPIO.LOW)


    def led_toggle_topic(self, led_index):
        """Toggles LED state of led_index
        :param led_index: Index in TOPICS_LED; contains GPIO pin number"""
        self.led_state[led_index] = not self.led_state[led_index]
        if self.led_state[led_index]:
            GPIO.output(self.TOPICS_LED[led_index], GPIO.HIGH)
        else:
            GPIO.output(self.TOPICS_LED[led_index], GPIO.LOW)
        return

    def handle_selection(self, user_input):
        """
    Handles de/selection of topics
    Returns False if selection is altered
        Returns True if CANCEL or COMMIT is chosen
        :param user_input: String, name of pressed button
    """

        # If cancelled, reset to default selection
        if user_input == self.CANCEL:
            self.led_blink(self.CANCEL_LED)
            self.current_selection = self.DEFAULT_SELECTION[:]
            self.led_reset_topics()
            return True

        # If selection is non-empty, commit selection
        # - selection array is always sorted
        # If selection is empty, reset to default selection
        # and send default selection if SEND_DEFAULT_ON_EMPTY is True
        elif user_input == self.CONFIRM:
            self.led_blink(self.CONFIRM_LED)
            if self.current_selection != []:
                #self.current_selection.sort()
                database().make_commit(self.current_selection)
                self.current_selection = self.DEFAULT_SELECTION[:]
            else:
                self.current_selection = self.DEFAULT_SELECTION[:]
                if self.SEND_DEFAULT_ON_EMPTY:
                    if len(self.DEFAULT_SELECTION) > 0:
                        database().make_commit(self.current_selection)
            return True

        # TODO
        # Disable "Post" notification
        elif user_input == self.POST:
            """self.post_notification = False
            self.blink_thread.start()
            self.dismiss_notification(self.POST_LED)
            return False"""

        # TODO
        # Disable "Nagios" notification
        elif user_input == self.NAGIOS:
            """self.nagios_notification = False
            self.dismiss_notification(self.NAGIOS_LED)
            return False"""

        else:
            # Compare input with all TOPICS
            for i in range(len(self.TOPICS)):
                if user_input == self.TOPICS[i]:
                    self.led_toggle_topic(i)
                    already_exists = False
                    # Delete if exists already
                    for j in range(len(self.current_selection)):
                        if user_input == self.current_selection[j]:
                            already_exists = True
                            self.current_selection.pop(j)
                            print(user_input + " -")
                            break
                    # Add if does not exits yet
                    if not already_exists:
                        self.current_selection.append(user_input)
                        print(user_input + " +")
        # Sort array
        # Same order as in TOPICS
        self.temp_selection = []
        for l in range(len(self.TOPICS)):
            self.temp_selection.append("")
        for i in range(len(self.TOPICS)):
            for j in range(len(self.current_selection)):
                if self.TOPICS[i] == self.current_selection[j]:
                    self.temp_selection[i]=self.TOPICS[i]
        self.temp_selection = list(filter(None, self.temp_selection))
        self.current_selection = self.temp_selection[:]
        return False

    def wait_for_input(self):
        """Wartet auf Input und gibt an handle_selection weiter."""

        done_with_input = False
        while not done_with_input:
            user_input = None
            while user_input is None:
                user_input = Matrix().get_button()
            done_with_input = self.handle_selection(user_input)
            time.sleep(self.SLEEP)
        return

if __name__ == '__main__':
    GPIO.setwarnings(False)
    database().external_prepare_table()
    while True:
        Controller().wait_for_input()
    GPIO.cleanup()
    sys.exit(0)

database_commit.py:

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import datetime
import time
from configparser import ConfigParser
from mysql.connector import MySQLConnection, Error

class DatabaseCommit():

    def read_config(section, filename='config.ini'):
        """ Reads config.ini and returns dictionary with config from section
        :param filename: name of config file
        :param section: name of section in config file
        :return: dictionary with database settings
        """

        parser = ConfigParser()
        parser.read(filename)
        config = {}
        if parser.has_section(section):
            items = parser.items(section)
            for item in items:
                config[item[0]] = item[1].replace(" ", "")
        else:
            raise Exception('{0} not found in {1} '.format(section, filename))

        return config

    def establish_connection(self):
        db_config = DatabaseCommit.read_config('mysql')
        conn = MySQLConnection(**db_config)
        if conn.is_connected():
            return conn
        else:
            print("Connection failed")

    def prepare_table(self, cursor):
        # TODO check for existing tables, add any if neccessary, fill with 0
        database_name = DatabaseCommit.read_config('mysql')['database'].replace(" ", "")
        table_name = DatabaseCommit.read_config('table')['table_name'].replace(" ", "")
        counter_column = DatabaseCommit.read_config('table')['counter_column'].replace(" ", "")
        date_column = DatabaseCommit.read_config('table')['date_column'].replace(" ", "")
        sql  = 'CREATE TABLE IF NOT EXISTS '+ table_name +' ('
        sql += counter_column +' INT PRIMARY KEY AUTO_INCREMENT, '
        sql += date_column +' DATETIME'
        for topic in DatabaseCommit.read_config('selection')['topics'].replace(" ", "").split(','):
            topic = topic.lower()
            sql += ', '+ topic +' BOOLEAN NOT NULL DEFAULT 0'
        sql += ');'
        print("\n"+sql+"\n")
        cursor.execute(sql)
        sql = ''
        return cursor

    def external_prepare_table(self):
        conn = self.establish_connection()
        cursor = conn.cursor()
        self.prepare_table(cursor)

    def insert_user_input(self, user_input, cursor, table_name):
        date_column = DatabaseCommit.read_config('table')['date_column']
        sql = 'INSERT INTO ' + table_name + '(' + date_column
        for topic in user_input:
            sql += ','+ topic
        sql += ')'
        sql += 'VALUES(NOW()'
        for topic in user_input:
            sql += ','+'TRUE'
        sql += ')'
        cursor.execute(sql)

    def make_commit(self, user_input):
        conn = self.establish_connection()
        cursor = conn.cursor()
        #self.prepare_table(cursor)
        table_name = DatabaseCommit.read_config('table')['table_name']
        self.insert_user_input(user_input, cursor, table_name)
        cursor.execute('COMMIT;')
        print("made commit :")
        print(user_input)
        cursor.close()
        conn.close()

config.ini:
Названия темы и другие данные были изменены в целях конфиденциальности

[gpio]
button_rows = 4, 17
button_columns = 18, 23, 24, 25, 12, 16
topics_led = 2, 3, 27, 22, 10, 9
actions_led = 11, 5, 6, 13, 19, 26

[selection]
topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
default_selection = topic_2
sleeptime = .3
blinktime = .2
send_default_on_empty = True

[schedule]
post = 11:30
nagios = 09:30

[mysql]
host = 127.0.0.1
database = mydatabase
user = rpi
password = password

[table]
table_name = test
counter_column = number
date_column = date


Комментарии