Socket编程示例03--Socket Server and Client

  • Socket Server
# -*- coding:utf-8 -*-

"""
@Author:        [email protected]
@Create Date:   2018-02-18
@Update Date:   2018-02-19
@Version:       V0.9.20180219
"""

import os
import re
import time
import json
import socket
import serial
import logging
import datetime
import threading
import socketserver

FPGA_SERIAL_PORT = 'COM11'
FPGA_SERIAL_BAUD_RATE = '115200'
FPGA_SERIAL_TIMEOUT = 5  # seconds
FPGA_SERIAL_GET_DATA_FLAG = False

FPGA_LOG_PREFIX_STR = 'FPGA'
PHONE_LOG_PREFIX_STR = 'Phone'

BUILD_INIT_FLAG = False
BUILD_INIT_PER_TEST_FLAG = True

SOCKET_SERVER_ADDRESS = '127.0.0.1'
SOCKET_SERVER_PORT = 5000
SOCKET_RECEIVE_BUFFER = 1024

TEST_LOG_DIR = '.\\logs'
TEST_LOG_SUFFIX = '.log'

TEST_ENV = {
    'version': '',
}

TEST_CASE_CONF = {}

PRINT_THREADING_LOCK = threading.Lock()

LOGGING_LEVEL = logging.INFO


def logging_config(logging_level):
    # log_format = "%(asctime)s - %(levelname)s - %(message)s"
    # log_format = "%(asctime)s [line: %(lineno)d] - %(levelname)s - %(message)s"
    log_format = "[%(asctime)s - [File: %(filename)s line: %(lineno)d] - %(levelname)s]: %(message)s"
    logging.basicConfig(level=logging_level, format=log_format)


class SerialPortHandleClass:
    def __init__(self, port, baud_rate='115200', read_timeout=5, write_time=5):
        self.port = port
        self.baud_rate = baud_rate
        self.read_timeout = read_timeout
        self.write_timeout = write_time

        self.__serial_handle = None
        self.serial_rw_status_flag = False

        self.open_serial_port()

    def open_serial_port(self):
        try:
            self.__serial_handle = serial.Serial(self.port, self.baud_rate, timeout=self.read_timeout,
                                                 write_timeout=self.write_timeout)
        except (serial.SerialTimeoutException, serial.SerialException) as err:
            self.serial_rw_status_flag = False
            logging.critical('Can not open serial port {} due to {}'.format(self.port, err))
            raise IOError(err)
        else:
            self.serial_rw_status_flag = True

    def close_serial_port(self):
        if self.serial_rw_status_flag:
            self.__serial_handle.close()
            self.serial_rw_status_flag = False

    def send_cmd_to_serial(self, cmd_str):
        if self.serial_rw_status_flag:
            cmd_str = re.sub(r'\s+', '', cmd_str)
            cmd_byte = cmd_str.encode('utf-8')

            try:
                self.__serial_handle.write(cmd_byte)
            except (serial.SerialTimeoutException, serial.SerialException):
                self.serial_rw_status_flag = False

    def reset_output_buffer(self):
        if self.serial_rw_status_flag:
            self.__serial_handle.reset_output_buffer()

    def get_data_from_serial(self, read_flag=1):
        serial_data = None

        if self.serial_rw_status_flag:
            try:
                if 0 == read_flag:
                    serial_data = self.__serial_handle.readline()
                elif 1 == read_flag:
                    serial_data = self.__serial_handle.readlines()
                else:
                    logging.error('Unsupported read flag value {}'.format(read_flag))
            except serial.SerialException:
                self.serial_rw_status_flag = False

        return serial_data


class TestLogHandleClass:
    def __init__(self, file_name):
        self.__file_name = os.path.join(TEST_LOG_DIR, file_name)

        self.__fd = None
        self.__file_open_flag = False
        self.__file_end_flag = False

        self.open_log_file()

    @staticmethod
    def create_log_folder():
        log_path = TEST_LOG_DIR

        if os.path.exists(log_path):
            if not os.path.isdir(log_path):
                try:
                    os.remove(log_path)
                    os.makedirs(log_path)
                finally:
                    pass
        else:
            try:
                os.makedirs(log_path)
            finally:
                pass

    def open_log_file(self):
        self. create_log_folder()

        try:
            self.__fd = open(self.__file_name, 'a', encoding='utf-8')
            self.__fd.seek(0, 0)
            self.__file_open_flag = True
        except Exception as err:
            self.__file_open_flag = False
            logging.critical('Can not open {} file, error info is: {}'.format(self.__file_name, err))
            raise IOError(err)

    def close_log_file(self):
        if self.__file_open_flag:
            self.__fd.close()
            self.__file_open_flag = False
            self.__file_end_flag = False

    def write_log_to_file(self, source, msg):
        datetime_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
        write_msg = '{} -> [{}]{}'.format(datetime_str, source, msg)
        debug_print(write_msg)

        if self.__file_open_flag:
            try:
                self.__fd.write('{}\n'.format(write_msg))
                self.__fd.flush()
            except Exception as err:
                logging.error('Write log file {} error, error info is: {}'.format(self.__file_name, err))
                self.close_log_file()


