一聚教程网:一个值得你收藏的教程网站

最新下载

热门教程

python断点续传的原理及实现例子(支持多线程)

时间:2016-08-12 编辑:简简单单 来源:一聚教程网

一、 简单原理

说到断点续传,就不得不说一些和断点续传相关的HTTP头部字段。

① Content-Length

Content-Length用于指示HTTP响应包中实体的大小。除非使用了分块编码,否则Content-Length首部就是带有实体主体的报文必须使用的。使用Content-Length首部是为了能够检测出服务器崩溃而导致的报文截尾,并对共享持久连接的多个报文进行正确分段。

检测结尾
 HTTP的早期版本采用关闭连接的办法来划定报文的结束。但是,没有Content-Length的话,客户端无法区分到底是报文结束时正常的关闭连接还是报文传输中由于服务器崩溃而导致的连接关闭。客户端需要通过Content-Length来检测报文截尾。
 报文截尾的问题对缓存代理服务器来说尤为重要。如果缓存服务器收到被截尾的报文却没有识别出截尾的话,它可能会存储不完整的内容并多次使用他来提供服务。缓存代理服务器通常不会为没有显式Content-Length首部的HTTP主体做缓存,以此来减小缓存已截尾报文的风险。
Content-Length与持久连接
Content-Length首部对于持久链接是必不可少的。如果响应通过持久连接传送,就可能有另一条HTTP响应紧随其后。客户端通过Content-Length首部就可以知道报文在何处结束,下一条报文从何处开始。因为连接是持久的,客户端无法依赖连接关闭来判断报文的结束。
有一种情况,使用持久连接可以没有Content-Length首部,即采用分块编码(chunked encoding)时。在分块编码的情况下,数据是分为一系列的块来发送的,没块都有大小说明。哪怕服务器在生成首部的时候不知道整个实体的大小(通常是因为实体是动态生成的),仍然可以使用分块编码传输若干已知大小的块。
② Transfer-Encoding

HTTP协议中只定义了一种Transger-Encoding,也就是chunked。举个例子,如果服务端的主体是动态生成的。而客户端不希望服务端将主体全部生成后再获取,因为中间的时延会特别大。chunked的格式如下:


HTTP/1.1 200 OK

Transfer-Encoding: chunked

2 ab

a 0123456789

0
1
2
3
4
5
6
7
8
9
HTTP/1.1 200 OK
 
Transfer-Encoding: chunked
 
2 ab
 
a 0123456789
 
0
③ Content-Enconding

常见有如下三种:gzip,deflate,compress。它用来指示实体是以什么算法进行编码的。通常,Content-Encoding与Transfer-Encoding结合使用。

④ Content-Range

用于响应头,指定整个实体中的一部分的插入位置,他也指示了整个实体的长度。在服务器向客户返回一个部分响应,它必须描述响应覆盖的范围和整个实体长度。一般格式:

 Content-Range: bytes start-end/total

⑤ Range

用于请求头中,指定第一个字节的位置和最后一个字节的位置,一般格式:

 Range:bytes=start-end

 

二、单线程实现

① 是否支持断点续传

采用head获取部分实体,看是否返回头中含有Content-Range
采用head获取部分实体,看返回状态码是否为206。
② 具体实现步骤

使用head方法获取文件大小
获取本地文件大小
设置请求头Range信息
利用requqests.response.iter_content以及开启stream模式
文件下载到一定大小就写入

 代码如下 复制代码

# usr/bin/env python
# coding: utf-8

"""
Copyright (c) 2015-2016 cain
author: cain
"""

import os
import time
import logging
import datetime
import requests
import argparse

class fileDownload(object):
    def __init__(self, url, file_name):
        """
        :param url:文件的下载地址
        :param file_name:重命名文件的名字
        :return:
        """
        self.url = url
        self.file_name = file_name
        self.stat_time = time.time()
        self.file_size = self.getSize()
        self.offset = self.getOffset()
        self.downloaded_size = self.offset
        self.headers = self.setHeaders()
        self.tmpfile = ""
        self.info()

    def info(self):
        logging.info("Downloaded    [%s] bytes" % (self.offset))

    def setHeaders(self):
        """
        根据已下载文件的大小设置Range头部范围并返回
        :return:
        """
        start = self.offset
        end = self.file_size - 1
        range = "bytes={0}-{1}".format(start, end)
        return {"Range": range}

    def getOffset(self):
        if os.path.exists(self.file_name):
            if self.file_size == os.path.getsize(self.file_name):
                exit()
            else:
                return os.path.getsize(self.file_name)
        else:
            return 0

    def getSize(self):
        """
        :return:返回文件的大小,采用head的方式
        """
        response = requests.head(self.url)
        return int(response.headers["content-length"])

    def download(self):
        """
        断点续传的核心部分
        :return:
        """
        with open(self.file_name, "ab") as f:
            try:
                r = requests.get(self.url, stream=True, headers=self.headers)
                for chunk in r.iter_content(chunk_size=1024):
                    if not chunk:
                        break
                    self.tmpfile += chunk
                    if len(self.tmpfile) == 1024*50:
                        f.write(self.tmpfile)
                        self.downloaded_size += len(self.tmpfile)
                        logging.info("Downloaded ---[%.2f%%] [%s/%s] bytes" % (float(self.downloaded_size)
                                                                           /self.file_size*100,
                                                                           self.downloaded_size, self.file_size))
                        self.tmpfile = ""
            except KeyboardInterrupt:
                logging.warning("Interruped by user")
                logging.info("Ending the thread,please do not exit")
            finally:
                f.write(self.tmpfile)
                self.downloaded_size += len(self.tmpfile)
                logging.info("Downloaded ---[%.2f%%] %s/%s bytes" % (float(self.downloaded_size)
                                                                   /self.file_size*100,
                                                                   self.downloaded_size, self.file_size))
                consume = int(time.time()) - self.stat_time
                logging.info("It consumes %d seconds" % (consume))
                logging.info("End at %s" % (time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))))

