Data Crawling - 크롤러 속도를 높이는 멀티프로세싱 (multiprocessing)

이번에는 크롤러의 속도를 높이는 방법 중 하나를 알아보자.

우리가 10000개의 유저 데이터를 수집한다고 가정하면, 지금까지는 처음부터 차례대로 하나씩 수집한 것이다.

그런데 만약 동일한 기능을 하는 프로그램 창을 여러개 띄우고, 2500개씩 나누어 4개의 창으로 동시에 데이터를 수집하면 어떨까?

이런 것을 가능하게 하는 것이 바로 파이썬의 기본 모듈 중 하나인 멀티프로세싱 (multiprocessing)이다.

지금부터, 트위터에서 유저 데이터 (username, joined date, total tweets, followings, followers)를 멀티프로세싱을 통해 속도를 개선한 크롤러로 수집해 보겠다.

수집할 트윗은 요즘 결승 라운드를 시작하여 매우 핫한 “슈퍼밴드” 를 포함한 트윗으로, 6월 21일부터 6월 28일까지의 기간으로 정했다.

편의를 위해 트윗을 수집할 때 직접 수집하지 않고 앞서 사용하였던 GetOldTweet3 을 임포트 하였다. 자세한 내용은 여기를 참고하자.

트윗 수집하기

먼저, GetOldTweet3 을 사용하여 특정 검색어를 포함한 트윗을 먼저 수집하고, 이런 트윗을 작성한 유저들의 닉네임 (username)을 리스트로 반환해 보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# import packages
import time
import datetime
import GetOldTweets3 as got

# 트윗 수집하는 함수 정의
def get_tweets(start_date, end_date, keyword):

# 범위 끝을 포함하게 만듬
end_date = (datetime.datetime.strptime(end_date, "%Y-%m-%d")
+ datetime.timedelta(days=1)).strftime("%Y-%m-%d")

# 트윗 수집 기준 설정
tweetCriteria = got.manager.TweetCriteria().setQuerySearch('{}'.format(keyword))\
.setSince(start_date)\
.setUntil(end_date)\
.setMaxTweets(-1) # 모두 수집

print("==> Collecting data start..")
start_time = time.time()
tweets = got.manager.TweetManager.getTweets(tweetCriteria)
print("==> Collecting data end.. {0:0.2f} minutes".format((time.time() - start_time)/60))
print("=== Total number of tweets is {} ===".format(len(tweets)))

return tweets

# 유저 리스트 반환하는 함수 정의
def get_users(tweets):

user_list = []

for index in tweets:
username = index.username
user_list.append(username)

return user_list

# 유저 리스트 수집하기
tweets = get_tweets("2019-06-21", "2019-06-28", "슈퍼밴드")
users = get_users(tweets)

> ==> Collecting data start..
> ==> Collecting data end.. 4.45 minutes
> === Total number of tweets is 3456 ===

get_tweets()으로 2019년 06월 21일부터 2019년 06년 28일까지 키워드 “슈퍼밴드” 를 포함한 트윗을 먼저 수집하고, get_users()으로 트윗을 작성한 유저의 닉네임을 리스트로 만들어 users에 저장하였다.

총 5분 가량 소요되었고, 조건을 만족하는 트윗은 3456개임을 확인하였다.

logger 정의하기

본격적으로 멀티프로세싱에 들어가기 전, 효과적으로 결과를 볼 수 있도록 도와주는 로깅 (logging)을 먼저 소개하고자 한다.

로깅 (logging)이란 현재 우리의 프로그램이 어떤 상태를 가지고 있는지 외부 출력을 하게 만들어서, 개발자들이 프로그램의 상황을 직접 눈으로 확인할 수 있도록 하는 것이다.

얼핏 보면 우리가 지금까지 사용한 print()와 유사하지만, 다양한 옵션의 출력을 미리 세팅해 둘 수 있어서 훨씬 유연하게 상황에 따라 대처할 수 있다.

코드부터 살펴보자. 파이썬 기본 모듈이므로 따로 설치할 필요는 없다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import logging
import logging.handlers

# logging 설정
def get_logger():
logger = logging.getLogger("my")

if len(logger.handlers) > 0:
return logger

logger.setLevel(logging.INFO)
stream_hander = logging.StreamHandler()
logger.addHandler(stream_hander)

return logger

my 라는 로거 (logger)를 정의하고, INFO 이상의 등급에 해상하는 로그만 출력하도록 setLevel()을 통해 설정하였다. 또한 프로그램 실행과 동시에 결과를 보여줄 수 있도록 핸들러를 정의하고, 이를 콘솔창에 출력하도록 하였다.

또한, 로그가 중복되어 두번씩 출력되는 현상을 방지하기 위해 logging.handlers를 이미 불러온 경우에는 또 불러오지 않도록 중간에 if문을 넣었다.

대단히 간단한 옵션들만 사용한 것인데, 자세한 내용은 이승현님의 블로그를 참고하자. 여러가지 옵션에 대해서 상세히 다루어 놓았다.

유저 데이터 수집하기

아까 수집해 놓은 유저 닉네임 (username)을 바탕으로, 유저 데이터를 수집해 보자.

수집에 사용할 툴은 bs4 패키지의 BeautlfulSoup이며, lxml 형식으로 데이터를 받아올 것이다.