class SocketRequestHandler(threading.Thread):
    phone_status_list = []
    phone_run_status = {'ip': None, 'flag': False}

    phone_status_list_lock = threading.Lock()

    def __init__(self, app_project_name, request, client_address, test_config):
        super().__init__()
        self.app_project_name = app_project_name
        self.request = request
        self.client_address = client_address
        self.test_config = test_config

        self.phone_info = None

        self.log_handle_inst = None
        self.serial_handle_inst = None

        self.case_flag = False
        self.finished_flag = False
        self.abnormal_flag = False

    def send_message_to_phone(self, response_msg):
        debug_print('Send Message: "{}" to {} ip address'.format(response_msg, self.client_address))

        if response_msg:
            json_str = json.dumps(response_msg)
            try:
                self.request.sendall(json_str.encode(encoding='utf-8'))
            except Exception as err:
                debug_print('Error Message: "{}" with {} client ip address'.format(err, self.client_address))
                self.abnormal_flag = True

    def send_command_to_phone(self, cmd_str):
        cmd_data = {'type': 'cmd', 'category': 'phone', 'message': cmd_str}

        self.send_message_to_phone(cmd_data)

    def send_command_to_fpga(self, cmd_str):
        if self.serial_handle_inst.serial_rw_status_flag:
            debug_print('Send Message: send command "{}" to FPGA'.format(cmd_str))

            log_str = 'send command "{}" to FPGA'.format(cmd_str)
            self.log_handle_inst.write_log_to_file(FPGA_LOG_PREFIX_STR, log_str)

            self.serial_handle_inst.send_cmd_to_serial(cmd_str)

            if FPGA_SERIAL_GET_DATA_FLAG:
                serial_data_list = self.serial_handle_inst.get_data_from_serial()

                for serial_data in serial_data_list:
                    if serial_data and serial_data.strip():
                        self.log_handle_inst.write_log_to_file(FPGA_LOG_PREFIX_STR,
                                                               serial_data.decode(encoding='utf-8'))
        else:
            self.abnormal_flag = True

    def run_test(self):
        self.serial_handle_inst = SerialPortHandleClass(FPGA_SERIAL_PORT, FPGA_SERIAL_BAUD_RATE)
        self.serial_handle_inst.reset_output_buffer()

        debug_print('start to running test cases for {} project...'.format(self.app_project_name))

        debug_print('self.phone_info: {}'.format(self.phone_info))

        # self.log_handle_inst = TestLogHandleClass('{}_{}_{}{}'.format(
        #     re.sub(r'\s', '', self.phone_info['model']), self.phone_info['imei'],
        #     case_name, TEST_LOG_SUFFIX))
        self.log_handle_inst = TestLogHandleClass('{}_{}_{}{}'.format(
            re.sub(r'\s', '', self.phone_info['model']), self.phone_info['imei'], self.app_project_name,
            TEST_LOG_SUFFIX))

        self.send_command_to_phone(self.test_config)
        self.finished_flag = False

        while True:
            if self.abnormal_flag or self.finished_flag:
                break

            try:
                req_str = self.request.recv(SOCKET_RECEIVE_BUFFER).decode(encoding='utf-8')
            except Exception as err:
                debug_print('Error Message: "{}" with {} client ip address'.format(err, self.client_address))
                self.abnormal_flag = True
                break
            else:
                if req_str:
                    debug_print('Received Message: "{}", from {} ip address'.format(req_str, self.client_address))

                    try:
                        json_str = json.loads(req_str)
                    except BaseException as err:
                        debug_print('Error Message: json load error with "{}" error'.format(err))
                    else:
                        msg_type = json_str['type']

                        if 'cmd' == msg_type:
                            self.command_msg_handle(json_str)
                        elif 'log' == msg_type:
                            self.log_msg_handle(json_str)
                        else:
                            debug_print('Incorrect Message: {}'.format(json_str))
                else:
                    self.abnormal_flag = True

            if FPGA_SERIAL_GET_DATA_FLAG:
                self.get_data_from_serial_port()

        self.finished_flag = True

    def get_data_from_serial_port(self):
        if self.serial_handle_inst.serial_rw_status_flag:
            serial_data_list = self.serial_handle_inst.get_data_from_serial()
            for serial_data in serial_data_list:
                if serial_data and serial_data.strip():
                    self.log_handle_inst.write_log_to_file(FPGA_LOG_PREFIX_STR,
                                                           serial_data.decode(encoding='utf-8'))
        else:
            self.abnormal_flag = True

    def end_handle(self):
        if self.log_handle_inst is not None:
            self.log_handle_inst.close_log_file()

        if self.serial_handle_inst is not None:
            self.serial_handle_inst.close_serial_port()

        SocketRequestHandler.phone_run_status['ip'] = None
        SocketRequestHandler.phone_run_status['flag'] = False

        for phone_status_dict in SocketRequestHandler.phone_status_list:
            if self.client_address[0] == phone_status_dict['ip']:
                phone_status_dict['tested'] = True

    def command_msg_handle(self, json_msg):
        if 'fpga' == json_msg['category']:
            self.send_command_to_fpga(json_msg['message'])
        elif 'pc' == json_msg['category']:
            pass
        else:
            pass

    def log_msg_handle(self, json_msg):
        if self.log_handle_inst is not None:
            self.log_handle_inst.write_log_to_file(PHONE_LOG_PREFIX_STR, json_msg['message'])

        if 'result' == json_msg['category']:
            self.log_handle_inst.close_log_file()
            self.finished_flag = True
        else:
            ack_data = {'type': 'info', 'category': 'ack', 'message': 'ack message'}
            self.send_message_to_phone(ack_data)

    def info_msg_handle(self, info_msg):
        for phone_status_dict in SocketRequestHandler.phone_status_list:
            if self.client_address[0] == phone_status_dict['ip']:
                break
        else:
            SocketRequestHandler.phone_status_list_lock.acquire()
            SocketRequestHandler.phone_status_list.append({'ip': self.client_address[0],
                                                           'port': self.client_address[1],
                                                           'tested': False,
                                                           })
            # SocketRequestHandler.phone_status_list.append({'ip': '{}:{}'.
            #                                               format(self.client_address[0], self.client_address[1]),
            #                                                'tested': False,
            #                                                })
            SocketRequestHandler.phone_status_list_lock.release()

        if 'heart' == info_msg['category']:
            debug_print("Heartbeat Message: Received heartbeat packet from {}, the message content is: {}".
                        format(self.client_address, info_msg))

            # self.send_message_to_phone(info_msg)
        elif 'phone' == info_msg['category']:
            self.phone_info = info_msg['message']
            debug_print("Phone Info Message: Received info message from {}, the message content is: {}".
                        format(self.client_address, info_msg))
        else:
            pass

    def request_handle(self):
        while True:
            try:
                req_str = self.request.recv(SOCKET_RECEIVE_BUFFER).decode(encoding='utf-8')
            except Exception as err:
                debug_print('Error Message: {} with client ip address {}'.format(err, self.client_address))
                break
            else:
                if req_str:
                    debug_print('Received Message: "{}", which from {} ip address'.
                                format(req_str, self.client_address))

                    try:
                        json_str = json.loads(req_str)
                    except BaseException as err:
                        debug_print('Error Message: json load error with {}'.format(err))
                    else:
                        msg_type = json_str['type']

                        if 'info' == msg_type:
                            self.info_msg_handle(json_str)
                        else:
                            debug_print('Incorrect Message: {}'.format(json_str))
                else:
                    debug_print('Error Message: The connection shutdown with client ip address {}'.
                                format(self.client_address))

                    for entry_index in range(len(SocketRequestHandler.phone_status_list)):
                        if self.client_address[0] == SocketRequestHandler.phone_status_list[entry_index].get('ip'):
                            SocketRequestHandler.phone_status_list_lock.acquire()
                            SocketRequestHandler.phone_status_list.pop(entry_index)
                            SocketRequestHandler.phone_status_list_lock.release()
                            break

                    break

                debug_print('SocketRequestHandler.phone_status_list: {}'.
                            format(SocketRequestHandler.phone_status_list))
                debug_print('SocketRequestHandler.phone_run_status: {}'.
                            format(SocketRequestHandler.phone_run_status))

                if not SocketRequestHandler.phone_run_status.get('flag'):
                    for phone_status_dict in SocketRequestHandler.phone_status_list:
                        if not phone_status_dict.get('tested'):
                            if self.client_address[0] == phone_status_dict.get('ip'):

                                SocketRequestHandler.phone_run_status['flag'] = True
                                SocketRequestHandler.phone_run_status['ip'] = self.client_address

                                self.run_test()

                            break

                # abnormal occur, enter the end handle
                if self.abnormal_flag or self.finished_flag:
                    self.end_handle()
                    debug_print('Test finished for {} project..., abnormal_flag: {}, finished_flag: {}'.
                                format(self.app_project_name, self.abnormal_flag, self.finished_flag))
                    break

            '''
            send_data = 'Response from socket server with threading {}...'.format(self.name)

            # https://stackoverflow.com/questions/180095/how-to-handle-a-broken-pipe-sigpipe-in-python/180922#180922
            # https://stackoverflow.com/questions/11866792/how-to-prevent-errno-32-broken-pipe
            # https://stackoverflow.com/questions/41014252/socket-error-errno-32-broken-pipe/41015359
            # https://stackoverflow.com/questions/409783/socket-shutdown-vs-socket-close
            # https://www.programcreek.com/python/example/3678/socket.SHUT_RDWR
            # https://pymotw.com/2/socket/tcp.html
            # https://pythontips.com/2013/08/06/python-socket-network-programming/
            # https://docs.python.org/3/howto/sockets.html
            # https://docs.python.org/3/library/socket.html
            try:
                self.request.sendall(send_data.encode())
            except ConnectionError as err:
                # self.socket.shutdown(socket.SHUT_RDWR)
                # self.socket.close()
                # 客户端调用上面两条语句断开连接,这时候服务器端的发送会产生如下错误
                # Error message: [Errno 32] Broken pipe
                print('Error message: {} with client ip address {}'.format(err, self.client_address))

                # socket.shutdown(how)
                # Shut down one or both halves of the connection. If how is SHUT_RD, further receives are disallowed.
                # If how is SHUT_WR, further sends are disallowed. If how is SHUT_RDWR,
                # further sends and receives are disallowed.

                # 如客户端已断开连接,这时调用socket.shutdown(socket.SHUT_RDWR)会报错,
                # 但是没有试验SHUT_RD或者SHUT_WR参数有没有问题,待验证
                # self.request.shutdown(socket.SHUT_RDWR)

                # Mark the socket closed. The underlying system resource (e.g. a file descriptor) is also closed when
                # all file objects from makefile() are closed. Once that happens, all future operations on the socket
                # object will fail. The remote end will receive no more data (after queued data is flushed).
                # Sockets are automatically closed when they are garbage-collected, but it is recommended to close()
                # them explicitly, or to use a with statement around them.

                # Note: close() releases the resource associated with a connection but does not necessarily close
                # the connection immediately. If you want to close the connection in a timely fashion, call shutdown()
                # before close().
                self.request.close()
                break
            '''

    def run(self):
        self.request_handle()


