Python script: back up the chat history of all Telegram groups and download the attached files

✍️Auth: 运维笔记       Date: 2024/03/29       Cat: Python

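Preface: what the script does

The script is intended to run on a server as a scheduled task (for example, from cron). Its features: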

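1: It generates HTML files and relies on the provided CSS and JS files, so the export is easy to read.

2: Each run creates its own folder named "output-month-day-hour" (for example output-03-29-12), and everything downloaded in that run is stored there.

3: Photos are displayed inline in the chat log. Other files, including videos in the code listed below, are downloaded into the same folder and linked from the chat log, so they can be opened with a click.

4: By default only the last 24 hours are backed up, and groups with no messages in that period are not exported. To change the period, adjust days=1 in the following line (the default is 1 day):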

start_date = end_date - timedelta(days=1)

5: Time zone. Timestamps are shown in UTC+8 (China time) by default. For a different zone, change the number in hours=8 to the desired UTC offset:

local_date = message.date + timedelta(hours=8)
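
The two settings above can be pulled out into named constants so that they are changed in one place. The following is a minimal sketch using only the standard library; the names BACKUP_DAYS, UTC_OFFSET_HOURS, backup_window and to_local_time are assumptions made for illustration and do not appear in the original script.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical settings, named here for illustration only
BACKUP_DAYS = 1        # how many days of history to export
UTC_OFFSET_HOURS = 8   # display offset from UTC (8 = China time)


def backup_window(now_utc, days=BACKUP_DAYS):
    """Return (start, end) of the export window as timezone-aware UTC datetimes."""
    return now_utc - timedelta(days=days), now_utc


def to_local_time(utc_dt, offset_hours=UTC_OFFSET_HOURS):
    """Convert an aware UTC datetime to a fixed-offset local time."""
    return utc_dt.astimezone(timezone(timedelta(hours=offset_hours)))


now = datetime(2024, 3, 29, 4, 0, tzinfo=timezone.utc)
start, end = backup_window(now)
print(start.isoformat())               # 2024-03-28T04:00:00+00:00
print(to_local_time(now).isoformat())  # 2024-03-29T12:00:00+08:00
```

Keeping every datetime timezone-aware avoids mixing server-local time with the UTC timestamps that Telegram attaches to messages.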

1. Import the required modules:

from telethon.sync import TelegramClient
from telethon.tl.functions.messages import GetHistoryRequest
from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument, DocumentAttributeFilename
from datetime import datetime, timedelta
import os
import asyncio
import pytz
import re
import html
import mimetypes

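These are the modules and classes the script needs. The Telethon library talks to the Telegram API, datetime and timedelta handle dates and times, os handles file operations, asyncio provides asynchronous execution, pytz handles time zones, re provides regular expressions, html escapes text for HTML, and mimetypes guesses a file extension from a MIME type.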

2. Configure the Telegram client:

# Your Telegram API configuration
api_id = 'your_api_id'
api_hash = 'your_api_hash'
phone_number = '+your_phone_number'  # Must be a phone number associated with your Telegram account
css_js_directory = '../'
# Get the current time
current_time = datetime.now()
# Build the folder name as output-month-day-hour, e.g. output-03-29-12
output_directory = current_time.strftime("output-%m-%d-%H")

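Replace api_id and api_hash with the values you obtain from the Telegram app registration page (my.telegram.org). phone_number should be the verified phone number of your account, in international format. output_directory is the folder that receives the exported files, and css_js_directory is the path, relative to the generated HTML files, of the folder that holds the css and js directories.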
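
For a script that runs unattended, it can be convenient to keep the credentials out of the source file. The sketch below reads them from environment variables instead; the variable names TG_API_ID, TG_API_HASH and TG_PHONE and the helper names are hypothetical, chosen only for this example.

```python
import os
from datetime import datetime


def load_config(env=os.environ):
    """Read Telegram credentials from environment variables (names are hypothetical)."""
    return {
        "api_id": int(env["TG_API_ID"]),   # api_id is a number, so convert it
        "api_hash": env["TG_API_HASH"],
        "phone_number": env["TG_PHONE"],
    }


def output_directory_for(moment):
    """Build the per-run folder name; %m, %d and %H are zero-padded."""
    return moment.strftime("output-%m-%d-%H")


# A plain dict stands in for os.environ so the example runs anywhere
config = load_config({"TG_API_ID": "12345", "TG_API_HASH": "0123abcd", "TG_PHONE": "+10000000000"})
print(config["api_id"])                                     # 12345
print(output_directory_for(datetime(2024, 3, 29, 12, 30)))  # output-03-29-12
```

A missing variable raises KeyError immediately, which is usually preferable to a login failure later in the run.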

3. Create the Telegram client:

client = TelegramClient('session_name', api_id, api_hash)
client.start(phone_number)

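This creates a TelegramClient instance with the API credentials and then logs in with start(). On the first run, start() asks for the login code that Telegram sends to the account, so run the script once interactively before scheduling it; the login is then stored in session_name.session and reused.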

4. Define the asynchronous function that exports messages:

async def export_messages(entity):
    # ...

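This asynchronous function exports the messages of a given entity (for example a group or channel). It takes the entity as its argument and looks for messages from the last 24 hours in its history.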

5. Fetch the message history

messages = await client(GetHistoryRequest(
    peer=entity,
    limit=100,
    offset_date=end_date,
    offset_id=0,
    min_id=0,
    add_offset=0,
    max_id=0,
    hash=0
))

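GetHistoryRequest fetches the message history of the given entity. limit caps the number of messages returned (100 here, so a group with more than 100 messages in the window is only partially exported), and offset_date makes the request return messages older than that date; the remaining parameters are left at 0.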
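
One way around the 100-message cap is to walk the history newest first and stop at the first message that falls outside the window. The sketch below shows that stopping logic on its own. It assumes a message source that yields newest first, which is how Telethon's client.iter_messages(entity) behaves by default as far as I know; fake_history is a hypothetical stand-in so that the example runs without a Telegram connection.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


async def collect_recent(messages, start_date):
    """Collect messages newer than start_date from a newest-first async iterator."""
    recent = []
    async for message in messages:
        if message.date <= start_date:
            break  # everything after this point is older than the window
        recent.append(message)
    recent.reverse()  # oldest first, ready for rendering
    return recent


async def fake_history(now, count):
    """Hypothetical message source: one message every 10 minutes, newest first."""
    for i in range(count):
        yield SimpleNamespace(id=count - i, date=now - timedelta(minutes=10 * i))


now = datetime(2024, 3, 29, 12, 0, tzinfo=timezone.utc)
recent = asyncio.run(collect_recent(fake_history(now, 300), now - timedelta(days=1)))
print(len(recent))  # 144
```

In the real script the first argument would be the Telethon iterator rather than fake_history, and the result would replace the recent_messages list.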

6. Process the message content

if message.media:
    if isinstance(message.media, MessageMediaPhoto):
        pass  # handle a photo message
    elif isinstance(message.media, MessageMediaDocument):
        pass  # handle a file (document) message
else:
    pass  # handle a plain text message

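This part handles each message according to its type: photos and files (documents) get their own branches, and a message without media is treated as text. A message carrying any other kind of media, such as a link preview, matches neither branch and is left out of the export.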
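
If captions and messages with other media types should be kept, the dispatch can return the text in every case and treat unknown media as plain text. This is a sketch of that idea, not the author's code: the three media classes and FakeMessage below are local stand-ins that only borrow the Telethon class names, so the example runs without Telethon installed.

```python
from dataclasses import dataclass
from typing import Optional


# Local stand-ins that only mimic the names of the Telethon media classes
class MessageMediaPhoto:
    pass


class MessageMediaDocument:
    pass


class MessageMediaWebPage:
    pass


@dataclass
class FakeMessage:
    """Hypothetical message with the two attributes the dispatch needs."""
    media: Optional[object] = None
    message: str = ""


def classify(msg):
    """Return (kind, text), where kind is 'photo', 'document' or 'text'."""
    if isinstance(msg.media, MessageMediaPhoto):
        return "photo", msg.message      # text is the caption, may be empty
    if isinstance(msg.media, MessageMediaDocument):
        return "document", msg.message
    # No media, or a media type that is not downloaded (e.g. a link preview)
    return "text", msg.message


print(classify(FakeMessage(MessageMediaPhoto(), "holiday pic")))        # ('photo', 'holiday pic')
print(classify(FakeMessage(MessageMediaWebPage(), "see example.com")))  # ('text', 'see example.com')
```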

7. Download files and generate the HTML:

await client.download_media(message, file=document_path)

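download_media downloads the attachment and saves it to the given path. The script then appends the matching markup to the HTML: an <img> tag for a photo, or a link for a file.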
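
The complete script repeats nearly the same HTML block for photos, files and text. A possible refactoring, shown here as a sketch with hypothetical helper names (render_message, text_body, photo_body, file_body), builds the shared wrapper once and escapes the sender name as well:

```python
import html
from datetime import datetime


def render_message(message_id, sender_name, local_date, body_html):
    """Return the HTML for one message. body_html must already be safe HTML."""
    name = sender_name or "Unknown"
    title = local_date.strftime("%Y-%m-%d %H:%M:%S")
    short_date = local_date.strftime("%m-%d %H:%M")
    return f"""
      <div class="message default clearfix" id="message-{message_id}">
        <div class="pull_left userpic_wrap">
          <div class="userpic userpic7" style="width: 42px; height: 42px">
            <div class="initials" style="line-height: 42px">{html.escape(name[0])}</div>
          </div>
        </div>
        <div class="body">
          <div class="pull_right date details" title="{title}">
            {short_date}
          </div>
          <div class="from_name">{html.escape(name)}</div>
          {body_html}
        </div>
      </div>
"""


def text_body(text):
    return f'<div class="text">{html.escape(text)}</div>'


def photo_body(filename):
    return f'<img src="{html.escape(filename)}" alt="Photo"/>'


def file_body(filename, label):
    return f'<div class="text"><a href="{html.escape(filename)}">{html.escape(label)}</a></div>'


block = render_message(42, "Li <Admin>", datetime(2024, 3, 29, 12, 5), text_body("1 < 2"))
print("Li &lt;Admin&gt;" in block)  # True
```

The class names and ids are the same as in the original markup, so the existing style.css and script.js should keep working with it.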

8. Save the HTML content to a file

with open(os.path.join(output_directory, filename), 'w', encoding='utf-8') as f:
    f.write(html_content)

Finally, the generated HTML is written to a file in the output folder.
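
The file name comes from the group title with punctuation stripped, so a title made only of symbols or emoji produces an empty name, and two groups with the same title overwrite each other. A minimal sketch of one way to avoid both, assuming a hypothetical helper safe_filename that appends the entity id:

```python
import re


def safe_filename(title, entity_id):
    """Build an HTML file name from a group title, falling back to 'chat'."""
    cleaned = re.sub(r"[^\w\s-]", "", title or "").strip()  # keep letters, digits, _, -, spaces
    cleaned = re.sub(r"\s+", "_", cleaned)                   # collapse whitespace runs
    return f"{cleaned or 'chat'}_{entity_id}.html"


print(safe_filename("Ops / Alerts!", 1001))  # Ops_Alerts_1001.html
print(safe_filename("🚀🚀", 1002))           # chat_1002.html
```

Because \w is Unicode-aware in Python 3, titles written in Chinese or other scripts are preserved.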

9. Main function

async def main():
    async for dialog in client.iter_dialogs():
        if dialog.is_group:
            await export_messages(dialog.entity)
            await asyncio.sleep(5)  # wait 5 seconds before handling the next group

This is the main function. It iterates over all dialogs and calls export_messages for each one that is a group, pausing 5 seconds between groups.

10. Run the main loop

with client:
    client.loop.run_until_complete(main())

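The with statement connects the client and disconnects it when the block ends; inside it, run_until_complete() runs the asynchronous main() on the client's event loop until it finishes.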

11. The complete code

from telethon.sync import TelegramClient
from telethon.tl.functions.messages import GetHistoryRequest
from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument, DocumentAttributeFilename
from datetime import datetime, timedelta
import os
import asyncio
import pytz
import re
import html
import mimetypes

# Your Telegram API configuration
api_id = 'your_api_id'
api_hash = 'your_api_hash'
phone_number = '+your_phone_number'  # Must be a phone number associated with your Telegram account
#output_directory = 'output'
css_js_directory = '../'
# Get the current time
current_time = datetime.now()
# Build the folder name as output-month-day-hour, e.g. output-03-29-12
output_directory = current_time.strftime("output-%m-%d-%H")

# Create a Telegram client
client = TelegramClient('session_name', api_id, api_hash)
client.start(phone_number)

async def export_messages(entity):
    # Get messages within the last 24 hours
    # Use timezone-aware UTC so the window is correct whatever the server's local time zone is
    end_date = datetime.now(pytz.utc)
    start_date = end_date - timedelta(days=1)

    # Get message history within the last 24 hours
    try:
        messages = await client(GetHistoryRequest(
            peer=entity,
            limit=100,
            offset_date=end_date,
            offset_id=0,
            min_id=0,
            add_offset=0,
            max_id=0,
            hash=0
        ))
    except Exception as e:
        print(f"Failed to get message history for entity {entity}: {e}")
        return

    # Filter messages within the last 24 hours
    recent_messages = [message for message in messages.messages if message.date.replace(tzinfo=pytz.utc) > start_date]

    # If no recent messages found, return without generating file or printing anything
    if not recent_messages:
        return

    # Sort recent messages in ascending order by date
    recent_messages.sort(key=lambda x: x.date)

    # Create output directory if it doesn't exist
    os.makedirs(output_directory, exist_ok=True)

    # Use entity title to generate filename
    filename = re.sub(r'[^\w\s]', '', entity.title) + '.html'

    # Build HTML content
    html_content = f"""
<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8"/>
  <title>Exported Data</title>
  <meta content="width=device-width, initial-scale=1.0" name="viewport"/>
  <link href="{css_js_directory}css/style.css" rel="stylesheet"/>
  <script src="{css_js_directory}js/script.js" type="text/javascript"></script>
  <style>
    .message img, .message video {{
      max-width: 100%;
      height: auto;
    }}
  </style>
</head>
<body onload="CheckLocation();">
  <div class="page_wrap">
    <div class="page_header">
      <div class="content">
        <div class="text bold">
          {entity.title}
        </div>
      </div>
    </div>
    <div class="page_body chat_page">
      <div class="history">
"""

    # Add message content in chronological order (oldest first)
    for message in recent_messages:
        sender = await get_sender(message.sender_id)
        # first_name/last_name can be missing or None (channels, users without a last name)
        name_parts = [getattr(sender, 'first_name', None), getattr(sender, 'last_name', None)]
        sender_name = " ".join(part for part in name_parts if part) or "Unknown"

        # Convert UTC time to local time (assuming UTC+8)
        local_date = message.date + timedelta(hours=8)  # Add 8 hours to adjust for UTC+8

        # Format message date to include month and day
        formatted_date = local_date.strftime('%m-%d %H:%M')

        # Check message content type
        if message.media:
            if isinstance(message.media, MessageMediaPhoto):
                # If message contains a photo, include it in the HTML
                photo_path = os.path.join(output_directory, f"photo_{message.id}.jpg")
                await client.download_media(message, file=photo_path)
                html_content += f"""
      <div class="message default clearfix" id="message-{message.id}">
        <div class="pull_left userpic_wrap">
          <div class="userpic userpic7" style="width: 42px; height: 42px">
            <div class="initials" style="line-height: 42px">{sender_name[0]}</div>
          </div>
        </div>
        <div class="body">
          <div class="pull_right date details" title="{local_date.strftime('%Y-%m-%d %H:%M:%S')}">
            {formatted_date}
          </div>
          <div class="from_name">{sender_name}</div>
          <img src="{os.path.basename(photo_path)}" alt="Photo"/>
        </div>
      </div>
"""
            elif isinstance(message.media, MessageMediaDocument):
                # If message contains a document, include a link to download it in the HTML
                document_filename = html.escape(get_document_filename(message.media.document))
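                # get_file_extension() already returns the leading dot, so no extra "." here (avoids names like document_1..docx)
                document_path = os.path.join(output_directory, f"document_{message.id}{get_file_extension(message.media.document)}")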
                await client.download_media(message, file=document_path)
                html_content += f"""
      <div class="message default clearfix" id="message-{message.id}">
        <div class="pull_left userpic_wrap">
          <div class="userpic userpic7" style="width: 42px; height: 42px">
            <div class="initials" style="line-height: 42px">{sender_name[0]}</div>
          </div>
        </div>
        <div class="body">
          <div class="pull_right date details" title="{local_date.strftime('%Y-%m-%d %H:%M:%S')}">
            {formatted_date}
          </div>
          <div class="from_name">{sender_name}</div>
          <div class="text"><a href="{os.path.basename(document_path)}">{document_filename}</a></div>
        </div>
      </div>
"""

        else:
            # If message is a text message and not None, include it in the HTML
            if message.message:
                html_content += f"""
      <div class="message default clearfix" id="message-{message.id}">
        <div class="pull_left userpic_wrap">
          <div class="userpic userpic7" style="width: 42px; height: 42px">
            <div class="initials" style="line-height: 42px">{sender_name[0]}</div>
          </div>
        </div>
        <div class="body">
          <div class="pull_right date details" title="{local_date.strftime('%Y-%m-%d %H:%M:%S')}">
            {formatted_date}
          </div>
          <div class="from_name">{sender_name}</div>
          <div class="text">{html.escape(message.message)}</div>
        </div>
      </div>
"""

    # Add HTML closing tags
    html_content += """
      </div>
    </div>
  </div>
</body>
</html>
"""

    # Write HTML content to file
    with open(os.path.join(output_directory, filename), 'w', encoding='utf-8') as f:
        f.write(html_content)

async def get_sender(sender_id):
    try:
        sender = await client.get_entity(sender_id)
        return sender
    except Exception as e:
        print(f"Failed to get sender information for sender_id {sender_id}: {e}")
        return None

def get_document_filename(document):
    for attribute in document.attributes:
        if isinstance(attribute, DocumentAttributeFilename):
            return attribute.file_name
    return "Unknown"

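def get_file_extension(document):
    mime_type = (document.mime_type or "").lower()
    if mime_type and mime_type != "application/octet-stream":
        extension = mimetypes.guess_extension(mime_type)
        if extension:
            return extension
    # Generic or unrecognized MIME type: fall back to the extension of the original file name.
    # (mimetypes.guess_extension() does not accept None, so it cannot serve as the fallback.)
    return os.path.splitext(get_document_filename(document))[1] or ".unknown"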

async def main():
    async for dialog in client.iter_dialogs():
        if dialog.is_group:
            await export_messages(dialog.entity)
            await asyncio.sleep(5)  # Wait for 5 seconds between exporting messages from different groups

with client:
    client.loop.run_until_complete(main())

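The directory layout is as follows (the output folder carries the output-month-day-hour name of the run):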

├── css
│   └── style.css
├── js
│   └── script.js
├── output
│   ├── test.html
│   ├── document_136430..docx
│   ├── document_136431..txt
├── session_name.session
├── tg_backup.py

Download the style.css and script.js code

12. Run and test the script:

In the generated HTML files, images display correctly and the linked documents can be opened.
