# Flask-based API wrapper that fetches data from the DonghuaStream site: homepage (latest episodes & slider), donghua details (synopsis, genres, episode list), streaming links, search, and the release schedule.
"""
【 DonghuaStream API Scraper 】
Creator : ShanMolvyr
Base : https://donghuastream.org
Desc : Fetches data from the DonghuaStream site, including the homepage (latest episodes & slider), series details (synopsis, genres, episode list), streaming links, search, and the release schedule.
Channel : https://whatsapp.com/channel/0029VbB4Kw8EFeXfeExaXc3Q
Note : Feel free to extend this yourself
"""
import requests
import base64
import re
from bs4 import BeautifulSoup
from flask import Flask, jsonify, request
from flask_cors import CORS
# Flask application object; CORS is enabled on it so browser clients served
# from other origins can call the API.
app = Flask(__name__)
CORS(app)

# Root URL of the site being scraped; every scraper request is built from it.
BASE_URL = "https://donghuastream.org"

# Headers sent with every upstream request (a browser-like User-Agent).
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
}
class DonghuaScraper:
    """Scraper for the DonghuaStream site.

    Each public method issues one HTTP GET against ``BASE_URL`` (using the
    module-level ``HEADERS``), parses the HTML with BeautifulSoup and returns
    plain ``dict``/``list`` data.
    """

    # Seconds to wait for the upstream site. Without an explicit timeout
    # ``requests`` can block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 15

    # Maximum number of characters kept from a slider summary.
    SUMMARY_LIMIT = 200

    # Matches the ``src`` attribute inside a decoded ``<iframe ...>`` snippet.
    _SRC_ATTR_RE = re.compile(r'src=["\']([^"\']+)["\']')

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _fetch_soup(self, url, params=None):
        """Download *url* and return it parsed as a BeautifulSoup document.

        ``params`` is passed to ``requests`` so query values are URL-encoded
        instead of being pasted into the URL verbatim.
        """
        response = requests.get(
            url,
            headers=HEADERS,
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        )
        return BeautifulSoup(response.text, 'html.parser')

    def _text(self, element, default=None):
        """Return the stripped text of *element*, or *default* if it is missing."""
        if element is None:
            return default
        return element.get_text().strip()

    def _image_url(self, img):
        """Return the image URL of an ``<img>`` tag, or ``None``.

        Falls back to ``data-src`` when ``src`` is absent (the same fallback
        ``get_watch`` applies to iframes) instead of raising ``KeyError``.
        """
        if img is None:
            return None
        return img.get('src') or img.get('data-src')

    def _normalize_url(self, url):
        """Prefix protocol-relative URLs (``//host/...``) with ``https:``."""
        if url and url.startswith('//'):
            return 'https:' + url
        return url

    def _truncate(self, text, limit):
        """Strip *text* and cut it to *limit* chars, adding ``...`` only if cut."""
        text = text.strip()
        if len(text) > limit:
            return text[:limit] + '...'
        return text

    def _clean_episode_label(self, label):
        """Remove parenthesised notes (e.g. ``(4K)``) and ``Preview`` from a label."""
        label = re.sub(r'\([^)]*\)', '', label).strip()
        return re.sub(r'Preview', '', label).strip()

    def _decode_server_url(self, value):
        """Resolve a mirror ``<option>`` value to a playable URL.

        The value is tried as a base64-encoded iframe snippet; when it decodes
        and contains a ``src`` attribute, that (normalized) URL is returned.
        In every other case the raw value is returned unchanged.
        """
        try:
            decoded = base64.b64decode(value).decode('utf-8')
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueError
            # subclasses: the value is not base64 text, keep it as-is.
            return value
        match = self._SRC_ATTR_RE.search(decoded)
        if match is None:
            return value
        return self._normalize_url(match.group(1))

    def _extract_episode_num(self, episode_str):
        """Extract numeric episode number from string (0 when none is found)."""
        match = re.search(r'(\d+)', str(episode_str))
        return int(match.group(1)) if match else 0

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def get_home(self):
        """Scrape the homepage.

        Returns:
            dict with ``slider`` (featured items) and ``latest_episodes``
            (at most the first 20 cards found in ``.listupd .bsx``).
        """
        soup = self._fetch_soup(BASE_URL)

        # Latest episodes from "Hot Series Update" and "Latest Release"
        latest_episodes = []
        for item in soup.select('.listupd .bsx'):
            link = item.select_one('a')
            if link is None:
                continue
            latest_episodes.append({
                'title': self._text(item.select_one('.tt h2'), 'Unknown'),
                'episode': self._text(item.select_one('.epx'), 'Episode'),
                'url': link.get('href'),
                'thumbnail': self._image_url(item.select_one('img')),
            })

        # Slider items (featured)
        slider = []
        for slide in soup.select('.slide-item'):
            # The title anchor doubles as the link to the series.
            title_el = slide.select_one('.ellipsis a')
            if title_el is None:
                continue
            summary_el = slide.select_one('.excerpt .story p')
            if summary_el is None:
                summary = ''
            else:
                summary = self._truncate(summary_el.get_text(), self.SUMMARY_LIMIT)
            slider.append({
                'title': self._text(title_el),
                'url': title_el.get('href'),
                'summary': summary,
                'thumbnail': self._image_url(slide.select_one('.poster img')),
            })

        return {
            'slider': slider,
            'latest_episodes': latest_episodes[:20],
        }

    def get_detail(self, slug):
        """Get series detail and episode list for ``/anime/<slug>/``.

        Returns:
            dict with title, status, total_episodes, genres, synopsis,
            poster, rating and episodes (sorted by episode number), or
            ``{'error': 'Series not found'}`` when the page has no title.
        """
        soup = self._fetch_soup(f"{BASE_URL}/anime/{slug}/")

        # Basic info
        title = soup.select_one('.infox h1')
        if title is None:
            return {'error': 'Series not found'}

        # Extract metadata from .spe spans
        status = None
        total_episodes = None
        for span in soup.select('.spe span'):
            text = span.get_text(strip=True)
            if 'Status:' in text:
                status = text.replace('Status:', '').strip()
            if 'Episodes:' in text:
                total_episodes = text.replace('Episodes:', '').strip()

        genres = [a.get_text().strip() for a in soup.select('.genxed a')]
        synopsis = self._text(soup.select_one('.entry-content p'), '')
        poster_url = self._image_url(soup.select_one('.thumb img'))

        rating_el = soup.select_one('.rating strong')
        rating = None
        if rating_el is not None:
            rating = rating_el.get_text().replace('Rating', '').strip()

        # Episode list
        episodes = []
        for ep in soup.select('.eplister ul li a'):
            # Clean episode number (remove extra text like "(Good Sub)(4K)")
            raw_number = self._text(ep.select_one('.epl-num'), '0')
            episodes.append({
                'episode': self._clean_episode_label(raw_number),
                'title': self._text(ep.select_one('.epl-title'), ''),
                'url': ep.get('href'),
                'release_date': self._text(ep.select_one('.epl-date')),
            })

        # Sort episodes by episode number (labels without digits sort as 0)
        episodes.sort(key=lambda x: self._extract_episode_num(x['episode']))

        return {
            'title': self._text(title),
            'status': status,
            'total_episodes': total_episodes,
            'genres': genres,
            'synopsis': synopsis,
            'poster': poster_url,
            'rating': rating,
            'episodes': episodes,
        }

    def get_watch(self, episode_slug):
        """Get video URL, mirror servers and navigation for one episode page.

        Returns:
            dict with title (falls back to *episode_slug*), video_url,
            servers, prev_episode, next_episode and series (title, url).
        """
        soup = self._fetch_soup(f"{BASE_URL}/{episode_slug}")

        # Episode title
        title = soup.select_one('h1.entry-title')

        # Video URL from the main embed iframe, then the .player-embed one
        video_url = None
        for selector in ('#embed_holder iframe', '.player-embed iframe'):
            iframe = soup.select_one(selector)
            if iframe is None:
                continue
            video_url = self._normalize_url(
                iframe.get('src') or iframe.get('data-src')
            )
            if video_url:
                break

        # Mirror servers (decoded from base64)
        servers = []
        for option in soup.select('.mirror option'):
            value = option.get('value')
            name = option.get_text().strip()
            if not value or not value.strip():
                continue
            if name in ('', 'Select Video Server'):
                continue
            servers.append({'name': name, 'url': self._decode_server_url(value)})

        # Navigation (prev/next episode)
        prev_url = None
        next_url = None
        for link in soup.select('.naveps .nvs a'):
            href = link.get('href')
            text = link.get_text(strip=True).lower()
            if 'prev' in text or '‹' in text:
                prev_url = href
            elif 'next' in text or '›' in text:
                next_url = href

        # Series info from sidebar
        series_link = soup.select_one('#singlepisode .det h3 a')
        if series_link is None:
            series_name = None
            series_url = None
        else:
            series_name = series_link.get_text().strip()
            series_url = series_link.get('href')

        return {
            'title': self._text(title, episode_slug),
            'video_url': video_url,
            'servers': servers,
            'prev_episode': prev_url,
            'next_episode': next_url,
            'series': {
                'title': series_name,
                'url': series_url,
            },
        }

    def search(self, query):
        """Search for series or episodes.

        The query is sent as the ``s`` parameter and URL-encoded by
        ``requests``, so characters such as ``&``, ``#`` or spaces cannot
        corrupt the request URL.

        Returns:
            dict with the original ``query`` and a ``results`` list.
        """
        soup = self._fetch_soup(f"{BASE_URL}/", params={'s': query})

        results = []
        for item in soup.select('.listupd .bsx'):
            link = item.select_one('a')
            title = item.select_one('.tt h2')
            if link is None or title is None:
                continue
            results.append({
                'title': self._text(title),
                'url': link.get('href'),
                'thumbnail': self._image_url(item.select_one('img')),
                'episode': self._text(item.select_one('.epx')),
            })
        return {'query': query, 'results': results}

    def get_schedule(self):
        """Get release schedule as a list of ``{'day', 'title', 'url'}`` dicts."""
        soup = self._fetch_soup(f"{BASE_URL}/schedule/")

        schedule = []
        # Find schedule table or list - adjust selector based on actual structure
        for item in soup.select('.schedule-item, .episode-list li'):
            title = item.select_one('.title a')
            if title is None:
                continue
            schedule.append({
                'day': self._text(item.select_one('.day')),
                'title': self._text(title),
                'url': title.get('href'),
            })
        return schedule