def debug_print(msg):
    datetime_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    print_msg = 'Debug Information: {} -> {}'.format(datetime_str, msg)

    PRINT_THREADING_LOCK.acquire()
    print(print_msg)
    PRINT_THREADING_LOCK.release()


def app_init(app_project):
    pass


def run_test(app_project_name, app_project_dict):
    client_thread_lists = []
    server_address = (SOCKET_SERVER_ADDRESS, SOCKET_SERVER_PORT)

    test_config = app_project_dict.get('test_config')
    app_project = app_project_dict.get('app_project')

    # download bit file, compile project and download app to flash, run debug
    if BUILD_INIT_FLAG and BUILD_INIT_PER_TEST_FLAG:
        app_init(app_project)

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_server:
        # Running an example several times with too small delay between executions, could lead to this error:
        # OSError: [Errno 98] Address already in use
        # This is because the previous execution has left the socket in a TIME_WAIT state,
        # and can’t be immediately reused.
        # There is a socket flag to set, in order to prevent this, socket.SO_REUSEADDR:
        '''
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((HOST, PORT))
        '''
        # the SO_REUSEADDR flag tells the kernel to reuse a local socket in TIME_WAIT state,
        # without waiting for its natural timeout to expire.
        socket_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # Bind the socket to address. The socket must not already be bound.
        socket_server.bind(server_address)

        # Set a timeout on blocking socket operations
        socket_server.settimeout(60)

        # Enable a server to accept connections. If backlog is specified,
        # it must be at least 0 (if it is lower, it is set to 0);
        # it specifies the number of unaccepted connections that the system will allow before refusing new connections.
        # If not specified, a default reasonable value is chosen.
        socket_server.listen()

        while True:
            try:
                request, client_address = socket_server.accept()
            except Exception as err:
                debug_print('error happened while accept connection, error message: {}'.format(err))
            else:
                debug_print('####################New connect from {} ...####################'.format(client_address))

                client_thread = SocketRequestHandler(app_project_name, request, client_address, test_config)
                client_thread_lists.append(client_thread)
                client_thread.daemon = True
                client_thread.start()
                time.sleep(5)
            finally:
                for phone_status_dict in SocketRequestHandler.phone_status_list:
                    if not phone_status_dict['tested']:
                        break
                else:
                    break


