#/usr/bin/env python #coding=utf-8 from mysql.connector.connection import MySQLConnection import commands import datetime import calendar from fty_util.config import APP_CFG class Mysql(object): def __init__(self): pass @staticmethod def createOnlineConn(): # 如果需要使用连接池功能,可以多指定一个参数,pool_size=10或者pool_name config = { 'user': APP_CFG.ONLINE_CONFIG_USER, 'password': APP_CFG.ONLINE_CONFIG_PASSWORD, 'host': APP_CFG.ONLINE_CONFIG_HOST, 'port': APP_CFG.ONLINE_CONFIG_PORT } # config = { # 'user': 'root', # 'password': 'huojutech_yaozhi!23', # 'host': '121.41.17.212', # # 'database': 'yxb', # 'port': 3306 # } # 如果conn是直接新建的连接则它会被关闭,如果是从线程池中分配一个连接则会被归还给连接池 cnx = MySQLConnection() try: cnx.connect(**config) except Exception, e: print e cnx.reconnect(attempts=3, delay=0) return cnx @staticmethod def createOfflineConn(): # 如果需要使用连接池功能,可以多指定一个参数,pool_size=10或者pool_name config = { 'user': APP_CFG.OFFLINE_CONFIG_USER, 'password': APP_CFG.OFFLINE_CONFIG_PASSWORD, 'host': APP_CFG.OFFLINE_CONFIG_HOST, 'port': APP_CFG.OFFLINE_CONFIG_PORT } # config = { # 'user': 'root', # 'password': 'huojutech_yaozhi!23', # 'host': '121.41.17.212', # # 'database': 'yxb', # 'port': 3306 # } # 如果conn是直接新建的连接则它会被关闭,如果是从线程池中分配一个连接则会被归还给连接池 cnx = MySQLConnection() try: cnx.connect(**config) except Exception, e: print e cnx.reconnect(attempts=3, delay=0) return cnx @staticmethod def createScrapyConn(): # 如果需要使用连接池功能,可以多指定一个参数,pool_size=10或者pool_name config = { 'user': APP_CFG.SCRAPY_CONFIG_USER, 'password': APP_CFG.SCRAPY_CONFIG_PASSWORD, 'host': APP_CFG.SCRAPY_CONFIG_HOST, 'port': APP_CFG.SCRAPY_CONFIG_PORT } # config = { # 'user': 'root', # 'password': 'huojutech_yaozhi!23', # 'host': '121.41.17.212', # 'port': 3306 # } # 如果conn是直接新建的连接则它会被关闭,如果是从线程池中分配一个连接则会被归还给连接池 cnx = MySQLConnection() try: cnx.connect(**config) except Exception, e: print e cnx.reconnect(attempts=3, delay=0) return cnx @staticmethod def getCursor(conn=None, buffered=None): if not conn.is_connected(): if conn is not None: conn.close() conn.reconnect(attempts=5) if buffered is not None: cursor = conn.cursor(buffered=True) else: cursor = conn.cursor() return cursor # if Mysql.__pool is None: # __pool = PooledDB(creator=mysql.connector, mincached=1, maxcached=20, # host=MYSQL_HOST, # port=MYSQL_PORT, # db=MYSQL_DBNAME, # user=MYSQL_USER, # passwd=MYSQL_PASSWD, # charset='utf8') # return __pool.connection() @staticmethod def getAll(sql, param=None, conn=None): # conn = self.getConn() cursor = Mysql.getCursor(conn=conn) """ @summary: 执行查询, 并取出所有结果集 @param sql: 查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来 @param param: 可选参数,条件列表值 (元组/列表) @return: result list(字典对象)/boolean 查询到的结果集 """ if param is None: cursor.execute(sql) else: cursor.execute(sql, param) cols = [t[0] for t in cursor.description] result = cursor.fetchall() if result: cursor.close() return [dict(zip(cols, row)) for row in result] else: cursor.close() return result @staticmethod def selectAll(sql, param=None, conn=None): # conn = self.getConn() cursor = Mysql.getCursor(conn=conn) """ @summary: 执行查询, 并取出所有结果集 @param sql: 查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来 @param param: 可选参数,条件列表值 (元组/列表) @return: result list查询到的结果集 """ if param is None: cursor.execute(sql) else: cursor.execute(sql, param) cols = [t[0] for t in cursor.description] result = cursor.fetchall() cursor.close() return result @staticmethod def getOne(sql, param=None, conn=None): # conn = self.getConn() cursor = Mysql.getCursor(conn=conn, buffered=True) """ @summary: 执行查询,并取出第一条 @param sql: 查询SQL, 如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来 @param param: 可选参数,条件列表值(元组/列表) @return: result list/boolean 查询到的结果集 """ if param is None: count = cursor.execute(sql) else: count = cursor.execute(sql, param) result = cursor.fetchone() cursor.close() return result @staticmethod def getMany(sql, num, param=None, conn=None): # conn = self.getConn() cursor = Mysql.getCursor(conn=conn) """ @summary: 执行查询, 并取出num条结果 @param sql: 查询SQL, 如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来 @param num: 取得的结果条数 @param param: 可选参数, 条件列表值(元组/列表) @return: result list/boolean 查询到的结果集 """ if param is None: count = cursor.execute(sql) else: count = cursor.execute(sql, param) result = cursor.fetchmany(num) cursor.close() return result @staticmethod def insertOne(sql, value=None, conn=None): # conn = self.getConn() cursor = Mysql.getCursor(conn=conn) """ @summary: 向数据表插入一条记录 @param sql: 要插入的SQL格式 @param value: 要插入的记录数据tuple/list @return: insertId 受影响的行数 """ if value is None: cursor.execute(sql) else: cursor.execute(sql, value) Mysql.dispose(cursor, conn) return Mysql.__getInsertId(conn) @staticmethod def insertMany(sql, values, conn): # conn = self.getConn() cursor = Mysql.getCursor(conn=conn) """ @summary: 向数据表插入多条记录 @param sql: 要插入的SQL格式 @param values: 要插入的记录数据tuple(tuple)/list[list] @return: count 受影响的行数 """ count = cursor.executemany(sql, values) Mysql.dispose(cursor, conn) return count @staticmethod def __getInsertId(conn): # conn = self.getConn() cursor = Mysql.getCursor(conn=conn) """ 获取当前连接最后一次插入操作生成的id,如果没有则为0 """ cursor.execute("select @@identity as id") result = cursor.fetchall() cursor.close() return result[0][0] @staticmethod def __query(sql, param=None, conn=None): # conn = Mysql.getConn() cursor = Mysql.getCursor(conn=conn) if param is None: count = cursor.execute(sql) else: count = cursor.execute(sql, param) Mysql.dispose(cursor, conn) return count @staticmethod def execute(sql, param=None, conn=None): # conn = self.getConn() cursor = Mysql.getCursor(conn=conn) if param is None: count = cursor.execute(sql) else: count = cursor.execute(sql, param) Mysql.dispose(cursor, conn) return count @staticmethod def updateMany(sql, param=None, conn=None): # conn = Mysql.getConn() cursor = Mysql.getCursor(conn=conn) count = cursor.executemany(sql, param) return count @staticmethod def update(sql, param=None, conn=None): """ @summary: 更新数据表记录 @param sql: sql格式及条件, 使用(%s, %s) @param param: 要更新的 值 tuple/list @return: count 受影响的行数 """ return Mysql.__query(sql, param=param, conn=conn) @staticmethod def delete(sql, param=None, conn=None): """ @summary: 删除数据表记录 @param sql: sql格式及条件,使用(%s, %s) @param param: 要删除的条件 值 tuple/list @return: count 受影响的行数 """ return Mysql.__query(sql, param=param, conn=conn) @staticmethod def dispose(cursor, conn): """ @summary: 释放连接池资源 """ conn.commit() cursor.close() @staticmethod def close(conn): if conn: conn.close() @staticmethod def cmd(cmd): status, output = commands.getstatusoutput(cmd) if status != 0: print '同步线上失败' else: print '同步线上成功' class Util(object): @staticmethod def insert_by_chunk(sql, data_list, conn): start = 0 while True: end = start + 10000 if end >= len(data_list): end = len(data_list) if start >= len(data_list): break Mysql.insertMany(sql, data_list[start:end], conn) start = end @staticmethod def calc_ratings_index(num1, num2): """计算收视指数 收视指数 = 收视率/(近一年平均收视率 * 0.2) 收视指数 > 10 按10计算 收视指数 < 1 按1计算 Args: num1 收视率 Args: num2 近一年平均收视率 """ # 如果num1 或 num2为空,则直接返回指数为1 if num1 is None or num2 is None: return 1.0 ratings_index = float(num1) / (float(num2) * 0.2) if ratings_index > 10.0: ratings_index = 10.0 if ratings_index < 1.0: ratings_index = 1.0 return ratings_index @staticmethod def get_max_date_of_month(field): """获取给定月份的最大日期 """ if isinstance(field, datetime.date): month_str = field.strftime('%Y-%m-%d') _year = str(month_str.split('-')[0]) _month = str(month_str.split('-')[1]) max_date = calendar.monthrange(int(_year), int(_month)) date_str = _year + '-' + _month + '-' + str(max_date[1]) return date_str @staticmethod def get_first_date_of_yesterday(): # 当前日期 now = datetime.date.today() # 昨天日期 yesterday = now - datetime.timedelta(days=1) # 昨天的当月日期 first_day = datetime.date(yesterday.year, yesterday.month, 1) return first_day @staticmethod def get_max_date_of_one_year_ago(field): """获取给定月份一年前的日期 """ if isinstance(field, datetime.date): month_str = field.strftime('%Y-%m-%d') _year = str(month_str.split('-')[0]) _month = str(month_str.split('-')[1]) max_date = calendar.monthrange(int(_year)-1, int(_month)) date_str = str(int(_year) - 1) + '-' + _month + '-' + str(max_date[1]) return date_str