PyQt入门之信号槽使用

import sys
from PyQt4.QtCore import SIGNAL, QThread, pyqtSignal
from PyQt4.QtGui import *
from struct import *
import random
import Queue
import time

##########################################################################################
##@brief Reverse the byte order of a 32-bit value.
##
## Only the four low-order bytes of val take part; higher bits are dropped.
##@param val Integer to swap.
##@return Integer made of the four low-order bytes of val in reversed order.
##########################################################################################
def BSWAP32(val):
    swapped = 0
    for shift in (0, 8, 16, 24):
        # move the byte found at bit offset 'shift' to the mirrored offset
        swapped |= ((val >> shift) & 0xFF) << (24 - shift)

    return swapped


##########################################################################################
##@brief Get bytes in text format for received string for multibyte fields.
##
## Each byte becomes two upper-case hex digits; bytes keep their original order and are
## separated by ':'. Empty input gives an empty string.
##@param txt Packed byte string.
##@return Text such as '01:AB:FF'.
##########################################################################################
def B(txt):
    octets = unpack('%dB' % len(txt), txt)

    return ':'.join(['%02X' % octet for octet in octets])


##########################################################################################
##@brief Get bytes in text format for received string for multibyte fields.
##
## Bytes are concatenated without separator as two upper-case hex digits each, the last
## byte of txt first, so a field received LSB first reads MSB->LSB in the result.
##@param txt Packed byte string.
##@return Hex text in reversed byte order, '' for empty input.
##########################################################################################
def B2(txt):
    octets = unpack('%dB' % len(txt), txt)

    return ''.join(['%02X' % octet for octet in reversed(octets)])


##########################################################################################
##@brief Get bytes in text format for received string for multibyte fields,
##       keeping the bytes in the order of the original array
##
## Bytes are concatenated without separator as two upper-case hex digits each, the first
## byte of txt first (the opposite order of B2).
##@param txt Packed byte string.
##@return Hex text in original byte order, '' for empty input.
##########################################################################################
def B2INV(txt):
    octets = unpack('%dB' % len(txt), txt)

    return ''.join(['%02X' % octet for octet in octets])