def run_all_tests(app_project_list):
    test_env = TEST_ENV
    test_info_dict = TEST_CASE_CONF[test_env['version']]
    app_project_group_dict = test_info_dict.get('app_project_dict')

    for app_project_name in app_project_list:
        app_project_dict = app_project_group_dict.get(app_project_name)
        app_project = app_project_dict.get('app_project')

        if app_project is not None:
            run_test(app_project_name, app_project_dict)


class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    running_flag = False
    phone_list = []
    tested_phone_list = []

    def start_running(self):
        pass

    def log_handle(self, msg_dict):
        pass

    def cmd_handle(self, msg_dict):
        pass

    def heartbeat_handle(self, msg_dict):
        # cur_thread = threading.current_thread()
        logging.info("Received heartbeat packet from %s, the message content is: %s", self.client_address, msg_dict)

        if self.client_address not in ThreadedTCPRequestHandler.phone_list:
            ThreadedTCPRequestHandler.phone_list.append(self.client_address)

        if self.client_address not in ThreadedTCPRequestHandler.tested_phone_list:
            if not ThreadedTCPRequestHandler.running_flag:
                self.start_running()

    def handle(self):
        """
        data = str(self.request.recv(1024), 'ascii')
        cur_thread = threading.current_thread()
        response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
        self.request.sendall(response)
        :return:
        """
        req_str = self.request.recv(SOCKET_RECEIVE_BUFFER).decode(encoding='utf-8')

        try:
            json_str = json.loads(req_str)
        except BaseException as Err:
            logging.error(Err)
        else:
            msg_type = json_str['type']
            # msg_content = json_str['message']

            if 'cmd' == msg_type:
                self.cmd_handle(json_str)
            elif 'log' == msg_type:
                self.log_handle(json_str)
            elif 'heart' == msg_type:
                self.heartbeat_handle(json_str)
            else:
                pass

    def finish(self):
        logging.debug("end socket connect with client ip: %s" % str(self.client_address))
        pass

    def setup(self):
        logging.debug("start socket connect with client ip: %s" % str(self.client_address))
        pass


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """
    This is because the previous execution has left the socket in a TIME_WAIT state, and can’t be immediately reused.
    There is a socket flag to set, in order to prevent this, socket.SO_REUSEADDR:
    """
    socketserver.TCPServer.allow_reuse_address = True
    pass


