之前一直使用Typora+各种博客(Wordpress/Hexo/Typecho)来进行笔记和写作,后来接触并爱上了语雀,主要是贴图太方便了。(使用Typora的时候会搭配PicGo+云存储,但是有时候会粘贴了多余的图片或者想替换已有图片时,懒得打开云存储进行删除,久而久之就忘了,造成了一定的空间浪费。)
刚开始用语雀的时候还特地看了下,可以导出md格式的文章。但最近想批量导出知识库时,发现只能选择PDF或者语雀特定的格式,数据不在自己手里感觉不大放心。于是弄了个脚本通过语雀官方API导出了全部文章,并开始寻找本地存储的笔记软件。
结合个人情况进行筛选后发现Obsidian比较适合,但是一开始不会用,不知道怎么处理图片路径的问题。语雀是没有目录这个概念的,所以导出的文章都放到了一起,然后图片等资源也统一放到了文章目录中的某一目录。而如果我在Obsidian里通过建立多级文件夹的方式来分类文章,那么所有图片资源的链接都要进行改动,差点弃坑了。还好在B站看了关于ob的视频,学到了通过索引的方式来进行管理。

先上一张Obsidian的图:
export_yuque-0

语雀导出

新脚本

  • 【2022-09-18】

    • 新增:可一次性导出多个知识库,在输入知识库ID时用逗号,分隔即可
    • 新增:记录所下载的文档,并建立索引
    • 修复:将文档中不能作为文件名的字符进行编码
  • 【2022-09-17】

    • 新增:对文档中的附件及其链接进行记录,因为附件需要鉴权,无法直接下载
  • 【2022-09-16】:基于 pyuque 这个语雀第三方 Python-SDK 重写了脚本,目前用着还不错。

    • 新增:主要变动是将文档中的图片,以文档-序号的方式命名。原有的脚本用的是该图片在语雀中的文件名,是一长串的随机字符串,查找起来非常麻烦
    • 修复:发现部分旧文档中的图片链接是jpeg后缀,而原脚本只匹配了png后缀
  • 用法:安装相关依赖,然后在 语雀-Token 页面申请一个有读取权限的密钥,填入token变量然后执行脚本即可
# 安装依赖
$ pip3 install pyuque aiohttp huepy PrettyTable

# 执行脚本
$ python3 YuqueExport.py

YuqueExport.py

import sys
import re
import os
import asyncio
import aiohttp
from urllib import parse
from pyuque.client import Yuque
from huepy import *
from prettytable import PrettyTable


# 获取仓库列表
# Return the user's knowledge bases as a {repo_id: repo_name} mapping.
def get_repos(user_id):
    return {
        str(item['id']): item['name']
        for item in yuque.user_list_repos(user_id)['data']
    }


# 获取指定仓库下的文档列表
# Return all documents of one knowledge base as a {doc_id: doc_title} mapping.
def get_docs(repo_id):
    return {
        str(item['id']): item['title']
        for item in yuque.repo_list_docs(repo_id)['data']
    }


# 获取文档Markdown代码
# Fetch a document's Markdown body and strip Yuque export artifacts.
def get_body(repo_id, doc_id):
    doc = yuque.doc_get(repo_id, doc_id)
    body = doc['data']['body']
    # Cleanup passes, applied in order: anchors, <br /> around images,
    # and trailing query fragments on image links.
    cleanups = (
        ("<a name=\"(\w.*)\"></a>", ""),                 # drop <a> anchor tags Yuque inserts
        (r'\<br \/\>!\[image.png\]', "\n![image.png]"),  # drop <br /> right before an image
        (r'\)\<br \/\>', ")\n"),                         # drop <br /> right after an image
        (r'png[#?](.*)+', 'png)'),                       # cut everything after '#'/'?' on png links
        (r'jpeg[#?](.*)+', 'jpeg)'),                     # cut everything after '#'/'?' on jpeg links
    )
    for pattern, repl in cleanups:
        body = re.sub(pattern, repl, body)
    return body


