common.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. #/usr/bin/env python
  2. #coding=utf-8
  3. from mysql.connector.connection import MySQLConnection
  4. import commands
  5. import datetime
  6. import calendar
  7. from fty_util.config import APP_CFG
  8. class Mysql(object):
  9. def __init__(self):
  10. pass
  11. @staticmethod
  12. def createOnlineConn():
  13. # 如果需要使用连接池功能,可以多指定一个参数,pool_size=10或者pool_name
  14. config = {
  15. 'user': APP_CFG.ONLINE_CONFIG_USER,
  16. 'password': APP_CFG.ONLINE_CONFIG_PASSWORD,
  17. 'host': APP_CFG.ONLINE_CONFIG_HOST,
  18. 'port': APP_CFG.ONLINE_CONFIG_PORT
  19. }
  20. # config = {
  21. # 'user': 'root',
  22. # 'password': 'huojutech_yaozhi!23',
  23. # 'host': '121.41.17.212',
  24. # # 'database': 'yxb',
  25. # 'port': 3306
  26. # }
  27. # 如果conn是直接新建的连接则它会被关闭,如果是从线程池中分配一个连接则会被归还给连接池
  28. cnx = MySQLConnection()
  29. try:
  30. cnx.connect(**config)
  31. except Exception, e:
  32. print e
  33. cnx.reconnect(attempts=3, delay=0)
  34. return cnx
  35. @staticmethod
  36. def createOfflineConn():
  37. # 如果需要使用连接池功能,可以多指定一个参数,pool_size=10或者pool_name
  38. config = {
  39. 'user': APP_CFG.OFFLINE_CONFIG_USER,
  40. 'password': APP_CFG.OFFLINE_CONFIG_PASSWORD,
  41. 'host': APP_CFG.OFFLINE_CONFIG_HOST,
  42. 'port': APP_CFG.OFFLINE_CONFIG_PORT
  43. }
  44. # config = {
  45. # 'user': 'root',
  46. # 'password': 'huojutech_yaozhi!23',
  47. # 'host': '121.41.17.212',
  48. # # 'database': 'yxb',
  49. # 'port': 3306
  50. # }
  51. # 如果conn是直接新建的连接则它会被关闭,如果是从线程池中分配一个连接则会被归还给连接池
  52. cnx = MySQLConnection()
  53. try:
  54. cnx.connect(**config)
  55. except Exception, e:
  56. print e
  57. cnx.reconnect(attempts=3, delay=0)
  58. return cnx
  59. @staticmethod
  60. def createScrapyConn():
  61. # 如果需要使用连接池功能,可以多指定一个参数,pool_size=10或者pool_name
  62. config = {
  63. 'user': APP_CFG.SCRAPY_CONFIG_USER,
  64. 'password': APP_CFG.SCRAPY_CONFIG_PASSWORD,
  65. 'host': APP_CFG.SCRAPY_CONFIG_HOST,
  66. 'port': APP_CFG.SCRAPY_CONFIG_PORT
  67. }
  68. # config = {
  69. # 'user': 'root',
  70. # 'password': 'huojutech_yaozhi!23',
  71. # 'host': '121.41.17.212',
  72. # 'port': 3306
  73. # }
  74. # 如果conn是直接新建的连接则它会被关闭,如果是从线程池中分配一个连接则会被归还给连接池
  75. cnx = MySQLConnection()
  76. try:
  77. cnx.connect(**config)
  78. except Exception, e:
  79. print e
  80. cnx.reconnect(attempts=3, delay=0)
  81. return cnx
  82. @staticmethod
  83. def getCursor(conn=None, buffered=None):
  84. if not conn.is_connected():
  85. if conn is not None:
  86. conn.close()
  87. conn.reconnect(attempts=5)
  88. if buffered is not None:
  89. cursor = conn.cursor(buffered=True)
  90. else:
  91. cursor = conn.cursor()
  92. return cursor
  93. # if Mysql.__pool is None:
  94. # __pool = PooledDB(creator=mysql.connector, mincached=1, maxcached=20,
  95. # host=MYSQL_HOST,
  96. # port=MYSQL_PORT,
  97. # db=MYSQL_DBNAME,
  98. # user=MYSQL_USER,
  99. # passwd=MYSQL_PASSWD,
  100. # charset='utf8')
  101. # return __pool.connection()
  102. @staticmethod
  103. def getAll(sql, param=None, conn=None):
  104. # conn = self.getConn()
  105. cursor = Mysql.getCursor(conn=conn)
  106. """
  107. @summary: 执行查询, 并取出所有结果集
  108. @param sql: 查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
  109. @param param: 可选参数,条件列表值 (元组/列表)
  110. @return: result list(字典对象)/boolean 查询到的结果集
  111. """
  112. if param is None:
  113. cursor.execute(sql)
  114. else:
  115. cursor.execute(sql, param)
  116. cols = [t[0] for t in cursor.description]
  117. result = cursor.fetchall()
  118. if result:
  119. cursor.close()
  120. return [dict(zip(cols, row)) for row in result]
  121. else:
  122. cursor.close()
  123. return result
  124. @staticmethod
  125. def selectAll(sql, param=None, conn=None):
  126. # conn = self.getConn()
  127. cursor = Mysql.getCursor(conn=conn)
  128. """
  129. @summary: 执行查询, 并取出所有结果集
  130. @param sql: 查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
  131. @param param: 可选参数,条件列表值 (元组/列表)
  132. @return: result list查询到的结果集
  133. """
  134. if param is None:
  135. cursor.execute(sql)
  136. else:
  137. cursor.execute(sql, param)
  138. cols = [t[0] for t in cursor.description]
  139. result = cursor.fetchall()
  140. cursor.close()
  141. return result
  142. @staticmethod
  143. def getOne(sql, param=None, conn=None):
  144. # conn = self.getConn()
  145. cursor = Mysql.getCursor(conn=conn, buffered=True)
  146. """
  147. @summary: 执行查询,并取出第一条
  148. @param sql: 查询SQL, 如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
  149. @param param: 可选参数,条件列表值(元组/列表)
  150. @return: result list/boolean 查询到的结果集
  151. """
  152. if param is None:
  153. count = cursor.execute(sql)
  154. else:
  155. count = cursor.execute(sql, param)
  156. result = cursor.fetchone()
  157. cursor.close()
  158. return result
  159. @staticmethod
  160. def getMany(sql, num, param=None, conn=None):
  161. # conn = self.getConn()
  162. cursor = Mysql.getCursor(conn=conn)
  163. """
  164. @summary: 执行查询, 并取出num条结果
  165. @param sql: 查询SQL, 如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
  166. @param num: 取得的结果条数
  167. @param param: 可选参数, 条件列表值(元组/列表)
  168. @return: result list/boolean 查询到的结果集
  169. """
  170. if param is None:
  171. count = cursor.execute(sql)
  172. else:
  173. count = cursor.execute(sql, param)
  174. result = cursor.fetchmany(num)
  175. cursor.close()
  176. return result
  177. @staticmethod
  178. def insertOne(sql, value=None, conn=None):
  179. # conn = self.getConn()
  180. cursor = Mysql.getCursor(conn=conn)
  181. """
  182. @summary: 向数据表插入一条记录
  183. @param sql: 要插入的SQL格式
  184. @param value: 要插入的记录数据tuple/list
  185. @return: insertId 受影响的行数
  186. """
  187. if value is None:
  188. cursor.execute(sql)
  189. else:
  190. cursor.execute(sql, value)
  191. Mysql.dispose(cursor, conn)
  192. return Mysql.__getInsertId(conn)
  193. @staticmethod
  194. def insertMany(sql, values, conn):
  195. # conn = self.getConn()
  196. cursor = Mysql.getCursor(conn=conn)
  197. """
  198. @summary: 向数据表插入多条记录
  199. @param sql: 要插入的SQL格式
  200. @param values: 要插入的记录数据tuple(tuple)/list[list]
  201. @return: count 受影响的行数
  202. """
  203. count = cursor.executemany(sql, values)
  204. Mysql.dispose(cursor, conn)
  205. return count
  206. @staticmethod
  207. def __getInsertId(conn):
  208. # conn = self.getConn()
  209. cursor = Mysql.getCursor(conn=conn)
  210. """
  211. 获取当前连接最后一次插入操作生成的id,如果没有则为0
  212. """
  213. cursor.execute("select @@identity as id")
  214. result = cursor.fetchall()
  215. cursor.close()
  216. return result[0][0]
  217. @staticmethod
  218. def __query(sql, param=None, conn=None):
  219. # conn = Mysql.getConn()
  220. cursor = Mysql.getCursor(conn=conn)
  221. if param is None:
  222. count = cursor.execute(sql)
  223. else:
  224. count = cursor.execute(sql, param)
  225. Mysql.dispose(cursor, conn)
  226. return count
  227. @staticmethod
  228. def execute(sql, param=None, conn=None):
  229. # conn = self.getConn()
  230. cursor = Mysql.getCursor(conn=conn)
  231. if param is None:
  232. count = cursor.execute(sql)
  233. else:
  234. count = cursor.execute(sql, param)
  235. Mysql.dispose(cursor, conn)
  236. return count
  237. @staticmethod
  238. def updateMany(sql, param=None, conn=None):
  239. # conn = Mysql.getConn()
  240. cursor = Mysql.getCursor(conn=conn)
  241. count = cursor.executemany(sql, param)
  242. return count
  243. @staticmethod
  244. def update(sql, param=None, conn=None):
  245. """
  246. @summary: 更新数据表记录
  247. @param sql: sql格式及条件, 使用(%s, %s)
  248. @param param: 要更新的 值 tuple/list
  249. @return: count 受影响的行数
  250. """
  251. return Mysql.__query(sql, param=param, conn=conn)
  252. @staticmethod
  253. def delete(sql, param=None, conn=None):
  254. """
  255. @summary: 删除数据表记录
  256. @param sql: sql格式及条件,使用(%s, %s)
  257. @param param: 要删除的条件 值 tuple/list
  258. @return: count 受影响的行数
  259. """
  260. return Mysql.__query(sql, param=param, conn=conn)
  261. @staticmethod
  262. def dispose(cursor, conn):
  263. """
  264. @summary: 释放连接池资源
  265. """
  266. conn.commit()
  267. cursor.close()
  268. @staticmethod
  269. def close(conn):
  270. if conn:
  271. conn.close()
  272. @staticmethod
  273. def cmd(cmd):
  274. status, output = commands.getstatusoutput(cmd)
  275. if status != 0:
  276. print '同步线上失败'
  277. else:
  278. print '同步线上成功'
  279. class Util(object):
  280. @staticmethod
  281. def insert_by_chunk(sql, data_list, conn):
  282. start = 0
  283. while True:
  284. end = start + 10000
  285. if end >= len(data_list):
  286. end = len(data_list)
  287. if start >= len(data_list):
  288. break
  289. Mysql.insertMany(sql, data_list[start:end], conn)
  290. start = end
  291. @staticmethod
  292. def calc_ratings_index(num1, num2):
  293. """计算收视指数
  294. 收视指数 = 收视率/(近一年平均收视率 * 0.2)
  295. 收视指数 > 10 按10计算
  296. 收视指数 < 1 按1计算
  297. Args: num1 收视率
  298. Args: num2 近一年平均收视率
  299. """
  300. # 如果num1 或 num2为空,则直接返回指数为1
  301. if num1 is None or num2 is None:
  302. return 1.0
  303. ratings_index = float(num1) / (float(num2) * 0.2)
  304. if ratings_index > 10.0:
  305. ratings_index = 10.0
  306. if ratings_index < 1.0:
  307. ratings_index = 1.0
  308. return ratings_index
  309. @staticmethod
  310. def get_max_date_of_month(field):
  311. """获取给定月份的最大日期
  312. """
  313. if isinstance(field, datetime.date):
  314. month_str = field.strftime('%Y-%m-%d')
  315. _year = str(month_str.split('-')[0])
  316. _month = str(month_str.split('-')[1])
  317. max_date = calendar.monthrange(int(_year), int(_month))
  318. date_str = _year + '-' + _month + '-' + str(max_date[1])
  319. return date_str
  320. @staticmethod
  321. def get_first_date_of_yesterday():
  322. # 当前日期
  323. now = datetime.date.today()
  324. # 昨天日期
  325. yesterday = now - datetime.timedelta(days=1)
  326. # 昨天的当月日期
  327. first_day = datetime.date(yesterday.year, yesterday.month, 1)
  328. return first_day
  329. @staticmethod
  330. def get_max_date_of_one_year_ago(field):
  331. """获取给定月份一年前的日期
  332. """
  333. if isinstance(field, datetime.date):
  334. month_str = field.strftime('%Y-%m-%d')
  335. _year = str(month_str.split('-')[0])
  336. _month = str(month_str.split('-')[1])
  337. max_date = calendar.monthrange(int(_year)-1, int(_month))
  338. date_str = str(int(_year) - 1) + '-' + _month + '-' + str(max_date[1])
  339. return date_str