一、添加钉钉机器人
步骤一,登录钉钉,在机器人管理页面选择“自定义”机器人,输入机器人名字并选择要发送消息的群。如果需要的话,可以为机器人设置一个头像。点击“完成添加”,完成后会生成Webhook地址。
步骤二,点击“复制”按钮,即可获得这个机器人对应的Webhook地址,其格式如下:
1
| https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxx
|
二、获取timestamp与sign参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
timestamp = str(round(time.time() * 1000))
secret = 'this is secret'
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
|
三、推送消息完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
| import time
import hmac
import hashlib
import base64
import urllib.parse
import requests
import json
from common.common_logger import Logger
'''
对接钉钉消息通知
'''
basepagelog = Logger('logger', r'D:\po_test\outputs\\').getlog()
def dingding(test_report):
timestamp = str(round(time.time() * 1000))
secret = 'this is secret'
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
access_token = ''
url = 'https://oapi.dingtalk.com/robot/send?access_token={}×tamp={}&sign={}'.format(access_token,
timestamp, sign)
str_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
data = {"msgtype": "markdown",
"markdown": {
"title": "测试已完成",
"text": r"#### {}测试报告生成成功 \n> 存放地址:\\192.168.1.80\share\TestFolder\自动化分享\测试文件夹\n> "
r"![微信截图_20200817153548.png](https://i.loli.net/2020/08/17/gKVw2qTXYf3aPRG.png)"
r"\n> ###### {}生成成功\n".format(test_report, str_time)
},
"at": {"atMobiles": ["15386174586"]}
}
headers = {'Content-Type': 'application/json'}
message = requests.post(url, json.dumps(data), headers=headers).json()
if message['errmsg'] == 'ok':
return basepagelog.info('钉钉消息推送成功')
else:
return basepagelog.error('钉钉消息推送失败')
|