Concurrent Video Fetching with Go Goroutines Across 8 Global Regions
TrendVidStream fetches trending video data from 8 regions: AE, FI, DK, CZ, BE, CH, US, GB. Go's goroutines and channels make concurrent fetching clean and controllable. Here's the production-ready implementation.
Worker Pool Architecture
package fetcher
import (
"context"
"log"
"sync"
"time"
)
// FetchJob describes one unit of work for the pool: the region to fetch and
// the API key to send with that request.
type FetchJob struct {
	Region, APIKey string
}
// FetchResult is what a worker reports after processing one FetchJob.
type FetchResult struct {
	Region string        // region code copied from the originating job
	Videos []Video       // videos returned by the fetch call
	Error  error         // error returned by the fetch call, if any
	Took   time.Duration // time elapsed around the fetch call
}
// Pool runs a fixed number of worker goroutines that take FetchJobs from a
// queue and publish one FetchResult per job.
type Pool struct {
	workers int              // number of worker goroutines Start launches
	jobs    chan FetchJob    // work queue fed by Submit and closed by Close
	results chan FetchResult // worker output, exposed read-only via Results
	client  *YTClient        // client the workers use to fetch
	wg      sync.WaitGroup   // counts workers that are still running
}
// NewPool builds a Pool that will run the given number of workers against
// client. No goroutines are started until Start is called. Both the job queue
// and the result queue are buffered to hold 20 entries.
func NewPool(workers int, client *YTClient) *Pool {
	const queueDepth = 20

	pool := new(Pool)
	pool.workers = workers
	pool.client = client
	pool.jobs = make(chan FetchJob, queueDepth)
	pool.results = make(chan FetchResult, queueDepth)
	return pool
}
// Start launches the pool's worker goroutines, all bound to ctx, plus one
// extra goroutine that closes the results channel once every worker has
// returned.
func (p *Pool) Start(ctx context.Context) {
	for remaining := p.workers; remaining > 0; remaining-- {
		p.wg.Add(1)
		go p.worker(ctx)
	}

	// Closing results only after the last worker exits lets consumers range
	// over Results() and terminate cleanly.
	closeWhenDone := func() {
		p.wg.Wait()
		close(p.results)
	}
	go closeWhenDone()
}
// worker consumes jobs until the jobs channel is closed or ctx is cancelled,
// fetching up to 50 trending videos per job and publishing one FetchResult
// for each.
//
// Publishing never blocks past cancellation: if the results buffer is full
// and ctx is done, the result is dropped and the worker exits. Without this a
// consumer that stops reading would leave the worker stuck on the send, so
// the WaitGroup would never drain and results would never be closed.
func (p *Pool) worker(ctx context.Context) {
	defer p.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case job, ok := <-p.jobs:
			if !ok {
				return
			}
			start := time.Now()
			videos, err := p.client.FetchTrending(ctx, job.Region, job.APIKey, 50)
			res := FetchResult{
				Region: job.Region, Videos: videos,
				Error: err, Took: time.Since(start),
			}

			// Fast path: deliver whenever the buffer has room, even if ctx
			// has just been cancelled, so finished work is still reported.
			select {
			case p.results <- res:
				continue
			default:
			}

			// Buffer is full: wait for the consumer, but give up on
			// cancellation so this goroutine cannot leak.
			select {
			case p.results <- res:
			case <-ctx.Done():
				return
			}
		}
	}
}
// Submit enqueues job for the workers. The send blocks while the jobs channel
// is full, and panics if Close has already been called.
func (p *Pool) Submit(job FetchJob) {
	p.jobs <- job
}
// Close closes the jobs channel, signalling that no more work will be
// submitted. Workers exit once they observe the closed channel.
func (p *Pool) Close() {
	close(p.jobs)
}
// Results returns the receive-only view of the channel on which workers
// publish their FetchResults.
func (p *Pool) Results() <-chan FetchResult {
	return p.results
}
YouTube Client for TrendVidStream Regions
package fetcher
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"
"golang.org/x/time/rate"
)
// YTClient bundles the HTTP client used for fetching with the rate limiter
// that FetchTrending waits on before each request.
//
// Arabic (AE) titles require no special HTTP handling — YouTube returns UTF-8.
// The direction (RTL) is a display concern, not a fetch concern.
type YTClient struct {
	http    *http.Client  // client that executes the API requests
	limiter *rate.Limiter // limiter consulted before each request
}
// NewYTClient returns a YTClient whose HTTP client has a 30-second timeout
// and keeps up to 10 idle connections per host, paired with a limiter that
// allows 4 requests per second with a burst of 8.
func NewYTClient() *YTClient {
	transport := &http.Transport{MaxIdleConnsPerHost: 10}
	httpClient := &http.Client{
		Timeout:   30 * time.Second,
		Transport: transport,
	}

	// 4 req/s, burst 8
	limiter := rate.NewLimiter(rate.Limit(4), 8)

	return &YTClient{http: httpClient, limiter: limiter}
}
// FetchTrending returns up to n of the most popular videos for region,
// authenticating with apiKey.
//
// Every HTTP attempt first waits on the client's rate limiter. An HTTP 429
// response is retried exactly once after a 10-second backoff; a second 429 is
// returned as an error. Any other non-200 status is an error. Cancelling ctx
// aborts the limiter wait, the request, and the backoff.
func (c *YTClient) FetchTrending(ctx context.Context, region, apiKey string, n int) ([]Video, error) {
	const (
		maxAttempts  = 2 // the initial request plus one retry on HTTP 429
		retryBackoff = 10 * time.Second
	)

	q := url.Values{
		"part":       {"snippet,statistics"},
		"chart":      {"mostPopular"},
		"regionCode": {region},
		"maxResults": {fmt.Sprintf("%d", n)},
		"key":        {apiKey},
	}
	endpoint := "https://www.googleapis.com/youtube/v3/videos?" + q.Encode()

	for attempt := 1; ; attempt++ {
		vids, throttled, err := c.fetchTrendingOnce(ctx, endpoint, region)
		if !throttled {
			return vids, err
		}
		if attempt == maxAttempts {
			return nil, fmt.Errorf("youtube API HTTP %d for region %s", http.StatusTooManyRequests, region)
		}

		// Back off before the retry. The throttled response body has already
		// been closed by fetchTrendingOnce, so no connection is held open
		// while we wait.
		timer := time.NewTimer(retryBackoff)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return nil, ctx.Err()
		}
	}
}

