Lead Enricher
Technical Implementation Spec
A complete spec for Claude Code to SSH into the Mac mini and deploy a system that crawls contact emails for a CSV list of companies. Read the whole document before starting, and do not skip any step.
0. Context & Goals
Problem: automatically crawl the websites of a list of companies (CSV), extract email, phone, and social links, and write the results to an output CSV.
Scale: 1,000 – 5,000 domains per batch
Target data: email (top priority), phone, Facebook, LinkedIn, Zalo
Runtime: Mac mini M4, macOS, Python 3.11+
Input: input.csv, stored locally
Output: output.csv, stored locally (enriched)
1. Environment
Machine : Mac mini M4, macOS Sequoia
Python : 3.11+ (check with python3 --version)
Path : ~/lead-enricher/
User : run as the current user; sudo is not needed
1.1 Create the project
mkdir -p ~/lead-enricher/logs
cd ~/lead-enricher
python3 -m venv venv
source venv/bin/activate
1.2 Install dependencies
pip install \
  httpx==0.27.0 \
  beautifulsoup4==4.12.3 \
  dnspython==2.6.1 \
  python-dotenv==1.0.1 \
  pandas==2.2.2 \
  playwright==1.44.0 \
  psutil==5.9.8 \
  lxml==5.2.2 \
  pyyaml==6.0.2
playwright install chromium
1.3 File .env
Create the file ~/lead-enricher/.env:
JINA_API_KEY=jina_xxxxxxxxxxxxxx
Get an API key at https://jina.ai/reader → Get API Key (free, no credit card required).
2. Project structure
~/lead-enricher/
├── .env
├── config.yaml
├── main.py
├── market.py
├── crawler.py
├── extractor.py
├── validator.py
├── csv_handler.py
├── logs/
├── input.csv ← prepared by the user
└── output.csv ← generated
3. config.yaml
# Jina Reader settings
jina:
  base_url: "https://r.jina.ai"
  engine: "browser"        # force Chrome rendering
  proxy: "auto"            # rotate residential IPs
  timeout: 30              # seconds per request
# Crawl settings
crawl:
  concurrency: 20          # domains processed in parallel (raise to 50 if RAM > 8GB)
  max_urls_per_domain: 5   # max URL patterns fetched per domain
  head_timeout: 5          # seconds for the HEAD check
  retry_max: 2             # retries after a failed fetch
  retry_delay: [5, 15]     # seconds between retries
  batch_write: 50          # save the CSV every N rows
  rate_limit_delay: 1.0    # seconds between requests to the same domain
# Playwright fallback
playwright:
  enabled: true
  timeout: 15000           # ms
  js_enabled: false        # JS off for safety; enable if needed
  block_resources:
    - "image"
    - "media"
    - "font"
    - "stylesheet"
# CSV
csv:
  input: "input.csv"
  output: "output.csv"
  encoding: "utf-8-sig"    # lets Excel display Vietnamese correctly
# Logging
log:
  level: "INFO"
  file: "logs/run.log"
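Every module indexes the config with plain `cfg["section"]["key"]` lookups. A YAML indentation slip therefore only shows up later as a `KeyError` or `TypeError`. Below is a minimal sketch of a fail-fast check that could run right after `yaml.safe_load` in `load_config()`. The helper `find_missing_keys` and the `REQUIRED_KEYS` list are hypothetical, and the list simply mirrors the sections of config.yaml above.

```python
# Hypothetical fail-fast check for config.yaml (not part of the original spec)
from typing import Any

REQUIRED_KEYS = [
    "jina.base_url", "jina.engine", "jina.proxy", "jina.timeout",
    "crawl.concurrency", "crawl.max_urls_per_domain", "crawl.head_timeout",
    "crawl.retry_max", "crawl.retry_delay", "crawl.batch_write",
    "crawl.rate_limit_delay",
    "playwright.enabled", "playwright.timeout", "playwright.js_enabled",
    "playwright.block_resources",
    "log.level", "log.file",
]


def find_missing_keys(cfg: Any, required: list[str] = REQUIRED_KEYS) -> list[str]:
    """Return every dotted key path in `required` that is absent from `cfg`."""
    missing = []
    for dotted in required:
        node = cfg
        for part in dotted.split("."):
            # A section whose children lost their indentation loads as None
            if not isinstance(node, dict) or part not in node:
                missing.append(dotted)
                break
            node = node[part]
    return missing


if __name__ == "__main__":
    # What safe_load returns for an unindented "jina:" block: the section is
    # None and its keys land at the top level.
    broken = {"jina": None, "base_url": "https://r.jina.ai", "timeout": 30}
    print(find_missing_keys(broken)[:4])
```

If the check runs before `Crawler(cfg)` is built, a broken config fails with a list of every missing key instead of a traceback from deep inside the crawl.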
4. market.py
Detects the market from the country column or the TLD, then picks the matching URL pattern set.
# market.py
from __future__ import annotations

# Map TLD → market key
TLD_MAP: dict[str, str] = {
    ".vn": "vietnamese", ".com.vn": "vietnamese",
    ".kr": "korean", ".co.kr": "korean",
    ".jp": "japanese", ".co.jp": "japanese",
    ".cn": "chinese", ".com.cn": "chinese",
    ".de": "german",
    ".fr": "french",
    ".it": "italian",
    ".es": "spanish",
    ".nl": "dutch",
    ".com": "english", ".net": "english",
    ".io": "english", ".co": "english",
    ".org": "english",
}

# Map country code → market key
COUNTRY_MAP: dict[str, str] = {
    "VN": "vietnamese",
    "KR": "korean",
    "JP": "japanese",
    "CN": "chinese",
    "DE": "german",
    "FR": "french",
    "IT": "italian",
    "ES": "spanish",
    "NL": "dutch",
    "US": "english", "GB": "english",
    "AU": "english", "SG": "english",
    "CA": "english",
}

# URL patterns per market
URL_PATTERNS: dict[str, list[str]] = {
    "vietnamese": [
        "/lien-he", "/lien-he-voi-chung-toi", "/lienhe",
        "/ve-chung-toi", "/gioi-thieu", "/gioi-thieu-cong-ty",
        "/thong-tin-lien-he", "/ban-lanh-dao",
        "/contact", "/about",
    ],
    "english": [
        "/contact", "/contact-us", "/about", "/about-us",
        "/team", "/our-team", "/leadership", "/staff",
        "/pages/contact",  # Shopify
        "/get-in-touch", "/reach-us",
    ],
    "korean": [
        "/contact", "/about", "/company",
        "/contact-us", "/inquiry", "/contactus",
        "/about-us", "/company/about",
    ],
    "japanese": [
        "/contact", "/about", "/company",
        "/inquiry", "/access",
        "/corporate/contact", "/company/profile",
        "/company/overview",
    ],
    "chinese": [
        "/contact", "/about", "/aboutus",
        "/contactus", "/company", "/gywm",
    ],
    "german": [
        "/impressum",  # legally required in DE → high hit rate
        "/kontakt", "/ueber-uns", "/uber-uns",
        "/contact", "/unternehmen", "/ansprechpartner",
    ],
    "french": [
        "/mentions-legales",  # legally required in FR → high hit rate
        "/contact", "/contactez-nous", "/nous-contacter",
        "/a-propos", "/qui-sommes-nous", "/equipe",
    ],
    "italian": [
        "/contatti", "/chi-siamo", "/contact", "/about",
    ],
    "spanish": [
        "/contacto", "/contactanos", "/contact",
        "/sobre-nosotros", "/quienes-somos", "/equipo",
    ],
    "dutch": [
        "/contact", "/over-ons", "/contacteer-ons", "/bedrijf",
    ],
}

# Language signals used to detect the language from homepage content
LANGUAGE_SIGNALS: dict[str, list[str]] = {
    "vietnamese": ["liên hệ", "về chúng tôi", "giới thiệu", "công ty", "điện thoại"],
    "korean": ["연락처", "회사소개", "문의하기", "소개"],
    "japanese": ["お問い合わせ", "会社概要", "アクセス", "お問合せ"],
    "chinese": ["联系我们", "关于我们", "公司简介", "联系方式"],
    "german": ["impressum", "kontakt", "über uns", "datenschutz"],
    "french": ["contactez", "à propos", "mentions légales", "équipe"],
    "spanish": ["contáctenos", "sobre nosotros", "contacto", "quiénes somos"],
}


def detect_market(domain: str, country: str = "", homepage_text: str = "") -> str:
    """
    Priority:
    1. country column from the CSV (most reliable)
    2. TLD of the domain
    3. Language signals from homepage content (fallback for generic TLDs such as .com)
    """
    # Priority 1: country column
    if country:
        key = COUNTRY_MAP.get(country.strip().upper())
        if key:
            return key
    # Priority 2: TLD, longest suffix first so ".com.vn" is tested before ".vn"
    domain_lower = domain.lower().rstrip("/")
    for tld in sorted(TLD_MAP, key=len, reverse=True):
        if domain_lower.endswith(tld):
            market = TLD_MAP[tld]
            # Generic TLDs (.com, .net, ...) map to "english" but say little about
            # the language, so let homepage signals decide when they are available
            if market != "english" or not homepage_text:
                return market
            break
    # Priority 3: detect from homepage content
    if homepage_text:
        text_lower = homepage_text.lower()
        scores = {
            lang: sum(1 for s in signals if s in text_lower)
            for lang, signals in LANGUAGE_SIGNALS.items()
        }
        best = max(scores, key=lambda lang: scores[lang])
        if scores[best] > 0:
            return best
    return "english"


def get_patterns(market: str) -> list[str]:
    """
    Return the pattern list for a market.
    Non-English markets always get the English patterns appended as a fallback.
    Duplicates are removed while keeping order.
    """
    primary = URL_PATTERNS.get(market, URL_PATTERNS["english"])
    if market == "english":
        return list(primary)
    # Append the English fallback; dict.fromkeys dedupes while preserving order
    return list(dict.fromkeys(primary + URL_PATTERNS["english"]))
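Priority 3 of `detect_market()` only runs when `homepage_text` is passed. `process_row()` in main.py calls `detect_market(domain, country)` before anything is fetched, so this path is currently unused. The sketch below isolates the scoring step so you can see how it behaves on a .com site with Vietnamese content. `SIGNALS` is a two-language subset of `LANGUAGE_SIGNALS`, and `score_languages` / `guess_language` are hypothetical names.

```python
# Stand-alone sketch of detect_market's "Priority 3" scoring step.
# SIGNALS is a two-language subset of LANGUAGE_SIGNALS from market.py.
SIGNALS: dict[str, list[str]] = {
    "vietnamese": ["liên hệ", "về chúng tôi", "giới thiệu", "công ty", "điện thoại"],
    "german": ["impressum", "kontakt", "über uns", "datenschutz"],
}


def score_languages(text: str, signals: dict[str, list[str]] = SIGNALS) -> dict[str, int]:
    """Count how many signal phrases of each language occur in the text."""
    text_lower = text.lower()
    return {lang: sum(1 for s in phrases if s in text_lower)
            for lang, phrases in signals.items()}


def guess_language(text: str, default: str = "english") -> str:
    """Return the best-scoring language, or `default` when nothing matches."""
    scores = score_languages(text)
    best = max(scores, key=lambda lang: scores[lang])  # ties: first key in dict order
    return best if scores[best] > 0 else default


homepage = "Công ty TNHH ABC | Giới thiệu | Liên hệ: 028 1234 5678"
print(score_languages(homepage))
print(guess_language(homepage))
```

Matching is plain substring search on lowercased text. A single generic word such as "kontakt" is therefore enough to win when no other language scores, and `max()` resolves ties by dictionary order.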
5. validator.py
DNS check and URL safety validation.
# validator.py
from __future__ import annotations
import ipaddress
import urllib.parse

import dns.exception
import dns.resolver

# Private / reserved IP ranges, blocked for SSRF protection
_PRIVATE_NETS = [
    ipaddress.ip_network("0.0.0.0/8"),
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("fc00::/7"),
    ipaddress.ip_network("fe80::/10"),
]

# Known parked-domain signals
PARKED_SIGNALS = [
    "domain for sale", "buy this domain", "this domain is parked",
    "domain is for sale", "make an offer", "godaddy.com/domains",
    "namecheap.com", "dan.com", "sedo.com", "hugedomains.com",
    "afternic.com", "undeveloped.com", "domain đang được rao bán",
]


def is_safe_url(url: str) -> bool:
    """Block private IPs (SSRF protection) and non-http(s) schemes."""
    try:
        parsed = urllib.parse.urlparse(url)
        if parsed.scheme not in ("http", "https"):
            return False
        hostname = parsed.hostname or ""
        if not hostname or hostname == "localhost":
            return False
        # Try to parse the host as a literal IP address
        try:
            ip = ipaddress.ip_address(hostname)
        except ValueError:
            return True  # ordinary host name
        return not any(ip in net for net in _PRIVATE_NETS)
    except Exception:
        return False


def check_dns(domain: str, timeout: float = 3.0) -> bool:
    """
    Check that the domain resolves (A or AAAA record).
    dnspython's lifetime bounds the wait; socket.setdefaulttimeout() would not
    apply to getaddrinfo() and would change the timeout of every socket.
    Blocking call: from async code, run it via asyncio.to_thread().
    """
    for rtype in ("A", "AAAA"):
        try:
            dns.resolver.resolve(domain, rtype, lifetime=timeout)
            return True
        except dns.exception.DNSException:
            continue
    return False


def validate_email_domain(email: str) -> bool:
    """Check that the email's domain has an MX record."""
    try:
        domain = email.rsplit("@", 1)[1].strip()
        dns.resolver.resolve(domain, "MX", lifetime=5)
        return True
    except Exception:
        return False


def is_parked(text: str) -> bool:
    """Detect a parked / for-sale domain from its content."""
    if not text or len(text.strip()) < 300:
        return True
    text_lower = text.lower()
    return any(sig in text_lower for sig in PARKED_SIGNALS)
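`is_safe_url()` only inspects literal IP addresses. A public hostname whose DNS record points to 127.0.0.1 or another private address therefore still passes. Below is a sketch of a stricter check, assuming one extra resolution per host is acceptable. `is_public_ip` relies on the `ipaddress` module's `is_global` flag instead of a hand-kept network list. `host_is_public` (a hypothetical helper) rejects a host if any of its resolved addresses is not public. Resolution is blocking, so async code would call it through `asyncio.to_thread`.

```python
import ipaddress
import socket


def is_public_ip(addr: str) -> bool:
    """True only for globally routable unicast addresses."""
    ip = ipaddress.ip_address(addr)
    # is_global is False for private, loopback, link-local, CGNAT, reserved, ...
    return ip.is_global and not ip.is_multicast


def host_is_public(host: str, port: int = 443) -> bool:
    """Resolve `host` and require every returned address to be public."""
    try:
        infos = socket.getaddrinfo(host, port, proto=socket.IPPROTO_TCP)
    except socket.gaierror:
        return False
    addrs = {info[4][0] for info in infos}
    return bool(addrs) and all(is_public_ip(a) for a in addrs)


print(is_public_ip("8.8.8.8"), is_public_ip("10.1.2.3"), is_public_ip("169.254.169.254"))
print(host_is_public("localhost"))
```

This still leaves a window between the check and the real request, during which DNS could change (DNS rebinding). For a lead crawler that only reads public pages, the check mainly prevents accidental requests to the local network.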
6. extractor.py
Extracts email, phone, and social links from Markdown/HTML, including Cloudflare-obfuscated emails.
# extractor.py
from __future__ import annotations
import re

from bs4 import BeautifulSoup

# ── Regex patterns ──────────────────────────────────────────────────────────
EMAIL_RE = re.compile(
    r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}",
    re.IGNORECASE,
)
# Phone: supports VN (+84, 0xxx) and international formats
PHONE_RE = re.compile(
    r"(?:\+84|0084|0)(?:\s*[-.\s]?\s*)"
    r"(?:3[2-9]|5[689]|7[06-9]|8[0-9]|9[0-9])"
    r"(?:\s*[-.\s]?\s*\d){7}"
    r"|(?:\+\d{1,3}[\s.-]?)?\(?\d{2,4}\)?[\s.-]?\d{3,4}[\s.-]?\d{3,4}",
    re.IGNORECASE,
)
# Skip share/plugin/dialog URLs and the Meta Pixel endpoint (facebook.com/tr?...)
FACEBOOK_RE = re.compile(
    r"(?:https?://)?(?:www\.)?facebook\.com/(?!sharer|share|plugins|photo|dialog|tr\b)"
    r"[a-zA-Z0-9._%+\-/]+",
    re.IGNORECASE,
)
LINKEDIN_RE = re.compile(
    r"(?:https?://)?(?:www\.)?linkedin\.com/(?:company|in)/[a-zA-Z0-9.\-_/]+",
    re.IGNORECASE,
)
ZALO_RE = re.compile(
    r"(?:https?://)?(?:zalo\.me|chat\.zalo\.me)/[a-zA-Z0-9.\-_/+]+",
    re.IGNORECASE,
)

# Emails to discard (false positives)
EMAIL_BLACKLIST = {
    "[email protected]", "[email protected]", "[email protected]",
    "[email protected]", "[email protected]", "[email protected]",
    "[email protected]", "[email protected]",
    "[email protected]", "[email protected]",
}
# File extensions → not an email
EMAIL_FAKE_DOMAINS = {
    "png", "jpg", "jpeg", "gif", "svg", "webp",
    "css", "js", "ts", "jsx", "tsx", "vue",
    "woff", "woff2", "ttf", "eot",
}


def decode_cf_email(encoded: str) -> str:
    """Decode Cloudflare XOR email obfuscation (first byte = key)."""
    try:
        b = bytes.fromhex(encoded)
        key = b[0]
        return "".join(chr(c ^ key) for c in b[1:])
    except Exception:
        return ""


def _clean_email(email: str) -> str | None:
    """Validate and clean an email address."""
    email = email.lower().strip()
    if email in EMAIL_BLACKLIST:
        return None
    # Reject file names such as logo@2x.png
    if email.split(".")[-1] in EMAIL_FAKE_DOMAINS:
        return None
    # Must contain @ and at least one dot after it
    if "@" not in email:
        return None
    local, domain = email.split("@", 1)
    if "." not in domain or len(local) < 1:
        return None
    return email


def extract_from_html(html: str) -> dict:
    """Extract from raw HTML, decoding Cloudflare obfuscation."""
    emails: set[str] = set()
    phones: set[str] = set()
    socials: dict[str, str] = {}
    soup = BeautifulSoup(html, "lxml")
    # Cloudflare obfuscated emails: data-cfemail attribute
    for tag in soup.select("[data-cfemail]"):
        decoded = decode_cf_email(tag.get("data-cfemail", ""))
        cleaned = _clean_email(decoded) if "@" in decoded else None
        if cleaned:
            emails.add(cleaned)
    # Cloudflare-encoded hrefs (/cdn-cgi/l/email-protection#<hex>) and mailto: links
    for a in soup.find_all("a", href=True):
        href = str(a.get("href", ""))
        if "/cdn-cgi/l/email-protection#" in href:
            candidate = decode_cf_email(href.split("#")[-1])
        elif href.lower().startswith("mailto:"):
            candidate = href[len("mailto:"):].split("?")[0].strip()
        else:
            continue
        cleaned = _clean_email(candidate) if "@" in candidate else None
        if cleaned:
            emails.add(cleaned)
    # Plain-text scan
    text = soup.get_text(" ")
    for m in EMAIL_RE.finditer(text):
        cleaned = _clean_email(m.group())
        if cleaned:
            emails.add(cleaned)
    # Phone
    for m in PHONE_RE.finditer(text):
        phones.add(m.group().strip())
    # Social links (scan the raw HTML so href values are included)
    for m in FACEBOOK_RE.finditer(html):
        socials.setdefault("facebook", m.group())
    for m in LINKEDIN_RE.finditer(html):
        socials.setdefault("linkedin", m.group())
    for m in ZALO_RE.finditer(html):
        socials.setdefault("zalo", m.group())
    return {
        "emails": sorted(emails),
        "phones": sorted(phones),
        "socials": socials,
    }


def extract_from_markdown(text: str) -> dict:
    """Extract from Jina's Markdown output."""
    emails: set[str] = set()
    phones: set[str] = set()
    socials: dict[str, str] = {}
    # Emails
    for m in EMAIL_RE.finditer(text):
        cleaned = _clean_email(m.group())
        if cleaned:
            emails.add(cleaned)
    # Phones
    for m in PHONE_RE.finditer(text):
        phones.add(m.group().strip())
    # Social links
    for m in FACEBOOK_RE.finditer(text):
        socials.setdefault("facebook", m.group())
    for m in LINKEDIN_RE.finditer(text):
        socials.setdefault("linkedin", m.group())
    for m in ZALO_RE.finditer(text):
        socials.setdefault("zalo", m.group())
    return {
        "emails": sorted(emails),
        "phones": sorted(phones),
        "socials": socials,
    }


def merge_results(*results: dict) -> dict:
    """Merge results from several pages (first social link found wins)."""
    emails: set[str] = set()
    phones: set[str] = set()
    socials: dict[str, str] = {}
    for r in results:
        emails.update(r.get("emails", []))
        phones.update(r.get("phones", []))
        for key, link in r.get("socials", {}).items():
            socials.setdefault(key, link)
    return {
        "emails": sorted(emails),
        "phones": sorted(phones),
        "socials": socials,
    }
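`decode_cf_email()` relies on a simple format. The first hex byte is a key, and every following byte is one character XORed with that key. A round trip through a hypothetical `encode_cf_email()` helper makes it easy to unit-test `extract_from_html()` without a live Cloudflare site. The encoder exists only for tests and assumes an ASCII address.

```python
# Cloudflare email-obfuscation format, as decoded by decode_cf_email():
# hex(key) followed by hex(ord(ch) ^ key) for each character.


def encode_cf_email(email: str, key: int) -> str:
    """Produce a data-cfemail style hex string (test helper, ASCII only)."""
    return f"{key:02x}" + "".join(f"{ord(ch) ^ key:02x}" for ch in email)


def decode_cf_email(encoded: str) -> str:
    """Same logic as extractor.decode_cf_email(), with narrower exceptions."""
    try:
        b = bytes.fromhex(encoded)
        key = b[0]
        return "".join(chr(c ^ key) for c in b[1:])
    except (ValueError, IndexError):
        return ""


sample = encode_cf_email("info@example.vn", key=0x5A)
print(sample)
print(decode_cf_email(sample))
```

A fixture such as `<span data-cfemail="{sample}">[email protected]</span>` can then be fed to `extract_from_html()` to check the decoding path end to end.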
7. crawler.py
Core crawl logic: HEAD check, Jina fetch, Playwright fallback.
# crawler.py
from __future__ import annotations
import asyncio
import logging

import httpx
import psutil
from playwright.async_api import async_playwright

from extractor import extract_from_markdown, extract_from_html, merge_results
from validator import is_safe_url, is_parked

logger = logging.getLogger(__name__)

# Status codes
STATUS_OK = "ok"
STATUS_DEAD = "dead"
STATUS_PARKED = "parked"
STATUS_BLOCKED = "blocked_403"
STATUS_NO_EMAIL = "no_email_found"
STATUS_REDIRECTED = "ok_redirected"
STATUS_SERVER_ERROR = "server_error"
STATUS_TIMEOUT = "timeout"


class Crawler:
    def __init__(self, config: dict):
        self.jina_key = config["jina_api_key"]
        self.jina_base = config["jina"]["base_url"]
        self.jina_engine = config["jina"]["engine"]
        self.jina_proxy = config["jina"]["proxy"]
        self.jina_timeout = config["jina"]["timeout"]
        self.head_timeout = config["crawl"]["head_timeout"]
        self.max_urls = config["crawl"]["max_urls_per_domain"]
        self.retry_max = config["crawl"]["retry_max"]
        self.retry_delay = config["crawl"]["retry_delay"]
        self.rate_delay = config["crawl"]["rate_limit_delay"]
        self.pw_enabled = config["playwright"]["enabled"]
        self.pw_timeout = config["playwright"]["timeout"]
        self.pw_js = config["playwright"]["js_enabled"]
        self.pw_block = set(config["playwright"]["block_resources"])
        self._client: httpx.AsyncClient | None = None
        self._jina: httpx.AsyncClient | None = None

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            follow_redirects=True,
            timeout=self.head_timeout,
            headers={"User-Agent": "Mozilla/5.0 (compatible; LeadEnricher/1.0)"},
        )
        # One shared client for Jina so connections are pooled across requests
        self._jina = httpx.AsyncClient(timeout=self.jina_timeout + 5)
        return self

    async def __aexit__(self, *_):
        for client in (self._client, self._jina):
            if client:
                await client.aclose()

    # ── HEAD CHECK ───────────────────────────────────────────────────────────
    async def head_check(self, url: str) -> tuple[bool, int, str]:
        """
        Returns: (exists, status_code, final_url)
        exists=True if the URL is worth fetching.
        """
        if not is_safe_url(url):
            return False, 0, url
        try:
            r = await self._client.head(url)
            code, final_url = r.status_code, str(r.url)
            # Some servers do not support HEAD → fall back to a 1-byte ranged GET,
            # streamed so the body is not downloaded even if Range is ignored
            if code in (405, 501):
                async with self._client.stream(
                    "GET", url, headers={"Range": "bytes=0-0"}
                ) as r:
                    code, final_url = r.status_code, str(r.url)
            if 200 <= code < 400:  # 2xx (incl. 206 from the ranged GET) or unfollowed 3xx
                return True, code, final_url
            if code == 403:
                return True, code, final_url  # exists but blocked; let Jina try
            return False, code, final_url  # 404, 429, 5xx, ...
        except httpx.TransportError:  # connect errors and timeouts
            return False, 0, url
        except Exception as e:
            logger.debug(f"HEAD check error {url}: {e}")
            return False, 0, url

    # ── JINA FETCH ───────────────────────────────────────────────────────────
    async def jina_fetch(self, url: str) -> str | None:
        """Fetch a URL through Jina Reader and return the Markdown, or None."""
        jina_url = f"{self.jina_base}/{url}"
        headers = {
            "Authorization": f"Bearer {self.jina_key}",
            "X-Engine": self.jina_engine,
            "X-Proxy": self.jina_proxy,
            "X-Return-Format": "markdown",
            "X-Timeout": str(self.jina_timeout),
        }
        for retry in range(self.retry_max + 1):
            delay = self.retry_delay[min(retry, len(self.retry_delay) - 1)]
            try:
                r = await self._jina.get(jina_url, headers=headers)
            except httpx.TimeoutException:
                # httpx raises its own timeout type, not asyncio.TimeoutError
                if retry < self.retry_max:
                    await asyncio.sleep(delay)
                    continue
                return None
            except httpx.HTTPError as e:
                logger.debug(f"Jina fetch error {url}: {e}")
                return None
            if r.status_code == 200:
                return r.text
            if r.status_code == 429 and retry < self.retry_max:
                logger.info(f"Jina rate limited, waiting {delay}s...")
                await asyncio.sleep(delay)
                continue
            logger.debug(f"Jina {r.status_code} for {url}")
            return None
        return None

    # ── PLAYWRIGHT FALLBACK ──────────────────────────────────────────────────
    async def playwright_fetch(self, url: str) -> str | None:
        """Fallback when Jina fails. Runs a real headless Chrome."""
        # Safety: check CPU before spinning up Chrome. cpu_percent(interval=0.5)
        # blocks for 0.5 s, so run it in a thread rather than on the event loop.
        cpu = await asyncio.to_thread(psutil.cpu_percent, interval=0.5)
        if cpu > 85:
            logger.warning("CPU too high, skipping Playwright")
            return None
        try:
            async with async_playwright() as pw:
                browser = await pw.chromium.launch(
                    headless=True,
                    args=[
                        "--disable-extensions",
                        "--disable-plugins",
                        "--blink-settings=imagesEnabled=false",
                        "--no-first-run",
                    ],
                )
                try:
                    context = await browser.new_context(
                        java_script_enabled=self.pw_js,
                        user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                        "AppleWebKit/537.36 (KHTML, like Gecko) "
                        "Chrome/125.0.0.0 Safari/537.36",
                    )
                    page = await context.new_page()

                    # Block heavy resources (playwright.block_resources in config.yaml)
                    async def block_resource(route):
                        if route.request.resource_type in self.pw_block:
                            await route.abort()
                        else:
                            await route.continue_()

                    await page.route("**/*", block_resource)
                    page.set_default_timeout(self.pw_timeout)
                    await page.goto(url, wait_until="domcontentloaded")
                    return await page.content()
                finally:
                    # Always close Chrome, even when goto() times out
                    await browser.close()
        except Exception as e:
            logger.debug(f"Playwright error {url}: {e}")
            return None

    # ── MAIN CRAWL LOGIC ─────────────────────────────────────────────────────
    async def crawl_domain(
        self,
        domain: str,
        patterns: list[str],
        market: str,
    ) -> dict:
        """
        Crawl one domain through all steps.
        Returns a dict with email, phone, social links, and status.
        """
        base_url = f"https://{domain}"
        all_results = []
        # ── STEP 2: Homepage quick scan ──────────────────────────────────────
        logger.info(f"[{domain}] Scanning homepage...")
        homepage_md = await self.jina_fetch(base_url)
        if homepage_md:
            if is_parked(homepage_md):
                return self._result(STATUS_PARKED)
            r = extract_from_markdown(homepage_md)
            if r["emails"]:
                logger.info(f"[{domain}] ✓ Email found on homepage")
                return self._result(STATUS_OK, r, source=base_url)
            all_results.append(r)
        # ── STEP 3+4: Loop over URL patterns ─────────────────────────────────
        fetched = 0
        for path in patterns:
            if fetched >= self.max_urls:  # crawl.max_urls_per_domain
                break
            url = f"{base_url}{path}"
            exists, code, final_url = await self.head_check(url)
            if not exists and code == 429:
                logger.info(f"[{domain}] Rate limited on HEAD, waiting 30s...")
                await asyncio.sleep(30)
                exists, code, final_url = await self.head_check(url)
            if not exists:
                logger.debug(f"[{domain}] HEAD={code} for {path}, skipping")
                continue
            fetched += 1
            logger.info(f"[{domain}] Fetching {path} (HEAD={code})")
            content_md = await self.jina_fetch(final_url)
            if content_md:
                r = extract_from_markdown(content_md)
            elif self.pw_enabled and code == 403:
                # Playwright fallback only when Jina failed and the site blocks us
                logger.info(f"[{domain}] Trying Playwright fallback for {path}")
                html = await self.playwright_fetch(final_url)
                r = extract_from_html(html) if html else None
            else:
                r = None
            if r:
                if r["emails"]:
                    logger.info(f"[{domain}] ✓ Email found at {path}")
                    merged = merge_results(*all_results, r)
                    return self._result(STATUS_OK, merged, source=final_url)
                all_results.append(r)
            # Rate limit between requests to the same domain
            await asyncio.sleep(self.rate_delay)
        # Patterns exhausted
        merged = merge_results(*all_results)
        if merged["phones"] or merged["socials"]:
            # Phone/social found but no email → still record them
            return self._result(STATUS_NO_EMAIL, merged)
        return self._result(STATUS_NO_EMAIL)

    def _result(self, status: str, data: dict | None = None, source: str = "") -> dict:
        result = {
            "status": status,
            "email": "",
            "email_2": "",
            "phone": "",
            "facebook": "",
            "linkedin": "",
            "zalo": "",
            "source_url": source,
        }
        if data:
            emails = data.get("emails", [])
            if emails:
                result["email"] = emails[0]
            if len(emails) > 1:
                result["email_2"] = emails[1]
            phones = data.get("phones", [])
            if phones:
                result["phone"] = phones[0]
            socials = data.get("socials", {})
            result["facebook"] = socials.get("facebook", "")
            result["linkedin"] = socials.get("linkedin", "")
            result["zalo"] = socials.get("zalo", "")
        return result
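Suppose `retry_max: 2` and `retry_delay: [5, 15]`. A 429 from Jina is then retried after 5 s and again after 15 s, and any later retry would reuse the last value. The sketch below is a loop-based version of that policy that runs offline. `delay_for`, `fetch_with_retry`, and the `fake_jina()` stub are hypothetical names, and the stub stands in for the real HTTP call.

```python
import asyncio
from typing import Awaitable, Callable, Optional


def delay_for(retry: int, retry_delay: list[float]) -> float:
    """retry_delay[min(retry, len - 1)]: the last delay repeats for later retries."""
    return retry_delay[min(retry, len(retry_delay) - 1)]


async def fetch_with_retry(
    fetch: Callable[[], Awaitable[tuple[int, str]]],
    retry_max: int,
    retry_delay: list[float],
) -> Optional[str]:
    """Call fetch() until it returns 200; retry only on 429, at most retry_max times."""
    for attempt in range(retry_max + 1):
        status, body = await fetch()
        if status == 200:
            return body
        if status != 429 or attempt == retry_max:
            return None
        await asyncio.sleep(delay_for(attempt, retry_delay))
    return None


def fake_jina(statuses: list[int]) -> Callable[[], Awaitable[tuple[int, str]]]:
    """Hypothetical stand-in for Jina: replays the given status codes in order."""
    queue = list(statuses)

    async def fetch() -> tuple[int, str]:
        status = queue.pop(0)
        return status, ("# Contact\ninfo@example.com" if status == 200 else "")

    return fetch


print([delay_for(r, [5, 15]) for r in range(3)])
print(asyncio.run(fetch_with_retry(fake_jina([429, 429, 200]), 2, [0, 0])))
```

A loop keeps the retry state in one place. It also avoids recursion depth growing with `retry_max`, and delays of 0 keep tests fast.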
8. csv_handler.py
Reads and writes the CSV, with resume support.
# csv_handler.py
from __future__ import annotations
import logging
from datetime import datetime

import pandas as pd

logger = logging.getLogger(__name__)

OUTPUT_COLS = [
    "company_name", "website", "country",
    "email", "email_2", "phone",
    "facebook", "linkedin", "zalo",
    "status", "last_checked", "source_url", "market_detected",
]


def load(path: str) -> pd.DataFrame:
    """Load the input CSV and add any missing output columns."""
    df = pd.read_csv(path, encoding="utf-8-sig", dtype=str)
    df = df.fillna("")
    # Normalize column names
    df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
    # A website column is mandatory
    if "website" not in df.columns:
        raise ValueError("CSV must have a 'website' column")
    # Add output columns that are not there yet
    for col in OUTPUT_COLS:
        if col not in df.columns:
            df[col] = ""
    logger.info(f"Loaded {len(df)} rows from {path}")
    return df


def save(df: pd.DataFrame, path: str):
    """Write the CSV as utf-8-sig so Excel displays Vietnamese correctly."""
    df.to_csv(path, index=False, encoding="utf-8-sig")
    logger.debug(f"Saved {len(df)} rows to {path}")


def get_pending(df: pd.DataFrame) -> pd.DataFrame:
    """Return the rows not processed yet (empty status)."""
    pending = df[df["status"] == ""].copy()
    logger.info(f"Pending: {len(pending)} / {len(df)} rows")
    return pending


def update_row(df: pd.DataFrame, idx: int, result: dict, market: str):
    """Update one row with a crawl result."""
    for col in ("email", "email_2", "phone", "facebook", "linkedin",
                "zalo", "status", "source_url"):
        df.at[idx, col] = result.get(col, "")
    df.at[idx, "market_detected"] = market
    df.at[idx, "last_checked"] = datetime.now().strftime("%Y-%m-%d %H:%M")


def print_stats(df: pd.DataFrame):
    """Print a summary of the results."""
    total = len(df)
    done = int((df["status"] != "").sum())
    ok = int((df["status"] == "ok").sum())
    dead = int((df["status"] == "dead").sum())
    parked = int((df["status"] == "parked").sum())
    noem = int((df["status"] == "no_email_found").sum())
    pct = ok / max(done, 1) * 100
    print(f"""
┌─────────────────────────────────────┐
│             CRAWL STATS             │
├─────────────────────────────────────┤
│ Total rows     : {total:<5}              │
│ Processed      : {done:<5}              │
│ Email found ✓  : {ok:<5} ({pct:3.0f}%)       │
│ Dead domain    : {dead:<5}              │
│ Parked         : {parked:<5}              │
│ No email found : {noem:<5}              │
└─────────────────────────────────────┘
""")
9. main.py
Orchestrator: reads the CSV, runs the pipeline, writes the results.
# main.py
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
from pathlib import Path
from urllib.parse import urlparse

import yaml
from dotenv import load_dotenv

from market import detect_market, get_patterns
from validator import check_dns
from crawler import Crawler, STATUS_DEAD
from csv_handler import load, save, get_pending, update_row, print_stats

# ── Setup ────────────────────────────────────────────────────────────────────
load_dotenv()
logger = logging.getLogger(__name__)


def setup_logging(log_file: str, level: str = "INFO"):
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%H:%M:%S",
        handlers=[
            logging.StreamHandler(sys.stdout),
            logging.FileHandler(log_file, encoding="utf-8"),
        ],
    )


def load_config(path: str = "config.yaml") -> dict:
    with open(path, encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    cfg["jina_api_key"] = os.getenv("JINA_API_KEY", "")
    if not cfg["jina_api_key"]:
        raise ValueError("JINA_API_KEY is not set in .env")
    return cfg


def normalize_domain(raw: str) -> str:
    """Extract the bare host name from any URL or domain string."""
    raw = raw.strip()
    if "://" not in raw:  # "httpbin.org" starts with "http" but has no scheme
        raw = "https://" + raw
    host = urlparse(raw).hostname or ""  # lowercased, port and path dropped
    # removeprefix, not lstrip: lstrip("www.") strips any leading 'w'/'.' chars
    return host.removeprefix("www.")


# ── Core worker ──────────────────────────────────────────────────────────────
async def process_row(
    idx: int,
    row: dict,
    crawler: Crawler,
    semaphore: asyncio.Semaphore,
    dry_run: bool = False,
) -> tuple[int, dict, str]:
    """Process one domain. Returns (idx, result, market)."""
    async with semaphore:
        try:
            domain = normalize_domain(str(row.get("website", "")))
            country = str(row.get("country", ""))
            if not domain:
                return idx, {"status": "invalid_url"}, "unknown"
            logger.info(f"Processing [{idx}] {domain}")
            # ── STEP 1: DNS check (blocking resolver → run in a thread) ──────
            if not await asyncio.to_thread(check_dns, domain):
                logger.info(f"[{domain}] DNS failed → DEAD")
                return idx, {"status": STATUS_DEAD}, "unknown"
            # ── STEP 0: Market detection ─────────────────────────────────────
            market = detect_market(domain, country)
            patterns = get_patterns(market)
            logger.info(f"[{domain}] Market: {market}, {len(patterns)} patterns")
            if dry_run:
                return idx, {"status": "dry_run_ok"}, market
            # ── STEP 2-5: Crawl ──────────────────────────────────────────────
            result = await crawler.crawl_domain(domain, patterns, market)
            return idx, result, market
        except Exception as e:
            # One failing domain must not abort the whole batch
            logger.warning(f"[{idx}] Unexpected error: {e}")
            return idx, {"status": "error"}, "unknown"


# ── Main ─────────────────────────────────────────────────────────────────────
async def main():
    parser = argparse.ArgumentParser(description="Lead Enricher")
    parser.add_argument("--input", default="input.csv", help="Input CSV path")
    parser.add_argument("--output", default="output.csv", help="Output CSV path")
    parser.add_argument("--config", default="config.yaml")
    parser.add_argument("--limit", type=int, default=0, help="Max rows to process (0 = all)")
    parser.add_argument("--market", default="",
                        help="Only process rows whose country column equals this code, e.g. VN")
    parser.add_argument("--dry-run", action="store_true", help="Do not write any output")
    args = parser.parse_args()

    cfg = load_config(args.config)
    setup_logging(cfg["log"]["file"], cfg["log"]["level"])

    # Resume from the output file if it exists, otherwise start from the input.
    # Nothing is written before the first save(), so --dry-run leaves no files.
    df = load(args.output if Path(args.output).exists() else args.input)
    pending = get_pending(df)
    if args.market:
        country_col = pending["country"].str.strip().str.upper()
        pending = pending[country_col == args.market.strip().upper()]
    if args.limit > 0:
        pending = pending.head(args.limit)
    logger.info(f"Starting: {len(pending)} rows to process")
    if pending.empty:
        logger.info("No pending rows. All done!")
        print_stats(df)
        return

    concurrency = cfg["crawl"]["concurrency"]
    batch_write = cfg["crawl"]["batch_write"]
    semaphore = asyncio.Semaphore(concurrency)
    async with Crawler(cfg) as crawler:
        tasks = [
            process_row(idx, row.to_dict(), crawler, semaphore, args.dry_run)
            for idx, row in pending.iterrows()
        ]
        done_count = 0
        for coro in asyncio.as_completed(tasks):
            idx, result, market = await coro
            update_row(df, idx, result, market)
            done_count += 1
            status = result.get("status", "")
            email = result.get("email", "")
            icon = "✓" if email else "·"
            logger.info(f"[{icon}] {df.at[idx, 'website']} → {status} {email}")
            # Batch write
            if done_count % batch_write == 0 and not args.dry_run:
                save(df, args.output)
                logger.info(f"Progress: {done_count}/{len(pending)} saved")
    # Final save
    if not args.dry_run:
        save(df, args.output)
    print_stats(df)
    logger.info(f"Done. Output: {args.output}")


if __name__ == "__main__":
    asyncio.run(main())
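Two pitfalls matter when a website cell is reduced to a domain. First, `str.lstrip("www.")` strips any leading run of the characters `w` and `.`, not the prefix. Second, a bare domain that happens to start with "http", such as httpbin.org, fools a `startswith("http")` scheme test. The sketch below contrasts the approaches using `str.removeprefix` (Python 3.9+) and `urlparse().hostname`. It illustrates the intent of `normalize_domain()` rather than serving as a drop-in module.

```python
from urllib.parse import urlparse

hosts = ["www.wework.com", "web.de", "www.abc.vn"]

# str.lstrip treats its argument as a set of characters, not a prefix
print([h.lstrip("www.") for h in hosts])
# str.removeprefix removes the exact prefix, at most once
print([h.removeprefix("www.") for h in hosts])


def normalize_domain(raw: str) -> str:
    """Bare host name from a domain or URL (scheme, port, path, leading www. removed)."""
    raw = raw.strip()
    if "://" not in raw:  # "httpbin.org" starts with "http" but has no scheme
        raw = "https://" + raw
    return (urlparse(raw).hostname or "").removeprefix("www.")


print(normalize_domain("https://www.XYZ.com:8443/contact?x=1"))
```

`hostname` (unlike `netloc`) is already lowercased and has the port removed. Two spellings of the same site therefore map to one domain in the logs and the DNS check.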
10. Input CSV format
The input.csv file prepared by the user needs at least a website column:
company_name,website,country
Công ty ABC,abc.vn,VN
XYZ Corporation,https://xyz.com,US
Korea Co Ltd,korea.co.kr,KR
Firma GmbH,firma.de,DE
Unknown Market,somesite.com,
Notes:
- country is optional but recommended, so that market detection is accurate
- website may be a bare domain (abc.vn) or a full URL (https://abc.vn)
- Encoding: UTF-8 and UTF-8 with BOM both work
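Both encodings work because `load()` reads with `utf-8-sig`. This codec drops a leading byte-order mark if present and behaves like plain UTF-8 otherwise. Without it, Excel's BOM stays glued to the first header and the column is no longer named `company_name`. Below is a small stdlib sketch of that effect and of the header normalization `load()` applies. `normalize_columns` is a hypothetical name for that one-liner.

```python
import csv
import io

raw = "company_name,website,country\r\nCông ty ABC,abc.vn,VN\r\n".encode("utf-8-sig")

# Decoded as plain UTF-8, the BOM stays attached to the first header name
print(next(csv.reader(io.StringIO(raw.decode("utf-8"))))[0] == "company_name")
# utf-8-sig strips the BOM when present and is harmless when absent
header = next(csv.reader(io.StringIO(raw.decode("utf-8-sig"))))
print(header)


def normalize_columns(cols: list[str]) -> list[str]:
    """Same rule as csv_handler.load(): trim, lowercase, spaces → underscores."""
    return [c.strip().lower().replace(" ", "_") for c in cols]


print(normalize_columns(["Company Name", " Website ", "Country"]))
```

Headers such as "Company Name" or " Website " therefore still match the expected column names.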
11. Commands
cd ~/lead-enricher
source venv/bin/activate
# Test the Jina API key
curl -H "Authorization: Bearer $(grep JINA_API_KEY .env | cut -d= -f2)" \
  https://r.jina.ai/https://example.com | head -20
# Dry run on the first 50 rows (no files written)
python main.py --input input.csv --dry-run --limit 50
# Real run on the first 200 rows
python main.py --input input.csv --output output.csv --limit 200
# Full batch
python main.py --input input.csv --output output.csv
# Resume (rows that already have a status are skipped)
python main.py --input output.csv --output output.csv
# Only rows whose country column is VN
python main.py --input input.csv --output output_vn.csv --market VN
# Run in the background with a log file
nohup python main.py --input input.csv --output output.csv \
  > logs/run_$(date +%y%m%d_%H%M).log 2>&1 &
# Watch progress
tail -f logs/run_*.log
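Besides tailing the log, the status column of the output file shows how far a run has got, because it is saved every batch_write rows. Below is a stdlib-only sketch. The script name progress.py is hypothetical, and you would run it as `python progress.py output.csv`.

```python
import csv
import sys
from collections import Counter


def status_counts(path: str) -> Counter:
    """Count rows per status in an output CSV; "" means not processed yet."""
    with open(path, encoding="utf-8-sig", newline="") as f:
        return Counter((row.get("status") or "") for row in csv.DictReader(f))


if __name__ == "__main__" and len(sys.argv) > 1:
    counts = status_counts(sys.argv[1])
    done = sum(n for status, n in counts.items() if status)
    print(f"processed {done}/{sum(counts.values())}", dict(counts))
```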
12. Deployment checklist
□ 1. python3 --version → >= 3.11
□ 2. pip install finishes without errors
□ 3. playwright install chromium finishes without errors
□ 4. .env contains JINA_API_KEY
□ 5. Jina curl test → returns Markdown
□ 6. input.csv has a 'website' column
□ 7. python main.py --dry-run --limit 10 → no crash
□ 8. python main.py --limit 50 → output.csv contains data
□ 9. Open output.csv and check email quality
□ 10. Run the full batch
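Items 1 and 2 can be checked in one go from inside the activated venv. Below is a sketch of a preflight script, hypothetically named preflight.py. The list maps the pip packages from step 1.2 to their import names (beautifulsoup4 → bs4, dnspython → dns, python-dotenv → dotenv) and adds yaml, which main.py imports.

```python
import importlib.util
import sys

# Import names of the packages the scripts use (pip name and import name can differ)
REQUIRED_MODULES = ["httpx", "bs4", "dns", "dotenv", "pandas",
                    "playwright", "psutil", "lxml", "yaml"]


def missing_modules(names: list[str]) -> list[str]:
    """Return the modules that cannot be found by the current interpreter."""
    return [n for n in names if importlib.util.find_spec(n) is None]


if __name__ == "__main__":
    print("python >= 3.11:", sys.version_info >= (3, 11))
    print("missing:", missing_modules(REQUIRED_MODULES) or "none")
```

`find_spec` only locates a module without importing it. It does not replace item 7, which is the first run that actually imports and exercises everything.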
13. Troubleshooting
| Error | Cause | Fix |
|---|---|---|
| JINA_API_KEY is not set | .env is not at the expected path | Run cat .env in ~/lead-enricher to check |
| playwright install fails | Download interrupted, or venv not active | Activate the venv and re-run playwright install chromium (Playwright uses its own Chromium build, not a Homebrew one) |
| ModuleNotFoundError | venv not activated | source venv/bin/activate |
| Hit rate < 20% | Jina API key has run out of tokens | Check jina.ai/dashboard |
| Every row is dead | DNS timeout too low | Raise the timeout of check_dns() in validator.py (default 3 s) |
| Output CSV shows garbled characters | Excel picked the wrong encoding | Open it in Excel via Data → From Text/CSV → UTF-8 |
| "CPU too high" warning | Too many Playwright instances running | Lower concurrency to 10 |
14. Expected output
After a full run of 5,000 domains:
┌─────────────────────────────────────┐
│             CRAWL STATS             │
├─────────────────────────────────────┤
│ Total rows     : 5000               │
│ Processed      : 5000               │
│ Email found ✓  : ~2500 (50%)        │
│ Dead domain    : ~850               │
│ Parked         : ~250               │
│ No email found : ~1250              │
└─────────────────────────────────────┘
Estimated time: ~1.5–2 hours with concurrency=20 on the Mac mini M4.
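The estimate implies a per-domain time budget. With 20 domains in flight at once, 5,000 domains amount to 250 sequential "slots" per worker. Spreading 1.5–2 hours (5,400–7,200 s) over those slots gives roughly 22–29 s per domain on average:

```latex
\frac{5000}{20} = 250, \qquad
\frac{5400\ \text{s}}{250} = 21.6\ \text{s}, \qquad
\frac{7200\ \text{s}}{250} = 28.8\ \text{s}
```

This is an average across the batch. Dead and parked domains finish well under it, while domains that need several sub-pages, each followed by rate_limit_delay, can take longer.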
Spec version: 1.0 — 2026-05-28