async def download_md(repo_id, repo_name, doc_id, doc_title):
    """Export one document: download its images, record its attachments,
    rewrite links to local paths, and save the Markdown file.

    :param repo_id: knowledge base id
    :param repo_name: knowledge base name, used as the output directory
    :param doc_id: document id
    :param doc_title: document title, already sanitized as a filename
    """
    body = get_body(repo_id, doc_id)

    # Create the repo directory and its assets/ subdirectory
    repo_dir = os.path.join(base_dir, repo_name)
    make_dir(repo_dir)
    assets_dir = os.path.join(repo_dir, "assets")
    make_dir(assets_dir)

    # Download images and rewrite their links to local relative paths.
    # NOTE: [a-zA-Z] fixes the original [a-zA-z], whose range also matched
    # the characters [ \ ] ^ _ ` that sit between 'Z' and 'a' in ASCII.
    pattern_images = r'(\!\[(.*)\]\((https:\/\/cdn\.nlark\.com\/yuque.*\/(\d+)\/(.*?\.[a-zA-Z]+)).*\))'
    images = re.findall(pattern_images, body)
    for index, image in enumerate(images):
        image_body = image[0]                    # full Markdown image tag
        image_url = image[2]                     # remote image URL
        image_suffix = image_url.split(".")[-1]  # file extension
        local_abs_path = f"{assets_dir}/{doc_title}-{str(index)}.{image_suffix}"  # absolute save path
        local_md_path = f"![{doc_title}-{str(index)}](assets/{doc_title}-{str(index)}.{image_suffix})"  # local image tag
        await download_images(image_url, local_abs_path)  # fetch the image
        body = body.replace(image_body, local_md_path)    # swap remote link for local one

    # Attachments 302-redirect to an auth page and cannot be downloaded
    # directly, so only their links are recorded for manual retrieval.
    pattern_annexes = r'(\[(.*)\]\((https:\/\/www\.yuque\.com\/attachments\/yuque.*\/(\d+)\/(.*?\.[a-zA-Z]+)).*\))'
    annexes = re.findall(pattern_annexes, body)
    if annexes:
        record_annex_output = f"## Annex-{repo_name}-{doc_title} \n"
        record_annex_file = os.path.join(base_dir, f"Annex-{repo_name}-{doc_title}.md")
        for index, annex in enumerate(annexes):
            annex_body = annex[0]   # full Markdown link
            annex_name = annex[1]   # attachment display name
            print(que(f"File {index + 1}: {annex_name} ..."))
            record_annex_output += f"- {annex_body} \n"
        # encoding="utf-8" keeps non-ASCII titles intact on every platform
        with open(record_annex_file, "w+", encoding="utf-8") as f:
            f.write(record_annex_output)
        print(good(f"Found {len(annexes)} Files, Written into {record_annex_file}"))

    # Save the Markdown document itself
    markdown_path = f"{repo_dir}/{doc_title}.md"
    with open(markdown_path, "w", encoding="utf-8") as f:
        f.write(body)

    # Append this document to the repo's index file
    record_doc_file = os.path.join(base_dir, f"{repo_name}.md")
    record_doc_output = f"- [{doc_title}](./{repo_name}/{doc_title}.md) \n"
    with open(record_doc_file, "a+", encoding="utf-8") as f:
        f.write(record_doc_output)


async def download_images(image, local_name):
    """Download one image URL and write the bytes to local_name."""
    print(good(f"Download {local_name} ..."))
    async with aiohttp.ClientSession() as session:
        async with session.get(image) as resp:
            payload = await resp.content.read()
    with open(local_name, "wb") as f:
        f.write(payload)


# 创建目录
# Create `path` (including parents) unless it already exists.
def make_dir(path):
    if os.path.exists(path):
        return
    os.makedirs(path)
    print(info(f"Make Dir {path} ..."))


async def main():
    """Interactively select knowledge bases and export all of their docs."""
    # Resolve the current user's id from the token
    user_id = yuque.user.get()['data']['id']

    # Show every knowledge base in a table
    all_repos = get_repos(user_id)
    repo_table = PrettyTable(["ID", "Name"])
    for repo_id, repo_name in all_repos.items():
        repo_table.add_row([repo_id, repo_name])
    print(repo_table)

    # Prompt for repo ids; several may be entered, comma-separated
    input_ids = input(lcyan("Repo ID (Example: 111,222): "))
    temp_ids = [temp.strip() for temp in input_ids.split(",")]

    # Validate every requested id before downloading anything
    for temp_id in temp_ids:
        if temp_id not in all_repos:
            print(bad(red(f"Repo ID {temp_id} Not Found !")))
            # was sys.exit(0): a failed lookup should report a non-zero status
            sys.exit(1)

    # Export every document of every selected knowledge base
    for repo_id in temp_ids:
        repo_name = all_repos[repo_id]
        all_docs = get_docs(repo_id)
        print(cyan(f"\n=====  {repo_name}: {len(all_docs)} docs ===== "))
        for doc_id, doc_title in all_docs.items():
            # Percent-encode characters that are illegal in filenames
            for char in r'/\<>?:"|*':
                doc_title = doc_title.replace(char, parse.quote_plus(char))
            print(run(cyan(f"Get Doc {doc_title} ...")))
            await download_md(repo_id, repo_name, doc_id, doc_title)


if __name__ == '__main__':
    # Personal access token with read scope, created on the Yuque Token page
    token = "<Your_Yuque_Token>"
    yuque = Yuque(token)
    # All exported repos, docs and assets are written under this directory
    base_dir = "./YuqueExport"
    asyncio.run(main())