##########################################################################################
##@brief Get user input text to packed format
##
## The text is upper-cased and the hex digits that follow the '0X' prefix are packed two
## digits per byte, first pair first. Text without a '0X' prefix is read as bare hex
## digits. A trailing unpaired digit is ignored.
##@param txt Text such as '0x0A1B'; may be empty.
##@return Packed byte string, empty when txt is empty.
##########################################################################################
def S2B(txt):
    if txt == '':
        return b''

    parts = txt.upper().split('0X')
    # without a '0X' prefix there is no second part (this used to raise IndexError);
    # read the whole text as hex digits instead
    digits = parts[1] if len(parts) > 1 else parts[0]

    # '//' keeps the pair count an integer whichever division rules are in effect
    return b''.join([pack('B', int('0x' + digits[2 * i:2 * i + 2], 16))
                     for i in range(len(digits) // 2)])


##########################################################################################
##@brief Print out in GUI in 0xXXXXXX... format
##
## Bytes keep their original order, two upper-case hex digits each.
##@param txt Packed byte string.
##@return Text starting with '0x'; just '0x' for empty input.
##########################################################################################
def B2S(txt):
    octets = unpack('%dB' % len(txt), txt)
    hex_digits = ''.join(['%02X' % octet for octet in octets])

    return '0x' + hex_digits


##########################################################################################
##@brief Print out in GUI in 0xXXXXXX... format, inverting the bytes in original array
##
## An integer argument is formatted directly as upper-case hex without padding.
##@param txt Packed byte string, or an integer.
##@return Text starting with '0x'; for a byte string the last byte comes first.
##########################################################################################
def B2SINV(txt):
    import numbers  # local import: only needed for the integer check below

    # isinstance() also accepts Python 2 'long' values and int subclasses; the previous
    # exact-type test sent those to the byte-string branch, where len() failed
    if isinstance(txt, numbers.Integral):
        return '0x%X' % txt

    octets = unpack('%dB' % len(txt), txt)

    return '0x' + ''.join(['%02X' % octet for octet in reversed(octets)])


##########################################################################################
##@brief Print out in XX:XX:XX:... format, inverting the bytes in original array
##
## Empty input gives an empty string (it used to raise IndexError).
##@param txt Packed byte string.
##@return Two upper-case hex digits per byte, last byte first, separated by ':'.
##########################################################################################
def B2SINV2(txt):
    octets = unpack('%dB' % len(txt), txt)

    return ':'.join(['%02X' % octet for octet in reversed(octets)])


##########################################################################################
##@brief Invert bytes in packed string.
##
## The result has the same type as the argument, so both text strings and byte strings
## are supported.
##@param txt Packed string to reverse.
##@return A reversed copy of txt.
##########################################################################################
def INVB(txt):
    # a negative-step slice reverses in one pass and keeps the input type
    return txt[::-1]


#########################################################################################
# @brief Worker thread that keeps emitting random demo data as hex text.
#
# Every pass of run() builds a buffer of random bytes, converts it with B2() and emits
# the resulting string through postSignal.
#########################################################################################
class AppendTextBrowser(QThread):
    # carries one hex string per generated buffer
    postSignal = pyqtSignal(str)

    #########################################################################################
    # @brief Class constructor.
    # @param parent Parent object, is allowed to be None.
    #########################################################################################
    def __init__(self, parent=None):
        super(AppendTextBrowser, self).__init__(parent)
        # polled by run(); cleared by stop() to end the loop
        self.is_running = True

    #########################################################################################
    # @brief Thread body: emit a fresh random buffer, sleep 10 ms, repeat until stopped.
    #########################################################################################
    def run(self):
        # 371 equals the byte_len that tab_sp_statistics.get_statistics_output computes
        payload_size = 371

        while self.is_running:
            chunks = [pack('B', random.randint(0, 255)) for _ in range(payload_size)]
            hex_text = B2(''.join(chunks))
            self.postSignal.emit(hex_text)

            time.sleep(0.01)

    #########################################################################################
    # @brief Ask run() to leave its loop; returns immediately without waiting for it.
    #########################################################################################
    def stop(self):
        self.is_running = False

#########################################################################################
# @brief Tab that formats a dbg_rwip_info_t statistics block into a text browser.
#
# The block shown is currently random demo data. After it is printed, an
# AppendTextBrowser worker thread keeps appending 'Raw Data' lines until 'Clear' is
# pressed. Only one worker is kept alive at a time.
#########################################################################################
class tab_sp_statistics(QWidget):
    #########################################################################################
    # @brief Class constructor.
    # @param dev Device the tab is associated with (stored, not used inside this class).
    # @param peer Peer reference (stored, not used inside this class).
    # @param parent Parent window, is allowed to be None.
    #########################################################################################
    def __init__(self, dev, peer, parent=None):

        # ---------------------------------------------------------------------------------
        # INIT-----------------------------------------------------------------------------
        # ---------------------------------------------------------------------------------
        # call the constructor of the inherited class and use the parent
        super(tab_sp_statistics, self).__init__(parent)

        # keep parent class reference in case it needs updating from here
        # NOTE(review): this instance attribute hides the inherited parent() method;
        # it is kept because code outside this class may read it
        self.parent = parent

        # device to which the dialog instance is associated
        self.dev = dev
        self.peer = peer

        # worker thread currently feeding the text browser, None when idle
        self.bw_thread = None

        # create font for labels
        font = QFont()
        font.setWeight(75)
        font.setBold(True)

        # ---------------------------------------------------------------------------------
        # START LAYOUT --------------------------------------------------------------------
        # ---------------------------------------------------------------------------------
        tab_layv = QGridLayout()

        # row 0: address label + editor, length label + read-only field
        lbl_rwip_info_addr = QLabel('Address: ', self)
        lbl_rwip_info_addr.setFont(font)
        tab_layv.addWidget(lbl_rwip_info_addr, 0, 0)

        self.edt_rwip_info_addr = QLineEdit('0x3000154c', self)
        tab_layv.addWidget(self.edt_rwip_info_addr, 0, 1)

        lbl_rwip_info_len = QLabel('Length: ', self)
        lbl_rwip_info_len.setFont(font)
        tab_layv.addWidget(lbl_rwip_info_len, 0, 2)

        self.edt_rwip_info_len = QLineEdit('0', self)
        self.edt_rwip_info_len.setReadOnly(True)
        tab_layv.addWidget(self.edt_rwip_info_len, 0, 3)

        # row 1: output label + text browser
        lbl_statistics = QLabel('Statistics Output: ', self)
        lbl_statistics.setFont(font)
        tab_layv.addWidget(lbl_statistics, 1, 0)

        # the document is limited to 100 blocks so the endless worker output stays bounded
        self.doc = QTextDocument()
        self.doc.setMaximumBlockCount(100)

        self.tb_statistics_output = QTextBrowser(self)
        self.tb_statistics_output.setDocument(self.doc)
        tab_layv.addWidget(self.tb_statistics_output, 1, 1, 1, 80)

        # row 2: get button and clear button
        self.qb_get_statistics_output = QPushButton('Con_Stat', self)
        self.connect(self.qb_get_statistics_output, SIGNAL('clicked()'), self.get_statistics_output)
        tab_layv.addWidget(self.qb_get_statistics_output, 2, 0)

        self.qb_clear_statistics_output = QPushButton('Clear', self)
        self.connect(self.qb_clear_statistics_output, SIGNAL('clicked()'), self.clear_statistics_output)
        tab_layv.addWidget(self.qb_clear_statistics_output, 2, 1)

        # ---------------------------------------------------------------------------------
        # SET LAYOUT ----------------------------------------------------------------------
        # ---------------------------------------------------------------------------------
        self.setLayout(tab_layv)

    #########################################################################################
    # @brief Generate one statistics block, print it field by field, then (re)start the
    #        worker thread that streams further raw data.
    #
    # Layout being decoded:
    #
    # struct dbg_con_pkt_statistics_t
    # {
    #     /// Counts of RX Times
    #     uint32_t rx_total_cnt;
    #     /// Counts of RX SYNC Error Times
    #     uint32_t rx_sync_err_cnt;
    #     /// Counts of RX CRC Error Times
    #     uint32_t rx_crc_err_cnt;
    #     uint32_t rx_other_err_cnt;
    #     uint32_t rx_sn_err_cnt;
    #     uint32_t rx_mic_err_cnt;
    #     uint32_t rx_normal_cnt;
    # } __attribute__((packed, aligned(1)));
    #
    # struct dbg_rwip_info_t
    # {
    #     struct dbg_con_pkt_statistics_t con_pkt_stat[BLE_ACTIVITY_MAX];
    #     uint16_t magic1;
    #     uint8_t act_state[BLE_ACTIVITY_MAX];  // this is valid for adv&scan act only, currently
    #     uint16_t magic2;
    #     uint16_t adv_rx_err_cnt;
    #     uint16_t adv_rx_normal_cnt;
    #     uint16_t adv_rx_con_req_cnt;
    #     uint16_t adv_rx_scn_rsp_cnt;
    #     uint16_t adv_rx_other_cnt;
    #     uint16_t scn_rx_total_cnt;
    #     uint16_t scn_rx_err_cnt;
    #     uint16_t scn_adv_data_len_err_cnt;
    #     uint16_t magic3;
    #     uint8_t  sniffer_ccm_pkt_cnt_req_musk;
    # } __attribute__((packed, aligned(1)));
    #########################################################################################
    def get_statistics_output(self):
        # all sizes are in bytes
        con_handle_num = 12  # number of con_pkt_stat / act_state entries decoded
        counter_size = 4
        con_counters = ('rx_total_cnt', 'rx_sync_err_cnt', 'rx_crc_err_cnt', 'rx_other_err_cnt',
                        'rx_sn_err_cnt', 'rx_mic_err_cnt', 'rx_normal_cnt')
        magic_size = 2
        act_state_size = 1
        # members that follow magic2, in struct order
        tail_fields = (('adv_rx_err_cnt', 2),
                       ('adv_rx_normal_cnt', 2),
                       ('adv_rx_con_req_cnt', 2),
                       ('adv_rx_scn_rsp_cnt', 2),
                       ('adv_rx_other_cnt', 2),
                       ('scn_rx_total_cnt', 2),
                       ('scn_rx_err_cnt', 2),
                       ('scn_adv_data_len_err_cnt', 2),
                       ('magic3', 2),
                       ('sniffer_ccm_pkt_cnt_req_musk', 1))

        byte_len = (con_handle_num * counter_size * len(con_counters)  # con_pkt_stat[]
                    + magic_size                                       # magic1
                    + con_handle_num * act_state_size                  # act_state[]
                    + magic_size                                       # magic2
                    + sum([size for _, size in tail_fields]))

        self.edt_rwip_info_len.setText(hex(byte_len))

        # Demo data: random bytes stand in for the device memory read, which is disabled:
        # lrx = self.dev.hci.h2c_dbg_rd_mem_cmd(int(self.edt_rwip_info_addr.text()), 32, byte_len)
        # self.dev.check_par(lrx, [EVT, evt_code.H_EVT_CC, 'NOCHK', op_code.H_CMD_DBG_RD_MEM, err_code.H_ERR_SUCCESS,
        #                          byte_len])
        lrx = b''.join([pack('B', random.randint(0, 255)) for _ in range(byte_len)])

        # NOTE(review): B2() emits the buffer's bytes in reverse order, so every field below
        # is sliced from the reversed text - confirm this matches the device layout
        struct_value = B2(lrx)

        cursor = [0]  # hex-character offset into struct_value (a list so take() can update it)

        def take(size):
            # return the hex text of the next 'size' bytes and advance the cursor
            start = cursor[0]
            cursor[0] = start + size * 2
            return struct_value[start:cursor[0]]

        append = self.tb_statistics_output.append

        append('Raw Data: {}'.format(struct_value))
        append('')

        append('Address: {}'.format(self.edt_rwip_info_addr.text()))
        append('')

        # con_pkt_stat[]: one output line per connection handle
        for handle_index in range(con_handle_num):
            counters = ['{}: 0x{}'.format(name, take(counter_size)) for name in con_counters]
            append('con_handle{}: {}'.format(handle_index, ', '.join(counters)))
        append('')

        magic1 = take(magic_size)

        # act_state[]: one output line per entry
        for handle_index in range(con_handle_num):
            append('act_state{}: {}'.format(handle_index, '0x{}'.format(take(act_state_size))))
        append('')

        magic2 = take(magic_size)
        tail = dict([(name, take(size)) for name, size in tail_fields])

        append('magic1: 0x{}'.format(magic1))
        append('magic2: 0x{}'.format(magic2))
        append('magic3: 0x{}'.format(tail['magic3']))
        append('')

        # advertising counters, then scanning counters, each followed by a blank line
        report_groups = (('adv_rx_err_cnt', 'adv_rx_normal_cnt', 'adv_rx_con_req_cnt',
                          'adv_rx_scn_rsp_cnt', 'adv_rx_other_cnt'),
                         ('scn_rx_total_cnt', 'scn_rx_err_cnt', 'scn_adv_data_len_err_cnt'))
        for group in report_groups:
            for name in group:
                append('{}: 0x{}'.format(name, tail[name]))
            append('')

        append('sniffer_ccm_pkt_cnt_req_musk: 0x{}'.format(tail['sniffer_ccm_pkt_cnt_req_musk']))

        self.update_statistics_output()

    #########################################################################################
    # @brief Start a worker thread whose output is appended to the text browser.
    #
    # A worker that is already running is stopped first; otherwise every call would leave
    # one more thread running with nothing referring to it any more.
    #########################################################################################
    def update_statistics_output(self):
        self._stop_worker()

        worker = AppendTextBrowser()
        # connect before starting so the first emission has a receiver
        worker.postSignal.connect(self.update_statistics_data_output)
        worker.start()
        self.bw_thread = worker

    #########################################################################################
    # @brief Slot: append one line of worker output to the text browser.
    # @param lrx Hex text emitted by the worker.
    #########################################################################################
    def update_statistics_data_output(self, lrx):
        self.tb_statistics_output.append('Raw Data: {}'.format(lrx))

    #########################################################################################
    # @brief Stop the worker thread (if any) and empty the text browser.
    #########################################################################################
    def clear_statistics_output(self):
        self._stop_worker()
        # NOTE(review): lines the worker emitted just before stopping may presumably still
        # be delivered after this clear - confirm if that matters
        self.tb_statistics_output.clear()

    #########################################################################################
    # @brief Stop the current worker thread, wait for it to finish and forget it.
    #########################################################################################
    def _stop_worker(self):
        if self.bw_thread is not None:
            self.bw_thread.stop()
            # wait so the thread object is never dropped while its run() is still active
            self.bw_thread.wait()
            self.bw_thread = None


def main():
    """Show the statistics tab as a stand-alone window and run the Qt event loop.

    When the event loop ends, a still-running worker thread is stopped and joined
    before the interpreter exits, because a QThread object must not be destroyed
    while its thread is running.
    """
    app = QApplication(sys.argv)
    ex = tab_sp_statistics('a', 'b')
    ex.show()
    exit_code = app.exec_()

    worker = ex.bw_thread
    if worker is not None:
        worker.stop()
        worker.wait()

    sys.exit(exit_code)


# start the demo window only when this file is executed as a script, not when imported
if __name__ == '__main__':
    main()

results matching ""

    No results matching ""