一聚教程网:一个值得你收藏的教程网站

最新下载

热门教程

Python如何实现并行抓取整站40万条房价数据的教程(可更换抓取城市)

时间:2016-12-14 编辑:简简单单 来源:一聚教程网

写在前面

这次的爬虫是关于房价信息的抓取,目的在于练习10万以上的数据处理及整站式抓取。

数据量的提升最直观的感觉便是对函数逻辑要求的提高,针对Python的特性,谨慎的选择数据结构。以往小数据量的抓取,即使函数逻辑部分重复,I/O请求频率密集,循环套嵌过深,也不过是1~2s的差别,而随着数据规模的提高,这1~2s的差别就有可能扩展成为1~2h。

因此对于要抓取数据量较多的网站,可以从两方面着手降低抓取信息的时间成本。

1)优化函数逻辑,选择适当的数据结构,符合Pythonic的编程习惯。例如,字符串的合并,使用join()要比“+”节省内存空间。

2)依据I/O密集与CPU密集,选择多线程、多进程并行的执行方式,提高执行效率。

一、获取索引

包装请求request,设置超时timeout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 获取列表页面
defget_page(url):
  headers={
    'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
           r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
    'Referer': r'http://bj.fangjia.com/ershoufang/',
    'Host': r'bj.fangjia.com',
    'Connection':'keep-alive'
  }
  timeout=60
  socket.setdefaulttimeout(timeout)# 设置超时
  req=request.Request(url, headers=headers)
  response=request.urlopen(req).read()
  page=response.decode('utf-8')
  returnpage

一级位置:区域信息


 

二级位置:板块信息(根据区域位置得到板块信息,以key_value对的形式存储在dict中)

以dict方式存储,可以快速的查询到所要查找的目标。-> {'朝阳':{'工体','安贞','健翔桥'......}}

三级位置:地铁信息(搜索地铁周边房源信息)

将所属位置地铁信息,添加至dict中。 -> {'朝阳':{'工体':{'5号线','10号线' , '13号线'},'安贞','健翔桥'......}}

对应的url:http://bj.fangjia.com/ershoufang/--r-%E6%9C%9D%E9%98%B3%7Cw-5%E5%8F%B7%E7%BA%BF%7Cb-%E6%83%A0%E6%96%B0%E8%A5%BF%E8%A1%97

解码后的url:http://bj.fangjia.com/ershoufang/--r-朝阳|w-5号线|b-惠新西街

根据url的参数模式,可以有两种方式获取目的url:

1)根据索引路径获得目的url

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 获取房源信息列表(嵌套字典遍历)
defget_info_list(search_dict, layer, tmp_list, search_list):
  layer+=1# 设置字典层级
  foriinrange(len(search_dict)):
    tmp_key=list(search_dict.keys())[i]# 提取当前字典层级key
    tmp_list.append(tmp_key) # 将当前key值作为索引添加至tmp_list
    tmp_value=search_dict[tmp_key]
    ifisinstance(tmp_value,str): # 当键值为url时
      tmp_list.append(tmp_value) # 将url添加至tmp_list
      search_list.append(copy.deepcopy(tmp_list)) # 将tmp_list索引url添加至search_list
      tmp_list=tmp_list[:layer]# 根据层级保留索引
    eliftmp_value=='': # 键值为空时跳过
      layer-=2     # 跳出键值层级
      tmp_list=tmp_list[:layer] # 根据层级保留索引
    else:
      get_info_list(tmp_value, layer, tmp_list, search_list)# 当键值为列表时,迭代遍历
      tmp_list=tmp_list[:layer]
  returnsearch_list

2)根据dict信息包装url

{'朝阳':{'工体':{'5号线'}}}

参数:

——  r-朝阳

——  b-工体

——  w-5号线

组装参数:http://bj.fangjia.com/ershoufang/--r-朝阳|w-5号线|b-工体

1
2
3
4
5
1# 根据参数创建组合url
2defget_compose_url(compose_tmp_url, tag_args, key_args):
3  compose_tmp_url_list=[compose_tmp_url,'|'iftag_args !='r-'else'', tag_args, parse.quote(key_args), ]
4  compose_url=''.join(compose_tmp_url_list)
5  returncompose_url

