merge qos feature to sched_main
sallyjunjun committed Dec 21, 2023
1 parent cc316a3 commit b7d3c55
Showing 10 changed files with 1,102 additions and 5 deletions.
385 changes: 380 additions & 5 deletions lmdeploy/serve/openai/api_server.py

Large diffs are not rendered by default.

58 changes: 58 additions & 0 deletions lmdeploy/serve/openai/protocol.py
@@ -55,6 +55,26 @@ class UsageInfo(BaseModel):
    completion_tokens: Optional[int] = 0


class ChatCompletionRequestQos(BaseModel):
    """Chat completion request with QoS user information."""
    model: str
    messages: Union[str, List[Dict[str, str]]]
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 1.0
    n: Optional[int] = 1
    max_tokens: Optional[int] = 512
    stop: Optional[bool] = False
    stream: Optional[bool] = False
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    user: Optional[str] = None
    user_id: Optional[str] = None
    # additional arguments of lmdeploy
    repetition_penalty: Optional[float] = 1.0
    session_id: Optional[int] = -1
    ignore_eos: Optional[bool] = False
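
For illustration only, a minimal client-side construction of such a request (model name and message are invented; `user_id` should match an id from the QoS config template shown further down):

from lmdeploy.serve.openai.protocol import ChatCompletionRequestQos

# Hypothetical example values; unset fields keep the defaults declared
# above (temperature=0.7, max_tokens=512, ...).
req = ChatCompletionRequestQos(
    model='internlm-chat-7b',
    messages=[{'role': 'user', 'content': 'Hello'}],
    user_id='user_id1',
)
print(req.json())  # .json() on pydantic v1; model_dump_json() on v2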


class ChatCompletionRequest(BaseModel):
    """Chat completion request."""
    model: str
@@ -141,6 +161,28 @@ class CompletionRequest(BaseModel):
    ignore_eos: Optional[bool] = False
    top_k: Optional[int] = 40  # for opencompass

class CompletionRequestQos(BaseModel):
    """Completion request with QoS user information."""
    model: str
    prompt: Union[str, List[Any]]
    suffix: Optional[str] = None
    temperature: Optional[float] = 0.7
    n: Optional[int] = 1
    max_tokens: Optional[int] = 16
    stop: Optional[Union[str, List[str]]] = None
    stream: Optional[bool] = False
    top_p: Optional[float] = 1.0
    logprobs: Optional[int] = None
    echo: Optional[bool] = False
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    user: Optional[str] = None
    # additional arguments of lmdeploy
    repetition_penalty: Optional[float] = 1.0
    session_id: Optional[int] = -1
    ignore_eos: Optional[bool] = False
    user_id: Optional[str] = None
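
Note that `user_id` is optional: a request without one (or with an unknown id) is routed to the group's "default" queue by the scheduler in inner_group_schd.py below. A hypothetical request body (all values invented for illustration):

{
    "model": "internlm-chat-7b",
    "prompt": "Write a haiku about GPUs",
    "max_tokens": 64,
    "user_id": "user_id3"
}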


class CompletionResponseChoice(BaseModel):
    """Completion response choices."""
@@ -220,6 +262,22 @@ class GenerateRequest(BaseModel):
    ignore_eos: bool = False


class GenerateRequestQos(BaseModel):
    """Generate request with QoS user information."""
    prompt: Union[str, List[Dict[str, str]]]
    session_id: int = -1
    interactive_mode: bool = False
    stream: bool = False
    stop: bool = False
    request_output_len: int = 512
    top_p: float = 0.8
    top_k: int = 40
    temperature: float = 0.8
    repetition_penalty: float = 1.0
    ignore_eos: bool = False
    user_id: Optional[str] = None
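
Unlike the OpenAI-style requests above, this class mirrors lmdeploy's interactive generate protocol, where `session_id` and `interactive_mode` let a session continue across calls. A hypothetical body (values invented for illustration):

{
    "prompt": "And a second verse?",
    "session_id": 1,
    "interactive_mode": true,
    "request_output_len": 128,
    "user_id": "user_id4"
}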


class GenerateResponse(BaseModel):
    """Generate response."""
    text: str
Empty file.
81 changes: 81 additions & 0 deletions lmdeploy/serve/qos_engine/inner_group_schd.py
@@ -0,0 +1,81 @@
import logging
import queue
import threading

logger = logging.getLogger(__name__)


class UserRequestQueue:
    """Per-user request queues within one user group (inner-group
    scheduling)."""

    def __init__(self, group: str, user_id_map: dict):
        self.group = group
        self.user_queue_map = dict()
        self.user_quota_map = dict()
        self.user_id_maps = user_id_map

        # Normalize each user's quota_pct into a share of the group total.
        total_quota = 0
        for item in user_id_map:
            total_quota += item["quota_pct"]
        for item in user_id_map:
            user_id = item["id"]
            self.user_queue_map[user_id] = queue.Queue()
            self.user_quota_map[user_id] = item["quota_pct"] / total_quota

        self.lock = threading.Lock()

    def enqueue(self, request_event):
        """Enqueue a request to the corresponding user queue.

        Requests from unknown users fall back to the "default" queue.
        """
        if request_event[0].user_id in self.user_queue_map:
            self.user_queue_map[request_event[0].user_id].put(request_event)
        else:
            self.user_queue_map["default"].put(request_event)

    def empty(self):
        """Whether all user queues are empty."""
        with self.lock:
            for _, user_queue in self.user_queue_map.items():
                if not user_queue.empty():
                    return False
            return True

    def dequeue(self, usage_stats):
        """Dequeue the next request to serve, or None if no user qualifies."""
        with self.lock:
            uid_to_serve = self.user_to_serve(usage_stats)
            if uid_to_serve in self.user_queue_map:
                return self.user_queue_map[uid_to_serve].get()

        return None

    def user_to_serve(self, usage_stats):
        """Inner-group scheduling: find the user whose served share is
        smallest relative to its quota."""
        min_usage = float("inf")
        uid_to_serve = ""
        for uid, req_queue in self.user_queue_map.items():
            if req_queue.empty():
                continue

            # TODO: include token length
            # Calculate this user's actual usage share and quota share.
            user_usage, _, group_usage, _ = usage_stats.get_user_usage(uid, self.group)
            actual_share = (user_usage / group_usage) if group_usage > 0 else 0
            due_share = self.user_quota_map[uid]

            # Serve the user with the relatively lowest usage share.
            # Guard against zero-quota users (e.g. "default" with quota_pct 0)
            # so they are picked only when no quota-holding user is waiting.
            curr_usage = (actual_share / due_share) if due_share > 0 else float("inf")
            if curr_usage == 0:
                return uid
            if curr_usage < min_usage or uid_to_serve == "":
                uid_to_serve = uid
                min_usage = curr_usage
        return uid_to_serve
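
A minimal usage sketch of this class. The commit's real usage-stats and request-event types are not shown in this diff, so stand-ins are used; only the attributes the scheduler actually reads are provided:

from types import SimpleNamespace

from lmdeploy.serve.qos_engine.inner_group_schd import UserRequestQueue

# Stand-in for the commit's usage-stats object: every user reports
# zero usage, so the first waiting user is served immediately.
class DummyUsageStats:
    def get_user_usage(self, uid, group):
        return 0, 0, 0, 0  # user_usage, _, group_usage, _

q = UserRequestQueue('Gold', [
    {'id': 'user_id1', 'quota_pct': 50},
    {'id': 'user_id2', 'quota_pct': 50},
    {'id': 'default', 'quota_pct': 0},
])

# enqueue() takes a (request, event)-style tuple; only request.user_id
# is inspected here, so a SimpleNamespace suffices for the sketch.
req = SimpleNamespace(user_id='user_id1')
q.enqueue((req, None))

served = q.dequeue(DummyUsageStats())
print(served[0].user_id)  # -> user_id1
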
58 changes: 58 additions & 0 deletions lmdeploy/serve/qos_engine/qos_config.json.template
@@ -0,0 +1,58 @@
{
    "enable_user_qos": 1,
    "user_groups": ["Platinum", "Gold", "Silver", "Bronze"],
    "user_group_map": {
        "Platinum": [
            {
                "id": "user_id0",
                "quota_pct": 100
            },
            {
                "id": "default",
                "quota_pct": 0
            }
        ],
        "Gold": [
            {
                "id": "user_id1",
                "quota_pct": 50
            },
            {
                "id": "user_id2",
                "quota_pct": 50
            },
            {
                "id": "default",
                "quota_pct": 0
            }
        ],
        "Silver": [
            {
                "id": "user_id3",
                "quota_pct": 5
            },
            {
                "id": "default",
                "quota_pct": 95
            }
        ],
        "Bronze": [
            {
                "id": "user_id4",
                "quota_pct": 30
            },
            {
                "id": "user_id5",
                "quota_pct": 30
            },
            {
                "id": "user_id6",
                "quota_pct": 40
            },
            {
                "id": "default",
                "quota_pct": 0
            }
        ]
    }
}
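
A sketch of how this template ties into UserRequestQueue above (the actual engine wiring lives in the api_server.py changes, which are not rendered here; the config path is assumed):

import json

from lmdeploy.serve.qos_engine.inner_group_schd import UserRequestQueue

# Assumed path: copy the template to a concrete config file first.
with open('qos_config.json') as f:
    qos_config = json.load(f)

queues = {}
if qos_config['enable_user_qos']:
    for group in qos_config['user_groups']:
        # quota_pct values are normalized per group inside UserRequestQueue,
        # so they only need to be consistent within each group.
        queues[group] = UserRequestQueue(group, qos_config['user_group_map'][group])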