旧脚本

  • 基于ExportMD进行了一些优化,修复部分Bug以及适配Obsidian

    • 正则去除语雀导出时可能存在的<a name="xxx"></a>标签
    • 导出的图片从./assets/修改为assets/,用于匹配Obsidian
  • 使用方法:

    • NameSpace:访问语雀个人主页https://www.yuque.com/<xxx>中的xxx部分
    • Token:访问语雀Token新建,只需要给读取权限即可。
$ python3 ExportMD.py
> 请输入语雀namespace: xxx
> 请输入语雀Token: xxx

export_yuque-1

  • ExportMD.py完整代码,感谢@杜大哥协助修复
# -*- coding: UTF-8 -*-

from prettytable import PrettyTable
import re
import os
import aiohttp
import asyncio
from urllib import parse
from PyInquirer import prompt, Separator
from examples import custom_style_2
from colr import color
from cfonts import render, say


class ExportMD:
    """Export selected Yuque knowledge bases to local Markdown via the v2 API."""

    def __init__(self):
        self.repo_table = PrettyTable(["知识库ID", "名称"])
        self.namespace, self.Token = self.get_UserInfo()
        self.headers = {
            "Content-Type": "application/json",
            "User-Agent": "ExportMD",
            "X-Auth-Token": self.Token
        }
        self.repo = {}               # repo name -> repo id
        self.export_dir = './yuque'  # root directory for all exported files

    def print_logo(self):
        # Render the ASCII-art banner shown at startup
        output = render('ExportMD', colors=['red', 'yellow'], align='center')
        print(output)

    # Read cached Yuque credentials from .userinfo, or prompt and cache them
    def get_UserInfo(self):
        f_name = ".userinfo"
        if os.path.isfile(f_name):
            with open(f_name, encoding="utf-8") as f:
                userinfo = f.read().split("&")
        else:
            namespace = input("请输入语雀namespace:")
            Token = input("请输入语雀Token:")
            userinfo = [namespace, Token]
            # encoding="utf-8" matches the read above so non-ASCII values round-trip
            with open(f_name, "w", encoding="utf-8") as f:
                f.write(namespace + "&" + Token)
        return userinfo

    # Perform a GET against the Yuque v2 API and return the parsed JSON
    async def req(self, session, api):
        url = "https://www.yuque.com/api/v2" + api
        async with session.get(url, headers=self.headers) as resp:
            result = await resp.json()
            return result

    # Fetch all repos of the namespace into self.repo and the display table
    async def getRepo(self):
        api = "/users/%s/repos" % self.namespace
        async with aiohttp.ClientSession() as session:
            result = await self.req(session, api)
            for repo in result.get('data'):
                repo_id = str(repo['id'])
                repo_name = repo['name']
                self.repo[repo_name] = repo_id
                self.repo_table.add_row([repo_id, repo_name])

    # Fetch the document list of one repo as {slug: title}
    async def get_docs(self, repo_id):
        api = "/repos/%s/docs" % repo_id
        async with aiohttp.ClientSession() as session:
            result = await self.req(session, api)
            docs = {}
            for doc in result.get('data'):
                docs[doc['slug']] = doc['title']
            return docs

    # Fetch a document's Markdown body and strip Yuque export artifacts
    async def get_body(self, repo_id, slug):
        api = "/repos/%s/docs/%s" % (repo_id, slug)
        async with aiohttp.ClientSession() as session:
            result = await self.req(session, api)
            body = result['data']['body']
            body = re.sub("<a name=\".*\"></a>", "", body)                      # drop <a> anchor tags
            body = re.sub(r'\<br \/\>!\[image.png\]', "\n![image.png]", body)   # drop <br /> before images
            body = re.sub(r'\)\<br \/\>', ")\n", body)                          # drop <br /> after images
            return body

    # Interactive checkbox prompt for choosing which repos to export
    def selectRepo(self):
        choices = [{"name": repo_name} for repo_name, _ in self.repo.items()]
        choices.insert(0, Separator('=== 知识库列表 ==='))
        questions = [
            {
                'type': 'checkbox',
                'qmark': '>>>',
                'message': '选择知识库',
                'name': 'repo',
                'choices': choices
            }
        ]
        repo_name_list = prompt(questions, style=custom_style_2)
        return repo_name_list["repo"]

    # Create a directory (with parents) if it does not exist
    def mkDir(self, dir):
        if not os.path.exists(dir):
            os.makedirs(dir)

    # Fetch one document, localize its images and save it
    async def download_md(self, repo_id, slug, repo_name, title):
        """
        :param repo_id: repo id
        :param slug: document id
        :param repo_name: repo name
        :param title: document title
        :return: none
        """
        body = await self.get_body(repo_id, slug)
        new_body, image_list = await self.to_local_image_src(body)

        if image_list:
            # Images are saved to ./yuque/<repo_name>/assets/<filename>
            save_dir = os.path.join(self.export_dir, repo_name, "assets")
            self.mkDir(save_dir)
            async with aiohttp.ClientSession() as session:
                await asyncio.gather(
                    *(self.download_image(session, image_info, save_dir) for image_info in image_list)
                )

        self.save(repo_name, title, new_body)

        print(" %s 导出成功!" % color(title, fore='green', style='bright'))

    # Rewrite remote image links in the Markdown to local assets/ paths
    async def to_local_image_src(self, body):
        body = re.sub(r'\<br \/\>!\[image.png\]', "\n![image.png]", body)  # drop <br /> before images
        body = re.sub(r'\)\<br \/\>', ")\n", body)                         # drop <br /> after images

        # NOTE: [a-zA-Z] fixes the original [a-zA-z] range, which also
        # matched the characters [ \ ] ^ _ ` between 'Z' and 'a' in ASCII.
        pattern = r"!\[(?P<img_name>.*?)\]" \
                  r"\((?P<img_src>https:\/\/cdn\.nlark\.com\/yuque.*\/(?P<slug>\d+)\/(?P<filename>.*?\.[a-zA-Z]+)).*\)"
        repl = r"![\g<img_name>](assets/\g<filename>)"
        images = [_.groupdict() for _ in re.finditer(pattern, body)]
        new_body = re.sub(pattern, repl, body)
        return new_body, images

    # Download one image into save_dir under its original filename
    async def download_image(self, session, image_info: dict, save_dir: str):
        img_src = image_info['img_src']
        filename = image_info["filename"]

        async with session.get(img_src) as resp:
            with open(os.path.join(save_dir, filename), 'wb') as f:
                f.write(await resp.read())

    # Save the Markdown body to <export_dir>/<repo_name>/<title>.md
    def save(self, repo_name, title, body):
        # Percent-encode characters that are illegal in filenames
        def check_safe_path(path: str):
            for char in r'/\<>?:"|*':
                path = path.replace(char, parse.quote_plus(char))
            return path

        repo_name = check_safe_path(repo_name)
        title = check_safe_path(title)
        # Use self.export_dir instead of a hard-coded "./yuque" so the save
        # location always matches the directories created in run()
        save_path = "%s/%s/%s.md" % (self.export_dir, repo_name, title)
        with open(save_path, "w", encoding="utf-8") as f:
            f.write(body)

    # Entry point: pick repos, then export every document of each
    async def run(self):
        self.print_logo()
        await self.getRepo()
        repo_name_list = self.selectRepo()

        self.mkDir(self.export_dir)  # root folder for all exported repos

        # Walk the selected knowledge bases
        for repo_name in repo_name_list:
            dir_path = self.export_dir + "/" + repo_name.replace("/", "%2F")
            # str.replace returns a new string; the original discarded this result
            dir_path = dir_path.replace("//", "/")
            self.mkDir(dir_path)

            repo_id = self.repo[repo_name]
            docs = await self.get_docs(repo_id)

            await asyncio.gather(
                *(self.download_md(repo_id, slug, repo_name, title) for slug, title in docs.items())
            )

        print("\n" + color('导出完成!', fore='green', style='bright'))
        print("已导出到:" + color(os.path.realpath(self.export_dir), fore='green', style='bright'))


