Administrator
发布于 2024-04-11 / 150 阅读
1

Python程序 用于批量下载腾讯企业邮箱中的邮件 - 曲速引擎(Warp Drive)

解决现状

由于企业抠门,无法直接备份企业邮箱中的邮件,因此只能采用这种效率较低的方式进行备份

实现效果

先通过某种API强制修改离职用户的邮箱密码,然后调用这个程序,以多进程方式将邮件下载到存储文件夹

# 企业邮箱批量下载邮件算法


from imapclient import IMAPClient
from imapclient.exceptions import LoginError
from imapclient.exceptions import IMAPClientError
from multiprocessing import Pool
import time
import eml_parser
import random
import re
import os


'''
重构代码,多进程稳定版
'''

class downloadMailEMLClass:
    """Back up one IMAP mailbox to local ``.eml`` files.

    Typical use is ``run()`` for a sequential backup of every folder, or
    ``init()`` followed by ``getFolderNameList()`` / ``getIdList()`` /
    ``mulRun()`` when the caller distributes the message ids itself.

    Attributes:
        server, port, username, password: connection settings as given.
        user: the ``IMAPClient`` connection, ``None`` until ``_login()``.
        folderNameList: list of ``{"English": ..., "Chinese": ...}`` dicts,
            filled by ``_getFolders()``.
        userFolderPath: local root directory of this user's backup, set by
            ``_mkdirFolds()``.
    """

    # Server-side folder name -> directory name used on disk. Folders that
    # are not listed here keep their server-side name.
    _FOLDER_NAME_MAP = {
        "INBOX": "收件箱",
        "Drafts": "草稿箱",
        "Sent Messages": "已发送的邮件",
        "Deleted Messages": "已删除的邮件",
        "Junk": "垃圾邮件",
    }

    def __init__(self,server,port,username,password):
        self.server = server
        self.port = port
        self.username = username
        self.password = password
        self.user = None  # logged-in IMAP connection, shared by all methods
        self.folderNameList = []  # folder dicts with "English"/"Chinese" names
        self.userFolderPath = None  # local backup directory of this user

    def run(self):
        """Log in, prepare the local directories and download every folder."""
        self._login()
        self._getFolders()
        self._mkdirFolds()
        self._downMailCore()

    # Download core: walks every folder and stores all of its messages.
    def _downMailCore(self):
        for folder in self.folderNameList:
            idList = self._getMailIdList(folder["English"])
            if idList:
                self._saveMails(idList, folder["Chinese"])

    # Fetch, parse and store each message id of the currently selected folder.
    def _saveMails(self, idList, folderChinese):
        userFolderPath = os.path.join(self.userFolderPath, folderChinese)
        for mailId in idList:
            body = self._getMailBody(mailId)
            if body is None:
                # The fetch failed and was already reported; skip this
                # message instead of crashing while parsing ``None``.
                continue
            subject, createTime = self._pasreBody(body)
            print("[_downMailCore_Save] : {}".format(subject))
            putMailStorage(subject, body, createTime, userFolderPath)

    def getIdList(self,dict):
        """Return the message ids of the folder described by ``dict``.

        ``dict`` is one entry of ``getFolderNameList()``. The result is
        ``None`` when the folder could not be selected or searched.
        """
        return self._getMailIdList(dict["English"])

    def getFolderNameList(self):
        """Return the list of folder dicts collected by ``_getFolders()``."""
        return self.folderNameList

    def _mulDownMailCore(self,idList,folderChinese):
        # Every id in the chunk is stored (not only the first one), so
        # callers may hand over chunks of any length.
        if idList:
            self._saveMails(idList, folderChinese)
    # Written by Warp Drive (personal blog / CSDN tech blog); please credit
    # the source when reposting.
    # https://blog.csdn.net/siberiaWarpDrive
    # https://www.exp-9.com/

    def mulRun(self,idList,folderDict):
        """Select ``folderDict``'s folder read-only and store ``idList``."""
        self.user.select_folder(folderDict["English"], readonly=True)
        self._mulDownMailCore(idList,folderDict["Chinese"])

    def init(self):
        """Log in, list the folders and create the local directories."""
        self._login()
        self._getFolders()
        self._mkdirFolds()

    # Login: stores the connection in ``self.user``.
    def _login(self):
        try:
            self.user = IMAPClient(host=self.server,port=self.port)
            self.user.login(username=self.username,password=self.password)
        except LoginError as Error:
            print("[_LOGON_FUN_ERROR] LOGIN FAIL {}".format(Error))
            # Same effect as the former ``exit()`` call, without relying on
            # the helper that the ``site`` module injects.
            raise SystemExit

    # Build one folder dict.
    def _parseDict(self,english,chinese):
        return {"English": english, "Chinese": chinese}

    # Collect the mailbox folders as {"English": ..., "Chinese": ...} dicts.
    def _getFolders(self):
        for folderEntry in self.user.list_folders():
            folderName = folderEntry[-1]
            localName = self._FOLDER_NAME_MAP.get(folderName, folderName)
            self.folderNameList.append(self._parseDict(folderName, localName))

    # Create the local storage directories.
    def _mkdirFolds(self):
        serverPath = os.path.abspath(os.path.dirname(__file__))
        backupPath = os.path.join(serverPath,"backUPMailFolder")
        self.userFolderPath = os.path.join(backupPath,self.username)
        # makedirs(exist_ok=True) tolerates several worker processes running
        # init() at the same time and also creates missing parent
        # directories for nested folder names.
        os.makedirs(self.userFolderPath, exist_ok=True)
        for folder in self.folderNameList:
            folderNamePath = os.path.join(self.userFolderPath, folder["Chinese"])
            os.makedirs(folderNamePath, exist_ok=True)

    # Get the list of message ids of one folder (None on failure).
    def _getMailIdList(self,folderName):
        try:
            # Selecting the folder is inside the try block as well, so a
            # folder that cannot be selected is reported and skipped.
            self.user.select_folder(folderName, readonly=True)
            mailIdList = self.user.search()
        except IMAPClientError as CErr:
            print("[_GETMAILIDLIST_ERROR] : {}".format(CErr))
            return None
        print("[_getMailIdList_Length] : {} ".format(len(mailIdList)))
        return mailIdList

    # Fetch the raw bytes of one message (None on failure).
    def _getMailBody(self,id):
        try:
            body = self.user.fetch(id, [b'BODY[]'])[id][b'BODY[]']
            parser = eml_parser.EmlParser()
            print("[_getMailBody_Download] : {}".format(parser.decode_email_bytes(body)["header"]["subject"]))
            return body
        except KeyError as KErr:
            print("[_GetMailList_KEYERROR] : {}".format(KErr))
        except IMAPClientError as IErr:
            print("[_GetMailList_IMAPClientERROR] : {}".format(IErr))
        return None

    # Parse the raw message and return (subject, formatted date).
    def _pasreBody(self,body):
        # Decode once and reuse the header for both fields.
        header = eml_parser.EmlParser().decode_email_bytes(body)["header"]
        subject = header["subject"]
        createTime = header["date"].strftime("%Y-%m-%d+%H-%M-%S")
        return subject,createTime