수집할 유저 데이터는 유저 닉네임, 가입일, 전체 작성 트윗수, 팔로워수, 팔로잉수 이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import requests
from bs4 import BeautifulSoup

def crawl_userdata(username):

# setting
url = 'https://twitter.com/{}'.format(username)
mylogger.info("{} 유저의 데이터 수집 시작".format(username))
HEADER = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'}
response = requests.get(url, headers=HEADER)
html = response.text

# parsing
soup = BeautifulSoup(html, "lxml")

# parsing fail
try:
user_profile_header = soup.find("div", {"class":'ProfileHeaderCard'})
user_profile_canopy = soup.find("div", {"class":'ProfileCanopy-nav'})

# data collect
user = user_profile_header.find('a', {'class':'ProfileHeaderCard-nameLink u-textInheritColor js-nav'})['href'].strip("/")

date_joined = user_profile_header.find('div', {'class':"ProfileHeaderCard-joinDate"}).find('span', {'class':'ProfileHeaderCard-joinDateText js-tooltip u-dir'})['title']
date_joined = date_joined.split("-")[1].strip()
if date_joined is None:
data_joined = "Unknown"

tweets = user_profile_canopy.find('span', {'class':"ProfileNav-value"})['data-count']
if tweets is None:
tweets = 0

except AttributeError:
mylogger.info("{} 유저의 데이터 수집 중 알수없는 오류가 발생했습니다.".format(username))
mylogger.info("링크 : {}".format(url))
user, date_joined, tweets, following, followers = username, None, None, None, None

# 블락 계정 특징 : 팔로워, 팔로잉 수가 안보임
try:

test_following = user_profile_canopy.find('li', {'class':"ProfileNav-item ProfileNav-item--following"})
test_followers = user_profile_canopy.find('li', {'class':"ProfileNav-item ProfileNav-item--followers"})

following = test_following.find('span', {'class':"ProfileNav-value"})['data-count']
followers = test_followers.find('span', {'class':"ProfileNav-value"})['data-count']

mylogger.info("{} 유저의 데이터 수집 완료".format(username))

except AttributeError:
mylogger.info("{} 유저는 블락된 계정입니다.".format(username))
following = "Block"
followers = "Block"

result = [user, date_joined, tweets, following, followers]

return result

트위터 데이터를 어려번 수집하여 보니 안 사실인데, 트위터의 정책상 블락 (block)된 유저들은 가입일이나 전체 작성한 트윗 수 등은 조회가 가능하지만, 팔로잉 수나 팔로워 수는 보이지 않는다. 처음에는 이런 정보를 유저가 선택적으로 오픈할 수 있는 옵션이 있는지 의심했지만, 그런 옵션은 공식적으로 제공하지 않았다.

중간중간에 try-except문을 사용하여 위와 같은 경우를 방지하였다. BeautifulSoup을 통해 원하는 데이터를 수집하는 부분은 따로 설명하지 않았다. 좀 더 알고 싶다면 과거에 작성한 글이 있으니 참고.

멀티프로세싱 사용하기

모든 준비가 끝났다. 이제 실제 코드를 멀티프로세싱을 통해서 병렬화 하여 돌리면 된다. (병렬 크롤링)

유의할 점은, 멀티프로세싱을 사용할 때, main()이 무엇인지 정확히 언급해 주어야 오류를 방지할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from multiprocessing import Pool

# 유저 정보 Multiprocessing
global user_info
user_info = []

def main():

user_list = users
pool_size = len(user_list)

if pool_size < 8:
pool = Pool(pool_size)

else:
pool = Pool(8)

for user in pool.map(crawl_userdata, user_list):
user_info.append(user)


if __name__ == '__main__':

start_time = time.time()

mylogger = get_logger()
mylogger.info("유저 정보 수집 시작")

main()

end_time = (time.time() - start_time)/60
mylogger.info("유저 정보 수집 종료.. {0:0.2f} 분 소요".format(end_time))
mylogger.info("총 수집된 유저 정보는 {} 개 입니다.".format(len(user_info)))

Pool은, 몇개의 창을 열어서 동시에 프로그램을 돌릴지 정의하는 함수이다. 위 코드에서는 우리가 수집할 데이터의 갯수가 8 이하일 경우에는 그 숫지만큼, 아닐 경우 8개를 동시에 실행해서 수집하도록 셋팅하였다.

이어서 pool.map()으로 적용하고 싶은 함수 crawl_userdata()와 그 적용 대상이 되는 user_list을 차례로 적고 for문을 완성한다.

위 코드를 실행시켜보면, 총 3456개의 유저 정보가 단 10분만에 크롤링 됨을 확인할 수 있다.

결과가 어떤 형태로 나오는지 보여주기 위해, 해당 트윗도 추가해서 위와 같이 출력해 보았다.

위의 정보 뿐만 아니라 트윗의 업로드 시각, 리트윗 수, 관심글 수, 지역 등의 데이터를 추가로 수집 가능하다.

이전 다루었던 트위터 데이터 크롤링의 코드로 실행하면 거의 3시간씩 걸린 작업을 멀티 프로세싱을 통해 대단히 빠른 속도로 시간을 단축시켜 보았다.

하지만, 이는 공격적인 크롤링으로 오인받아 일부 사이트의 경우 차단당할 우려가 있으므로.. 상황에 맞게 조심스레(?) 사용하자.