123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- #/usr/bin/env python
- #coding=utf-8
- from mysql.connector.connection import MySQLConnection
- import commands
- import datetime
- import calendar
- from fty_util.config import APP_CFG
- class Mysql(object):
- def __init__(self):
- pass
-
- @staticmethod
- def createOnlineConn():
- # 如果需要使用连接池功能,可以多指定一个参数,pool_size=10或者pool_name
- config = {
- 'user': APP_CFG.ONLINE_CONFIG_USER,
- 'password': APP_CFG.ONLINE_CONFIG_PASSWORD,
- 'host': APP_CFG.ONLINE_CONFIG_HOST,
- 'port': APP_CFG.ONLINE_CONFIG_PORT
- }
- # config = {
- # 'user': 'root',
- # 'password': 'huojutech_yaozhi!23',
- # 'host': '121.41.17.212',
- # # 'database': 'yxb',
- # 'port': 3306
- # }
- # 如果conn是直接新建的连接则它会被关闭,如果是从线程池中分配一个连接则会被归还给连接池
- cnx = MySQLConnection()
- try:
- cnx.connect(**config)
- except Exception, e:
- print e
- cnx.reconnect(attempts=3, delay=0)
- return cnx
- @staticmethod
- def createOfflineConn():
- # 如果需要使用连接池功能,可以多指定一个参数,pool_size=10或者pool_name
- config = {
- 'user': APP_CFG.OFFLINE_CONFIG_USER,
- 'password': APP_CFG.OFFLINE_CONFIG_PASSWORD,
- 'host': APP_CFG.OFFLINE_CONFIG_HOST,
- 'port': APP_CFG.OFFLINE_CONFIG_PORT
- }
- # config = {
- # 'user': 'root',
- # 'password': 'huojutech_yaozhi!23',
- # 'host': '121.41.17.212',
- # # 'database': 'yxb',
- # 'port': 3306
- # }
- # 如果conn是直接新建的连接则它会被关闭,如果是从线程池中分配一个连接则会被归还给连接池
- cnx = MySQLConnection()
- try:
- cnx.connect(**config)
- except Exception, e:
- print e
- cnx.reconnect(attempts=3, delay=0)
- return cnx
- @staticmethod
- def createScrapyConn():
- # 如果需要使用连接池功能,可以多指定一个参数,pool_size=10或者pool_name
- config = {
- 'user': APP_CFG.SCRAPY_CONFIG_USER,
- 'password': APP_CFG.SCRAPY_CONFIG_PASSWORD,
- 'host': APP_CFG.SCRAPY_CONFIG_HOST,
- 'port': APP_CFG.SCRAPY_CONFIG_PORT
- }
- # config = {
- # 'user': 'root',
- # 'password': 'huojutech_yaozhi!23',
- # 'host': '121.41.17.212',
- # 'port': 3306
- # }
- # 如果conn是直接新建的连接则它会被关闭,如果是从线程池中分配一个连接则会被归还给连接池
- cnx = MySQLConnection()
- try:
- cnx.connect(**config)
- except Exception, e:
- print e
- cnx.reconnect(attempts=3, delay=0)
- return cnx
- @staticmethod
- def getCursor(conn=None, buffered=None):
- if not conn.is_connected():
- if conn is not None:
- conn.close()
- conn.reconnect(attempts=5)
- if buffered is not None:
- cursor = conn.cursor(buffered=True)
- else:
- cursor = conn.cursor()
- return cursor
- # if Mysql.__pool is None:
- # __pool = PooledDB(creator=mysql.connector, mincached=1, maxcached=20,
- # host=MYSQL_HOST,
- # port=MYSQL_PORT,
- # db=MYSQL_DBNAME,
- # user=MYSQL_USER,
- # passwd=MYSQL_PASSWD,
- # charset='utf8')
- # return __pool.connection()
- @staticmethod
- def getAll(sql, param=None, conn=None):
- # conn = self.getConn()
- cursor = Mysql.getCursor(conn=conn)
- """
- @summary: 执行查询, 并取出所有结果集
- @param sql: 查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
- @param param: 可选参数,条件列表值 (元组/列表)
- @return: result list(字典对象)/boolean 查询到的结果集
- """
- if param is None:
- cursor.execute(sql)
- else:
- cursor.execute(sql, param)
- cols = [t[0] for t in cursor.description]
- result = cursor.fetchall()
- if result:
- cursor.close()
- return [dict(zip(cols, row)) for row in result]
- else:
- cursor.close()
- return result
- @staticmethod
- def selectAll(sql, param=None, conn=None):
- # conn = self.getConn()
- cursor = Mysql.getCursor(conn=conn)
- """
- @summary: 执行查询, 并取出所有结果集
- @param sql: 查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
- @param param: 可选参数,条件列表值 (元组/列表)
- @return: result list查询到的结果集
- """
- if param is None:
- cursor.execute(sql)
- else:
- cursor.execute(sql, param)
- cols = [t[0] for t in cursor.description]
- result = cursor.fetchall()
- cursor.close()
- return result
- @staticmethod
- def getOne(sql, param=None, conn=None):
- # conn = self.getConn()
- cursor = Mysql.getCursor(conn=conn, buffered=True)
- """
- @summary: 执行查询,并取出第一条
- @param sql: 查询SQL, 如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
- @param param: 可选参数,条件列表值(元组/列表)
- @return: result list/boolean 查询到的结果集
- """
- if param is None:
- count = cursor.execute(sql)
- else:
- count = cursor.execute(sql, param)
- result = cursor.fetchone()
- cursor.close()
- return result
- @staticmethod
- def getMany(sql, num, param=None, conn=None):
- # conn = self.getConn()
- cursor = Mysql.getCursor(conn=conn)
- """
- @summary: 执行查询, 并取出num条结果
- @param sql: 查询SQL, 如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
- @param num: 取得的结果条数
- @param param: 可选参数, 条件列表值(元组/列表)
- @return: result list/boolean 查询到的结果集
- """
- if param is None:
- count = cursor.execute(sql)
- else:
- count = cursor.execute(sql, param)
- result = cursor.fetchmany(num)
- cursor.close()
- return result
- @staticmethod
- def insertOne(sql, value=None, conn=None):
- # conn = self.getConn()
- cursor = Mysql.getCursor(conn=conn)
- """
- @summary: 向数据表插入一条记录
- @param sql: 要插入的SQL格式
- @param value: 要插入的记录数据tuple/list
- @return: insertId 受影响的行数
- """
- if value is None:
- cursor.execute(sql)
- else:
- cursor.execute(sql, value)
- Mysql.dispose(cursor, conn)
- return Mysql.__getInsertId(conn)
- @staticmethod
- def insertMany(sql, values, conn):
- # conn = self.getConn()
- cursor = Mysql.getCursor(conn=conn)
- """
- @summary: 向数据表插入多条记录
- @param sql: 要插入的SQL格式
- @param values: 要插入的记录数据tuple(tuple)/list[list]
- @return: count 受影响的行数
- """
- count = cursor.executemany(sql, values)
- Mysql.dispose(cursor, conn)
- return count
- @staticmethod
- def __getInsertId(conn):
- # conn = self.getConn()
- cursor = Mysql.getCursor(conn=conn)
- """
- 获取当前连接最后一次插入操作生成的id,如果没有则为0
- """
- cursor.execute("select @@identity as id")
- result = cursor.fetchall()
- cursor.close()
- return result[0][0]
- @staticmethod
- def __query(sql, param=None, conn=None):
- # conn = Mysql.getConn()
- cursor = Mysql.getCursor(conn=conn)
- if param is None:
- count = cursor.execute(sql)
- else:
- count = cursor.execute(sql, param)
- Mysql.dispose(cursor, conn)
- return count
- @staticmethod
- def execute(sql, param=None, conn=None):
- # conn = self.getConn()
- cursor = Mysql.getCursor(conn=conn)
- if param is None:
- count = cursor.execute(sql)
- else:
- count = cursor.execute(sql, param)
- Mysql.dispose(cursor, conn)
- return count
- @staticmethod
- def updateMany(sql, param=None, conn=None):
- # conn = Mysql.getConn()
- cursor = Mysql.getCursor(conn=conn)
- count = cursor.executemany(sql, param)
- return count
- @staticmethod
- def update(sql, param=None, conn=None):
- """
- @summary: 更新数据表记录
- @param sql: sql格式及条件, 使用(%s, %s)
- @param param: 要更新的 值 tuple/list
- @return: count 受影响的行数
- """
- return Mysql.__query(sql, param=param, conn=conn)
- @staticmethod
- def delete(sql, param=None, conn=None):
- """
- @summary: 删除数据表记录
- @param sql: sql格式及条件,使用(%s, %s)
- @param param: 要删除的条件 值 tuple/list
- @return: count 受影响的行数
- """
- return Mysql.__query(sql, param=param, conn=conn)
- @staticmethod
- def dispose(cursor, conn):
- """
- @summary: 释放连接池资源
- """
- conn.commit()
- cursor.close()
- @staticmethod
- def close(conn):
- if conn:
- conn.close()
- @staticmethod
- def cmd(cmd):
- status, output = commands.getstatusoutput(cmd)
- if status != 0:
- print '同步线上失败'
- else:
- print '同步线上成功'
- class Util(object):
-
- @staticmethod
- def insert_by_chunk(sql, data_list, conn):
- start = 0
- while True:
- end = start + 10000
- if end >= len(data_list):
- end = len(data_list)
- if start >= len(data_list):
- break
- Mysql.insertMany(sql, data_list[start:end], conn)
- start = end
- @staticmethod
- def calc_ratings_index(num1, num2):
- """计算收视指数
- 收视指数 = 收视率/(近一年平均收视率 * 0.2)
- 收视指数 > 10 按10计算
- 收视指数 < 1 按1计算
- Args: num1 收视率
- Args: num2 近一年平均收视率
- """
- # 如果num1 或 num2为空,则直接返回指数为1
- if num1 is None or num2 is None:
- return 1.0
- ratings_index = float(num1) / (float(num2) * 0.2)
- if ratings_index > 10.0:
- ratings_index = 10.0
- if ratings_index < 1.0:
- ratings_index = 1.0
- return ratings_index
- @staticmethod
- def get_max_date_of_month(field):
- """获取给定月份的最大日期
- """
- if isinstance(field, datetime.date):
- month_str = field.strftime('%Y-%m-%d')
- _year = str(month_str.split('-')[0])
- _month = str(month_str.split('-')[1])
- max_date = calendar.monthrange(int(_year), int(_month))
- date_str = _year + '-' + _month + '-' + str(max_date[1])
- return date_str
- @staticmethod
- def get_first_date_of_yesterday():
- # 当前日期
- now = datetime.date.today()
- # 昨天日期
- yesterday = now - datetime.timedelta(days=1)
- # 昨天的当月日期
- first_day = datetime.date(yesterday.year, yesterday.month, 1)
- return first_day
- @staticmethod
- def get_max_date_of_one_year_ago(field):
- """获取给定月份一年前的日期
- """
- if isinstance(field, datetime.date):
- month_str = field.strftime('%Y-%m-%d')
- _year = str(month_str.split('-')[0])
- _month = str(month_str.split('-')[1])
- max_date = calendar.monthrange(int(_year)-1, int(_month))
- date_str = str(int(_year) - 1) + '-' + _month + '-' + str(max_date[1])
- return date_str
|