package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"os"
	"strings"
	"time"
)

// REDIS_API_BASE is the base URL prepended to every Redis REST API request
// built in this file. Its initial value is read from the environment variable
// of the same name; init substitutes a built-in default when that value is
// empty.
var REDIS_API_BASE = os.Getenv("REDIS_API_BASE")

// init installs the fallback Redis REST API base URL when REDIS_API_BASE is
// still empty after package-level variable initialization.
func init() {
	if REDIS_API_BASE != "" {
		// An explicit value is already present; leave it untouched.
		return
	}
	// NOTE(review): the fallback is a fixed IP address over plain HTTP —
	// confirm this is intended outside of development setups.
	REDIS_API_BASE = "http://3.108.55.133:8181"
}

// RedisValue is the decoded form of the JSON document that getRedisValue
// extracts from the "value" field of the Redis REST API response.
type RedisValue struct {
	// RecordID is stored by the ingestion pass as the COMMON.RECORD_ID row.
	RecordID string `json:"record_id"`

	// SystemID is decoded from "system_id"; nothing in this file reads it.
	SystemID string `json:"system_id"`

	// AssetID is the identifier the ingestion pass matches against
	// ic3_asset_master.serialnumber (exactly, then in normalized form).
	AssetID string `json:"asset_id"`

	// Domain is decoded from "domain"; nothing in this file reads it.
	Domain string `json:"domain"`

	// Protocol is decoded from "protocol"; nothing in this file reads it.
	Protocol string `json:"protocol"`

	// OccurredAt is kept as the raw string from the payload and is passed
	// through unchanged as the recorded_at value of every generated row.
	OccurredAt string `json:"occurred_at"`

	// Values maps a parameter tag code to its reading. Entries that decode to
	// float64 are additionally stored as a numeric value by the ingestion pass.
	Values map[string]interface{} `json:"values"`

	// Quality is stored by the ingestion pass as the COMMON.QUALITY row.
	Quality string `json:"quality"`

	// Alarm is stored by the ingestion pass as the COMMON.ALARM row.
	Alarm string `json:"alarm"`
}

// IngestionStats is the summary returned by one ingestion pass and printed by
// the background worker after each tick.
type IngestionStats struct {
	// RedisKeysScanned is the number of keys returned by the key listing.
	RedisKeysScanned int

	// AssetsMatched and AssetsUnmatched count the Redis values whose asset
	// identifier was, respectively was not, resolved to an asset row.
	AssetsMatched, AssetsUnmatched int

	// RowsInserted is the row count the pass reports for its insert step.
	RowsInserted int

	// TagsSkipped counts parameter values for which no tag mapping was found.
	TagsSkipped int

	// UnmatchedSerials lists the asset identifiers behind AssetsUnmatched;
	// UnmappedTags lists "<asset id>/<parameter tag code>" for TagsSkipped.
	UnmatchedSerials, UnmappedTags []string

	// StartTime and EndTime bracket the pass.
	StartTime, EndTime time.Time
}

// getRedisKeys returns the names of all keys matching pattern, collected from
// the Redis REST API endpoint /api/redis/keys.
//
// Pages are requested starting at cursor 0. Paging stops when a response
// carries cursor 0 or no numeric "cursor" field at all. Entries of the "keys"
// array that are not objects, or whose "key" field is not a string, are
// skipped. On success the result is never nil; it is empty when nothing
// matched.
//
// An error is returned when a request cannot be built or sent, when the body
// cannot be read, when the server answers with a non-2xx status, or when the
// body is not valid JSON.
func getRedisKeys(pattern string) ([]string, error) {
	encodedPattern := url.QueryEscape(pattern)
	keys := []string{}
	cursor := 0

	for {
		u := fmt.Sprintf("%s/api/redis/keys?pattern=%s&cursor=%d", REDIS_API_BASE, encodedPattern, cursor)

		// Each page is fetched by a helper so that its response body is
		// closed before the next request is issued, rather than staying open
		// until this function returns.
		page, err := fetchRedisKeysPage(u)
		if err != nil {
			return nil, err
		}

		if entries, ok := page["keys"].([]interface{}); ok {
			for _, entry := range entries {
				// Only the "key" field of each entry object is used here.
				fields, ok := entry.(map[string]interface{})
				if !ok {
					continue
				}
				if name, ok := fields["key"].(string); ok {
					keys = append(keys, name)
				}
			}
		}

		// A missing, non-numeric or zero cursor all mean "no more pages".
		nextCursor, ok := page["cursor"].(float64)
		if !ok || nextCursor == 0 {
			break
		}
		cursor = int(nextCursor)
	}

	return keys, nil
}