class OpenSocketServer:
    def __init__(self, server_ip, server_port=20000):
        self.server_ip = server_ip
        self.server_port = server_port
        self.server_handle = None

    def open_socket_server(self, server_ip, server_port=20000):
        self.server_ip = server_ip
        self.server_port = server_port
        self.server_handle = ThreadedTCPServer((server_ip, server_port), ThreadedTCPRequestHandler)

        with self.server_handle:
            # Start a thread with the server -- that thread will then start one
            # more thread for each request
            server_thread = threading.Thread(target=self.server_handle.serve_forever)
            # Exit the server thread when the main thread terminates
            server_thread.daemon = True
            server_thread.start()
            logging.debug("Server loop running in thread:", server_thread.name)

    def close_socket_server(self):
        self.server_handle.server_close()
        self.server_handle.shutdown()


def client(ip, port, message):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock_client:
        # sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock_client.connect((ip, port))
        sock_client.sendall(bytes(message, 'ascii'))
        response = str(sock_client.recv(1024), 'ascii')
        logging.debug("Received: {}".format(response))
        logging.debug(socket.gethostbyname(socket.gethostname()),
                      socket.gethostbyaddr(socket.gethostname()), socket.gethostname())
        # sock.close()


def socket_connect_test():
    # Port 0 means to select an arbitrary unused port
    # host, port = "localhost", 0
    host, port = "172.16.49.142", 20000

    server = ThreadedTCPServer((host, port), ThreadedTCPRequestHandler)
    with server:
        ip, port = server.server_address

        logging.debug(ip, port)

        # Start a thread with the server -- that thread will then start one
        # more thread for each request
        server_thread = threading.Thread(target=server.serve_forever)
        # Exit the server thread when the main thread terminates
        server_thread.daemon = True
        server_thread.start()
        logging.debug("Server loop running in thread:", server_thread.name)

        client(ip, port, "Hello World 1")
        time.sleep(10)
        client(ip, port, "Hello World 2")
        time.sleep(10)
        client(ip, port, "Hello World 3")
        time.sleep(10)

        server.server_close()
        server.shutdown()


def main():
    logging_config(LOGGING_LEVEL)
    # socket_connect_test()

    run_all_tests(['ble_app_hrs'])


if __name__ == "__main__":
    print("Script start execution at {}".format(str(datetime.datetime.now())))

    time_start = time.time()
    main()

    print("\n\nTotal Elapsed Time: {} seconds".format(time.time() - time_start))
    print("\nScript end execution at {}".format(datetime.datetime.now()))
  • Socket Client
# -*- coding:utf-8 -*-

"""
@Author:        [email protected]
@Create Date:   2018-12-18
@Update Date:   2018-12-19
@Version:       V0.1.20181219
"""

import time
import json
import socket
import random
import inspect
import logging
import datetime
import threading

MAX_THREAD_NUM = 1
SERVER_IP_ADDR = '127.0.0.1'
SERVER_IP_PORT = 5000
LUCK_NUM = 1000
PRINT_THREADING_LOCK = threading.Lock()

