-
Notifications
You must be signed in to change notification settings - Fork 688
fix(ghidra): implement flow-insensitive block discovery #2990
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -261,19 +261,48 @@ def evaluate(self, features: FeatureSet, short_circuit=True): | |
| raise ValueError("cannot evaluate a subscope directly!") | ||
|
|
||
|
|
||
| # mapping from rule name to list of: (location of match, result object) | ||
| # | ||
| # used throughout matching and rendering to collection the results | ||
| # of statement evaluation and their locations. | ||
| # | ||
| # to check if a rule matched, do: `"TCP client" in matches`. | ||
| # to find where a rule matched, do: `map(first, matches["TCP client"])` | ||
| # to see how a rule matched, do: | ||
| # | ||
| # for address, match_details in matches["TCP client"]: | ||
| # inspect(match_details) | ||
| # | ||
| # aliased here so that the type can be documented and xref'd. | ||
| class _RuleFeatureIndex: | ||
| """ | ||
| index rules by their constituent features for efficient candidate selection. | ||
| """ | ||
|
|
||
| def __init__(self, rules: Iterable["capa.rules.Rule"]): | ||
| self.features: dict[Feature, list["capa.rules.Rule"]] = collections.defaultdict(list) | ||
| # map from prefix byte (or -1 for empty) to rules containing that byte feature | ||
| self.bytes_prefix_index: dict[int, list["capa.rules.Rule"]] = collections.defaultdict(list) | ||
|
|
||
| for rule in rules: | ||
| for feature in rule.extract_all_features(): | ||
| self._index_rule_by_feature(rule, feature) | ||
|
|
||
| def _index_rule_by_feature(self, rule: "capa.rules.Rule", feature: Feature): | ||
| if isinstance(feature, capa.features.common.Bytes): | ||
| # build the prefix index directly, removing one full pass | ||
| prefix = feature.value[0] if len(feature.value) > 0 else -1 | ||
| self.bytes_prefix_index[prefix].append(rule) | ||
| else: | ||
| self.features[feature].append(rule) | ||
|
|
||
| def get_candidates(self, features: FeatureSet) -> set["capa.rules.Rule"]: | ||
| candidates = set() | ||
|
|
||
| for feature in features: | ||
| if feature in self.features: | ||
| candidates.update(self.features[feature]) | ||
|
|
||
| if isinstance(feature, capa.features.common.Bytes): | ||
| # Bytes.value type is now narrowed via class-level annotation in common.py | ||
| prefix = feature.value[0] if len(feature.value) > 0 else -1 | ||
| if prefix in self.bytes_prefix_index: | ||
| candidates.update(self.bytes_prefix_index[prefix]) | ||
|
|
||
| # guard to avoid temporary object creation for the short-pattern fallback | ||
| if -1 in self.bytes_prefix_index: | ||
| candidates.update(self.bytes_prefix_index[-1]) | ||
|
|
||
| return candidates | ||
|
|
||
|
|
||
| MatchResults = Mapping[str, list[tuple[Address, Result]]] | ||
|
|
||
|
|
||
|
|
@@ -289,10 +318,7 @@ def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations: | |
| """ | ||
| record into the given featureset that the given rule matched at the given locations. | ||
|
|
||
| naively, this is just adding a MatchedRule feature; | ||
| however, we also want to record matches for the rule's namespaces. | ||
|
|
||
| updates `features` in-place. doesn't modify the remaining arguments. | ||
| updates `features` in-place. | ||
| """ | ||
| features[capa.features.common.MatchedRule(rule.name)].update(locations) | ||
| for namespace in get_rule_namespaces(rule): | ||
|
|
@@ -301,46 +327,26 @@ def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations: | |
|
|
||
| def match(rules: list["capa.rules.Rule"], features: FeatureSet, addr: Address) -> tuple[FeatureSet, MatchResults]: | ||
| """ | ||
| match the given rules against the given features, | ||
| returning an updated set of features and the matches. | ||
|
|
||
| the updated features are just like the input, | ||
| but extended to include the match features (e.g. names of rules that matched). | ||
| the given feature set is not modified; an updated copy is returned. | ||
|
|
||
| the given list of rules must be ordered topologically by dependency, | ||
| or else `match` statements will not be handled correctly. | ||
|
|
||
| this routine should be fairly optimized, but is not guaranteed to be the fastest matcher possible. | ||
| it has a particularly convenient signature: (rules, features) -> matches | ||
| other strategies can be imagined that match differently; implement these elsewhere. | ||
| specifically, this routine does "top down" matching of the given rules against the feature set. | ||
| match the given rules against the given features. | ||
| """ | ||
| results: MatchResults = collections.defaultdict(list) | ||
|
|
||
| # copy features so that we can modify it | ||
| # without affecting the caller (keep this function pure) | ||
| # | ||
| # note: copy doesn't notice this is a defaultdict, so we'll recreate that manually. | ||
| features = collections.defaultdict(set, copy.copy(features)) | ||
|
|
||
| # create an index for efficient candidate rule selection | ||
| index = _RuleFeatureIndex(rules) | ||
| candidates = index.get_candidates(features) | ||
|
Comment on lines
+336
to
+337
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instantiating |
||
|
|
||
| # only evaluate rules that could potentially match based on the feature set | ||
| for rule in rules: | ||
| if rule not in candidates: | ||
| continue | ||
|
|
||
| res = rule.evaluate(features, short_circuit=True) | ||
| if res: | ||
| # we first matched the rule with short circuiting enabled. | ||
| # this is much faster than without short circuiting. | ||
| # however, we want to collect all results thoroughly, | ||
| # so once we've found a match quickly, | ||
| # go back and capture results without short circuiting. | ||
| res = rule.evaluate(features, short_circuit=False) | ||
|
|
||
| # sanity check | ||
| assert bool(res) is True | ||
|
|
||
| results[rule.name].append((addr, res)) | ||
| # we need to update the current `features` | ||
| # because subsequent iterations of this loop may use newly added features, | ||
| # such as rule or namespace matches. | ||
| index_rule_matches(features, rule, [addr]) | ||
|
|
||
| return (features, results) | ||
| return (features, results) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
_RuleFeatureIndeximplementation has a critical correctness bug: it breaks rules that useSubstringorRegexfeatures. These features match againstStringfeatures in theFeatureSetvia partial or pattern matching. However,get_candidatesperforms an exact lookup (feature in self.features). Since the extracted features in theFeatureSetareStringobjects and the indexed features areSubstring/Regexobjects, they will never match exactly, causing these rules to be incorrectly filtered out and never evaluated.