if __name__ == '__main__':
    export = ExportMD()
    # asyncio.run replaces the deprecated get_event_loop()/run_until_complete
    # pattern (deprecated since Python 3.10) and matches the newer script
    asyncio.run(export.run())

可能出现的报错

  • 运行脚本时出现如下错误:

export_yuque-2

  • 原因是默认的最大打开文件数不够,修复方法:
$ ulimit -n      # 查看当前最大打开文件数
$ ulimit -n 512  # 设置多一点

export_yuque-3

建立索引

这里以语雀目录为内容,批量添加obsidian的内链格式[[xxx]],以建立索引
  • 先复制语雀全部文档的标题,然后利用以下脚本批量添加内链格式,最后根据情况进行手动调整。
# -*- coding: UTF-8 -*-

file = "list.txt"       # input: one document title per line
new_file = "list2.txt"  # output: each title wrapped as an Obsidian [[...]] link

datas = []

# encoding="utf-8" so non-ASCII (e.g. Chinese) titles are read correctly
with open(file, "r", encoding="utf-8") as f:
    lines = f.readlines()
    for line in lines:
        data = "[[" + line.strip() + "]]"
        datas.append(data)

with open(new_file, "w", encoding="utf-8") as f2:
    for line in datas:
        f2.writelines(line + "\n")
  • obsidian一些配置

export_yuque-4

如果觉得我的文章对你有帮助,请我吃颗糖吧~