PHONE_SYSTEM_INFO_LIST = [
    {'manufacturer': 'XiaoMi', 'model': 'XiaoMi 1', 'version': '1.1.1', 'imei': '123456789000001'},
    {'manufacturer': 'XiaoMi', 'model': 'XiaoMi 2', 'version': '2.1.1', 'imei': '123456789000002'},
    {'manufacturer': 'XiaoMi', 'model': 'XiaoMi 3', 'version': '3.1.1', 'imei': '123456789000003'},
    {'manufacturer': 'XiaoMi', 'model': 'XiaoMi 4', 'version': '4.1.1', 'imei': '123456789000004'},
    {'manufacturer': 'XiaoMi', 'model': 'XiaoMi 5', 'version': '5.1.1', 'imei': '123456789000005'},
    {'manufacturer': 'XiaoMi', 'model': 'XiaoMi 6', 'version': '6.1.1', 'imei': '123456789000006'},
    {'manufacturer': 'XiaoMi', 'model': 'XiaoMi 7', 'version': '7.1.1', 'imei': '123456789000007'},
    {'manufacturer': 'XiaoMi', 'model': 'XiaoMi 8', 'version': '8.1.1', 'imei': '123456789000008'},

    {'manufacturer': 'Honor', 'model': 'Honor-1', 'version': '1.1.1', 'imei': '123456789000011'},
    {'manufacturer': 'Honor', 'model': 'Honor-2', 'version': '2.1.1', 'imei': '123456789000012'},
    {'manufacturer': 'Honor', 'model': 'Honor-3', 'version': '3.1.1', 'imei': '123456789000013'},
    {'manufacturer': 'Honor', 'model': 'Honor-4', 'version': '4.1.1', 'imei': '123456789000014'},
    {'manufacturer': 'Honor', 'model': 'Honor-5', 'version': '5.1.1', 'imei': '123456789000015'},
    {'manufacturer': 'Honor', 'model': 'Honor-6', 'version': '6.1.1', 'imei': '123456789000016'},
    {'manufacturer': 'Honor', 'model': 'Honor-7', 'version': '7.1.1', 'imei': '123456789000017'},
    {'manufacturer': 'Honor', 'model': 'Honor-8', 'version': '8.1.1', 'imei': '123456789000018'},

    {'manufacturer': 'Vivo', 'model': 'Vivo X11', 'version': '1.1.1', 'imei': '123456789000021'},
    {'manufacturer': 'Vivo', 'model': 'Vivo X12', 'version': '2.1.1', 'imei': '123456789000022'},
    {'manufacturer': 'Vivo', 'model': 'Vivo X13', 'version': '3.1.1', 'imei': '123456789000023'},
    {'manufacturer': 'Vivo', 'model': 'Vivo X14', 'version': '4.1.1', 'imei': '123456789000024'},
    {'manufacturer': 'Vivo', 'model': 'Vivo X15', 'version': '5.1.1', 'imei': '123456789000025'},
    {'manufacturer': 'Vivo', 'model': 'Vivo X16', 'version': '6.1.1', 'imei': '123456789000026'},
    {'manufacturer': 'Vivo', 'model': 'Vivo X17', 'version': '7.1.1', 'imei': '123456789000027'},
    {'manufacturer': 'Vivo', 'model': 'Vivo X18', 'version': '8.1.1', 'imei': '123456789000028'},

    {'manufacturer': 'Oppo', 'model': 'Oppo F11', 'version': '1.1.1', 'imei': '123456789000031'},
    {'manufacturer': 'Oppo', 'model': 'Oppo F12', 'version': '2.1.1', 'imei': '123456789000032'},
    {'manufacturer': 'Oppo', 'model': 'Oppo F13', 'version': '3.1.1', 'imei': '123456789000033'},
    {'manufacturer': 'Oppo', 'model': 'Oppo F14', 'version': '4.1.1', 'imei': '123456789000034'},
    {'manufacturer': 'Oppo', 'model': 'Oppo F15', 'version': '5.1.1', 'imei': '123456789000035'},
    {'manufacturer': 'Oppo', 'model': 'Oppo F16', 'version': '6.1.1', 'imei': '123456789000036'},
    {'manufacturer': 'Oppo', 'model': 'Oppo F17', 'version': '7.1.1', 'imei': '123456789000037'},
    {'manufacturer': 'Oppo', 'model': 'Oppo F18', 'version': '8.1.1', 'imei': '123456789000038'},
]

LOG_MESSAGE = '{}: Log message {}'
RESULT_MESSAGE = [
    '{}: TEST RESULT: PASSED',
    '{}: TEST RESULT: FAILED',
    '{}: TEST RESULT: ERROR',
    '{}: TEST RESULT: SKIPPED',
]

frame = inspect.currentframe()

# log level
LOGGING_LEVEL = logging.DEBUG


def logging_config(logging_level):
    # log_format = '%(asctime)s - %(levelname)s - %(message)s'
    # log_format = '%(asctime)s [line: %(lineno)d] - %(levelname)s - %(message)s'
    log_format = '[%(asctime)s - [File: %(filename)s line: %(lineno)d] - %(levelname)s]: %(message)s'
    logging.basicConfig(level=logging_level, format=log_format)


def connect(ip, port):
    while True:
        try:
            socket_handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            socket_handle.connect((ip, port))
        except Exception as err:
            print('Error message: {}'.format(err))
            time.sleep(5)
        else:
            break

    return socket_handle


def client_send(socket_handle, message):
    ret_value = True
    json_str = json.dumps(message)

    if socket_handle is not None:
        print('Send: {}'.format(json_str))
        try:
            socket_handle.sendall(json_str.encode(encoding='utf-8'))
            print('address: {}, hostname tripe: {}, hostname: {}'.format(socket.gethostbyname(socket.gethostname()),
                                                                         socket.gethostbyaddr(socket.gethostname()),
                                                                         socket.gethostname()))
        except Exception as err:
            print('Error message: {}'.format(err))
            ret_value = False

        # sock.close()

    return ret_value


