##language:zh
#pragma section-numbers on
{{{
From: Devin Deng <deng.devin@gmail.com>
Reply-To: python-chinese@lists.python.cn
To: python-chinese@lists.python.cn
Date: Mar 13, 2007 11:03 PM
Subject: Re: [python-chinese] 地下室里的爬虫 (the crawler in the basement)
}}}
<<TableOfContents>>
## Default navigation, please keep

A quick & dirty spider I wrote last year for crawling a given site. I have forgotten most of it by now, but perhaps it can serve as a reference for everyone.

= Quick & Dirty Spider =
{{{#!python
# -*- coding: utf-8 -*-
from twisted.python import threadable
threadable.init()

from twisted.internet import reactor, threads
import urllib2
import urllib
import urlparse
import time
from sgmllib import SGMLParser
from usearch import USearch   # handles the database operations; source not published


class URLLister(SGMLParser):
    def reset(self):
        SGMLParser.reset(self)
        self.urls = []

    def start_a(self, attrs):
        href = [v for k, v in attrs if k == 'href']
        if href:
            self.urls.extend(href)


class Filter:
    def __init__(self, Host, denys=None, allows=None):
        self.deny_words = denys
        self.allow_words = allows

    # Check whether the url is valid or not.
    def verify(self, url):
        for k in self.deny_words:
            if url.find(k) != -1:
                return False
        for k in self.allow_words:
            if url.find(k) != -1:
                return True
        return True


class Host:
    def __init__(self, hostname, entry_url=None, description=None,
                 encoding=None, charset=None):
        self.hostname = hostname
        self.entry_url = entry_url
        self.encoding = encoding
        self.charset = charset
        self.description = description

    def configxml(self):
        import elementtree.ElementTree as ET
        root = ET.Element("config")
        en = ET.SubElement(root, "encoding")
        en.text = self.encoding
        ch = ET.SubElement(root, "charset")
        ch.text = self.charset
        entry = ET.SubElement(root, "entry_url")
        entry.text = self.entry_url
        return ET.tostring(root)

    def parse_config(self, configstring):
        import elementtree.ElementTree as ET
        from StringIO import StringIO
        tree = ET.parse(StringIO(configstring))
        self.encoding = tree.findtext(".//encoding")
        self.charset = tree.findtext(".//charset")
        self.entry_url = tree.findtext(".//entry_url")

    def create(self):
        u = USearch()
        self.configs = self.configxml()
        ret = u.CreateDomain(self.hostname, self.description, self.configs)
        #print ret

    def load(self, flag='A'):
        # 'A' means all, '0' unvisited, '1' visiting, '2' visited.
        # TODO: load domain data from the backend database.
        u = USearch()
        try:
            ret = u.ListDomain(flag)['result']
            for d in ret:
                if d.domain == self.hostname:
                    self.parse_config(d.parse_config)
                    self.description = d.description
                    return True
        except:
            pass
        return False
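# NOTE (added for illustration; not part of the original post): configxml()
# above serialises a site's settings into a small XML blob that the
# unpublished usearch backend stores, and parse_config() reads the same
# shape back.  For a host created as
#     Host("www.chilema.cn", "/Eat/", "Shenzhen Local", "", "gb2312")
# the stored string would look roughly like
#     <config><encoding /><charset>gb2312</charset><entry_url>/Eat/</entry_url></config>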
class Page:
    def __init__(self, url, host, description=None):
        self.url = url
        self.description = description
        self.host = host
        self.page_request = None
        self.content = None
        self.status_code = None
        self.encoding = None
        self.charset = None
        self.length = 0
        self.md5 = None
        self.urls = []

    # Read the web page.
    def get_page(self, url=None):
        if not url:
            url = self.url
        type = get_type(self.host.hostname, url)
        if type != 0:
            return None
        try:
            opener = urllib2.build_opener()
            opener.addheaders = [('User-agent', 'Mozilla/5.0')]
            self.page_request = opener.open(urllib.unquote(url))
            #self.page_request = urllib2.urlopen(url)
            self.content = self.page_request.read()
            self.status_code = self.page_request.code
            return self.status_code
        except:
            self.status_code = 500
            print "ERROR READING: %s" % self.url
            return None

    def get_header(self):
        if not self.page_request:
            self.get_page()
        header = self.page_request.info()
        try:
            self.length = header['Content-Length']
            content_type = header['Content-Type']
            #if content_type.find('charset') == -1:
            self.charset = self.host.charset
            self.encoding = self.host.encoding
        except:
            pass

    def get_urls(self):
        if not self.page_request:
            self.get_page()
        if self.status_code != 200:
            return
        parser = URLLister()
        try:
            parser.feed(self.content)
        except:
            print "ERROR: Parse urls error!"
            return
        #print "URLS: ", parser.urls
        #self.urls = parser.urls
        if not self.charset:
            self.charset = "gbk"
        for i in parser.urls:
            try:
                type = get_type(self.host.hostname, i)
                if type == 4:
                    i = join_url(self.host.hostname, self.url, i)
                if type == 0 or type == 4:
                    if i:
                        i = urllib.quote(i)
                        self.urls.append(i.decode(self.charset).encode('utf-8'))
            except:
                pass
        parser.close()
        self.page_request.close()

    def save_header(self):
        # Save header info into the db.
        pass

    def save_current_url(self):
        save_url = urllib.quote(self.url)
        usearch = USearch()
        usearch.CreateUrl(domain=self.host.hostname,
                          url=save_url,
                          length=self.length,
                          status_code=self.status_code)

    # Set the URL's flag.
    def flag_url(self, flag):
        usearch = USearch()
        usearch.UpdateUrl(status=flag)

    def save_urls(self):
        # Save all the found urls into the db.
        print "RELATED_URLS:", len(self.urls)
        usearch = USearch()
        usearch.CreateRelateUrl(urllib.quote(self.url), self.urls)

    def save_page(self):
        usearch = USearch()
        import cgi
        try:
            content = self.content.decode(self.charset).encode('utf-8')
            usearch.CreateSearchContent(self.url.decode(self.charset).encode('utf-8'),
                                        content)
        except:
            print "ERROR to save page"
            return -1
        print "SAVE PAGE Done", self.url
        return 0


def get_type(domain, url):
    if not url:
        return 5
    import urlparse
    tup = urlparse.urlparse(url)
    if tup[0] == "http":
        # check whether it is the same domain
        if tup[1] == domain:
            return 0
        else:
            return 1   # outside link
    if tup[0] == "javascript":
        return 2
    if tup[0] == "ftp":
        return 3
    if tup[0] == "mailto":
        return 5
    return 4   # internal (site-relative) link


def join_url(domain, referral, url):
    if not url or len(url) == 0:
        return None
    tup = urlparse.urlparse(url)
    if not tup:
        return None
    if tup[0] == "javascript" or tup[0] == "ftp":
        return None
    else:
        if url[0] == "/":
            # a root-relative link
            newurl = "http://%s%s" % (domain, url)
            return newurl
        if url[0] == ".":
            return None   # ignore relative links for now.
        else:
            # if referral.rfind("/") != -1:
            #     referral = referral[0:referral.rfind("/")+1]
            #     newurl = "%s%s" % (referral, url)
            newurl = urlparse.urljoin(referral, url)
            return newurl
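# Added for illustration (not in the original post): a few sanity checks
# documenting how the two helpers above behave.  get_type() classifies a
# link relative to the crawled domain (0 = same domain, 1 = external site,
# 2/3/5 = schemes the spider skips, 4 = site-relative link) and join_url()
# turns a site-relative link into an absolute URL.
assert get_type("www.chilema.cn", "http://www.chilema.cn/Eat/") == 0    # same domain
assert get_type("www.chilema.cn", "http://www.google.com/") == 1        # external site
assert get_type("www.chilema.cn", "mailto:someone@example.com") == 5    # skipped scheme
assert get_type("www.chilema.cn", "/Eat/detail.asp") == 4               # site-relative
assert join_url("www.chilema.cn", "http://www.chilema.cn/Eat/",
                "/Eat/detail.asp") == "http://www.chilema.cn/Eat/detail.asp"
assert join_url("www.chilema.cn", "http://www.chilema.cn/Eat/",
                "detail.asp") == "http://www.chilema.cn/Eat/detail.asp"
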
if __name__ == '__main__':

    def done(x):
        u = USearch()
        x = urllib.quote(x.decode('gbk').encode('utf-8'))
        u.SetUrlStatus(x, '2')
        time.sleep(2)
        print "DONE: ", x
        url = next_url(h)
        if not url:
            reactor.stop()
        else:
            threads.deferToThread(spider, h, url).addCallback(done)

    def next_url(host):
        u = USearch()
        ret = u.GetTaskUrls(host.hostname, '0', 1)['result']
        try:
            url = urllib.unquote(ret[0].url)
        except:
            return None
        if urlparse.urlparse(url)[1] != host.hostname:
            return next_url(host)
        return urllib.unquote(ret[0].url)

    def spider(host, surf_url):
        #surf_url = surf_url.decode(host.charset).encode('utf-8')
        surf_url = urllib.unquote(surf_url)
        p = Page(surf_url, host)
        #try:
        if not p.get_page():
            print "ERROR: GET %s error!" % surf_url
            return surf_url        # Something went wrong!
        p.get_header()             # Get the page's header
        p.get_urls()               # Get all the urls in the page
        #print p.urls
        p.save_current_url()       # Save the current page's url info into the DB
        p.save_urls()
        p.save_page()
        #except:
        #    pass
        return surf_url

    import sys
    #host = Host("www.chilema.cn", "/Eat/", "Shenzhen Local", "", "gb2312")
    #host.create()
    #~ h = Host("www.chilema.cn")
    #~ h.load()
    #~ #reactor.callInThread(Spider, h, "http://beta.u2m.cn/")
    #~ #reactor.callInThread(Spider, h, "http://beta.u2m.cn/canyin/")
    #~ #reactor.callInThread(Spider, h, "http://beta.u2m.cn/fb/")
    #~ threads.deferToThread(spider, h, "http://www.chilema.cn/Eat/").addCallback(done)
    #host = Host("www.ziye114.com", "", "Beijing Local", "gb2312")
    #host.create()
    hostname = sys.argv[1]
    entry_url = ""
    if len(sys.argv) == 3:
        entry_url = sys.argv[2]
    h = Host(hostname)
    hostname_url = "http://%s/%s" % (hostname, entry_url)
    h.load()
    threads.deferToThread(spider, h, hostname_url).addCallback(done)
    threads.deferToThread(spider, h, next_url(h)).addCallback(done)
    threads.deferToThread(spider, h, next_url(h)).addCallback(done)
    threads.deferToThread(spider, h, next_url(h)).addCallback(done)
    reactor.run()
}}}

== Feedback ==
::-- ZoomQuiet [<<DateTime(2007-03-14T03:44:44Z)>>]