
Concepts & architecture

Fujin Shuttle is the Shopware-side port of the Unirgy Rapidflow import/export engine. If you know Rapidflow, the vocabulary carries over directly; if you don't, this page is the map.

The engine runs embedded in-process, but the ServiceRequest/Backend channel seam from Rapidflow is preserved so a remote-cluster variant can be layered on later without rewriting the deputies.

The Rapidflow vocabulary

Rapidflow term | Fujin Shuttle class(es) | Role
Agent | src/DataType/Agent/* | The flyweight selected for one (dataType, transferMode) pair. Owns the run.
Deputy | src/Deputy/* | The concrete worker for a data type; orchestrates Pass A / Pass B.
Processor / pipeline | src/Pipeline/*, amp-data-pipeline | Streams and transforms data items (CSV row → batch).
Batch emitter | BatchEmitter (in fujin-shuttle-message) | Accumulates rows into batches of N.
Channel / Backend | src/Channel/* | The request/response seam between deputy and the database layer.
Request handler | src/Channel/RequestHandler/* (47) | Executes one kind of ServiceRequest.
Depot | src/DataDepot/* | The raw bulk-SQL reader/writer for an entity type.
ID resolver | src/IdResolver/Shopware/* | Resolves codes/names/paths → Shopware UUIDs (and can auto-create).

The big picture

                    ┌─────────────────────────────────────────────┐
  CLI / Admin ──►   │  ProfileFacade            (src/Model)        │
                    │    builds SystemConfig + Profile + Context   │
                    └───────────────┬─────────────────────────────┘
                    ┌─────────────────────────────────────────────┐
                    │  AgentContainer → DefaultSelector            │
                    │    picks + clones the Agent for (type, mode) │
                    └───────────────┬─────────────────────────────┘
                    ┌─────────────────────────────────────────────┐
                    │  Deputy           (src/Deputy)               │
                    │    Pass A: resolve all referenced codes      │
                    │    Pass B: stream rows → batches             │
                    └───────────────┬─────────────────────────────┘
                                    ▼  ServiceRequest
                    ┌─────────────────────────────────────────────┐
                    │  Backend → HandlerRegistry → RequestHandler  │
                    │             (src/Channel)                    │
                    └───────────────┬─────────────────────────────┘
                    ┌─────────────────────────────────────────────┐
                    │  Depot            (src/DataDepot)            │
                    │    InsertOnDuplicate → bulk SQL on the       │
                    │    async / sync connection pool              │
                    └─────────────────────────────────────────────┘

Agents and deputies

An agent is a thin flyweight registered against a (dataType, transferMode) pair (e.g. product + import). AgentContainer holds the registry; DefaultSelector matches and clones the agent for each run so per-run state is isolated. Agents are wired in Resources/config/services/agent.xml and the per-entity service files.

The agent delegates the real work to a deputy — the class that knows how a particular data type is shaped (src/Deputy/ProductImport.php, CategoryImport.php, InventoryImport.php, …). The deputy is where Pass A / Pass B live.
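To make the agent/deputy split concrete, here is a minimal Python sketch (the plugin itself is PHP). The class names echo the vocabulary above. Every method name (register, prototype, select, run, pass_a, pass_b) is hypothetical, and a deep copy stands in for however DefaultSelector actually clones the agent.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class Deputy:
    """Hypothetical stand-in for a src/Deputy/* class."""
    data_type: str
    log: list = field(default_factory=list)  # per-run state

    def pass_a(self) -> None:
        self.log.append("pass A: resolve references")

    def pass_b(self) -> None:
        self.log.append("pass B: stream rows -> batches")


@dataclass
class Agent:
    """Flyweight registered for one (data_type, transfer_mode) pair."""
    data_type: str
    transfer_mode: str
    deputy: Deputy

    def run(self) -> list:
        # The agent owns the run but delegates the real work to its deputy.
        self.deputy.pass_a()
        self.deputy.pass_b()
        return self.deputy.log


class AgentContainer:
    def __init__(self) -> None:
        self._agents: dict = {}

    def register(self, agent: Agent) -> None:
        self._agents[(agent.data_type, agent.transfer_mode)] = agent

    def prototype(self, data_type: str, transfer_mode: str) -> Agent:
        return self._agents[(data_type, transfer_mode)]


class DefaultSelector:
    def __init__(self, container: AgentContainer) -> None:
        self._container = container

    def select(self, data_type: str, transfer_mode: str) -> Agent:
        # Clone the registered prototype so per-run state never leaks between runs.
        return copy.deepcopy(self._container.prototype(data_type, transfer_mode))
```

Each run gets its own copy, so a second selection starts with an empty deputy log even after the first run has finished. The registered prototype is never mutated.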

The two-pass import model

Most imports run in two passes. Take products as the worked example:

Pass A — resolve references (sequential)

The deputy scans the CSV once and collects every distinct referenced code:

  • category paths (Home > Furniture > Chairs)
  • sales-channel, manufacturer, delivery-time, unit and tax keys
  • property Group:Option pairs and custom-field names
  • media sources (URLs / paths)
  • rule names

Each set is resolved once through an ID resolver to a code → idBytes map, auto-creating missing targets where the profile allows it (categories, property options, custom fields). The maps are held on the deputy as run-wide assets, so Pass B never does a lookup it can avoid.
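Here is a minimal Python sketch of Pass A for a single column. It assumes a hypothetical in-memory resolver in place of the Shopware ID resolvers. The file is scanned once, the distinct codes are resolved in a single call, and missing codes are auto-created only when the profile allows it. Random 16-byte UUIDs stand in for idBytes.

```python
import csv
import io
import uuid


class InMemoryResolver:
    """Hypothetical resolver: code -> 16-byte id, optionally auto-creating."""

    def __init__(self, existing: dict, auto_create: bool) -> None:
        self.known = dict(existing)
        self.auto_create = auto_create
        self.calls = 0

    def resolve(self, codes: set) -> dict:
        self.calls += 1  # one call per distinct-code set, not one per row
        for code in codes:
            if code not in self.known and self.auto_create:
                self.known[code] = uuid.uuid4().bytes
        return {code: self.known[code] for code in codes if code in self.known}


def pass_a(csv_text: str, column: str, resolver: InMemoryResolver) -> dict:
    # Scan the file once and collect every distinct non-empty code in the column.
    codes = {row[column] for row in csv.DictReader(io.StringIO(csv_text)) if row[column]}
    # Resolve the whole set at once; the result becomes a run-wide asset map.
    return resolver.resolve(codes)
```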

Why a second physical pass is fine

Re-reading the file is cheap relative to the write work, and the CLI is the large-file path. Resolving up front lets Pass B run with foreign-key checks off (all targets already exist) and write in big concurrent batches.

Pass B — write rows (concurrent)

The CSV is streamed again. A reader fiber pushes rows into a bounded queue; a BatchEmitter accumulates them into batches (flushing at batch_size or when the row type changes). Each batch is processed by a worker that:

  1. looks up IDs from the Pass A asset maps,
  2. builds the entity / translation / category / visibility / price / property / media rows,
  3. sends one ServiceRequest per entity table through the channel,
  4. ticks the run monitor for progress and checks for a stop request.
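The batching rule (flush at batch_size or when the row type changes) fits in a few lines. This Python generator is an illustrative sketch, not the BatchEmitter API. It assumes each row carries a hypothetical "type" key.

```python
from typing import Iterable, Iterator


def emit_batches(rows: Iterable[dict], batch_size: int) -> Iterator[list]:
    """Yield homogeneous batches of at most batch_size rows."""
    batch: list = []
    current_type = None
    for row in rows:
        # Flush before adding this row if the batch is full or the type changed.
        if batch and (len(batch) >= batch_size or row["type"] != current_type):
            yield batch
            batch = []
        current_type = row["type"]
        batch.append(row)
    if batch:
        yield batch  # flush the tail
```

Flushing on a type change keeps every batch homogeneous. A worker can then build one kind of entity row per batch.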

Batches run concurrently up to the configured concurrency limit.
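The overall shape of Pass B can be sketched with Python's asyncio standing in for Revolt fibers. The sketch assumes a single reader task, a bounded asyncio.Queue, and a semaphore that caps in-flight batches at concurrency. The queue_size parameter and the asyncio.sleep that stands in for the bulk INSERT are illustrative. The batcher waits for a free slot before starting a batch. A slow database therefore fills the queue and pauses the reader instead of buffering the whole file.

```python
import asyncio


async def pass_b(rows, batch_size: int, concurrency: int, queue_size: int = 4):
    """Reader -> bounded queue -> batcher -> at most `concurrency` batch writers."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
    slots = asyncio.Semaphore(concurrency)
    written: list = []
    stats = {"in_flight": 0, "peak": 0}
    end = object()  # sentinel: the reader has finished

    async def reader() -> None:
        for row in rows:
            await queue.put(row)  # suspends while the queue is full (backpressure)
        await queue.put(end)

    async def write_batch(batch: list) -> None:
        try:
            stats["in_flight"] += 1
            stats["peak"] = max(stats["peak"], stats["in_flight"])
            await asyncio.sleep(0.01)  # stand-in for awaiting the bulk INSERT
            written.append(batch)
        finally:
            stats["in_flight"] -= 1
            slots.release()

    async def batcher() -> None:
        tasks, batch = [], []
        while (row := await queue.get()) is not end:
            batch.append(row)
            if len(batch) == batch_size:
                await slots.acquire()  # wait for a free slot before dispatching
                tasks.append(asyncio.create_task(write_batch(batch)))
                batch = []
        if batch:  # flush the tail
            await slots.acquire()
            tasks.append(asyncio.create_task(write_batch(batch)))
        await asyncio.gather(*tasks)

    await asyncio.gather(reader(), batcher())
    return written, stats["peak"]
```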

Ordering hazards stay sequential

Concurrency is only safe for independent batches (products, inventory). Data with intra-file ordering dependencies — categories (parent → child), entity metadata (group → option), custom-field JSON merges — is processed sequentially regardless of the concurrency setting.
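The ordering hazard is easiest to see in code. In this hypothetical Python sketch, each category row needs its parent's id, which exists only once an earlier row has been written. With concurrent batches, that parent could still be in flight. The sketch assumes every parent has its own earlier row and uses a made-up id scheme. The real importer may resolve or auto-create parents differently.

```python
from typing import Optional


def import_categories_in_order(paths: list, sep: str = " > ") -> dict:
    """Write category rows in file order; return path -> (id, parent_id)."""
    written: dict = {}
    for path in paths:
        parent = sep.join(path.split(sep)[:-1])
        if parent and parent not in written:
            # Processed sequentially, a missing parent is a clear input error,
            # not a race with a batch that has not committed yet.
            raise ValueError(f"parent {parent!r} must come before {path!r}")
        parent_id: Optional[str] = written[parent][0] if parent else None
        written[path] = (f"cat-{len(written) + 1}", parent_id)  # hypothetical ids
    return written
```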

The channel seam

Deputies never touch the database directly. They send typed ServiceRequest objects to a Backend; the local implementation (LocalBackend) looks the request up in a HandlerRegistry (a compile-time map of request class → handler service) and dispatches to the matching RequestHandler, returning a typed Response. Exceptions are caught and returned as an error response, so the seam is wire-compatible with a future remote transport.

Handlers are registered with the fujin_shuttle.request_handler DI tag, keyed by the request class they handle — a profile can add handlers without editing shared files.
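The seam can be sketched in Python with a dictionary keyed by request class. ResolveTaxIdsRequest, the register decorator (standing in for the fujin_shuttle.request_handler DI tag) and the Response fields are hypothetical. The point is that dispatch is a type lookup, and failures come back as data. That is what keeps the seam transport-agnostic.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class ResolveTaxIdsRequest:
    """Hypothetical request type."""
    tax_keys: tuple


@dataclass(frozen=True)
class Response:
    ok: bool
    payload: object = None
    error: Optional[str] = None


class HandlerRegistry:
    def __init__(self) -> None:
        self._handlers: dict = {}

    def register(self, request_class: type) -> Callable:
        """Plays the role of the DI tag: the handler is keyed by request class."""
        def decorator(handler: Callable) -> Callable:
            self._handlers[request_class] = handler
            return handler
        return decorator

    def get(self, request_class: type) -> Callable:
        return self._handlers[request_class]  # KeyError if nothing handles it


class LocalBackend:
    def __init__(self, registry: HandlerRegistry) -> None:
        self._registry = registry

    def send(self, request: object) -> Response:
        try:
            handler = self._registry.get(type(request))
            return Response(ok=True, payload=handler(request))
        except Exception as exc:  # errors cross the seam as data, not exceptions
            return Response(ok=False, error=f"{type(exc).__name__}: {exc}")


registry = HandlerRegistry()


@registry.register(ResolveTaxIdsRequest)
def resolve_tax_ids(request: ResolveTaxIdsRequest) -> dict:
    known = {"standard": b"\x01" * 16, "reduced": b"\x02" * 16}  # made-up ids
    return {key: known[key] for key in request.tax_keys}
```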

The 47 request handlers, grouped

src/Channel/RequestHandler/ holds ~47 handlers. By naming pattern:

Group | Pattern | What they do
Resolve | Resolve…IdsHandler, ResolveCategoryPathsHandler, ResolveCustomFieldsHandler | Pass A: look up / auto-create FK targets, return code → id maps
Load | Load…Handler | Export reads (products, variants, categories, inventory, rules, metadata)
Upsert | …UpsertHandler | Pass B: INSERT … ON DUPLICATE KEY UPDATE into entity + translation tables
Link | …LinkHandler | Insert into composite-key link tables (product_category, product_media, options)
Delete | Delete…Handler | Clear stale associations / rule conditions before re-import
Special | ProductCustomFieldMergeHandler, StockUpdateHandler, RuleConditionInsertHandler | Read-modify-write merges and other one-offs
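The grouping is purely by class name, so it can be checked mechanically, for example in a test that keeps new handlers on-pattern. Here is a Python sketch. The regular expressions are an assumption derived from the table. Class names not listed there (ProductUpsertHandler, ProductCategoryLinkHandler) are hypothetical.

```python
import re

SPECIAL = {"ProductCustomFieldMergeHandler", "StockUpdateHandler", "RuleConditionInsertHandler"}
PATTERNS = [  # checked in order, after the explicit one-offs; prefixes before suffixes
    ("Resolve", re.compile(r"^Resolve\w*Handler$")),
    ("Load", re.compile(r"^Load\w*Handler$")),
    ("Delete", re.compile(r"^Delete\w*Handler$")),
    ("Upsert", re.compile(r"^\w+UpsertHandler$")),
    ("Link", re.compile(r"^\w+LinkHandler$")),
]


def handler_group(class_name: str) -> str:
    """Map a handler class name to its group by naming pattern."""
    if class_name in SPECIAL:
        return "Special"
    for group, pattern in PATTERNS:
        if pattern.match(class_name):
            return group
    return "Unknown"  # off-pattern: worth flagging in review
```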

Depots and raw bulk SQL

A depot (src/DataDepot/<Entity>/ImportDepot.php / ExportDepot.php) is the only place that talks SQL. On the write side it builds an INSERT … ON DUPLICATE KEY UPDATE with the bulk writer from flyokai/laminas-db-bulk-update. It filters out columns the live schema doesn't have (forward/backward version compatibility) and excludes immutable columns (id, version_id, created_at, …) from the ON DUPLICATE clause. Link tables use INSERT IGNORE.
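The shape of the generated SQL can be illustrated with a hand-rolled Python builder. This is not the flyokai/laminas-db-bulk-update API. The %s placeholder style, the quote helper and the three-entry IMMUTABLE set (the real list is longer) are assumptions. It shows the two rules from the paragraph: unknown columns are dropped before the statement is built, and immutable columns are inserted but never overwritten.

```python
IMMUTABLE = {"id", "version_id", "created_at"}  # subset named in the text


def quote(name: str) -> str:
    return "`" + name.replace("`", "``") + "`"


def _insert_parts(table: str, rows: list, live_columns: set):
    # Assumes every row has the same keys as rows[0].
    # Keep only the columns the live schema actually has.
    columns = [c for c in rows[0] if c in live_columns]
    row_ph = "(" + ", ".join(["%s"] * len(columns)) + ")"
    head = (f"INTO {quote(table)} ({', '.join(quote(c) for c in columns)}) "
            f"VALUES {', '.join([row_ph] * len(rows))}")
    params = [row[c] for row in rows for c in columns]
    return columns, head, params


def build_upsert(table: str, rows: list, live_columns: set):
    columns, head, params = _insert_parts(table, rows, live_columns)
    # Immutable columns are inserted on first write but never updated on conflict.
    updates = ", ".join(f"{quote(c)} = VALUES({quote(c)})" for c in columns if c not in IMMUTABLE)
    return f"INSERT {head} ON DUPLICATE KEY UPDATE {updates}", params


def build_insert_ignore(table: str, rows: list, live_columns: set):
    # Link tables: a duplicate composite key is simply skipped.
    _, head, params = _insert_parts(table, rows, live_columns)
    return f"INSERT IGNORE {head}", params
```

MySQL deprecated VALUES(col) in the update clause as of 8.0.20 in favour of row aliases. It still works there and is also understood by MariaDB.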

On the read (export) side a depot pages through rows with keyset pagination and assembles the wide-column shape (scalars + prices + translations + associations) the export agent emits.
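Here is keyset pagination in miniature. Instead of using OFFSET, each page starts strictly after the last key of the previous page, so a deep page costs roughly the same as the first. SQLite stands in for MySQL here only so that the sketch runs anywhere. The product table and its columns are hypothetical.

```python
import sqlite3
from typing import Iterator


def keyset_pages(conn: sqlite3.Connection, page_size: int) -> Iterator[list]:
    """Yield pages of (id, sku) rows ordered by primary key."""
    last_id = None
    while True:
        if last_id is None:
            cur = conn.execute("SELECT id, sku FROM product ORDER BY id LIMIT ?", (page_size,))
        else:
            cur = conn.execute(
                "SELECT id, sku FROM product WHERE id > ? ORDER BY id LIMIT ?",
                (last_id, page_size),
            )
        page = cur.fetchall()
        if not page:
            return
        yield page
        last_id = page[-1][0]  # resume strictly after the last key seen
```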

Async, in one paragraph

Pass B runs on the Revolt event loop with a pooled, non-blocking ext-mysqli driver. Each batch is a fiber. When its INSERT awaits the network, the loop runs another batch's fiber, so as many statements as the concurrency setting allows can be in flight at once on a single PHP thread. Foreign-key checks are turned off once per pooled link (Pass A guarantees that the targets exist), and the bulk writer skips the per-statement FK toggle on async connections. Both are documented wins in Performance & tuning. The sync/PDO driver (dbDriver = sync) is available as a fallback and produces byte-identical output.
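Here is a Python asyncio sketch of two async details above. First, a connection pool applies SET FOREIGN_KEY_CHECKS = 0 once, when it opens each link. Second, batches borrow a link, await their statement and hand it back, so the number of statements in flight never exceeds the pool size. FakeConnection, Pool and the sleep that simulates a network round trip are hypothetical stand-ins for the ext-mysqli pool.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for one pooled, non-blocking mysqli link."""

    def __init__(self) -> None:
        self.statements: list = []

    async def query(self, sql: str) -> None:
        self.statements.append(sql)
        await asyncio.sleep(0.01)  # network round trip; the loop runs other batches


class Pool:
    def __init__(self, size: int) -> None:
        self._idle: asyncio.Queue = asyncio.Queue()
        self._size = size
        self.created: list = []

    async def acquire(self) -> FakeConnection:
        if self._idle.empty() and len(self.created) < self._size:
            conn = FakeConnection()
            self.created.append(conn)  # counted before awaiting, so no over-creation
            # Session setting applied once per link, not once per statement.
            await conn.query("SET FOREIGN_KEY_CHECKS = 0")
            return conn
        return await self._idle.get()

    def release(self, conn: FakeConnection) -> None:
        self._idle.put_nowait(conn)


async def write_all(statements: list, pool: Pool) -> None:
    async def write(sql: str) -> None:
        conn = await pool.acquire()
        try:
            await conn.query(sql)
        finally:
            pool.release(conn)

    await asyncio.gather(*(write(s) for s in statements))


async def run_demo(n_batches: int, pool_size: int) -> Pool:
    pool = Pool(pool_size)  # built inside the running event loop
    await write_all([f"INSERT /* batch {i} */" for i in range(n_batches)], pool)
    return pool
```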

Sync vs async driver

dbDriver (admin config) picks the connection pool. Async (the default) is faster on a networked database, where round-trip latency is hidden across concurrent batches. On a localhost database the insert path is MySQL-bound and concurrency barely matters; see the flat concurrency sweep in Performance & tuning.

Tracking a run

Ad-hoc CLI runs (fujin:shuttle:run) are untracked. Tracked runs (the admin Run button, a queued RunProfileMessage, and fujin:shuttle:run-profile) flow through a run monitor that:

  • writes throttled progress (processed, progress, phase) and a heartbeat_at liveness stamp to the fujin_job row,
  • reads the job status, so flipping it to stopping (the admin Stop button) makes the next batch boundary break the loop cooperatively — rows already written are kept,
  • lets a duplicate-run guard refuse a second concurrent run of the same profile (a stale heartbeat reaps an abandoned job instead of blocking forever).

Job lifecycle and the exact columns are in Database schema.
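The run monitor's three duties can be sketched in Python against an in-memory stand-in for the fujin_job row. JobRow, RunMonitor.tick, may_start, the "running" and "failed" status values, and the interval and staleness thresholds are all hypothetical. Only processed, heartbeat_at and the stopping status come from the description above. The clock is injectable, so the throttle can be exercised without sleeping.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class JobRow:
    """Hypothetical in-memory stand-in for a fujin_job row."""
    status: str = "running"
    processed: int = 0
    heartbeat_at: float = 0.0
    writes: int = 0  # how many times progress was persisted


class StopRequested(Exception):
    """Raised at a batch boundary when the job was flipped to 'stopping'."""


class RunMonitor:
    def __init__(self, job: JobRow, clock: Callable[[], float] = time.time,
                 min_interval: float = 1.0) -> None:
        self.job = job
        self.clock = clock
        self.min_interval = min_interval
        self._last_write: Optional[float] = None
        self._processed = 0

    def tick(self, rows: int) -> None:
        """Call once per finished batch."""
        self._processed += rows
        now = self.clock()
        # Throttle: persist progress and the heartbeat at most once per interval.
        if self._last_write is None or now - self._last_write >= self.min_interval:
            self.job.processed = self._processed
            self.job.heartbeat_at = now
            self.job.writes += 1
            self._last_write = now
        # Cooperative stop: break at the boundary; rows already written stay.
        if self.job.status == "stopping":
            raise StopRequested()


def may_start(existing: Optional[JobRow], now: float, stale_after: float) -> bool:
    """Duplicate-run guard for one profile."""
    if existing is None or existing.status not in ("running", "stopping"):
        return True
    if now - existing.heartbeat_at > stale_after:
        existing.status = "failed"  # reap the abandoned job instead of blocking
        return True
    return False
```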

Source-tree map

Path | Contents
src/Command/ | The four CLI commands
src/Administration/Controller/ | Admin REST controller
src/Model/ | ProfileFacade, ProfileStore, run monitor
src/DataType/ + Agent/ | Agent registry, selector, agents
src/Deputy/ | Per-type deputies (Pass A / Pass B logic)
src/Channel/ + RequestHandler/ | Backend seam, handler registry, connection pools, 47 handlers
src/DataDepot/ | Per-entity bulk SQL depots
src/IdResolver/Shopware/ | Code/name/path → UUID resolvers
src/Pipeline/Common/ | ColumnSpec (header classifier) and pipeline glue
src/Template/ | TemplateCatalog (downloadable templates/examples)
src/Ownership/ | Ownership guard seam (marketplace per-seller scoping)
src/Migration/ | The four plugin tables + later column adds
src/Resources/config/ | config.xml, routes.xml, services/*.xml
src/Resources/app/administration/ | The admin Vue module