def client_send_received(socket_handle, message):
    ret_value = True
    json_str = json.dumps(message)

    if socket_handle is not None:
        print('Send: {}'.format(json_str))
        try:
            socket_handle.sendall(json_str.encode(encoding='utf-8'))
            response = socket_handle.recv(1024).decode(encoding='utf-8')
            print('Received: {}'.format(response))
            print('address: {}, hostname tripe: {}, hostname: {}'.format(socket.gethostbyname(socket.gethostname()),
                                                                         socket.gethostbyaddr(socket.gethostname()),
                                                                         socket.gethostname()))
        except Exception as err:
            print('Error message: {}'.format(err))
            ret_value = False

        # sock.close()

    return ret_value


def client_received(socket_handle):
    response = None

    if socket_handle is not None:
        response = socket_handle.recv(1024).decode(encoding='utf-8')
        print('Received: {}'.format(response))
        print('address: {}, hostname tripe: {}, hostname: {}'.format(socket.gethostbyname(socket.gethostname()),
                                                                     socket.gethostbyaddr(socket.gethostname()),
                                                                     socket.gethostname()))

    return response


def run_socket_client():
    ip, port = '172.16.53.55', 5000
    sys_message = {'type': 'info', 'category': 'phone',
                   'message': {'manufacturer': 'XiaoMi', 'model': 'MI 6', 'version': '8.1.1',
                               'imei': '201808020957129'}}
    fpga_cmd_message = {'type': 'cmd', 'category': 'fpga', 'message': 'AA 00 00'}
    hb_message = {'type': 'info', 'category': 'heart', 'message': 'heart message'}
    log_message = {'type': 'log', 'category': 'log', 'message': ''}
    result_message = {'type': 'log', 'category': 'result', 'message': 'Test Result: PASSED'}

    socket_handle = connect(ip, port)
    test_count = 3

    client_send(socket_handle, sys_message)
    time.sleep(5)

    count = 5
    received_msg = None

    while True:
        send_ret = client_send(socket_handle, hb_message)
        if not send_ret:
            break

        received_json_msg = client_received(socket_handle)
        if received_json_msg is None or not received_json_msg:
            break
        else:
            received_msg = json.loads(received_json_msg)

        if 'cmd' == received_msg['type']:
            break
        else:
            time.sleep(5)

    while True:
        if 'cmd' == received_msg['type'] and 'EE 00 00 00' == received_msg['message']['command']:
            client_send(socket_handle, fpga_cmd_message)
            time.sleep(2)
            while count > 0:
                message_content = 'log message {}'.format(count)
                log_message['message'] = message_content
                client_send(socket_handle, log_message)
                time.sleep(5)
                count -= 1

            client_send(socket_handle, fpga_cmd_message)
            time.sleep(2)
            client_send(socket_handle, result_message)
            test_count -= 1

        if test_count <= 0:
            break
        else:
            count = 5

        time.sleep(5)
        # count -= 1

        received_json_msg = client_received(socket_handle)
        if received_json_msg is None or not received_json_msg:
            break
        else:
            received_msg = json.loads(received_json_msg)

    try:
        socket_handle.shutdown(socket.SHUT_RDWR)
        socket_handle.close()
    except Exception as err:
        print('Error message: {}'.format(err))


def debug_print(msg):
    datetime_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
    print_msg = 'Debug Information: {} -> {}'.format(datetime_str, msg)

    PRINT_THREADING_LOCK.acquire()
    print(print_msg)
    PRINT_THREADING_LOCK.release()


