Python(十五)群发微信群消息 用python实现群发微信群消息
一、前言
二、说明
代码主要用到两个库,我在这里重点讲一下
wxauto 和 itchat 这两个库有点不同,都是一部分功能可以拿到我想要的东西,一部分功能不能拿到,所以我只能结合在一起了 - - md
- 1)wxauto: 这个库顾名思义,就界面化的库,它有个很好的点就是,pc端登录了微信,他是可以直接拿到当前会话的,在当前会话中找到你的【微信联系人名称/微信群名】,但是暂时没找到方法能够直接获取到【微信群名称】,所以我调试的时候,就是先定义一个string或者一个list去遍历发消息,调试通;
但在这个的过程中,还有个这个库坑爹的问题:
- chatWith方法要求:最好完整匹配,不完全匹配只会选取搜索框第一个会话…
- 比如我三个test群:test_message1 / test_message2 / test_message22
- 然后我只想给【群名包含2的微信群】发消息,他只会给搜索到的第一个含【2】的微信群,重复发消息… 而不会再去打开test_message22的聊天窗口发了…
ChatWith(self, who, timeout=2) 源方法如图所示:
- 所以你只能完整匹配搜索,但是每个微信群名都不一样,所以我需要一个能够直接获取到【所有微信名称的方法】,这会就找到了 【itchat】这个库
2)itchat库
这个库的好处就是:有很多方法可以获取到微信的具体信息:头像/名称/签名…都可以拿到
但每次都需要人工扫QR登录微信,其实是登录了一个桌面微信,且会退出你原本在PC端已登录的微信;
itchat库里面也有发送消息的方法,可以直接调用。
我没用那个的原因是:wxauto可以类似UI自动化,更符合我的需求,它的界面化可以看到一个一个输入条件查询对话框并发送消息,也可以以防被封号。
另外我想提醒下这个库还有一个很坑爹的地方!!
在我调试了很多遍:发现很多群聊列表明明存在,但是却获取不到数据,最终找到原因如下:
原因1:微信群不活跃,自己在微信群里发一条消息就可以获取到
原因2:微信群未添加到通讯录
–> 这个最坑爹,因为无法批量添加到通讯录,只能一个一个微信群手动点进去添加到通讯录
那这样弄的话,还搞什么脚本… 人工累都累死?
查遍了全网,没看到有人解决这个,坐等大佬有没有别的方法拿到所有微信群聊列表,到时候评论区跟我互动一下!
三、代码
我的需求很简单:
1)就是给 [特定的微信客户群] 发送一条通知消息
2)为了管理及查看方便,做如下优化:
2.1)信息单独写在 msg.txt 里,更改消息只需更改txt内容即可
2.2)已发送的群和未发送的群分别写入两个excel关于其他功能的优化,大家可以在这个基础上继续完善
例如:发文件、图片等等信息 / 定时任务发送 / 同个微信群发多条消息 …
需要下载的库:
pip install itchat pandas wxauto openpyxl
- 注意:如果遇到qr扫码后,console报错:xml.parsers.expat.ExpatError: mismatched tag: line 63, column 4(itchat)
清一下缓存就行了
pip uninstall itchat
pip install itchat-uos==1.5.0.dev0
# -*- codeing = utf-8 -*-
# @time: 2024/12/14 0014 21:59
# @Author : Mikasa
import itchat
from wxauto import *
import time
import pandas as pd
# import schedule
# import turtle
string1 = "test_message2"
string2 = "test_message22"
# string2 = "清关"
def get_chatroom_list_by_name(string1, string2):
"""
根据字符串获取指定微信群列表
:return: 要发送msg的微信群列表
"""
itchat.auto_login()
chatrooms = itchat.get_chatrooms()
chatroom_list_send = []
chatroom_list_nosend = []
for chatroom in chatrooms:
# 打印群组名称
# todo 由于微信限制,只能获取到活跃的且保存在通讯录的群聊
chatroom_name = chatroom['NickName']
if string1 in chatroom_name or string2 in chatroom_name:
print(chatroom_name)
chatroom_list_send.append(chatroom_name)
else:
# print("不发送", chatroom_name)
chatroom_list_nosend.append(chatroom_name)
print("发送微信群list:", chatroom_list_send)
print("发送群聊共:", str(len(chatroom_list_send)))
print("未发送微信群list:", chatroom_list_nosend)
print("未发送群聊共:", str(len(chatroom_list_nosend)))
df_send = pd.DataFrame(chatroom_list_send, columns=['微信群名称'])
df_nosend = pd.DataFrame(chatroom_list_nosend, columns=['微信群名称'])
df_send.to_excel('已发送群消息微信群.xlsx', index=False)
df_nosend.to_excel('未发送群消息微信群.xlsx', index=False)
itchat.logout()
return chatroom_list_send
def get_telex_chatroom_list():
"""
获取所有群聊列表:通讯录 + 活跃
:return:
"""
itchat.auto_login()
chatrooms = itchat.get_chatrooms()
chatroom_list = []
for chatroom in chatrooms:
# 打印群组名称
# todo 由于微信限制,只能获取到活跃的且保存在通讯录的群聊
chatroom_name = chatroom['NickName']
print(chatroom_name)
chatroom_list.append(chatroom_name)
print("最终筛选出来的微信群list:", chatroom_list)
print("发送群聊共:", str(len(chatroom_list)))
itchat.logout()
return chatroom_list
# def logout_wechat():
# itchat.logout()
def get_wechat_session():
"""
获取PC已登录微信的会话
:return:
"""
# 获取当前微信客户端
wx = WeChat()
# 获取会话列表
wx.GetSessionList()
return wx
def send_msg_to_single_user(wx, msg, user):
"""
向单个好友发送消息
:param wx:
:param self:
:param msg:
:param user:
:return:
"""
try:
print(f"向用户`{user}`发送消息:{msg}")
wx.ChatWith(user) # 打开`对方`聊天窗口
wx.SendMsg(msg)
print("发送完毕")
except Exception as e:
print("发送失败,原因:", e)
def send_msg_to_mul_user(wx, msg, user_list):
"""
给多个好友发送单条消息
:param wx:
:param msg: 消息
:param user_list: 聊天框用户list
:return:
"""
try:
for user in user_list:
# if sub_string in user:
# print("发送消息的用户", sub_string)
print(f"向用户`{user}`发送消息:{msg}")
wx.ChatWith(user) # 打开`对方`聊天窗口
wx.SendMsg(msg)
print("发送完毕")
# else:
# print(f"用户{user}不包含{sub_string}")
except Exception as e:
print("发送失败,原因:", e)
def send_file_to_single_user(wx, file, user):
"""
向单个好友发送单个文件
:param wx:
:param self:
:param file:
:param user:
:return:
"""
try:
print(f"向用户`{user}`发送文件:{file}")
wx.ChatWith(user) # 打开`对方`聊天窗口
wx.SendFiles(file)
print("发送完毕")
except Exception as e:
print("发送失败,原因:", e)
def send_files_to_mul_user(wx, files, users):
"""
向多个用户发多个文件
:param wx:
:param self:
:param files:
:param users:
:return:
"""
for who in users:
wx.ChatWith(who)
for file in files:
print(f"向用户`{who}`发送文件:{file}")
wx.SendFiles(file)
print("发送成功")
def read_msg():
"""
读取信息内容
:return: 信息
"""
with open('msg.txt', 'r', encoding='utf-8') as file:
lines = file.readlines()
msg_string = "".join(lines)
return msg_string
if __name__ == '__main__':
# 读取外部txt 信息
msg = read_msg()
# 根据string获取聊天群列表
chatroom_list = get_chatroom_list_by_name(string1, string2)
# 获取所有微信群列表
# chatroom_list = get_telex_chatroom_list()
time.sleep(20) # 等待30s PC端重新扫码登录
wechat_session = get_wechat_session()
send_msg_to_mul_user(wechat_session, msg, chatroom_list)