uploader-bot/app/core/models/content_compatible.py

388 lines
14 KiB
Python

"""Compatible content models for MariaDB."""
from datetime import datetime
from typing import Optional, List, Dict, Any
from sqlalchemy import Column, String, Boolean, Text, Integer, DateTime, BigInteger, Index, ForeignKey
from sqlalchemy.orm import relationship
from app.core.models.base_compatible import BaseModel
class Content(BaseModel):
"""Content model compatible with existing MariaDB schema."""
__tablename__ = "content"
# Basic content information
user_id = Column(Integer, ForeignKey('users.id'), nullable=False, index=True)
filename = Column(String(255), nullable=False)
original_filename = Column(String(255), nullable=False)
file_path = Column(String(500), nullable=False)
# File metadata
file_size = Column(BigInteger, nullable=False) # bytes
file_type = Column(String(100), nullable=False)
mime_type = Column(String(100), nullable=False)
file_extension = Column(String(10), nullable=False)
# Content metadata
title = Column(String(255), nullable=True)
description = Column(Text, nullable=True)
tags = Column(Text, nullable=True) # JSON or comma-separated
# Status and visibility
is_public = Column(Boolean, default=False, nullable=False)
is_active = Column(Boolean, default=True, nullable=False)
is_indexed = Column(Boolean, default=False, nullable=False)
is_converted = Column(Boolean, default=False, nullable=False)
# Access and security
access_password = Column(String(255), nullable=True)
download_count = Column(Integer, default=0, nullable=False)
view_count = Column(Integer, default=0, nullable=False)
# Processing status
processing_status = Column(String(50), default="pending", nullable=False)
processing_error = Column(Text, nullable=True)
processing_started = Column(DateTime, nullable=True)
processing_completed = Column(DateTime, nullable=True)
# File hashes for integrity
md5_hash = Column(String(32), nullable=True, index=True)
sha256_hash = Column(String(64), nullable=True, index=True)
# Thumbnails and previews
thumbnail_path = Column(String(500), nullable=True)
preview_path = Column(String(500), nullable=True)
# TON Blockchain integration
ton_transaction_hash = Column(String(100), nullable=True, index=True)
ton_storage_proof = Column(Text, nullable=True)
ton_storage_fee = Column(BigInteger, default=0, nullable=False) # nanotons
# Expiration and cleanup
expires_at = Column(DateTime, nullable=True)
auto_delete = Column(Boolean, default=False, nullable=False)
# Relationships
user = relationship("User", back_populates="content")
# Table indexes for performance
__table_args__ = (
Index('idx_content_user_active', 'user_id', 'is_active'),
Index('idx_content_public_indexed', 'is_public', 'is_indexed'),
Index('idx_content_file_type', 'file_type', 'mime_type'),
Index('idx_content_created', 'created_at'),
Index('idx_content_size', 'file_size'),
Index('idx_content_processing', 'processing_status'),
Index('idx_content_ton_tx', 'ton_transaction_hash'),
Index('idx_content_expires', 'expires_at', 'auto_delete'),
)
def is_expired(self) -> bool:
"""Check if content is expired."""
if not self.expires_at:
return False
return datetime.utcnow() > self.expires_at
def is_image(self) -> bool:
"""Check if content is an image."""
return self.file_type.lower() in ['image', 'img'] or \
self.mime_type.startswith('image/')
def is_video(self) -> bool:
"""Check if content is a video."""
return self.file_type.lower() == 'video' or \
self.mime_type.startswith('video/')
def is_document(self) -> bool:
"""Check if content is a document."""
return self.file_type.lower() in ['document', 'doc', 'pdf'] or \
self.mime_type in ['application/pdf', 'application/msword', 'text/plain']
def get_file_size_human(self) -> str:
"""Get human-readable file size."""
size = self.file_size
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} PB"
def increment_download_count(self) -> None:
"""Increment download counter."""
self.download_count += 1
def increment_view_count(self) -> None:
"""Increment view counter."""
self.view_count += 1
def mark_as_indexed(self) -> None:
"""Mark content as indexed."""
self.is_indexed = True
def mark_as_converted(self) -> None:
"""Mark content as converted."""
self.is_converted = True
self.processing_status = "completed"
self.processing_completed = datetime.utcnow()
def set_processing_error(self, error: str) -> None:
"""Set processing error."""
self.processing_status = "error"
self.processing_error = error
self.processing_completed = datetime.utcnow()
def start_processing(self) -> None:
"""Mark processing as started."""
self.processing_status = "processing"
self.processing_started = datetime.utcnow()
self.processing_error = None
def get_tags_list(self) -> List[str]:
"""Get tags as list."""
if not self.tags:
return []
# Try to parse as JSON first, fallback to comma-separated
try:
import json
return json.loads(self.tags)
except:
return [tag.strip() for tag in self.tags.split(',') if tag.strip()]
def set_tags_list(self, tags: List[str]) -> None:
"""Set tags from list."""
import json
self.tags = json.dumps(tags) if tags else None
def to_dict(self, include_sensitive: bool = False) -> Dict[str, Any]:
"""Convert to dictionary with option to exclude sensitive data."""
exclude = set()
if not include_sensitive:
exclude.update({"access_password", "file_path", "processing_error"})
data = super().to_dict(exclude=exclude)
# Add computed fields
data.update({
"file_size_human": self.get_file_size_human(),
"is_image": self.is_image(),
"is_video": self.is_video(),
"is_document": self.is_document(),
"is_expired": self.is_expired(),
"tags_list": self.get_tags_list(),
})
return data
def to_public_dict(self) -> Dict[str, Any]:
"""Convert to public dictionary (minimal content info)."""
return {
"id": self.id,
"filename": self.filename,
"title": self.title,
"description": self.description,
"file_type": self.file_type,
"file_size": self.file_size,
"file_size_human": self.get_file_size_human(),
"is_image": self.is_image(),
"is_video": self.is_video(),
"is_document": self.is_document(),
"download_count": self.download_count,
"view_count": self.view_count,
"tags_list": self.get_tags_list(),
"created_at": self.created_at.isoformat() if self.created_at else None,
}
class ContentShare(BaseModel):
"""Content sharing model for tracking shared content."""
__tablename__ = "content_shares"
content_id = Column(Integer, ForeignKey('content.id'), nullable=False, index=True)
user_id = Column(Integer, ForeignKey('users.id'), nullable=True, index=True) # Can be null for anonymous shares
# Share metadata
share_token = Column(String(100), unique=True, nullable=False, index=True)
share_url = Column(String(500), nullable=False)
# Share settings
is_active = Column(Boolean, default=True, nullable=False)
is_password_protected = Column(Boolean, default=False, nullable=False)
share_password = Column(String(255), nullable=True)
# Access control
max_downloads = Column(Integer, nullable=True) # Null = unlimited
download_count = Column(Integer, default=0, nullable=False)
view_count = Column(Integer, default=0, nullable=False)
# Time limits
expires_at = Column(DateTime, nullable=True)
# Tracking
ip_address = Column(String(45), nullable=True)
user_agent = Column(Text, nullable=True)
# Relationships
content = relationship("Content")
user = relationship("User")
__table_args__ = (
Index('idx_shares_content_active', 'content_id', 'is_active'),
Index('idx_shares_token', 'share_token'),
Index('idx_shares_expires', 'expires_at'),
)
def is_expired(self) -> bool:
"""Check if share is expired."""
if not self.expires_at:
return False
return datetime.utcnow() > self.expires_at
def is_download_limit_reached(self) -> bool:
"""Check if download limit is reached."""
if not self.max_downloads:
return False
return self.download_count >= self.max_downloads
def is_valid(self) -> bool:
"""Check if share is valid."""
return (self.is_active and
not self.is_expired() and
not self.is_download_limit_reached())
def increment_download(self) -> bool:
"""Increment download count and return if still valid."""
if not self.is_valid():
return False
self.download_count += 1
return self.is_valid()
def increment_view(self) -> None:
"""Increment view count."""
self.view_count += 1
class ContentMetadata(BaseModel):
"""Extended metadata for content files."""
__tablename__ = "content_metadata"
content_id = Column(Integer, ForeignKey('content.id'), unique=True, nullable=False, index=True)
# Image metadata
image_width = Column(Integer, nullable=True)
image_height = Column(Integer, nullable=True)
image_dpi = Column(Integer, nullable=True)
image_color_space = Column(String(50), nullable=True)
# Video metadata
video_duration = Column(Integer, nullable=True) # seconds
video_bitrate = Column(Integer, nullable=True)
video_fps = Column(Integer, nullable=True)
video_resolution = Column(String(20), nullable=True) # e.g., "1920x1080"
video_codec = Column(String(50), nullable=True)
# Audio metadata
audio_duration = Column(Integer, nullable=True) # seconds
audio_bitrate = Column(Integer, nullable=True)
audio_sample_rate = Column(Integer, nullable=True)
audio_channels = Column(Integer, nullable=True)
audio_codec = Column(String(50), nullable=True)
# Document metadata
document_pages = Column(Integer, nullable=True)
document_words = Column(Integer, nullable=True)
document_language = Column(String(10), nullable=True)
document_author = Column(String(255), nullable=True)
# EXIF data (JSON)
exif_data = Column(Text, nullable=True)
# GPS coordinates
gps_latitude = Column(String(50), nullable=True)
gps_longitude = Column(String(50), nullable=True)
gps_altitude = Column(String(50), nullable=True)
# Technical metadata
compression_ratio = Column(String(20), nullable=True)
quality_score = Column(Integer, nullable=True) # 0-100
# Relationships
content = relationship("Content")
def to_dict(self) -> Dict[str, Any]:
"""Convert metadata to dictionary."""
data = super().to_dict(exclude={"content_id"})
# Parse EXIF data if present
if self.exif_data:
try:
import json
data["exif_data"] = json.loads(self.exif_data)
except:
data["exif_data"] = None
return data
def set_exif_data(self, exif_dict: Dict[str, Any]) -> None:
"""Set EXIF data from dictionary."""
if exif_dict:
import json
self.exif_data = json.dumps(exif_dict)
else:
self.exif_data = None
def get_exif_data(self) -> Optional[Dict[str, Any]]:
"""Get EXIF data as dictionary."""
if not self.exif_data:
return None
try:
import json
return json.loads(self.exif_data)
except:
return None
class ContentVersion(BaseModel):
"""Content version history for tracking changes."""
__tablename__ = "content_versions"
content_id = Column(Integer, ForeignKey('content.id'), nullable=False, index=True)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False, index=True)
# Version information
version_number = Column(Integer, nullable=False)
version_name = Column(String(100), nullable=True)
change_description = Column(Text, nullable=True)
# File information
file_path = Column(String(500), nullable=False)
file_size = Column(BigInteger, nullable=False)
file_hash = Column(String(64), nullable=False)
# Status
is_current = Column(Boolean, default=False, nullable=False)
# Relationships
content = relationship("Content")
user = relationship("User")
__table_args__ = (
Index('idx_versions_content_number', 'content_id', 'version_number'),
Index('idx_versions_current', 'content_id', 'is_current'),
)
def mark_as_current(self) -> None:
"""Mark this version as current."""
self.is_current = True
# Add relationship to User model
# This would be added to the User model:
# content = relationship("Content", back_populates="user")