class SocketClientHandler(threading.Thread):
    def __init__(self, thread_index, server_ip, server_port):
        super().__init__()
        self.thread_index = thread_index
        self.server_ip = server_ip
        self.server_port = server_port

        self.connect_flag = False
        self.socket_handle = None

    def socket_connect(self):
        time.sleep(random.randint(2, 20))

        while True:
            try:
                self.socket_handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.socket_handle.connect((self.server_ip, self.server_port))
                # self.socket_handle.setblocking(False)
                self.socket_handle.settimeout(5)
            except Exception as err:
                logging.debug('Error Happened[{}, {}, {}]: {}'.
                              format(self.name, self.thread_index, frame.f_lineno, err))
                time.sleep(random.randint(1, 5))
            else:
                self.connect_flag = True
                break

    def send_message_to_server(self, message):
        json_str = json.dumps(message)

        if self.socket_handle is not None:
            logging.debug('Send Message[{}, {}, {}]: {}'.
                          format(self.name, self.thread_index, frame.f_lineno, json_str))

            try:
                self.socket_handle.sendall(json_str.encode(encoding='utf-8'))
                logging.debug('address: {}, hostname tripe: {}, hostname: {}'.
                              format(socket.gethostbyname(socket.gethostname()),
                                     socket.gethostbyaddr(socket.gethostname()),
                                     socket.gethostname()))
            except Exception as err:
                logging.debug('Error Happened[{}, {}, {}]: {}'.
                              format(self.name, self.thread_index, frame.f_lineno, err))
                self.connect_flag = False

    def send_message(self, msg_type, msg_category, msg_str):
        cmd_message = {'type': msg_type, 'category': msg_category, 'message': msg_str}
        self.send_message_to_server(cmd_message)

    def received_message_from_server(self):
        response = None

        if self.socket_handle is not None:
            try:
                response = self.socket_handle.recv(1024).decode(encoding='utf-8')
                logging.debug('Received Message[{}, {}, {}]: {}'.
                              format(self.name, self.thread_index, frame.f_lineno, response))
                logging.debug('address: {}, hostname tripe: {}, hostname: {}'.
                              format(socket.gethostbyname(socket.gethostname()),
                                     socket.gethostbyaddr(socket.gethostname()),
                                     socket.gethostname()))
            except socket.timeout as err:
                logging.debug('Error Happened[{}, {}, {}]: {}'.
                              format(self.name, self.thread_index, frame.f_lineno, err))
            except socket.error as err:
                logging.debug('Error Happened[{}, {}, {}]: {}'.
                              format(self.name, self.thread_index, frame.f_lineno, err))
                self.connect_flag = False

        return response

    def received_handle(self):
        log_message_count = random.randint(50, 100)
        log_message_index = 0
        start_test_flag = False

        while True:
            if not self.connect_flag:
                break

            # send heart message
            self.send_message('info', 'heart', 'heart message')
            time.sleep(5)

            response = self.received_message_from_server()

            if response is not None:
                try:
                    json_str = json.loads(response)
                except BaseException as err:
                    logging.debug('Error Message[{}, {}, {}]: json load error with "{}" error'.
                                  format(self.name, self.thread_index, frame.f_lineno, err))
                    break
                else:
                    msg_type = json_str['type']

                    if 'cmd' == msg_type:
                        start_test_flag = True
                    elif 'info' == msg_type:
                        logging.debug('Info Message[{}, {}, {}]: {}'.
                                      format(self.name, self.thread_index, frame.f_lineno, json_str))
                    else:
                        logging.debug('Incorrect Message[{}, {}, {}]: {}'.
                                      format(self.name, self.thread_index, frame.f_lineno, json_str))

                # random to end the connection
                if LUCK_NUM == random.randint(0, 10):
                    logging.debug('disconnect with the server[{}, {}]'.format(self.name, self.thread_index))
                    self.socket_handle.close()
                    self.socket_handle = None
                    break

                # start to send log message
                if start_test_flag:
                    if log_message_index < log_message_count:
                        self.send_message('log', 'log', LOG_MESSAGE.
                                          format('[{}, {}]'.format(self.name, self.thread_index),
                                                 log_message_index))
                        log_message_index += 1
                        time.sleep(1)
                    else:
                        self.send_message('log', 'result', random.choice(RESULT_MESSAGE))
                        break
            else:
                pass
                # break

    def run(self):
        while True:
            if not self.connect_flag:
                self.socket_connect()

                # send device information to server
                # self.send_message('info', 'phone', PHONE_SYSTEM_INFO_LIST[self.thread_index])
                device_info = random.choice(PHONE_SYSTEM_INFO_LIST)
                device_info['imei'] = random.randint(100000000000000, 999999999999999)
                self.send_message('info', 'phone', device_info)

                # random to end the connection
                if LUCK_NUM == random.randint(0, 10):
                    logging.debug('disconnect with the server[{}, {}]'.format(self.name, self.thread_index))
                    self.socket_handle.close()
                    self.socket_handle = None
                    # break

            # # send heart message
            # self.send_message('info', 'heart', 'heart message')
            # time.sleep(5)

            # # random to end the connection
            # if LUCK_NUM == random.randint(0, 10):
            #     logging.debug('disconnect with the server[{}, {}]'.format(self.name, self.thread_index))
            #     self.socket_handle.close()
            #     self.socket_handle = None
            #     # break

            self.received_handle()


def run_threading_socket_client():
    socket_thread_list = []

    for thread_index in range(MAX_THREAD_NUM):
        thread_handler = SocketClientHandler(thread_index, SERVER_IP_ADDR, SERVER_IP_PORT)
        socket_thread_list.append(thread_handler)

    for thread_index in range(MAX_THREAD_NUM):
        socket_thread_list[thread_index].start()

    for thread_index in range(MAX_THREAD_NUM):
        socket_thread_list[thread_index].join()


def run_socket_client_multi_times(times=10):
    run_time = times
    while run_time > 0:
        run_socket_client()
        run_time -= 1


def main():
    # run_socket_client_multi_times()

    run_threading_socket_client()


if __name__ == '__main__':
    print('Script start execution at {}'.format(str(datetime.datetime.now())))

    logging_config(LOGGING_LEVEL)

    time_start = time.time()
    main()

    print('\n\nTotal Elapsed Time: {} seconds'.format(time.time() - time_start))
    print('\nScript end execution at {}'.format(datetime.datetime.now()))

results matching ""

    No results matching ""