二、获取索引页最大页数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 获取当前索引页面页数的url列表
defget_info_pn_list(search_list):
  fin_search_list=[]
  foriinrange(len(search_list)):
    print('>>>正在抓取%s'%search_list[i][:3])
    search_url=search_list[i][3]
    try:
      page=get_page(search_url)
    except:
      print('获取页面超时')
      continue
    soup=BS(page,'lxml')
    # 获取最大页数
    pn_num=soup.select('span[class="mr5"]')[0].get_text()
    rule=re.compile(r'\d+')
    max_pn=int(rule.findall(pn_num)[1])
    # 组装url
    forpninrange(1, max_pn+1):
      print('************************正在抓取%s页************************'%pn)
      pn_rule=re.compile('[|]')
      fin_url=pn_rule.sub(r'|e-%s|'%pn, search_url,1)
      tmp_url_list=copy.deepcopy(search_list[i][:3])
      tmp_url_list.append(fin_url)
      fin_search_list.append(tmp_url_list)
  returnfin_search_list

三、抓取房源信息Tag

这是我们要抓取的Tag:

['区域', '板块', '地铁', '标题', '位置', '平米', '户型', '楼层', '总价', '单位平米价格']


 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 获取tag信息
defget_info(fin_search_list, process_i):
  print('进程%s开始'%process_i)
  fin_info_list=[]
  foriinrange(len(fin_search_list)):
    url=fin_search_list[i][3]
    try:
      page=get_page(url)
    except:
      print('获取tag超时')
      continue
    soup=BS(page,'lxml')
    title_list=soup.select('a[class="h_name"]')
    address_list=soup.select('span[class="address]')
    attr_list=soup.select('span[class="attribute"]')
    price_list=soup.find_all(attrs={"class":"xq_aprice xq_esf_width"})# select对于某些属性值(属性值中间包含空格)无法识别,可以用find_all(attrs={})代替
    fornuminrange(20):
      tag_tmp_list=[]
      try:
        title=title_list[num].attrs["title"]
        print(r'************************正在获取%s************************'%title)
        address=re.sub('\n', '', address_list[num].get_text()) 
        area=re.search('\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
        layout=re.search('\d[^0-9]\d.', attr_list[num].get_text()).group(0)
        floor=re.search('\d/\d', attr_list[num].get_text()).group(0)
        price=re.search('\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
        unit_price=re.search('\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
        tag_tmp_list=copy.deepcopy(fin_search_list[i][:3])
        fortagin[title, address, area, layout, floor, price, unit_price]:
          tag_tmp_list.append(tag)
        fin_info_list.append(tag_tmp_list)
      except:
        print('【抓取失败】')
        continue
  print('进程%s结束'%process_i)
  returnfin_info_list

四、分配任务,并行抓取

对任务列表进行分片,设置进程池,并行抓取。

1
2
3
4
5
6
7
8
9
# 分配任务
defassignment_search_list(fin_search_list, project_num):# project_num每个进程包含的任务数,数值越小,进程数越多
  assignment_list=[]
  fin_search_list_len=len(fin_search_list)
  foriinrange(0, fin_search_list_len, project_num):
    start=i
    end=i+project_num
    assignment_list.append(fin_search_list[start: end])# 获取列表碎片
  returnassignment_list
1
2
3
4
5
6
7
8
9
10
p=Pool(4)# 设置进程池
assignment_list=assignment_search_list(fin_info_pn_list,3)# 分配任务,用于多进程
result=[]# 多进程结果列表
foriinrange(len(assignment_list)):
  result.append(p.apply_async(get_info, args=(assignment_list[i], i)))
p.close()
p.join()
forresult_iinrange(len(result)):
  fin_info_result_list=result[result_i].get()
  fin_save_list.extend(fin_info_result_list)# 将各个进程获得的列表合并

通过设置进程池并行抓取,时间缩短为单进程抓取时间的3/1,总计时间3h。

电脑为4核,经过测试,任务数为3时,在当前电脑运行效率最高。

五、将抓取结果存储到excel中,等待可视化数据化处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 存储抓取结果
defsave_excel(fin_info_list, file_name):
  tag_name=['区域','板块','地铁','标题','位置','平米','户型','楼层','总价','单位平米价格']
  book=xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xls'%file_name)# 默认存储在桌面上
  tmp=book.add_worksheet()
  row_num=len(fin_info_list)
  foriinrange(1, row_num):
    ifi==1:
      tag_pos='A%s'%i
      tmp.write_row(tag_pos, tag_name)
    else:
      con_pos='A%s'%i
      content=fin_info_list[i-1]# -1是因为被表格的表头所占
      tmp.write_row(con_pos, content)
  book.close()

附上源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#! -*-coding:utf-8-*-
# Function: 房价调查
# Author:?兹
fromurllibimportparse, request
frombs4importBeautifulSoup as BS
frommultiprocessingimportPool
importre
importlxml
importdatetime
importcProfile
importsocket
importcopy
importxlsxwriter
starttime=datetime.datetime.now()
test_search_dict={'昌平': {'霍营': {'13号线':'http://bj.fangjia.com/ershoufang/--r-%E6%98%8C%E5%B9%B3|w-13%E5%8F%B7%E7%BA%BF|b-%E9%9C%8D%E8%90%A5'}}}
search_list=[]# 房源信息url列表
tmp_list=[]# 房源信息url缓存列表
layer=-1
# 获取列表页面
defget_page(url):
  headers={
    'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
           r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
    'Referer': r'http://bj.fangjia.com/ershoufang/',
    'Host': r'bj.fangjia.com',
    'Connection':'keep-alive'
  }
  timeout=60
  socket.setdefaulttimeout(timeout)# 设置超时
  req=request.Request(url, headers=headers)
  response=request.urlopen(req).read()
  page=response.decode('utf-8')
  returnpage
# 获取查询关键词dict
defget_search(page, key):
  soup=BS(page,'lxml')
  search_list=soup.find_all(href=re.compile(key), target='')
  search_dict={}
  foriinrange(len(search_list)):
    soup=BS(str(search_list[i]),'lxml')
    key=soup.select('a')[0].get_text()
    value=soup.a.attrs['href']
    search_dict[key]=value
  returnsearch_dict
# 获取房源信息列表(嵌套字典遍历)
defget_info_list(search_dict, layer, tmp_list, search_list):
  layer+=1# 设置字典层级
  foriinrange(len(search_dict)):
    tmp_key=list(search_dict.keys())[i]# 提取当前字典层级key
    tmp_list.append(tmp_key) # 将当前key值作为索引添加至tmp_list
    tmp_value=search_dict[tmp_key]
    ifisinstance(tmp_value,str): # 当键值为url时
      tmp_list.append(tmp_value) # 将url添加至tmp_list
      search_list.append(copy.deepcopy(tmp_list)) # 将tmp_list索引url添加至search_list
      tmp_list=tmp_list[:layer]# 根据层级保留索引
    eliftmp_value=='': # 键值为空时跳过
      layer-=2     # 跳出键值层级
      tmp_list=tmp_list[:layer] # 根据层级保留索引
    else:
      get_info_list(tmp_value, layer, tmp_list, search_list)# 当键值为列表时,迭代遍历
      tmp_list=tmp_list[:layer]
  returnsearch_list
# 获取房源信息详情
defget_info_pn_list(search_list):
  fin_search_list=[]
  foriinrange(len(search_list)):
    print('>>>正在抓取%s'%search_list[i][:3])
    search_url=search_list[i][3]
    try:
      page=get_page(search_url)
    except:
      print('获取页面超时')
      continue
    soup=BS(page,'lxml')
    # 获取最大页数
    pn_num=soup.select('span[class="mr5"]')[0].get_text()
    rule=re.compile(r'\d+')
    max_pn=int(rule.findall(pn_num)[1])
    # 组装url
    forpninrange(1, max_pn+1):
      print('************************正在抓取%s页************************'%pn)
      pn_rule=re.compile('[|]')
      fin_url=pn_rule.sub(r'|e-%s|'%pn, search_url,1)
      tmp_url_list=copy.deepcopy(search_list[i][:3])
      tmp_url_list.append(fin_url)
      fin_search_list.append(tmp_url_list)
  returnfin_search_list
# 获取tag信息
defget_info(fin_search_list, process_i):
  print('进程%s开始'%process_i)
  fin_info_list=[]
  foriinrange(len(fin_search_list)):
    url=fin_search_list[i][3]
    try:
      page=get_page(url)
    except:
      print('获取tag超时')
      continue
    soup=BS(page,'lxml')
    title_list=soup.select('a[class="h_name"]')
    address_list=soup.select('span[class="address]')
    attr_list=soup.select('span[class="attribute"]')
    price_list=soup.find_all(attrs={"class":"xq_aprice xq_esf_width"})# select对于某些属性值(属性值中间包含空格)无法识别,可以用find_all(attrs={})代替
    fornuminrange(20):
      tag_tmp_list=[]
      try:
        title=title_list[num].attrs["title"]
        print(r'************************正在获取%s************************'%title)
        address=re.sub('\n', '', address_list[num].get_text())
        area=re.search('\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
        layout=re.search('\d[^0-9]\d.', attr_list[num].get_text()).group(0)
        floor=re.search('\d/\d', attr_list[num].get_text()).group(0)
        price=re.search('\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
        unit_price=re.search('\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
        tag_tmp_list=copy.deepcopy(fin_search_list[i][:3])
        fortagin[title, address, area, layout, floor, price, unit_price]:
          tag_tmp_list.append(tag)
        fin_info_list.append(tag_tmp_list)
      except:
        print('【抓取失败】')
        continue
  print('进程%s结束'%process_i)
  returnfin_info_list
# 分配任务
defassignment_search_list(fin_search_list, project_num):# project_num每个进程包含的任务数,数值越小,进程数越多
  assignment_list=[]
  fin_search_list_len=len(fin_search_list)
  foriinrange(0, fin_search_list_len, project_num):
    start=i
    end=i+project_num
    assignment_list.append(fin_search_list[start: end])# 获取列表碎片
  returnassignment_list
# 存储抓取结果
defsave_excel(fin_info_list, file_name):
  tag_name=['区域','板块','地铁','标题','位置','平米','户型','楼层','总价','单位平米价格']
  book=xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xls'%file_name)# 默认存储在桌面上
  tmp=book.add_worksheet()
  row_num=len(fin_info_list)
  foriinrange(1, row_num):
    ifi==1:
      tag_pos='A%s'%i
      tmp.write_row(tag_pos, tag_name)
    else:
      con_pos='A%s'%i
      content=fin_info_list[i-1]# -1是因为被表格的表头所占
      tmp.write_row(con_pos, content)
  book.close()
if__name__=='__main__':
  file_name=input(r'抓取完成,输入文件名保存:')
  fin_save_list=[]# 抓取信息存储列表
  # 一级筛选
  page=get_page(base_url)
  search_dict=get_search(page,'r-')
  # 二级筛选
  forkinsearch_dict:
    print(r'************************一级抓取:正在抓取【%s】************************'%k)
    url=search_dict[k]
    second_page=get_page(url)
    second_search_dict=get_search(second_page,'b-')
    search_dict[k]=second_search_dict
  # 三级筛选
  forkinsearch_dict:
    second_dict=search_dict[k]
    fors_kinsecond_dict:
      print(r'************************二级抓取:正在抓取【%s】************************'%s_k)
      url=second_dict[s_k]
      third_page=get_page(url)
      third_search_dict=get_search(third_page,'w-')
      print('%s>%s'%(k, s_k))
      second_dict[s_k]=third_search_dict
  fin_info_list=get_info_list(search_dict, layer, tmp_list, search_list)
  fin_info_pn_list=get_info_pn_list(fin_info_list)
  p=Pool(4)# 设置进程池
  assignment_list=assignment_search_list(fin_info_pn_list,2)# 分配任务,用于多进程
  result=[]# 多进程结果列表
  foriinrange(len(assignment_list)):
    result.append(p.apply_async(get_info, args=(assignment_list[i], i)))
  p.close()
  p.join()
  forresult_iinrange(len(result)):
    fin_info_result_list=result[result_i].get()
    fin_save_list.extend(fin_info_result_list)# 将各个进程获得的列表合并
  save_excel(fin_save_list, file_name)
  endtime=datetime.datetime.now()
  time=(endtime-starttime).seconds
  print('总共用时:%s s'%time)

总结

当抓取数据规模越大,对程序逻辑要求就愈严谨,对python语法要求就越熟练。如何写出更加pythonic的语法,也需要不断学习掌握的。

热门栏目