Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions etc/scripts/dataset_pipeline/build_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# extracts required phrases from .RULE files
# outputs a JSONL dataset for NER model training
import hashlib
import json
import unicodedata
from collections import Counter
from pathlib import Path
import click

from licensedcode.models import Rule
from licensedcode.models import rules_data_dir as default_rules_data_dir
from licensedcode.required_phrases import get_required_phrase_verbatim
from licensedcode.tokenize import required_phrase_splitter


def get_rule_type(rule):
    """Return the name of the first truthy is_* flag of ``rule``.

    Flags are probed in a fixed priority order, from 'is_license_text' down
    to 'is_false_positive'. A flag that is missing on the rule counts as
    unset. Return 'unknown' when no flag is set.
    """
    known_flags = (
        'is_license_text',
        'is_license_notice',
        'is_license_reference',
        'is_license_tag',
        'is_license_intro',
        'is_license_clue',
        'is_false_positive',
    )
    # next() stops at the first flag that is set, else yields the fallback
    return next(
        (name for name in known_flags if getattr(rule, name, False)),
        'unknown',
    )


def tag_tokens(text, splitter=None):
    """Tag each word token of ``text`` with a BIOES label based on {{ }} markers.

    Tokens enclosed between '{{' and '}}' form a required phrase. A one-token
    phrase is labelled 'S-REQ'; a longer phrase is labelled 'B-REQ', then
    'I-REQ' for inner tokens, then 'E-REQ'. Any other token is labelled 'O'.
    The markers themselves are never returned as tokens and an empty '{{}}'
    pair is ignored.

    Unbalanced markers are repaired instead of leaking into the labels: a
    phrase that is still open when another '{{' shows up, or when the text
    ends, is closed on its last token. The labels therefore never contain a
    'B-REQ' or 'I-REQ' run without its closing 'E-REQ'. A stray '}}' outside
    of any phrase is dropped.

    ``splitter`` is an optional callable that takes the text and returns an
    iterable of tokens in which '{{' and '}}' are tokens of their own. It
    defaults to ``required_phrase_splitter``.

    Return a (tokens, labels) tuple of two lists of the same length.
    """
    if splitter is None:
        splitter = required_phrase_splitter

    tokens = []
    labels = []
    in_phrase = False
    # number of tokens collected so far in the currently open phrase
    count = 0

    for tok in splitter(text):
        if tok == '{{' or tok == '}}':
            # Any marker ends the phrase in progress. When the phrase has
            # tokens, the last appended label belongs to it and becomes the
            # phrase end (or a single-token phrase).
            if in_phrase and count > 0:
                labels[-1] = 'S-REQ' if count == 1 else 'E-REQ'
            in_phrase = tok == '{{'
            count = 0
            continue

        tokens.append(tok)
        if in_phrase:
            labels.append('B-REQ' if count == 0 else 'I-REQ')
            count += 1
        else:
            labels.append('O')

    # A missing final '}}' would otherwise leave a dangling B-REQ/I-REQ.
    if in_phrase and count > 0:
        labels[-1] = 'S-REQ' if count == 1 else 'E-REQ'

    # internal invariant: tokens and labels are always appended together
    assert len(tokens) == len(labels), f'token/label mismatch: {len(tokens)} vs {len(labels)}'
    return tokens, labels


def assign_splits(results, threshold=50):
    """Plan an 80/10/10 train/val/test split keyed on license expression.

    Count the entries of ``results`` per 'license_expression' value.
    Expressions seen at least ``threshold`` times are collected in a "heavy"
    set and receive no split here: they are left for the caller to split.
    Each remaining expression is assigned as a whole to a single split so
    that its entries never straddle two splits.

    Return a (heavy, assignment) tuple: the set of heavy expressions and a
    mapping of every other expression to 'train', 'val' or 'test'.
    """
    per_expr = Counter(entry['license_expression'] for entry in results)

    heavy = set()
    light = []
    for expr, seen in per_expr.items():
        if seen >= threshold:
            heavy.add(expr)
        else:
            light.append(expr)
    # biggest expressions first, ties broken by name
    light.sort(key=lambda expr: (-per_expr[expr], expr))

    light_total = sum(per_expr[expr] for expr in light)
    shares = (('train', 0.8), ('val', 0.1), ('test', 0.1))
    quota = {split: share * light_total for split, share in shares}
    used = dict.fromkeys(quota, 0)

    def fill_ratio(split):
        # quotas smaller than one entry are clamped to 1
        return used[split] / max(quota[split], 1)

    plan = {}
    for expr in light:
        # min() keeps the first of equally filled splits: train, val, test
        emptiest = min(quota, key=fill_ratio)
        plan[expr] = emptiest
        used[emptiest] += per_expr[expr]

    return heavy, plan


@click.command()
@click.option('--rules-dir', type=click.Path(exists=True), default=None,
              help='Path to rules directory (defaults to repo rules dir)')
@click.option('--output-dir', default='dataset-output',
              help='Output directory for train/val/test JSONL files')