# Flask API endpoints.
# One module-level DonghuaScraper instance is shared by all route handlers below.
scraper = DonghuaScraper()
@app.route('/')
def index():
    """Return a JSON overview of the API: name, endpoints and example slugs."""
    endpoints = {
        'GET /api/home': 'Get homepage (latest episodes, slider)',
        'GET /api/detail/<slug>': 'Get series details and episode list',
        'GET /api/watch/<episode_slug>': 'Get video URL and navigation',
        'GET /api/search?q=<query>': 'Search for series or episodes',
        'GET /api/schedule': 'Get release schedule',
    }
    overview = {
        'api': 'DonghuaStream API Wrapper',
        'endpoints': endpoints,
        'example_slug': 'renegade-immortal1',
        'example_episode': 'renegade-immortal-episode-1-multiple-subtitles',
    }
    return jsonify(overview)
@app.route('/api/home')
def api_home():
    """Return the scraped homepage data (slider and latest episodes) as JSON."""
    home_data = scraper.get_home()
    return jsonify(home_data)
@app.route('/api/detail/<slug>')
def api_detail(slug):
    """Return the detail and episode list of the series identified by *slug*."""
    detail = scraper.get_detail(slug)
    return jsonify(detail)
@app.route('/api/watch/<path:episode_slug>')
def api_watch(episode_slug):
    """Return video URL, mirrors and navigation for *episode_slug* as JSON.

    The ``path`` converter lets the slug contain ``/`` characters.
    """
    watch_data = scraper.get_watch(episode_slug)
    return jsonify(watch_data)