// fetchRedisKeysPage performs one GET against u and returns the decoded JSON
// object, releasing the response body before it returns.
func fetchRedisKeysPage(u string) (map[string]interface{}, error) {
	// Bound each request so an unresponsive API cannot block the caller forever.
	const requestTimeout = 30 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to build Redis keys request: %w", err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch Redis keys: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read Redis keys response: %w", err)
	}
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("failed to fetch Redis keys: unexpected status %s", resp.Status)
	}

	var page map[string]interface{}
	if err := json.Unmarshal(body, &page); err != nil {
		return nil, fmt.Errorf("failed to parse Redis keys response: %w", err)
	}
	return page, nil
}

// getRedisValue fetches the entry stored under key from the Redis REST API
// endpoint /api/redis/key and decodes it into a RedisValue.
//
// The response is expected to be a JSON object whose "value" field is a string
// that itself contains a JSON document; that inner document is what gets
// decoded into the result.
//
// An error is returned when the request cannot be built or sent, when the body
// cannot be read, when the server answers with a non-2xx status, when the
// outer body is not valid JSON, when "value" is missing or not a string, or
// when the inner document does not decode into a RedisValue.
func getRedisValue(key string) (*RedisValue, error) {
	// Bound the request so an unresponsive API cannot block the caller forever.
	const requestTimeout = 30 * time.Second

	encodedKey := url.QueryEscape(key)
	u := fmt.Sprintf("%s/api/redis/key?k=%s", REDIS_API_BASE, encodedKey)

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to build Redis value request: %w", err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch Redis value: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read Redis value response: %w", err)
	}
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("failed to fetch Redis value: unexpected status %s", resp.Status)
	}

	var result map[string]interface{}
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("failed to parse Redis value response: %w", err)
	}

	// The payload is double-encoded: "value" holds JSON text, not an object.
	valueStr, ok := result["value"].(string)
	if !ok {
		return nil, fmt.Errorf("invalid value format")
	}

	var rv RedisValue
	if err := json.Unmarshal([]byte(valueStr), &rv); err != nil {
		return nil, fmt.Errorf("failed to parse Redis value JSON: %w", err)
	}

	return &rv, nil
}

// normalizeSerialNumber produces the canonical form of a serial number used
// for the fuzzy asset lookup: the input is upper-cased, every space is
// removed, and every hyphen becomes an underscore.
func normalizeSerialNumber(s string) string {
	// Neither replacement can produce text the other one matches, so a single
	// replacer pass gives the same result as applying them one after another.
	cleanup := strings.NewReplacer(" ", "", "-", "_")
	return cleanup.Replace(strings.ToUpper(s))
}

