mirror of
https://github.com/pytorch/pytorch.git
synced 2025-10-20 21:14:14 +08:00
Support rolling over a percentage of workflows (#134816)
In order to support adding a rollover percentage, this ended up being a complete rewrite of runner_determinator.py. Details of the new format are in the comments up top. On the plus side, this now includes some unit tests. Pull Request resolved: https://github.com/pytorch/pytorch/pull/134816 Approved by: https://github.com/PaliC, https://github.com/zxiiro
This commit is contained in:
committed by
PyTorch MergeBot
parent
5314ae2660
commit
09519eb195
350
.github/scripts/runner_determinator.py
vendored
350
.github/scripts/runner_determinator.py
vendored
@ -3,49 +3,94 @@
|
||||
"""
|
||||
This runner determinator is used to determine which set of runners to run a
|
||||
GitHub job on. It uses the first comment of a GitHub issue (by default
|
||||
https://github.com/pytorch/test-infra/issues/5132) as a user list to determine
|
||||
which users will get their jobs to run on experimental runners. This user list
|
||||
is also a comma separated list of additional features or experiments which the
|
||||
user could be opted in to.
|
||||
https://github.com/pytorch/test-infra/issues/5132) to define the configuration
|
||||
of which runners should be used to run which job.
|
||||
|
||||
The configuration has two parts, the settings and a list of opted-in users,
|
||||
separated by a line containing "---". If the line is not present, the
|
||||
settings are considered to be empty with only the second part, the user
|
||||
list, defined.
|
||||
|
||||
The first part is a YAML block that defines the rollout settings. This can be
|
||||
used to define any settings that are needed to determine which runners to use.
|
||||
Its fields are defined by the Settings class below.
|
||||
|
||||
The second part is a list of users who are explicitly opted in to the LF fleet.
|
||||
The user list is also a comma separated list of additional features or
|
||||
experiments which the user could be opted in to.
|
||||
|
||||
The user list has the following rules:
|
||||
|
||||
- Users are GitHub usernames with the @ prefix
|
||||
- If the first line is a "*" then all users will use the new runners
|
||||
- If the first line is a "!" then all users will use the old runners
|
||||
- Users are GitHub usernames, which must start with the @ prefix
|
||||
- Each user is also a comma-separated list of features/experiments to enable
|
||||
- A "#" prefix indicates the user is opted out of the new runners but is opting
|
||||
into features/experiments.
|
||||
- A "#" prefix opts the user out of all experiments
|
||||
|
||||
Example user list:
|
||||
Example config:
|
||||
# A list of experiments that can be opted into.
|
||||
# This defines the behavior they'll induce when opted into.
|
||||
# Expected syntax is:
|
||||
# [experiment_name]: # Name of the experiment. Also used for the label prefix.
|
||||
# rollout_perc: [int] # % of workflows to run with this experiment when users are not opted in.
|
||||
|
||||
@User1
|
||||
@User2,amz2023
|
||||
#@UserOptOutOfNewRunner,amz2023
|
||||
experiments:
|
||||
lf:
|
||||
rollout_perc: 25
|
||||
|
||||
---
|
||||
|
||||
# Opt-ins:
|
||||
# Users can opt into the LF fleet by adding their GitHub username to this list
|
||||
# and specifying experiments to enable in a comma-separated list.
|
||||
# Experiments should be from the above list.
|
||||
|
||||
@User1,lf,split_build
|
||||
@User2,lf
|
||||
@User3,split_build
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
from argparse import ArgumentParser
|
||||
from logging import LogRecord
|
||||
from typing import Any, Iterable
|
||||
from typing import Any, Dict, Iterable, List, NamedTuple, Tuple
|
||||
|
||||
import yaml
|
||||
from github import Auth, Github
|
||||
from github.Issue import Issue
|
||||
|
||||
|
||||
WORKFLOW_LABEL_META = "" # use meta runners
|
||||
DEFAULT_LABEL_PREFIX = "" # use meta runners
|
||||
WORKFLOW_LABEL_LF = "lf." # use runners from the linux foundation
|
||||
WORKFLOW_LABEL_LF_CANARY = "lf.c." # use canary runners from the linux foundation
|
||||
|
||||
RUNNER_AMI_LEGACY = ""
|
||||
RUNNER_AMI_AMZ2023 = "amz2023"
|
||||
|
||||
GITHUB_OUTPUT = os.getenv("GITHUB_OUTPUT", "")
|
||||
GH_OUTPUT_KEY_AMI = "runner-ami"
|
||||
GH_OUTPUT_KEY_LABEL_TYPE = "label-type"
|
||||
|
||||
|
||||
SETTING_EXPERIMENTS = "experiments"
|
||||
|
||||
LF_FLEET_EXPERIMENT = "lf"
|
||||
CANARY_FLEET_SUFFIX = ".c"
|
||||
|
||||
|
||||
class Experiment(NamedTuple):
    """Rollout configuration for a single experiment."""

    # Percentage of workflows to experiment on when user is not opted-in.
    rollout_perc: float = 0

    # Add more fields as needed
|
||||
|
||||
|
||||
class Settings(NamedTuple):
    """
    Container for the rollout settings read from the configuration.

    Currently it only holds the experiments that can be opted into.
    """

    # Maps each experiment name to its configuration.
    # NOTE: the default is a single dict object shared by every Settings
    # created without arguments, so it should be treated as read-only.
    experiments: Dict[str, Experiment] = {}
|
||||
|
||||
|
||||
class ColorFormatter(logging.Formatter):
|
||||
"""Color codes the log messages based on the log level"""
|
||||
|
||||
@ -172,85 +217,180 @@ def is_exception_branch(branch: str) -> bool:
|
||||
return branch.split("/")[0] in {"main", "nightly", "release", "landchecks"}
|
||||
|
||||
|
||||
def get_fleet(rollout_state: str, workflow_requestors: Iterable[str]) -> str:
|
||||
"""
|
||||
Determines if the job should run on the LF fleet or the Meta fleet
|
||||
|
||||
Returns:
|
||||
The appropriate label prefix for the runner, corresponding to the fleet to use.
|
||||
This gets prefixed to the very start of the runner label.
|
||||
"""
|
||||
|
||||
def load_yaml(yaml_text: str) -> Any:
|
||||
try:
|
||||
if rollout_state[0] == "!":
|
||||
log.info("LF Workflows are disabled for everyone. Using meta runners.")
|
||||
return WORKFLOW_LABEL_META
|
||||
elif rollout_state[0] == "*":
|
||||
log.info("LF Workflows are enabled for everyone. Using LF runners.")
|
||||
return WORKFLOW_LABEL_LF
|
||||
else:
|
||||
all_opted_in_users = {
|
||||
usr_raw.strip("\n\t@ ").split(",")[0]
|
||||
for usr_raw in rollout_state.split()
|
||||
}
|
||||
opted_in_requestors = {
|
||||
usr for usr in workflow_requestors if usr in all_opted_in_users
|
||||
}
|
||||
if opted_in_requestors:
|
||||
log.info(
|
||||
f"LF Workflows are enabled for {', '.join(opted_in_requestors)}. Using LF runners."
|
||||
)
|
||||
return WORKFLOW_LABEL_LF
|
||||
else:
|
||||
log.info(
|
||||
f"LF Workflows are disabled for {', '.join(workflow_requestors)}. Using meta runners."
|
||||
)
|
||||
return WORKFLOW_LABEL_META
|
||||
|
||||
except Exception as e:
|
||||
log.error(
|
||||
f"Failed to get determine workflow type. Falling back to meta runners. Exception: {e}"
|
||||
)
|
||||
return WORKFLOW_LABEL_META
|
||||
data = yaml.safe_load(yaml_text)
|
||||
return data
|
||||
except yaml.YAMLError as exc:
|
||||
log.exception("Error loading YAML")
|
||||
raise
|
||||
|
||||
|
||||
def get_optin_feature(
|
||||
rollout_state: str, workflow_requestors: Iterable[str], feature: str, fallback: str
|
||||
def extract_settings_user_opt_in_from_text(rollout_state: str) -> Tuple[str, str]:
    """
    Split the rollout state into its settings text and its user opt-in text.

    The first "---" in the text acts as the separator: everything before it is
    the settings and everything after it is the list of opted in users. Only
    the first occurrence is treated as the separator, so a later "---" stays
    part of the user list instead of cutting the list short.

    If the text doesn't contain "---" then the settings are empty and the
    whole text is the user list.

    Returns:
        A (settings_text, users_text) tuple.
    """
    settings_text, separator, users_text = rollout_state.partition("---")
    if not separator:
        # No separator present: there are no settings, only users.
        return "", rollout_state
    return settings_text, users_text
|
||||
|
||||
|
||||
class UserOptins(Dict[str, List[str]]):
    """
    Maps a GitHub username to the list of features that user has opted into.
    """
|
||||
|
||||
|
||||
def parse_user_opt_in_from_text(user_optin_text: str) -> UserOptins:
    """
    Build the mapping of opted-in users from the user section of the config.

    Every user line is expected to look like "@User1,lf,split_build": a GitHub
    username with the @ prefix followed by a comma-separated list of the
    features/experiments to enable for that user.

    Lines that do not start with "@" once surrounding whitespace and dashes
    are removed are skipped. That covers blank lines, headings, and lines with
    a "#" prefix, which is how a user is opted out of all experiments.
    """
    optins = UserOptins()
    for raw_line in user_optin_text.split("\n"):
        entry = raw_line.strip("\r\n\t -")
        if entry.startswith("@"):
            # The first field is the username, the rest are the experiments.
            name_field, *experiment_fields = entry.split(",")
            optins[name_field.strip("@")] = [
                field.strip(" ") for field in experiment_fields
            ]

    return optins
|
||||
|
||||
|
||||
def parse_settings_from_text(settings_text: str) -> Settings:
    """
    Parse the settings section of the issue body into a Settings object.

    The text is expected to be a YAML mapping, optionally wrapped in a code
    block, with an "experiments" key that maps each experiment name to its
    settings. Settings that are not fields of Experiment are logged and
    ignored.

    Parsing is best effort: if there is nothing to parse, default (empty)
    Settings are returned; if the text cannot be parsed, the failure is logged
    and default Settings are returned.
    """
    try:
        # Escape the backtick as well so that we can have the settings in a code block on the GH issue
        # for easy reading
        # Note: Using ascii for the backtick so that the cat step in _runner-determinator.yml doesn't choke on
        # the backtick character in shell commands.
        backtick = chr(96)  # backtick character
        settings_text = (settings_text or "").strip(f"\r\n\t{backtick} ")

        # Only whitespace or an empty code block means there are no settings;
        # that is not an error worth logging.
        if not settings_text:
            return Settings()

        settings = load_yaml(settings_text)

        # For now we just load experiments. We can expand this if/when we add more settings
        experiments = {}

        # "or {}" tolerates a settings block that has no experiments section.
        for exp_name, exp_settings in (
            settings.get(SETTING_EXPERIMENTS) or {}
        ).items():
            valid_settings = {}
            # "or {}" tolerates an experiment listed without any settings, so
            # it falls back to the Experiment defaults instead of causing
            # every experiment to be discarded.
            for setting, value in (exp_settings or {}).items():
                if setting not in Experiment._fields:
                    log.warning(
                        f"Unexpected setting in experiment: {setting} = {value}"
                    )
                else:
                    valid_settings[setting] = value

            experiments[exp_name] = Experiment(**valid_settings)
        return Settings(experiments)

    except Exception:
        log.exception("Failed to parse settings")

    return Settings()
|
||||
|
||||
|
||||
def parse_settings(rollout_state: str) -> Settings:
    """
    Read the settings, if there are any, out of the rollout state.

    When the issue body contains "---", the text above it holds the settings
    and the text below it holds the list of opted in users.

    Without a "---" there are no settings and the default values are used.
    """
    settings_text = extract_settings_user_opt_in_from_text(rollout_state)[0]
    return parse_settings_from_text(settings_text)
|
||||
|
||||
|
||||
def parse_users(rollout_state: str) -> UserOptins:
    """
    Read the opted in users, and their experiments, out of the rollout state.
    """
    users_text = extract_settings_user_opt_in_from_text(rollout_state)[1]
    return parse_user_opt_in_from_text(users_text)
|
||||
|
||||
|
||||
def is_user_opted_in(user: str, user_optins: UserOptins, experiment_name: str) -> bool:
    """
    Tell whether the given user has opted into the given experiment.

    A user with no entry in user_optins is not opted into anything.
    """
    enabled_experiments = user_optins.get(user, [])
    return experiment_name in enabled_experiments
|
||||
|
||||
|
||||
def get_runner_prefix(
|
||||
rollout_state: str, workflow_requestors: Iterable[str], is_canary: bool = False
|
||||
) -> str:
|
||||
"""
|
||||
Used to dynamically opt in jobs to specific runner-type variants.
|
||||
settings = parse_settings(rollout_state)
|
||||
user_optins = parse_users(rollout_state)
|
||||
|
||||
Returns:
|
||||
The runner-type's variant name if the user has opted in to the feature, otherwise returns an empty string.
|
||||
This variant name is prefixed to the runner-type in the label.
|
||||
"""
|
||||
try:
|
||||
userlist = {u.lstrip("#").strip("\n\t@ ") for u in rollout_state.split()}
|
||||
all_opted_in_users = set()
|
||||
for user in userlist:
|
||||
for i in user.split(","):
|
||||
if i == feature:
|
||||
all_opted_in_users.add(user.split(",")[0])
|
||||
opted_in_requestors = {
|
||||
usr for usr in workflow_requestors if usr in all_opted_in_users
|
||||
}
|
||||
fleet_prefix = ""
|
||||
prefixes = []
|
||||
for experiment_name, experiment_settings in settings.experiments.items():
|
||||
enabled = False
|
||||
|
||||
if opted_in_requestors:
|
||||
# Is any workflow_requestor opted in to this experiment?
|
||||
opted_in_users = [
|
||||
requestor
|
||||
for requestor in workflow_requestors
|
||||
if is_user_opted_in(requestor, user_optins, experiment_name)
|
||||
]
|
||||
|
||||
if opted_in_users:
|
||||
log.info(
|
||||
f"Feature {feature} is enabled for {', '.join(opted_in_requestors)}. Using feature {feature}."
|
||||
f"{', '.join(opted_in_users)} have opted into experiment {experiment_name}."
|
||||
)
|
||||
return feature
|
||||
else:
|
||||
log.info(
|
||||
f"Feature {feature} is disabled for {', '.join(workflow_requestors)}. Using fallback \"{fallback}\"."
|
||||
)
|
||||
return fallback
|
||||
enabled = True
|
||||
elif experiment_settings.rollout_perc:
|
||||
# If no user is opted in, then we randomly enable the experiment based on the rollout percentage
|
||||
if random.uniform(0, 100) <= experiment_settings.rollout_perc:
|
||||
log.info(
|
||||
f"Based on rollout percentage of {experiment_settings.rollout_perc}%, enabling experiment {experiment_name}."
|
||||
)
|
||||
enabled = True
|
||||
|
||||
except Exception as e:
|
||||
if enabled:
|
||||
label = experiment_name
|
||||
if experiment_name == LF_FLEET_EXPERIMENT:
|
||||
# We give some special treatment to the "lf" experiment since it determines the fleet we use
|
||||
# - If it's enabled, then we always list its prefix first
|
||||
# - If we're in the canary branch, then we append ".c" to the lf prefix
|
||||
if is_canary:
|
||||
label += CANARY_FLEET_SUFFIX
|
||||
fleet_prefix = label
|
||||
else:
|
||||
prefixes.append(label)
|
||||
|
||||
if len(prefixes) > 1:
|
||||
log.error(
|
||||
f'Failed to determine if user has opted-in to feature {feature}. Using fallback "{fallback}". Exception: {e}'
|
||||
f"Only a fleet and one other experiment can be enabled for a job at any time. Enabling {prefixes[0]} and ignoring the rest, which are {', '.join(prefixes[1:])}"
|
||||
)
|
||||
return fallback
|
||||
prefixes = prefixes[:1]
|
||||
|
||||
# Fleet always comes first
|
||||
if fleet_prefix:
|
||||
prefixes.insert(0, fleet_prefix)
|
||||
|
||||
return ".".join(prefixes) + "." if prefixes else ""
|
||||
|
||||
|
||||
def get_rollout_state_from_issue(github_token: str, repo: str, issue_num: int) -> str:
|
||||
@ -268,9 +408,10 @@ def main() -> None:
|
||||
args = parse_args()
|
||||
|
||||
if args.github_ref_type == "branch" and is_exception_branch(args.github_branch):
|
||||
log.info(f"Exception branch: '{args.github_branch}', using meta runners")
|
||||
label_type = WORKFLOW_LABEL_META
|
||||
runner_ami = RUNNER_AMI_LEGACY
|
||||
log.info(
|
||||
f"Exception branch: '{args.github_branch}', using Meta runners and no experiments."
|
||||
)
|
||||
runner_label_prefix = DEFAULT_LABEL_PREFIX
|
||||
else:
|
||||
try:
|
||||
rollout_state = get_rollout_state_from_issue(
|
||||
@ -285,35 +426,18 @@ def main() -> None:
|
||||
args.github_branch,
|
||||
)
|
||||
|
||||
label_type = get_fleet(
|
||||
rollout_state,
|
||||
(
|
||||
args.github_issue_owner,
|
||||
username,
|
||||
),
|
||||
)
|
||||
runner_ami = get_optin_feature(
|
||||
rollout_state=rollout_state,
|
||||
workflow_requestors=(
|
||||
args.github_issue_owner,
|
||||
username,
|
||||
),
|
||||
feature=RUNNER_AMI_AMZ2023,
|
||||
fallback=RUNNER_AMI_LEGACY,
|
||||
is_canary = args.github_repo == "pytorch/pytorch-canary"
|
||||
|
||||
runner_label_prefix = get_runner_prefix(
|
||||
rollout_state, (args.github_issue_owner, username), is_canary
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
log.error(
|
||||
f"Failed to get issue. Falling back to meta runners. Exception: {e}"
|
||||
f"Failed to get issue. Defaulting to Meta runners and no experiments. Exception: {e}"
|
||||
)
|
||||
label_type = WORKFLOW_LABEL_META
|
||||
runner_ami = RUNNER_AMI_LEGACY
|
||||
|
||||
# For Canary builds use canary runners
|
||||
if args.github_repo == "pytorch/pytorch-canary" and label_type == WORKFLOW_LABEL_LF:
|
||||
label_type = WORKFLOW_LABEL_LF_CANARY
|
||||
|
||||
set_github_output(GH_OUTPUT_KEY_LABEL_TYPE, label_type)
|
||||
set_github_output(GH_OUTPUT_KEY_AMI, runner_ami)
|
||||
set_github_output(GH_OUTPUT_KEY_LABEL_TYPE, runner_label_prefix)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
237
.github/scripts/test_runner_determinator.py
vendored
Normal file
237
.github/scripts/test_runner_determinator.py
vendored
Normal file
@ -0,0 +1,237 @@
|
||||
from unittest import main, TestCase
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import runner_determinator as rd
|
||||
|
||||
|
||||
class TestRunnerDeterminatorIssueParser(TestCase):
    """Tests for splitting the issue text into settings and user opt-ins."""

    def test_parse_settings(self) -> None:
        """Experiments listed above the separator are parsed into Settings."""
        settings_text = """
        experiments:
            lf:
                rollout_perc: 25
            otherExp:
                rollout_perc: 0
        ---

        Users:
        @User1,lf
        @User2,lf,otherExp

        """

        settings = rd.parse_settings(settings_text)

        self.assertTupleEqual(
            rd.Experiment(rollout_perc=25),
            settings.experiments["lf"],
            "lf settings not parsed correctly",
        )
        self.assertTupleEqual(
            rd.Experiment(rollout_perc=0),
            settings.experiments["otherExp"],
            "otherExp settings not parsed correctly",
        )

    def test_parse_settings_in_code_block(self) -> None:
        """Settings wrapped in a markdown code block are parsed the same way."""
        settings_text = """

        ```
        experiments:
            lf:
                rollout_perc: 25
            otherExp:
                rollout_perc: 0

        ```

        ---

        Users:
        @User1,lf
        @User2,lf,otherExp

        """

        settings = rd.parse_settings(settings_text)

        self.assertTupleEqual(
            rd.Experiment(rollout_perc=25),
            settings.experiments["lf"],
            "lf settings not parsed correctly",
        )
        self.assertTupleEqual(
            rd.Experiment(rollout_perc=0),
            settings.experiments["otherExp"],
            "otherExp settings not parsed correctly",
        )

    def test_parse_users(self) -> None:
        """Users listed below the separator are parsed with their experiments."""
        settings_text = """
        experiments:
            lf:
                rollout_perc: 0
            otherExp:
                rollout_perc: 0
        ---

        Users:
        @User1,lf
        @User2,lf,otherExp

        """

        users = rd.parse_users(settings_text)
        self.assertDictEqual(
            {"User1": ["lf"], "User2": ["lf", "otherExp"]},
            users,
            "Users not parsed correctly",
        )

    def test_parse_users_without_settings(self) -> None:
        """Without a separator the whole text is treated as the user list."""
        settings_text = """

        @User1,lf
        @User2,lf,otherExp

        """

        users = rd.parse_users(settings_text)
        self.assertDictEqual(
            {"User1": ["lf"], "User2": ["lf", "otherExp"]},
            users,
            "Users not parsed correctly",
        )
|
||||
|
||||
|
||||
class TestRunnerDeterminatorGetRunnerPrefix(TestCase):
    """Tests for the runner label prefix computed from the rollout state."""

    def test_opted_in_user(self) -> None:
        """A user opted into lf gets the lf prefix."""
        settings_text = """
        experiments:
            lf:
                rollout_perc: 0
            otherExp:
                rollout_perc: 0
        ---

        Users:
        @User1,lf
        @User2,lf,otherExp

        """
        prefix = rd.get_runner_prefix(settings_text, ["User1"])
        self.assertEqual("lf.", prefix, "Runner prefix not correct for User1")

    def test_opted_in_user_two_experiments(self) -> None:
        """A user opted into two experiments gets both prefixes."""
        settings_text = """
        experiments:
            lf:
                rollout_perc: 0
            otherExp:
                rollout_perc: 0
        ---

        Users:
        @User1,lf
        @User2,lf,otherExp

        """
        prefix = rd.get_runner_prefix(settings_text, ["User2"])
        self.assertEqual("lf.otherExp.", prefix, "Runner prefix not correct for User2")

    @patch("random.uniform", return_value=50)
    def test_opted_out_user(self, mock_uniform: Mock) -> None:
        """A roll of 50 is above the 25% rollout, so no experiment is enabled."""
        settings_text = """
        experiments:
            lf:
                rollout_perc: 25
            otherExp:
                rollout_perc: 25
        ---

        Users:
        @User1,lf
        @User2,lf,otherExp

        """
        prefix = rd.get_runner_prefix(settings_text, ["User3"])
        self.assertEqual("", prefix, "Runner prefix not correct for user")

    @patch("random.uniform", return_value=10)
    def test_opted_out_user_was_pulled_in_by_rollout(self, mock_uniform: Mock) -> None:
        """A roll of 10 is within the 25% rollout, so both experiments apply."""
        settings_text = """
        experiments:
            lf:
                rollout_perc: 25
            otherExp:
                rollout_perc: 25
        ---

        Users:
        @User1,lf
        @User2,lf,otherExp

        """

        # User3 is opted out, but is pulled into both experiments by the 10% rollout
        prefix = rd.get_runner_prefix(settings_text, ["User3"])
        self.assertEqual("lf.otherExp.", prefix, "Runner prefix not correct for user")

    def test_lf_prefix_always_comes_first(self) -> None:
        """The lf prefix is listed first even when lf is declared last."""
        settings_text = """
        experiments:
            otherExp:
                rollout_perc: 0
            lf:
                rollout_perc: 0
        ---

        Users:
        @User1,lf
        @User2,otherExp,lf

        """

        prefix = rd.get_runner_prefix(settings_text, ["User2"])
        self.assertEqual("lf.otherExp.", prefix, "Runner prefix not correct for user")

    def test_ignores_commented_users(self) -> None:
        """A user line with the # prefix does not opt that user in."""
        settings_text = """
        experiments:
            lf:
                rollout_perc: 0
            otherExp:
                rollout_perc: 0
        ---

        Users:
        #@User1,lf
        @User2,lf,otherExp

        """

        prefix = rd.get_runner_prefix(settings_text, ["User1"])
        self.assertEqual("", prefix, "Runner prefix not correct for user")

    def test_ignores_extra_experiments(self) -> None:
        """Only the fleet and one other experiment make it into the prefix."""
        settings_text = """
        experiments:
            lf:
                rollout_perc: 0
            otherExp:
                rollout_perc: 0
            foo:
                rollout_perc: 0
        ---

        Users:
        @User1,lf,otherExp,foo

        """

        prefix = rd.get_runner_prefix(settings_text, ["User1"])
        self.assertEqual("lf.otherExp.", prefix, "Runner prefix not correct for user")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
350
.github/workflows/_runner-determinator.yml
vendored
350
.github/workflows/_runner-determinator.yml
vendored
@ -62,49 +62,94 @@ jobs:
|
||||
"""
|
||||
This runner determinator is used to determine which set of runners to run a
|
||||
GitHub job on. It uses the first comment of a GitHub issue (by default
|
||||
https://github.com/pytorch/test-infra/issues/5132) as a user list to determine
|
||||
which users will get their jobs to run on experimental runners. This user list
|
||||
is also a comma separated list of additional features or experiments which the
|
||||
user could be opted in to.
|
||||
https://github.com/pytorch/test-infra/issues/5132) to define the configuration
|
||||
of which runners should be used to run which job.
|
||||
|
||||
The configuration has two parts, the settings and a list of opted-in users,
|
||||
separated by a line containing "---". If the line is not present, the
|
||||
settings are considered to be empty with only the second part, the user
|
||||
list, defined.
|
||||
|
||||
The first part is a YAML block that defines the rollout settings. This can be
|
||||
used to define any settings that are needed to determine which runners to use.
|
||||
Its fields are defined by the Settings class below.
|
||||
|
||||
The second part is a list of users who are explicitly opted in to the LF fleet.
|
||||
The user list is also a comma separated list of additional features or
|
||||
experiments which the user could be opted in to.
|
||||
|
||||
The user list has the following rules:
|
||||
|
||||
- Users are GitHub usernames with the @ prefix
|
||||
- If the first line is a "*" then all users will use the new runners
|
||||
- If the first line is a "!" then all users will use the old runners
|
||||
- Users are GitHub usernames, which must start with the @ prefix
|
||||
- Each user is also a comma-separated list of features/experiments to enable
|
||||
- A "#" prefix indicates the user is opted out of the new runners but is opting
|
||||
into features/experiments.
|
||||
- A "#" prefix opts the user out of all experiments
|
||||
|
||||
Example user list:
|
||||
Example config:
|
||||
# A list of experiments that can be opted into.
|
||||
# This defines the behavior they'll induce when opted into.
|
||||
# Expected syntax is:
|
||||
# [experiment_name]: # Name of the experiment. Also used for the label prefix.
|
||||
# rollout_perc: [int] # % of workflows to run with this experiment when users are not opted in.
|
||||
|
||||
@User1
|
||||
@User2,amz2023
|
||||
#@UserOptOutOfNewRunner,amz2023
|
||||
experiments:
|
||||
lf:
|
||||
rollout_perc: 25
|
||||
|
||||
---
|
||||
|
||||
# Opt-ins:
|
||||
# Users can opt into the LF fleet by adding their GitHub username to this list
|
||||
# and specifying experiments to enable in a comma-separated list.
|
||||
# Experiments should be from the above list.
|
||||
|
||||
@User1,lf,split_build
|
||||
@User2,lf
|
||||
@User3,split_build
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
from argparse import ArgumentParser
|
||||
from logging import LogRecord
|
||||
from typing import Any, Iterable
|
||||
from typing import Any, Dict, Iterable, List, NamedTuple, Tuple
|
||||
|
||||
import yaml
|
||||
from github import Auth, Github
|
||||
from github.Issue import Issue
|
||||
|
||||
|
||||
WORKFLOW_LABEL_META = "" # use meta runners
|
||||
DEFAULT_LABEL_PREFIX = "" # use meta runners
|
||||
WORKFLOW_LABEL_LF = "lf." # use runners from the linux foundation
|
||||
WORKFLOW_LABEL_LF_CANARY = "lf.c." # use canary runners from the linux foundation
|
||||
|
||||
RUNNER_AMI_LEGACY = ""
|
||||
RUNNER_AMI_AMZ2023 = "amz2023"
|
||||
|
||||
GITHUB_OUTPUT = os.getenv("GITHUB_OUTPUT", "")
|
||||
GH_OUTPUT_KEY_AMI = "runner-ami"
|
||||
GH_OUTPUT_KEY_LABEL_TYPE = "label-type"
|
||||
|
||||
|
||||
SETTING_EXPERIMENTS = "experiments"
|
||||
|
||||
LF_FLEET_EXPERIMENT = "lf"
|
||||
CANARY_FLEET_SUFFIX = ".c"
|
||||
|
||||
|
||||
class Experiment(NamedTuple):
    """Rollout configuration for a single experiment."""

    # Percentage of workflows to experiment on when user is not opted-in.
    rollout_perc: float = 0

    # Add more fields as needed
|
||||
|
||||
|
||||
class Settings(NamedTuple):
    """
    Container for the rollout settings read from the configuration.

    Currently it only holds the experiments that can be opted into.
    """

    # Maps each experiment name to its configuration.
    # NOTE: the default is a single dict object shared by every Settings
    # created without arguments, so it should be treated as read-only.
    experiments: Dict[str, Experiment] = {}
|
||||
|
||||
|
||||
class ColorFormatter(logging.Formatter):
|
||||
"""Color codes the log messages based on the log level"""
|
||||
|
||||
@ -231,85 +276,180 @@ jobs:
|
||||
return branch.split("/")[0] in {"main", "nightly", "release", "landchecks"}
|
||||
|
||||
|
||||
def get_fleet(rollout_state: str, workflow_requestors: Iterable[str]) -> str:
|
||||
"""
|
||||
Determines if the job should run on the LF fleet or the Meta fleet
|
||||
|
||||
Returns:
|
||||
The appropriate label prefix for the runner, corresponding to the fleet to use.
|
||||
This gets prefixed to the very start of the runner label.
|
||||
"""
|
||||
|
||||
def load_yaml(yaml_text: str) -> Any:
|
||||
try:
|
||||
if rollout_state[0] == "!":
|
||||
log.info("LF Workflows are disabled for everyone. Using meta runners.")
|
||||
return WORKFLOW_LABEL_META
|
||||
elif rollout_state[0] == "*":
|
||||
log.info("LF Workflows are enabled for everyone. Using LF runners.")
|
||||
return WORKFLOW_LABEL_LF
|
||||
else:
|
||||
all_opted_in_users = {
|
||||
usr_raw.strip("\n\t@ ").split(",")[0]
|
||||
for usr_raw in rollout_state.split()
|
||||
}
|
||||
opted_in_requestors = {
|
||||
usr for usr in workflow_requestors if usr in all_opted_in_users
|
||||
}
|
||||
if opted_in_requestors:
|
||||
log.info(
|
||||
f"LF Workflows are enabled for {', '.join(opted_in_requestors)}. Using LF runners."
|
||||
)
|
||||
return WORKFLOW_LABEL_LF
|
||||
else:
|
||||
log.info(
|
||||
f"LF Workflows are disabled for {', '.join(workflow_requestors)}. Using meta runners."
|
||||
)
|
||||
return WORKFLOW_LABEL_META
|
||||
|
||||
except Exception as e:
|
||||
log.error(
|
||||
f"Failed to get determine workflow type. Falling back to meta runners. Exception: {e}"
|
||||
)
|
||||
return WORKFLOW_LABEL_META
|
||||
data = yaml.safe_load(yaml_text)
|
||||
return data
|
||||
except yaml.YAMLError as exc:
|
||||
log.exception("Error loading YAML")
|
||||
raise
|
||||
|
||||
|
||||
def get_optin_feature(
|
||||
rollout_state: str, workflow_requestors: Iterable[str], feature: str, fallback: str
|
||||
def extract_settings_user_opt_in_from_text(rollout_state: str) -> Tuple[str, str]:
    """
    Split the rollout state into its settings text and its user opt-in text.

    The first "---" in the text acts as the separator: everything before it is
    the settings and everything after it is the list of opted in users. Only
    the first occurrence is treated as the separator, so a later "---" stays
    part of the user list instead of cutting the list short.

    If the text doesn't contain "---" then the settings are empty and the
    whole text is the user list.

    Returns:
        A (settings_text, users_text) tuple.
    """
    settings_text, separator, users_text = rollout_state.partition("---")
    if not separator:
        # No separator present: there are no settings, only users.
        return "", rollout_state
    return settings_text, users_text
|
||||
|
||||
|
||||
class UserOptins(Dict[str, List[str]]):
    """
    Maps a GitHub username to the list of features that user has opted into.
    """
|
||||
|
||||
|
||||
def parse_user_opt_in_from_text(user_optin_text: str) -> UserOptins:
    """
    Build the mapping of opted-in users from the user section of the config.

    Every user line is expected to look like "@User1,lf,split_build": a GitHub
    username with the @ prefix followed by a comma-separated list of the
    features/experiments to enable for that user.

    Lines that do not start with "@" once surrounding whitespace and dashes
    are removed are skipped. That covers blank lines, headings, and lines with
    a "#" prefix, which is how a user is opted out of all experiments.
    """
    optins = UserOptins()
    for raw_line in user_optin_text.split("\n"):
        entry = raw_line.strip("\r\n\t -")
        if entry.startswith("@"):
            # The first field is the username, the rest are the experiments.
            name_field, *experiment_fields = entry.split(",")
            optins[name_field.strip("@")] = [
                field.strip(" ") for field in experiment_fields
            ]

    return optins
|
||||
|
||||
|
||||
def parse_settings_from_text(settings_text: str) -> Settings:
    """
    Parse the settings section of the issue body into a Settings object.

    The text is expected to be a YAML mapping, optionally wrapped in a code
    block, with an "experiments" key that maps each experiment name to its
    settings. Settings that are not fields of Experiment are logged and
    ignored.

    Parsing is best effort: if there is nothing to parse, default (empty)
    Settings are returned; if the text cannot be parsed, the failure is logged
    and default Settings are returned.
    """
    try:
        # Escape the backtick as well so that we can have the settings in a code block on the GH issue
        # for easy reading
        # Note: Using ascii for the backtick so that the cat step in _runner-determinator.yml doesn't choke on
        # the backtick character in shell commands.
        backtick = chr(96)  # backtick character
        settings_text = (settings_text or "").strip(f"\r\n\t{backtick} ")

        # Only whitespace or an empty code block means there are no settings;
        # that is not an error worth logging.
        if not settings_text:
            return Settings()

        settings = load_yaml(settings_text)

        # For now we just load experiments. We can expand this if/when we add more settings
        experiments = {}

        # "or {}" tolerates a settings block that has no experiments section.
        for exp_name, exp_settings in (
            settings.get(SETTING_EXPERIMENTS) or {}
        ).items():
            valid_settings = {}
            # "or {}" tolerates an experiment listed without any settings, so
            # it falls back to the Experiment defaults instead of causing
            # every experiment to be discarded.
            for setting, value in (exp_settings or {}).items():
                if setting not in Experiment._fields:
                    log.warning(
                        f"Unexpected setting in experiment: {setting} = {value}"
                    )
                else:
                    valid_settings[setting] = value

            experiments[exp_name] = Experiment(**valid_settings)
        return Settings(experiments)

    except Exception:
        log.exception("Failed to parse settings")

    return Settings()
|
||||
|
||||
|
||||
def parse_settings(rollout_state: str) -> Settings:
    """
    Extract and parse the settings section of the rollout state.

    The rollout state is split into its settings text and its user opt-in
    text; only the settings text is used here. When the issue body has a
    "---" line, the settings are the text above it. Without that line the
    settings text is empty and default values are used.
    """
    settings_section, _users_section = extract_settings_user_opt_in_from_text(
        rollout_state
    )
    return parse_settings_from_text(settings_section)
|
||||
|
||||
|
||||
def parse_users(rollout_state: str) -> UserOptins:
    """
    Extract and parse the user opt-in section of the rollout state.

    The rollout state is split into its settings text and its user opt-in
    text; only the user opt-in text is used here.
    """
    _settings_section, users_section = extract_settings_user_opt_in_from_text(
        rollout_state
    )
    return parse_user_opt_in_from_text(users_section)
|
||||
|
||||
|
||||
def is_user_opted_in(user: str, user_optins: UserOptins, experiment_name: str) -> bool:
    """
    Return True when experiment_name is listed among the opt-ins of user.

    A user without an entry in user_optins is treated as having no opt-ins.
    """
    enabled_for_user = user_optins.get(user, [])
    return experiment_name in enabled_for_user
|
||||
|
||||
|
||||
def get_runner_prefix(
    rollout_state: str, workflow_requestors: Iterable[str], is_canary: bool = False
) -> str:
    """
    Build the runner label prefix for a job from the rollout state.

    An experiment is enabled when any workflow requestor has explicitly opted
    in to it or, failing that, when a random draw falls within the
    experiment's rollout percentage.

    Args:
        rollout_state: Raw rollout configuration text; it is parsed for both
            the settings and the user opt-ins.
        workflow_requestors: Usernames whose opt-ins are taken into account.
        is_canary: When True, CANARY_FLEET_SUFFIX is appended to the fleet
            prefix.

    Returns:
        The enabled prefixes joined by "." and followed by a trailing ".",
        with the fleet prefix first, or an empty string when nothing is
        enabled.
    """
    settings = parse_settings(rollout_state)
    user_optins = parse_users(rollout_state)

    # The requestors are walked once per experiment, so materialize them in
    # case the caller handed over a one-shot iterator.
    requestors = tuple(workflow_requestors)

    fleet_prefix = ""
    prefixes = []
    for experiment_name, experiment_settings in settings.experiments.items():
        enabled = False

        # Is any workflow_requestor opted in to this experiment?
        opted_in_users = [
            requestor
            for requestor in requestors
            if is_user_opted_in(requestor, user_optins, experiment_name)
        ]

        if opted_in_users:
            log.info(
                f"{', '.join(opted_in_users)} have opted into experiment {experiment_name}."
            )
            enabled = True
        elif experiment_settings.rollout_perc:
            # If no user is opted in, then we randomly enable the experiment
            # based on the rollout percentage
            if random.uniform(0, 100) <= experiment_settings.rollout_perc:
                log.info(
                    f"Based on rollout percentage of {experiment_settings.rollout_perc}%, enabling experiment {experiment_name}."
                )
                enabled = True

        if enabled:
            label = experiment_name
            if experiment_name == LF_FLEET_EXPERIMENT:
                # The fleet experiment gets special treatment since it
                # determines the fleet we use:
                #  - If it's enabled, its prefix is always listed first
                #  - On canary, the canary suffix is appended to its prefix
                if is_canary:
                    label += CANARY_FLEET_SUFFIX
                fleet_prefix = label
            else:
                prefixes.append(label)

    if len(prefixes) > 1:
        log.error(
            f"Only a fleet and one other experiment can be enabled for a job at any time. Enabling {prefixes[0]} and ignoring the rest, which are {', '.join(prefixes[1:])}"
        )
        prefixes = prefixes[:1]

    # Fleet always comes first
    if fleet_prefix:
        prefixes.insert(0, fleet_prefix)

    return ".".join(prefixes) + "." if prefixes else ""
|
||||
|
||||
|
||||
def get_rollout_state_from_issue(github_token: str, repo: str, issue_num: int) -> str:
|
||||
@ -327,9 +467,10 @@ jobs:
|
||||
args = parse_args()
|
||||
|
||||
if args.github_ref_type == "branch" and is_exception_branch(args.github_branch):
|
||||
log.info(f"Exception branch: '{args.github_branch}', using meta runners")
|
||||
label_type = WORKFLOW_LABEL_META
|
||||
runner_ami = RUNNER_AMI_LEGACY
|
||||
log.info(
|
||||
f"Exception branch: '{args.github_branch}', using Meta runners and no experiments."
|
||||
)
|
||||
runner_label_prefix = DEFAULT_LABEL_PREFIX
|
||||
else:
|
||||
try:
|
||||
rollout_state = get_rollout_state_from_issue(
|
||||
@ -344,35 +485,18 @@ jobs:
|
||||
args.github_branch,
|
||||
)
|
||||
|
||||
label_type = get_fleet(
|
||||
rollout_state,
|
||||
(
|
||||
args.github_issue_owner,
|
||||
username,
|
||||
),
|
||||
)
|
||||
runner_ami = get_optin_feature(
|
||||
rollout_state=rollout_state,
|
||||
workflow_requestors=(
|
||||
args.github_issue_owner,
|
||||
username,
|
||||
),
|
||||
feature=RUNNER_AMI_AMZ2023,
|
||||
fallback=RUNNER_AMI_LEGACY,
|
||||
is_canary = args.github_repo == "pytorch/pytorch-canary"
|
||||
|
||||
runner_label_prefix = get_runner_prefix(
|
||||
rollout_state, (args.github_issue_owner, username), is_canary
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
log.error(
|
||||
f"Failed to get issue. Falling back to meta runners. Exception: {e}"
|
||||
f"Failed to get issue. Defaulting to Meta runners and no experiments. Exception: {e}"
|
||||
)
|
||||
label_type = WORKFLOW_LABEL_META
|
||||
runner_ami = RUNNER_AMI_LEGACY
|
||||
|
||||
# For Canary builds use canary runners
|
||||
if args.github_repo == "pytorch/pytorch-canary" and label_type == WORKFLOW_LABEL_LF:
|
||||
label_type = WORKFLOW_LABEL_LF_CANARY
|
||||
|
||||
set_github_output(GH_OUTPUT_KEY_LABEL_TYPE, label_type)
|
||||
set_github_output(GH_OUTPUT_KEY_AMI, runner_ami)
|
||||
set_github_output(GH_OUTPUT_KEY_LABEL_TYPE, runner_label_prefix)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
Reference in New Issue
Block a user