@app.route('/api/search')
def api_search():
    """Search the site using the ``q`` query-string parameter.

    Returns:
        The search result as JSON, or a 400 response with an ``error``
        message when ``q`` is missing, empty or only whitespace.
    """
    # Strip first so a whitespace-only ``q`` is rejected like a missing one
    # instead of being forwarded upstream as a meaningless search.
    query = request.args.get('q', '').strip()
    if not query:
        return jsonify({'error': 'Query parameter q required'}), 400
    return jsonify(scraper.search(query))
@app.route('/api/schedule')
def api_schedule():
    """Return the scraped release schedule as a JSON list."""
    schedule = scraper.get_schedule()
    return jsonify(schedule)
if __name__ == '__main__':
    import os

    # The server listens on every interface (0.0.0.0). Flask's debug mode
    # enables the Werkzeug interactive debugger, which lets anyone who can
    # reach the port execute arbitrary Python, so debug is opt-in through the
    # FLASK_DEBUG environment variable rather than always enabled.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in (
        '1', 'true', 'yes', 'on',
    )

    # Startup banner listing the endpoints and a few example URLs.
    print("=" * 50)
    print("🚀 DonghuaStream API Wrapper")
    print("=" * 50)
    print("📋 Available endpoints:")
    print(" GET /api/home")
    print(" GET /api/detail/<slug>")
    print(" GET /api/watch/<episode_slug>")
    print(" GET /api/search?q=<query>")
    print(" GET /api/schedule")
    print("")
    print("📝 Examples:")
    print(" http://localhost:5000/api/detail/renegade-immortal1")
    print(" http://localhost:5000/api/watch/renegade-immortal-episode-1-multiple-subtitles")
    print(" http://localhost:5000/api/search?q=renegade")
    print("=" * 50)
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)