From d73ad70517e6b47803ce957763dfc5779a586615 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Wed, 22 Nov 2023 20:42:12 +0800 Subject: [PATCH 01/17] update profile scripts --- benchmark/profile_generation.py | 149 ++++++++++++++++++++----------- benchmark/profile_restful_api.py | 4 +- benchmark/profile_serving.py | 4 +- benchmark/profile_throughput.py | 8 +- 4 files changed, 106 insertions(+), 59 deletions(-) diff --git a/benchmark/profile_generation.py b/benchmark/profile_generation.py index ff8d062501..bb07ee36aa 100644 --- a/benchmark/profile_generation.py +++ b/benchmark/profile_generation.py @@ -22,33 +22,44 @@ from lmdeploy.turbomind import TurboMind -def infer(model, session_id: int, input_ids: str, output_seqlen: int, +def infer(model, session_id: int, input_ids: List, output_seqlen: int, test_round: int, que: Queue): chatbot = model.create_instance() stats = [] - for i in range(test_round): - start = time.perf_counter() - timestamps = [] - tokens = [] + for _ in range(test_round): + token_latency_stats = [0] * (output_seqlen + 1) + prev = time.perf_counter() + n_pre_token = 0 + """ + The iterator provided by `chatbot.stream_infer` denotes the number of generated tokens so far, + which is represented by the variable `n_token`. + Please note that `n_token` is not a continuous value. In other words, during the iteration, + its value might be 5, 7, 8, 16, and so on, rather than 1, 2, 3, 4, etc. + So, it is quite difficult to get the latency of each generated token. + As a work-around, we set the latency `new-prev` of each iteration to the first token of + the new generated tokens, and leave the latency of the rest tokens being 0. + For example, in the first iteration, 5 tokens are generated. + The time elapsing in this iteration `now-prev` is set to the latency of first token of + the 5 tokens, i.e. `token_latency_stats[0]`, and `token_latency_stats[1:4]` is set 0` + """ # noqa: E501 for outputs in chatbot.stream_infer(session_id, input_ids, request_output_len=output_seqlen, sequence_start=True, sequence_end=True, - ignore_eos=True): - res, token = outputs[0] - timestamps.append(time.perf_counter()) - tokens.append(token) - - # TODO: ignore first token - first_token_latency = np.round(timestamps[0] - start, 2) - if len(timestamps) == 1: - token_latency = np.round(timestamps[0] - start, 2) - token = tokens[0] - else: - token_latency = np.round(timestamps[-1] - timestamps[0], 2) - token = tokens[-1] - tokens[0] - stats.append([first_token_latency, token, token_latency]) + ignore_eos=True, + stream_output=True): + _, n_token = outputs[0] + now = time.perf_counter() + if n_pre_token != n_token: + token_latency_stats[n_pre_token] = np.round(now - prev, 3) + n_pre_token = n_token + prev = now + + assert output_seqlen <= n_token <= output_seqlen + 1, \ + f'Error. 
session_id({session_id}) request {output_seqlen} ' \ + f'tokens, but generate {n_token} tokens' + stats.append(token_latency_stats[:output_seqlen]) que.put((session_id, stats)) @@ -128,33 +139,49 @@ def profile_throughput(model_path: str, _end = time.perf_counter() elapsed_time = _end - _start - stats = [] + token_latency_stats = [] while not que.empty(): - session_id, _stats = que.get() - print(f'\n{"-" * 50}\n' - f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n') - stats.append(_stats) - - stats = np.array(stats).reshape(-1, 3) - - first_token_latency_min = np.min(stats[:, 0], axis=0) - first_token_latency_max = np.max(stats[:, 0], axis=0) - first_token_latency_ave = np.mean(stats[:, 0], axis=0) - token_latency_min = np.min(stats[:, 2], axis=0) - token_latency_max = np.max(stats[:, 2], axis=0) - token_latency_ave = np.mean(stats[:, 2], axis=0) - throughput = np.sum(stats[:, 1], axis=0) / np.sum(stats[:, 2], - axis=0) * concurrency - print(f'\n{"-" * 50}\nconcurrency: {concurrency}, input_tokens: ' - f'{input_seqlen}, output_tokens: {output_seqlen}\n' - f'elapsed_time: {elapsed_time:.2f}s\n' + _, _stats = que.get() + token_latency_stats += _stats + + # The shape is [concurrency*test_round, output_seqlen] + token_latency_stats = np.stack(token_latency_stats, axis=0) + + first_token_latency_min = np.round( + np.min(token_latency_stats[:, 0], axis=0), 3) + first_token_latency_max = np.round( + np.max(token_latency_stats[:, 0], axis=0), 3) + first_token_latency_ave = np.round( + np.mean(token_latency_stats[:, 0], axis=0), 3) + token_latency_max = np.round(np.max(np.sum(token_latency_stats, axis=1)), + 3) + token_latency_min = np.round(np.min(np.sum(token_latency_stats, axis=1)), + 3) + token_latency_ave = np.round(np.mean(np.sum(token_latency_stats, axis=1)), + 3) + # sort token_latency without the first token's latency + sorted_token_latency = np.sort(token_latency_stats[:, 1:].flatten()) + percentiles = [ + np.round( + sorted_token_latency[int(percent * len(sorted_token_latency))], 3) + for percent in [0.5, 0.75, 0.95, 0.99] + ] + + throughput = np.round(token_latency_stats.size / elapsed_time, 2) + print(f'\n{"-" * 50}\ntotal time: {elapsed_time:.2f}s\n' + f'concurrency: {concurrency}, test_round: {test_round}\n' + f'input_tokens: {input_seqlen}, output_tokens: {output_seqlen}\n' f'first_token latency(min, max, ave): ' - f'{first_token_latency_min:.2f}s, {first_token_latency_max:.2f}s, ' - f'{first_token_latency_ave:.2f}s\ntoken latency(min, max, ave): ' - f'{token_latency_min:.2f}s, {token_latency_max:.2f}s, ' - f'{token_latency_ave:.2f}s\n' - f'throughput: {throughput:.2f} token/s\n{"-" * 50}') - return tm_model.model_name, throughput, tm_model.gpu_count + f'{first_token_latency_min}s, {first_token_latency_max}s, ' + f'{first_token_latency_ave}s\ntotal_token latency(min, max, ave): ' + f'{token_latency_min}s, {token_latency_max}s, ' + f'{token_latency_ave}s\n' + f'token_latency percentiles(50%,75%,95%,99%)(s): {percentiles}\n' + f'throughput: {throughput} token/s\n{"-" * 50}') + return tm_model.model_name, \ + [first_token_latency_min, first_token_latency_max, + first_token_latency_ave], \ + percentiles, throughput, tm_model.gpu_count class MemoryMonitor: @@ -235,6 +262,8 @@ class ProfileResult: batch: int prompt_tokens: int completion_tokens: int + first_token_latency: List + percentiles: List throughput_per_proc: float throughput_per_node: float mem_per_proc: float @@ -258,7 +287,7 @@ def parse_args(): type=int, help='how many requests launched concurrently. 
One-to-one' 'correspondence with completion-tokens', - default=[64, 512, 512, 1024]) + default=[1, 512, 512, 1024]) parser.add_argument('--completion-tokens', nargs='+', type=int, @@ -266,7 +295,7 @@ def parse_args(): 'correspondence with prompt-tokens', default=[512, 512, 1024, 1024]) parser.add_argument('--tp', type=int, help='Tensor parallel', default=1) - parser.add_argument('--dst-csv', + parser.add_argument('--csv', type=str, help='Where to save the result.', default='profile_generation.csv') @@ -274,6 +303,10 @@ def parse_args(): help='set log level', default='INFO', choices=list(logging._nameToLevel.keys())) + parser.add_argument('--test-round', + type=int, + help='number of test rounds', + default=10) args = parser.parse_args() return args @@ -292,9 +325,11 @@ def main(): concurrency=batch, input_seqlen=prompt_tokens, output_seqlen=completion_tokens, - tp=args.tp) + tp=args.tp, + test_round=args.test_round) output = Pool(1).map(profile_target, (args.model_path, )) - model_name, throughput_per_proc, tp = output[0] + model_name, first_token_latency, percentiles, \ + throughput_per_proc, tp = output[0] time.sleep(5) # wait a while for releasing GPU mem memory = MemoryMonitor.terminate() device_count = MemoryMonitor.device_count.value @@ -303,25 +338,31 @@ def main(): batch=batch, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, + first_token_latency=first_token_latency, + percentiles=percentiles, throughput_per_proc=throughput_per_proc, throughput_per_node=throughput_per_proc / tp * device_count, mem_per_proc=memory, mem_per_gpu=memory / tp, mem_per_node=memory / tp * device_count)) - with open(args.dst_csv, 'w') as csvfile: + with open(args.csv, 'w') as csvfile: writer = csv.writer(csvfile) writer.writerow([ 'batch', 'prompt_tokens', 'completion_tokens', - 'throughput_per_proc(token/s)', 'throughput_per_node(token/s)', - 'mem_per_proc(GB)', 'mem_per_gpu(GB)', 'mem_per_node(GB)' + '1st_token_latency(min)(s)', '1st_token_latency(max)(s)', + '1st_token_latency(ave)(s)', 'percentile50(s)', 'percentile75(s)', + 'percentile95(s)', 'percentile99(s)', 'throughput(token/s)', + 'mem_per_proc(GB)', 'mem_per_gpu(GB)' ]) for re in results: writer.writerow([ re.batch, re.prompt_tokens, re.completion_tokens, - f'{re.throughput_per_proc:.2f}', - f'{re.throughput_per_node:.2f}', f'{re.mem_per_proc:.2f}', - f'{re.mem_per_gpu:.2f}', f'{re.mem_per_node:.2f}' + re.first_token_latency[0], re.first_token_latency[1], + re.first_token_latency[2], re.percentiles[0], + re.percentiles[1], re.percentiles[2], re.percentiles[3], + f'{re.throughput_per_proc:.2f}', f'{re.mem_per_proc:.2f}', + f'{re.mem_per_gpu:.2f}' ]) diff --git a/benchmark/profile_restful_api.py b/benchmark/profile_restful_api.py index 394c7ec1b9..a32ec34dc1 100644 --- a/benchmark/profile_restful_api.py +++ b/benchmark/profile_restful_api.py @@ -127,7 +127,9 @@ def main(server_addr: str, concurrency: int = 1, session_len: int = 2048, samples: int = 1000, - stream_output: bool = False): + stream_output: bool = False, + seed: int = 0): + random.seed(seed) api_url = server_addr + '/v1/chat/interactive' warmup(api_url, concurrency, session_len - 1, 4, stream_output) req_queue, n_req = read_dataset(tokenizer_path, dataset_path, samples, diff --git a/benchmark/profile_serving.py b/benchmark/profile_serving.py index ee23452d8a..73215d5b21 100644 --- a/benchmark/profile_serving.py +++ b/benchmark/profile_serving.py @@ -123,7 +123,9 @@ def main(tritonserver_addr: str, dataset_path: str, concurrency: int = 1, session_len: int = 2048, - 
samples: int = 1000): + samples: int = 1000, + seed: int = 0): + random.seed(seed) warmup(tritonserver_addr, concurrency, session_len - 1) req_que = mp.Queue() res_que = mp.Queue() diff --git a/benchmark/profile_throughput.py b/benchmark/profile_throughput.py index 77402b5592..75bee336f4 100644 --- a/benchmark/profile_throughput.py +++ b/benchmark/profile_throughput.py @@ -179,14 +179,16 @@ def process_request(self, def main(dataset: str, model_path: str, concurrency: int = 1, - num_prompts: int = 1000, + samples: int = 1000, tp: int = 1, - stream_output: bool = True): + stream_output: bool = True, + seed: int = 0): + random.seed(seed) engine = Engine(model_path, tp=tp) tokenizer = engine.tokenizer - requests = sample_requests(dataset, num_prompts, tokenizer) + requests = sample_requests(dataset, samples, tokenizer) engine.process_request(requests, concurrency, stream_output) From 20aecd8f3c57126575b5e983ad0af79038e81b6f Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Fri, 24 Nov 2023 12:37:45 +0800 Subject: [PATCH 02/17] add top_p, top_k and temperature as input arguments --- benchmark/profile_generation.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/benchmark/profile_generation.py b/benchmark/profile_generation.py index bb07ee36aa..a8f818d351 100644 --- a/benchmark/profile_generation.py +++ b/benchmark/profile_generation.py @@ -104,10 +104,11 @@ def profile_throughput(model_path: str, input_seqlen: int = 1, output_seqlen: int = 512, test_round: int = 10, - tp: int = 1): + tp: int = 1, + **kwargs): tokenizer_model_path = osp.join(model_path, 'triton_models', 'tokenizer') tokenizer = Tokenizer(tokenizer_model_path) - tm_model = TurboMind(model_path=model_path, tp=tp) + tm_model = TurboMind(model_path=model_path, tp=tp, **kwargs) # make up a prompt that can be tokenized into {input_seqlen} tokens assert input_seqlen > 0, 'input_seqlen should > 0' @@ -280,21 +281,37 @@ def parse_args(): nargs='+', type=int, help='how many requests launched concurrently', - default=[1, 8, 16, 32]) + default=[1, 32, 64, 128]) parser.add_argument( '--prompt-tokens', nargs='+', type=int, help='how many requests launched concurrently. One-to-one' 'correspondence with completion-tokens', - default=[1, 512, 512, 1024]) + default=[1, 128, 128, 2048, 2048]) parser.add_argument('--completion-tokens', nargs='+', type=int, help='how many tokens to be generated. 
One-to-one' 'correspondence with prompt-tokens', - default=[512, 512, 1024, 1024]) + default=[128, 2048, 128, 2048]) parser.add_argument('--tp', type=int, help='Tensor parallel', default=1) + parser.add_argument('--top_k', + type=int, + help='The number of highest probability vocabulary ' + 'tokens to keep for top-k-filtering', + default=1) + parser.add_argument('--top_p', + type=float, + help='the set of most probable tokens with ' + 'probabilities that add up to top_p or higher ' + 'are kept for generation', + default=1.0) + parser.add_argument('--temperature', + type=float, + help='The value used to modulate the next token ' + 'probabilities', + default=0.8) parser.add_argument('--csv', type=str, help='Where to save the result.', @@ -326,6 +343,9 @@ def main(): input_seqlen=prompt_tokens, output_seqlen=completion_tokens, tp=args.tp, + top_k=args.top_k, + top_p=args.top_p, + temperature=args.temperature, test_round=args.test_round) output = Pool(1).map(profile_target, (args.model_path, )) model_name, first_token_latency, percentiles, \ From 171c3019b809fabadcaf15dc88ee9ef6f6a36859 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Fri, 24 Nov 2023 13:30:11 +0800 Subject: [PATCH 03/17] fix input_ids --- benchmark/profile_generation.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/benchmark/profile_generation.py b/benchmark/profile_generation.py index a8f818d351..4ad848525f 100644 --- a/benchmark/profile_generation.py +++ b/benchmark/profile_generation.py @@ -113,8 +113,10 @@ def profile_throughput(model_path: str, # make up a prompt that can be tokenized into {input_seqlen} tokens assert input_seqlen > 0, 'input_seqlen should > 0' prompt = 'hi' - input_ids = tokenizer.encode(prompt) + input_ids = tokenizer.encode(prompt, add_bos=False) input_ids = input_ids * input_seqlen + assert len(input_ids) == input_seqlen, \ + '#input_token {input_seqlen} but #dummy_input_token {len(input_ids)}' warmup(tm_model, concurrency, input_ids, output_seqlen) @@ -294,7 +296,7 @@ def parse_args(): type=int, help='how many tokens to be generated. 
One-to-one' 'correspondence with prompt-tokens', - default=[128, 2048, 128, 2048]) + default=[128, 128, 2048, 128, 2048]) parser.add_argument('--tp', type=int, help='Tensor parallel', default=1) parser.add_argument('--top_k', type=int, From b00fac25220c544203e6cf35175d7b68a2c1fa7a Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Fri, 24 Nov 2023 18:19:19 +0800 Subject: [PATCH 04/17] update profile_throughput --- benchmark/profile_throughput.py | 65 ++++++++++++++++++++++++--------- 1 file changed, 48 insertions(+), 17 deletions(-) diff --git a/benchmark/profile_throughput.py b/benchmark/profile_throughput.py index 75bee336f4..1ea4f98380 100644 --- a/benchmark/profile_throughput.py +++ b/benchmark/profile_throughput.py @@ -1,5 +1,5 @@ import json -import os.path as osp +import os import random import time from queue import Queue @@ -56,25 +56,27 @@ def sample_requests( class Engine: - def __init__(self, model_path: str, tp: int = 1): - tokenizer_model_path = osp.join(model_path, 'triton_models', - 'tokenizer') - tokenizer = Tokenizer(tokenizer_model_path) - tm_model = TurboMind(model_path=model_path, tp=tp) + def __init__(self, model_path: str, tp: int = 1, **kwargs): + # avoid turbomind checking chat template name by setting + # `model_name='llama'` + tm_model = TurboMind(model_path=model_path, + model_name='llama', + tp=tp, + **kwargs) self.tm_model = tm_model - self.tokenizer = tokenizer + self.tokenizer = tm_model.tokenizer def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int, stream_output: bool): model_inst = self.tm_model.create_instance() stats = [] - timestamps = [] - tokens = [] - timestamps.append(time.perf_counter()) for prompt, input_seqlen, output_seqlen in iter( req_queue.get, [None, None, None]): - input_ids = self.tokenizer.encode(prompt) + input_ids = self.tokenizer(prompt).input_ids offset = 0 + timestamps = [] + tokens = [] + timestamps.append(time.perf_counter()) for outputs in model_inst.stream_infer( session_id, input_ids=input_ids, @@ -93,6 +95,7 @@ def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int, first_token_latency = np.round(timestamps[1] - timestamps[0], 3) token_latency = np.round(timestamps[-1] - timestamps[0], 3) completion_tokens = tokens[-1] + assert output_seqlen <= completion_tokens <= output_seqlen + 1 total_tokens = tokens[-1] + len(input_ids) stats.append([ first_token_latency, completion_tokens, output_seqlen, @@ -136,8 +139,8 @@ def process_request(self, stats = [] while not res_queue.empty(): session_id, _stats = res_queue.get() - print(f'\n{"-" * 50}\n' - f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n') + # print(f'\n{"-" * 50}\n' + # f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n') stats.append(np.array(_stats)) stats = np.concatenate(stats).reshape(-1, 5) @@ -179,16 +182,44 @@ def process_request(self, def main(dataset: str, model_path: str, concurrency: int = 1, - samples: int = 1000, + num_prompts: int = 1000, tp: int = 1, + top_k: int = 1, + top_p: float = 1.0, + temperature: float = 0.8, stream_output: bool = True, + log_level: str = 'ERROR', seed: int = 0): + """Benchmark the request throughput of lmdeploy in localhost. + + Args: + dataset (str): Path to the dataset + model_path (str): Path to a model in localhost or a model_repo_id in huggingface.co + concurrency (int, optional): Number of working threads to process the sampled prompts. + Defaults to 1. + num_prompts (int, optional): Number of prompts to process. Defaults to 1000. 
+ tp (int, optional): Number of GPUs for tensor parallel. Defaults to 1. + top_k (int, optional): The number of highest probability vocabulary tokens + to keep for top-k-filtering. Defaults to 1. + top_p (float, optional): the set of most probable tokens with + probabilities that add up to top_p or higher + are kept for generation. Defaults to 1.0. + temperature (float, optional): The value used to modulate the next token probabilities. + Defaults to 0.8. + stream_output (bool, optional): Indicator for streaming output. Defaults to True. + log_level(str, optional): The log level. Defaults to INFO + seed (int, optional): Seed used in sampling prompts from dataset. Defaults to 0. + """ # noqa random.seed(seed) + os.environ['TM_LOG_LEVEL'] = log_level - engine = Engine(model_path, tp=tp) - tokenizer = engine.tokenizer + engine = Engine(model_path, + tp=tp, + top_k=top_k, + top_p=top_p, + temperature=temperature) - requests = sample_requests(dataset, samples, tokenizer) + requests = sample_requests(dataset, num_prompts, engine.tokenizer) engine.process_request(requests, concurrency, stream_output) From d53ce93064771bd9d0559cad84e47c42e3340edd Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Fri, 24 Nov 2023 19:09:11 +0800 Subject: [PATCH 05/17] update profile_restful_api --- benchmark/profile_restful_api.py | 373 +++++++++++++++++-------------- 1 file changed, 205 insertions(+), 168 deletions(-) diff --git a/benchmark/profile_restful_api.py b/benchmark/profile_restful_api.py index a32ec34dc1..a8299fa4d4 100644 --- a/benchmark/profile_restful_api.py +++ b/benchmark/profile_restful_api.py @@ -1,8 +1,10 @@ import json +import os import random import time from queue import Queue from threading import Thread +from typing import List, Tuple import fire import numpy as np @@ -11,185 +13,220 @@ from lmdeploy.tokenizer import Tokenizer -def infer(server_addr: str, session_id: int, req_queue: Queue, res_que: Queue, - stream_output: bool): - stats = [] - for prompt, input_seqlen, output_seqlen in iter(req_queue.get, - [None, None, None]): - if prompt is None: - break - timestamps = [] - tokens = [] - timestamps.append(time.perf_counter()) - for res, token, status in get_streaming_response( - prompt, - server_addr, - session_id, - request_output_len=output_seqlen, - interactive_mode=False, - ignore_eos=True, - stream=stream_output): - timestamps.append(time.perf_counter()) - tokens.append(token) - - first_token_latency = np.round(timestamps[1] - timestamps[0], 3) - token_latency = np.round(timestamps[-1] - timestamps[0], 3) - completion_tokens = tokens[-1] - total_tokens = tokens[-1] + input_seqlen - stats.append([ - first_token_latency, completion_tokens, output_seqlen, - total_tokens, token_latency - ]) - print(f'session {session_id}: ' - f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}, ' - f'completion_tokens {completion_tokens}') - res_que.put((session_id, stats)) - - -def warmup(server_addr: str, - concurrency: int, - output_seqlen: int, - warmup_round: int = 1, - stream_output: bool = False): - print('start to warmup ...') - - def _infer(server_addr, session_id): - for _ in range(warmup_round): - for _ in get_streaming_response('', - server_addr, - session_id, - request_output_len=output_seqlen, - interactive_mode=False, - stream=stream_output, - ignore_eos=True): - continue - - _start = time.perf_counter() - procs = [] - for i in range(concurrency): - proc = Thread(target=_infer, args=(server_addr, i + 1)) - procs.append(proc) - proc.start() - for proc in procs: - proc.join() - _end = 
time.perf_counter() - print(f'end warmup, elapsed time: {round(_end - _start, 2)} s') - - -def read_dataset(tokenizer_path: str, dataset_path: str, samples: int, - session_len: int): - start = time.perf_counter() +def sample_requests( + dataset_path: str, + num_requests: int, + tokenizer: Tokenizer, +) -> List[Tuple[str, int, int]]: + # Load the dataset. with open(dataset_path) as f: dataset = json.load(f) - dataset = [data for data in dataset if len(data['conversations']) >= 2] - # Only keep the first two turns of each conversation. - dataset = [(data['conversations'][0]['value'], - data['conversations'][1]['value']) for data in dataset] - prompts = [prompt for prompt, _ in dataset] - completions = [completion for _, completion in dataset] - print(f'elapsed time for read data: ' - f'{round(time.perf_counter() - start, 2)} s') - - print('start tokenization. This takes a while, please wait...') - start = time.perf_counter() - tokenizer = Tokenizer(tokenizer_path) - prompts_token_lens = [len(tokenizer.encode(prompt)) for prompt in prompts] - completions_token_lens = [ - len(tokenizer.encode(prompt)) for prompt in completions - ] - print(f'elapsed time for tokenization: ' - f'{round(time.perf_counter() - start, 2)} s') - - start = time.perf_counter() - filtered_dataset = [] - for (prompt, _), input_len, output_len in zip(dataset, prompts_token_lens, - completions_token_lens): - if input_len + output_len > session_len: - # ignore too long conversation + # Filter out the conversations with less than 2 turns. + dataset = [data for data in dataset if len(data['conversations']) >= 2] + # Only keep the first two turns of each conversation. + dataset = [(data['conversations'][0]['value'], + data['conversations'][1]['value']) for data in dataset] + + # Tokenize the prompts and completions. + prompts = [prompt for prompt, _ in dataset] + prompt_token_ids = tokenizer(prompts).input_ids + completions = [completion for _, completion in dataset] + completion_token_ids = tokenizer(completions).input_ids + tokenized_dataset = [] + for i in range(len(dataset)): + output_len = len(completion_token_ids[i]) + tokenized_dataset.append((prompts[i], prompt_token_ids[i], output_len)) + + # Filter out too long sequences. + filtered_dataset: List[Tuple[str, int, int]] = [] + for prompt, prompt_token_ids, output_len in tokenized_dataset: + prompt_len = len(prompt_token_ids) + if prompt_len < 4 or output_len < 4: + # Prune too short sequences. continue - filtered_dataset.append([prompt, input_len, output_len]) - - if samples > 0: - filtered_dataset = random.sample(filtered_dataset, samples) - - que = Queue() - for data in filtered_dataset: - que.put(data) - que.put((None, None, None)) - print(f'elapsed time for filtering: ' - f'{round(time.perf_counter() - start, 2)} s') - return que, len(filtered_dataset) + if prompt_len > 1024 or prompt_len + output_len > 2048: + # Prune too long sequences. + continue + filtered_dataset.append((prompt, prompt_len, output_len)) + + # Sample the requests. 
+ sampled_requests = random.sample(filtered_dataset, num_requests) + return sampled_requests + + +class Engine: + + def __init__(self, + server_addr: str, + tokenzier_path: str, + temperature: float = 0.8, + top_k: int = 1, + top_p: float = 1.0): + self.tokenizer = Tokenizer(tokenzier_path) + # We choose `v1/compeletions` API to profile the performance since + # it won't decorate prompt according to the served model's + # chat template + self.api_url = server_addr + '/v1/completions' + self.temperature = temperature + self.top_k = top_k + self.top_p = top_p + + def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int, + stream_output: bool): + + stats = [] + for prompt, input_seqlen, output_seqlen in iter( + req_queue.get, [None, None, None]): + timestamps = [] + tokens = [] + timestamps.append(time.perf_counter()) + for _, n_token, _ in get_streaming_response( + prompt, + self.api_url, + session_id, + request_output_len=output_seqlen, + ignore_eos=True, + stream=stream_output): + timestamps.append(time.perf_counter()) + tokens.append(n_token) + first_token_latency = np.round(timestamps[1] - timestamps[0], 3) + token_latency = np.round(timestamps[-1] - timestamps[0], 3) + completion_tokens = tokens[-1] + assert output_seqlen <= completion_tokens <= output_seqlen + 1, \ + f'Error. session_id({session_id}) request {output_seqlen} ' \ + f'tokens, but generate {completion_tokens} tokens.\n' \ + f'prompt: {prompt}' + total_tokens = tokens[-1] + input_seqlen + stats.append([ + first_token_latency, completion_tokens, output_seqlen, + total_tokens, token_latency + ]) + print( + f'session {session_id}: ' + f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}, ' + f'completion_tokens {completion_tokens}') + res_queue.put((session_id, stats)) + + def process_request(self, + requests, + concurrency: int = 1, + stream_output: bool = False): + res_queue = Queue() + req_queue = Queue() + threads = [] + + # feed request to q + for req in requests: + req_queue.put(req) + for i in range(concurrency): + req_queue.put([None, None, None]) + + start = time.time() + + # start threads + for i in range(concurrency): + t = Thread(target=self._inference, + args=(req_queue, res_queue, i, stream_output)) + t.start() + threads.append(t) + + # wait for finish + for t in threads: + t.join() + + elapsed_time = time.time() - start + + stats = [] + while not res_queue.empty(): + session_id, _stats = res_queue.get() + # print(f'\n{"-" * 50}\n' + # f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n') + stats.append(np.array(_stats)) + + stats = np.concatenate(stats).reshape(-1, 5) + + first_token_latency_min = np.min(stats[:, 0], axis=0) + first_token_latency_max = np.max(stats[:, 0], axis=0) + first_token_latency_ave = np.mean(stats[:, 0], axis=0) + completion_tokens = np.sum(stats[:, 1], axis=0) + request_output_tokens = np.sum(stats[:, 2], axis=0) + total_tokens = np.sum(stats[:, 3], axis=0) + prompt_tokens = total_tokens - completion_tokens + completion_token_throughput = completion_tokens / elapsed_time + total_token_throughput = total_tokens / elapsed_time + rqs = len(requests) / elapsed_time + rqm = rqs * 60 + + if (np.abs(stats[:, 1] - stats[:, 2]) <= 1).min() is False: + print(f'Did not generate requested number of tokens. 
' + f'Request {request_output_tokens:.0f}, ' + f'but got {completion_tokens:.0f}') + + print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n' + f'elapsed_time: {elapsed_time:.3f}s\n') + if stream_output: + print(f'first_token latency(min, max, ave): ' + f'{first_token_latency_min:.3f}s, ' + f'{first_token_latency_max:.3f}s, ' + f'{first_token_latency_ave:.3f}s\n') + print( + f'number of prompt tokens: {prompt_tokens:.0f}\n' + f'number of completion tokens: {completion_tokens:.0f}\n' + f'token throughput (completion token): {completion_token_throughput:.3f} token/s\n' # noqa + f'token throughput (prompt + completion token): {total_token_throughput:.3f} token/s\n' # noqa + f'RPS (request per second): {rqs:.3f} req/s\n' + f'RPM (request per minute): {rqm:.3f} req/min\n' + f'{"-" * 50}\n') def main(server_addr: str, tokenizer_path: str, - dataset_path: str, + dataset: str, concurrency: int = 1, - session_len: int = 2048, - samples: int = 1000, + num_prompts: int = 1000, + top_k: int = 1, + top_p: float = 1.0, + temperature: float = 0.8, stream_output: bool = False, + log_level: str = 'INFO', seed: int = 0): + """Benchmark the request througput of api server. + + Args: + server_addr (str): Address of the triton inference server with format http://0.0.0.0:0 + tokenizer_path (str): Path to the tokenizer model in localhost + dataset (str): Path to the dataset + concurrency (int, optional): Number of working threads to process the sampled prompts. + Defaults to 1. + num_prompts (int, optional): Number of prompts to process. Defaults to 1000. + top_k (int, optional): The number of highest probability vocabulary tokens + to keep for top-k-filtering. Defaults to 1. + top_p (float, optional): the set of most probable tokens with + probabilities that add up to top_p or higher + are kept for generation. Defaults to 1.0. + temperature (float, optional): The value used to modulate the next token probabilities. + Defaults to 0.8. + stream_output (bool, optional): Indicator for streaming output. Defaults to True. + log_level(str, optional): The log level. Defaults to INFO + seed (int, optional): Seed used in sampling prompts from dataset. Defaults to 0. 
+ """ # noqa + if not server_addr.startswith('http://'): + print(f'[WARNING] server_addr of the api_server should ' + f'start with http://, but got {server_addr}') + server_addr = 'http://' + server_addr + random.seed(seed) - api_url = server_addr + '/v1/chat/interactive' - warmup(api_url, concurrency, session_len - 1, 4, stream_output) - req_queue, n_req = read_dataset(tokenizer_path, dataset_path, samples, - session_len) - for i in range(concurrency): - req_queue.put([None, None, None]) - res_que = Queue() - procs = [] - _start = time.perf_counter() - for i in range(concurrency): - proc = Thread(target=infer, - args=(api_url, i + 1, req_queue, res_que, stream_output)) - procs.append(proc) - proc.start() - for proc in procs: - proc.join() - _end = time.perf_counter() - elapsed_time = _end - _start - - stats = [] - while not res_que.empty(): - session_id, _stats = res_que.get() - print(f'\n{"-" * 50}\n' - f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n') - stats.append(np.array(_stats)) - - stats = np.concatenate(stats).reshape(-1, 5) - - first_token_latency_min = np.min(stats[:, 0], axis=0) - first_token_latency_max = np.max(stats[:, 0], axis=0) - first_token_latency_ave = np.mean(stats[:, 0], axis=0) - completion_tokens = np.sum(stats[:, 1], axis=0) - request_output_tokens = np.sum(stats[:, 2], axis=0) - total_tokens = np.sum(stats[:, 3], axis=0) - prompt_tokens = total_tokens - completion_tokens - completion_token_throughput = completion_tokens / elapsed_time - total_token_throughput = total_tokens / elapsed_time - rqs = n_req / elapsed_time - rqm = rqs * 60 - - if (np.abs(stats[:, 1] - stats[:, 2]) <= 1).min() is False: - print(f'Did not generate requested number of tokens. ' - f'Request {request_output_tokens:.0f}, ' - f'but got {completion_tokens:.0f}') - - print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n' - f'elapsed_time: {elapsed_time:.3f}s\n') - if stream_output: - print(f'first_token latency(min, max, ave): ' - f'{first_token_latency_min:.3f}s, ' - f'{first_token_latency_max:.3f}s, ' - f'{first_token_latency_ave:.3f}s\n') - print( - f'number of prompt tokens: {prompt_tokens:.0f}\n' - f'number of completion tokens: {completion_tokens:.0f}\n' - f'token throughput (completion token): {completion_token_throughput:.3f} token/s\n' # noqa - f'token throughput (prompt + completion token): {total_token_throughput:.3f} token/s\n' # noqa - f'RPS (request per second): {rqs:.3f} req/s\n' - f'RPM (request per minute): {rqm:.3f} req/min\n' - f'{"-" * 50}\n') + os.environ['TM_LOG_LEVEL'] = log_level + + engine = Engine(server_addr, + tokenizer_path, + top_k=top_k, + top_p=top_p, + temperature=temperature) + + requests = sample_requests(dataset, num_prompts, engine.tokenizer) + + engine.process_request(requests, concurrency, stream_output) if __name__ == '__main__': From 2fdaa9acf02c6ac15945e802f54a31ce2dd65fa2 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Sun, 26 Nov 2023 17:24:12 +0800 Subject: [PATCH 06/17] update profile_serving --- benchmark/profile_serving.py | 381 +++++++++++++++------------- lmdeploy/serve/turbomind/chatbot.py | 6 +- 2 files changed, 211 insertions(+), 176 deletions(-) diff --git a/benchmark/profile_serving.py b/benchmark/profile_serving.py index 73215d5b21..c6bf28ab1e 100644 --- a/benchmark/profile_serving.py +++ b/benchmark/profile_serving.py @@ -1,8 +1,9 @@ import json -import logging -import multiprocessing as mp import random import time +from queue import Queue +from threading import Thread +from typing import List, Tuple import fire import numpy as np @@ 
-11,190 +12,220 @@ from lmdeploy.tokenizer import Tokenizer -def infer(chatbot, session_id: int, req_que: mp.Queue, res_que: mp.Queue): - stats = [] - for prompt, input_seqlen, output_seqlen in iter(req_que.get, - [None, None, None]): - timestamps = [] - tokens = [] - timestamps.append(time.perf_counter()) - for status, res, token in chatbot.stream_infer( - session_id, - prompt, - request_output_len=output_seqlen, - sequence_start=True, - sequence_end=True): - timestamps.append(time.perf_counter()) - tokens.append(token) - first_token_latency = np.round(timestamps[1] - timestamps[0], 3) - token_latency = np.round(timestamps[-1] - timestamps[0], 3) - completion_tokens = tokens[-1] - total_tokens = tokens[-1] + input_seqlen - stats.append([ - first_token_latency, completion_tokens, output_seqlen, - total_tokens, token_latency - ]) - print(f'session {session_id}: ' - f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}, ' - f'completion_tokens {completion_tokens}') - res_que.put((session_id, stats)) - - -def warmup(tritonserver_addr: str, - concurrency: int, - output_seqlen: int, - warmup_round: int = 1): - print('start to warmup ...') - - def _infer(_chatbot, session_id): - for _ in range(warmup_round): - for _, _, _ in _chatbot.stream_infer( - session_id, - prompt='', - request_output_len=output_seqlen, - sequence_start=True, - sequence_end=True): - continue - _chatbot.reset_session() - - _start = time.perf_counter() - chatbots = [ - Chatbot(tritonserver_addr=tritonserver_addr, - ignore_eos=True, - log_level=logging.ERROR, - profile_generation=True) for _ in range(concurrency) - ] - procs = [] - for i, chatbot in enumerate(chatbots): - proc = mp.Process(target=_infer, args=(chatbot, i + 1)) - procs.append(proc) - proc.start() - for proc in procs: - proc.join() - _end = time.perf_counter() - print(f'end warmup, elapsed time: {round(_end - _start, 2)} s') - - -def read_dataset(tokenizer_path: str, dataset_path: str, samples: int, - session_len: int, que: mp.Queue): - start = time.perf_counter() +def sample_requests( + dataset_path: str, + num_requests: int, + tokenizer: Tokenizer, +) -> List[Tuple[str, int, int]]: + # Load the dataset. with open(dataset_path) as f: dataset = json.load(f) - dataset = [data for data in dataset if len(data['conversations']) >= 2] - # Only keep the first two turns of each conversation. - dataset = [(data['conversations'][0]['value'], - data['conversations'][1]['value']) for data in dataset] - prompts = [prompt for prompt, _ in dataset] - completions = [completion for _, completion in dataset] - print(f'elapsed time for read data: ' - f'{round(time.perf_counter() - start, 2)} s') - print('start tokenization. This takes a while, please wait...') - - start = time.perf_counter() - tokenizer = Tokenizer(tokenizer_path) - prompts_token_lens = [len(tokenizer.encode(prompt)) for prompt in prompts] - completions_token_lens = [ - len(tokenizer.encode(prompt)) for prompt in completions - ] - print(f'elapsed time for tokenization: ' - f'{round(time.perf_counter() - start, 2)} s') - - start = time.perf_counter() - filtered_dataset = [] - for (prompt, _), input_len, output_len in zip(dataset, prompts_token_lens, - completions_token_lens): - if input_len + output_len > session_len: - # ignore too long conversation + # Filter out the conversations with less than 2 turns. + dataset = [data for data in dataset if len(data['conversations']) >= 2] + # Only keep the first two turns of each conversation. 
+ dataset = [(data['conversations'][0]['value'], + data['conversations'][1]['value']) for data in dataset] + + # Tokenize the prompts and completions. + prompts = [prompt for prompt, _ in dataset] + prompt_token_ids = tokenizer(prompts).input_ids + completions = [completion for _, completion in dataset] + completion_token_ids = tokenizer(completions).input_ids + tokenized_dataset = [] + for i in range(len(dataset)): + output_len = len(completion_token_ids[i]) + tokenized_dataset.append((prompts[i], prompt_token_ids[i], output_len)) + + # Filter out too long sequences. + filtered_dataset: List[Tuple[str, int, int]] = [] + for prompt, prompt_token_ids, output_len in tokenized_dataset: + prompt_len = len(prompt_token_ids) + if prompt_len < 4 or output_len < 4: + # Prune too short sequences. + continue + if prompt_len > 1024 or prompt_len + output_len > 2048: + # Prune too long sequences. continue - filtered_dataset.append([prompt, input_len, output_len]) + filtered_dataset.append((prompt, prompt_len, output_len)) - if samples > 0: - filtered_dataset = random.sample(filtered_dataset, samples) + # Sample the requests. + sampled_requests = random.sample(filtered_dataset, num_requests) + return sampled_requests - for data in filtered_dataset: - que.put(data) - print(f'elapsed time for filtering: ' - f'{round(time.perf_counter() - start, 2)} s') - return len(filtered_dataset) +class Engine: -def main(tritonserver_addr: str, + def __init__(self, + server_addr: str, + tokenzier_path: str, + temperature: float = 0.8, + top_k: int = 1, + top_p: float = 1.0, + log_level: str = 'ERROR'): + self.server_addr = server_addr + self.tokenizer = Tokenizer(tokenzier_path) + self.temperature = temperature + self.top_k = top_k + self.top_p = top_p + self.log_level = log_level + + def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int, + stream_output: bool): + + chatbot = Chatbot(self.server_addr, + ignore_eos=True, + profile_serving=True, + top_k=self.top_k, + top_p=self.top_p, + temperature=self.temperature, + log_level=self.log_level) + stats = [] + for prompt, input_seqlen, output_seqlen in iter( + req_queue.get, [None, None, None]): + timestamps = [] + tokens = [] + timestamps.append(time.perf_counter()) + for _, _, n_token in chatbot.stream_infer( + session_id, + prompt, + request_output_len=output_seqlen, + sequence_start=True, + sequence_end=True): + timestamps.append(time.perf_counter()) + tokens.append(n_token) + first_token_latency = np.round(timestamps[1] - timestamps[0], 3) + token_latency = np.round(timestamps[-1] - timestamps[0], 3) + completion_tokens = tokens[-1] + assert output_seqlen <= completion_tokens <= output_seqlen + 1, \ + f'Error. 
session_id({session_id}) request {output_seqlen} ' \ + f'tokens, but generate {completion_tokens} tokens.\n' \ + f'prompt: {prompt}' + total_tokens = tokens[-1] + input_seqlen + stats.append([ + first_token_latency, completion_tokens, output_seqlen, + total_tokens, token_latency + ]) + print( + f'session {session_id}: ' + f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}, ' + f'completion_tokens {completion_tokens}') + res_queue.put((session_id, stats)) + + def process_request(self, + requests, + concurrency: int = 1, + stream_output: bool = False): + res_queue = Queue() + req_queue = Queue() + threads = [] + + # feed request to q + for req in requests: + req_queue.put(req) + for i in range(concurrency): + req_queue.put([None, None, None]) + + start = time.time() + + # start threads + for i in range(concurrency): + t = Thread(target=self._inference, + args=(req_queue, res_queue, i, stream_output)) + t.start() + threads.append(t) + + # wait for finish + for t in threads: + t.join() + + elapsed_time = time.time() - start + + stats = [] + while not res_queue.empty(): + session_id, _stats = res_queue.get() + # print(f'\n{"-" * 50}\n' + # f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n') + stats.append(np.array(_stats)) + + stats = np.concatenate(stats).reshape(-1, 5) + + first_token_latency_min = np.min(stats[:, 0], axis=0) + first_token_latency_max = np.max(stats[:, 0], axis=0) + first_token_latency_ave = np.mean(stats[:, 0], axis=0) + completion_tokens = np.sum(stats[:, 1], axis=0) + request_output_tokens = np.sum(stats[:, 2], axis=0) + total_tokens = np.sum(stats[:, 3], axis=0) + prompt_tokens = total_tokens - completion_tokens + completion_token_throughput = completion_tokens / elapsed_time + total_token_throughput = total_tokens / elapsed_time + rqs = len(requests) / elapsed_time + rqm = rqs * 60 + + if (np.abs(stats[:, 1] - stats[:, 2]) <= 1).min() is False: + print(f'Did not generate requested number of tokens. ' + f'Request {request_output_tokens:.0f}, ' + f'but got {completion_tokens:.0f}') + + print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n' + f'elapsed_time: {elapsed_time:.3f}s\n') + if stream_output: + print(f'first_token latency(min, max, ave): ' + f'{first_token_latency_min:.3f}s, ' + f'{first_token_latency_max:.3f}s, ' + f'{first_token_latency_ave:.3f}s\n') + print( + f'number of prompt tokens: {prompt_tokens:.0f}\n' + f'number of completion tokens: {completion_tokens:.0f}\n' + f'token throughput (completion token): {completion_token_throughput:.3f} token/s\n' # noqa + f'token throughput (prompt + completion token): {total_token_throughput:.3f} token/s\n' # noqa + f'RPS (request per second): {rqs:.3f} req/s\n' + f'RPM (request per minute): {rqm:.3f} req/min\n' + f'{"-" * 50}\n') + + +def main(server_addr: str, tokenizer_path: str, - dataset_path: str, + dataset: str, concurrency: int = 1, - session_len: int = 2048, - samples: int = 1000, + num_prompts: int = 1000, + top_k: int = 1, + top_p: float = 1.0, + temperature: float = 0.8, + stream_output: bool = False, + log_level: str = 'ERROR', seed: int = 0): + """Benchmark the request througput of the triton inference server. + + Args: + server_addr (str): Address of the triton inference server with format 0.0.0.0:0 + tokenizer_path (str): Path to the tokenizer model in localhost + dataset (str): Path to the dataset + concurrency (int, optional): Number of working threads to process the sampled prompts. + Defaults to 1. + num_prompts (int, optional): Number of prompts to process. Defaults to 1000. 
+ top_k (int, optional): The number of highest probability vocabulary tokens + to keep for top-k-filtering. Defaults to 1. + top_p (float, optional): the set of most probable tokens with + probabilities that add up to top_p or higher + are kept for generation. Defaults to 1.0. + temperature (float, optional): The value used to modulate the next token probabilities. + Defaults to 0.8. + stream_output (bool, optional): Indicator for streaming output. Defaults to True. + seed (int, optional): Seed used in sampling prompts from dataset. Defaults to 0. + """ # noqa + random.seed(seed) - warmup(tritonserver_addr, concurrency, session_len - 1) - req_que = mp.Queue() - res_que = mp.Queue() - - procs = [] - for i in range(concurrency): - chatbot = Chatbot(tritonserver_addr=tritonserver_addr, - display=False, - profile_serving=True, - ignore_eos=True, - log_level=logging.ERROR) - proc = mp.Process(target=infer, - args=(chatbot, i + 1, req_que, res_que)) - procs.append(proc) - - # read data and put it to queue - n_req = read_dataset(tokenizer_path, dataset_path, samples, session_len, - req_que) - for i in range(concurrency): - req_que.put([None, None, None]) - _start = time.perf_counter() - for proc in procs: - proc.start() - - stats = [] - for i in range(concurrency): - session_id, _stats = res_que.get() - print(f'\n{"-" * 50}\n' - f'session {session_id}: processed reqs {len(_stats)}, ' - f'stats: \n{_stats}\n{"-" * 50}\n') - stats.append(np.array(_stats)) - _end = time.perf_counter() - - elapsed_time = _end - _start - - stats = np.concatenate(stats).reshape(-1, 5) - - first_token_latency_min = np.min(stats[:, 0], axis=0) - first_token_latency_max = np.max(stats[:, 0], axis=0) - first_token_latency_ave = np.mean(stats[:, 0], axis=0) - completion_tokens = np.sum(stats[:, 1], axis=0) - request_output_tokens = np.sum(stats[:, 2], axis=0) - total_tokens = np.sum(stats[:, 3], axis=0) - prompt_tokens = total_tokens - completion_tokens - completion_token_throughput = completion_tokens / elapsed_time - total_token_throughput = total_tokens / elapsed_time - rqs = n_req / elapsed_time - rqm = rqs * 60 - - if (np.abs(stats[:, 1] - stats[:, 2]) <= 1).min() is False: - print(f'Did not generate requested number of tokens. 
' - f'Request {request_output_tokens:.0f}, ' - f'but got {completion_tokens:.0f}') - - print( - f'\n{"-" * 50}\nconcurrency: {concurrency}\n' - f'elapsed_time: {elapsed_time:.3f}s\n' - f'first_token latency(min, max, ave): ' - f'{first_token_latency_min:.3f}s, {first_token_latency_max:.3f}s, ' - f'{first_token_latency_ave:.3f}s\n' - f'number of prompt tokens: {prompt_tokens:.0f}\n' - f'number of completion tokens: {completion_tokens:.0f}\n' - f'token throughput (completion token): {completion_token_throughput:.3f} token/s\n' # noqa - f'token throughput (prompt + completion token): {total_token_throughput:.3f} token/s\n' # noqa - f'RPS (request per second): {rqs:.3f} req/s\n' - f'RPM (request per minute): {rqm:.3f} req/min\n' - f'{"-" * 50}\n') - for proc in procs: - proc.join() + + engine = Engine(server_addr, + tokenizer_path, + top_k=top_k, + top_p=top_p, + temperature=temperature, + log_level=log_level) + + requests = sample_requests(dataset, num_prompts, engine.tokenizer) + + engine.process_request(requests, concurrency, stream_output) if __name__ == '__main__': diff --git a/lmdeploy/serve/turbomind/chatbot.py b/lmdeploy/serve/turbomind/chatbot.py index 6419e53f2c..227e84d89d 100644 --- a/lmdeploy/serve/turbomind/chatbot.py +++ b/lmdeploy/serve/turbomind/chatbot.py @@ -97,6 +97,7 @@ def __init__(self, if ignore_eos: stop_words = None bad_words = np.array([[[self.eos_id], [1]]], dtype=np.int32) + self.eos_id = -1 self.cfg = mmengine.Config( dict(session_len=self.model.session_len, top_p=self.model.top_p, @@ -681,7 +682,10 @@ def stream_consumer(postprocess, res_queue, session, n_input_token, logger.error(f'catch exception: {e}') logger.error( f'session {session.session_id}: prompt: {session.prompt}') - + # `n_token` might be not updated since `if text.endswith('�')` + if n_token != output_ids.shape[-1]: + n_token = output_ids.shape[-1] + session.response += text # put session back to queue so that `_stream_infer` can update it in # `self.sessions` while not res_queue.empty(): From 533af8a032227ffac15a5383278283e20b1471af Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Sun, 26 Nov 2023 17:46:00 +0800 Subject: [PATCH 07/17] update --- benchmark/profile_generation.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/benchmark/profile_generation.py b/benchmark/profile_generation.py index 4ad848525f..03ee43af6b 100644 --- a/benchmark/profile_generation.py +++ b/benchmark/profile_generation.py @@ -4,7 +4,6 @@ import csv import logging import os -import os.path as osp import time from dataclasses import dataclass from queue import Queue @@ -18,7 +17,6 @@ nvmlInit, nvmlShutdown, nvmlSystemGetDriverVersion) from tqdm import tqdm -from lmdeploy.tokenizer import Tokenizer from lmdeploy.turbomind import TurboMind @@ -106,17 +104,18 @@ def profile_throughput(model_path: str, test_round: int = 10, tp: int = 1, **kwargs): - tokenizer_model_path = osp.join(model_path, 'triton_models', 'tokenizer') - tokenizer = Tokenizer(tokenizer_model_path) - tm_model = TurboMind(model_path=model_path, tp=tp, **kwargs) + # avoid turbomind checking chat template name by setting + # `model_name='llama'` + tm_model = TurboMind(model_path=model_path, + tp=tp, + model_name='llama', + **kwargs) + tokenizer = tm_model.tokenizer # make up a prompt that can be tokenized into {input_seqlen} tokens assert input_seqlen > 0, 'input_seqlen should > 0' - prompt = 'hi' - input_ids = tokenizer.encode(prompt, add_bos=False) + input_ids = tokenizer('hi').input_ids input_ids = input_ids * input_seqlen 
- assert len(input_ids) == input_seqlen, \ - '#input_token {input_seqlen} but #dummy_input_token {len(input_ids)}' warmup(tm_model, concurrency, input_ids, output_seqlen) @@ -320,7 +319,7 @@ def parse_args(): default='profile_generation.csv') parser.add_argument('--log-level', help='set log level', - default='INFO', + default='ERROR', choices=list(logging._nameToLevel.keys())) parser.add_argument('--test-round', type=int, From 1f2c414b924f83cd27b4256630c9a7c0f3742592 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Mon, 27 Nov 2023 12:33:46 +0800 Subject: [PATCH 08/17] update --- benchmark/profile_generation.py | 9 ++--- benchmark/profile_restful_api.py | 53 +++++++++++++++-------------- lmdeploy/serve/async_engine.py | 5 +++ lmdeploy/serve/openai/api_server.py | 10 +++--- lmdeploy/turbomind/turbomind.py | 3 +- 5 files changed, 46 insertions(+), 34 deletions(-) diff --git a/benchmark/profile_generation.py b/benchmark/profile_generation.py index 03ee43af6b..df8b82fe7c 100644 --- a/benchmark/profile_generation.py +++ b/benchmark/profile_generation.py @@ -29,7 +29,7 @@ def infer(model, session_id: int, input_ids: List, output_seqlen: int, prev = time.perf_counter() n_pre_token = 0 """ - The iterator provided by `chatbot.stream_infer` denotes the number of generated tokens so far, + The iterator provided by `stream_infer` denotes the number of generated tokens so far, which is represented by the variable `n_token`. Please note that `n_token` is not a continuous value. In other words, during the iteration, its value might be 5, 7, 8, 16, and so on, rather than 1, 2, 3, 4, etc. @@ -275,14 +275,15 @@ class ProfileResult: def parse_args(): parser = argparse.ArgumentParser(description='Regression Test') - parser.add_argument('--model-path', + parser.add_argument('model_path', type=str, - help='benchmark test model path') + help='the path of the model in localhost or ' + 'the repo_id of the model in huggingface.co') parser.add_argument('--concurrency', nargs='+', type=int, help='how many requests launched concurrently', - default=[1, 32, 64, 128]) + default=[1, 16, 32, 64]) parser.add_argument( '--prompt-tokens', nargs='+', diff --git a/benchmark/profile_restful_api.py b/benchmark/profile_restful_api.py index a8299fa4d4..9803d1c336 100644 --- a/benchmark/profile_restful_api.py +++ b/benchmark/profile_restful_api.py @@ -1,5 +1,4 @@ import json -import os import random import time from queue import Queue @@ -8,8 +7,8 @@ import fire import numpy as np +import requests -from lmdeploy.serve.openai.api_client import get_streaming_response from lmdeploy.tokenizer import Tokenizer @@ -60,15 +59,10 @@ def __init__(self, server_addr: str, tokenzier_path: str, temperature: float = 0.8, - top_k: int = 1, top_p: float = 1.0): self.tokenizer = Tokenizer(tokenzier_path) - # We choose `v1/compeletions` API to profile the performance since - # it won't decorate prompt according to the served model's - # chat template - self.api_url = server_addr + '/v1/completions' + self.api_url = server_addr + '/v1/chat/completions' self.temperature = temperature - self.top_k = top_k self.top_p = top_p def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int, @@ -80,15 +74,31 @@ def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int, timestamps = [] tokens = [] timestamps.append(time.perf_counter()) - for _, n_token, _ in get_streaming_response( - prompt, - self.api_url, - session_id, - request_output_len=output_seqlen, - ignore_eos=True, - stream=stream_output): + headers = {'content-type': 
'application/json'} + pload = { + 'model': 'llama', + 'messages': prompt, + 'temperature': self.temperature, + 'top_p': self.top_p, + 'n': 1, + 'max_tokens': output_seqlen, + 'stream': stream_output, + 'session_id': session_id, + 'ignore_eos': True, + } + response = requests.post(self.api_url, + headers=headers, + json=pload, + stream=stream_output) + for chunk in response.iter_lines(chunk_size=8192, + decode_unicode=False, + delimiter=b'\n'): timestamps.append(time.perf_counter()) - tokens.append(n_token) + if chunk: + data = json.loads(chunk.decode('utf-8')) + n_token = data.pop('tokens', 0) + tokens.append(n_token) + first_token_latency = np.round(timestamps[1] - timestamps[0], 3) token_latency = np.round(timestamps[-1] - timestamps[0], 3) completion_tokens = tokens[-1] @@ -184,11 +194,9 @@ def main(server_addr: str, dataset: str, concurrency: int = 1, num_prompts: int = 1000, - top_k: int = 1, top_p: float = 1.0, temperature: float = 0.8, stream_output: bool = False, - log_level: str = 'INFO', seed: int = 0): """Benchmark the request througput of api server. @@ -199,28 +207,23 @@ def main(server_addr: str, concurrency (int, optional): Number of working threads to process the sampled prompts. Defaults to 1. num_prompts (int, optional): Number of prompts to process. Defaults to 1000. - top_k (int, optional): The number of highest probability vocabulary tokens - to keep for top-k-filtering. Defaults to 1. top_p (float, optional): the set of most probable tokens with probabilities that add up to top_p or higher are kept for generation. Defaults to 1.0. temperature (float, optional): The value used to modulate the next token probabilities. Defaults to 0.8. stream_output (bool, optional): Indicator for streaming output. Defaults to True. - log_level(str, optional): The log level. Defaults to INFO seed (int, optional): Seed used in sampling prompts from dataset. Defaults to 0. 
""" # noqa if not server_addr.startswith('http://'): print(f'[WARNING] server_addr of the api_server should ' - f'start with http://, but got {server_addr}') - server_addr = 'http://' + server_addr + f'start with "http://", but got "{server_addr}"') + server_addr = 'http://' + server_addr.strip() random.seed(seed) - os.environ['TM_LOG_LEVEL'] = log_level engine = Engine(server_addr, tokenizer_path, - top_k=top_k, top_p=top_p, temperature=temperature) diff --git a/lmdeploy/serve/async_engine.py b/lmdeploy/serve/async_engine.py index eb1c317c1e..66ba6c68d2 100644 --- a/lmdeploy/serve/async_engine.py +++ b/lmdeploy/serve/async_engine.py @@ -241,6 +241,11 @@ async def generate( len(input_ids), tokens, finish_reason) response_size = tokens + # `response_size` might be note updated since + # ` if response.endswith('�')` + if response_size != tokens: + yield GenOut(response, self.steps[str(session_id)], + len(input_ids), tokens, finish_reason) # update step self.steps[str(session_id)] += len(input_ids) + tokens if sequence_end or stop: diff --git a/lmdeploy/serve/openai/api_server.py b/lmdeploy/serve/openai/api_server.py index f9e1e81952..d2720c8386 100644 --- a/lmdeploy/serve/openai/api_server.py +++ b/lmdeploy/serve/openai/api_server.py @@ -21,8 +21,6 @@ EmbeddingsRequest, ErrorResponse, GenerateRequest, GenerateResponse, ModelCard, ModelList, ModelPermission, UsageInfo) -os.environ['TM_LOG_LEVEL'] = 'ERROR' - class VariableInterface: """A IO interface maintaining variables.""" @@ -476,12 +474,13 @@ async def stream_results() -> AsyncGenerator[bytes, None]: def main(model_path: str, server_name: str = '0.0.0.0', server_port: int = 23333, - instance_num: int = 32, + instance_num: int = 64, tp: int = 1, allow_origins: List[str] = ['*'], allow_credentials: bool = True, allow_methods: List[str] = ['*'], allow_headers: List[str] = ['*'], + log_level: str = 'INFO', **kwargs): """An example to perform model inference through the command line interface. 
@@ -496,7 +495,10 @@ def main(model_path: str, allow_credentials (bool): whether to allow credentials for CORS allow_methods (List[str]): a list of allowed HTTP methods for CORS allow_headers (List[str]): a list of allowed HTTP headers for CORS - """ + log_level(str): set log level whose value among [CRITICAL, ERROR, WARNING, INFO, DEBUG] + """ # noqa E501 + os.environ['TM_LOG_LEVEL'] = log_level + if allow_origins: app.add_middleware( CORSMiddleware, diff --git a/lmdeploy/turbomind/turbomind.py b/lmdeploy/turbomind/turbomind.py index 8668dd803a..ad7c0cb518 100644 --- a/lmdeploy/turbomind/turbomind.py +++ b/lmdeploy/turbomind/turbomind.py @@ -586,7 +586,8 @@ def _broadcast_np(data, dtype, shape=(batch_size, )): outputs = [] for output, len_ in zip(output_ids, sequence_length): output, len_ = output, len_.item() - if len(output) > 0 and output[-1].item() == self.eos_id: + if len(output) > 0 and output[-1].item( + ) == self.eos_id and not ignore_eos: outputs.append((output[:-1], len_ - 1)) elif len(output) > 0 and output[-1].item() in self.stop_tokens: outputs.append((output[:-1], len_)) From 831d9d2c3de29d42bd82c6b2353cd3351d23387f Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Mon, 27 Nov 2023 15:28:16 +0800 Subject: [PATCH 09/17] add progress bar --- benchmark/profile_restful_api.py | 64 ++++++++++++----------------- benchmark/profile_serving.py | 9 ++-- benchmark/profile_throughput.py | 16 +++++--- lmdeploy/serve/openai/api_server.py | 2 +- 4 files changed, 43 insertions(+), 48 deletions(-) diff --git a/benchmark/profile_restful_api.py b/benchmark/profile_restful_api.py index 9803d1c336..36b1b2846a 100644 --- a/benchmark/profile_restful_api.py +++ b/benchmark/profile_restful_api.py @@ -7,8 +7,9 @@ import fire import numpy as np -import requests +from tqdm import tqdm +from lmdeploy.serve.openai.api_client import APIClient from lmdeploy.tokenizer import Tokenizer @@ -61,60 +62,47 @@ def __init__(self, temperature: float = 0.8, top_p: float = 1.0): self.tokenizer = Tokenizer(tokenzier_path) - self.api_url = server_addr + '/v1/chat/completions' + self.server_addr = server_addr self.temperature = temperature self.top_p = top_p + client = APIClient(self.server_addr) + self.model_name = client.available_models[0] + self.pbar = None def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int, stream_output: bool): stats = [] + client = APIClient(self.server_addr) + for prompt, input_seqlen, output_seqlen in iter( req_queue.get, [None, None, None]): timestamps = [] - tokens = [] timestamps.append(time.perf_counter()) - headers = {'content-type': 'application/json'} - pload = { - 'model': 'llama', - 'messages': prompt, - 'temperature': self.temperature, - 'top_p': self.top_p, - 'n': 1, - 'max_tokens': output_seqlen, - 'stream': stream_output, - 'session_id': session_id, - 'ignore_eos': True, - } - response = requests.post(self.api_url, - headers=headers, - json=pload, - stream=stream_output) - for chunk in response.iter_lines(chunk_size=8192, - decode_unicode=False, - delimiter=b'\n'): + for output in client.chat_completions_v1( + model=self.model_name, + messages=prompt, + temperature=self.temperature, + top_p=self.top_p, + n=1, + max_tokens=output_seqlen, + stream=stream_output, + session_id=session_id, + ignore_eos=True): timestamps.append(time.perf_counter()) - if chunk: - data = json.loads(chunk.decode('utf-8')) - n_token = data.pop('tokens', 0) - tokens.append(n_token) first_token_latency = np.round(timestamps[1] - timestamps[0], 3) token_latency = np.round(timestamps[-1] - 
timestamps[0], 3) - completion_tokens = tokens[-1] - assert output_seqlen <= completion_tokens <= output_seqlen + 1, \ - f'Error. session_id({session_id}) request {output_seqlen} ' \ - f'tokens, but generate {completion_tokens} tokens.\n' \ - f'prompt: {prompt}' - total_tokens = tokens[-1] + input_seqlen + # assert output.pop('finish_reason') == 'length', \ + # f'Error. session_id({session_id}) request {output_seqlen} ' \ + # f'tokens, but `finish_reason` is not `length`' + total_tokens = input_seqlen + output_seqlen stats.append([ - first_token_latency, completion_tokens, output_seqlen, + first_token_latency, output_seqlen, output_seqlen, total_tokens, token_latency ]) - print( - f'session {session_id}: ' - f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}, ' - f'completion_tokens {completion_tokens}') + self.pbar.update(1) + res_queue.put((session_id, stats)) def process_request(self, @@ -125,6 +113,8 @@ def process_request(self, req_queue = Queue() threads = [] + self.pbar = tqdm(total=len(requests)) + # feed request to q for req in requests: req_queue.put(req) diff --git a/benchmark/profile_serving.py b/benchmark/profile_serving.py index c6bf28ab1e..7d3fcc1b4e 100644 --- a/benchmark/profile_serving.py +++ b/benchmark/profile_serving.py @@ -7,6 +7,7 @@ import fire import numpy as np +from tqdm import tqdm from lmdeploy.serve.turbomind.chatbot import Chatbot from lmdeploy.tokenizer import Tokenizer @@ -68,6 +69,7 @@ def __init__(self, self.top_k = top_k self.top_p = top_p self.log_level = log_level + self.pbar = None def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int, stream_output: bool): @@ -105,10 +107,7 @@ def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int, first_token_latency, completion_tokens, output_seqlen, total_tokens, token_latency ]) - print( - f'session {session_id}: ' - f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}, ' - f'completion_tokens {completion_tokens}') + self.pbar.update(1) res_queue.put((session_id, stats)) def process_request(self, @@ -119,6 +118,8 @@ def process_request(self, req_queue = Queue() threads = [] + self.pbar = tqdm(total=len(requests)) + # feed request to q for req in requests: req_queue.put(req) diff --git a/benchmark/profile_throughput.py b/benchmark/profile_throughput.py index 1ea4f98380..8239414dec 100644 --- a/benchmark/profile_throughput.py +++ b/benchmark/profile_throughput.py @@ -8,6 +8,7 @@ import fire import numpy as np +from tqdm import tqdm from lmdeploy.tokenizer import Tokenizer from lmdeploy.turbomind import TurboMind @@ -65,6 +66,7 @@ def __init__(self, model_path: str, tp: int = 1, **kwargs): **kwargs) self.tm_model = tm_model self.tokenizer = tm_model.tokenizer + self.pbar = None def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int, stream_output: bool): @@ -95,16 +97,16 @@ def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int, first_token_latency = np.round(timestamps[1] - timestamps[0], 3) token_latency = np.round(timestamps[-1] - timestamps[0], 3) completion_tokens = tokens[-1] - assert output_seqlen <= completion_tokens <= output_seqlen + 1 - total_tokens = tokens[-1] + len(input_ids) + assert output_seqlen <= completion_tokens <= output_seqlen + 1, \ + f'Error. 
session_id({session_id}) request {output_seqlen} ' \ + f'tokens, but generate {completion_tokens} tokens.\n' \ + f'prompt: {prompt}' + total_tokens = tokens[-1] + input_seqlen stats.append([ first_token_latency, completion_tokens, output_seqlen, total_tokens, token_latency ]) - print( - f'session {session_id}: ' - f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}, ' - f'completion_tokens {completion_tokens}') + self.pbar.update(1) res_queue.put((session_id, stats)) def process_request(self, @@ -115,6 +117,8 @@ def process_request(self, req_queue = Queue() threads = [] + self.pbar = tqdm(total=len(requests)) + # feed request to q for req in requests: req_queue.put(req) diff --git a/lmdeploy/serve/openai/api_server.py b/lmdeploy/serve/openai/api_server.py index d2720c8386..0b61f7967b 100644 --- a/lmdeploy/serve/openai/api_server.py +++ b/lmdeploy/serve/openai/api_server.py @@ -480,7 +480,7 @@ def main(model_path: str, allow_credentials: bool = True, allow_methods: List[str] = ['*'], allow_headers: List[str] = ['*'], - log_level: str = 'INFO', + log_level: str = 'ERROR', **kwargs): """An example to perform model inference through the command line interface. From ebc0a503ecfe2d619a2ee2b94a5c7f9e75859641 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Mon, 27 Nov 2023 20:20:01 +0800 Subject: [PATCH 10/17] remove TODO comments --- benchmark/profile_generation.py | 1 - 1 file changed, 1 deletion(-) diff --git a/benchmark/profile_generation.py b/benchmark/profile_generation.py index df8b82fe7c..1e33e2ed7b 100644 --- a/benchmark/profile_generation.py +++ b/benchmark/profile_generation.py @@ -123,7 +123,6 @@ def profile_throughput(model_path: str, procs = [] _start = time.perf_counter() - # TODO: update to the multithread version for i in range(concurrency): proc = Thread(target=infer, args=(tm_model, i + 1, input_ids, output_seqlen, From defb5804781e1ee147273a51b6deac77f75245f4 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Tue, 28 Nov 2023 18:16:36 +0800 Subject: [PATCH 11/17] update --- benchmark/README.md | 2 +- benchmark/profile_generation.py | 7 +++-- benchmark/profile_restful_api.py | 38 ++++++++++++++++++++++----- benchmark/profile_serving.py | 37 +++++++++++++++++++++----- benchmark/profile_throughput.py | 45 ++++++++++++++++++++++---------- 5 files changed, 98 insertions(+), 31 deletions(-) diff --git a/benchmark/README.md b/benchmark/README.md index 3fa117210e..057d38bb11 100644 --- a/benchmark/README.md +++ b/benchmark/README.md @@ -29,7 +29,7 @@ pip install nvidia-ml-py ```bash python profile_generation.py \ - --model-path /path/to/your/model \ + /path/to/your/model \ --concurrency 1 8 --prompt-tokens 1 512 --completion-tokens 2048 512 ``` diff --git a/benchmark/profile_generation.py b/benchmark/profile_generation.py index 1e33e2ed7b..10ca53087a 100644 --- a/benchmark/profile_generation.py +++ b/benchmark/profile_generation.py @@ -1,5 +1,4 @@ # Copyright (c) OpenMMLab. All rights reserved. 
-# import multiprocessing as mp import argparse import csv import logging @@ -312,7 +311,7 @@ def parse_args(): type=float, help='The value used to modulate the next token ' 'probabilities', - default=0.8) + default=1.0) parser.add_argument('--csv', type=str, help='Where to save the result.', @@ -331,6 +330,10 @@ def parse_args(): def main(): args = parse_args() + assert len(args.prompt_tokens) == len(args.completion_tokens), \ + f'mismatched size between `prompt-tokens` and `completion-tokenes`' \ + f', {len(args.prompt_tokens)} vs {len(args.completion_tokens)}' + os.environ['TM_LOG_LEVEL'] = args.log_level results: List[ProfileResult] = [] for batch in tqdm(args.concurrency): diff --git a/benchmark/profile_restful_api.py b/benchmark/profile_restful_api.py index 36b1b2846a..85f8340b50 100644 --- a/benchmark/profile_restful_api.py +++ b/benchmark/profile_restful_api.py @@ -1,3 +1,4 @@ +import csv import json import random import time @@ -60,11 +61,14 @@ def __init__(self, server_addr: str, tokenzier_path: str, temperature: float = 0.8, - top_p: float = 1.0): + top_p: float = 1.0, + csv: str = '', + **kwargs): self.tokenizer = Tokenizer(tokenzier_path) self.server_addr = server_addr self.temperature = temperature self.top_p = top_p + self.csv = csv client = APIClient(self.server_addr) self.model_name = client.available_models[0] self.pbar = None @@ -178,15 +182,33 @@ def process_request(self, f'RPM (request per minute): {rqm:.3f} req/min\n' f'{"-" * 50}\n') + with open(self.csv, 'w') as csvfile: + writer = csv.writer(csvfile) + writer.writerow([ + 'batch', 'prompt_tokens', 'completion_tokens', + '1st_token_latency(min)(s)', '1st_token_latency(max)(s)', + '1st_token_latency(ave)(s)', 'output token thr(tokens/s', + 'total token thr(token/s)', 'RPM' + ]) + writer.writerow([ + concurrency, prompt_tokens, completion_tokens, + f'{first_token_latency_min:.3f}', + f'{first_token_latency_max:.3f}', + f'{first_token_latency_ave:.3f}', + f'{completion_token_throughput:.3f}', + f'{total_token_throughput:.3f}', f'{rqm:.3f}' + ]) + def main(server_addr: str, tokenizer_path: str, dataset: str, - concurrency: int = 1, + concurrency: int = 32, num_prompts: int = 1000, top_p: float = 1.0, - temperature: float = 0.8, + temperature: float = 1.0, stream_output: bool = False, + csv: str = './profile_api_server.csv', seed: int = 0): """Benchmark the request througput of api server. @@ -195,14 +217,15 @@ def main(server_addr: str, tokenizer_path (str): Path to the tokenizer model in localhost dataset (str): Path to the dataset concurrency (int, optional): Number of working threads to process the sampled prompts. - Defaults to 1. + Defaults to 32. num_prompts (int, optional): Number of prompts to process. Defaults to 1000. top_p (float, optional): the set of most probable tokens with probabilities that add up to top_p or higher are kept for generation. Defaults to 1.0. temperature (float, optional): The value used to modulate the next token probabilities. - Defaults to 0.8. - stream_output (bool, optional): Indicator for streaming output. Defaults to True. + Defaults to 1.0. + stream_output (bool, optional): Indicator for streaming output. Defaults to False. + csv (str, optional): The path to save the result. seed (int, optional): Seed used in sampling prompts from dataset. Defaults to 0. 
""" # noqa if not server_addr.startswith('http://'): @@ -215,7 +238,8 @@ def main(server_addr: str, engine = Engine(server_addr, tokenizer_path, top_p=top_p, - temperature=temperature) + temperature=temperature, + csv=csv) requests = sample_requests(dataset, num_prompts, engine.tokenizer) diff --git a/benchmark/profile_serving.py b/benchmark/profile_serving.py index 7d3fcc1b4e..8319c09f12 100644 --- a/benchmark/profile_serving.py +++ b/benchmark/profile_serving.py @@ -1,3 +1,4 @@ +import csv import json import random import time @@ -62,12 +63,15 @@ def __init__(self, temperature: float = 0.8, top_k: int = 1, top_p: float = 1.0, - log_level: str = 'ERROR'): + csv: str = '', + log_level: str = 'ERROR', + **kwargs): self.server_addr = server_addr self.tokenizer = Tokenizer(tokenzier_path) self.temperature = temperature self.top_k = top_k self.top_p = top_p + self.csv = csv self.log_level = log_level self.pbar = None @@ -182,17 +186,34 @@ def process_request(self, f'RPS (request per second): {rqs:.3f} req/s\n' f'RPM (request per minute): {rqm:.3f} req/min\n' f'{"-" * 50}\n') + with open(self.csv, 'w') as csvfile: + writer = csv.writer(csvfile) + writer.writerow([ + 'batch', 'prompt_tokens', 'completion_tokens', + '1st_token_latency(min)(s)', '1st_token_latency(max)(s)', + '1st_token_latency(ave)(s)', 'output token thr(tokens/s', + 'total token thr(token/s)', 'RPM' + ]) + writer.writerow([ + concurrency, prompt_tokens, completion_tokens, + f'{first_token_latency_min:.3f}', + f'{first_token_latency_max:.3f}', + f'{first_token_latency_ave:.3f}', + f'{completion_token_throughput:.3f}', + f'{total_token_throughput:.3f}', f'{rqm:.3f}' + ]) def main(server_addr: str, tokenizer_path: str, dataset: str, - concurrency: int = 1, + concurrency: int = 32, num_prompts: int = 1000, top_k: int = 1, top_p: float = 1.0, - temperature: float = 0.8, - stream_output: bool = False, + temperature: float = 1.0, + stream_output: bool = True, + csv: str = './profile_tis.csv', log_level: str = 'ERROR', seed: int = 0): """Benchmark the request througput of the triton inference server. @@ -202,7 +223,7 @@ def main(server_addr: str, tokenizer_path (str): Path to the tokenizer model in localhost dataset (str): Path to the dataset concurrency (int, optional): Number of working threads to process the sampled prompts. - Defaults to 1. + Defaults to 32. num_prompts (int, optional): Number of prompts to process. Defaults to 1000. top_k (int, optional): The number of highest probability vocabulary tokens to keep for top-k-filtering. Defaults to 1. @@ -210,8 +231,9 @@ def main(server_addr: str, probabilities that add up to top_p or higher are kept for generation. Defaults to 1.0. temperature (float, optional): The value used to modulate the next token probabilities. - Defaults to 0.8. + Defaults to 1.0. stream_output (bool, optional): Indicator for streaming output. Defaults to True. + log_level(str, optional): The log level. Defaults to INFO seed (int, optional): Seed used in sampling prompts from dataset. Defaults to 0. """ # noqa @@ -222,7 +244,8 @@ def main(server_addr: str, top_k=top_k, top_p=top_p, temperature=temperature, - log_level=log_level) + log_level=log_level, + csv=csv) requests = sample_requests(dataset, num_prompts, engine.tokenizer) diff --git a/benchmark/profile_throughput.py b/benchmark/profile_throughput.py index 8239414dec..c74c2756b5 100644 --- a/benchmark/profile_throughput.py +++ b/benchmark/profile_throughput.py @@ -1,3 +1,5 @@ +# Copyright (c) OpenMMLab. All rights reserved. 
+import csv import json import os import random @@ -57,7 +59,7 @@ def sample_requests( class Engine: - def __init__(self, model_path: str, tp: int = 1, **kwargs): + def __init__(self, model_path: str, tp: int, csv: str, **kwargs): # avoid turbomind checking chat template name by setting # `model_name='llama'` tm_model = TurboMind(model_path=model_path, @@ -66,6 +68,7 @@ def __init__(self, model_path: str, tp: int = 1, **kwargs): **kwargs) self.tm_model = tm_model self.tokenizer = tm_model.tokenizer + self.csv = csv self.pbar = None def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int, @@ -153,7 +156,6 @@ def process_request(self, first_token_latency_max = np.max(stats[:, 0], axis=0) first_token_latency_ave = np.mean(stats[:, 0], axis=0) completion_tokens = np.sum(stats[:, 1], axis=0) - request_output_tokens = np.sum(stats[:, 2], axis=0) total_tokens = np.sum(stats[:, 3], axis=0) prompt_tokens = total_tokens - completion_tokens completion_token_throughput = completion_tokens / elapsed_time @@ -161,11 +163,6 @@ def process_request(self, rqs = len(requests) / elapsed_time rqm = rqs * 60 - if (np.abs(stats[:, 1] - stats[:, 2]) <= 1).min() is False: - print(f'Did not generate requested number of tokens. ' - f'Request {request_output_tokens:.0f}, ' - f'but got {completion_tokens:.0f}') - print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n' f'elapsed_time: {elapsed_time:.3f}s\n') if stream_output: @@ -182,16 +179,34 @@ def process_request(self, f'RPM (request per minute): {rqm:.3f} req/min\n' f'{"-" * 50}\n') + with open(self.csv, 'w') as csvfile: + writer = csv.writer(csvfile) + writer.writerow([ + 'batch', 'prompt_tokens', 'completion_tokens', + '1st_token_latency(min)(s)', '1st_token_latency(max)(s)', + '1st_token_latency(ave)(s)', 'output token thr(tokens/s', + 'total token thr(token/s)', 'RPM' + ]) + writer.writerow([ + concurrency, prompt_tokens, completion_tokens, + f'{first_token_latency_min:.3f}', + f'{first_token_latency_max:.3f}', + f'{first_token_latency_ave:.3f}', + f'{completion_token_throughput:.3f}', + f'{total_token_throughput:.3f}', f'{rqm:.3f}' + ]) + def main(dataset: str, model_path: str, - concurrency: int = 1, - num_prompts: int = 1000, + concurrency: int = 64, + num_prompts: int = 2000, tp: int = 1, top_k: int = 1, top_p: float = 1.0, - temperature: float = 0.8, + temperature: float = 1.0, stream_output: bool = True, + csv: str = './profile_throughput.csv', log_level: str = 'ERROR', seed: int = 0): """Benchmark the request throughput of lmdeploy in localhost. @@ -200,8 +215,8 @@ def main(dataset: str, dataset (str): Path to the dataset model_path (str): Path to a model in localhost or a model_repo_id in huggingface.co concurrency (int, optional): Number of working threads to process the sampled prompts. - Defaults to 1. - num_prompts (int, optional): Number of prompts to process. Defaults to 1000. + Defaults to 64. + num_prompts (int, optional): Number of prompts to process. Defaults to 2000. tp (int, optional): Number of GPUs for tensor parallel. Defaults to 1. top_k (int, optional): The number of highest probability vocabulary tokens to keep for top-k-filtering. Defaults to 1. @@ -209,8 +224,9 @@ def main(dataset: str, probabilities that add up to top_p or higher are kept for generation. Defaults to 1.0. temperature (float, optional): The value used to modulate the next token probabilities. - Defaults to 0.8. + Defaults to 1.0. stream_output (bool, optional): Indicator for streaming output. Defaults to True. 
+ csv (str, optional): The path to save the result. log_level(str, optional): The log level. Defaults to INFO seed (int, optional): Seed used in sampling prompts from dataset. Defaults to 0. """ # noqa @@ -221,7 +237,8 @@ def main(dataset: str, tp=tp, top_k=top_k, top_p=top_p, - temperature=temperature) + temperature=temperature, + csv=csv) requests = sample_requests(dataset, num_prompts, engine.tokenizer) From b48897b1d75d105d970ec11b8c1f81be2ec9b618 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Tue, 28 Nov 2023 18:23:17 +0800 Subject: [PATCH 12/17] remove useless profile_* argument --- lmdeploy/serve/turbomind/chatbot.py | 23 +++-------------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/lmdeploy/serve/turbomind/chatbot.py b/lmdeploy/serve/turbomind/chatbot.py index 227e84d89d..e13aa9e4d4 100644 --- a/lmdeploy/serve/turbomind/chatbot.py +++ b/lmdeploy/serve/turbomind/chatbot.py @@ -67,7 +67,6 @@ class Chatbot: model_name (str): name of the to-be-deployed mode log_level (int): the level of the log display (bool): display the generated text on consolo or not - profile_generation (bool): profile token generation or not """ def __init__(self, @@ -76,8 +75,6 @@ def __init__(self, ignore_eos: bool = False, log_level: int = logging.INFO, display: bool = False, - profile_generation: bool = False, - profile_serving: bool = False, **model_kwargs): self.tritonserver_addr = tritonserver_addr self.model_name = model_name @@ -108,8 +105,6 @@ def __init__(self, bad_words=bad_words)) self.log_level = log_level self.display = display - self.profile_generation = profile_generation - self.profile_serving = profile_serving def stream_infer(self, session_id: int, @@ -417,8 +412,6 @@ def _stop_words(self, stop_words: List[str]): def _get_prompt(self, prompt: str, sequence_start: bool): """return the concatenated prompt according to the model's chat template.""" - if self.profile_generation or self.profile_serving: - return prompt return self.model.get_prompt(prompt, sequence_start) def _stream_infer(self, @@ -469,9 +462,7 @@ def _stream_infer(self, input_ids = np.array([[1]], dtype=np.uint32) input_lengths = np.array([[1]], dtype=np.uint32) input_tokens = input_lengths.squeeze() - if self.profile_generation: - yield StatusCode.TRITON_STREAM_ING, \ - 'ignore preprocessing during profiling generation', 0 + if request_output_len is None: request_output_len = max( 128, @@ -507,8 +498,7 @@ def _stream_infer(self, producer.start() for status, res, n_token in self.stream_consumer( self.postprocess, que, session, input_tokens, preseq_length, - cancel, logger, self.display, self.profile_generation, - self.eos_id): + cancel, logger, self.display, self.eos_id): yield status, res, n_token producer.join() @@ -601,8 +591,7 @@ def _stream_producer(tritonserver_addr, session, que, cfg, input_ids, @staticmethod def stream_consumer(postprocess, res_queue, session, n_input_token, - preseq_length, cancel, logger, display, - profile_generation, eos_id): + preseq_length, cancel, logger, display, eos_id): """Consume the response from the triton inference server. 
Args: @@ -615,7 +604,6 @@ def stream_consumer(postprocess, res_queue, session, n_input_token, cancel (bool): indicator for cancelling the session logger (util.Logger): display (bool): display the text in the consolo interface or not - profile_generation (bool): indicator for profiling token generation eos_id (int): eos token id Yields: @@ -659,11 +647,6 @@ def stream_consumer(postprocess, res_queue, session, n_input_token, session.sequence_length = session.sequence_length - 1 output_ids = output_ids[:, :, :-1] - if profile_generation: - yield (StatusCode.TRITON_STREAM_ING, - 'postprocessing is ignored during profiling ' - 'token generation', output_ids.shape[-1]) - continue output_str = postprocess( output_ids, np.array([[n_token]], dtype=np.uint32)) text = output_str[0].decode() From 680db05be8075e84195f136fa0b04ea559bdf54a Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Tue, 28 Nov 2023 20:50:59 +0800 Subject: [PATCH 13/17] remove log level --- benchmark/profile_serving.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/benchmark/profile_serving.py b/benchmark/profile_serving.py index 8319c09f12..224aeb8777 100644 --- a/benchmark/profile_serving.py +++ b/benchmark/profile_serving.py @@ -214,7 +214,6 @@ def main(server_addr: str, temperature: float = 1.0, stream_output: bool = True, csv: str = './profile_tis.csv', - log_level: str = 'ERROR', seed: int = 0): """Benchmark the request througput of the triton inference server. @@ -233,7 +232,6 @@ def main(server_addr: str, temperature (float, optional): The value used to modulate the next token probabilities. Defaults to 1.0. stream_output (bool, optional): Indicator for streaming output. Defaults to True. - log_level(str, optional): The log level. Defaults to INFO seed (int, optional): Seed used in sampling prompts from dataset. Defaults to 0. """ # noqa @@ -244,7 +242,7 @@ def main(server_addr: str, top_k=top_k, top_p=top_p, temperature=temperature, - log_level=log_level, + log_level='ERROR', csv=csv) requests = sample_requests(dataset, num_prompts, engine.tokenizer) From c965545898450aef0d5b01060a0119a54ff4fab7 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Wed, 29 Nov 2023 11:37:35 +0800 Subject: [PATCH 14/17] change concurrency default value to 64 --- benchmark/profile_restful_api.py | 8 ++++---- lmdeploy/cli/serve.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/benchmark/profile_restful_api.py b/benchmark/profile_restful_api.py index 85f8340b50..a6f61da38a 100644 --- a/benchmark/profile_restful_api.py +++ b/benchmark/profile_restful_api.py @@ -203,8 +203,8 @@ def process_request(self, def main(server_addr: str, tokenizer_path: str, dataset: str, - concurrency: int = 32, - num_prompts: int = 1000, + concurrency: int = 64, + num_prompts: int = 2000, top_p: float = 1.0, temperature: float = 1.0, stream_output: bool = False, @@ -217,8 +217,8 @@ def main(server_addr: str, tokenizer_path (str): Path to the tokenizer model in localhost dataset (str): Path to the dataset concurrency (int, optional): Number of working threads to process the sampled prompts. - Defaults to 32. - num_prompts (int, optional): Number of prompts to process. Defaults to 1000. + Defaults to 64. + num_prompts (int, optional): Number of prompts to process. Defaults to 2000. top_p (float, optional): the set of most probable tokens with probabilities that add up to top_p or higher are kept for generation. Defaults to 1.0. 
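The benchmark scripts reworked in the patches above all drive load the same way: `concurrency` worker threads pull sampled requests from a shared queue (terminated by `[None, None, None]` sentinels), time each request, push per-session stats into a result queue, and advance a shared tqdm bar instead of printing per-session logs. Below is a stripped-down sketch of that harness; `send_request` is a dummy stand-in for the real API-client or TurboMind call, and the exact way the real scripts enqueue their sentinels may differ:

```python
import time
from queue import Queue
from threading import Thread

from tqdm import tqdm


def send_request(prompt: str, output_seqlen: int) -> float:
    """Dummy stand-in for the real inference call; returns request latency."""
    start = time.perf_counter()
    time.sleep(0.01)  # pretend to generate `output_seqlen` tokens
    return round(time.perf_counter() - start, 3)


def worker(req_queue: Queue, res_queue: Queue, session_id: int, pbar: tqdm):
    stats = []
    # drain the shared queue until this worker sees its sentinel
    for prompt, _input_len, output_len in iter(req_queue.get,
                                               [None, None, None]):
        stats.append(send_request(prompt, output_len))
        pbar.update(1)
    res_queue.put((session_id, stats))


def run(requests, concurrency: int = 4):
    req_queue, res_queue = Queue(), Queue()
    pbar = tqdm(total=len(requests))
    for req in requests:
        req_queue.put(req)
    for _ in range(concurrency):
        req_queue.put([None, None, None])  # one sentinel per worker
    threads = [
        Thread(target=worker, args=(req_queue, res_queue, i, pbar))
        for i in range(concurrency)
    ]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    pbar.close()
    elapsed = time.perf_counter() - start
    print(f'RPS (request per second): {len(requests) / elapsed:.3f} req/s')


if __name__ == '__main__':
    run([['hello', 5, 16]] * 32)
```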
diff --git a/lmdeploy/cli/serve.py b/lmdeploy/cli/serve.py index 30185376f5..b7680e8c95 100644 --- a/lmdeploy/cli/serve.py +++ b/lmdeploy/cli/serve.py @@ -48,7 +48,7 @@ def api_server(self, model_path: str, server_name: str = '0.0.0.0', server_port: int = 23333, - instance_num: int = 32, + instance_num: int = 64, tp: int = 1, allow_origins: List[str] = ['*'], allow_credentials: bool = True, From a05d73893a1bc77325bbf163689602abda007481 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Wed, 29 Nov 2023 12:18:54 +0800 Subject: [PATCH 15/17] update restful_api.md --- docs/en/restful_api.md | 2 +- docs/zh_cn/restful_api.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/restful_api.md b/docs/en/restful_api.md index 1f1523659d..279f563b53 100644 --- a/docs/en/restful_api.md +++ b/docs/en/restful_api.md @@ -9,7 +9,7 @@ The user can open the http url print by the following command in a browser. - **Please check the http url for the detailed api usage!!!** ```shell -lmdeploy serve api_server ./workspace --server_name 0.0.0.0 --server_port ${server_port} --instance_num 32 --tp 1 +lmdeploy serve api_server ./workspace --server_name 0.0.0.0 --server_port ${server_port} --instance_num 64 --tp 1 ``` We provide four restful api in total. Three of them are in OpenAI format. diff --git a/docs/zh_cn/restful_api.md b/docs/zh_cn/restful_api.md index 406c0b4768..96a3094ac7 100644 --- a/docs/zh_cn/restful_api.md +++ b/docs/zh_cn/restful_api.md @@ -9,7 +9,7 @@ 重要的事情说三遍。 ```shell -lmdeploy serve api_server ./workspace 0.0.0.0 --server_port ${server_port} --instance_num 32 --tp 1 +lmdeploy serve api_server ./workspace 0.0.0.0 --server_port ${server_port} --instance_num 64 --tp 1 ``` 我们一共提供四个 restful api,其中三个仿照 OpenAI 的形式。 From cbc72abb85b83295e3c5829719525a96dce8c1b6 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Wed, 29 Nov 2023 17:30:37 +0800 Subject: [PATCH 16/17] update according to review comments --- benchmark/profile_restful_api.py | 35 +++++++++++++++-------------- benchmark/profile_serving.py | 38 ++++++++++++++++++-------------- benchmark/profile_throughput.py | 35 +++++++++++++++-------------- 3 files changed, 59 insertions(+), 49 deletions(-) diff --git a/benchmark/profile_restful_api.py b/benchmark/profile_restful_api.py index a6f61da38a..bfe066ff6e 100644 --- a/benchmark/profile_restful_api.py +++ b/benchmark/profile_restful_api.py @@ -182,22 +182,25 @@ def process_request(self, f'RPM (request per minute): {rqm:.3f} req/min\n' f'{"-" * 50}\n') - with open(self.csv, 'w') as csvfile: - writer = csv.writer(csvfile) - writer.writerow([ - 'batch', 'prompt_tokens', 'completion_tokens', - '1st_token_latency(min)(s)', '1st_token_latency(max)(s)', - '1st_token_latency(ave)(s)', 'output token thr(tokens/s', - 'total token thr(token/s)', 'RPM' - ]) - writer.writerow([ - concurrency, prompt_tokens, completion_tokens, - f'{first_token_latency_min:.3f}', - f'{first_token_latency_max:.3f}', - f'{first_token_latency_ave:.3f}', - f'{completion_token_throughput:.3f}', - f'{total_token_throughput:.3f}', f'{rqm:.3f}' - ]) + if self.csv: + with open(self.csv, 'w') as csvfile: + writer = csv.writer(csvfile) + writer.writerow([ + 'batch', 'num_prompts', 'prompt_tokens', + 'completion_tokens', '1st_token_latency(min)(s)', + '1st_token_latency(max)(s)', '1st_token_latency(ave)(s)', + 'output token thr(tokens/s', 'total token thr(token/s)', + 'RPM' + ]) + writer.writerow([ + concurrency, + len(requests), prompt_tokens, completion_tokens, + f'{first_token_latency_min:.3f}' if stream_output else '-', + 
f'{first_token_latency_max:.3f}' if stream_output else '-', + f'{first_token_latency_ave:.3f}' if stream_output else '-', + f'{completion_token_throughput:.3f}', + f'{total_token_throughput:.3f}', f'{rqm:.3f}' + ]) def main(server_addr: str, diff --git a/benchmark/profile_serving.py b/benchmark/profile_serving.py index 224aeb8777..f8daafec87 100644 --- a/benchmark/profile_serving.py +++ b/benchmark/profile_serving.py @@ -117,7 +117,7 @@ def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int, def process_request(self, requests, concurrency: int = 1, - stream_output: bool = False): + stream_output: bool = True): res_queue = Queue() req_queue = Queue() threads = [] @@ -186,22 +186,26 @@ def process_request(self, f'RPS (request per second): {rqs:.3f} req/s\n' f'RPM (request per minute): {rqm:.3f} req/min\n' f'{"-" * 50}\n') - with open(self.csv, 'w') as csvfile: - writer = csv.writer(csvfile) - writer.writerow([ - 'batch', 'prompt_tokens', 'completion_tokens', - '1st_token_latency(min)(s)', '1st_token_latency(max)(s)', - '1st_token_latency(ave)(s)', 'output token thr(tokens/s', - 'total token thr(token/s)', 'RPM' - ]) - writer.writerow([ - concurrency, prompt_tokens, completion_tokens, - f'{first_token_latency_min:.3f}', - f'{first_token_latency_max:.3f}', - f'{first_token_latency_ave:.3f}', - f'{completion_token_throughput:.3f}', - f'{total_token_throughput:.3f}', f'{rqm:.3f}' - ]) + + if self.csv: + with open(self.csv, 'w') as csvfile: + writer = csv.writer(csvfile) + writer.writerow([ + 'batch', 'num_prompts', 'prompt_tokens', + 'completion_tokens', '1st_token_latency(min)(s)', + '1st_token_latency(max)(s)', '1st_token_latency(ave)(s)', + 'output token thr(tokens/s', 'total token thr(token/s)', + 'RPM' + ]) + writer.writerow([ + concurrency, + len(requests), prompt_tokens, completion_tokens, + f'{first_token_latency_min:.3f}' if stream_output else '-', + f'{first_token_latency_max:.3f}' if stream_output else '-', + f'{first_token_latency_ave:.3f}' if stream_output else '-', + f'{completion_token_throughput:.3f}', + f'{total_token_throughput:.3f}', f'{rqm:.3f}' + ]) def main(server_addr: str, diff --git a/benchmark/profile_throughput.py b/benchmark/profile_throughput.py index c74c2756b5..6b32214231 100644 --- a/benchmark/profile_throughput.py +++ b/benchmark/profile_throughput.py @@ -179,22 +179,25 @@ def process_request(self, f'RPM (request per minute): {rqm:.3f} req/min\n' f'{"-" * 50}\n') - with open(self.csv, 'w') as csvfile: - writer = csv.writer(csvfile) - writer.writerow([ - 'batch', 'prompt_tokens', 'completion_tokens', - '1st_token_latency(min)(s)', '1st_token_latency(max)(s)', - '1st_token_latency(ave)(s)', 'output token thr(tokens/s', - 'total token thr(token/s)', 'RPM' - ]) - writer.writerow([ - concurrency, prompt_tokens, completion_tokens, - f'{first_token_latency_min:.3f}', - f'{first_token_latency_max:.3f}', - f'{first_token_latency_ave:.3f}', - f'{completion_token_throughput:.3f}', - f'{total_token_throughput:.3f}', f'{rqm:.3f}' - ]) + if self.csv: + with open(self.csv, 'w') as csvfile: + writer = csv.writer(csvfile) + writer.writerow([ + 'batch', 'num_promts', 'prompt_tokens', + 'completion_tokens', '1st_token_latency(min)(s)', + '1st_token_latency(max)(s)', '1st_token_latency(ave)(s)', + 'output token thr(tokens/s', 'total token thr(token/s)', + 'RPM' + ]) + writer.writerow([ + concurrency, + len(requests), prompt_tokens, completion_tokens, + f'{first_token_latency_min:.3f}' if stream_output else '-', + f'{first_token_latency_max:.3f}' if stream_output 
else '-', + f'{first_token_latency_ave:.3f}' if stream_output else '-', + f'{completion_token_throughput:.3f}', + f'{total_token_throughput:.3f}', f'{rqm:.3f}' + ]) def main(dataset: str, From 2a3b5c4f2f3e536d239b999eafa50893fc66bec9 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Wed, 29 Nov 2023 18:54:22 +0800 Subject: [PATCH 17/17] fix docstring --- benchmark/profile_restful_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark/profile_restful_api.py b/benchmark/profile_restful_api.py index bfe066ff6e..1e0d2388ee 100644 --- a/benchmark/profile_restful_api.py +++ b/benchmark/profile_restful_api.py @@ -216,7 +216,7 @@ def main(server_addr: str, """Benchmark the request througput of api server. Args: - server_addr (str): Address of the triton inference server with format http://0.0.0.0:0 + server_addr (str): http url of api_server with format http://0.0.0.0:0 tokenizer_path (str): Path to the tokenizer model in localhost dataset (str): Path to the dataset concurrency (int, optional): Number of working threads to process the sampled prompts.
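All three benchmark scripts now guard their CSV output behind `if self.csv:` and emit a single summary row, writing '-' for the first-token-latency columns when `stream_output` is disabled (those figures are only meaningful when tokens are streamed). A minimal sketch of an equivalent writer follows; the function name and exact column labels are illustrative rather than a shared lmdeploy API, and the numbers in the usage call are arbitrary placeholders:

```python
import csv


def save_summary(path, batch, num_prompts, prompt_tokens, completion_tokens,
                 first_token_latency, out_token_thr, total_token_thr, rpm,
                 stream_output=True):
    """Write a one-row benchmark summary; do nothing if `path` is empty."""
    if not path:
        return
    # first_token_latency is a (min, max, ave) triple, in seconds
    latency_cols = ([f'{v:.3f}' for v in first_token_latency]
                    if stream_output else ['-'] * 3)
    with open(path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow([
            'batch', 'num_prompts', 'prompt_tokens', 'completion_tokens',
            '1st_token_latency(min)(s)', '1st_token_latency(max)(s)',
            '1st_token_latency(ave)(s)', 'output token thr(tokens/s)',
            'total token thr(token/s)', 'RPM'
        ])
        writer.writerow([
            batch, num_prompts, prompt_tokens, completion_tokens,
            *latency_cols, f'{out_token_thr:.3f}', f'{total_token_thr:.3f}',
            f'{rpm:.3f}'
        ])


# a non-streaming run: the three first-token-latency columns become '-'
save_summary('./profile_api_server.csv', 64, 2000, 240000, 210000,
             (0.0, 0.0, 0.0), 1500.0, 3100.0, 58.0, stream_output=False)
```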