[Ray] Multiprocessing library Ray
데이터의 행렬 연산, 딥러닝, 전처리 등 데이터가 커질 수록 연산량도 많아집니다.
그리고 파이썬은 인터프리터 언어인 특성상 느립니다. numpy를 사용하는 것 만으로도 속도가 올라가지만, 놀고 있는 코어를 모두 사용해 병렬 연산을 하면 더 빨라지지 않을까? 하는 생각을 하게됩니다.
그래서 처음에 python의 내장함수인 multiprocessing을 사용하곤 했는데 이보다 더 편하고 좋은 방법이 있어 포스팅하려고 합니다.
# python multiprocessing
import numpy as np
from multiprocessing import Pool
arr = np.random.random(1000000)
def mul(x):
return x * 10
# list comprehension
result = [mul(x) for x in arr] # Wall time: 558 ms ± 10.4 ms per loop
# multiprocessing
with Pool(processes=16) as p:
result = p.map(mul, arr) # Wall time: 3.54 s ± 52.9 ms per loop
multiprocessing은 프로세스 스포닝(process spawning)(부모 프로세스가 os에 요청하여 자식 프로세스를 새로 만들어내는 과정)을 지원해 사용할 수 있는 여러 프로세서를 활용하게 됩니다. 생성된 프로세스 풀을 제어하는 Pool 객체를 통해 병렬 처리를 합니다. Pool 클래스 내에 프로세스 수를 설정해 map(), imap(), map_async(), imap_unordered() 등의 메서드를 사용합니다.
위 예제에서는 멀티프로세싱한 결과가 더 느린 결과를 보여줍니다. muliprocessing 라이브러리에서 프로세스간에 객체를 전달할 때 pickle을 사용해 직렬화(Serialize)한 뒤 전달합니다. 프로세스 수만큼 데이터를 복사하여 메모리를 많이 차지하게 되고 역직렬화를 통해 데이터를 받기 때문에 오버헤드가 발생합니다.
그리고 프로세스를 사용한 뒤 풀을 닫지 않으면 프로세스가 메모리에 계속 남아있어 메모리 누수 현상도 발생합니다. 이를 위해서 비동기적으로 병렬처리를 하고 contextlib의 closing()함수를 사용하는 등의 번거로움도 있씁니다.
즉, multiprocessing은 프로세스에 데이터를 주고 받는 상황이 많아짐에 따라 느려집니다. 복잡한 연산을 사용해야하는 경우는 multiprocessing이 더 빠르겠지만 메모리 관리, 비동기 처리 등 복잡한 문제가 있습니다.
Ray
Ray는, 간단하게 분산 애플리케이션을 구현할 수 있게 해주는 범용 API입니다.
- 분산 애플리케이션을 구현할 수 있게 해주는 기본적인 기능 제공
- 엔드유저가 코드를 크게 변경하지 않고 단일 코드로 병렬처리를 할 수 있게 제공
- 큰 규모의 애플리케이션, 라이브러리, 툴 의 에코 시스템 등의 복잡한 애플리케이션을 Ray Core 위에서 동작할 수 있게 지원
Ray는 multiprocessing을 대체할 여러 장점을 가진 라이브러리입니다. 분산처리, 병렬처리를 단순하고 범용적인 API를 통해 사용할 수 있도록 도와줍니다. multiprocessing처럼 병렬 처리를 위해 함수나 코드를 수정할 필요가 없고, 직렬화 오버헤드 문제가 발생하지 않습니다.
Ray에는 크게 CORE, MULTI_NODE, SERVE, DATA, WORKFLOWS, TUNE, RLLIB, TRAIN 등의 라이브러리를 제공합니다.
각 라이브러리가 하는 일은 아래와 같습니다.
[RAY TUNE]하이퍼파라미터 튜닝 확장
pytorch, XGBoost, MXNet, Keras 등 머신러닝 프레임워크에서 분산 하이퍼파라미터 튜닝을 할 수 있도록 지원해주고 TensorBoard에서 로깅 및 체크포인트 매니징까지 자동으로 할 수 있도록 해줌
[RAY RLLIB]강화학습 라이브러리 지원
# Import the RL algorithm (Trainer) we would like to use.
from ray.rllib.agents.ppo import PPOTrainer
# Configure the algorithm.
config = {
# Environment (RLlib understands openAI gym registered strings).
"env": "Taxi-v3",
# Use 2 environment workers (aka "rollout workers") that parallelly
# collect samples from their own environment clone(s).
"num_workers": 2,
# Change this to "framework: torch", if you are using PyTorch.
# Also, use "framework: tf2" for tf2.x eager execution.
"framework": "tf",
# Tweak the default model provided automatically by RLlib,
# given the environment's observation- and action spaces.
"model": {
"fcnet_hiddens": [64, 64],
"fcnet_activation": "relu",
},
# Set up a separate evaluation worker set for the
# `trainer.evaluate()` call after training (see below).
"evaluation_num_workers": 1,
# Only for evaluation runs, render the env.
"evaluation_config": {
"render_env": True,
}
}
# Create our RLlib Trainer.
trainer = PPOTrainer(config=config)
# Run it for n training iterations. A training iteration includes
# parallel sample collection by the environment workers as well as
# loss calculation on the collected batch and a model update.
for _ in range(3):
print(trainer.train())
# Evaluate the trained Trainer (and render each timestep to the shell's
# output).
trainer.evaluate()
[RAY TRAIN]딥러닝 훈련 분산으로 가능
from ray import train
def train_func_distributed():
num_epochs = 3
model = NeuralNetwork()
model = train.torch.prepare_model(model)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)
for epoch in range(num_epochs):
output = model(input)
loss = loss_fn(output, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"epoch: {epoch}, loss: {loss.item()}")
from ray.train import Trainer
trainer = Trainer(backend="torch", num_workers=4)
trainer.start()
results = trainer.run(train_func_distributed)
trainer.shutdown()
[RAY DATA]데이터셋 로딩 및 계산 분산으로 가능
(csv, json, numpy, text, binary, python object, spark, dask, mars, pandas, ...)
[RAY SERVE]심지어 추론과 API serve도 지원
import requests
import ray
from ray import serve
serve.start()
@serve.deployment
class Counter:
def __init__(self):
self.count = 0
def __call__(self, *args):
self.count += 1
return {"count": self.count}
# Deploy our class.
Counter.deploy()
# Query our endpoint in two different ways: from HTTP and from Python.
assert requests.get("http://127.0.0.1:8000/Counter").json() == {"count": 1}
assert ray.get(Counter.get_handle().remote()) == {"count": 2}
Ray 사용법
Ray는 일단, 처음 사용법이 매우 간단하고 알아야 하는 개념도 많지 않습니다.
pip install ray
import numpy as np
import ray
ray.init()
@ray.remote
def generate_data():
return np.random.normal(size=1000)
@ray.remote
def aggregate_data(x, y):
return x + y
# Generate some random data. This launches 100 tasks that will be scheduled on
# various nodes. The resulting data will be distributed around the cluster.
data = [generate_data.remote() for _ in range(100)]
# Perform a tree reduce.
while len(data) > 1:
data.append(aggregate_data.remote(data.pop(0), data.pop(0)))
# Fetch the result.
ray.get(data)
기존 코드에서 구조의 변경 없이 아주 약간의 추가적인 작업만으로 병렬처리를 할 수 있습니다.
병렬처리를 하고 싶은 함수나 클래스를 @ray.remote 데코레이터로 감싸고,
function.remote()를 ray.get()을 통해 호출하기만 하면 됩니다.
꼭 코어의 수만큼 task를 나누어야 하는 것은 아니고 만약 8개의 코어(프로세스)가 있을 때 24번 함수를 호출 했으면 각 코어별로 3번 함수를 실행하게 됩니다.
results = [ray.get(do_some_work.remote(x)) for x in range(4)] # (X)
results = ray.get([do_some_work.remote(x) for x in range(4)]) # (O)
주의할 점은, ray.get()할 때 ray.get을 반복 호출하게 될 경우 나중에 호출된 ray.get()은 앞의 작업을 기다리게 됩니다.
따라서 병렬처리가 이루어지지 않으므로 느리기 때문에 ray.get()은 한번만 호출해야합니다.
그리고 ray.remote로 감싸 병렬처리 할 task는 되도록 크게 만들어주어야
내부 OS 스케줄링 문제 등 때문에 오버헤드가 걸릴 일이 없습니다.
또 가벼운 작업에는 사용하지 않는 것이 좋습니다.
x_id = ray.put("example")
ray.get(x_id) # "example"
import numpy as np
import ray
ray.init()
@ray.remote
def no_work(x):
return
x = np.zeros((5000, 5000))
results = [no_work.remote(x) for x in range(20)]
results = ray.get(results)
그리고 큰 object를 모든 프로세스에서 사용해야할 경우, 데이터의 복사가 일어나기 때문에 메모리를 많이 차지하게 됩니다.
따라서 ray.put()으로 object ID를 리턴하도록 만들어 모든 프로세스에 공유할 수 있습니다.
ray.shutdown()
그리고 모든 작업이 끝나면 ray.shutdown()을 통해 프로세스를 해제해줍니다.
그런데 저의 경우 shutdown을 했음에도 죽지 않는 좀비프로세스가 남아있는 문제가 있었습니다. kill도 되지 않고 오로지 reboot를 했을 때만 없어졌습니다.. 아직까지 해결을 못하고 있는 부분입니다. 그래도 도커를 사용하기 때문에 docker restart만 해주면 되서 아주 큰 문제는 아니지만 defunct process(좀비 프로세스)가 메모리를 차지하고 있다는게 너무 찜찜해서 이 부분 해결책을 찾으면 다시 포스팅하도록 하겠습니다.
Ray의 구성요소
- Ray Task
호출하는 곳과 다른 프로세스에서 실행되는 함수 또는 클래스.
@ray.remote 로 감싸진 함수를 Task라고 한다.(클래스일 경우 Actor)
호출자와 비동기적(asynchronously)으로 실행된다.
- Ray Object
Task를 통해 반환되거나 ray.put()을 통해 생성되는 값
데이터의 크기가 큰 경우 ray.put()을 통해 Object로 만들어 Ray에서 빠르게 사용할 수 있다.
- Ray Actor
Stateful한 워커 프로세스.
클래스에 @ray.remote로 감싸면 Actor class가 되며 이 클래스의 함수 호출은 stateful task(상태 정보를 저장해 다시 호출 가능)가 된다.
Ray API
- ray.init()
ray를 초기화하고 병렬처리할 프로세스를 할당하고 context를 초기화, dashboard를 띄운다.
@ray.remote
파이썬 함수를 Ray Task로 클래스를 Ray Actor로 만들어주는 데코레이터
- ray.remote()
remote function, class 선언 호출시 사용하는 접미사로 기존 함수의 인자를 받을 수 있다.
ray.remote()의 리턴 값은 ObjectRef인데, ray.get(ObjectRef) 하여 task를 실행하고 남은 값을 반환할 수 있다.
remote() 작업은 비동기적으로 실행된다.
- ray.put()
object를 저장하고 ID를 반환하여 remote function에서 사용할 수 있다. ray.put()은 동기적으로 실행된다.
- ray.get()
ObjectRef(object id)를 인자로 받아 object value를 반환
ray.get()은 동기적으로 실행
- ray.wait()
remote()로 지정된 object들 중 준비가 된 object id를 반환하며 task가 완료될 때까지 기다리는 역할을 함.
- ray.shutdown()
ray.init()으로 할당된 프로세스를 종료하고 워커와의 연결을 끊는다.
Ray를 사용한 파이썬 프로세스가 종료되면 자동으로 이 코드가 실행됨
Ray Dashboard 사용법
그리고 Ray는 자체적으로 각 자원의 사용량을 확인할 수 있는 Dashboard를 제공합니다.
각 프로세스별 로그도 확인할 수 있고
각 프로세스별 CPU, GPU, RAM, SSD의 사용하는 자원의 양도 한번에 확인이 가능합니다.
pip install ray[default]
ray[default]를 설치하고 ray.init() 호출 시 8265 포트로 자동 매핑 되어 대시보드가 동작합니다.
ray.init(num_cpus=num_cpus, ignore_reinit_error=True, dashboard_host="0.0.0.0", dashboard_port=8265, include_dashboard=True)
매핑이 되지 않거나 추가적으로 포트 등의 파라미터를 변경하고 싶으면 ray.init()의 인자를 추가하면 됩니다.
References
https://otzslayer.github.io/python/2021/10/15/multiprocesesing-using-ray.html
https://zzsza.github.io/mlops/2021/01/03/python-ray/
https://titania7777.tistory.com/15