def main(rules_dir, output_dir):
    """Extract required phrases from rule files for NER training"""
    # NOTE: click uses the docstring above as the command help text.

    if not rules_dir:
        # Prefer the rules of the checkout this script sits in, else fall
        # back to the rules directory known to licensedcode.
        checkout_rules = Path(__file__).resolve().parents[3].joinpath(
            'src', 'licensedcode', 'data', 'rules')
        if checkout_rules.is_dir():
            rules_dir = str(checkout_rules)
        else:
            rules_dir = default_rules_data_dir

    rules_path = Path(rules_dir)
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    total_rules = 0
    results = []

    click.echo(f'scanning rules from: {rules_path}')
    for rf in sorted(rules_path.glob('*.RULE')):
        try:
            rule = Rule.from_file(rule_file=str(rf))
        except Exception as e:
            # best effort: report the rule that failed to load and move on
            click.echo(f' skipping {rf.name}: {e}', err=True)
            continue
        total_rules += 1

        if getattr(rule, 'is_required_phrase', False):
            continue

        raw_text = rule.text or ''
        if not raw_text:
            continue

        # normalize line endings first, then unicode
        unix_text = raw_text.replace('\r\n', '\n').replace('\r', '\n')
        text = unicodedata.normalize('NFKC', unix_text)

        # only rules that carry at least one required phrase are kept
        if not list(get_required_phrase_verbatim(text)):
            continue

        tokens, bioes_labels = tag_tokens(text)

        results.append({
            'identifier': rule.identifier,
            'license_expression': rule.license_expression or '',
            'rule_type': get_rule_type(rule),
            # the clean text field has the {{ }} markers stripped
            'text': text.replace('{{', '').replace('}}', ''),
            'tokens': tokens,
            'bioes_labels': bioes_labels,
        })

    # every kept rule produced exactly one record
    annotated = len(results)

    # split by license expression
    heavy, assignment = assign_splits(results)
    splits = {'train': [], 'val': [], 'test': []}
    for record in results:
        expression = record['license_expression']
        if expression not in heavy:
            split_name = assignment[expression]
        else:
            # heavy expressions are split per rule: a hash of the rule
            # identifier picks a stable slot in 0..99 (80/10/10)
            digest = hashlib.md5(record['identifier'].encode('utf-8')).hexdigest()
            slot = int(digest, 16) % 100
            if slot < 80:
                split_name = 'train'
            elif slot < 90:
                split_name = 'val'
            else:
                split_name = 'test'
        splits[split_name].append(record)

    # one JSON document per line, one file per split
    for name, records in splits.items():
        with open(out_dir / f'{name}.jsonl', 'w', encoding='utf-8') as sink:
            sink.writelines(
                json.dumps(record, ensure_ascii=False) + '\n' for record in records
            )

    click.echo('\ndone')
    click.echo(f' rules scanned: {total_rules}')
    click.echo(f' annotated: {annotated}')
    click.echo(f' train: {len(splits["train"])} val: {len(splits["val"])} test: {len(splits["test"])}')
    click.echo(f' output: {out_dir}')


# run the click command only when this file is executed as a script
if __name__ == '__main__':
    main()
71 changes: 71 additions & 0 deletions etc/scripts/dataset_pipeline/test_build_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# tests for build_dataset.py
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))
from build_dataset import tag_tokens, assign_splits


class TestTagTokens:
    """Check the tokens and BIOES labels returned by tag_tokens"""

    def test_single_phrase(self):
        # a two-token phrase starts with B-REQ and ends with E-REQ
        words, tags = tag_tokens('under the {{Apache License}} terms')
        assert words == 'under the Apache License terms'.split()
        assert tags == ['O', 'O', 'B-REQ', 'E-REQ', 'O']

    def test_single_word_phrase(self):
        # a one-token phrase gets the S-REQ label
        words, tags = tag_tokens('use {{MIT}} license')
        assert words == 'use MIT license'.split()
        assert tags == ['O', 'S-REQ', 'O']

    def test_multiple_phrases(self):
        words, tags = tag_tokens('{{Apache}} and {{MIT}} stuff')
        assert words == 'Apache and MIT stuff'.split()
        assert tags == ['S-REQ', 'O', 'S-REQ', 'O']

    def test_long_phrase(self):
        # inner tokens of a phrase are labelled I-REQ
        words, tags = tag_tokens('{{GNU General Public License}}')
        assert words == 'GNU General Public License'.split()
        assert tags == ['B-REQ'] + ['I-REQ'] * 2 + ['E-REQ']

    def test_no_markers(self):
        words, tags = tag_tokens('released under the license')
        assert words == 'released under the license'.split()
        assert tags == ['O'] * 4

    def test_alignment(self):
        # one label per token
        words, tags = tag_tokens('licensed under {{Apache License}} or {{MIT}}')
        assert len(tags) == len(words)

    def test_empty_input(self):
        words, tags = tag_tokens('')
        assert words == []
        assert tags == []

    def test_empty_markers_ignored(self):
        # an empty {{}} pair adds no token and no phrase label
        words, tags = tag_tokens('licensed under {{}} the GPL')
        assert words == 'licensed under the GPL'.split()
        assert tags == ['O'] * 4


class TestAssignSplits:
    """Check the heavy set and the split assignment returned by assign_splits"""

    def test_light_expressions_no_leakage(self):
        # 5 expressions with 10 rules each
        results = [
            {'license_expression': f'license-{i}', 'identifier': f'rule_{i}_{j}.RULE'}
            for i in range(5)
            for j in range(10)
        ]

        heavy, assignment = assign_splits(results)

        assert len(heavy) == 0
        # each expression is mapped exactly once, to a known split name
        assert len(assignment) == 5
        valid_splits = ('train', 'val', 'test')
        for split_name in assignment.values():
            assert split_name in valid_splits

    def test_heavy_expressions_detected(self):
        # 100 rules for one expression plus a single rule for another
        results = []
        for i in range(100):
            results.append({'license_expression': 'mit', 'identifier': f'mit_{i}.RULE'})
        results.append({'license_expression': 'rare-1.0', 'identifier': 'rare_1.RULE'})

        heavy, assignment = assign_splits(results)

        assert 'mit' in heavy
        assert 'rare-1.0' not in heavy
        assert 'rare-1.0' in assignment
Loading