Files
my-assistant/scripts/email_summary.py

207 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
邮件摘要脚本 - 每天早上9点推送前一天的邮件摘要
重点关注 fj@77ircloud.com 的邮件
需要回应的邮件会标记提醒
⚠️ 安全规则:此脚本仅读取邮件,不执行任何删除操作
"""
import imaplib
import ssl
import email
from email.header import decode_header
from datetime import datetime, timedelta
def decode_str(s):
    """Decode a raw, possibly RFC 2047-encoded, header value into text.

    Args:
        s: Header value as returned by ``Message.get``; ``None`` is allowed.

    Returns:
        The decoded string, or ``''`` when ``s`` is ``None``. Bytes that are
        invalid in the declared charset are replaced instead of raising.
    """
    if s is None:
        return ''

    result = []
    for part, charset in decode_header(s):
        if not isinstance(part, bytes):
            # Already text (header had no encoded-words at all).
            result.append(part)
            continue
        try:
            result.append(part.decode(charset or 'utf-8', errors='replace'))
        except (LookupError, ValueError):
            # The sender declared a charset label Python does not know (or a
            # malformed one); UTF-8 with replacement is the best-effort guess.
            result.append(part.decode('utf-8', errors='replace'))
    return ''.join(result)
def get_body_preview(msg, max_len=200):
    """Return a short single-line preview of a message's plain-text body.

    For multipart messages the first ``text/plain`` part with a non-empty
    payload is used; other content types are ignored.

    Args:
        msg: An ``email.message.Message`` instance.
        max_len: Maximum number of characters kept in the preview.

    Returns:
        The preview with every run of whitespace (including CR/LF) collapsed
        to one space, or ``''`` when no plain-text body can be extracted.
    """
    def _preview_of(part):
        # Returns None when the part carries no payload so the caller can
        # keep looking at later parts.
        payload = part.get_payload(decode=True)
        if not payload:
            return None
        charset = part.get_content_charset() or 'utf-8'
        try:
            text = payload.decode(charset, errors='replace')
        except (LookupError, ValueError):
            # Unknown charset label: fall back to UTF-8 instead of dropping
            # the whole preview.
            text = payload.decode('utf-8', errors='replace')
        # Collapse whitespace first so CRLF line endings do not leave stray
        # '\r' characters in the single-line preview.
        return ' '.join(text.split())[:max_len].strip()

    try:
        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_type() != 'text/plain':
                    continue
                preview = _preview_of(part)
                if preview is not None:
                    return preview
        else:
            preview = _preview_of(msg)
            if preview is not None:
                return preview
    except Exception:
        # The preview is best-effort: a malformed MIME structure must not
        # prevent the rest of the digest from being produced.
        return ''
    return ''
def needs_response(subject, body, sender):
    """Decide whether a message should be flagged as needing a reply.

    A message is flagged when its lower-cased subject/body contains any of
    the request phrases below, or when the lower-cased sender contains one
    of the priority addresses.

    Args:
        subject: Decoded subject text.
        body: Body preview text.
        sender: Decoded ``From`` header text.

    Returns:
        ``True`` if the message should be flagged, otherwise ``False``.
    """
    # Phrases that ask the recipient to reply, confirm, approve or act.
    reply_phrases = (
        '请回复', '请回复我', '需回复', '需要回复',
        '请回复确认', '请回复一下', '请回复告知',
        '请确认', '请批准', '请审核', '请审批',
        '期待回复', '希望回复', '烦请', '麻烦',
        '请知悉', '请查收', '请查阅',
        '请协助', '请配合', '请支持', '请安排', '请处理',
    )
    # Senders whose mail is always flagged, regardless of content.
    priority_addresses = ('fj@77ircloud.com',)

    haystack = ' '.join((subject, body)).lower()
    origin = sender.lower()

    for phrase in reply_phrases:
        if phrase in haystack:
            return True
    for address in priority_addresses:
        if address in origin:
            return True
    return False
def fetch_yesterday_emails(max_scan=50):
    """Collect yesterday's messages from the IMAP inbox without modifying it.

    The mailbox is opened read-only so fetching does not set the ``\\Seen``
    flag. Only the ``max_scan`` most recent messages are inspected, newest
    first, and those whose ``Date`` header falls on yesterday (local time)
    are kept.

    Args:
        max_scan: How many of the most recent inbox messages to inspect.
            Values <= 0 inspect nothing.

    Returns:
        A 4-tuple ``(all_emails, important_emails, need_response_emails,
        date_label)``. Each list holds dicts with the keys ``subject``,
        ``sender``, ``date`` and ``body``. ``date_label`` is yesterday's date
        formatted for display. On any connection/login failure the three
        lists are empty and the fourth element is the error text instead.
    """
    import os

    important_sender = 'fj@77ircloud.com'

    try:
        # SECURITY: credentials should come from the environment. The literal
        # fallbacks only keep the existing scheduled job working; the password
        # has been committed to source control and should be rotated and the
        # fallbacks removed.
        user = os.environ.get('EMAIL_SUMMARY_USER', 'lgc@77ircloud.com')
        password = os.environ.get('EMAIL_SUMMARY_PASSWORD', 'bVPPcg6XgB7qRPw7')

        yesterday = (datetime.now() - timedelta(days=1)).date()
        all_emails = []
        important_emails = []
        need_response_emails = []

        context = ssl.create_default_context()
        # The context manager logs out even if something below raises.
        with imaplib.IMAP4_SSL('imap.exmail.qq.com', 993, ssl_context=context) as mail:
            mail.login(user, password)
            # readonly=True issues EXAMINE, so FETCH cannot mark mail as read.
            status, _ = mail.select('INBOX', readonly=True)
            if status != 'OK':
                raise RuntimeError('unable to open INBOX')
            status, messages = mail.search(None, 'ALL')
            if status != 'OK':
                raise RuntimeError('IMAP SEARCH failed')

            email_ids = messages[0].split() if messages and messages[0] else []
            scan_range = email_ids[-max_scan:] if max_scan > 0 else []

            for seq_num in reversed(scan_range):
                try:
                    # Fetch the complete message (headers + body).
                    status, msg_data = mail.fetch(seq_num, '(RFC822)')
                    if status != 'OK' or not msg_data:
                        continue
                    # The response mixes (envelope, data) tuples with bare
                    # bytes such as b')'; the message is the first tuple.
                    raw_email = next(
                        (item[1] for item in msg_data
                         if isinstance(item, tuple) and len(item) >= 2),
                        None,
                    )
                    if not isinstance(raw_email, (bytes, bytearray)):
                        continue

                    msg = email.message_from_bytes(raw_email)
                    date_str = msg.get('Date', '')
                    # Keep only messages dated yesterday.
                    if _local_date_of(date_str) != yesterday:
                        continue

                    subject = decode_str(msg.get('Subject', '(无主题)'))
                    sender = decode_str(msg.get('From', ''))
                    body = get_body_preview(msg, 300)
                    email_info = {
                        'subject': subject,
                        'sender': sender,
                        'date': date_str,
                        'body': body,
                    }
                    all_emails.append(email_info)
                    # Priority sender (case-insensitive, like needs_response).
                    if important_sender in sender.lower():
                        important_emails.append(email_info)
                    # Messages that ask for a reply.
                    if needs_response(subject, body, sender):
                        need_response_emails.append(email_info)
                except Exception:
                    # Best-effort: one unreadable message must not abort the
                    # whole digest.
                    continue

        # Built by hand rather than strftime so the non-ASCII characters do
        # not depend on the platform's locale handling.
        date_label = f"{yesterday.year}年{yesterday.month:02d}月{yesterday.day:02d}日"
        return all_emails, important_emails, need_response_emails, date_label
    except Exception as e:
        return [], [], [], str(e)


def _local_date_of(date_header):
    """Return the local calendar date of an RFC 2822 ``Date`` header, or None if unparseable."""
    from email.utils import mktime_tz, parsedate_tz

    try:
        parsed = parsedate_tz(date_header)
        if not parsed:
            return None
        return datetime.fromtimestamp(mktime_tz(parsed)).date()
    except (TypeError, ValueError, OverflowError, OSError, AttributeError):
        return None
def format_summary():
    """Build the digest text for yesterday's mail.

    Calls ``fetch_yesterday_emails()`` and renders three sections in priority
    order: messages needing a reply (with a body excerpt), remaining messages
    from the priority sender, and up to eight other messages.

    Returns:
        The digest as one newline-joined string; a failure notice when mail
        could not be fetched; or a short "no new mail" note when nothing was
        received yesterday.
    """
    import re

    emails, important, need_response, date_str = fetch_yesterday_emails()

    # Failure detection. The fetcher reports failure as three empty lists
    # plus the error text in the date slot, so an empty result whose "date"
    # does not look like a date label is an error, not an empty inbox. The
    # isinstance check is kept for fetchers that signal failure that way.
    if isinstance(important, str):
        return f"⚠️ 获取邮件失败: {important}"
    if not emails and not re.match(r'\d{4}年\d{1,2}月\d{1,2}', str(date_str)):
        return f"⚠️ 获取邮件失败: {date_str}"
    if not emails:
        return f"📬 {date_str} 无新邮件\n\n祝你一天好心情!😊"

    separator = "-" * 40
    excerpt_len = 150
    lines = [
        f"📬 {date_str} 邮件摘要\n",
        f"共收到 **{len(emails)}** 封邮件\n",
    ]

    def add_entries(entries, with_body=False):
        # Append the numbered subject/sender lines for one section.
        for i, item in enumerate(entries, 1):
            lines.append(f" {i}. 📧 {item['subject']}")
            lines.append(f" 发件人: {item['sender']}")
            preview = item.get('body') if with_body else ''
            if preview:
                # Only show an ellipsis when text was actually cut off.
                suffix = "..." if len(preview) > excerpt_len else ""
                lines.append(f" 摘要: {preview[:excerpt_len]}{suffix}")
            lines.append("")

    # Messages that need a reply come first.
    if need_response:
        lines.append(f"🔔 **需要您回应({len(need_response)} 封)**\n")
        lines.append(separator)
        add_entries(need_response, with_body=True)

    # Priority-sender messages not already listed above.
    important_only = [e for e in important if e not in need_response]
    if important_only:
        lines.append(f"⭐ **重点关注 fj@77ircloud.com({len(important_only)} 封)**\n")
        lines.append(separator)
        add_entries(important_only)

    # Everything else, capped at eight entries.
    other = [e for e in emails if e not in important and e not in need_response]
    if other:
        lines.append(f"\n📋 **其他邮件({len(other)} 封)**\n")
        lines.append(separator)
        add_entries(other[:8])
        if len(other) > 8:
            lines.append(f" ... 还有 {len(other) - 8} 封邮件")

    lines.append("\n" + "=" * 40)
    lines.append("💡 提示:以上标记「需要回应」的邮件,请及时处理!")
    return "\n".join(lines)
if __name__ == '__main__':
    # Script entry point: build the digest and write it to stdout.
    summary_text = format_summary()
    print(summary_text)