| import re |
| import dns.resolver |
| import smtplib |
| import requests |
| import threading |
| import queue |
| import dns.reversename |
|
|
| CACHE_TTL = 600 |
|
|
| |
| resolver = dns.resolver.Resolver(configure=False) |
| resolver.nameservers = ['8.8.8.8'] |
| resolver.cache = dns.resolver.Cache() |
|
|
|
|
| |
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
| def is_valid_email(email): |
| |
| pattern = r''' |
| ^ # Start of string |
| (?!.*[._%+-]{2}) # No consecutive special characters |
| [a-zA-Z0-9._%+-]{1,64} # Local part: allowed characters and length limit |
| (?<![._%+-]) # No special characters at the end of local part |
| @ # "@" symbol |
| [a-zA-Z0-9.-]+ # Domain part: allowed characters |
| (?<![.-]) # No special characters at the end of domain |
| \.[a-zA-Z]{2,}$ # Top-level domain with minimum 2 characters |
| ''' |
| |
| |
| return re.match(pattern, email, re.VERBOSE) is not None |
|
|
| |
| |
|
|
| def query_dns(record_type, domain): |
| try: |
| |
| record_name = domain if record_type == 'MX' else f'{domain}.' |
| cache_result = resolver.cache.get((record_name, record_type)) |
| if cache_result is not None and (dns.resolver.mtime() - cache_result.time) < CACHE_TTL: |
| return True |
|
|
| |
| resolver.timeout = 2 |
| resolver.lifetime = 2 |
| resolver.resolve(record_name, record_type) |
| return True |
| except dns.resolver.NXDOMAIN: |
| |
| return False |
| except dns.resolver.NoAnswer: |
| |
| return False |
| except dns.resolver.Timeout: |
| |
| return False |
| except: |
| |
| return False |
|
|
|
|
| def has_valid_mx_record(domain): |
| |
| def query_mx(results_queue): |
| results_queue.put(query_dns('MX', domain)) |
|
|
| def query_a(results_queue): |
| results_queue.put(query_dns('A', domain)) |
|
|
| |
| mx_queue = queue.Queue() |
| a_queue = queue.Queue() |
| mx_thread = threading.Thread(target=query_mx, args=(mx_queue,)) |
| a_thread = threading.Thread(target=query_a, args=(a_queue,)) |
| mx_thread.start() |
| a_thread.start() |
|
|
| |
| mx_thread.join() |
| a_thread.join() |
| mx_result = mx_queue.get() |
| a_result = a_queue.get() |
|
|
| return mx_result or a_result |
|
|
|
|
| |
| def verify_email(email): |
| |
| domain = email.split('@')[1] |
|
|
| |
| try: |
| mx_records = dns.resolver.resolve(domain, 'MX') |
| except dns.resolver.NoAnswer: |
| return False |
|
|
| |
| for mx in mx_records: |
| try: |
| smtp_server = smtplib.SMTP(str(mx.exchange)) |
| smtp_server.ehlo() |
| smtp_server.mail('') |
| code, message = smtp_server.rcpt(str(email)) |
| smtp_server.quit() |
| if code == 250: |
| return True |
| except: |
| pass |
|
|
| return False |
|
|
|
|
| |
| def is_disposable(domain): |
| blacklists = [ |
| 'https://raw.githubusercontent.com/andreis/disposable-email-domains/master/domains.txt', |
| 'https://raw.githubusercontent.com/wesbos/burner-email-providers/master/emails.txt' |
| ] |
|
|
| for blacklist_url in blacklists: |
| try: |
| blacklist = set(requests.get(blacklist_url).text.strip().split('\n')) |
| if domain in blacklist: |
| return True |
| except Exception as e: |
| print(f'Error loading blacklist {blacklist_url}: {e}') |
| return False |
|
|