Python MySQL Elasticsearch Example

This article works with a large MySQL database table that holds a huge amount of data. The Python example code selects rows from the table using a 7-day cutoff on create_time (the SQL below matches rows created before midnight seven days ago), inserts them into an Elasticsearch server programmatically, and then deletes them from MySQL. The Elasticsearch server version is 7.0.

1. MySQL Database Table Structure.

Below is the MySQL DB table definition.

CREATE TABLE `historic_records` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(50) NOT NULL DEFAULT '' COMMENT 'user id',
  `time` bigint(20) NOT NULL DEFAULT '0' COMMENT 'online / offline time',
  `create_time` bigint(20) NOT NULL DEFAULT '0' COMMENT 'creation time',
  `update_time` bigint(20) NOT NULL DEFAULT '0' COMMENT 'update time',
  `online_status` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'online status, default value is 0, 0 offline 1 online',
  `status` tinyint(1) NOT NULL DEFAULT '1' COMMENT 'soft delete tag:0-has been removed;1-normal',
  PRIMARY KEY (`id`),
  KEY `user_id` (`user_id`),
  KEY `order_index` (`time`,`create_time`,`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='historic records table';

Delete record SQL statement.

delete from historic_records where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000

Select record SQL statement.

select * from historic_records where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000
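
The cutoff used in both statements above, unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000, is midnight of the day seven days ago, expressed in epoch milliseconds. As a rough sketch, the same value can be computed on the Python side. The function name cutoff_millis is only illustrative, and the sketch assumes the Python process and the MySQL session use the same time zone.

```python
from datetime import datetime, time, timedelta

def cutoff_millis(now=None, days=7):
    """Return midnight `days` days before `now` as epoch milliseconds (local time)."""
    now = now or datetime.now()
    # cast(sysdate() as date): drop the time part, keeping today's midnight.
    midnight = datetime.combine(now.date(), time.min)
    # date_add(..., interval -7 day): step back the given number of days.
    cutoff = midnight - timedelta(days=days)
    # unix_timestamp(...) * 1000: seconds since the epoch, scaled to milliseconds.
    return int(cutoff.timestamp()) * 1000

print(cutoff_millis(datetime(2024, 1, 15, 13, 30)))
```

Rows whose create_time is smaller than this value are the ones the select and delete statements match.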

2. Elasticsearch Concepts.

  1. node: A single server, identified by a name or IP address.
  2. cluster: One or more nodes organized together.
  3. index: Roughly equivalent to a MySQL database.
  4. type: Roughly equivalent to a table in MySQL. Mapping types are deprecated in Elasticsearch 7.x, where each index holds a single type, _doc.
  5. document: Equivalent to a record in a MySQL table.
  6. field: Equivalent to a column in a MySQL table.
  7. shard: An index is divided into smaller pieces called shards, which allows horizontal splitting and scaling of capacity. Multiple shards can serve requests in parallel, which improves performance and throughput.
  8. replication: Each shard can have copies (replicas) on other nodes, so when a node fails the replicas on the remaining nodes can be used.
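
To illustrate the shard concept, the sketch below shows the general idea of how a document is routed to a primary shard: a hash of the routing value (the document _id by default) modulo the number of primary shards. Elasticsearch uses its own Murmur3-based hash; zlib.crc32 here is only a stand-in, and pick_shard is a hypothetical name.

```python
import zlib

def pick_shard(routing, number_of_shards):
    """Map a routing value to a shard number in [0, number_of_shards)."""
    # Stand-in hash: Elasticsearch itself uses a Murmur3-based hash function.
    digest = zlib.crc32(str(routing).encode("utf-8"))
    return digest % number_of_shards

for doc_id in ("1", "2", "3", "4"):
    print(doc_id, "->", pick_shard(doc_id, 3))
```

This also hints at why the number of primary shards is fixed at index creation: a different divisor would send existing ids to different shards.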

3. Elasticsearch Data Structure.

Define Elasticsearch data fields and data types according to the MySQL DB table structure.

# Create the data fields and types.
_index_mappings = {
    "settings": {
        "index": {
            # The number of primary shards per index. The default was 5 before Elasticsearch 7.0 and is 1 from 7.0 onward. This setting cannot be changed after index creation.
            "number_of_shards": 3,
            # The number of replicas per primary shard. The default value is 1. For an active index, this setting can be changed at any time.
            "number_of_replicas": 1
        }
    },
    # Fields definition.
    "mappings": {
        # self.index_type : {
            "properties": {
                # The id column is bigint(20) in MySQL, so it maps to long (a 64-bit integer) in Elasticsearch.
                "id": {"type": "long"},
                # The user_id column is varchar(50) in MySQL. Elasticsearch offers two string types for it: text and keyword.
                # text builds a full-text index and supports fuzzy search; keyword is not analyzed and supports exact matching only.
                # Since user_id does not need fuzzy search, keyword is the reasonable choice.
                "user_id": {"type": "keyword"},
                "mac": {"type": "keyword"},
                "time": {
                    "type": "date",
                    "format": "epoch_millis"
                },
                # The time columns hold millisecond timestamps (bigint) in MySQL. They map to the 'date' type in Elasticsearch with the epoch_millis format.
                "create_time": {
                    "type": "date",
                    "format": "epoch_millis"
                },
                "update_time": {
                    "type": "date",
                    "format": "epoch_millis"
                },
                # The 'tinyint' data type in MySQL maps to the 'short' data type in Elasticsearch.
                "online_status": {"type": "short"},
                "status": {"type": "short"}
            }
        # }
    }
}
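
The type choices in the mapping can be summarized as a small lookup. The helper below is a sketch only: es_field and _TYPE_RULES are hypothetical names, the rules cover just the three MySQL types used in this table, and an unknown type raises KeyError rather than guessing.

```python
import re

# Column-type rules taken from the mapping in this section.
_TYPE_RULES = {
    "bigint": "long",
    "varchar": "keyword",
    "tinyint": "short",
}

def es_field(column_type, epoch_millis=False):
    """Return an Elasticsearch field definition for a MySQL column type."""
    if epoch_millis:
        # bigint columns that hold millisecond timestamps are mapped to dates.
        return {"type": "date", "format": "epoch_millis"}
    # Strip the display width, e.g. "bigint(20)" -> "bigint".
    base = re.match(r"[a-z]+", column_type.lower()).group(0)
    return {"type": _TYPE_RULES[base]}

columns = {"id": "bigint(20)", "user_id": "varchar(50)", "online_status": "tinyint(1)"}
properties = {name: es_field(col_type) for name, col_type in columns.items()}
properties["create_time"] = es_field("bigint(20)", epoch_millis=True)
print(properties)
```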

4. The Example Source Files Structure.

./
├── conf.py
├── es_bulk.py
├── README.md
├── requirements.txt
└── utils
    ├── common.py
    └── mysql.py

5. Use Python Pymysql Module To Query Data From MySQL Database Server.

To make working with MySQL easier, a small MySQL tool class wraps querying and updating data. It uses the Python pymysql module to talk to the MySQL database server.

mysql.py

# Import pymysql library module.
import pymysql

# write_log is defined in utils/common.py.
from utils.common import write_log

# Create a tool class Mysql to operate MySQL database server.
class Mysql(object):
    
    '''
     host: MySQL database server ip.
     user: MySQL DB server account user name.
     passwd: MySQL DB server account user password.
     db_name: MySQL DB server database name.
     port: MySQL DB server port number, should be an integer, the default port number is 3306.
    '''
    def __init__(self, host, user, passwd, db_name, port=3306):
        self.host = host
        self.port = port
        self.db_name = db_name
        self.user = user
        self.passwd = passwd

    # Connect to the MySQL DB server and return the DB connection object, or False on failure.
    def connect_to_mysql(self):
        try:
            conn = pymysql.connect(
                host=self.host,
                user=self.user,
                passwd=self.passwd,
                port=self.port,
                database=self.db_name,
                # utf8mb4 matches the charset of the table.
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor
            )
            return conn
        except Exception as e:
            print(e)
            return False

    """
      Execute select SQL command.
      :param sql: The MySQL SQL statement.
      :return: The selected rows (dicts, because of DictCursor), or False on error.
    """    
    def select(self, sql):
        try:
            conn = self.connect_to_mysql()

            # Create the database cursor.
            cur = conn.cursor()

            # Execute the select SQL statement.
            cur.execute(sql)

            # Get the results of the SQL execution.
            res = cur.fetchall()

            # Close the cursor.
            cur.close()

            # Close the connection to the MySQL server.
            conn.close()

            # Return the select results.
            return res
        except Exception as e:
            print(e)
            return False


    '''
      Execute update SQL command ( insert, update, delete ).
      :param sql : The insert, update or delete sql command.
      :return: The number of affected rows on success, False on failure.
    '''
    def update(self, sql):
        
        try:
            conn = self.connect_to_mysql()
            
            # Create the database cursor.
            cur = conn.cursor()
            
            # Execute the SQL command and return the number of rows affected.
            stmt = cur.execute(sql)

            # Check the return result, if it is a number then the execution succeeded.
            if isinstance(stmt, int):
                write_log('Execute sql: %s success.' % sql, "green")
            else:
                write_log('Execute sql: %s fail.' % sql, "red")
                return False

            # Commit explicitly, otherwise the SQL will not take effect.
            conn.commit()

            # Close the DB cursor object.
            cur.close()
           
            # Close the DB connection object.
            conn.close()

            # Return the number of affected rows.
            return stmt
        except Exception as e:
            print(e)
            # write_log('Execute mysql sql: {} exception'.format(sql), "red")
            return False
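
The select and update methods take a complete SQL string. As a hedged sketch, values such as the last seen id and the page size could instead be passed as query parameters, which pymysql's cursor.execute(sql, args) accepts with %s placeholders. The helper name build_page_query and the table-name check are assumptions for illustration; a table name cannot be sent as a parameter, so it is validated separately.

```python
import re

def build_page_query(table, last_id, limit, cutoff_ms):
    """Return (sql, params) for one page of rows ordered by id."""
    # Identifiers cannot be bound as parameters, so allow only safe names.
    if not re.fullmatch(r"[A-Za-z0-9_]+", table):
        raise ValueError("unexpected table name: %r" % table)
    sql = (
        "select * from `{}` "
        "where create_time < %s and id > %s "
        "order by id limit %s"
    ).format(table)
    return sql, (int(cutoff_ms), int(last_id), int(limit))

sql, params = build_page_query("historic_record_0", 0, 100, 1668192126238)
print(sql)
print(params)
# With a pymysql cursor this would be run as: cur.execute(sql, params)
```

Paging by "id > last seen id" keeps each query cheap on a large table, because the primary key index is used to find the start of the next page.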

6. Use Python Elasticsearch Module To Insert Data To Elasticsearch Server.

This example uses the Python elasticsearch module to work with the Elasticsearch server (create the index, insert data, etc.). Below is the Python source code.

es_bulk.py

# Import time module.
import time

# Import elasticsearch modules.
from elasticsearch import Elasticsearch
from elasticsearch import helpers

# Import Mysql tool class.
import conf
from utils.mysql import Mysql

# Import some helper functions, we will define these functions in the common.py file later.
from utils.common import write_log,valid_ip,check_tcp

# The ElasticObj class is used to operate elasticsearch data.
class ElasticObj:
    
    ''' 
      Constructor. It creates the client used to connect to the elasticsearch server.
      :param timeout:  The elasticsearch server connect timeout value.
    '''
    def __init__(self,timeout=3600):
        
        # Index name.
        self.index_name = conf.INDEX_NAME

        # Index type.
        self.index_type = conf.INDEX_TYPE 

        # Elasticsearch server ip.
        self.es_ip = conf.ES_IP

        # Connect to the elasticsearch server anonymously.
        self.es = Elasticsearch([self.es_ip], port=9200, timeout=timeout)
        
        # Connect to the elasticsearch server with username and password.
        # self.es = Elasticsearch([self.es_ip], http_auth=('admin', 'admin'), port=9200, timeout=timeout)

    ''' 
       This method will create data index in elasticsearch server.
       return: bool 
    '''
    def create_index(self):
        
        # Create index mapping.
        _index_mappings = {
            # Index settings.
            "settings": {
                "index": {
                    # Index shards number.
                    "number_of_shards": 3,
                    # Index replication number.
                    "number_of_replicas": 1  
                }
            },
            # Configure index fields.
            "mappings": {
                "properties": {
                    "id": {"type": "long"},
                    "user_id": {"type": "keyword"},
                    "mac": {"type": "keyword"},
                    "time": {
                        "type": "date",
                        "format": "epoch_millis"
                    },
                    "create_time": {
                        "type": "date",
                        "format": "epoch_millis"
                    },
                    "update_time": {
                        "type": "date",
                        "format": "epoch_millis"
                    },
                    "online_status": {"type": "short"},
                    "status": {"type": "short"}
                }
            }
        }
        # If the index does not exist in the elasticsearch server yet.
        if not self.es.indices.exists(index=self.index_name):
            # Then create the index in elasticsearch server.
            res = self.es.indices.create(index=self.index_name, body=_index_mappings)
            # print(res)
            if not res:
                write_log("Create index {} failed.".format(self.index_name),"red")
                return False

            write_log("Create index {} success.".format(self.index_name), "green")
            return True
        else:
            write_log("The elasticsearch index {} already exists.".format(self.index_name), "green")
            return True

    """ Batch insert data into elasticsearch server.
        :param table: The MySQL table name. 
        :param data_list: Inserted data list, below is an example.
        [ 
           { 
              'online_status': 1, 
              'update_time': 1668192126238, 
              'create_time': 1668192126238, 
              'id': 1, 
              'status': 1, 
              'time': 1668192126238, 
              'user_id': '1668192126238', 
              'mac': '51:56:cb:96:c8:86' 
            }, 
            ... 
         ] 
        :return: bool 
    """
    def bulk_insert(self,table,data_list):
        
        # Batch insert.
        # Insert start time.
        start_time = time.time()
        
        # Create a temporary data list.
        actions = []  
        
        # Counter.
        i = 0  

        try:
            # Create the temporary data list ( which will be inserted into elasticsearch server ) by loop the input data list.
            for data in data_list:
                # Create one JSON format document that will be inserted into elasticsearch server.
                action = {
                    "_index": self.index_name,
                    "_type": self.index_type,
                    #"_id": i,  #_id ( It can also be generated by elasticsearch server automatically without assignment. )
                    "_source": {
                        'id': data['id'],
                        'user_id': data['user_id'],
                        'time': data['time'],
                        'create_time': data['create_time'],
                        'online_status': data['online_status'],
                        'status': data['status'],
                    }
                }
                # Counter plus 1.
                i += 1
                # Add the above JSON format elasticsearch document to the temporary list.
                actions.append(action)
                # When the temporary list reaches conf.MAXIMUM (100) documents.
                if len(actions) == conf.MAXIMUM:
                    # Batch insert the data into elasticsearch server.
                    helpers.bulk(self.es, actions)
                    # Empty the temporary data list.
                    del actions[:]

            # Insert whatever is left in the temporary list (fewer than conf.MAXIMUM documents).
            if actions:
                helpers.bulk(self.es, actions)

            # Get insertion end time.
            end_time = time.time()

            # Calculate the insertion cost time.
            t = round((end_time - start_time),2)
            
            write_log("Transfer {} records from MySQL table {} to ES ,cost {} seconds.".format(i, table, t), "green")
            return True
        except Exception as e:
            print(e)
            return False

    """ 
       Check whether the MySQL table exist or not.
       :return: bool 
    """
    def has_table(self,db_name,target_table):
        
        mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)
        sql = "select count(1) from {}.{}".format(db_name, target_table)
        res = mysql_obj.select(sql)

        if res is False:
            write_log("MySQL table {}.{} does not exist.".format(db_name,target_table),"red")
            return False
        else:
            return True

    """ 
        Verify the ip address of MySQL server and elasticsearch server is valid. 
        Also verify the MySQL and elasticsearch server port is reachable.
        :return: bool
    """
    def has_conf(self):
        
        if not valid_ip(conf.HOST):
            write_log("MySQL server ip address is not a valid ip.","red")
            return False

        if not valid_ip(conf.ES_IP):
            write_log("Elasticsearch server ip address is not a valid ip.","red")
            return False

        if not check_tcp(conf.HOST,conf.PORT):
            write_log("MySQL server port {} is unreachable.".format(conf.PORT),"red")
            return False

        if not check_tcp(conf.ES_IP,9200):
            write_log("Elasticsearch server port 9200 is unreachable.","red")
            return False

        return True



    """ 
       Read the records matched by the 7-day create_time cutoff from the MySQL tables and write them to the elasticsearch server.
       :return: bool 
    """
    def read_mysql_es(self):
        
        # Check whether the MySQL and Elasticsearch server ip, ports in the configuration file are normal or not.
        if not self.has_conf():
            return False

        # Create elasticsearch data index.
        if self.create_index() is False:
            # print(2)
            return False

        # Maximum row number of one query at one time.
        max = conf.MAXIMUM
        
        # The flag_list records tables that do not exist or fail to transfer.
        flag_list = []
        mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)
        
        # Transfer the records of the 64 tables (historic_record_0 to historic_record_63) to the elasticsearch server.
        for i in range(64):  
            # Check whether the table exist or not.
            res = self.has_table(conf.DB_NAME,'historic_record_%s'%i)
            if not res:
                flag_list.append(False)
                return False

            id = 0 
            while True:
                # Query records from the table.
                sql = "select * from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (
                i, id, max)
                data_list = mysql_obj.select(sql)

                # Break the loop if the select result is empty.
                if not data_list:  
                    write_log("The sql: %s execution return empty results, no records to write to elasticsearch server." %(sql), "yellow")
                    break 

                # The last row in the table.
                last_row = data_list[-1]

                # Modify the maximum id value.
                id = last_row['id']  

                # Batch insert the MySQL table row data into elasticsearch server.
                res = self.bulk_insert('historic_record_%s' % i, data_list)
                if not res:
                    write_log("Transfer table historic_record_%s rows to elasticsearch server fail."%i,"red")
                    flag_list.append(False)
                    return False

        if False in flag_list:
            write_log("Error, part of the historic_record tables can not be transferred to elasticsearch server.","red")
            return False

        write_log("All rows of the historic_record tables have been transferred to elasticsearch server successfully.", "green")
        return True



    """ 
       Delete the table records matched by the 7-day create_time cutoff.
       :return: bool 
    """
    def delete_record(self):

        # Maximum row number of one query at one time.
        max = conf.MAXIMUM
        flag_list = []
        mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)
        for i in range(64):
            # Check whether the table exist or not.
            res = self.has_table(conf.DB_NAME, 'historic_record_%s' % i)
            if not res:
                flag_list.append(False)
                return False

            # Records the maximum id value after each query.
            id = 0
            while True:
                # Query data.
                sql = "select * from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (
                    i, id, max)
                # Get the returned data list.
                data_list = mysql_obj.select(sql)

                # When the result is empty, break the loop.
                if not data_list: 
                    write_log("The sql: %s execution return empty result, no need to delete." % sql, "yellow")
                    # Break the loop.
                    break

                ### Now delete the table records.
                sql = "delete from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (
                    i, id, max)
                # Invoke Mysql class object's update method.
                res = mysql_obj.update(sql)
                if res is False:
                    write_log("Delete historic_record_%s table records fail." % i, "red")
                    flag_list.append(False)
                    break
                else:
                    write_log("Delete historic_record_%s table records success." % i, "green")
                    
                # Get the last row in the table.
                last_row = data_list[-1]
                
                # Modify the maximum id value.
                id = last_row['id']


        if False in flag_list:
            write_log("Delete part of historic_record table fail.", "red")
            return False

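        write_log("Delete all historic_record table success.", "green")
        return True

    '''
       The main function first copies the data from the MySQL tables to elasticsearch, then deletes those records from the tables.
       The delete step is skipped when the copy fails, so that no rows are removed before they reach elasticsearch.
    '''
    def main(self):
        if self.read_mysql_es():
            self.delete_record()

# Run the main function when this file is executed as a script.
if __name__ == '__main__':
    ElasticObj().main()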
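
The batching logic in bulk_insert can also be written as a small generator that yields fixed-size chunks, which keeps the flush-and-reset bookkeeping in one place. This is a sketch under assumptions: chunked and build_actions are hypothetical helper names and the sample rows are made up. The elasticsearch helpers.bulk function also accepts a chunk_size argument that performs similar batching internally.

```python
def chunked(items, size):
    """Yield consecutive lists of at most `size` items."""
    chunk = []
    for item in items:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    # Yield the final, shorter chunk if anything is left.
    if chunk:
        yield chunk

def build_actions(rows, index_name):
    """Turn row dicts into bulk actions for the given index."""
    for row in rows:
        yield {"_index": index_name, "_source": dict(row)}

rows = [{"id": n, "user_id": "u%d" % n} for n in range(1, 8)]
for batch in chunked(build_actions(rows, "test_records"), 3):
    # In the real script each batch would go to helpers.bulk(es, batch).
    print(len(batch), [a["_source"]["id"] for a in batch])
```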

7. The Configuration Python File.

conf.py

"""
Common settings for the MySQL and Elasticsearch servers.
"""
import os

# Get the project root folder.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))  

# mysql
HOST = "192.168.1.166"
USER = "root"
PASSWD = "666666"
DB_NAME = "test_db"
PORT = 3306

# elasticsearch
INDEX_NAME = "test_records"
INDEX_TYPE = "_doc"
ES_IP = "192.168.6.188"

# The maximum number of rows read by one MySQL query and inserted by one elasticsearch bulk request.
MAXIMUM = 100
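
Hard-coding the database password in conf.py is convenient for a demo. One possible variation, sketched below, reads each setting from an environment variable and falls back to the values above. The variable names (MYSQL_HOST, BULK_MAXIMUM, and so on) and the load_settings function are assumptions, not part of the original example.

```python
import os

def load_settings(env=None):
    """Build the settings dict from environment variables with defaults."""
    env = os.environ if env is None else env
    return {
        "HOST": env.get("MYSQL_HOST", "192.168.1.166"),
        "USER": env.get("MYSQL_USER", "root"),
        "PASSWD": env.get("MYSQL_PASSWD", "666666"),
        "DB_NAME": env.get("MYSQL_DB", "test_db"),
        # Environment values are strings, so numeric settings are converted.
        "PORT": int(env.get("MYSQL_PORT", 3306)),
        "ES_IP": env.get("ES_IP", "192.168.6.188"),
        "MAXIMUM": int(env.get("BULK_MAXIMUM", 100)),
    }

# A plain dict stands in for os.environ here to keep the demo repeatable.
settings = load_settings({"MYSQL_PORT": "3307"})
print(settings["PORT"], settings["HOST"])
```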

8. The Tool Class Python File.

common.py

"""
The helper functions python file.
"""

import sys
import io

# Set the default screen output text encoding to UTF-8
def setup_io(): 
    sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
    sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
setup_io()


import os
import time
import conf
import socket
import subprocess
import ipaddress
from multiprocessing import cpu_count

"""
    Print a colored log line and append it to the log file.
    :param content: The log content data.
    :param colour: Text color.
    :param skip: Whether to skip printing the time.
    :return:
"""
def write_log(content,colour='white',skip=False):
    # ANSI color codes.
    colour_dict = {
        'red': 31,  # Red
        'green': 32,  # Green
        'yellow': 33,  # Yellow
        'blue': 34,  # Blue
        'purple_red': 35,  # Purple
        'bluish_blue': 36, # Light blue.
        'white': 37,  # White
    }
    # Choose one color code, falling back to white for unknown names.
    choice = colour_dict.get(colour, 37)

    # Create the log file path and name.
    path = os.path.join(conf.BASE_DIR,"output.log") 
    with open(path, mode='a+', encoding='utf-8') as f:
        # Print the log date time.
        if skip is False: 
            content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content

        info = "\033[1;{};1m{}\033[0m".format(choice, content)
        print(info)
        f.write(content+"\n")
 

""" 
   Execute a linux command and return its output lines. 
   :param cmd: linux command. 
   :param timeout: timeout number in seconds. 
   :param skip: when True, a timeout is not logged and the output read so far is returned.
   :return: list of output lines, or False on timeout when skip is False. 
"""       
def execute_linux2(cmd, timeout=10, skip=False):

    # Invoke the linux command.
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)

    # Record the start time.
    t_beginning = time.time()

    while True:
        if p.poll() is not None:
            break
        seconds_passed = time.time() - t_beginning
        if timeout and seconds_passed > timeout:
            p.terminate()

            if not skip:
                write_log('The linux command: {} execute timeout.'.format(cmd),"red")
                return False

        # Sleep briefly so the polling loop does not spin the CPU.
        time.sleep(0.1)

    result = p.stdout.readlines()
    return result


""" 
   Verify whether the IP is valid. For example, 192.168.1.256 is not a valid IP.
   :return: bool 
"""
def valid_ip(ip):

    try:
        # Check python version.
        if sys.version_info[0] == 2:
            ipaddress.ip_address(ip.strip().decode("utf-8"))
        elif sys.version_info[0] == 3:
            ipaddress.ip_address(ip)

        return True
    except Exception as e:
        print(e)
        return False


""" 
   Check whether tcp port can be accessed or not.
   :param ip: ip address. 
   :param port: tcp port number.
   :param timeout: timeout number value.
   :return: bool 
"""
def check_tcp(ip, port, timeout=1):

    flag = False
    cs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Set the timeout on this socket before connecting.
        cs.settimeout(timeout)
        address = (str(ip), int(port))
   
        # Connect to the tcp port; connect_ex returns 0 on success.
        status = cs.connect_ex(address)

        if not status:
            flag = True

        return flag
    except Exception as e:
        print(e)
        return flag
    finally:
        # Always release the socket.
        cs.close()

COROUTINE_NUMBER = cpu_count()
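
As a quick illustration of what valid_ip accepts and rejects, the standalone sketch below applies the same ipaddress.ip_address check to a few sample strings (Python 3 only). The function name is_valid_ip and the sample addresses are illustrative.

```python
import ipaddress

def is_valid_ip(ip):
    """Return True when `ip` parses as an IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(ip)
        return True
    except ValueError:
        # Raised for strings such as "192.168.1.256" (octet out of range).
        return False

for candidate in ("192.168.1.166", "192.168.1.256", "::1", "es-server"):
    print(candidate, is_valid_ip(candidate))
```

Host names are rejected by this check, so conf.HOST and conf.ES_IP need to be literal IP addresses for has_conf to pass.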