// fetchTrendingOnce performs a single rate-limited GET of endpoint and decodes
// the response into Videos tagged with region. The bool result is true when
// the server answered HTTP 429, in which case the caller decides whether to
// retry. The response body is always closed before returning.
func (c *YTClient) fetchTrendingOnce(ctx context.Context, endpoint, region string) ([]Video, bool, error) {
	if err := c.limiter.Wait(ctx); err != nil {
		return nil, false, fmt.Errorf("rate limit: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, false, fmt.Errorf("building request for region %s: %w", region, err)
	}

	// NOTE(review): errors returned by Do are *url.Error values whose text
	// includes the request URL, and the URL carries the API key as a query
	// parameter. Confirm callers do not log this error somewhere sensitive.
	resp, err := c.http.Do(req)
	if err != nil {
		return nil, false, err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		// good
	case http.StatusTooManyRequests:
		return nil, true, nil
	default:
		return nil, false, fmt.Errorf("youtube API HTTP %d for region %s", resp.StatusCode, region)
	}

	var payload struct {
		Items []struct {
			ID      string `json:"id"`
			Snippet struct {
				Title            string `json:"title"` // UTF-8, handles Arabic natively
				ChannelTitle     string `json:"channelTitle"`
				DefaultAudioLang string `json:"defaultAudioLanguage"`
			} `json:"snippet"`
			Statistics struct {
				ViewCount string `json:"viewCount"`
			} `json:"statistics"`
		} `json:"items"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		return nil, false, err
	}

	vids := make([]Video, 0, len(payload.Items))
	for _, item := range payload.Items {
		vids = append(vids, Video{
			VideoID:      item.ID,
			Title:        item.Snippet.Title,
			ChannelTitle: item.Snippet.ChannelTitle,
			Language:     item.Snippet.DefaultAudioLang,
			Region:       region,
		})
	}
	return vids, false, nil
}
Running the Pipeline
// tvStreamRegions lists the region codes fetched on every pipeline run, in
// submission order.
var tvStreamRegions = []string{
	"AE", "FI", "DK", "CZ",
	"BE", "CH", "US", "GB",
}
// RunPipeline fetches trending videos for every region in tvStreamRegions
// using a 3-worker pool and returns the combined list.
//
// The whole run is bounded by a 3-minute timeout derived from ctx. Regions
// that fail are logged and skipped; the call only returns an error when no
// videos were collected at all. If that happened because ctx was cancelled or
// timed out, the returned error wraps ctx.Err() so callers can tell an
// aborted run from one where every region genuinely failed.
func RunPipeline(ctx context.Context, apiKey string) ([]Video, error) {
	client := NewYTClient()
	pool := NewPool(3, client) // 3 workers for 8 regions

	ctx, cancel := context.WithTimeout(ctx, 3*time.Minute)
	defer cancel()

	pool.Start(ctx)

	// Feed jobs from a separate goroutine so this one is free to drain
	// results while submission is still in progress.
	go func() {
		for _, r := range tvStreamRegions {
			pool.Submit(FetchJob{Region: r, APIKey: apiKey})
		}
		pool.Close()
	}()

	var all []Video
	var errCount int
	for result := range pool.Results() {
		if result.Error != nil {
			log.Printf("[%s] error: %v (took %v)", result.Region, result.Error, result.Took)
			errCount++
			continue
		}
		log.Printf("[%s] %d videos in %v", result.Region, len(result.Videos), result.Took)
		all = append(all, result.Videos...)
	}

	if len(all) == 0 {
		// A cancelled or expired context makes workers exit without reporting
		// the jobs still queued, so the error count alone would understate
		// (or entirely hide) why nothing came back.
		if err := ctx.Err(); err != nil {
			return nil, fmt.Errorf("pipeline stopped early (%d errors): %w", errCount, err)
		}
		return nil, fmt.Errorf("all regions failed (%d errors)", errCount)
	}
	return all, nil
}
Performance for TrendVidStream
| Workers | Time (8 regions) | Notes |
|---|---|---|
| 1 | ~16s | Sequential |
| 3 | ~6s | Good balance |
| 8 | ~3s | Full parallel |
Three workers handle TrendVidStream's 8 regions in about 6 seconds — fast enough for cron-based fetching while staying well under YouTube's rate limits. The rate limiter (4 req/s, burst of 8) ensures we stay within API limits regardless of the worker count.
This article is part of the Building TrendVidStream series. Check out TrendVidStream to see these techniques in action.