// processRedisToMonitoring runs one ingestion pass and returns its statistics.
//
// The pass:
//  1. lists the Redis keys matching "ic3:latest:*";
//  2. fetches and decodes the value of each key;
//  3. resolves the value's asset identifier to an asset_id in
//     ic3_asset_master, first by exact serialnumber and then by the normalized
//     form produced by normalizeSerialNumber;
//  4. maps every entry of the value's Values to a tag in ic3_asset_tag_map
//     whose tag_code ends with ".<parameter tag code>";
//  5. adds the COMMON quality / alarm / record-id / timestamp rows when the
//     value carries a non-empty OccurredAt;
//  6. inserts all collected rows into ic3_asset_monitoring_data.
//
// All database work shares one five-minute deadline. Failures are logged and
// reflected in the returned counters; the function itself never returns an
// error. RowsInserted counts the insert statements that completed without
// error (a row skipped by ON CONFLICT DO NOTHING is still counted, because the
// statement succeeds).
func processRedisToMonitoring(db *DB) (stats IngestionStats) {
	stats = IngestionStats{
		StartTime:        time.Now(),
		UnmatchedSerials: []string{},
		UnmappedTags:     []string{},
	}
	// stats is a named result on purpose: a deferred assignment to an ordinary
	// local would run after the return value had already been copied, leaving
	// EndTime zero for the caller.
	defer func() {
		stats.EndTime = time.Now()
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	// Step 1: Get all Redis keys
	keys, err := getRedisKeys("ic3:latest:*")
	if err != nil {
		log.Printf("ERROR: Failed to get Redis keys: %v", err)
		return stats
	}
	stats.RedisKeysScanned = len(keys)
	log.Printf("[Redis Ingestion] Found %d Redis keys", len(keys))

	if len(keys) == 0 {
		log.Println("[Redis Ingestion] No keys found, exiting")
		return stats
	}

	type monitoringRow struct {
		assetID    int
		tagCode    string
		paramID    int
		valueText  string
		valueNum   *float64
		unit       string
		recordedAt string
	}

	var allRows []monitoringRow

	// The COMMON parameter ids do not depend on the key being processed, so
	// each one is looked up at most once per pass. Only successful lookups are
	// cached; a failed lookup is retried for the next key.
	commonParamIDs := make(map[string]int, 4)
	lookupCommonParam := func(paramTagCode string) (int, bool) {
		if id, ok := commonParamIDs[paramTagCode]; ok {
			return id, true
		}
		var id int
		if err := db.pool.QueryRow(ctx,
			"SELECT param_id FROM ic3_parameter_master WHERE param_tag_code = $1 AND asset_group = 'COMMON' LIMIT 1",
			paramTagCode).Scan(&id); err != nil {
			return 0, false
		}
		commonParamIDs[paramTagCode] = id
		return id, true
	}

	// Tag codes come from the Redis payload and typically contain '_', which
	// LIKE treats as a single-character wildcard. Escape the LIKE
	// metacharacters so the suffix is matched literally. This relies on
	// backslash being the LIKE escape character, the PostgreSQL default.
	likeEscaper := strings.NewReplacer(`\`, `\\`, `%`, `\%`, `_`, `\_`)

	// Step 2-5: Process each key
	for _, key := range keys {
		// Once the shared deadline has passed every query would fail and be
		// miscounted as "unmatched"; stop instead.
		if err := ctx.Err(); err != nil {
			log.Printf("ERROR: Redis ingestion stopped before all keys were processed: %v", err)
			break
		}

		rv, err := getRedisValue(key)
		if err != nil {
			log.Printf("DEBUG: Failed to parse value for key %s: %v", key, err)
			continue
		}

		// Step 3: Match to ic3_asset_master by serialnumber (exact then fuzzy)
		var assetID int
		normalizedRedisID := normalizeSerialNumber(rv.AssetID)

		// Try exact match first
		err = db.pool.QueryRow(ctx,
			"SELECT asset_id FROM ic3_asset_master WHERE serialnumber = $1 LIMIT 1",
			rv.AssetID).Scan(&assetID)

		// Try fuzzy match if exact fails
		if err != nil {
			err = db.pool.QueryRow(ctx, `
				SELECT asset_id FROM ic3_asset_master
				WHERE UPPER(REPLACE(REPLACE(serialnumber, ' ', ''), '-', '_')) = $1
				LIMIT 1
			`, normalizedRedisID).Scan(&assetID)
		}

		if err != nil {
			stats.AssetsUnmatched++
			stats.UnmatchedSerials = append(stats.UnmatchedSerials, rv.AssetID)
			log.Printf("DEBUG: Unmatched serial: %s", rv.AssetID)
			continue
		}
		stats.AssetsMatched++

		// Step 4: Match each parameter value to tag_map
		for paramTagCode, paramValue := range rv.Values {
			var tagCode string
			var paramID int
			var unit string

			err := db.pool.QueryRow(ctx, `
				SELECT atm.tag_code, atm.param_id, pm.unit
				FROM ic3_asset_tag_map atm
				INNER JOIN ic3_parameter_master pm ON pm.param_id = atm.param_id
				WHERE atm.asset_id = $1 AND atm.tag_code LIKE $2
				LIMIT 1
			`, assetID, "%."+likeEscaper.Replace(paramTagCode)).Scan(&tagCode, &paramID, &unit)

			if err != nil {
				stats.TagsSkipped++
				stats.UnmappedTags = append(stats.UnmappedTags, fmt.Sprintf("%s/%s", rv.AssetID, paramTagCode))
				log.Printf("DEBUG: Unmapped tag: %s / %s", rv.AssetID, paramTagCode)
				continue
			}

			// Step 5: Prepare row for insertion. The text form is always
			// stored; the numeric form only when the JSON value was a number.
			var valueNum *float64
			valueStr := fmt.Sprintf("%v", paramValue)
			if numVal, ok := paramValue.(float64); ok {
				valueNum = &numVal
			}

			allRows = append(allRows, monitoringRow{
				assetID:    assetID,
				tagCode:    tagCode,
				paramID:    paramID,
				valueText:  valueStr,
				valueNum:   valueNum,
				unit:       unit,
				recordedAt: rv.OccurredAt,
			})
		}

		// Step 6: Insert common metadata fields
		if rv.OccurredAt != "" {
			commonFields := []struct {
				paramTagCode string
				tagCode      string
				valueText    string
				unit         string
			}{
				{"GOOD_UNCERTAIN_BAD_BAD_COMM_FAILURE_BAD_OUT_OF_SERVICE", "COMMON.QUALITY", rv.Quality, "enum"},
				{"NORMAL_WARNING_ALARM_CRITICAL_STALE", "COMMON.ALARM", rv.Alarm, "enum"},
				{"UNIQUE_INGESTION_RECORD_ID", "COMMON.RECORD_ID", rv.RecordID, "string"},
				{"FIELD_TIMESTAMP_FROM_DEVICE_SOURCE", "COMMON.TIMESTAMP", rv.OccurredAt, "timestamp"},
			}
			for _, field := range commonFields {
				// A COMMON parameter missing from the master table is
				// silently skipped.
				paramID, ok := lookupCommonParam(field.paramTagCode)
				if !ok {
					continue
				}
				allRows = append(allRows, monitoringRow{
					assetID:    assetID,
					tagCode:    field.tagCode,
					paramID:    paramID,
					valueText:  field.valueText,
					unit:       field.unit,
					recordedAt: rv.OccurredAt,
				})
			}
		}
	}

	// Insert the collected rows one statement at a time.
	if len(allRows) > 0 {
		inserted := 0
		for _, row := range allRows {
			if err := ctx.Err(); err != nil {
				log.Printf("ERROR: Redis ingestion stopped before all rows were inserted: %v", err)
				break
			}
			_, err := db.pool.Exec(ctx, `
				INSERT INTO ic3_asset_monitoring_data
				(asset_id, tag_code, param_id, value_text, value_num, unit, recorded_at)
				VALUES ($1, $2, $3, $4, $5, $6, $7)
				ON CONFLICT (tag_code, recorded_at) DO NOTHING
			`, row.assetID, row.tagCode, row.paramID, row.valueText, row.valueNum, row.unit, row.recordedAt)
			if err != nil {
				log.Printf("ERROR: Failed to insert monitoring data: %v", err)
				continue
			}
			inserted++
		}
		// Report only statements that succeeded, not every attempted row.
		stats.RowsInserted = inserted
		log.Printf("Inserted %d monitoring data rows", inserted)
	}

	return stats
}

// startRedisIngestionWorker launches a background goroutine that runs
// processRedisToMonitoring(db) once per interval and logs a summary of each
// pass. The first pass runs after one full interval has elapsed.
//
// The function returns immediately after logging that the worker was started.
// The goroutine has no stop mechanism and runs until the process exits.
// interval must be positive, because time.NewTicker panics otherwise.
func startRedisIngestionWorker(db *DB, interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for range ticker.C {
			stats := processRedisToMonitoring(db)

			// Fall back to wall-clock time since the start when the pass did
			// not report an end time; subtracting from a zero EndTime would
			// otherwise log a huge negative duration.
			elapsed := stats.EndTime.Sub(stats.StartTime)
			if stats.EndTime.IsZero() {
				elapsed = time.Since(stats.StartTime)
			}

			log.Printf("[IC3 Redis Worker] %s", stats.StartTime.Format("2006-01-02 15:04:05"))
			log.Printf("  Redis keys scanned : %d", stats.RedisKeysScanned)
			log.Printf("  Assets matched     : %d", stats.AssetsMatched)
			log.Printf("  Assets unmatched   : %d", stats.AssetsUnmatched)
			log.Printf("  Rows inserted      : %d", stats.RowsInserted)
			log.Printf("  Tags skipped       : %d", stats.TagsSkipped)
			log.Printf("  Elapsed time       : %.2fs", elapsed.Seconds())

			// The detail lists are printed only when they are short (at most
			// ten entries), keeping the per-tick summary compact.
			if len(stats.UnmatchedSerials) > 0 && len(stats.UnmatchedSerials) <= 10 {
				log.Printf("  Unmatched serials  : %s", strings.Join(stats.UnmatchedSerials, ", "))
			}
			if len(stats.UnmappedTags) > 0 && len(stats.UnmappedTags) <= 10 {
				log.Printf("  Unmapped tags      : %s", strings.Join(stats.UnmappedTags, ", "))
			}
		}
	}()

	log.Printf("Redis ingestion worker started (interval: %v)", interval)
}