def init():
    """
    配置日志信息
    :return:
    """
    logging.basicConfig(format='[%(asctime)s]\t[%(levelname)s]\t%(message)s',
                    level="DEBUG",
                    datefmt="%Y/%m/%d %I:%M:%S %p"
                    )

def run(url, name):
    if not name:
        name = url.split("/")[-1]
    file = fileDownload(url, name)
    file.download()

if __name__ == '__main__':
    init()
    parser = argparse.ArgumentParser()
    parser.add_argument("url", help="The file's url")
    parser.add_argument("--name", help="The file's name you want to rename")
    args = parser.parse_args()
    run(args.url, args.name)

三、多线程实现(非断点续传)

 

 代码如下 复制代码
# usr/bin/env python
# coding: utf-8
 
"""
Copyright (c) 2015-2016 cain
author: cain
"""
 
import time
import math
import Queue
import logging
import argparse
import requests
import threading
 
mutex = threading.Lock()
 
 
class FileDownload(object):
    def __init__(self, url, filename, threadnum, bulk_size, chunk_size):
        self.url = url
        self.filename = filename
        self.threadnum = threadnum
        self.bulk_size = bulk_size
        self.chunk_size = chunk_size
        self.file_size = self.getSize()
        self.buildEmptyFile()
        self.queue = Queue.Queue(1024)
        self.setQueue()
 
 
    def getSize(self):
        """
        :return:返回文件的大小,采用head的方式
        """
        response = requests.head(self.url)
        return int(response.headers["content-length"])
 
    def buildEmptyFile(self):
        """
        建立空文件
        :return:
        """
        try:
            logging.info("Building empty file...")
            with open(self.filename, "w") as f:
                f.seek(self.file_size)
                f.write("\x00")
                f.close()
        except Exception as err:
            logging.error("Building empty file error...")
            logging.error(err)
            exit()
 
    def setQueue(self):
        """
        根据文件大小以及设置的每个任务的文件大小设置队列
        :return:返回队列信息
        """
        logging.info("Setting the queue...")
        tasknums = int(math.ceil(float(self.file_size)/self.bulk_size))     # 向上取整
        for i in range(tasknums):
            ranges = (self.bulk_size*i, self.bulk_size*(i+1)-1)
            self.queue.put(ranges)
 
    def download(self):
        while True:
            logging.info("Downloading data in %s" % (threading.current_thread().getName()))
            if not self.queue.empty():
                start, end = self.queue.get()
                tmpfile = ""
                ranges = "bytes={0}-{1}".format(start, end)
                headers = {"Range": ranges}
                logging.info(headers)
                r = requests.get(self.url, stream=True, headers=headers)
                for chunk in r.iter_content(chunk_size=self.chunk_size):
                    if not chunk:
                        break
                    tmpfile += chunk
                mutex.acquire()
                with open(self.filename, "r+b") as f:
                    f.seek(start)
                    f.write(tmpfile)
                    f.close()
                logging.info("Writing [%d]bytes data into the file..." % (len(tmpfile)))
                mutex.release()
            else:
                logging.info("%s is over..." % (threading.current_thread().getName()))
                break
 
    def run(self):
        threads = list()
        for i in range(self.threadnum):
            threads.append(threading.Thread(target=self.download))
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
 
def logInit():
    """
    配置日志信息
    :return:
    """
    logging.basicConfig(format='[%(asctime)s]\t[%(levelname)s]\t%(message)s',
                        level="DEBUG",
                        datefmt="%Y/%m/%d %I:%M:%S %p")
 
 
def start(url, filename, threadnum):
    """
    下载部分核心功能
    :param url:
    :param filename:
    :param threadnum:
    :return:
    """
    url = url
    filename = filename
    threadnum = threadnum if threadnum and threadnum < 20 else 5
    bulk_size = 2*1024*1014
    chunk_size = 50*1024
    print url, filename, threadnum, bulk_size, chunk_size
    Download = FileDownload(url, filename, threadnum, bulk_size, chunk_size)
    Download.run()
 
if __name__ == '__main__':
    logInit()
    logging.info("App is starting...")
    start_time = time.time()
    parser = argparse.ArgumentParser()
    parser.add_argument("url", help="The file's url")
    parser.add_argument("--filename", help="The file's name you want to rename")
    parser.add_argument("--threadnum", help="The threads you want to choose", type=int)
    args = parser.parse_args()
    start(args.url, args.filename, args.threadnum)
    logging.info("App in ending...")
    logging.info("It consumes [%d] seconds" % (time.time()-start_time))

 

四、多线程断点续传

这就是上面的结合体,不过通常会使用一个配置文件用于保存下载的状态,重新下载的时候就根据此文件重新配置。

热门栏目