#存储信息
def putMailStorage(mailTitle,mailBody,mailCreateTime,userFolderPath):
    """Write one raw message to ``userFolderPath`` as an ``.eml`` file.

    The file name is ``<title>_<time>_<6 random chars>_.eml``. The random
    part keeps messages with the same subject and timestamp apart.

    Args:
        mailTitle: message subject; characters that are not allowed in file
            names are replaced by ``_``.
        mailBody: raw message bytes, written unchanged.
        mailCreateTime: already formatted timestamp used in the file name.
        userFolderPath: existing directory that receives the file.
    """
    alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
    randomStr = "".join(random.choice(alphabet) for _ in range(6))
    # One pass over the title: path separators, the characters Windows
    # forbids in file names, and control characters (e.g. folded newlines).
    safeTitle = re.sub(r'[\\/:*?"<>|\x00-\x1f]', "_", mailTitle)
    fileName = safeTitle + "_" + mailCreateTime + "_" + randomStr + "_" + ".eml"
    # os.path.join instead of a hard-coded "\\" so the file ends up inside
    # userFolderPath on every platform.
    with open(os.path.join(userFolderPath, fileName), "wb") as f:
        f.write(mailBody)

	#本文由 曲速引擎(Warp Drive)个人博客 曲速引擎(Warp Drive)CSDN技术博客 创作,转载请说明出处谢谢
	#https://blog.csdn.net/siberiaWarpDrive
	#https://www.exp-9.com/
def mulCore(worker,folderDict,server,port,username,password):
    """Pool task: download the message ids in ``worker`` over a fresh connection.

    Args:
        worker: list of message ids to download.
        folderDict: folder dict with the "English" and "Chinese" names.
        server, port, username, password: IMAP connection settings.
    """
    d = downloadMailEMLClass(server=server, port=port, username=username, password=password)
    try:
        d.init()
        d.mulRun(worker,folderDict)
    finally:
        # Every task opens its own connection; log out so that finished
        # tasks do not leave sessions open on the server.
        if d.user is not None:
            try:
                d.user.logout()
            except (IMAPClientError, OSError) as err:
                print("[mulCore_LOGOUT_ERROR] : {}".format(err))




if __name__ == '__main__':
    # Script entry point: list the folders over one connection, then hand
    # every message id to a process pool, one task per message.
    print("[+] INFO : Foxmail Information Download System")
    server = "imap.exmail.qq.com"
    port = 993
    username = "[email protected]"
    password = "xxx"
    startTime = time.time()
    d = downloadMailEMLClass(server=server,port=port,username=username,password=password)
    d.init()
    folderDictList = d.getFolderNameList()
    for folderDict in folderDictList:
        idList = d.getIdList(folderDict)
        if not idList:
            continue
        # Chunks of one id each: one pool task per message.
        length = 1
        workerList = [idList[i:i + length] for i in range(0, len(idList), length)]
        # The with-block releases the pool even if submitting tasks fails.
        with Pool() as p:
            pList = [
                p.apply_async(mulCore, args=(worker, folderDict, server, port, username, password,))
                for worker in workerList
            ]
    # Written by Warp Drive (personal blog / CSDN tech blog); please credit
    # the source when reposting.
    # https://blog.csdn.net/siberiaWarpDrive
    # https://www.exp-9.com/
            p.close()
            p.join()
        # Read every result: an exception raised inside a worker is only
        # stored in its AsyncResult and would otherwise go unnoticed.
        for worker, result in zip(workerList, pList):
            try:
                result.get()
            except Exception as err:
                print("[Main-ERROR] : {} {} : {}".format(folderDict["English"], worker, err))
    endTime = time.time()
    print("[Main-INFO] : Total usage time {} /s".format(endTime-startTime))

本文由 曲速引擎(Warp Drive)个人博客 曲速引擎(Warp Drive)CSDN技术博客创作,转载请说明出处谢谢